Showing posts with label reactive extensions. Show all posts
Showing posts with label reactive extensions. Show all posts

Sunday, 29 June 2014

DetectStale within a stream

The DetectStale<T> extension method within the Reactive Trader RX example application detects when a stream has stopped producing values and injects an IsStale indicator to allow the UI to react.

The idea is that while the stream is steadily providing values all is good.  As soon as a value hasn’t been received within a set period of time, a new value is sent down the stream to say “this steam has gone stale”.

First here is an example of it being used within a test:

image
The DetectStale internally uses a timer to know whether it should be sending in effect an null/empty stale value. Here are another couple of tests for the DetectStale extension method:

Here is the DetectStale code itself: You can the source subscription on line 42 calling the normal/expected OnNext method with the value.  After that they call the scheduleStale method containing the timer with Stale / null value.

Thursday, 12 April 2012

RX Frequency Cheat Sheet

Ok, so you’re down with observables and you get the use of where and select statements to restrict and project your observables/events.  Now you want to filter on a time frequency.

Here’s a quick way to visually plan out your pipeline with RX.

Basic Subscribe

The basics, you want to know about every single event:

image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable.Subscribe(Console.WriteLine))

Sample


You want to get the latest event every X seconds.


image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Regulate (custom extension method)


Slow down the events to a manageable speed without losing any events.

image

Use the extension method provided by John Rayner


http://sharpfellows.com/post/Rx-Controlling-frequency-of-events.aspx

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Regulate(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Buffer


Receive all events every X seconds.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (observable
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(x => { foreach (var item in x) Console.WriteLine(item); }))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

 


Here’s a few other useful tricks:


Merge


Merge two event streams into one.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Merge(observable2)
.Subscribe(Console.WriteLine))

Or like this:


image


ConCat


Concatenates two streams.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Concat(observable2)
.Subscribe(Console.WriteLine))

Zip


From two streams, takes an event from each, applies a selector logic and returns an output.


image


IObservable<int> observable1 = (new[] {1, 3, 5, 7, 9}).ToObservable();
IObservable<int> observable2 = (new[] {2, 4, 6, 8, 10}).ToObservable();

using (observable1
.Zip(observable2, (x, y) => x > y ? x : y)
.Subscribe(Console.WriteLine))

Distinct Until Changed


Only receive events that are different from the previous event.


image

IObservable<int> observable = (new[] { 1, 1, 1, 2, 2, 3, 4, 4 })
.ToObservable();
using (observable
.DistinctUntilChanged()
.Subscribe(Console.WriteLine))

Interval


Fires the event every specified duration.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

using (observable.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Delay


Delays the start of the event stream, preserving the interval between events.

image
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(0.5));

using (observable
.Delay(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

TimeInterval


Wraps the event, exposing the event as it’s value and an Interval property.  The interval is the amount of time since the previous event.


image

var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Timestamp


Wraps the event, exposing the event as it’s value and a timestamp property.


image

var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Throttle


Holds back on receiving events until the throttle duration has elapsed between two events.


image

var dictionarySuggest = userInput
.Throttle(TimeSpan.FromMilliseconds(250))
.SelectMany(input => serverCall(input));

Tuesday, 14 February 2012

Reactive Extensions (Rx)

Overview

MSDN Definition

“Rx is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.”

Rx provides extension methods on top of Linq to enable developers to implement the observer pattern on query results (IEnumerables e.g. a collection or sequence).

The benefits of Rx are in it’s enabling of concurrency and it’s composability.  It allows for easy concurrent processing over a collection along with simple declarative call backs on completion and exceptions.  This programming model allows for simple orchestration of async operations.

To get familiar with Rx, in the examples below we’ll look at simple enumerations of data to get a feel for the programming style.  We can then take a look at Rx for events, with events after all just being a sequence of data (found in the EventArgs).

How To Get Reactive Extensions

Use Nuget, search for Rx and bring in the Reactive.Main library (System.Reactive namespace).

rx-Nuget

Simple example

1.Convert the linq query into an observable (using the Rx extension method).

2. Write a delegate which will execute for:

  • Each item in the enumeration (OnNext method in the examples below)
  • When an error occurs (OnError method in the examples below)
  • When the enumeration completes (OnCompleted method in the examples below)

3. Subscribe to the query results.  This is whether you hook up each of your delegates and start the enumeration.

rx-Simple

“Finally” delegate

There is another delegate we can set, that runs after the processing has completed, regardless of whether the enumeration successfully got the end of the sequence or not e.g. An error occurred half way through processing so the completed delegate doesn’t run.  The Finally delegate will always run which might be useful for clean up.

rx-FinallyDelegate

How to Configure Threading

rx-HowToConfigureThreads

Notice how all the "getting" and "processing" are sequential and in keeping with the enumeration (within each group) even though the processing might be occurring on different threads.

Early Cancellation of Processing

You can cancel the processing of a collection, here is an example:

rx-EarlyCancellation

“Using” A Disposable Object

This example code disposes of an object after the completed delegate has run.

rx-UsingDisposableObjects

A real world example may look like this:

Demo Source Code

You can download the demo here:  http://stevenhollidge.com/blog-source-code/RxExtensionsDemo.zip