The DetectStale<T> extension method within the Reactive Trader RX example application detects when a stream has stopped producing values and injects an IsStale indicator to allow the UI to react.
The idea is that while the stream is steadily providing values, all is good. As soon as a value hasn’t been received within a set period of time, a new value is sent down the stream to say “this stream has gone stale”.
First here is an example of it being used within a test:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A source that never emits must produce exactly one notification within 25 ticks
/// when the staleness period is 10 ticks, and that notification must arrive at tick 10.
/// </summary>
[Test]
public void StaleShouldBeInsertedOnSilentStreamOnce()
{
    var scheduler = new TestScheduler();
    var period = TimeSpan.FromTicks(10);
    var received = new List<Timestamped<IStale<int>>>();

    // The source stays silent forever, so anything recorded comes from DetectStale itself.
    var silentSource = Observable.Never<int>();
    silentSource
        .DetectStale(period, scheduler)
        .Timestamp(scheduler)
        .Subscribe(item => received.Add(item));

    // Advance virtual time past two full staleness periods.
    scheduler.AdvanceBy(25);

    Assert.AreEqual(1, received.Count);
    Assert.AreEqual(10, received[0].Timestamp.Ticks);
}
Internally, DetectStale uses a timer to decide when it should send what is, in effect, a null/empty stale value. Here are a couple more tests for the DetectStale extension method:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// After a single update at tick 5, the stale notification must follow one full
/// staleness period (10 ticks) later, i.e. at tick 15, and nothing else is emitted
/// within the 30 ticks observed.
/// </summary>
[Test]
public void AfterAnUpdateStaleShouldHappenAfterPeriod()
{
    var scheduler = new TestScheduler();
    var period = TimeSpan.FromTicks(10);
    var received = new List<Timestamped<IStale<long>>>();

    // One value at tick 5, followed by permanent silence (the stream never completes).
    var singleUpdateThenSilence = Observable
        .Timer(TimeSpan.FromTicks(5), scheduler)
        .Concat(Observable.Never<long>());

    singleUpdateThenSilence
        .DetectStale(period, scheduler)
        .Timestamp(scheduler)
        .Subscribe(item => received.Add(item));

    scheduler.AdvanceBy(30);

    Assert.AreEqual(2, received.Count);

    var firstNotification = received[0];
    Assert.IsFalse(firstNotification.Value.IsStale, "First update should not be a stale");
    Assert.AreEqual(5, firstNotification.Timestamp.Ticks, "First update should not be delayed");

    var secondNotification = received[1];
    Assert.IsTrue(secondNotification.Value.IsStale, "Second update should be a stale");
    Assert.AreEqual(15, secondNotification.Timestamp.Ticks, "Second update should happen after first update + stale period");
}
/// <summary>
/// A source that emits and completes synchronously during subscription must pass
/// through DetectStale without throwing; the last element carries the source value.
/// </summary>
[Test]
public void DetectStaleDoesNotThrowOnSynchronousStreams()
{
    // Observable.Return emits its value and completes inside the Subscribe call.
    var synchronousSource = Observable.Return(1);

    var lastNotification = synchronousSource
        .DetectStale(TimeSpan.FromSeconds(1), new TestScheduler())
        .Wait();

    Assert.AreEqual(1, lastNotification.Update);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Detects when a stream becomes inactive for some period of time.
/// Each source value is forwarded wrapped as <c>new Stale&lt;T&gt;(value)</c>; whenever
/// <paramref name="stalenessPeriod"/> elapses without a source value, a single
/// <c>new Stale&lt;T&gt;()</c> marker is emitted. The timer is restarted only by the next
/// source value, so at most one marker is emitted per period of silence.
/// </summary>
/// <typeparam name="T">update type</typeparam>
/// <param name="source">source stream</param>
/// <param name="stalenessPeriod">if the source stream does not OnNext any update during this period, it is declared stale</param>
/// <param name="scheduler">scheduler on which the staleness timer runs</param>
/// <returns>
/// A stream of the source updates interleaved with stale markers. Errors and completion
/// of <paramref name="source"/> are propagated to the subscriber.
/// </returns>
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
    return Observable.Create<IStale<T>>(observer =>
    {
        // Holds the currently pending staleness timer; assigning a new one disposes the previous.
        var timerSubscription = new SerialDisposable();

        // The timer may fire on a different thread than the source, so every call into
        // the observer (OnNext, OnError and OnCompleted) goes through this lock to keep
        // notifications serialized.
        var observerLock = new object();

        Action scheduleStale = () =>
        {
            timerSubscription.Disposable = Observable
                .Timer(stalenessPeriod, scheduler)
                .Subscribe(_ =>
                {
                    lock (observerLock)
                    {
                        observer.OnNext(new Stale<T>());
                    }
                });
        };

        var sourceSubscription = source.Subscribe(
            x =>
            {
                // cancel any scheduled stale update
                var disposable = timerSubscription.Disposable;
                if (disposable != null)
                {
                    disposable.Dispose();
                }

                lock (observerLock)
                {
                    observer.OnNext(new Stale<T>(x));
                }

                // restart the staleness countdown from this update
                scheduleStale();
            },
            error =>
            {
                // Terminal notifications take the same lock as OnNext so they can never
                // run concurrently with a stale marker emitted from the timer.
                lock (observerLock)
                {
                    observer.OnError(error);
                }
            },
            () =>
            {
                lock (observerLock)
                {
                    observer.OnCompleted();
                }
            });

        // Start the initial countdown so a source that never emits is still reported stale.
        scheduleStale();

        return new CompositeDisposable { sourceSubscription, timerSubscription };
    });
}
No comments:
Post a Comment