For those of you bored with this problem, I promise this is the last time I post about this. That’s because I’ve finally come up with a solution I actually like. Here, the model is that the message order still matters, but the state is in a database, rather than in memory. This allows us to change our minds about which thread runs which orders. I got the idea from reading about sharding and resharding strategies. One of the approaches to resharding is to assign everything a “virtual shard” where there are a lot of virtual shards and only a few physical shards. Then, when you need to expand, you can move virtual shards between physical shards.
Here, the “virtual shard” is the identification of the order. The “physical shard” is the Retlang fiber. Only one virtual shard is assigned to a physical shard at any given time, so the metaphor breaks down a bit. Nonetheless, the basic idea is that when a worker is ready, it gets assigned all of the messages for a shard. Then that shard is locked, so that messages just back up until the worker is finished. After that point, any new messages can be assigned to a worker, and the worker is available for work again.
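To make the assignment rules above concrete, here is a minimal single-threaded sketch. It is not the code from the gist: the class name `ShardBalancerSketch` and its members are hypothetical, and it leaves out the fibers and all of the locking that the real balancer needs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the assignment rules only; no threads, no Retlang.
public sealed class ShardBalancerSketch<TShard, TValue>
{
    private readonly Dictionary<TShard, Queue<TValue>> _pending =
        new Dictionary<TShard, Queue<TValue>>();
    private readonly HashSet<TShard> _locked = new HashSet<TShard>();

    // Messages always queue under their shard, whether or not it is locked.
    public void Enqueue(TShard shard, TValue value)
    {
        Queue<TValue> queue;
        if (!_pending.TryGetValue(shard, out queue))
        {
            queue = new Queue<TValue>();
            _pending[shard] = queue;
        }
        queue.Enqueue(value);
    }

    // Called when a worker is ready: hands over every queued message for
    // one unlocked shard, in arrival order, and locks that shard.
    public bool TryAssign(out TShard shard, out List<TValue> batch)
    {
        foreach (var pair in _pending)
        {
            if (pair.Value.Count == 0 || _locked.Contains(pair.Key)) continue;
            shard = pair.Key;
            batch = new List<TValue>(pair.Value);
            pair.Value.Clear();
            _locked.Add(shard);
            return true;
        }
        shard = default(TShard);
        batch = null;
        return false;
    }

    // Called when the worker finishes: the shard may be assigned again.
    public void Release(TShard shard)
    {
        _locked.Remove(shard);
    }
}
```

In this sketch, anything enqueued for a shard between `TryAssign` and `Release` simply waits in the queue, which is the “messages just back up” behaviour described above.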
This is actually pretty similar to the keyed batch concept in Retlang, which led me to support subscriptions using Action<K, IEnumerable<T>> as well as Action<T>. The former could, for instance, choose to only commit its changes at the end of a batch. To support this use case properly, I needed to introduce a maximum batch size as well. The batch size does matter: the larger the batch, the more efficient the code is. If you don’t need batching at all, you can always set it to int.MaxValue.
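As a rough illustration of the two subscription shapes and the batch limit, the sketch below adapts an `Action<T>` into an `Action<K, IEnumerable<T>>` and dequeues with a cap. The names `BatchSketch`, `ToBatchAction` and `DequeueBatch` are made up for this example and are not part of Retlang or the gist.

```csharp
using System;
using System.Collections.Generic;

public static class BatchSketch
{
    // Wraps a per-message handler so it can be used wherever a keyed
    // batch handler is expected. The key is ignored.
    public static Action<K, IEnumerable<T>> ToBatchAction<K, T>(Action<T> perMessage)
    {
        return (key, batch) =>
        {
            foreach (var message in batch) perMessage(message);
        };
    }

    // Removes at most maxBatchSize messages from the front of the queue,
    // preserving their order. Pass int.MaxValue for "no limit".
    public static List<T> DequeueBatch<T>(Queue<T> queue, int maxBatchSize)
    {
        var batch = new List<T>();
        while (queue.Count > 0 && batch.Count < maxBatchSize)
        {
            batch.Add(queue.Dequeue());
        }
        return batch;
    }
}
```

A batch handler that commits once at the end would take the `Action<K, IEnumerable<T>>` form directly; the adapter is only there for handlers that don’t care about batch boundaries.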
If you take a look at the test code, you’ll see it calculates a rough “efficiency” number. That’s the time it took compared to the fastest possible. In my tests, I’m seeing 98% efficiency. There are a couple of commented-out lines that test QueueChannel in the same manner. That shows a figure of 99%, so the order preservation and batching do impose some overhead.
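For reference, one plausible way to compute such a number is sketched below. This is an assumption rather than the formula from the test code: it treats the fastest possible time as the total simulated work divided evenly across the workers, and `EfficiencySketch` is a hypothetical name.

```csharp
using System;

public static class EfficiencySketch
{
    // Assumed definition: ideal elapsed time divided by actual elapsed time,
    // where the ideal is the total work spread perfectly across all workers.
    public static double Efficiency(TimeSpan totalWork, int workerCount, TimeSpan elapsed)
    {
        double fastestPossible = totalWork.TotalMilliseconds / workerCount;
        return fastestPossible / elapsed.TotalMilliseconds;
    }
}
```

Under that definition, 9.8 seconds of work spread over four workers and finished in 2.5 seconds comes out at 0.98.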
Going back to the model of the system, we assumed that there was no in-memory state. In practice, that doesn’t have to be the case. The shard itself can be mutable. In fact, if you’re tracking renamed objects (e.g. cancel/replaces in FIX) you’re going to be doing this the whole time. You can modify the shard during processing, but if you modify state during both processing and assignment you’re going to need a lock. This is much more lock-free parallelism than I’d hoped to achieve with this approach, and pretty much renders the previous approach irrelevant.
A Walk Around The Code
I’ve decided to try publishing the code as a gist this time. It’s certainly less effort for me to do than the usual HTML code, but it’s not as flexible: the syntax colouring isn’t up to much and there’s no control over the presentation of the files. So, the implementation can be found in the following files:
Arguably, IConsumer doesn’t need to exist. There’s extremely tight coupling between the balancer and the consumer so the only purpose it serves is to highlight how the balancer calls to the consumer. I wrote this version from scratch, which explains the change in coding style, but the basic pattern is still Retlang’s hungry consumer implementation.
The test harness consists of:
I think I’ve finally come up with a solution to the problem that I’m happy with. This solution balances very well whilst still observing the necessary message ordering. If you watch it in action, it behaves much like processor assignment at the OS level. I’m particularly pleased that it can be used to batch messages, which could enable some problems to outperform a QueueChannel implementation under the right circumstances.
- The balancer has two locks. It’s extremely important that the implementation of Wakeup does no work itself. Otherwise the solution would deadlock.
- If KeyValuePair<T, V> was immutable, the batch dequeue could return KeyValuePair<TShard, IEnumerable<TValue>>. This highlights a problem that’s going to be with us for a long time: the APIs were never built with the new type system in mind.
- The DataToActions code is a bit on the unreadable side, but the alternative is two different implementations of ShardConsumer, which didn’t appeal.
- The mechanism to empty the queue is vastly more elegant than the FlushAndWait implementation in the previous version. EmptyEvent isn’t threadsafe in .NET 4.
- The code needs a trivial change to compile with Retlang trunk.
You could experiment with switching strategies if the number of queued shards was less than the number of available threads. Instead of choosing randomly, you could pick the longest queue and throw the entire thing onto a thread. This is particularly attractive if you’re working with a terminating process as this would speed up the process of closing down at the end of the day. Something that this code does badly, as does Retlang in general, is report exactly what’s going on internally. I’m going to see if I can come up with a solution for that. Don’t hold your breath, though 🙂
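The strategy switch could look something like the sketch below. It is only an illustration of the idea: `ShardPickerSketch` and its parameters are hypothetical, and it assumes the caller already knows the length of each queued shard.

```csharp
using System;
using System.Collections.Generic;

public static class ShardPickerSketch
{
    // queued: each waiting shard paired with its current queue length.
    // With fewer queued shards than available threads, take the longest
    // queue; otherwise fall back to a random choice.
    public static TShard Pick<TShard>(
        IList<KeyValuePair<TShard, int>> queued, int availableThreads, Random random)
    {
        if (queued.Count == 0) throw new InvalidOperationException("No queued shards.");
        if (queued.Count < availableThreads)
        {
            var longest = queued[0];
            foreach (var candidate in queued)
            {
                if (candidate.Value > longest.Value) longest = candidate;
            }
            return longest.Key;
        }
        return queued[random.Next(queued.Count)].Key;
    }
}
```

Whether the longest-queue rule actually helps at shutdown would need measuring; the sketch only shows where the decision would sit.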