Tuesday, 14 February 2012

Reactive Extensions (Rx)

Overview

MSDN Definition

“Rx is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.”

Rx provides extension methods on top of Linq to enable developers to implement the observer pattern on query results (IEnumerables e.g. a collection or sequence).

The benefits of Rx are in it’s enabling of concurrency and it’s composability.  It allows for easy concurrent processing over a collection along with simple declarative call backs on completion and exceptions.  This programming model allows for simple orchestration of async operations.

To get familiar with Rx, in the examples below we’ll look at simple enumerations of data to get a feel for the programming style.  We can then take a look at Rx for events, with events after all just being a sequence of data (found in the EventArgs).

How To Get Reactive Extensions

Use Nuget, search for Rx and bring in the Reactive.Main library (System.Reactive namespace).

rx-Nuget

Simple example

1.Convert the linq query into an observable (using the Rx extension method).

2. Write a delegate which will execute for:

  • Each item in the enumeration (OnNext method in the examples below)
  • When an error occurs (OnError method in the examples below)
  • When the enumeration completes (OnCompleted method in the examples below)

3. Subscribe to the query results.  This is whether you hook up each of your delegates and start the enumeration.

rx-Simple

“Finally” delegate

There is another delegate we can set, that runs after the processing has completed, regardless of whether the enumeration successfully got the end of the sequence or not e.g. An error occurred half way through processing so the completed delegate doesn’t run.  The Finally delegate will always run which might be useful for clean up.

rx-FinallyDelegate

How to Configure Threading

rx-HowToConfigureThreads

Notice how all the "getting" and "processing" are sequential and in keeping with the enumeration (within each group) even though the processing might be occurring on different threads.

Early Cancellation of Processing

You can cancel the processing of a collection, here is an example:

rx-EarlyCancellation

“Using” A Disposable Object

This example code disposes of an object after the completed delegate has run.

rx-UsingDisposableObjects

A real world example may look like this:

Demo Source Code

You can download the demo here:  http://stevenhollidge.com/blog-source-code/RxExtensionsDemo.zip

1 comment: