Showing posts with label async. Show all posts

Saturday, 7 December 2013

Using TPL to avoid callback hell

In this example, we fire off three service calls, each with its own continuation that converts the resulting DTO into a model object.

We then call a fourth service to get the Spot. That call only fires once all three previous service calls have completed and created their model objects.

var tasks = new Task[]
{
services.GetAsTask<DTOs.MoneyMarketRate>()
.ContinueWith(t => MoneyMarketRate = Mapper.Map<DTOs.MoneyMarketRate, MoneyMarketRate>(t.Result)),

services.GetAsTask<DTOs.InvestmentBoundaries>()
.ContinueWith(t => InvestmentBoundaries = Mapper.Map<DTOs.InvestmentBoundaries, InvestmentBoundaries>(t.Result)),

services.GetAsTask<DTOs.TradingDate>()
.ContinueWith(t => TradingDate = Mapper.Map<DTOs.TradingDate, TradingDate>(t.Result)),
};

Task.Factory.ContinueWhenAll(
tasks,
ts => services.GetAsTask<DTOs.Spot>()
.ContinueWith(t => Spot = Mapper.Map<DTOs.Spot, Spot>(t.Result)));
You can take a look at my Silverlight source project here: https://github.com/stevenh77/UsingTasksToAvoidCallbackHell
The problem with this code is that exceptions are not properly handled: a failure in any service call or mapping continuation is never observed, and the Spot call fires regardless. For that we can leverage async and await:
try 
{
var tasks = new []
{
services.GetAsync<DTOs.MoneyMarketRate, MoneyMarketRate>(x => MoneyMarketRate = x),
services.GetAsync<DTOs.InvestmentBoundaries, InvestmentBoundaries>(x => InvestmentBoundaries = x),
services.GetAsync<DTOs.TradingDate, TradingDate>(x => TradingDate = x),
};

await TaskEx.WhenAll(tasks);
await services.GetAsync<DTOs.Spot, Spot>(x => Spot = x);
}
catch (Exception e)
{
MessageBox.Show(e.Message);
}
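One caveat with the try/catch above: when more than one of the awaited tasks fails, `await` rethrows only the first exception. Here is a minimal sketch of how to see every failure. It assumes .NET 4.5's `Task.WhenAll` (the code above uses `TaskEx.WhenAll` from the async targeting pack) and a hypothetical `FailAsync` helper that stands in for a failing service call.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllExceptions
{
    // Hypothetical helper: stands in for a service call that fails.
    static async Task FailAsync(string message)
    {
        await Task.Delay(10);
        throw new InvalidOperationException(message);
    }

    // Returns the message of every failed task, not just the first one.
    public static async Task<string[]> CollectFailuresAsync()
    {
        Task all = Task.WhenAll(FailAsync("rates"), FailAsync("boundaries"), Task.Delay(10));
        try
        {
            await all; // rethrows only the first exception
            return new string[0];
        }
        catch (Exception)
        {
            // The combined task keeps every failure in its AggregateException.
            return all.Exception.InnerExceptions.Select(e => e.Message).ToArray();
        }
    }
}
```

Keeping a reference to the combined task is what makes the full list of failures available inside the catch block.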

I also refactored the service executor to use async and await, and moved the setting of the property into an action that can be passed in.
public class ServiceExecutor
{
const string BaseAddress = "http://localhost:8080/Services/";

public async Task GetAsync<TDto, TModel>(Action<TModel> actionToSetPropertyValue)
{
var client = new WebClient();
var serviceName = typeof(TDto).Name + ".ashx";
var response = await client.DownloadStringTaskAsync(new Uri(BaseAddress + serviceName, UriKind.Absolute));
var dto = JsonConvert.DeserializeObject<TDto>(response);
actionToSetPropertyValue.Invoke(Mapper.Map<TDto, TModel>(dto));
}
}

Refactored source:  https://github.com/stevenh77/UsingTasksToAvoidCallbackHellWithAsyncAndWait/

Friday, 19 July 2013

Sequential vs Parallel Service Requests

Sequential

public async Task<HttpResponseMessage> DownloadWithHttpClient()
{
var httpClient = new HttpClient();
var request = new HttpRequestMessage(HttpMethod.Get, "http://stevenhollidge.com/sample.json?" + DateTime.Now.Ticks);
var response = await httpClient.SendAsync(request);
return response;
}

private async void HttpClientGoButton_OnClick(object sender, RoutedEventArgs e)
{
for (int i = 0; i < 9; i++)
{
await DownloadWithHttpClient();
}
}

Parallel


public async Task<HttpResponseMessage> DownloadWithHttpClient()
{
var httpClient = new HttpClient();
var request = new HttpRequestMessage(HttpMethod.Get, "http://stevenhollidge.com/sample.json?" + DateTime.Now.Ticks);
var response = await httpClient.SendAsync(request);
return response;
}

private async void HttpClientGoButton_OnClick(object sender, RoutedEventArgs e)
{
for (int i = 0; i < 9; i++)
{
DownloadWithHttpClient();
}
}
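The parallel handler above starts each request without awaiting it, so the handler returns before the downloads finish and any exception goes unobserved. One alternative is to start all the calls first and then await them together. The sketch below assumes `Task.WhenAll` is available (.NET 4.5, or `TaskEx.WhenAll` with the async targeting pack) and uses a hypothetical `FakeDownloadAsync` in place of `DownloadWithHttpClient` so that it runs without a network.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelRequests
{
    // Hypothetical stand-in for DownloadWithHttpClient(): waits, then returns its index.
    static async Task<int> FakeDownloadAsync(int i)
    {
        await Task.Delay(100);
        return i;
    }

    // Starts all nine calls up front, then awaits them as a group.
    public static async Task<int[]> DownloadAllAsync()
    {
        // ToList() forces the Select to run, so every call is in flight before we await.
        var tasks = Enumerable.Range(0, 9).Select(i => FakeDownloadAsync(i)).ToList();

        // Results are returned in the order the tasks were listed.
        return await Task.WhenAll(tasks);
    }
}
```

The requests still overlap as in the fire-and-forget version, but the caller now knows when they have all finished and sees any failure.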

Monday, 7 January 2013

Async WCF interface with T4

This T4 template takes a DLL, pulls out all the WCF interfaces and gives your client development team a proxy class to code against.

namespace ServerInterfaces
{
[ServiceContract]
public interface ILoginFacade
{
[OperationContract]
[WebInvoke(RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json,
Method = "GET", UriTemplate = "/Login")]
GetLoginResponse GetLogin();

[OperationContract]
[WebInvoke(RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json,
Method = "PUT", UriTemplate = "/Login")]
UpdateLoginResponse UpdateLogin(UpdateLoginRequest request);

[OperationContract]
[WebInvoke(RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json,
Method = "GET", UriTemplate = "/Login/WriteToLog")]
void WriteToLog();
}

[ServiceContract]
public interface ITradeFacade
{
[OperationContract]
[WebInvoke(RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json,
Method = "PUT", UriTemplate = "/Trades")]
int UpdateTradesDatabase(string userId);
}
}

Generated Client Proxy

   
/***************************************************************************************
**** GENERATED CODE: Use t4toolbox & tangiblet4editor to update AsyncInterfaces.tt ****
****************************************************************************************/
using System;
using Newtonsoft.Json;

namespace WcfServiceDirectory
{
public partial class Services
{
public Services()
{
this.LoginFacade = new LoginFacade();
this.TradeFacade = new TradeFacade();
}

public LoginFacade LoginFacade { get; set; }
public TradeFacade TradeFacade { get; set; }
}

public class LoginFacade
{
private IServiceExecutor serviceExecutor;

public LoginFacade() { this.serviceExecutor = new ServiceExecutor(); }

public void GetLogin(Action<CallCompleteEventArgs<DTOs.GetLoginResponse>> callback)
{
serviceExecutor.Get<DTOs.GetLoginResponse>("/Login", callback);
}

public void UpdateLogin(DTOs.UpdateLoginRequest request, Action<CallCompleteEventArgs<DTOs.UpdateLoginResponse>> callback)
{
serviceExecutor.Put<DTOs.UpdateLoginResponse>("/Login", JsonConvert.SerializeObject(request), callback);
}

public void WriteToLog(Action<CallCompleteEventArgs<Object>> callback)
{
serviceExecutor.Get<Object>("/Login/WriteToLog", callback);
}
}

public class TradeFacade
{
private IServiceExecutor serviceExecutor;

public TradeFacade() { this.serviceExecutor = new ServiceExecutor(); }

public void UpdateTradesDatabase(System.String userId, Action<CallCompleteEventArgs<System.Int32>> callback)
{
serviceExecutor.Put<System.Int32>("/Trades", JsonConvert.SerializeObject(userId), callback);
}
}
}

Core REST service execution code

namespace ClientProxy
{
public interface IServiceExecutor
{
void Get<TResponse>(string uriTemplate, Action<CallCompleteEventArgs<TResponse>> callback);
void Put<TResponse>(string uriTemplate, string request, Action<CallCompleteEventArgs<TResponse>> callback);
}

public class ServiceExecutor : IServiceExecutor
{
ServiceEnvironment serviceEnvironment = new ServiceEnvironment() { UseHttpS = false, BaseAddress = "localhost", Port = 52802 };

public void Get<TResponse>(string uriTemplate, Action<CallCompleteEventArgs<TResponse>> callback)
{
var client = new WebClient();
var address = GetUri(uriTemplate);
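client.DownloadStringCompleted += (sender, eventArgs) =>
{
if (callback == null) return;
// Reading Result throws if the call failed or was cancelled, so only deserialize on success.
var succeeded = eventArgs.Error == null && !eventArgs.Cancelled;
var response = succeeded ? JsonConvert.DeserializeObject<TResponse>(eventArgs.Result) : default(TResponse);
callback(new CallCompleteEventArgs<TResponse>(response, eventArgs));
};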
client.DownloadStringAsync(address);
}

public void Put<TResponse>(string uriTemplate, string request, Action<CallCompleteEventArgs<TResponse>> callback)
{
var client = new WebClient();
var address = GetUri(uriTemplate);
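client.UploadStringCompleted += (sender, eventArgs) =>
{
if (callback == null) return;
// Reading Result throws if the call failed or was cancelled, so only deserialize on success.
var succeeded = eventArgs.Error == null && !eventArgs.Cancelled;
var response = succeeded ? JsonConvert.DeserializeObject<TResponse>(eventArgs.Result) : default(TResponse);
callback(new CallCompleteEventArgs<TResponse>(response, eventArgs));
};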
client.Headers[HttpRequestHeader.ContentType] = "application/json";
client.UploadStringAsync(address, "PUT", request);
}

private Uri GetUri(string uriTemplate)
{
var uriString = string.Format("http{0}://{1}:{2}/Facades{3}",
serviceEnvironment.UseHttpS ? "s" : "",
serviceEnvironment.BaseAddress,
serviceEnvironment.Port,
uriTemplate);
return new Uri(uriString);
}
}
}
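The executor above reports results through callbacks. If you would rather `await` a call, a callback-style method can be adapted to a `Task` with `TaskCompletionSource`. This is only a sketch: `CallResult<T>` is a hypothetical stand-in, because the members of the real `CallCompleteEventArgs<T>` are not shown in this post, and `FromCallback` is an illustrative name rather than part of the project.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for CallCompleteEventArgs<T>.
public class CallResult<T>
{
    public T Response;
    public Exception Error;
}

public static class CallbackToTask
{
    // Adapts any "start(callback)" style call into a Task<T>.
    public static Task<T> FromCallback<T>(Action<Action<CallResult<T>>> start)
    {
        var tcs = new TaskCompletionSource<T>();
        start(result =>
        {
            // Complete the task exactly once, as a fault or as a result.
            if (result.Error != null) tcs.TrySetException(result.Error);
            else tcs.TrySetResult(result.Response);
        });
        return tcs.Task;
    }
}
```

A caller would pass a lambda that invokes the callback-based method, for example `FromCallback<int>(cb => someCall(cb))`, and await the returned task.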

Source code


The solution features:

  • DTOs:  A portable class library project containing the DTOs
  • Interfaces:  A class library containing the WCF interfaces.  This project also contains the T4 template scripts
  • WCF ServiceDirectory:  A Silverlight project containing the code that calls the REST services
  • A website project with Web API (WCF) REST services

https://github.com/stevenh77/AsyncServiceInterfaces

Sunday, 27 May 2012

Silverlight Async Rest Wcf

Here is a simple example application that shows off:

  • “async” and “await” within Silverlight 5 in Visual Studio 11
  • Calling a simple RESTful WCF interface hosted with no config or svc file


MainPage.xaml.cs (code behind)

private async void LoadDataButton_Click(object sender, RoutedEventArgs e)
{
var request = new WebClient();
var uri = new Uri("http://localhost:5349/services/cars");
var jsonString = await request.DownloadStringTaskAsync(uri);

using (var stream = new MemoryStream(Encoding.Unicode.GetBytes(jsonString.ToCharArray())))
{
var serializer = new DataContractJsonSerializer(typeof (Car[]));
var cars = (Car[]) serializer.ReadObject(stream);
CarsListBox.ItemsSource = cars;
}
}

Service Interface

[ServiceContract]
interface IService<T>
{
[OperationContract]
IList<T> GetAll();

[OperationContract]
T Get(string id);

[OperationContract]
void Insert(T entity);

[OperationContract]
void Update(string id, T entity);

[OperationContract]
void Delete(string id);
}

Service Implementation

[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
public class CarService : IService<Car>
{
private readonly IRepository<Car> _repo;

public CarService()
{
_repo = new FakeCarRepository();
}

public CarService(IRepository<Car> repo)
{
_repo = repo;
}

[WebGet(UriTemplate = "Cars", ResponseFormat = WebMessageFormat.Json)]
public IList<Car> GetAll()
{
var cars = _repo.GetAll();
return cars;
}

[WebGet(UriTemplate = "Car/{id}", ResponseFormat = WebMessageFormat.Json)]
public Car Get(string id)
{
var car = _repo.GetById(id);
return car;
}

[WebInvoke(UriTemplate = "Car", Method = "POST")]
public void Insert(Car car)
{
_repo.Insert(car);
}

[WebInvoke(UriTemplate = "Car/{id}", Method = "PUT")]
public void Update(string id, Car car)
{
_repo.Update(car);
}

[WebInvoke(UriTemplate = "Car/{id}", Method = "DELETE")]
public void Delete(string id)
{
_repo.Delete(id);
}
}

DTO

[DataContract]
public class Car
{
[DataMember]
public int Id { get; set; }
[DataMember]
public string Make { get; set; }
[DataMember]
public string Model { get; set; }
[DataMember]
public int Year { get; set; }
}

How come no config or svc file?


Global.asax

public class Global : HttpApplication
{
protected void Application_Start(object sender, EventArgs e)
{
RouteTable.Routes.Add(new ServiceRoute("services", new WebServiceHostFactory(), typeof(CarService)));
}
}

Web.config

<?xml version="1.0"?>
<configuration>
<system.web>
<compilation debug="true" targetFramework="4.0" />
</system.web>
<system.serviceModel>
<serviceHostingEnvironment aspNetCompatibilityEnabled="true"/>
<standardEndpoints>
<webHttpEndpoint>
<standardEndpoint name="" helpEnabled="true" automaticFormatSelectionEnabled="true"/>
</webHttpEndpoint>
</standardEndpoints>
</system.serviceModel>
</configuration>

Source


http://stevenhollidge.com/blog-source-code/SilverlightAsyncRestWcf.zip

Tuesday, 14 February 2012

Reactive Extensions (Rx)

Overview

MSDN Definition

“Rx is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.”

Rx provides extension methods on top of LINQ to enable developers to implement the observer pattern on query results (IEnumerables e.g. a collection or sequence).

The benefits of Rx are its support for concurrency and its composability.  It allows for easy concurrent processing over a collection, along with simple declarative callbacks on completion and on exceptions.  This programming model allows for simple orchestration of async operations.

To get familiar with Rx, in the examples below we’ll look at simple enumerations of data to get a feel for the programming style.  We can then take a look at Rx for events, with events after all just being a sequence of data (found in the EventArgs).

How To Get Reactive Extensions

Use NuGet: search for Rx and bring in the Reactive.Main library (System.Reactive namespace).

rx-Nuget

Simple example

1. Convert the LINQ query into an observable (using the Rx extension method).

2. Write a delegate which will execute for:

  • Each item in the enumeration (OnNext method in the examples below)
  • When an error occurs (OnError method in the examples below)
  • When the enumeration completes (OnCompleted method in the examples below)

3. Subscribe to the query results.  This is where you hook up each of your delegates and start the enumeration.

rx-Simple
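The three steps above can be sketched as follows. This is not the code from the original screenshot. It assumes the System.Reactive package is referenced, and the query, class name and log messages are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class RxSimple
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // 1. Convert a LINQ query into an observable.
        IObservable<int> evens = Enumerable.Range(1, 10)
            .Where(n => n % 2 == 0)
            .ToObservable();

        // 2 and 3. Supply the three delegates and subscribe, which starts the enumeration.
        evens.Subscribe(
            n => log.Add("OnNext: " + n),            // each item
            ex => log.Add("OnError: " + ex.Message), // an error occurred
            () => log.Add("OnCompleted"));           // end of the sequence

        return log;
    }
}
```

Nothing is enumerated until `Subscribe` is called. Converting the query only describes the sequence.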

“Finally” delegate

There is another delegate we can set that runs after processing has finished, regardless of whether the enumeration reached the end of the sequence.  For example, if an error occurs halfway through processing, the completed delegate doesn’t run.  The Finally delegate always runs, which makes it useful for clean-up.

rx-FinallyDelegate
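As a rough illustration of the behaviour described above (not the original screenshot), the sketch below uses a made-up source that throws on its third element. It assumes the System.Reactive package, and the names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class RxFinally
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical source that fails on its third element.
        var source = Enumerable.Range(1, 5)
            .Select(n =>
            {
                if (n == 3) throw new InvalidOperationException("bad item");
                return n;
            })
            .ToObservable();

        source
            .Finally(() => log.Add("Finally")) // runs whether the sequence completed or failed
            .Subscribe(
                n => log.Add("OnNext: " + n),
                ex => log.Add("OnError: " + ex.Message),
                () => log.Add("OnCompleted"));

        return log;
    }
}
```

Here the error delegate runs in place of the completed delegate, and the Finally delegate still runs.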

How to Configure Threading

rx-HowToConfigureThreads

Notice how all the "getting" and "processing" are sequential and in keeping with the enumeration (within each group) even though the processing might be occurring on different threads.
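A minimal sketch of that ordering guarantee, in place of the original screenshot. It assumes Rx 2.0 or later, where the schedulers are exposed as `NewThreadScheduler.Default` and `TaskPoolScheduler.Default`. The choice of schedulers is illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class RxThreading
{
    public static List<int> Run()
    {
        var processed = new List<int>();
        using (var done = new ManualResetEventSlim())
        {
            Enumerable.Range(1, 5)
                .ToObservable(NewThreadScheduler.Default) // "getting" runs on a dedicated thread
                .ObserveOn(TaskPoolScheduler.Default)     // "processing" runs on the task pool
                .Subscribe(
                    n => processed.Add(n),
                    ex => done.Set(),
                    () => done.Set());

            done.Wait(); // block until the sequence has finished
        }
        return processed;
    }
}
```

Even though items are produced and handled on different threads, the handler is invoked one item at a time and in sequence order.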

Early Cancellation of Processing

You can cancel the processing of a collection, here is an example:

rx-EarlyCancellation
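The original screenshot is not reproduced here. One common way to cancel, sketched below, is to dispose the subscription that `Subscribe` returns. The timer-based source and the timings are assumptions chosen only to make the effect visible.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class RxCancellation
{
    // Returns how many items were received before the subscription was disposed.
    public static int Run()
    {
        int received = 0;

        // Hypothetical source: one tick every 20 ms, with no natural end.
        IDisposable subscription = Observable
            .Interval(TimeSpan.FromMilliseconds(20))
            .Subscribe(_ => Interlocked.Increment(ref received));

        Thread.Sleep(300);
        subscription.Dispose(); // stop processing early

        return received;
    }
}
```

After `Dispose` returns, no further items are delivered to the subscriber, apart from one that may already be in flight on another thread.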

“Using” A Disposable Object

This example code disposes of an object after the completed delegate has run.

rx-UsingDisposableObjects
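A sketch of the same idea with `Observable.Using`, which ties the lifetime of a resource to the subscription. `TrackedResource` is a hypothetical class that only records when it is created and disposed. The System.Reactive package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical resource that logs its own lifetime.
public sealed class TrackedResource : IDisposable
{
    private readonly List<string> log;
    public TrackedResource(List<string> log) { this.log = log; log.Add("Created"); }
    public void Dispose() { log.Add("Disposed"); }
}

public static class RxUsing
{
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable.Using(
                () => new TrackedResource(log),                    // created on subscribe
                resource => Enumerable.Range(1, 3).ToObservable()) // sequence that uses it
            .Subscribe(
                n => log.Add("OnNext: " + n),
                ex => log.Add("OnError: " + ex.Message),
                () => log.Add("OnCompleted"));

        return log;
    }
}
```

The resource is created when the subscription starts and disposed once the sequence terminates.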

A real world example may look like this:

Demo Source Code

You can download the demo here:  http://stevenhollidge.com/blog-source-code/RxExtensionsDemo.zip