Reactive Extensions: Making a Pausable Observable

As I’ve previously discussed, exactly when something starts in Rx is a bit of an issue.  Sometimes, for instance when dealing with network traffic, we need to do some work before we receive any more data.  We could simply buffer it, but that runs the risk of the long queue problem.  So, we need some way of telling an observable to start, stop and resume.  IConnectableObservable gives us exactly the interface that we want here: the Connect method represents Start/Resume, and disposing of its return value represents Pause.

Obviously, if extra values are produced, we need them buffered until we reconnect.  I had a couple of other requirements: to behave like Observable.While (because I’m asynchronously reading from a stream) and to allow multiple “connections” at once.

So, here’s the code:

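public static class ObservableHelper {
    // Observable.CreateWithDisposable is the Rx 1.0 name; later Rx releases fold it into Observable.Create.
    public static IConnectableObservable<TSource> WhileResumable<TSource>(Func<bool> condition, IObservable<TSource> source) {
        var gate = new object();
        var buffer = new Queue<TSource>();      // values produced while paused
        var subscriptionsCount = 0;
        IObserver<TSource> current = null;      // receives live values; null while paused
        var isPumping = false;                  // is an inner While loop still running?
        var raw = Observable.CreateWithDisposable<TSource>(subscriber => {
            lock (gate)
            {
                subscriptionsCount++;
                if (subscriptionsCount == 1)
                {
                    current = subscriber;
                    // Flush anything that arrived while we were paused.
                    while (buffer.Count > 0) {
                        subscriber.OnNext(buffer.Dequeue());
                    }
                    // Don't start a second reader if the old loop hasn't noticed the pause yet.
                    if (!isPumping)
                    {
                        isPumping = true;
                        Observable.While(() => { lock (gate) { if (subscriptionsCount == 0) return false; } return condition(); }, source)
                            .Subscribe(
                                v => { lock (gate) { if (current == null) buffer.Enqueue(v); else current.OnNext(v); } },
                                e => { lock (gate) { isPumping = false; if (current != null) current.OnError(e); } },
                                () => { lock (gate) { isPumping = false; if (current != null) current.OnCompleted(); } }
                            );
                    }
                }
            }
            // A fresh disposable per subscription: a shared Disposable.Create instance only fires once.
            return Disposable.Create(() => {
                lock (gate)
                {
                    subscriptionsCount--;
                    if (subscriptionsCount == 0) current = null;
                }
            });
        });
        return raw.Publish();
    }
}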

I’ll reiterate my previous point about scoping of anonymous functions.  If you don’t understand them completely, you’re not going to get very far with Reactive Extensions.  The above code is short and achieves its aims, but we’ve done the following:

  • Created an anonymous disposable using a lambda.
  • Created an anonymous observable using another lambda that shares the disposable’s captured variables.
  • Constructed a repeating observable within that (using another lambda).
  • Subscribed to that observable using lambda expressions that capture the “raw” scope.

Equally, we have a chain of observables:

  • The source observable that is to be repeated.
  • The while observable that actually does the work.
  • The raw observable that starts when you subscribe and buffers when you unsubscribe.
  • The final result, which is a connectable version of the raw observable.

Constructions like this are extremely common in Rx (and Clojure, for that matter), but rare in C# code in general.  You’ll also note, I haven’t even touched on LINQ.
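The Rx code above depends on Rx 1.0-era APIs. Here is a dependency-free sketch of just the bookkeeping it is after: buffer while nobody is connected, flush on the first connection, and count each disconnection exactly once. PausableBuffer, Push and Connection are hypothetical names invented for this sketch, not part of Rx. It deliberately leaves out stopping and restarting the reading loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, Rx-free sketch of the pause/resume semantics described above.
// Push is called by the producer; Connect means start/resume, and disposing
// the returned connection means pause.
public sealed class PausableBuffer<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> buffer = new Queue<T>();  // values produced while paused
    private readonly Action<T> onNext;                   // downstream consumer
    private int connections;

    public PausableBuffer(Action<T> onNext)
    {
        this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
    }

    public void Push(T value)
    {
        lock (gate)
        {
            if (connections == 0) buffer.Enqueue(value);  // paused: keep it for later
            else onNext(value);                            // running: pass it straight on
        }
    }

    public IDisposable Connect()
    {
        lock (gate)
        {
            connections++;
            if (connections == 1)
            {
                // First connection after a pause: flush the backlog in order.
                while (buffer.Count > 0) onNext(buffer.Dequeue());
            }
        }
        return new Connection(this);
    }

    private sealed class Connection : IDisposable
    {
        private PausableBuffer<T> owner;

        public Connection(PausableBuffer<T> owner) { this.owner = owner; }

        public void Dispose()
        {
            // Each connection is counted down once, however often Dispose is called.
            var o = Interlocked.Exchange(ref owner, null);
            if (o == null) return;
            lock (o.gate) { o.connections--; }
        }
    }
}
```
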


IronRuby: Only Resting?

By now everyone and his Twitter account has heard the news about Jimmy Schementi.  There’s also the obvious back and forth about whether IronRuby can now be considered dead.  Most of it has taken the form of “the community will decide” versus “without Microsoft support, it’s a dodo”.

Nailed to its perch

Let me start with my own experience of IronRuby.  I’ve been concentrating on using it as a testing platform for C# code, with some success.  Here’s my take on the current state of affairs:

IronRuby 1.0 is a relatively good implementation of Ruby 1.8.  Since the community at large seems to be having serious trouble moving to 1.9, a hiccup in IronRuby’s progress wouldn’t seem to be that big a deal.  However, that’s to ignore some huge obstacles.

As I’ve already touched upon, Cucumber may have worked a year ago, but it doesn’t work now.  Multiply this by the number of native packages for Ruby and you have a serious problem.  However, the community support isn’t the only issue.  The performance while debugging C# code round-tripping to IronRuby code is appalling.  Borderline unusable.  Now, I don’t need editor support for IronRuby, I’m a big boy and can find my own text editor.  But being unable to debug C# in a timely fashion?

In short, if you’re using IronRuby right now, you’re going to have problems calling into a C# extension and you’re going to have trouble using a gem as well.  For an early adopter that’s not a problem.  But if this is the finished product, we’ve got a problem on our hands.  Ironically, I didn’t want to just use it as a platform for rails and leave .NET behind.  I wanted to use it to make my C# code better.

Oh yes, and RSpec absolutely rocks.

A Much, Much Bigger Picture

Where exactly does office fit in here?

I sincerely believe that the .NET runtime is, at a technical level, hands down superior to the JVM, MRI or CPython runtimes.  But the community (of which I am a part) simply cannot compete with Java, Ruby or Python.  They’ve got better tools, more ideas, faster delivery cycles and more people.  And here I’m only talking about language/platform communities.  Node.js is really exciting, and there are a hundred flavours of NoSQL worth investigating.

I can’t really fault Karl Seguin’s logic, but this is the logic of the SKU, which I’ve already had a rant about.  This is killing Windows (not .NET, Windows) as a development platform and no-one seems to be noticing.  I basically have two choices: ditch Windows as a platform, or make do with inferior solutions for web applications, database servers, testing suites and so on.  It’s not quite as aggressive as Steve Jobs rewriting the App Store rules to dictate how I program, but it’s still not a pleasant choice.

The Next Big Language


All of this is ignoring the elephant in the corner: JavaScript.  Microsoft’s DLR strategy was always fundamentally flawed in not addressing JavaScript.  It seems that large numbers of people at Microsoft just don’t really like or understand the language, which has led to them spending years trying to turn it into something that it isn’t.  It sounds like they’ve finally got a clue on IE9, but does this approach even have anything to do with .NET?  This means that not only have they missed the Ruby and Python waves, they’re just about to miss the next one.  Node is turning JavaScript into a serious out-of-browser development language at a point where Microsoft appears to have finally given up on the idea.

You mustn’t read too much into one blog post, but you’ve got to question Microsoft’s game plan.  Because right at the moment, all of their strategies seem to be about trying to lock people into a crumbling platform, rather than trying to create the platform that everyone wants to use.  The real kicker of it is: .NET really is a great platform.  I don’t want to stop using it, I want it to actually live up to its ambition and let me use tools from any platform on mine.

NOTES:  The puzzle picture is by Lynette Cook, the elephant is by Tom Claytor.  I don’t know who the artist behind the parrot picture is, but I got it from this gag post.

How to Resolve FIX Sequence Number Issues

Sequence number breaks in FIX sessions seem to terrify most people.  This is understandable: all of a sudden, a perfectly decent connection refuses to come up.  What’s worse, if you get it wrong whilst fixing it, you run the risk of trashing data.  However, not understanding them is dangerous.  For one thing, without understanding them, it’s unlikely you’ll be able to prevent them.  Worse, even if you never take a system down during trading hours, it could still crash. 

Anyone who deals with a FIX system should know how to resolve a sequence number mismatch.  I recommend practicing on a test system first, though.

A Refresher on How FIX Works

Let’s say that Alice is sending Bob messages.  Since they don’t want to miss a message, they label every message with a sequence number.  If Bob gets a sequence number that’s too high, he just sends Alice a request to resend the messages.

Now, breaks occur when systems recover from an outage.  When they start off again, Alice and Bob send each other where they think they’re up to.  If Bob gets a sequence number that’s too low, he gets worried.  Alice seems to have forgotten she’s sent some messages.  Bob now doesn’t trust Alice and refuses to talk to her until she gets her head together.  That’s a sequence break.

So, here’s what can happen:

  • Counterpart sequence number too high:  Bob hasn’t received enough, so the connection comes up and he asks for a resend.
  • Counterpart sequence number correct: Bob has received exactly the right number, so the connection comes up and there are no problems.
  • Counterpart sequence number too low: Alice has forgotten something.  Connection doesn’t come up.
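As a rough sketch, the three cases boil down to one comparison on Bob’s side.  SequenceCheck, Classify and LogonOutcome are hypothetical names for illustration, not part of QuickFix.  A real engine does a lot more than this, such as resend handling and PossDupFlag checks.

```csharp
using System;

// Hypothetical helper mirroring the three cases above, from Bob's side.
// expected: the next sequence number Bob expects from Alice.
// received: the sequence number on the message Alice has just sent (e.g. her Logon).
public enum LogonOutcome { ResendRequired, InSync, SequenceBreak }

public static class SequenceCheck
{
    public static LogonOutcome Classify(long expected, long received)
    {
        if (received > expected) return LogonOutcome.ResendRequired; // Bob has missed expected..received-1
        if (received == expected) return LogonOutcome.InSync;
        return LogonOutcome.SequenceBreak;                            // Alice has "forgotten" what she sent
    }
}
```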

When a connection doesn’t come up, it’s important to figure out exactly why the sender has lost information.  In practice, however, the most common cause is that someone’s forgotten a heartbeat message.  QuickFix sometimes does this on shutdown.

QuickFix Specific Stuff

If you open up the QuickFix sequence numbers file (your service has to be off to open it), you’ll see two numbers.  On the left is the current sequence number for messages you’ve sent; on the right, for messages you’ve received.  Bear in mind that the numbers are the other way around for your counterpart.  If you forget this, your connection may come up, but you’ll wish it hadn’t.  All you need to do is mutually agree a pair of numbers.

Bear in mind that Alice and Bob are in fact indulging in two-way communication.  The QuickFix logs for your side may show a resend request followed by a logout.  In those circumstances, your counterpart is probably seeing a bad sequence number on his side, which means you’re the guy who’s forgotten how much he’s sent.

Whatever number you decide upon, check the main message log for what you’re about to replay.  This will usually just be heartbeats.  If it isn’t, you’ve got a reliability problem.

So, here’s the quick guide:

  • On the left (sent), at least as high as the other guy thinks he’s received.
  • On the right (received), lower than or equal to what the other guy thinks he’s sent.

If you’re running QuickFix on both sides, you can always just email the file across and swap the numbers round.
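The quick guide and the swap-the-numbers trick come down to a couple of comparisons.  This is a hypothetical helper, not QuickFix code.  It only encodes the rules above and does not read the sequence numbers file, whose exact on-disk format is not assumed here.

```csharp
using System;

// Hypothetical helper for the quick guide above. Numbers are as they appear in
// each side's sequence numbers file: left = sent, right = received.
public static class SeqNumPlan
{
    // Checks a proposed (left, right) pair for your file against the counterpart's
    // (left, right). Their columns are the other way round from yours, so their
    // right column is what they think they've received from you.
    public static bool IsSafe(long myLeft, long myRight, long theirLeft, long theirRight)
    {
        bool sentOk = myLeft >= theirRight;      // at least as high as they think they've received
        bool receivedOk = myRight <= theirLeft;  // no higher than they think they've sent
        return sentOk && receivedOk;
    }

    // Both sides on QuickFix: take their file and swap the numbers round.
    public static (long Left, long Right) Mirror(long theirLeft, long theirRight) => (theirRight, theirLeft);
}
```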

Deep Hacking

You can actually say you’ve sent more messages than you think you have.  This can get you out of trouble, but it will completely snarf you if someone then asks for a replay.  Thankfully, most don’t ask for a replay unless they lose messages. 


Reactive Extensions: How does FromAsyncPattern work?

The run-on-subscribe model I described in the previous post on Rx is an important concept within the system.  So much so that there’s a method for it: Observable.Defer.  Defer takes a factory function and returns an observable that calls the factory each time you subscribe, then subscribes you to whatever observable the factory produced.  It’s basically a lazy create-on-subscribe.  However, it’s worth bearing in mind that Defer only works if your subscription occurs before the observable produces a value.  Now, if you examine our previous code, you’ll see that there was no sensible way of achieving that.
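To make the run-on-subscribe behaviour concrete, here is an Rx-free sketch of what Defer does, written against the plain IObservable&lt;T&gt;/IObserver&lt;T&gt; interfaces in System.  DeferSketch, Return and ActionObserver are hypothetical names, and this is not Rx’s implementation.  It shows only the idea: the factory runs once per subscription and never before one.

```csharp
using System;

// Rx-free sketch of what Defer does (not Rx's implementation): the factory is
// not called when Defer is called, only when (and every time) someone subscribes.
public static class DeferSketch
{
    public static IObservable<T> Defer<T>(Func<IObservable<T>> factory) => new Deferred<T>(factory);

    // Hypothetical helper: an observable that yields one value, then completes.
    public static IObservable<T> Return<T>(T value) => new OneValue<T>(value);

    private sealed class Deferred<T> : IObservable<T>
    {
        private readonly Func<IObservable<T>> factory;
        public Deferred(Func<IObservable<T>> factory) { this.factory = factory; }

        // A fresh observable per subscription.
        public IDisposable Subscribe(IObserver<T> observer) => factory().Subscribe(observer);
    }

    private sealed class OneValue<T> : IObservable<T>
    {
        private readonly T value;
        public OneValue(T value) { this.value = value; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            observer.OnNext(value);
            observer.OnCompleted();
            return NoOp.Instance;  // already finished, nothing to cancel
        }
    }

    private sealed class NoOp : IDisposable
    {
        public static readonly NoOp Instance = new NoOp();
        public void Dispose() { }
    }

    // Hypothetical helper: forwards OnNext to a delegate and ignores the rest.
    public sealed class ActionObserver<T> : IObserver<T>
    {
        private readonly Action<T> onNext;
        public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
        public void OnNext(T value) => onNext(value);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }
}
```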

This raises the interesting question of how FromAsyncPattern does its job, since it’s trying to solve a very similar problem.  To understand it properly, let’s review how FromAsyncPattern is used:

  • You call Observable.FromAsyncPattern.  You give it a Begin and an End method. 
  • This returns a function that takes the same parameters as Begin and returns an observable.
  • This observable is a one-shot observable: it only ever yields one value: the asynchronous result of the call you just specified.

Practical Usage of FromAsyncPattern

On its own, this would be pretty useless.  You’d need to set up a subscription each time you called it.  So, we need to use some more functions:

  • Observable.While, which can be used to repeat an observable.
  • Observable.Defer, which we’ve already mentioned.

public static IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source);
public static IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory);

So, the standard structure is to call While(functionThatDeterminesWhetherToContinue, Defer(anonymousDelegateThatInvolvesTheResultOfCallingFromAsyncPattern)).  Like this:

var tcpListener = new TcpListener(new IPAddress(new byte[] { 127, 0, 0, 1 }), 8080);
tcpListener.Start();
var getSocket = Observable.FromAsyncPattern<Socket>(tcpListener.BeginAcceptSocket, tcpListener.EndAcceptSocket);
Observable.While(() => true, Observable.Defer(() => getSocket()))
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(HandleSocket);

I’m afraid that’s pretty much the simplest use of FromAsyncPattern achievable.  As an aside, if you’re not extremely happy with the scoping and lifetime rules of anonymous delegates, you don’t stand a chance programming with Rx.  It uses them everywhere.  (Some readers will be wondering why on earth I’m pointing this out; in my experience 95% of .NET developers don’t really understand this stuff.)

So How Does it Work Then?

If you stare at this code for a while, you’ll be left wondering whether the call to BeginAcceptSocket occurs when you call getSocket.  Actually, it does, which raises an interesting question: how on earth does it manage to guarantee sending the result to its subscriber, given that we saw last time that’s not possible?  The answer’s quite simple: it cheats:

  • getSocket calls BeginAcceptSocket and sets up a callback
  • The callback passes the result to an AsyncSubject<Socket>
  • AsyncSubject passes on the result if you’re subscribed.
  • If you’re not subscribed, it keeps the last Notification in memory and transmits it when you subscribe.

So, AsyncSubject is a buffer that contains one element.  Now, if you were producing multiple objects, you’d have a problem.  Did you want all of the values produced, or only the last one?  In the case of a one-shot observable, you don’t have that difficulty.
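Here’s a dependency-free sketch of that one-element buffer.  OneShotSubject and Recorder are hypothetical names, and this isn’t Rx’s source.  It follows the behaviour described above and assumes, as AsyncSubject does, that only the last value is kept and that value is released when the source completes.

```csharp
using System;
using System.Collections.Generic;

// Rx-free sketch of the AsyncSubject behaviour described above (not Rx's source):
// keep the last value; on completion deliver it to current subscribers and
// replay it to anyone who subscribes afterwards.
public sealed class OneShotSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private bool hasValue, isDone;
    private T lastValue;
    private Exception error;

    public void OnNext(T value)
    {
        lock (gate) { if (!isDone) { lastValue = value; hasValue = true; } }
    }

    public void OnError(Exception e) => Finish(e);
    public void OnCompleted() => Finish(null);

    private void Finish(Exception e)
    {
        IObserver<T>[] targets;
        lock (gate)
        {
            if (isDone) return;
            isDone = true;
            error = e;
            targets = observers.ToArray();
            observers.Clear();
        }
        foreach (var o in targets) Replay(o);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate)
        {
            if (!isDone) { observers.Add(observer); return new Unsubscriber(this, observer); }
        }
        Replay(observer);  // already finished: hand over the stored outcome now
        return new Unsubscriber(this, observer);
    }

    // The fields read here are only written before isDone is set, so they no longer change.
    private void Replay(IObserver<T> o)
    {
        if (error != null) { o.OnError(error); return; }
        if (hasValue) o.OnNext(lastValue);
        o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly OneShotSubject<T> owner;
        private readonly IObserver<T> observer;
        public Unsubscriber(OneShotSubject<T> owner, IObserver<T> observer) { this.owner = owner; this.observer = observer; }
        public void Dispose() { lock (owner.gate) owner.observers.Remove(observer); }
    }
}

// Hypothetical test helper that records what it is sent.
public sealed class Recorder<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```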

While, on the other hand, only starts when you subscribe.  Concat does the same thing (indeed, While uses Concat internally).  You can see this with the following code:

static void Main() {
    var w = Observable.While(() => true, Observable.Generate(0, 
        n => n < 10, 
        n =>
            {
                Console.WriteLine(n);
                return n + 1;
            }, 
        n => n, 
        Scheduler.ThreadPool));
    Console.ReadKey();
    w.Subscribe(n => Console.WriteLine("Subscribed " + n));
    Console.ReadKey();
}

As we’ve seen here, the whole “hot and cold” observable thing is really quite complex.  It’s not clear that appeals to the duality of iterator and observer make it any easier to understand.  In any event, there’s very little way of telling exactly when an observable starts generating values short of reading the source code.  It’s a pity it’s closed source.


Reactive Extensions: AnonymousDisposable

When you subscribe to a channel in Retlang, you get an IUnsubscriber back.  The equivalent of this in Rx is just IDisposable.  This makes AnonymousDisposable a fairly vital class in Reactive Extensions.  I even used it in the last post.  It’s a pity someone decided to mark it as internal (again).  So here’s another implementation of it:

public class AnonymousDisposable : IDisposable
{
    private readonly Action _onDispose;

    public AnonymousDisposable(Action onDispose)
    {
        _onDispose = onDispose;
    }

    public void Dispose()
    {
        _onDispose();
    }
}
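One thing to watch: the class above runs its action every time Dispose is called, but the .NET guidelines expect Dispose to be safe to call more than once.  If the action mustn’t run twice (decrementing a counter, say), a variant like this helps.  OnceDisposable is a hypothetical name.  It uses Interlocked.Exchange so that only the first caller gets the action, even when Dispose is called from several threads.

```csharp
using System;
using System.Threading;

// Variant of AnonymousDisposable whose action runs at most once, even if Dispose
// is called repeatedly or from several threads at the same time.
public sealed class OnceDisposable : IDisposable
{
    private Action onDispose;

    public OnceDisposable(Action onDispose)
    {
        this.onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose));
    }

    public void Dispose()
    {
        // Atomically take the action and leave null behind; only one caller wins.
        var action = Interlocked.Exchange(ref onDispose, null);
        action?.Invoke();
    }
}
```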

Reactive Extensions: What on Earth is IConnectableObservable?

Since Reactive Extensions seems to have no documentation worth speaking of, I figure it’s worth writing up stuff as I learn it.  One of the major differences between Retlang and Reactive Extensions is that Rx has a first-class concept of a message source.  In Retlang, ISubscriber defines approximately the same interface as IObservable, but the only implementation is Channel, which is the equivalent of Subject.  A fair number of methods within Rx are geared to producing sources to react to.

So, let’s consider the following extremely boring Rx source:

public class NumberSource : IObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });
        thread.Start();
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {           
        return underlying.Subscribe(observer);
    }
}

And let’s consume it using the following code:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    Console.ReadKey();
}

What would you expect this to print?  In practice, it prints nothing at all.  The problem is that the thread has already finished before Subscribe ever gets to run.  Figuring out the timing of subscribes is an important problem in understanding Rx code.  Now, Observable.Defer won’t help us this time, but we can just fix the code.

public class NumberSource : IObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });
        
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Subscribe first, then start producing, so no values slip past.
        var subscription = underlying.Subscribe(observer);
        thread.Start();
        return subscription;
    }
}

Handling Multiple Subscribers

Okay, now our code works, until we do this:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    Console.ReadKey();
}

Now the code just plain crashes: the second Subscribe calls thread.Start again, which throws.  So, if we’ve got multiple subscribers, we can’t make the code work with the thread started in the constructor, nor in the Subscribe method.  Checking to see if the thread had already started wouldn’t work either, because B would just miss some of the messages.  What we need is a way to separate the thread start from the subscription.  This is where connectable observable comes in.

public interface IConnectableObservable<out T> : IObservable<T> {
    IDisposable Connect();
}

So, we can now put the thread.Start in Connect:

public class NumberSource : IConnectableObservable<int>
{
    Subject<int> underlying = new Subject<int>();
    private Thread thread;

    public NumberSource()
    {
        thread = new Thread(() =>
                                    {
                                        for (int i = 0; i < 10000; i++) underlying.OnNext(i);
                                    });        
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        return underlying.Subscribe(observer);
    }


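    public IDisposable Connect()
    {
        thread.Start();
        // Disconnection isn't supported, so hand back a no-op rather than null.
        return new AnonymousDisposable(() => { });
    }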
}

Subscribing is a bit more complex as well:

static void Main() {
    var ns = new NumberSource();
    var x = ns.ObserveOn(Scheduler.ThreadPool);
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    ns.Connect();
    Console.ReadKey();
}

Simplifying

So, we have working code again, but it’s become pretty complex.  Let’s critique our NumberSource class.  For one thing, it doesn’t actually handle disconnections.  I could make the code more complex to implement that correctly, but I’m not going to bother.  For another, I’m using Subject<T>.  Why?  Because handling multiple subscriptions is a pain.

Quite a lot of sources in Rx actually only support one subscriber, so let’s simplify the code down to the simplest possible implementation of NumberSource:

public class NumberSource : IObservable<int>
{
    private volatile IObserver<int> subscriber;   // written by Subscribe/Dispose, read by the worker thread
    private readonly Thread thread;

    public NumberSource()
    {
        thread = new Thread(() => {
            for (int i = 0; i < 10000; i++)
            {
                var current = subscriber;
                if (current != null)
                {
                    current.OnNext(i);
                }
            }
        });
    }


    public IDisposable Subscribe(IObserver<int> observer)
    {
        subscriber = observer;        
        thread.Start();        
        return new AnonymousDisposable(() => subscriber = null);
    }
}

Note the slightly tricky code around OnNext, which avoids thread safety issues (you’ll be familiar with this idiom if you’ve read the Retlang source code).  Now we need a way of turning multiple subscribers into a single subscriber.  That’s what the Publish method does: converts an observable into a connectable observable.  This makes the Main method much more readable:

static void Main() {
    var x = new NumberSource().ObserveOn(Scheduler.ThreadPool).Publish();
    x.Subscribe(n => Console.WriteLine("A" + n));
    x.Subscribe(n => Console.WriteLine("B" + n));
    x.Connect();
    Console.ReadKey();
}

If you run the code, you’ll observe that the As and Bs come in lockstep.  This is a direct result of the way that Publish works: it turns two subscriptions into one.  Publish does pretty much everything you need, so it’s highly unlikely you’ll ever actually implement IConnectableObservable yourself.  Simply write an observable that starts when it receives a subscription and let Publish handle everything for you.  Try not to confuse Publish in Retlang with Publish in Rx…
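If Publish still feels like magic, here is a stripped-down, Rx-free sketch of the idea.  PublishSketch, DemoSource and LogObserver are hypothetical names, and this is not how Rx implements Publish: there is no ObserveOn and no reconnection logic.  It only shows observers attaching to an internal multicaster and Connect making the single subscription to the source.

```csharp
using System;
using System.Collections.Generic;

// Rx-free sketch of the Publish idea (not Rx's implementation): observers attach
// to an internal multicaster; Connect makes the one subscription to the source.
public sealed class PublishSketch<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Multicaster hub = new Multicaster();

    public PublishSketch(IObservable<T> source) { this.source = source; }

    public IDisposable Subscribe(IObserver<T> observer) => hub.Add(observer);

    public IDisposable Connect() => source.Subscribe(hub);

    private sealed class Multicaster : IObserver<T>
    {
        private readonly object gate = new object();
        private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

        public IDisposable Add(IObserver<T> o)
        {
            lock (gate) observers.Add(o);
            return new Removal(this, o);
        }

        private IObserver<T>[] Snapshot() { lock (gate) return observers.ToArray(); }

        // Each value reaches every observer before the source produces the next one.
        public void OnNext(T value) { foreach (var o in Snapshot()) o.OnNext(value); }
        public void OnError(Exception e) { foreach (var o in Snapshot()) o.OnError(e); }
        public void OnCompleted() { foreach (var o in Snapshot()) o.OnCompleted(); }

        private sealed class Removal : IDisposable
        {
            private readonly Multicaster owner;
            private readonly IObserver<T> observer;
            public Removal(Multicaster owner, IObserver<T> observer) { this.owner = owner; this.observer = observer; }
            public void Dispose() { lock (owner.gate) owner.observers.Remove(observer); }
        }
    }
}

// Hypothetical single-subscriber source: emits 0..count-1 synchronously on Subscribe.
public sealed class DemoSource : IObservable<int>
{
    private readonly int count;
    public int Subscriptions { get; private set; }
    public DemoSource(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Subscriptions++;
        for (int i = 0; i < count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new NoOp();
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that logs "<name><value>" into a shared list.
public sealed class LogObserver : IObserver<int>
{
    private readonly string name;
    private readonly List<string> log;
    public LogObserver(string name, List<string> log) { this.name = name; this.log = log; }
    public void OnNext(int value) => log.Add(name + value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With two LogObservers named A and B subscribed before Connect, the source is subscribed to only once.  Because the multicaster hands each value to every observer before the source produces the next one, the log reads A0, B0, A1, B1 and so on.  That is the same lockstep effect described above.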
