The run-on-subscribe model I described in the previous post on Rx is an important concept within the system. So much so that there’s a method Observable.Defer. Defer takes a factory and creates a new observable based on the factory that runs when you subscribe. It’s basically a lazy create-on-subscribe. However, it’s worth bearing in mind that Defer only works if your subscription occurs before the observable produces a value. Now, if you examine our previous code, you’ll see that there was no sensible way of achieving that.
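To make the create-on-subscribe behaviour concrete, here is a minimal sketch. The DeferDemo class, the log list and the factoryCalls counter are names invented for illustration, and it assumes the System.Reactive package, where Observable.Return delivers its value synchronously on subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class DeferDemo
{
    // Returns a log of events so the ordering is easy to inspect.
    public static List<string> Run()
    {
        var log = new List<string>();
        int factoryCalls = 0;

        // Defer only stores the factory; nothing runs on this line.
        IObservable<int> deferred = Observable.Defer(() =>
        {
            factoryCalls++;
            log.Add("factory call " + factoryCalls);
            return Observable.Return(factoryCalls);
        });

        log.Add("before subscribe");

        // Each Subscribe call invokes the factory again.
        deferred.Subscribe(v => log.Add("first got " + v));
        deferred.Subscribe(v => log.Add("second got " + v));
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```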
This raises the interesting question of how FromAsyncPattern does its job, since it’s trying to solve a very similar problem. To understand it properly, let’s review how FromAsyncPattern is used:
- You call Observable.FromAsyncPattern. You give it a Begin and an End method.
- This returns a function that takes the same parameters as Begin and returns an observable.
- This observable is a one-shot observable: it only ever yields one value, the asynchronous result of the call you just specified.
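The three steps above can be sketched against a simpler Begin/End pair than a socket. This is only an illustration: it uses Stream.BeginRead and Stream.EndRead on a MemoryStream, the FromAsyncPatternDemo class and ReadOnce method are hypothetical names, and it assumes a version of System.Reactive in which FromAsyncPattern is still available (newer releases may flag it as obsolete in favour of Task-based methods).

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;

static class FromAsyncPatternDemo
{
    // Returns the single value produced by the one-shot observable, or -1 on error.
    public static int ReadOnce(Stream stream, byte[] buffer)
    {
        // Steps 1 and 2: hand over the Begin/End pair and get back a function
        // that takes the same leading parameters as BeginRead.
        Func<byte[], int, int, IObservable<int>> read =
            Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);

        // Step 3: calling the function gives an observable that yields one value.
        IObservable<int> result = read(buffer, 0, buffer.Length);

        int bytesRead = -1;
        var done = new ManualResetEventSlim();
        result.Subscribe(
            n => bytesRead = n,   // the one and only value
            ex => done.Set(),     // do not hang if the read fails
            () => done.Set());    // completion follows the single value
        done.Wait();
        return bytesRead;
    }

    static void Main()
    {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        Console.WriteLine("Read " + ReadOnce(stream, new byte[16]) + " bytes");
    }
}
```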
Practical Usage of FromAsyncPattern
On its own, this would be pretty useless. You’d need to set up a subscription each time you called it. So, we need to use some more functions:
- Observable.While, which can be used to repeat an observable.
- Observable.Defer, which we’ve already mentioned.
    public static IObservable<TSource> While<TSource>(
        Func<bool> condition, IObservable<TSource> source);

    public static IObservable<TValue> Defer<TValue>(
        Func<IObservable<TValue>> observableFactory);
So, the standard structure is to call While(functionThatDeterminesWhetherToContinue, Defer(anonymousDelegateThatInvolvesTheResultOfCallingFromAsyncPattern)). Like this:
    var tcpListener = new TcpListener(new IPAddress(new byte[] { 127, 0, 0, 1 }), 8080);
    tcpListener.Start();
    var getSocket = Observable.FromAsyncPattern<Socket>(
        tcpListener.BeginAcceptSocket, tcpListener.EndAcceptSocket);
    Observable.While(() => true, Observable.Defer(() => getSocket()))
        .ObserveOn(Scheduler.ThreadPool)
        .Subscribe(HandleSocket);
I’m afraid that’s pretty much the simplest use of FromAsyncPattern achievable. As an aside, if you’re not extremely happy with the scoping and lifetime rules of anonymous delegates, you don’t stand a chance programming with Rx. It uses them everywhere. (Some readers will be wondering why on earth I’m pointing this out; in my experience 95% of .NET developers don’t really understand this stuff.)
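For anyone unsure what those scoping and lifetime rules amount to, here is a small sketch in plain C# with no Rx involved. The ClosureDemo class and its method names are made up for the example. It shows the two facts that matter most: a lambda captures the variable itself rather than its value at the time, and a captured local lives on for as long as the delegate does.

```csharp
using System;

static class ClosureDemo
{
    // The lambda captures the variable x, not the value it held when the lambda was written.
    public static int ReadAfterChange()
    {
        int x = 1;
        Func<int> readX = () => x;
        x = 2;
        return readX(); // sees the updated variable, so this returns 2
    }

    // The local 'count' outlives this method because the returned delegate holds on to it.
    public static Func<int> MakeCounter()
    {
        int count = 0;
        return () => ++count;
    }

    static void Main()
    {
        Console.WriteLine(ReadAfterChange());

        Func<int> counter = MakeCounter();
        Console.WriteLine(counter()); // 1
        Console.WriteLine(counter()); // 2, the same captured variable again
    }
}
```

In the listener code, getSocket is captured by the delegate passed to Defer in exactly this way, which is why it can be called again on every resubscription.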
So How Does it Work Then?
If you stare at this code for a while, you’ll be left wondering whether the call to BeginAcceptSocket occurs when you call getSocket. Actually, it does, which raises an interesting question: how on earth does it manage to guarantee sending the result to its subscriber, given that we saw last time that this isn’t possible? The answer’s quite simple. It cheats:
- getSocket calls BeginAcceptSocket and sets up a callback.
- The callback passes the result to an AsyncSubject<Socket>.
- AsyncSubject passes on the result if you’re subscribed.
- If you’re not subscribed, it keeps the last Notification in memory and transmits it when you subscribe.
So, AsyncSubject is a buffer that contains one element. Now, if you were producing multiple objects, you’d have a problem. Did you want all of the values produced, or only the last one? In the case of a one-shot observable, you don’t have that difficulty.
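The buffering can be seen by using AsyncSubject directly. This is a sketch of the subject’s behaviour, not of FromAsyncPattern’s actual internals; the AsyncSubjectDemo class and LateSubscribe method are illustrative names, and it assumes the System.Reactive package. Note that AsyncSubject holds back its value until OnCompleted is called, and if several values were pushed it keeps only the last.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class AsyncSubjectDemo
{
    // Pushes values and completes before anyone subscribes,
    // then returns whatever a late subscriber receives.
    public static List<int> LateSubscribe(params int[] values)
    {
        var subject = new AsyncSubject<int>();
        foreach (int v in values) subject.OnNext(v);
        subject.OnCompleted();

        // Subscribing only now: the subject replays its buffered value.
        var received = new List<int>();
        subject.Subscribe(v => received.Add(v));
        return received;
    }

    static void Main()
    {
        // One-shot case: the single result is still delivered.
        Console.WriteLine(string.Join(",", LateSubscribe(42)));

        // Multiple values: only the last one survives.
        Console.WriteLine(string.Join(",", LateSubscribe(1, 2, 3)));
    }
}
```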
While, on the other hand, only starts when you subscribe. Concat does the same thing (indeed, While uses Concat internally). You can see this with the following code:
    static void Main()
    {
        var w = Observable.While(
            () => true,
            Observable.Generate(
                0,
                n => n < 10,
                n => { Console.WriteLine(n); return n + 1; },
                n => n,
                Scheduler.ThreadPool));
        Console.ReadKey();
        w.Subscribe(n => Console.WriteLine("Subscribed " + n));
        Console.ReadKey();
    }
As we’ve seen here, the whole “hot and cold” observable thing is really quite complex. It’s not clear that appeals to the duality of iterator and observer make it any easier to understand. In any event, there’s very little way of telling exactly when an observable starts generating values short of reading the source code. It’s a pity it’s closed source.