Showing posts with label Tasks. Show all posts

Monday, 23 June 2014

From Task to Rx to Streaming data

Here is a simple example of a ViewModel method, LoadCustomers, calling a GetAllCustomers service:

public async Task LoadCustomers()
{
    IsLoading = true;
    Customers.Clear();
    var customers = await services.GetAllCustomers();
    Customers.AddRange(customers);
    IsLoading = false;
}

Note: I’m using the PRISM extension method that adds a range to an observable collection, which raises fewer notifications. The error handling has been removed to focus on the code.

Here’s the equivalent for Rx:

// Nothing is awaited in the method body, so it is a plain void method rather than async Task.
public void LoadCustomers()
{
    IsLoading = true;
    Customers.Clear();
    services.GetAllCustomers().ToObservable()
        .SubscribeOn(NewThreadScheduler.Default)
        .ObserveOnDispatcher()
        .Subscribe(
            customers => Customers.AddRange(customers),
            () => IsLoading = false);
}

The SubscribeOn extension method moves the subscription onto a separate thread to keep the UI as free as possible. Note that in this version GetAllCustomers() is invoked before the observable is built, so the task itself is started on the calling thread and only the subscription is moved. Once the response comes back, ObserveOnDispatcher ensures the UI thread runs our Customers.AddRange code, along with the IsLoading = false code, which runs in the observable's OnCompleted handler.

So any benefits from this?  Not really.  But imagine our GetAllCustomers wasn’t just a call to a web endpoint.

First we’ll remove the ToObservable() extension method and code up the observable ourselves so we understand what’s actually going on under the hood:

// The await lives inside the lambda, so the outer method is a plain void method.
public void LoadCustomers()
{
    IsLoading = true;
    Customers.Clear();

    var source = Observable.Create<IEnumerable<Customer>>(
        async o =>
        {
            // Runs on subscription, so SubscribeOn decides which thread makes the call.
            var response = await services.GetAllCustomers();
            o.OnNext(response);
            o.OnCompleted();
            return Disposable.Empty;
        });

    source
        .SubscribeOn(NewThreadScheduler.Default)
        .ObserveOnDispatcher()
        .Subscribe(
            customers => Customers.AddRange(customers),
            () => IsLoading = false);
}

Now if retrieving the data requires more work than just a web call, such as converting DTOs to UI models, that work can be done within the observable code. Remember that this work is not done on the UI thread, so our app stays nice and responsive.

Now imagine our services returned observables rather than DTOs or UI models:

public void LoadCustomers()
{
    services.GetAllCustomers()
        .SubscribeOn(NewThreadScheduler.Default)
        .ObserveOnDispatcher()
        .Subscribe(customers => Customers.AddRange(customers));
}

We now have a viewmodel that is dealing with observables and doesn’t care whether that data is coming from a web service call, file I/O or a Nirvana (Universal) endpoint that is streaming data.
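
To make that last point concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library, with no Rx package. SimpleObservable, CustomersViewModel, CustomerSources and the customer names are all hypothetical stand-ins, and the threading calls (SubscribeOn, ObserveOnDispatcher) are deliberately left out so that the example stays synchronous:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled observable: runs the supplied producer on Subscribe.
public sealed class SimpleObservable<T> : IObservable<T>
{
    private readonly Action<IObserver<T>> produce;

    public SimpleObservable(Action<IObserver<T>> produce)
    {
        this.produce = produce;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        produce(observer);
        return new EmptyDisposable();
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Stand-in view model: it only knows that it observes batches of customers.
public sealed class CustomersViewModel : IObserver<IEnumerable<string>>
{
    public readonly List<string> Customers = new List<string>();
    public bool IsLoading { get; private set; }

    public void LoadCustomers(IObservable<IEnumerable<string>> source)
    {
        IsLoading = true;
        Customers.Clear();
        source.Subscribe(this);
    }

    public void OnNext(IEnumerable<string> batch) { Customers.AddRange(batch); }
    public void OnCompleted() { IsLoading = false; }
    public void OnError(Exception error) { IsLoading = false; }
}

public static class CustomerSources
{
    // One response followed by completion, like a single web call.
    public static IObservable<IEnumerable<string>> OneShot()
    {
        return new SimpleObservable<IEnumerable<string>>(o =>
        {
            o.OnNext(new[] { "Ann", "Bob" });
            o.OnCompleted();
        });
    }

    // Several batches before completion, like a streaming endpoint.
    public static IObservable<IEnumerable<string>> Streaming()
    {
        return new SimpleObservable<IEnumerable<string>>(o =>
        {
            o.OnNext(new[] { "Ann" });
            o.OnNext(new[] { "Bob" });
            o.OnNext(new[] { "Cy" });
            o.OnCompleted();
        });
    }
}
```

Passing either CustomerSources.OneShot() or CustomerSources.Streaming() to LoadCustomers goes through exactly the same view model code, which is the point of the paragraph above.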

Saturday, 7 December 2013

Using TPL to avoid callback hell

In this example, we fire off three service calls, each with a continuation that converts the resulting DTO to a model object.

We then call a fourth service to get the Spot, which fires only once all three previous service calls have completed and created their model objects.

var tasks = new Task[]
{
    services.GetAsTask<DTOs.MoneyMarketRate>()
        .ContinueWith(t => MoneyMarketRate = Mapper.Map<DTOs.MoneyMarketRate, MoneyMarketRate>(t.Result)),

    services.GetAsTask<DTOs.InvestmentBoundaries>()
        .ContinueWith(t => InvestmentBoundaries = Mapper.Map<DTOs.InvestmentBoundaries, InvestmentBoundaries>(t.Result)),

    services.GetAsTask<DTOs.TradingDate>()
        .ContinueWith(t => TradingDate = Mapper.Map<DTOs.TradingDate, TradingDate>(t.Result)),
};

Task.Factory.ContinueWhenAll(
    tasks,
    ts => services.GetAsTask<DTOs.Spot>()
        .ContinueWith(t => Spot = Mapper.Map<DTOs.Spot, Spot>(t.Result)));

You can take a look at my Silverlight source project here: https://github.com/stevenh77/UsingTasksToAvoidCallbackHell

The problem with this code is that exceptions are not properly handled. For that we can leverage async and await:

try
{
    var tasks = new []
    {
        services.GetAsync<DTOs.MoneyMarketRate, MoneyMarketRate>(x => MoneyMarketRate = x),
        services.GetAsync<DTOs.InvestmentBoundaries, InvestmentBoundaries>(x => InvestmentBoundaries = x),
        services.GetAsync<DTOs.TradingDate, TradingDate>(x => TradingDate = x),
    };

    await TaskEx.WhenAll(tasks);
    await services.GetAsync<DTOs.Spot, Spot>(x => Spot = x);
}
catch (Exception e)
{
    MessageBox.Show(e.Message);
}
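
To see the exception handling in isolation, here is a self-contained sketch of the same shape. The GetAsync stub, its delays and the FanOutDemo class are hypothetical stand-ins for the real services, and the sketch assumes .NET 4.5 or later, where Task.WhenAll and Task.Delay take the place of the TaskEx equivalents used above:

```csharp
using System;
using System.Threading.Tasks;

public static class FanOutDemo
{
    // Hypothetical stub for a service call: waits, then returns its name or throws.
    private static async Task<string> GetAsync(string name, int delayMs, bool fail)
    {
        await Task.Delay(delayMs);
        if (fail) throw new InvalidOperationException(name + " failed");
        return name;
    }

    public static async Task<string> LoadAsync(bool failOne)
    {
        try
        {
            // Fire off the three independent calls together.
            var tasks = new[]
            {
                GetAsync("MoneyMarketRate", 30, false),
                GetAsync("InvestmentBoundaries", 20, failOne),
                GetAsync("TradingDate", 10, false),
            };

            // Results come back in the order of the input tasks.
            string[] results = await Task.WhenAll(tasks);

            // The fourth call only starts once the first three have succeeded.
            string spot = await GetAsync("Spot", 10, false);
            return string.Join(",", results) + "," + spot;
        }
        catch (InvalidOperationException e)
        {
            // A failure in any awaited call lands here as the original exception.
            return "error: " + e.Message;
        }
    }
}
```

With failOne set to true the Spot call is never made and the catch block sees the InvalidOperationException directly, which is the behaviour the ContinueWith version lacked.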

I also refactored the service executor to use async and await, and moved the setting of the property into an action that can be passed in.

public class ServiceExecutor
{
    const string BaseAddress = "http://localhost:8080/Services/";

    public async Task GetAsync<TDto, TModel>(Action<TModel> actionToSetPropertyValue)
    {
        var client = new WebClient();
        var serviceName = typeof(TDto).Name + ".ashx";
        var response = await client.DownloadStringTaskAsync(new Uri(BaseAddress + serviceName, UriKind.Absolute));
        var dto = JsonConvert.DeserializeObject<TDto>(response);
        actionToSetPropertyValue.Invoke(Mapper.Map<TDto, TModel>(dto));
    }
}

Refactored source:  https://github.com/stevenh77/UsingTasksToAvoidCallbackHellWithAsyncAndWait/

Saturday, 30 June 2012

Async Task.Delay

Internal code for Task.Delay (.NET 4.5)

public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken)
{
    if (millisecondsDelay < -1)
    {
        throw new ArgumentOutOfRangeException("millisecondsDelay", Environment.GetResourceString("Task_Delay_InvalidMillisecondsDelay"));
    }
    if (cancellationToken.IsCancellationRequested)
    {
        return FromCancellation(cancellationToken);
    }
    if (millisecondsDelay == 0)
    {
        return CompletedTask;
    }
    DelayPromise state = new DelayPromise(cancellationToken);
    if (cancellationToken.CanBeCanceled)
    {
        state.Registration = cancellationToken.InternalRegisterWithoutEC(delegate (object state) {
            ((DelayPromise) state).Complete();
        }, state);
    }
    if (millisecondsDelay != -1)
    {
        state.Timer = new Timer(delegate (object state) {
            ((DelayPromise) state).Complete();
        }, state, millisecondsDelay, -1);
        state.Timer.KeepRootedWhileScheduled();
    }
    return state;
}

The following examples were taken from the samples included with LINQPad 4.


How to implement Task.Delay in 4.0

/* You can write Task-based asynchronous methods by utilizing a TaskCompletionSource.
A TaskCompletionSource gives you a 'slave' Task that you can manually signal.
Calling SetResult() signals the task as complete, and any continuations kick off. */

void Main()
{
    for (int i = 0; i < 10000; i++)
    {
        Task task = Delay (2000);
        task.ContinueWith (_ => "Done".Dump());
    }
}

Task Delay (int milliseconds) // Asynchronous NON-BLOCKING method
{
    var tcs = new TaskCompletionSource<object>();
    new Timer (_ => tcs.SetResult (null)).Change (milliseconds, -1);
    return tcs.Task;
}
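
The internal .NET 4.5 code also registers with the CancellationToken, which the 4.0 sample leaves out. The following is a minimal sketch of how that could be added to the TaskCompletionSource approach. CancellableDelay is a hypothetical name, and this is an illustration using only public APIs, not how the framework itself implements it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableDelay
{
    public static Task Delay(int milliseconds, CancellationToken token)
    {
        var tcs = new TaskCompletionSource<object>();

        // Already cancelled: hand back a cancelled task without creating a timer.
        if (token.IsCancellationRequested)
        {
            tcs.SetCanceled();
            return tcs.Task;
        }

        Timer timer = null;
        var registration = default(CancellationTokenRegistration);

        // Create the timer unscheduled so both variables are assigned before it can fire.
        timer = new Timer(_ =>
        {
            tcs.TrySetResult(null);   // no effect if the task was already cancelled
            registration.Dispose();
            timer.Dispose();
        }, null, Timeout.Infinite, Timeout.Infinite);

        // Cancellation completes the task early; the timer is cleaned up when it fires.
        registration = token.Register(() => tcs.TrySetCanceled());

        timer.Change(milliseconds, Timeout.Infinite);
        return tcs.Task;
    }
}
```

TrySetResult and TrySetCanceled are used instead of SetResult and SetCanceled because the timer and the cancellation callback can race, and only the first one to arrive should decide the outcome.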

How NOT to implement Task.Delay in 4.0

/* Instead of using a TaskCompletionSource, you can get (seemingly) the same result by
calling Task.Factory.StartNew. This method runs a delegate on a pooled thread,
which is fine for compute-bound work. However, it's not great for I/O-bound work
because you tie up a thread, blocking for the duration of the operation! */

void Main()
{
    for (int i = 0; i < 10000; i++)
    {
        Task task = Delay (2000);
        task.ContinueWith (_ => "Done".Dump());
    }
}

Task Delay (int milliseconds) // Asynchronous non-blocking wrapper....
{
    return Task.Factory.StartNew (() =>
    {
        Thread.Sleep (milliseconds); // ... around a BLOCKING method!
    });
}

// This approach is correct for COMPUTE-BOUND operations.

Returning a Value

// There's also a generic subclass of Task called Task<TResult>. This has a Result
// property which stores a RETURN VALUE of the concurrent operation.

void Main()
{
    Task<int> task = GetAnswerToLifeUniverseAndEverything();
    task.ContinueWith (_ => task.Result.Dump());
}

Task<int> GetAnswerToLifeUniverseAndEverything () // We're now returning a Task of int
{
    var tcs = new TaskCompletionSource<int>(); // Call SetResult with an int instead:
    new Timer (_ => tcs.SetResult (42)).Change (3000, -1);
    return tcs.Task;
}

// This is great, because most methods return a value of some sort!
//
// You can think of Task<int> as 'an int in the future'.

Dealing with Exceptions

// Tasks also have an Exception property for storing the ERROR should a concurrent 
// operation fail. Calling SetException on a TaskCompletionSource signals as complete
// while populating its Exception property instead of the Result property.

void Main()
{
    Task<int> task = GetAnswerToLifeUniverseAndEverything();
    task.ContinueWith (_ => task.Exception.InnerException.Dump());
}

Task<int> GetAnswerToLifeUniverseAndEverything()
{
    var tcs = new TaskCompletionSource<int>();
    new Timer (_ => tcs.SetException (new Exception ("You're not going to like the answer!"))).Change (2000, -1);
    return tcs.Task;
}
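
The sample above reads task.Exception.InnerException because a faulted task wraps its error in an AggregateException. As a small sketch of how that looks from the caller's side (FaultedTaskDemo and its method names are hypothetical, and the await variant assumes C# 5 or later), blocking on the task surfaces the wrapper while await rethrows the original exception:

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedTaskDemo
{
    // A task that has already been signalled as failed through SetException.
    public static Task<int> Fail()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("You're not going to like the answer!"));
        return tcs.Task;
    }

    // Blocking with Wait() throws an AggregateException that wraps the original error.
    public static string ObserveWithWait()
    {
        try
        {
            Fail().Wait();
            return "no error";
        }
        catch (AggregateException ae)
        {
            return "AggregateException -> " + ae.InnerException.GetType().Name;
        }
    }

    // await unwraps the AggregateException and rethrows the original exception.
    public static async Task<string> ObserveWithAwait()
    {
        try
        {
            await Fail();
            return "no error";
        }
        catch (InvalidOperationException e)
        {
            return e.GetType().Name;
        }
    }
}
```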

Example Usage


// Kick off a download operation in the background:
Task<string> task = new WebClient().DownloadStringTaskAsync (new Uri ("http://www.albahari.com/threading"));

// Call the .ContinueWith method to tell a task to do something when it's finished.
task.ContinueWith (_ =>
{
    if (task.Exception != null)
        task.Exception.InnerException.Dump();
    else
    {
        string html = task.Result;
        html.Dump();
    }
});

Saturday, 18 February 2012

TryExecute with timeout and maxAttempts

A nice piece of code for calling services with a timeout and maximum attempts, in case the service should cause an exception.

It follows the standard practice for "Try" methods in .NET, like TryParse, of returning a boolean to indicate success and the result as an out parameter. Should you wish to find out which exception occurred, this is also passed as an out parameter.

I've also included an Execute method that will raise an exception should the service request time out or fail more than the maximum number of attempts.

The source code solution includes unit tests in MBUnit, NUnit, xUnit and MSTest:

http://stevenhollidge.com/blog-source-code/TryExecute.zip
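
The zip remains the actual source. As a rough sketch of the shape described in this post, and not the code from the download, the pattern might look something like this. ServiceCaller, the parameter names and the exception types thrown are all assumptions made for illustration:

```csharp
using System;
using System.Threading.Tasks;

public static class ServiceCaller
{
    // Returns true on success; on failure, 'exception' holds the last error seen.
    public static bool TryExecute<T>(Func<T> call, TimeSpan timeout, int maxAttempts,
                                     out T result, out Exception exception)
    {
        if (call == null) throw new ArgumentNullException("call");
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException("maxAttempts");

        result = default(T);
        exception = null;

        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // Run the call on a pooled thread so the wait can be bounded.
            Task<T> task = Task.Factory.StartNew(call);
            try
            {
                if (task.Wait(timeout))
                {
                    result = task.Result;
                    exception = null;
                    return true;
                }
                // Note: the timed-out call is abandoned, not aborted.
                exception = new TimeoutException("Attempt " + attempt + " timed out.");
            }
            catch (AggregateException ae)
            {
                // The call threw: remember the real error and try again.
                exception = ae.InnerException;
            }
        }
        return false;
    }

    // Throwing variant built on top of TryExecute.
    public static T Execute<T>(Func<T> call, TimeSpan timeout, int maxAttempts)
    {
        T result;
        Exception exception;
        if (TryExecute(call, timeout, maxAttempts, out result, out exception))
        {
            return result;
        }
        throw new InvalidOperationException("Service call failed after " + maxAttempts + " attempt(s).", exception);
    }
}
```

One design point worth noting in this sketch is that a call which times out keeps running in the background, so it suits idempotent service requests better than calls with side effects.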

Monday, 23 May 2011

Common Tasks using the TPL

Want a quick way to remember how to spin up tasks in .NET 4.0? Here are a few examples:
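
The original examples for this post are not reproduced here, so the following is a short sketch of the usual ways to start a task in .NET 4.0. The CommonTasks class and the arithmetic are placeholders for real work:

```csharp
using System;
using System.Threading.Tasks;

public static class CommonTasks
{
    public static int[] Run()
    {
        // 1. Create and start in one step.
        Task<int> started = Task.Factory.StartNew(() => 1 + 1);

        // 2. Create a "cold" task, then start it explicitly.
        var cold = new Task<int>(() => 2 + 2);
        cold.Start();

        // 3. Chain a continuation that uses the antecedent's result.
        Task<int> continued = started.ContinueWith(t => t.Result * 10);

        // 4. Block until all of them have finished.
        Task.WaitAll(started, cold, continued);

        return new[] { started.Result, cold.Result, continued.Result };
    }
}
```

Calling CommonTasks.Run() returns the three results in the order the tasks were declared: 2, 4 and 20.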

Thursday, 19 May 2011

Multi threaded apps with Tasks

Here is an example of running multiple methods simultaneously:

[Screenshot not preserved: ScreenShot026]

The Task Parallel Library decides whether new threads are needed and takes them from the thread pool as required.
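
Since the screenshot is not available, here is a minimal sketch of the idea rather than the original code. The method names, the sleep durations and the SimultaneousMethods class are made up for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SimultaneousMethods
{
    // Simulated unit of work: sleeps, then records that it finished.
    private static void Work(string name, int milliseconds, ConcurrentBag<string> finished)
    {
        Thread.Sleep(milliseconds);
        finished.Add(name);
    }

    public static string[] RunAll()
    {
        var finished = new ConcurrentBag<string>();

        // Each StartNew queues a method to the thread pool; they can run at the same time.
        Task[] tasks =
        {
            Task.Factory.StartNew(() => Work("LoadRates", 150, finished)),
            Task.Factory.StartNew(() => Work("LoadDates", 100, finished)),
            Task.Factory.StartNew(() => Work("LoadLimits", 50, finished)),
        };

        // Block until every method has completed.
        Task.WaitAll(tasks);

        // Sort so the output does not depend on completion order.
        return finished.OrderBy(name => name).ToArray();
    }
}
```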

Chain of Command pattern with Tasks

You can start a task that runs a series of sequential methods, with the result of each passed on to its successor, using Tasks and the ContinueWith method.

The example below uses a fluent interface to chain each method together:

[Image not preserved]
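
As the image is missing, the following sketch shows the kind of fluent chain described above. It is an illustration only: ChainDemo and the AddOne, Double and Square steps are hypothetical stand-ins for real processing stages:

```csharp
using System;
using System.Threading.Tasks;

public static class ChainDemo
{
    private static int AddOne(int value) { return value + 1; }
    private static int Double(int value) { return value * 2; }
    private static int Square(int value) { return value * value; }

    public static int Run(int seed)
    {
        // Each ContinueWith receives the previous task and passes its Result along.
        Task<int> chain = Task.Factory.StartNew(() => seed)
            .ContinueWith(previous => AddOne(previous.Result))
            .ContinueWith(previous => Double(previous.Result))
            .ContinueWith(previous => Square(previous.Result));

        // Reading Result blocks until the last link has finished.
        return chain.Result;
    }
}
```

For a seed of 2 the chain computes 3, then 6, then 36.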

Tasks vs Threading

With the introduction of the Task Parallel Library (TPL) within .NET 4.0, creating a Task rather than a Thread is now the preferred way to create multithreaded applications.

Here is an example showing the creation of a thread using old-style Threading versus the new Tasks:

[Screenshot not preserved: ScreenShot024]
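
In place of the missing screenshot, here is a small side-by-side sketch of the two styles. It is not the original code, and ThreadVersusTask is a hypothetical name:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadVersusTask
{
    // Old style: create a dedicated thread, start it, and join to wait for it.
    public static string ViaThread()
    {
        string result = null;
        var thread = new Thread(() => result = "done on a thread");
        thread.Start();
        thread.Join();
        return result;
    }

    // TPL style: queue the work as a task and read its result.
    public static string ViaTask()
    {
        Task<string> task = Task.Factory.StartNew(() => "done in a task");
        return task.Result;
    }
}
```

The Thread version has no built-in way to return a value, so it writes to a captured variable, whereas Task<string> carries the result itself.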

Why choose Task over Threading?

Behind the scenes, tasks are queued to the ThreadPool, which has been enhanced with algorithms (like hill-climbing) that determine and adjust to the number of threads that maximizes throughput. This makes tasks relatively lightweight, and you can create many of them to enable fine-grained parallelism. To complement this, widely-known work-stealing algorithms are employed to provide load-balancing.