Reactive Extensions: What on Earth is IConnectableObservable?

Since Reactive Extensions seems to have no documentation worth speaking of, I figure it’s worth writing up stuff as I learn it.  One of the major differences between Retlang and Reactive Extensions is that Rx has a first order concept of a message source.  ISubscriber defines approximately the same interface as IObservable, but the only implementation is Channel, which is the equivalent of subject.  A fair number of methods within Rx are geared to producing sources to react to.

So, let’s consider the following extremely boring Rx source:

public class NumberSource : IObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });
        thread.Start();
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {           
        return underlying.Subscribe(observer);
    }
}

And let’s consume it using the following code:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    Console.ReadKey();
}

What would you expect this to print?  Actually it prints exactly nothing.  The problem is: the thread actually finished before the subscribe ever gets to run.  Figuring out the timing of subscribes is an important problem in understanding Rx code.  Now, Observable.Defer won’t help us this time, but we can just fix the code.

public class NumberSource : IObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });
        
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        thread.Start();    
        return underlying.Subscribe(observer);
    }
}

Handling Multiple Subscribers

Okay, now our code works, until we do this:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    Console.ReadKey();
}

Now the code just plain crashes.  So, if we’ve got multiple subscribers, we can’t make the code work with the subscriber in the constructor, nor in the Subscribe method.  Checking to see if the thread had already started wouldn’t work either, because B would just miss some of the messages.  What we need is a way to separate the thread start from the subscription.  This is where connectable observable comes in.

public interface IConnectableObservable<out T> : IObservable<T> {
    IDisposable Connect();
}

So, we can now put the thread.Start in the connect

public class NumberSource : IConnectableObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });        
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        return underlying.Subscribe(observer);
    }


    public IDisposable Connect()
    {
        thread.Start();
        return null;
    }
}

Subscribing is a bit more complex as well:

static void Main() {
    var ns = new NumberSource();
    var x = ns.ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    ns.Connect();
    Console.ReadKey();
}

Simplifying

So, we have working code again, but it’s got pretty complex.  Let’s critique our NumberSource class.  For one thing, it doesn’t actually handle disconnections.  I could make the code more complex to implement that correctly, but I’m not going to bother.  For another, I’m using Subject<T>.  Why? Because handling multiple subscriptions is a pain.

Quite a lot of sources in Rx actually only support one subscriber, so let’s simplify the code down to the simplest possible implementation of NumberSource

public class NumberSource : IObservable<int>
{
    private IObserver<int> subscriber;
    private readonly Thread thread;

    public NumberSource()
    {
        thread = new Thread(() => {
            for (int i = 0; i < 10000; i++)
            {
                var current = subscriber;
                if (current != null)
                {
                    current.OnNext(i);
                }
            }
        });
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        subscriber = observer;        
        thread.Start();        
        return new AnonymousDisposable(() => subscriber = null);
    }
}

Note the slightly tricky code around OnNext, which avoids thread safety issues (you’ll be familiar with this idiom if you’ve read the Retlang source code).  Now we need a way of turning multiple subscribers into a single subscriber.  That’s what the Publish method does: converts an observable into a connectable observable.  This makes the Main method much more readable:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool).Publish();
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    x.Connect();
    Console.ReadKey();
}

If you run the code, you’ll observe that the As and Bs come in lock step.  This is a direct result of the way that publish works: it turns two subscriptions into one.  Publish does pretty much everything you need, so it’s highly unlikely you’ll ever actually implement IConnectableObservable yourself.  Simply write an observable that starts when it receives a subscription and let publish handle everything for you.  Try not to confuse Publish in Retlang with Publish in Rx…

Technorati Tags: ,,

Published by

Julian Birch

Full time dad, does a bit of coding on the side.

One thought on “Reactive Extensions: What on Earth is IConnectableObservable?”

Leave a comment