Ok, so you’re down with observables and you get how to use Where and Select to restrict and project your observables/events. Now you want to filter based on time.
Here’s a quick way to visually plan out your pipeline with Rx.
Basic Subscribe
The basics, you want to know about every single event:
var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable.Subscribe(Console.WriteLine))
    Console.ReadKey(); // disposing the subscription unsubscribes
Sample
You want to get the latest event every X seconds.
var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
    .Sample(TimeSpan.FromSeconds(1))
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Regulate (custom extension method)
Slow down the events to a manageable speed without losing any events.
Regulate is not built into Rx; use the extension method provided by John Rayner:
http://sharpfellows.com/post/Rx-Controlling-frequency-of-events.aspx
var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
    .Regulate(TimeSpan.FromSeconds(1))
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
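The linked implementation isn’t reproduced here. As a rough idea of how this kind of pacing can be built from standard operators, here’s a minimal sketch: each event is turned into a one-shot timer that emits it after the period, and Concat runs those timers one after another, so events are spaced out and none are dropped. The name Pace is made up for this sketch; it is not part of Rx and it is not John Rayner’s Regulate. Note that pending events queue up in memory while they wait their turn.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class PacingExtensions
{
    // Hypothetical helper for illustration only (not part of Rx, not the linked Regulate).
    // Every event is delayed by at least `period`, and consecutive events are
    // released at least `period` apart. No events are dropped.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan period)
    {
        return source
            // Wrap each event in a timer that emits it once, after `period`.
            .Select(x => Observable.Timer(period).Select(_ => x))
            // Concat subscribes to the next timer only when the previous one completes.
            .Concat();
    }
}

public static class PaceDemo
{
    public static void Main()
    {
        var observable = Enumerable.Range(1, 5).ToObservable();

        // ToList().Wait() blocks until the paced stream completes.
        foreach (var item in observable.Pace(TimeSpan.FromMilliseconds(200)).ToList().Wait())
            Console.WriteLine(item);
    }
}
```

The trade-off in this sketch is latency: even a lone event waits a full period before it is released.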
Buffer
Receive all the events from the last X seconds as a single batch, once every X seconds.
IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (observable
    .Buffer(TimeSpan.FromSeconds(1))
    .Subscribe(x => { foreach (var item in x) Console.WriteLine(item); }))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Here are a few other useful tricks:
Merge
Merges two event streams into one, passing events through in the order they arrive.
IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();
using (observable1
    .Merge(observable2)
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Concat
Concatenates two streams: the second stream starts only after the first has completed.
IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();
using (observable1
    .Concat(observable2)
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
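The difference between Merge and Concat is easier to see when the events are spread out over time. Here’s a small sketch that runs the same two timed streams through both operators. The stream names and periods are arbitrary choices for the demo, and the exact interleaving under Merge depends on timer accuracy.

```csharp
using System;
using System.Reactive.Linq;

public static class MergeVersusConcat
{
    public static void Main()
    {
        // Two timed streams with three events each (names and periods are arbitrary).
        var fast = Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(3)
            .Select(i => "fast " + i);
        var slow = Observable.Interval(TimeSpan.FromMilliseconds(170))
            .Take(3)
            .Select(i => "slow " + i);

        // Merge subscribes to both streams at once, so events interleave as they arrive.
        Console.WriteLine("Merge:");
        foreach (var item in fast.Merge(slow).ToList().Wait())
            Console.WriteLine("  " + item);

        // Concat subscribes to the second stream only after the first completes,
        // so every "fast" event comes before any "slow" event.
        Console.WriteLine("Concat:");
        foreach (var item in fast.Concat(slow).ToList().Wait())
            Console.WriteLine("  " + item);
    }
}
```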
Zip
Pairs up events from two streams in order (first with first, second with second, and so on), applies a selector function to each pair, and emits the result.
IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();
using (observable1
    .Zip(observable2, (x, y) => x > y ? x : y) // emit the larger value of each pair
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Distinct Until Changed
Only receive events that are different from the previous event.
IObservable<int> observable = (new[] { 1, 1, 1, 2, 2, 3, 4, 4 })
    .ToObservable();
using (observable
    .DistinctUntilChanged()
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Interval
Fires an event each time the specified duration elapses.
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
using (observable.Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Delay
Delays the start of the event stream, preserving the interval between events.
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(0.5));
using (observable
    .Delay(TimeSpan.FromSeconds(1))
    .Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
TimeInterval
Wraps the event, exposing the original event as its Value property alongside an Interval property. The interval is the amount of time since the previous event.
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();
using (observable.Subscribe(
    x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Timestamp
Wraps the event, exposing the original event as its Value property alongside a Timestamp property.
var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(
    x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}
Throttle
Holds back each event until the throttle duration has passed without another event arriving. In a rapid burst, only the last event gets through.
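// userInput is assumed to be an IObservable<string> of search text, defined elsewhere.
// serverCall is assumed to return an IObservable of suggestions for that text.
var dictionarySuggest = userInput
    .Throttle(TimeSpan.FromMilliseconds(250))
    .SelectMany(input => serverCall(input));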
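The snippet above depends on a userInput stream and a serverCall function that aren’t shown. To try Throttle on its own, here’s a self-contained sketch that fakes the typing with a Subject<string>. The subject, the sample strings, and the sleep durations are stand-ins chosen for the demo. With a 250 ms throttle, it should print only the last value of each burst: "Lookup: rea", then "Lookup: reac".

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ThrottleDemo
{
    public static void Main()
    {
        // Stand-in for a stream of text-box changes (hypothetical source).
        var userInput = new Subject<string>();

        using (userInput
            .Throttle(TimeSpan.FromMilliseconds(250))
            .Subscribe(text => Console.WriteLine("Lookup: " + text)))
        {
            // A quick burst: each value arrives well inside the 250 ms window,
            // so the earlier ones are discarded.
            userInput.OnNext("r");
            userInput.OnNext("re");
            userInput.OnNext("rea");
            Thread.Sleep(500); // quiet period, so "rea" is released

            userInput.OnNext("reac");
            Thread.Sleep(500); // quiet period, so "reac" is released
        }
    }
}
```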