Running RSpec and Cucumber from C#

You might be wondering why on earth you’d want to do this, but it’s trickier than it looks.  I’ve published a gist for it.  There’s a couple of comments about how to get things set up as well.

The bad news is, RSpec works, but Cucumber doesn’t.  Instead I’m getting this:

can't convert Array into java::util::List (TypeError) 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/gherkin-2.0.2-universal-dotnet/lib/gherkin/native/ikvm.rb:37:in `new' 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/cucumber-0.8.3/bin/../lib/cucumber/cli/configuration.rb:29:in `parse!' 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/cucumber-0.8.3/bin/../lib/cucumber/cli/main.rb:94:in `configuration' 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/cucumber-0.8.3/bin/../lib/cucumber/cli/main.rb:43:in `execute!' 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/cucumber-0.8.3/bin/../lib/cucumber/cli/main.rb:25:in `execute' 
C:/Program Files/IronRuby 1.0v4/lib/ironruby/gems/1.8/gems/cucumber-0.8.3/bin/cucumber:8 
C:Program FilesIronRuby 1.0v4bincucumber:19:in `load' 
C:Program FilesIronRuby 1.0v4bincucumber:19 

Yes, you’re reading the first line correctly.  The downside of IKVM, of course, is Java Exceptions that mean nothing to you.  This is quite frustrating for me, because I really wanted to start using Cucumber.  So if anyone’s managed to crack this, drop me a line.  Believe me, I’ve tried enough different versions of the gherkin and cucumber gems to last a lifetime.

On the other hand, I’m rather enjoying RSpec.

Shard Balancer: Another Crack at QueueChannel

For those of you bored with this problem, I promise this is the last time I post about this.  That’s because I’ve finally come up with a solution I actually like.  Here, the model is that the message order still matters, but the state is in a database, rather than in memory.  This allows us to change our minds about which thread runs which orders.  I got the idea from reading about sharding and resharding strategies.  One of the approaches to resharding is to assign everything a “virtual shard” where there are a lot of virtual shards and only a few physical shards.  Then, when you need to expand, you can move virtual shards between physical shards.

Here, the “virtual shard” is the identification of the order.  The “physical shard” is the Retlang fiber.  Only one virtual shard is assigned to a physical shard at any given time, so the metaphor breaks down a bit.  Nonetheless, the basic idea is that when a worker is ready, it gets assigned all of the messages for a shard.  Then that shard is locked, so that messages just back up until the worker is finished.  After that point, any new messages can be assigned to a worker, and the worker is available for work again.

This is actually pretty similar to the keyed batch concept in Retlang, which led me to support subscriptions using Action<K, IEnumerable<T>> as well as Action<T>.  The former could, for instance, choose to only commit its changes at the end of a batch.  To support this use case properly, I needed to introduce a maximum batch size as well.  The batch size does matter: the larger the batch, the more efficient the code is.  If you don’t need batching at all, you can always set it to int.MaxValue.

Performance

If you take a look at the test code, you’ll see it calculates a rough “efficiency” number.  That’s the time it took compared to the fastest possible.  In my tests, I’m seeing 98% efficiency.  There’s a couple of commented out lines that test QueueChannel in the same manner.  That shows a figure of 99%, so the order preservation and batching does impose some overhead.

Going back to the model of the system, we assumed that there was no in memory state.  In practice, that doesn’t have to be the case.  The shard itself can be mutable.  In fact, if you’re tracking renamed objects (e.g. cancel/replaces in Fix) you’re going to be doing this the whole time.  You can modify the shard during processing, but if you modify state during processing and assignment you’re going to need a lock.  This is much more lock-free parallelism than I’d hoped to achieve with this approach, and pretty much renders the previous approach irrelevant.

A Walk Around The Code

I’ve decided to try publishing the code as a gist this time.  It’s certainly less effort for me to do than the usual HTML code, but it’s not as flexible: the syntax colouring isn’t up to much and there’s no control over the presentation of the files.  So, the implementation can be found in the following files:

  • ShardBalancer.cs
  • ShardConsumer.cs
  • IConsumer.cs

Arguably, IConsumer doesn’t need to exist.  There’s extremely tight coupling between the balancer and the consumer so the only purpose it serves is to highlight how the balancer calls to the consumer.  I wrote this version from scratch, which explains the change in coding style, but the basic pattern is still Retlang’s hungry consumer implementation.

The test harness consists of:

  • Program.cs
  • Execution.cs
  • SummationService.cs
  • OrderDb.cs

Observations

I think I’ve finally come up with a solution to the problem with which I’m happy.  This solution actually balances very well whilst still observing necessary message ordering.  If you watch it in action, it actually behaves in a similar to processor assignment of processors at the OS level.  I’m particularly pleased that it can be used to batch messages, which could enable some problems to outperform a QueueChannel implementation under the right circumstances.

  • The balancer has two locks.  It’s extremely important that the implementation of Wakeup does no work itself.  Otherwise the solution would deadlock.
  • If KeyValuePair<T, V> was immutable, the batch dequeue could return KeyValuePair<TShard, IEnumerable<TValue>>.  This highlights a problem that’s going to be with us for a long time: the APIs were never built with the new type system in mind.
  • The DataToActions code is a bit on the unreadable side, but the alternative is two different implementations of ShardConsumer, which didn’t appeal.
  • The mechanism to empty the queue is vastly more elegant than the FlushAndWait implementation in the previous version.  EmptyEvent isn’t threadsafe in .NET 4.
  • The code needs a trivial change to compile with Retlang trunk.

You could experiment with switching strategies if the number of queued shards was less than the number of available threads.  Instead of choosing randomly, you could pick the longest queue and throw the entire thing onto a thread.  This is particularly attractive if you’re working with a terminating process as this would speed up the process of closing down at the end of the day.  Something that this code does badly, as does Retlang in general, is report exactly what’s going on internally.  I’m going to see if I can come up with a solution for that.  Don’t hold your breath, though 🙂

Technorati Tags: ,

A Future for Inversion of Control Containers

.NET has a lot of IoC containers.  This is undoubtedly a good thing: diversification is a sign of strength in the ecosystem.  MEF is showing how specialization can be used to solve specific issues.  I’m going to try to set out one vision of how IoC containers should evolve.  This is not my opinion of how they all should evolve, it’s simply my vision of what I would like from one of the many containers.

I’ve argued for some time that containers are all about configuration.  If you don’t believe that, then you should consider why Yadic isn’t your favourite container.  I’ve also been agitating for some time to get environmental deltas included in Castle Windsor, but actually this doesn’t even go far enough.  Let’s talk about what’s still wrong:

  • There’s no standard way to specify external configuration.  (i.e. you can’t separate configuration from wiring)
  • There’s no standard way to examine the runtime configuration.
  • There’s no standard way to report of the validity of the runtime configuration.
  • There’s no standard way of having a centralised configuration source.
  • There’s no sensible way of changing the configuration other than hacking files around.
  • You can’t change the runtime configuration and have that reflected in the state of the container.  Live.

Binsor achieves the first in a general way, but in a fashion that isn’t really acceptable.  nServiceBus thinks this through a lot, but only solves it for the case of using nServiceBus.  StructureMap provides the second at an API level, but not at an ops level. 

Live Diagnostics

StructureMap has, at the time of writing, much better diagnostics than Castle Windsor, which still firmly believes in the “try activating the component and see if it throws an exception” model.  However, why can’t we do the following?

  • Ability to embed a web server in your application.  Seriously, why do we keep writing remoting+winforms applications just to do simple diagnostics?
  • Ability to see all of the live configuration options.
  • The ability to modify the live configuration options.
  • Ability to see which configuration settings have issues.

You might think I’m talking futurology here, but Sam Newman and his mates have already built it.  (You should watch the entire talk.)  There’s no reason that work can’t be generalised in an opinionated manner that requires developers to think about these issues up front.

A possible objection is that this functionality is pretty much orthogonal to what containers do right now.  That’s correct to a certain extent, but it ignores a fundamental issue: every container I’ve used (let’s ignore MEF for now) works on the basis that, ultimately, the configuration for an application can be expressed as an XML file.  This expresses itself in a number of ways:

  • Things are strings that shouldn’t be.
  • Configuration can’t change during the lifetime of the container.
  • The container can’t recreate objects based on a change in configuration.  There’s no model for even expressing how that lifecycle should work.

Viewed from a certain angle, IoC containers at the moment are basically better XML configuration files.  There’s no reason they can’t be a better WMI.

DR is for Disaster Recovery

I was having a drink with a supplier the other day and he was explaining that during the outage “all of our settings had been replicated to DR, so we couldn’t switch to DR either.”  The implication was that automatically replicating settings to DR was a bad idea, precisely to avoid a scenario such as this.  Personally, I think it’s more about understanding purpose.  What do you want to do with your DR solution?

  • Deal with a complete physical loss of your primary site?
  • Handle hardware failure on one of your machines?
  • Handle a software failure on one of your systems?

Of these, only the first is actually disaster recovery.  The second is known as redundancy and the third rollback.  “Switching to DR” is intended for disaster recovery scenarios only.  The problem is, these are all problems and all too often we try to solve all three at once under the heading of “DR” without actually thinking about what the letters stand for.  Sadly, this tends to lead to solutions that achieve none of the aims.  It’s not a question of balancing competing priorities here, it’s a question of recognizing that the problems are separate and so should their solutions.

Technorati Tags:

Understand What You Measure

I never really intended to start blogging about my work kitchen, but it keeps providing me with anecdotes.  Today, I was thinking about the milk.  There are six fridges in the office, and typically you’ll find that they each have eight semi-skimmed jugs of milk.  There’s also a single jug of skimmed milk.  Sometimes there’s none.  So, someone somewhere has made a decision as to the appropriate amount of skimmed and semi-skimmed milk to provide.  Now we enter the realms of pure speculation, but bear with me.

superman-got-milk-ad-commercial

How do you determine what the mix should be?  Well, the obvious googley answer is: you measure consumption and use that as feedback.  I’m pretty sure that’s what someone at the supplier is doing.  The thing is, there’s an absolutely tiny amount of skimmed milk being supplied, far less than you’d expect given a glance at the supermarket.  So what’s wrong?

Ask anyone who drinks skimmed milk at the office and they can tell you: the skimmed milk tastes bad.  I mean, really bad, so bad you think it’s off.  Typically, the first time you try it you pour it down the sink.  The second time you realize it always tastes like that.  Every so often you see someone new do the exact same thing.

Selling Ice Cream In Winter

Famously, if a government department wants to kill something, they’ll run a study under circumstances that make it look incredibly unpopular.  But even when someone isn’t deliberately trying to manipulate the figures, unexpected factors occur.  The good news is, for user interfaces, there’s an easy answer: learn a bit about usability testing.  With internal systems the problem is harder.  Worst of all is with performance questions.  Just because you’ve reduced one number, doesn’t mean you haven’t increased another.  Ultimately, you need to understand your numbers holistically if you’ve got any hope of doing something other than performance whack-a-mole.

Technorati Tags: ,

What is n, anyway?

I’ve got about 500 CDs, and they’re in a complete mess.  So, I want to sort them.  What sort algorithm would you use?  Ask a computer science graduate and he’d tell you that heapsort is the single threaded algorithm of choice.  If you had people helping you, you could consider mergesort.  Of course, neither of these approaches is actually sensible.  Here’s an idea that actually works:

  • First go through the CDs and sort them into buckets according to the first letter.
  • Insertion sort the CDs belonging to a particular letter.
  • Stick the sorted CDs back on the shelves in order.

So why does our complexity analysis fail for this problem?  The reason is our assumptions are incorrect.  All the analysis of sort algorithms assumes that it is effectively free to move the objects (because they can be pointers).  Hence, all analysis of sorting algorithms optimizes for the number of comparisons With a physical CD to move around, this is no longer the case.  Equally, we assume all tests on our keys are equal.  With sorting CDs, it’s relatively low cost to determine the first letter of the band name.  A full comparison, including ordering by date of publication, is significantly harder.*

Units Matter

The point is, although we tend to ignore this, the units of n matter.  At different times, the performance of a system can be dominated by different factors

  • Number of simultaneous requests
  • Size of most complex workflows (think saga if you’re an nServiceBus guy)
  • Number of workflows
  • Sheer number of messages
  • Memory Consumption

This isn’t theoretical, I’ve got a single system where I’ve needed to optimize on all of these bases at one time or another.    If you want a system to scale, out or up, you’ve got to understand what units you’re dealing with.  Next, you’ve got to track them.  Probably the biggest performance secret of all time is good diagnostics, live and historical.

*There is also the problem that doing heapsort by hand is likely to be error prone, but that’s not the point I’m making.

Technorati Tags:

Tip: Never Orphan a Collection in NHibernate

Some guidelines on how to deal with a collection property in NHibernate:

  • Create a protected empty constructor for the use of NHibernate.  There should be no code in this constructor.
  • Create public constructors with parameters to allow you to create objects within your code.  These should initialize the collection properties, typically as empty collections.  (Use a simple List<T> until you’re more sophisticated.)
  • Never, ever, ever, set the collection property after construction.
  • Don’t allow it to have a public set.*
  • If you need to replace it, clear it and put the new values in.

Why is this so important?  Well, if you load the entity from the database, your collection property won’t be a List<T> anymore: it’ll be an object that NHibernate is using to track the one to many relationship.  If that exact object stops being connected to your entity, NHibernate will get upset and you can’t save your objects anymore. 

Unfortunately, people starting out tend to get an elevator pitch for NHibernate that suggest you can write your entities how you like and then it’ll just work.  That isn’t the case.  If you’re an NHibernate ninja, you’ll be able to break some of these guidelines.  But if you’re having problems, check that your entities follow them.

*Actually, I don’t tend to put public set on any entity.  This is part of my “make methods have names that explain the business process by which the entity changes” strategy.  But that’s a whole different blog post…

Technorati Tags:

LazyWeak: A Low Memory Implementation of Lazy

Lazy<T> is a new class in .NET 4, but I imagine most developers have been running their own versions for years.  It’s used to avoid the fetching of expensive resources.  However, the fundamental assumption is that it’s the fetching of T that is expensive.  Assume, instead, that the cost of fetching/calculating T isn’t your main concern, but the memory storage cost.  For this, you want something with slightly different characteristics:

  • T is computed when needed.
  • T is cached
  • T is thrown away if the memory is at a premium.
  • T can be recreated if necessary
  • The external API should be the same as Lazy<T>

Sadly, the last isn’t quite possible, since Microsoft didn’t create an ILazy<T> while they were there.  (One day someone at Microsoft will read this and figure out where they’ve been going wrong.)  So, this is as close as I can manage:

using System;

/// <summary>
/// Like Lazy, only can recreate the object on demand
/// </summary>
/// <typeparam name="T"></typeparam>
public class LazyWeak<T> 
{
    private static readonly object __noObject = 3;

    private readonly Func<T> _factory;
    private readonly WeakReference _reference;


    public LazyWeak(Func<T> factory, T initial) {
        _factory = factory;
        _reference = new WeakReference(initial);
    }

    public LazyWeak(Func<T> factory) 
    {
        _factory = factory;
        _reference = new WeakReference(__noObject);
    }

    public bool IsValueCreated
    {
        get
        {
            return _reference.IsAlive && !ReferenceEquals(_reference.Target, __noObject);
        }
    }

    public T Value
    {
        get
        {
            var result = _reference.Target;
            if (ReferenceEquals(result, __noObject) || !_reference.IsAlive)
            {
                _reference.Target = result = _factory();
                
            }
            return (T)result;
        }
    }
}

You can create the LazyWeak<T> with an initial value.  This makes no sense in the case of Lazy<T>, but can be pretty useful here.  I’ve also aimed to ensure that the code is thread-safe.  If you spot a bug, let me know.

Finally, a quick test:

public class LazyTest
{
    static void Main()
    {
        int calls = 0;
        Func<int> factory = () => { calls++; return 7; };
        var lazy = new LazyWeak<int>(factory, 7);
        Console.WriteLine(string.Format("{0}:{1}", lazy.Value, calls));
        Console.WriteLine(string.Format("{0}:{1}", lazy.Value, calls));
        Console.WriteLine("GC");
        System.GC.Collect();
        Console.WriteLine(string.Format("{0}:{1}", lazy.Value, calls));
    }
}

