Thursday, 12 April 2012

RX Frequency Cheat Sheet

Ok, so you’re down with observables and you get the use of where and select statements to restrict and project your observables/events.  Now you want to filter on a time frequency.

Here’s a quick way to visually plan out your pipeline with RX.

Basic Subscribe

The basics, you want to know about every single event:

image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable.Subscribe(Console.WriteLine))

Sample


You want to get the latest event every X seconds.


image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Regulate (custom extension method)


Slow down the events to a manageable speed without losing any events.

image

Use the extension method provided by John Rayner


http://sharpfellows.com/post/Rx-Controlling-frequency-of-events.aspx

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Regulate(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Buffer


Receive all events every X seconds.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (observable
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(x => { foreach (var item in x) Console.WriteLine(item); }))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

 


Here’s a few other useful tricks:


Merge


Merge two event streams into one.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Merge(observable2)
.Subscribe(Console.WriteLine))

Or like this:


image


ConCat


Concatenates two streams.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Concat(observable2)
.Subscribe(Console.WriteLine))

Zip


From two streams, takes an event from each, applies a selector logic and returns an output.


image


IObservable<int> observable1 = (new[] {1, 3, 5, 7, 9}).ToObservable();
IObservable<int> observable2 = (new[] {2, 4, 6, 8, 10}).ToObservable();

using (observable1
.Zip(observable2, (x, y) => x > y ? x : y)
.Subscribe(Console.WriteLine))

Distinct Until Changed


Only receive events that are different from the previous event.


image

IObservable<int> observable = (new[] { 1, 1, 1, 2, 2, 3, 4, 4 })
.ToObservable();
using (observable
.DistinctUntilChanged()
.Subscribe(Console.WriteLine))

Interval


Fires the event every specified duration.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

using (observable.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Delay


Delays the start of the event stream, preserving the interval between events.

image
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(0.5));

using (observable
.Delay(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

TimeInterval


Wraps the event, exposing the event as it’s value and an Interval property.  The interval is the amount of time since the previous event.


image

var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Timestamp


Wraps the event, exposing the event as it’s value and a timestamp property.


image

var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Throttle


Holds back on receiving events until the throttle duration has elapsed between two events.


image

var dictionarySuggest = userInput
.Throttle(TimeSpan.FromMilliseconds(250))
.SelectMany(input => serverCall(input));

2 comments:

  1. Great job! :)
    An RX cheat sheet was exactly what I was looking for.

    ReplyDelete
  2. Thanks!, great on visualizing the stream concept.

    ReplyDelete