As I’ve previously discussed, exactly when something starts in Rx is a bit of an issue. Sometimes, for instance when dealing with network traffic, we need to do some work before we receive any more data. We could simply buffer the data, but that runs the risk of the long queue problem. So, we need some way of telling an observable to start, stop and resume. IConnectableObservable gives us exactly the interface that we want here: the Connect method represents Start/Resume, and disposing of its return value represents Pause.
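To make the Connect/dispose pairing concrete, here is a minimal sketch. It assumes the current System.Reactive NuGet package (namespaces System.Reactive.Linq and System.Reactive.Subjects), which may differ from the Rx build this post was written against. ConnectDemo and Run are names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class: shows Connect as Start/Resume and Dispose as Pause.
public static class ConnectDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var received = new List<int>();

        // Publish wraps the source; nothing flows until Connect is called.
        IConnectableObservable<int> connectable = source.Publish();
        connectable.Subscribe(v => received.Add(v));

        source.OnNext(1);                                // not connected yet: not delivered

        IDisposable connection = connectable.Connect();  // Start
        source.OnNext(2);                                // delivered

        connection.Dispose();                            // Pause
        source.OnNext(3);                                // not delivered: plain Publish does not buffer

        connection = connectable.Connect();              // Resume
        source.OnNext(4);                                // delivered

        connection.Dispose();
        return received;
    }
}
```

Note that the value pushed while paused is simply lost in this sketch, which is the gap the rest of this post sets out to fill.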
Obviously, if extra values are produced, we need them buffered until we reconnect. I had a couple of other requirements: to behave like Observable.While (because I’m asynchronously reading from a stream) and to allow multiple “connections” at once.
So, here’s the code:
    public static class ObservableHelper
    {
        public static IConnectableObservable<TSource> WhileResumable<TSource>(
            Func<bool> condition, IObservable<TSource> source)
        {
            var buffer = new Queue<TSource>();
            var subscriptionsCount = 0;
            var isRunning = Disposable.Create(() =>
            {
                lock (buffer)
                {
                    subscriptionsCount--;
                }
            });
            var raw = Observable.CreateWithDisposable<TSource>(subscriber =>
            {
                lock (buffer)
                {
                    subscriptionsCount++;
                    if (subscriptionsCount == 1)
                    {
                        while (buffer.Count > 0)
                        {
                            subscriber.OnNext(buffer.Dequeue());
                        }
                        Observable.While(() => subscriptionsCount > 0 && condition(), source)
                            .Subscribe(
                                v =>
                                {
                                    if (subscriptionsCount == 0) buffer.Enqueue(v);
                                    else subscriber.OnNext(v);
                                },
                                e => subscriber.OnError(e),
                                () =>
                                {
                                    if (subscriptionsCount > 0) subscriber.OnCompleted();
                                }
                            );
                    }
                }
                return isRunning;
            });
            return raw.Publish();
        }
    }
I’ll reiterate my previous point about scoping of anonymous functions. If you don’t understand them completely, you’re not going to get very far with Reactive Extensions. The above code is short and achieves its aims, but we’ve done the following:
- Created an anonymous disposable using a lambda.
- Created an anonymous observable using another lambda, which shares the scope of the disposable.
- Constructed a repeating observable within that (using another lambda).
- Subscribed to that observable using lambda expressions that use the “raw” scope.
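The shared-scope point in the list above is easier to see in isolation. The following is a small sketch using only the base class library; ClosureScopeDemo, MakeCounter and Run are hypothetical names for the illustration. Two lambdas capture the same local variable, in the same way that the disposable and the observable in WhileResumable both touch subscriptionsCount.

```csharp
using System;

// Hypothetical demo class: two lambdas closing over one local variable.
public static class ClosureScopeDemo
{
    // Returns two delegates that share the captured local 'count'.
    public static (Action Increment, Func<int> Read) MakeCounter()
    {
        var count = 0;                      // lives on after MakeCounter returns
        Action increment = () => count++;   // writes the captured variable
        Func<int> read = () => count;       // reads the same captured variable
        return (increment, read);
    }

    public static int Run()
    {
        var (increment, read) = MakeCounter();
        increment();
        increment();
        return read();                      // 2: both lambdas see one variable, not copies
    }
}
```

Each call to MakeCounter creates a fresh variable, so two counters made this way do not interfere with each other. The same holds for each call to WhileResumable and its buffer.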
Equally, we have a chain of observables:
- The source observable that is to be repeated.
- The while observable that actually does the work.
- The raw observable that starts when you subscribe and buffers when you unsubscribe.
- The final result, which is a connectable version of the raw observable.
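The buffering step in that chain can be separated from Rx entirely. The sketch below is an illustration of the idea and not the code above: PausableGate is a hypothetical helper that forwards values while running, queues them while paused, and replays the queue on resume. Like the original, it invokes the callback while holding the lock, which keeps ordering simple at the cost of blocking producers during a replay.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx: forward while running, queue while paused.
public sealed class PausableGate<T>
{
    private readonly Queue<T> _buffer = new Queue<T>();
    private readonly Action<T> _onNext;
    private bool _running;                  // starts paused

    public PausableGate(Action<T> onNext) { _onNext = onNext; }

    public void Push(T value)
    {
        lock (_buffer)
        {
            if (_running) _onNext(value);   // connected: pass straight through
            else _buffer.Enqueue(value);    // paused: keep for later
        }
    }

    public void Resume()
    {
        lock (_buffer)
        {
            // Replay everything queued while paused, oldest first.
            while (_buffer.Count > 0) _onNext(_buffer.Dequeue());
            _running = true;
        }
    }

    public void Pause()
    {
        lock (_buffer) { _running = false; }
    }
}

public static class PausableGateDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var gate = new PausableGate<int>(v => seen.Add(v));
        gate.Push(1);      // queued: gate starts paused
        gate.Resume();     // replays 1
        gate.Push(2);      // forwarded immediately
        gate.Pause();
        gate.Push(3);      // queued
        gate.Resume();     // replays 3
        return seen;       // values arrive in the order they were pushed
    }
}
```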
Constructions like this are extremely common in Rx (and Clojure, for that matter), but rare in C# code in general. You’ll also note that I haven’t even touched on LINQ.