There are lots of advantages of relying on the garbage collector, but you lose control over what’s going on. 

So, what’s it good for?  Well, it uses less memory than just storing T or Lazy<T> and it’s faster than repeatedly calling Func<T>.  So, it falls between those two, which may be the performance sweet spot for some problem in your application.  On the other hand, it’s fairly important that T is immutable.  There’s nothing stopping you changing T, but it would revert back to its original state each time the garbage collector ran.

It’s worth mentioning that this is one of many partial solutions to the long queue problem: stick LazyWeak<T> instead of T on the queue.  This can allow you to just keep keys in memory and leave values persisted elsewhere.  If you want to do both, you want a proper external queue.

Tweaking the Retlang Queue Channel

Okay, it’s time for one of my Friday code dumps.  I’ll warn you now: this post is ten pages long and I’m not really happy with it.  It’s probably going to continue to develop over time.  However, one of the things I like about Clojure is something I like about Retlang: the code is short.  As developers, we know you can’t measure productivity, but still, we tend to associate large amounts of code with large amount of functionality, which in turn we associate with “better”.

Retlang, on the other hand, is full of short pieces of code that do powerful and useful things.  The QueueChannel is an example of this.  Conceptually similar to nServiceBus’s distributor, it’s a pure load balancing solution: messages get processed on the first available queue.  However, a hardware load balancer typically does a bit more than this.  In particular, users tend to get associated with particular servers, which allows local caching to be effective.  Good servers take advantage of this for performance, but don’t rely on it.

Partition and Sum

One of the standard tutorials for Retlang is a lock free way of summing a large number of asynchronously produced numbers.  This basically works by single-threading the addition in the SummationSink.  Let’s complicate the example a bit:

  • You are receiving a large number of executions.
  • Executions have an order reference and a quantity
  • You need a sum of the quantities per order.
  • For the sake of argument, imagine that adding these numbers took significant time.

Now, ignoring the obvious plinq implementation of this, how would a reactive implementation of this problem look?

using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace LoadBalancer {
    class Execution {
        public int Quantity { get; set; }
        public int OrderId { get; set; }
    }

    public class SumExecutionsService {
        Dictionary<int, int> sumByOrderId = new Dictionary<int, int>();

        internal void Add(Execution value) {
            if (sumByOrderId.ContainsKey(value.OrderId)) {
                sumByOrderId[value.OrderId] += value.Quantity;
            } else {
                sumByOrderId[value.OrderId] = value.Quantity;
            }
            Thread.Sleep(value.Quantity);
        }

        public IEnumerable<int> OrderIds() {
            return sumByOrderId.Keys;
        }

        public long Sum() {
            return sumByOrderId.Values.Sum();
        }
    }
}

 

Ripping off the code for the summation tutorial would give us a working example.  Except that you’ve missed an important non-functional requirement: the actual summation is taking too long.  Using QueueChannel would make it faster, but you’d end up with orders being processed on multiple threads by multiple services.  (In this case you could patch up afterwards, but let’s assume you’re doing something where the operations don’t commute.)

So, what you want is something where you can tell which service/fiber is handling which request and keep it there.

using System.Collections.Generic;
using Retlang.Core;

namespace LoadBalancer {
    public interface IBalancer<T> {
        IDisposingExecutor Appropriate(T value);
        void Assigned(T value, IDisposingExecutor fiber);
    }

    internal class NullBalancer<T> : IBalancer<T>
    {
        public IDisposingExecutor Appropriate(T value) {
            return null;
        }

        public void Assigned(T value, IDisposingExecutor fiber) {
            return;
        }
    }


    class ExecutionBalancer : IBalancer<Execution> {
        Dictionary<int, IDisposingExecutor> fibersByOrderId = new Dictionary<int, IDisposingExecutor>();

        public IDisposingExecutor Appropriate(Execution value) {
            IDisposingExecutor result;
            return fibersByOrderId.TryGetValue(value.OrderId, out result) ? result : null;
        }

        public void Assigned(Execution value, IDisposingExecutor fiber) {
            fibersByOrderId[value.OrderId] = fiber;
        }
    }
}

