Showing posts with label rx. Show all posts
Showing posts with label rx. Show all posts

Sunday, 29 June 2014

DetectStale within a stream

The DetectStale<T> extension method within the Reactive Trader RX example application detects when a stream has stopped producing values and injects an IsStale indicator to allow the UI to react.

The idea is that while the stream is steadily providing values all is good.  As soon as a value hasn’t been received within a set period of time, a new value is sent down the stream to say “this steam has gone stale”.

First here is an example of it being used within a test:

image
The DetectStale internally uses a timer to know whether it should be sending in effect an null/empty stale value. Here are another couple of tests for the DetectStale extension method:

Here is the DetectStale code itself: You can the source subscription on line 42 calling the normal/expected OnNext method with the value.  After that they call the scheduleStale method containing the timer with Stale / null value.

Monday, 23 June 2014

From Task to Rx to Streaming data

Here is a simple example of a ViewModel calling a LoadCustomers service:

public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();
var customers = await services.GetAllCustomers();
Customers.AddRange(customers);
IsLoading = false;
}
Note: I’m using the PRISM extension method that allows for adding a range to an observable collection, giving less notifications.  The error handling has been removed to focus on the code.

Here’s the equivalent for Rx:
public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();
services.GetAllCustomers().ToObservable()
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(
customers => Customers.AddRange(customers),
() => IsLoading = false);
}

The SubscribeOn extension method is ensuring the GetAllCustomers call is being made on a separate thread to keep the UI as free as possible.  Once the response comes back, the ObserveOnDispatcher ensures the UI thread runs our Customers.AddRange code along with the IsLoading = false code which is running on the Observable OnCompleted event.

So any benefits from this?  Not really.  But imagine our GetAllCustomers wasn’t just a call to a web endpoint.

First we’ll remove the ToObservable() extension method and code up the observable ourselves so we understand what’s actually going on under the hood:

public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();

var source = Observable.Create<IEnumerable<Customer>>(
async o =>
{
var response = await services.GetAllCustomers();
o.OnNext(response);
o.OnCompleted();
return Disposable.Empty;
});

source
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(
customers => Customers.AddRange(customers),
() => IsLoading = false);
}

Now if we start to require more work being done to retrieve data than just a web call, like perhaps converting DTOs to UI models, this can be done within the observable code.  Remember this work is not being done on the UI thread so our app is nice and responsive.

Now imagine our services returned observables rather than DTOs or UI models:

public void LoadCustomers()
{
services.GetAllCustomers()
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(customers => Customers.AddRange(customers));
}

We now have a viewmodel that is dealing with observables and doesn’t care whether that data is coming from a web service call, file I/O or a Nirvana (Universal) endpoint that is streaming data.

Thursday, 12 April 2012

RX Frequency Cheat Sheet

Ok, so you’re down with observables and you get the use of where and select statements to restrict and project your observables/events.  Now you want to filter on a time frequency.

Here’s a quick way to visually plan out your pipeline with RX.

Basic Subscribe

The basics, you want to know about every single event:

image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable.Subscribe(Console.WriteLine))

Sample


You want to get the latest event every X seconds.


image

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Regulate (custom extension method)


Slow down the events to a manageable speed without losing any events.

image

Use the extension method provided by John Rayner


http://sharpfellows.com/post/Rx-Controlling-frequency-of-events.aspx

var observable = Enumerable.Range(1, 1000000).ToObservable();
using (observable
.Regulate(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))

Buffer


Receive all events every X seconds.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (observable
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(x => { foreach (var item in x) Console.WriteLine(item); }))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

 


Here’s a few other useful tricks:


Merge


Merge two event streams into one.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Merge(observable2)
.Subscribe(Console.WriteLine))

Or like this:


image


ConCat


Concatenates two streams.


image

IObservable<int> observable1 = (new[] { 1, 3, 5, 7, 9 }).ToObservable();
IObservable<int> observable2 = (new[] { 2, 4, 6, 8, 10 }).ToObservable();

using (observable1
.Concat(observable2)
.Subscribe(Console.WriteLine))

Zip


From two streams, takes an event from each, applies a selector logic and returns an output.


image


IObservable<int> observable1 = (new[] {1, 3, 5, 7, 9}).ToObservable();
IObservable<int> observable2 = (new[] {2, 4, 6, 8, 10}).ToObservable();

using (observable1
.Zip(observable2, (x, y) => x > y ? x : y)
.Subscribe(Console.WriteLine))

Distinct Until Changed


Only receive events that are different from the previous event.


image

IObservable<int> observable = (new[] { 1, 1, 1, 2, 2, 3, 4, 4 })
.ToObservable();
using (observable
.DistinctUntilChanged()
.Subscribe(Console.WriteLine))

Interval


Fires the event every specified duration.


image

IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

using (observable.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Delay


Delays the start of the event stream, preserving the interval between events.

image
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(0.5));

