Tweaking the Retlang Queue Channel

Okay, it’s time for one of my Friday code dumps.  I’ll warn you now: this post is ten pages long and I’m not really happy with it.  It’s probably going to continue to develop over time.  However, one of the things I like about Clojure is something I like about Retlang: the code is short.  As developers, we know you can’t measure productivity by volume, but still, we tend to associate large amounts of code with large amounts of functionality, which in turn we associate with “better”.

Retlang, on the other hand, is full of short pieces of code that do powerful and useful things.  The QueueChannel is an example of this.  Conceptually similar to nServiceBus’s distributor, it’s a pure load balancing solution: messages get processed on the first available queue.  However, a hardware load balancer typically does a bit more than this.  In particular, users tend to get associated with particular servers, which allows local caching to be effective.  Good servers take advantage of this for performance, but don’t rely on it.

Partition and Sum

One of the standard tutorials for Retlang is a lock free way of summing a large number of asynchronously produced numbers.  This basically works by single-threading the addition in the SummationSink.  Let’s complicate the example a bit:

  • You are receiving a large number of executions.
  • Executions have an order reference and a quantity.
  • You need a sum of the quantities per order.
  • For the sake of argument, imagine that adding these numbers took significant time.

Now, ignoring the obvious PLINQ implementation of this, how would a reactive implementation of this problem look?

using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace LoadBalancer {
    class Execution {
        public int Quantity { get; set; }
        public int OrderId { get; set; }
    }

    public class SumExecutionsService {
        // Not thread-safe: each instance must only ever be called from a single fiber.
        Dictionary<int, int> sumByOrderId = new Dictionary<int, int>();

        internal void Add(Execution value) {
            if (sumByOrderId.ContainsKey(value.OrderId)) {
                sumByOrderId[value.OrderId] += value.Quantity;
            } else {
                sumByOrderId[value.OrderId] = value.Quantity;
            }
            // Simulate the addition taking significant time.
            Thread.Sleep(value.Quantity);
        }

        public IEnumerable<int> OrderIds() {
            return sumByOrderId.Keys;
        }

        public long Sum() {
            return sumByOrderId.Values.Sum();
        }
    }
}
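
For comparison, here’s roughly what I mean by the PLINQ version.  This is a minimal sketch rather than anything from Retlang: the PlinqSum class and the LoadBalancerSketch namespace are names I’ve made up for illustration, and Execution is redeclared so the snippet stands alone.  It only works because addition commutes, which is exactly the assumption the rest of this post drops.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

namespace LoadBalancerSketch {
    // Stand-in for the Execution class above, redeclared so the sketch compiles alone.
    public class Execution {
        public int OrderId { get; set; }
        public int Quantity { get; set; }
    }

    // Hypothetical helper, not part of Retlang or of the code in this post.
    public static class PlinqSum {
        // Groups executions by order in parallel, then sums each group's quantities.
        public static Dictionary<int, int> SumByOrder(IEnumerable<Execution> executions) {
            return executions
                .AsParallel()
                .GroupBy(e => e.OrderId)
                .ToDictionary(g => g.Key, g => g.Sum(e => e.Quantity));
        }

        public static void Main() {
            var executions = new[] {
                new Execution { OrderId = 1, Quantity = 5 },
                new Execution { OrderId = 2, Quantity = 3 },
                new Execution { OrderId = 1, Quantity = 7 },
            };
            // Prints "Order 1: 12" then "Order 2: 3".
            foreach (var pair in SumByOrder(executions).OrderBy(p => p.Key)) {
                Console.WriteLine("Order {0}: {1}", pair.Key, pair.Value);
            }
        }
    }
}
```

Note that this needs the whole input up front, whereas the reactive version handles executions as they arrive.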

Ripping off the code for the summation tutorial would give us a working example.  Except that you’ve missed an important non-functional requirement: the actual summation is taking too long.  Using QueueChannel would make it faster, but you’d end up with orders being processed on multiple threads by multiple services.  (In this case you could patch up afterwards, but let’s assume you’re doing something where the operations don’t commute.)

So, what you want is something where you can tell which service/fiber is handling which request and keep it there.

using System.Collections.Generic;
using Retlang.Core;

namespace LoadBalancer {
    public interface IBalancer<T> {
        IDisposingExecutor Appropriate(T value);
        void Assigned(T value, IDisposingExecutor fiber);
    }

    internal class NullBalancer<T> : IBalancer<T>
    {
        public IDisposingExecutor Appropriate(T value) {
            return null;
        }

        public void Assigned(T value, IDisposingExecutor fiber) {
            return;
        }
    }


    class ExecutionBalancer : IBalancer<Execution> {
        Dictionary<int, IDisposingExecutor> fibersByOrderId = new Dictionary<int, IDisposingExecutor>();

        public IDisposingExecutor Appropriate(Execution value) {
            IDisposingExecutor result;
            return fibersByOrderId.TryGetValue(value.OrderId, out result) ? result : null;
        }

        public void Assigned(Execution value, IDisposingExecutor fiber) {
            fibersByOrderId[value.OrderId] = fiber;
        }
    }
}

So what we want now is an implementation of QueueChannel that takes a balancer as a constructor argument.  When it’s given a NullBalancer, the behaviour should be unchanged from the original implementation.
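
Before the real thing, it may help to see the routing rule on its own.  The following is a simplified, single-threaded model with no Retlang and no locking: workers are plain ints, messages are just order ids, and StickyRouter and RouterDemo are hypothetical names invented for this sketch.  A worker takes from its own local queue first; otherwise it drains the global queue, keeping anything unassigned or already its own and parking everything else on the owner’s local queue.

```csharp
using System;
using System.Collections.Generic;

namespace LoadBalancerSketch {
    // Hypothetical single-threaded model of sticky routing; not the Retlang implementation.
    public class StickyRouter {
        private readonly Queue<int> _global = new Queue<int>();
        private readonly Dictionary<int, Queue<int>> _local = new Dictionary<int, Queue<int>>();
        private readonly Dictionary<int, int> _workerByOrder = new Dictionary<int, int>();

        public StickyRouter(IEnumerable<int> workers) {
            foreach (var worker in workers) {
                _local[worker] = new Queue<int>();
            }
        }

        public void Publish(int orderId) {
            _global.Enqueue(orderId);
        }

        // Returns true with the next order id this worker may handle, or false if there is none.
        public bool TryPop(int worker, out int orderId) {
            if (_local[worker].Count > 0) {
                orderId = _local[worker].Dequeue();
                return true;
            }
            while (_global.Count > 0) {
                int candidate = _global.Dequeue();
                int owner;
                if (!_workerByOrder.TryGetValue(candidate, out owner) || owner == worker) {
                    // Unassigned, or already ours: claim it.
                    _workerByOrder[candidate] = worker;
                    orderId = candidate;
                    return true;
                }
                // Owned by someone else: park it on the owner's local queue.
                _local[owner].Enqueue(candidate);
            }
            orderId = 0;
            return false;
        }
    }

    public static class RouterDemo {
        public static void Main() {
            var router = new StickyRouter(new[] { 1, 2 });
            foreach (var orderId in new[] { 10, 20, 10, 20 }) {
                router.Publish(orderId);
            }
            int order;
            // Worker 1 claims order 10 and worker 2 claims order 20; worker 2 then parks
            // the second 10 for worker 1 and takes the second 20 itself.
            foreach (var worker in new[] { 1, 2, 2, 1 }) {
                if (router.TryPop(worker, out order)) {
                    Console.WriteLine("Worker {0} took order {1}", worker, order);
                }
            }
        }
    }
}
```

If nothing is ever assigned, every worker simply takes the head of the global queue, which is the plain load-balancing behaviour.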

A Sticky Queue Channel

So, here’s the modified version of QueueChannel.  It’d be nice to be able to unpick it from the consumer, but at the moment they’re pretty tightly coupled.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Retlang.Channels;
using Retlang.Core;

namespace LoadBalancer
{
    public class BalancingQueueChannel<T> : IQueueChannel<T> {
        private readonly IBalancer<T> _balancer;
        private readonly Queue<T> _globalQueue = new Queue<T>();
        private readonly Dictionary<IDisposingExecutor, Queue<T>> _localQueues = new Dictionary<IDisposingExecutor, Queue<T>>();
        AutoResetEvent waitHandle = null;
        internal event Action SignalEvent;

        public BalancingQueueChannel(IBalancer<T> balancer) {
            _balancer = balancer;
        }

        public IUnsubscriber Subscribe(IDisposingExecutor executor, Action<T> onMessage) {
            lock (_localQueues) {
                if (!_localQueues.ContainsKey(executor)) {
                    _localQueues[executor] = new Queue<T>();
                }
            }
            var consumer = new BalancingQueueConsumer<T>(executor, onMessage, this);
            consumer.Subscribe();
            return consumer;
        }

        internal bool Pop(IDisposingExecutor destination, out T msg) {
            bool hasTransferredToLocalQueue = false;
            lock (_localQueues) {
                try {
                    var localQueue = _localQueues[destination];
                    if (localQueue.Count > 0) {
                        msg = localQueue.Dequeue();
                        return true;
                    }
                    // Not found in local queue, now it's time to process the global queue
                    lock (_globalQueue) {
                        while (_globalQueue.Count > 0) {
                            T candidateMessage = _globalQueue.Dequeue();
                            var fiber = _balancer.Appropriate(candidateMessage);
                            // Unassigned, or already ours: claim it and hand it to the caller.
                            if (fiber == null || fiber == destination) {
                                _balancer.Assigned(candidateMessage, destination);
                                msg = candidateMessage;
                                return true;
                            }
                            // Owned by another fiber: park it in that fiber's local queue.
                            hasTransferredToLocalQueue = true;
                            _localQueues[fiber].Enqueue(candidateMessage);
                        }
                        msg = default(T);
                        return false;
                    }
                } finally {
                    if (!hasTransferredToLocalQueue) {
                        CheckQueueEmpty();
                    }
                }
            }
        }

        private void CheckQueueEmpty() {
            if (waitHandle != null && _localQueues.All(l => l.Value.Count == 0)) {
                lock (_globalQueue) {
                    if (_globalQueue.Count == 0) {
                        waitHandle.Set();
                    }
                }
            }
        }

        internal int Count() {
            lock (_localQueues) {
                lock (_globalQueue) {
                    return _globalQueue.Count + _localQueues.Sum(x => x.Value.Count);
                }
            }
        }

        internal int Count(IDisposingExecutor executor) {
            lock (_localQueues) {
                lock (_globalQueue) {
                    return _globalQueue.Count + _localQueues[executor].Count;
                }
            }
        }

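        public void FlushAndWait() {
            // Phase 1: block until the global queue and every local queue are empty.
            waitHandle = new AutoResetEvent(false);
            lock (_localQueues) {
                CheckQueueEmpty();
            }
            waitHandle.WaitOne();
            // Phase 2: empty queues don't mean the last handlers have returned, so
            // enqueue a marker on every fiber and wait for each one to run.
            var queues = _localQueues.Select(x => {
                var handle = new AutoResetEvent(false);
                x.Key.Enqueue(() => handle.Set());
                return handle;
            }).ToArray();
            WaitHandle.WaitAll(queues);
        }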
        /// <summary>Adds a message to the global queue and signals every subscribed consumer.</summary>
        public void Publish(T message) {
            lock (_globalQueue) {
                _globalQueue.Enqueue(message);
            }
            var onSignal = SignalEvent;
            if (onSignal != null) {
                onSignal();
            }
        }
    }

    internal class BalancingQueueConsumer<T> : IUnsubscriber {
        private bool _flushPending;
        private readonly IDisposingExecutor _target;
        private readonly Action<T> _callback;
        private readonly BalancingQueueChannel<T> _channel;

        public BalancingQueueConsumer(IDisposingExecutor target, Action<T> callback, BalancingQueueChannel<T> channel) {
            _target = target;
            _callback = callback;
            _channel = channel;
        }

        public void Signal() {
            lock (this) {
                if (_flushPending) {
                    return;
                }
                _target.Enqueue(ConsumeNext);
                _flushPending = true;
            }
        }

        private void ConsumeNext() {
            try {
                T msg;
                if (_channel.Pop(_target, out msg)) {
                    _callback(msg);
                }
            } finally {
                lock (this) {
                    if (_channel.Count(_target) == 0) {
                        _flushPending = false;
                    } else {
                        _target.Enqueue(ConsumeNext);
                    }
                }
            }
        }

        public void Dispose() {
            _channel.SignalEvent -= Signal;
        }

        internal void Subscribe() {
            _channel.SignalEvent += Signal;
        }
    }
}

You’ll notice the addition of FlushAndWait.  Retlang typically doesn’t have operations for checking that something is finished.  This reflects a real-time bias in the code: I imagine the original use cases simply shut down without clearing their queues.  However, it’s pretty useful functionality for testing purposes.  It’s also useful if you ever wanted to rebalance the queue: you’d have to stop processing first, then reset your balancer and continue.
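
The second half of FlushAndWait is the part worth stealing: once the queues are empty, a handler may still be running, so you put a marker on every fiber and wait for all the markers to fire.  Here’s a minimal sketch of that idea without Retlang.  Worker is a hypothetical stand-in for a fiber (one thread draining a queue of actions in order), FlushSketch is a made-up name, and I’ve swapped WaitHandle.WaitAll for a CountdownEvent.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace LoadBalancerSketch {
    // Hypothetical stand-in for a fiber: a single thread running queued actions in order.
    public sealed class Worker : IDisposable {
        private readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();
        private readonly Thread _thread;

        public Worker() {
            _thread = new Thread(() => {
                foreach (var action in _actions.GetConsumingEnumerable()) {
                    action();
                }
            });
            _thread.IsBackground = true;
            _thread.Start();
        }

        public void Enqueue(Action action) {
            _actions.Add(action);
        }

        public void Dispose() {
            _actions.CompleteAdding();
            _thread.Join();
        }
    }

    public static class FlushSketch {
        // Blocks until every worker has run everything enqueued before this call.
        public static void WaitForAll(Worker[] workers) {
            // Left to the GC rather than disposed, to keep the sketch short.
            var remaining = new CountdownEvent(workers.Length);
            foreach (var worker in workers) {
                worker.Enqueue(() => remaining.Signal());
            }
            remaining.Wait();
        }

        // Queues slow jobs on each worker, flushes, and reports how many had finished.
        public static int RunDemo(int workerCount, int jobsPerWorker) {
            int processed = 0;
            var workers = new Worker[workerCount];
            for (int i = 0; i < workerCount; i++) {
                workers[i] = new Worker();
            }
            foreach (var worker in workers) {
                for (int j = 0; j < jobsPerWorker; j++) {
                    worker.Enqueue(() => { Thread.Sleep(1); Interlocked.Increment(ref processed); });
                }
            }
            WaitForAll(workers);
            int seen = Volatile.Read(ref processed);
            foreach (var worker in workers) {
                worker.Dispose();
            }
            return seen;
        }

        public static void Main() {
            Console.WriteLine("Processed {0} of 30", RunDemo(3, 10));
        }
    }
}
```

Because each worker runs its actions in order, the marker can only fire after everything queued ahead of it.  The CountdownEvent also sidesteps WaitHandle.WaitAll’s limit of 64 handles, which would matter if you had a lot of fibers.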

Let’s Test It

using System;
using System.Linq;
using Retlang.Fibers;

namespace LoadBalancer {
    class Program {
        static void Main()
        {
            // Set up services
            var serviceFibers = Enumerable.Range(0, 16).Select(
                x => new { Service = new SumExecutionsService(), Fiber = new ThreadFiber() }
            ).ToList();
            var balancingQueue = new BalancingQueueChannel<Execution>(new ExecutionBalancer());
            foreach (var serviceFiber in serviceFibers)
            {
                serviceFiber.Fiber.Start();
                balancingQueue.Subscribe(serviceFiber.Fiber, serviceFiber.Service.Add);
            }

            // Create the orders
            var random = new Random();
            for (int i = 0; i < 25000; i++)
            {
                balancingQueue.Publish(new Execution {
                    OrderId = random.Next(1, 1000),
                    Quantity = random.Next(1, 10)
                });
            }

            // Wait for the end of the calculation
            balancingQueue.FlushAndWait();

            // Check the results
            var orderIds = serviceFibers.SelectMany(x => x.Service.OrderIds());
            Console.WriteLine("Order IDs were {0}handled by individual services.",
                orderIds.Count() == orderIds.Distinct().Count() ? "" : "NOT ");
            foreach (var service in serviceFibers)
            {
                Console.WriteLine(service.Service.Sum());
            }
        }
    }
}

So why doesn’t nServiceBus do this?

It’s fairly obvious that you could implement something like this in nServiceBus as well.  However, the nature of the application causes additional complications:

  • The balancer would need to be implemented on the distributor.  This isn’t really a problem for Retlang, where everything’s in memory anyway.
  • The distributor can handle multiple message types.  The code above simply can’t do that.
  • Currently the distributor has no state.  State on the QueueChannel isn’t as big a problem, simply because if the process fails both the QueueChannel and its consumers lose their state simultaneously.
  • Addressing these would result in significantly more infrastructure around code loading and rebalancing.  Retlang has no code loading other than the CLR.

So, the solution above is good for situations in which your services have important in-memory state that is closely associated with non-commutative messages.  It suffers, on the other hand, from the same issues as sharding: the need to rebalance every so often. 

Problems

As I said before, I’m not 100% happy with this post, and not only because it’s 10 pages long.  The stickiness solves some problems and creates others.  In the worst case, you end up with no load balancing at all.  If you’ve got persistent state, you could move orders between fibers.  However, this would require you to check if an order was currently assigned to a particular thread, which in turn would involve tracking a count of currently queued messages per order.  This, in turn, would make IBalancer significantly more complex.  Then there’s the question of dynamic rebalancing, which I can assure you is a real, not a theoretical problem.  Basically, there’s no perfect solution: you’re always going to be balancing scaling, raw performance and potential race conditions.  I suspect this is basically just the CAP theorem in disguise.  Next, I’m going to try an approach based around sharding.  This will trade in-memory state for better balancing.
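
To give a feel for the extra complexity, here’s a rough sketch of a balancer that counts outstanding messages per key and forgets the assignment when the count reaches zero.  Everything here is hypothetical: CountingBalancer is a made-up name, and the Completed method has no counterpart in IBalancer above.  It also assumes the channel would call Assigned for every message routed to a worker (including ones parked on a local queue) and Completed after each handler returns, neither of which the channel in this post does.

```csharp
using System;
using System.Collections.Generic;

namespace LoadBalancerSketch {
    // Hypothetical balancer: a key stays pinned to a worker only while messages are outstanding.
    public class CountingBalancer<TKey, TWorker> where TWorker : class {
        private class Entry {
            public TWorker Worker;
            public int Outstanding;
        }

        private readonly Dictionary<TKey, Entry> _entries = new Dictionary<TKey, Entry>();
        private readonly object _sync = new object();

        // Worker currently pinned to the key, or null if any worker may take it.
        public TWorker Appropriate(TKey key) {
            lock (_sync) {
                Entry entry;
                return _entries.TryGetValue(key, out entry) ? entry.Worker : null;
            }
        }

        // Assumed to be called for every message routed to a worker, parked or not.
        public void Assigned(TKey key, TWorker worker) {
            lock (_sync) {
                Entry entry;
                if (!_entries.TryGetValue(key, out entry)) {
                    entry = new Entry { Worker = worker };
                    _entries[key] = entry;
                }
                entry.Outstanding++;
            }
        }

        // Assumed to be called once the worker has finished handling a message for the key.
        public void Completed(TKey key) {
            lock (_sync) {
                Entry entry;
                if (_entries.TryGetValue(key, out entry) && --entry.Outstanding == 0) {
                    _entries.Remove(key);
                }
            }
        }
    }

    public static class CountingBalancerDemo {
        public static void Main() {
            var balancer = new CountingBalancer<int, string>();
            balancer.Assigned(1, "A");
            balancer.Assigned(1, "A");
            balancer.Completed(1);
            Console.WriteLine(balancer.Appropriate(1) ?? "(free)");  // prints "A"
            balancer.Completed(1);
            Console.WriteLine(balancer.Appropriate(1) ?? "(free)");  // prints "(free)"
            balancer.Assigned(1, "B");
            Console.WriteLine(balancer.Appropriate(1) ?? "(free)");  // prints "B"
        }
    }
}
```

Even this only helps if the worker’s state for the order is persisted somewhere the next worker can find it, which is the “persistent state” caveat above.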

Published by

Julian Birch

Full time dad, does a bit of coding on the side.