So, what we want now, is an implementation of QueueChannel that takes a balancer as a constructor.  The behaviour when it’s given a null balancer should be unchanged from the original implementation.

A Sticky Queue Channel

So, here’s the modified version of QueueChannel.  It’d be nice to be able to unpick it from the consumer, but at the moment they’re pretty tightly coupled.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Retlang.Channels;
using Retlang.Core;

namespace LoadBalancer
{
    public class BalancingQueueChannel<T> : IQueueChannel<T> {
        private readonly IBalancer<T> _balancer;
        private readonly Queue<T> _globalQueue = new Queue<T>();
        private readonly Dictionary<IDisposingExecutor, Queue<T>> _localQueues = new Dictionary<IDisposingExecutor, Queue<T>>();
        AutoResetEvent waitHandle = null;
        internal event Action SignalEvent;

        public BalancingQueueChannel(IBalancer<T> balancer) {
            _balancer = balancer;
        }

        public IUnsubscriber Subscribe(IDisposingExecutor executor, Action<T> onMessage) {
            lock (_localQueues) {
                if (!_localQueues.ContainsKey(executor)) {
                    _localQueues[executor] = new Queue<T>();
                }
            }
            var consumer = new BalancingQueueConsumer<T>(executor, onMessage, this);
            consumer.Subscribe();
            return consumer;
        }

        internal bool Pop(IDisposingExecutor destination, out T msg) {
            bool hasTransferredToLocalQueue = false;
            lock (_localQueues) {
                try {
                    var localQueue = _localQueues[destination];
                    if (localQueue.Count > 0) {
                        msg = localQueue.Dequeue();
                        return true;
                    }
                    // Not found in local queue, now it's time to process the global queue
                    lock (_globalQueue) {
                        while (_globalQueue.Count > 0) {
                            if (_globalQueue.Count == 1) {
                                int x = 0;
                            }
                            T candidateMessage = _globalQueue.Dequeue();
                            var fiber = _balancer.Appropriate(candidateMessage);
                            if (fiber == null || fiber == destination) {
                                _balancer.Assigned(candidateMessage, destination);
                                msg = candidateMessage;
                                return true;
                            }
                            hasTransferredToLocalQueue = true;
                            _localQueues[fiber].Enqueue(candidateMessage);
                        }
                        msg = default(T);
                        return false;
                    }
                } finally {
                    if (!hasTransferredToLocalQueue) {
                        CheckQueueEmpty();
                    }
                }
            }
        }

        private void CheckQueueEmpty() {
            if (waitHandle != null && _localQueues.All(l => l.Value.Count == 0)) {
                lock (_globalQueue) {
                    if (_globalQueue.Count == 0) {
                        waitHandle.Set();
                    }
                }
            }
        }

        internal int Count() {
            lock (_localQueues) {
                lock (_globalQueue) {
                    return _globalQueue.Count + _localQueues.Sum(x => x.Value.Count);
                }
            }
        }

        internal int Count(IDisposingExecutor executor) {
            lock (_localQueues) {
                lock (_globalQueue) {
                    return _globalQueue.Count + _localQueues[executor].Count;
                }
            }
        }

        public void FlushAndWait() {
            waitHandle = new AutoResetEvent(false);
            lock (_localQueues) {
                CheckQueueEmpty();
            }
            waitHandle.WaitOne();
            var queues = _localQueues.Select(x => {
                var handle = new AutoResetEvent(false);
                x.Key.Enqueue(() => handle.Set());
                return handle;
            }).ToArray();
            WaitHandle.WaitAll(queues);

        }
        /// <summary>
        public void Publish(T message) {
            lock (_globalQueue) {
                _globalQueue.Enqueue(message);
            }
            var onSignal = SignalEvent;
            if (onSignal != null) {
                onSignal();
            }
        }
    }

    internal class BalancingQueueConsumer<T> : IUnsubscriber {
        private bool _flushPending;
        private readonly IDisposingExecutor _target;
        private readonly Action<T> _callback;
        private readonly BalancingQueueChannel<T> _channel;