using (observable
.Delay(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

TimeInterval


Wraps the event, exposing the event as it’s value and an Interval property.  The interval is the amount of time since the previous event.


image

var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Timestamp


Wraps the event, exposing the event as it’s value and a timestamp property.


image

var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}

Throttle


Holds back on receiving events until the throttle duration has elapsed between two events.


image

var dictionarySuggest = userInput
.Throttle(TimeSpan.FromMilliseconds(250))
.SelectMany(input => serverCall(input));

Tuesday, 3 April 2012

WPF Metro Shell

Here is an example of a WPF application with pluggable components for core business functionality.

The solution features the following frameworks:

MVMMLight, MahApps.Metro, Ninject, ServiceLocator, Newtonsoft.Json, Rx Extensions

 

Projects

The main projects prefixed MetroWpf have a core shell window, login page and menu system.

MetroWpf-projects

Login

image

 

image

NOTE:

This is very much a work in progress but as I keep getting side tracked on other projects I thought I’d release what I have so far.

Source code:  http://stevenhollidge.com/blog-source-code/MetroWpf-CTP-v0.1.zip

HG repo:  http://hg.assembla.com/silverbladetech/file/4b8b38d17d24/MetroWpf

Friday, 17 February 2012

Rx Event Example

When subscribing to events Reactive Extensions (Rx) is a powerful way to filter, throttle and compose your .Net code. 

With this example I have a WPF application that listens to a FxRate pricing service, that’s pumping out 100 new prices a second via an event.

I’m going to make available the code to show how to hook up the front end Metro style application using MVVM with:

  • old skool event handlers
  • and then using Reactive Extensions (Rx)

Finally we update the Rx example to filter on just one of the exchange rates and also throttle back the rate of FxRates being received to one per second.

stockmarket-demo

The event signature

Old skool event handlers

Using MVVM, here we have my view model code stripped back just to show we have:

  • A pricing service
  • A subscribed command (data bound to the subscribed button on the UI) which toggles the subscribed property
  • A delegate (PriceUpdate) that handles the event when the subscribed button is clicked.  Click the button a second time to unsubscribe and remove the delegate from handling the event

Lines 25 and 30 show the adding and removal of the event handler, dependent on whether the user has clicked the subscribe button.

Rx Subscriptions

To do the same thing with Rx takes two lines of code (lines 18/19 and 21)

Filtering with Rx

Simply update our subscription to include a where filter that accepts the event coming in (e) and sets the predicate to Ccy (Currency) for the exchange to Euro to GBP.

Throttling with Rx

NOTE:  The code below was a workaround, please use Sample as shown here.

So we have filtered the events to only give us the Euro to GBP prices. Now let's enhance the solution just to give us the latest Euro to GBP price every two seconds.

Source code

Event handlers:  http://stevenhollidge.com/blog-source-code/standard-event-handler-wpf-metro-mvvm-stockmarket-pricing-app.zip

Reactive Extensions:  http://stevenhollidge.com/blog-source-code/Rx-wpf-metro-mvvm-stockmarket-pricing-app.zip

Tuesday, 14 February 2012

Reactive Extensions (Rx)

Overview

MSDN Definition

“Rx is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.”

Rx provides extension methods on top of Linq to enable developers to implement the observer pattern on query results (IEnumerables e.g. a collection or sequence).

The benefits of Rx are in it’s enabling of concurrency and it’s composability.  It allows for easy concurrent processing over a collection along with simple declarative call backs on completion and exceptions.  This programming model allows for simple orchestration of async operations.

To get familiar with Rx, in the examples below we’ll look at simple enumerations of data to get a feel for the programming style.  We can then take a look at Rx for events, with events after all just being a sequence of data (found in the EventArgs).

How To Get Reactive Extensions

Use Nuget, search for Rx and bring in the Reactive.Main library (System.Reactive namespace).

rx-Nuget

Simple example

1.Convert the linq query into an observable (using the Rx extension method).

2. Write a delegate which will execute for:

  • Each item in the enumeration (OnNext method in the examples below)
  • When an error occurs (OnError method in the examples below)
  • When the enumeration completes (OnCompleted method in the examples below)

3. Subscribe to the query results.  This is whether you hook up each of your delegates and start the enumeration.

rx-Simple

“Finally” delegate

There is another delegate we can set, that runs after the processing has completed, regardless of whether the enumeration successfully got the end of the sequence or not e.g. An error occurred half way through processing so the completed delegate doesn’t run.  The Finally delegate will always run which might be useful for clean up.

rx-FinallyDelegate

How to Configure Threading

rx-HowToConfigureThreads

Notice how all the "getting" and "processing" are sequential and in keeping with the enumeration (within each group) even though the processing might be occurring on different threads.

Early Cancellation of Processing

You can cancel the processing of a collection, here is an example:

rx-EarlyCancellation

“Using” A Disposable Object

This example code disposes of an object after the completed delegate has run.

rx-UsingDisposableObjects

A real world example may look like this:

Demo Source Code

You can download the demo here:  http://stevenhollidge.com/blog-source-code/RxExtensionsDemo.zip