Monday, 23 June 2014

From Task to Rx to Streaming data

Here is a simple example of a ViewModel calling a LoadCustomers service:

public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();
var customers = await services.GetAllCustomers();
Customers.AddRange(customers);
IsLoading = false;
}
Note: I’m using the PRISM extension method that allows for adding a range to an observable collection, giving less notifications.  The error handling has been removed to focus on the code.

Here’s the equivalent for Rx:
public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();
services.GetAllCustomers().ToObservable()
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(
customers => Customers.AddRange(customers),
() => IsLoading = false);
}

The SubscribeOn extension method is ensuring the GetAllCustomers call is being made on a separate thread to keep the UI as free as possible.  Once the response comes back, the ObserveOnDispatcher ensures the UI thread runs our Customers.AddRange code along with the IsLoading = false code which is running on the Observable OnCompleted event.

So any benefits from this?  Not really.  But imagine our GetAllCustomers wasn’t just a call to a web endpoint.

First we’ll remove the ToObservable() extension method and code up the observable ourselves so we understand what’s actually going on under the hood:

public async Task LoadCustomers()
{
IsLoading = true;
Customers.Clear();

var source = Observable.Create<IEnumerable<Customer>>(
async o =>
{
var response = await services.GetAllCustomers();
o.OnNext(response);
o.OnCompleted();
return Disposable.Empty;
});

source
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(
customers => Customers.AddRange(customers),
() => IsLoading = false);
}

Now if we start to require more work being done to retrieve data than just a web call, like perhaps converting DTOs to UI models, this can be done within the observable code.  Remember this work is not being done on the UI thread so our app is nice and responsive.

Now imagine our services returned observables rather than DTOs or UI models:

public void LoadCustomers()
{
services.GetAllCustomers()
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOnDispatcher()
.Subscribe(customers => Customers.AddRange(customers));
}

We now have a viewmodel that is dealing with observables and doesn’t care whether that data is coming from a web service call, file I/O or a Nirvana (Universal) endpoint that is streaming data.

No comments:

Post a Comment