        public BalancingQueueConsumer(IDisposingExecutor target, Action<T> callback, BalancingQueueChannel<T> channel) {
            _target = target;
            _callback = callback;
            _channel = channel;
        }

        public void Signal() {
            lock (this) {
                if (_flushPending) {
                    return;
                }
                _target.Enqueue(ConsumeNext);
                _flushPending = true;
            }
        }

        private void ConsumeNext() {
            try {
                T msg;
                if (_channel.Pop(_target, out msg)) {
                    _callback(msg);
                }
            } finally {
                lock (this) {
                    if (_channel.Count(_target) == 0) {
                        _flushPending = false;
                    } else {
                        _target.Enqueue(ConsumeNext);
                    }
                }
            }
        }

        public void Dispose() {
            _channel.SignalEvent -= Signal;
        }

        internal void Subscribe() {
            _channel.SignalEvent += Signal;
        }
    }
}

 

You’ll notice the addition of FlushAndWait.  Retlang typically doesn’t have operations for checking that something is finished.  This reflects a real-time bias in the code: I imagine the original use cases simply shut down without clearing their queues.  However, it’s pretty useful functionality for testing purposes.  It’s also useful if you ever wanted to rebalance the queue: you’d have to stop processing first, then reset your balancer and continue.

Let’s Test It

using System;
using System.Linq;
using Retlang.Fibers;

namespace LoadBalancer {
    class Program {
        static void Main()
        {
            // Set up services
            var serviceFibers = Enumerable.Range(0, 16).Select(
                x => new { Service = new SumExecutionsService(), Fiber = new ThreadFiber() }
            ).ToList();
            var balancingQueue = new BalancingQueueChannel<Execution>(new ExecutionBalancer());
            foreach (var serviceFiber in serviceFibers)
            {
                serviceFiber.Fiber.Start();
                balancingQueue.Subscribe(serviceFiber.Fiber, serviceFiber.Service.Add);
            }

            // Create the orders
            var random = new Random();
            for (int i = 0; i < 25000; i++)
            {
                balancingQueue.Publish(new Execution {
                    OrderId = random.Next(1, 1000),
                    Quantity = random.Next(1, 10)
                });
            }

            // Wait for the end of the calculation
            balancingQueue.FlushAndWait();

            // Check the results
            var orderIds = serviceFibers.SelectMany(x => x.Service.OrderIds());
            Console.WriteLine("Order IDs were {0}handled by individual services.",
                orderIds.Count() == orderIds.Distinct().Count() ? "" : "NOT ");
            foreach (var service in serviceFibers)
            {
                Console.WriteLine(service.Service.Sum());
            }
        }
    }
}

So why doesn’t nServiceBus do this?

It’s fairly obvious that you could implement something like this in nServiceBus as well.  However, the nature of the application causes additional complications:

  • The balancer would need to be implemented on the distributor.  This isn’t really a problem for Retlang, where everything’s in memory anyway.
  • The distributor can handle multiple message types.  The code above simply can’t do that.
  • Currently the distributor has no state.  State on the QueueChannel isn’t as big a problem, simply because if the process fails both the QueueChannel and it’s consumers lose their state simultaneously.
  • Addressing these would result in significantly more infrastructure around code loading and rebalancing.  Retlang has no code loading other than the CLR.

So, the solution above is good for situations in which your services have important in-memory state that is closely associated with non-commutative messages.  It suffers, on the other hand, from the same issues as sharding: the need to rebalance every so often. 

Problems

As I said before, I’m not 100% happy with this post, not only because it’s 10 pages long.  The stickiness solves some problems and creates others.  In the worst case, you end up with no load balancing at all.  If you’ve got persistent state, you could move orders between fibers.  However, this would require you to check if an order was currently assigned to a particular thread, which in turn would involve tracking a count of currently queued messages per order.  This, in turn, would make IBalancer significantly more complex.  Then there’s the question of dynamic rebalancing, which I can assure you is a real, not a theoretical problem.  Basically, there’s no perfect solution: you’re always going to be balancing scaling, raw performance and potential race conditions.  I suspect this is basically just the CAP theorem in disguise.  Next, I’m going to try an approach based around sharding.  This will trade in memory state for better balancing.