Sunday, 29 June 2014

DetectStale within a stream

The DetectStale<T> extension method in the Reactive Trader Rx example application detects when a stream has stopped producing values and injects an IsStale indicator so that the UI can react.

The idea is that while the stream is steadily providing values, all is good. As soon as a value hasn’t been received within a set period of time, a new value is sent down the stream to say “this stream has gone stale”.

First, here is an example of it being used in a test:

[Test]
public void StaleShouldBeInsertedOnSilentStreamOnce()
{
    var testScheduler = new TestScheduler();
    var stalenessPeriod = TimeSpan.FromTicks(10);
    var results = new List<Timestamped<IStale<int>>>();

    Observable.Never<int>()
        .DetectStale(stalenessPeriod, testScheduler)
        .Timestamp(testScheduler)
        .Subscribe(results.Add);

    testScheduler.AdvanceBy(25);

    Assert.AreEqual(1, results.Count);
    Assert.AreEqual(10, results[0].Timestamp.Ticks);
}
Internally, DetectStale uses a timer to decide when it should send what is, in effect, a null/empty stale value. Here are a couple more tests for the DetectStale extension method:

[Test]
public void AfterAnUpdateStaleShouldHappenAfterPeriod()
{
    var testScheduler = new TestScheduler();
    var stalenessPeriod = TimeSpan.FromTicks(10);
    var results = new List<Timestamped<IStale<long>>>();

    Observable.Timer(TimeSpan.FromTicks(5), testScheduler).Concat(Observable.Never<long>())
        .DetectStale(stalenessPeriod, testScheduler)
        .Timestamp(testScheduler)
        .Subscribe(results.Add);

    testScheduler.AdvanceBy(30);

    Assert.AreEqual(2, results.Count);
    Assert.IsFalse(results[0].Value.IsStale, "First update should not be a stale");
    Assert.AreEqual(5, results[0].Timestamp.Ticks, "First update should not be delayed");
    Assert.IsTrue(results[1].Value.IsStale, "Second update should be a stale");
    Assert.AreEqual(15, results[1].Timestamp.Ticks, "Second update should happen after first update + stale period");
}

[Test]
public void DetectStaleDoesNotThrowOnSynchronousStreams()
{
    var result = Observable.Return(1)
        .DetectStale(TimeSpan.FromSeconds(1), new TestScheduler())
        .Wait();

    Assert.AreEqual(1, result.Update);
}
Here is the DetectStale code itself:
/// <summary>
/// Detects when a stream becomes inactive for some period of time
/// </summary>
/// <typeparam name="T">update type</typeparam>
/// <param name="source">source stream</param>
/// <param name="stalenessPeriod">if the source stream does not OnNext any update during this period, it is declared stale</param>
/// <param name="scheduler">scheduler used to run the staleness timer</param>
/// <returns>the source updates wrapped in IStale, plus a stale marker when the source goes silent</returns>
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
    return Observable.Create<IStale<T>>(observer =>
    {
        // holds the current timer; assigning a new one disposes the previous one
        var timerSubscription = new SerialDisposable();
        // serialises OnNext calls coming from the source and from the timer
        var observerLock = new object();

        Action scheduleStale = () =>
        {
            // one-shot timer: emits a single stale marker after stalenessPeriod
            timerSubscription.Disposable = Observable
                .Timer(stalenessPeriod, scheduler)
                .Subscribe(_ =>
                {
                    lock (observerLock)
                    {
                        observer.OnNext(new Stale<T>());
                    }
                });
        };

        var sourceSubscription = source.Subscribe(
            x =>
            {
                // cancel any scheduled stale update
                var disposable = timerSubscription.Disposable;
                if (disposable != null)
                    disposable.Dispose();

                lock (observerLock)
                {
                    observer.OnNext(new Stale<T>(x));
                }

                // restart the staleness countdown from this update
                scheduleStale();
            },
            observer.OnError,
            observer.OnCompleted);

        // start the countdown immediately, so a stream that never emits is also detected
        scheduleStale();

        return new CompositeDisposable { sourceSubscription, timerSubscription };
    });
}
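You can see the source subscription (the source.Subscribe call) calling the normal/expected OnNext method with the value, wrapped in a Stale<T>. After that it calls scheduleStale, which restarts the timer that will send the Stale / null value if nothing else arrives within the staleness period.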
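
The tests and the extension method above use IStale<T> and Stale<T>, but their definitions are not shown in this post. Here is a minimal sketch of what they could look like, inferred only from how they are used above: a parameterless constructor for the stale marker, a constructor that takes the update, and the IsStale and Update members. The covariant `out T` and the exception thrown when Update is read on a stale marker are assumptions, so the real Reactive Trader types may well differ.

```csharp
using System;

// Hypothetical sketch: shape inferred from usage in this post, not copied from Reactive Trader.
public interface IStale<out T>
{
    // True when this item marks a silent (stale) stream rather than carrying a value.
    bool IsStale { get; }

    // The wrapped source value; only meaningful when IsStale is false.
    T Update { get; }
}

public sealed class Stale<T> : IStale<T>
{
    private readonly T _update;

    // new Stale<T>(): the "stream has gone stale" marker, carrying no value.
    public Stale()
    {
        IsStale = true;
        _update = default(T);
    }

    // new Stale<T>(x): a normal update wrapping the source value x.
    public Stale(T update)
    {
        IsStale = false;
        _update = update;
    }

    public bool IsStale { get; private set; }

    public T Update
    {
        get
        {
            // Assumption: reading the value of a stale marker is treated as a bug.
            if (IsStale)
                throw new InvalidOperationException("A stale marker carries no update.");
            return _update;
        }
    }
}
```

With a shape like this, a subscriber can branch on IsStale, for example greying out a price when it is true and rendering Update otherwise.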
