Understanding MapReduce #1: The Assumptions

I finally had a light bulb go off in my head about MapReduce.  To be honest, part of the problem with understanding it is that the implementation most of us look at (Hadoop) keeps a considerable amount of implementation detail visible at all times.  I’m going to try to explain some of the fundamentals behind it in terms of C#, which has two great advantages:

  • I know it
  • LINQ gives us a fairly decent syntax for expressing algorithms declaratively.

Now, as everyone knows, Map and Reduce are Lisp terms for projection and aggregation respectively.  In LINQ, these are called Select (or SelectMany) and Aggregate (or just “apply function”).  MapReduce simply applies one and then the other.

        public static TResult MapReduce1
            <TResult, TMapped, TInput>
            (
            Func<TInput, TMapped> map,
            Func<IEnumerable<TMapped>, TResult> reduce,
            IEnumerable<TInput> inputs) {

            return reduce(
                from input in inputs
                select map(input)
                );
        }
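
To make the shape concrete, here is a minimal sketch of calling it.  The class name MapReduce1Example and the TotalLength helper are hypothetical names made up for this illustration; MapReduce1 is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduce1Example
{
    // Same shape as MapReduce1 above: project every input, then aggregate once.
    public static TResult MapReduce1<TResult, TMapped, TInput>(
        Func<TInput, TMapped> map,
        Func<IEnumerable<TMapped>, TResult> reduce,
        IEnumerable<TInput> inputs)
    {
        return reduce(from input in inputs select map(input));
    }

    // Hypothetical usage: total number of characters across a list of words.
    public static int TotalLength(IEnumerable<string> words)
    {
        return MapReduce1<int, int, string>(
            word => word.Length,        // map: string -> int
            lengths => lengths.Sum(),   // reduce: IEnumerable<int> -> int
            words);
    }
}
```

Calling MapReduce1Example.TotalLength(new[] { "map", "reduce" }) maps the words to the lengths 3 and 6 and reduces them to 9.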

That’s it!  So why is it so clever?  Well, what Google did was to change the assumptions a bit.  The irony is that by adding in more conditions, they actually came up with something more general, not less.  So, let’s take a look at some of those assumptions:

  • The map always returns a list. 
  • The reduce function operates on the same input type as output type.
  • The reduce function is idempotent.  In plain English, if you reduce the output of a reduce, your output will be equal to your input.

The first one’s a gimme.  Returning a list doesn’t make a blind bit of difference.  You could just return one item for every input and you’d be back to the original function.  However, the restriction on the reduce is hugely powerful.  In particular, it allows for the distribution of partial reduces.  I’m not going to show that in code today.

Version 2 of the code looks pretty similar:

        public static IEnumerable<TResult> MapReduce2
            <TResult, TInput>
            (
            Func<TInput, IEnumerable<TResult>> map,
            Func<IEnumerable<TResult>, IEnumerable<TResult>> reduce,
            IEnumerable<TInput> inputs) {

            return reduce(
                from input in inputs
                from mapped in map(input)
                select mapped
                );
        }

We’ve got an extra from to deal with, but otherwise this is pretty tame.  Note that we’ve made the reduce return a list as well.  Again, it doesn’t make much of a difference.  We’ll abstract away the concept of applying a map.

        public static IEnumerable<TResult> MapReduce2b
            <TResult, TInput>
            (
            Func<TInput, IEnumerable<TResult>> map,
            Func<IEnumerable<TResult>, IEnumerable<TResult>> reduce,
            IEnumerable<TInput> inputs) {

            Func<IEnumerable<TInput>, IEnumerable<TResult>> applyMap =
                mapInputs => mapInputs.SelectMany(map);
            return reduce(applyMap(inputs));
        }
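
As a sketch of what satisfies these assumptions, Distinct makes a reasonable reduce: it has the same input and output type, and applying it twice gives the same result as applying it once.  The names MapReduce2Example and DistinctWords are hypothetical, and MapReduce2b is repeated (in condensed form) only to keep the snippet self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduce2Example
{
    // Condensed copy of MapReduce2b: flatten the mapped lists, then reduce.
    public static IEnumerable<TResult> MapReduce2b<TResult, TInput>(
        Func<TInput, IEnumerable<TResult>> map,
        Func<IEnumerable<TResult>, IEnumerable<TResult>> reduce,
        IEnumerable<TInput> inputs)
    {
        return reduce(inputs.SelectMany(map));
    }

    // Hypothetical usage: the set of words that appear in some lines of text.
    public static IEnumerable<string> DistinctWords(IEnumerable<string> lines)
    {
        return MapReduce2b<string, string>(
            line => line.Split(' '),    // map: one line -> many words
            words => words.Distinct(),  // reduce: idempotent, same type in and out
            lines);
    }
}
```

Running Distinct over the output of DistinctWords changes nothing, which is the idempotence assumption in action.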

Now things get interesting.  MapReduce assumes that you’re using key/value tuples everywhere.  This is the most important step.  The point is that it groups the mapped values on the basis of their keys.  We can also use different keys for mapped data and the results of reduces, although the type system restricts how useful that could be.  Version 3 does look somewhat more complex.

        public class Tuple<TKey, TValue> 
        {
            public TKey Key;
            public TValue Value;
        }

        public static IEnumerable<Tuple<TKey, TValue>> MapReduce3
            <TKey, TValue, TInput>
            (
            Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
            Func<TKey, IEnumerable<TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
            IEnumerable<TInput> inputs) {
            Func<IEnumerable<Tuple<TKey, TValue>>, IEnumerable<Tuple<TKey, TValue>>> applyReduce =
                results => from result in results
                           group result.Value by result.Key into grouped
                           from reduced in reduce(grouped.Key, grouped)
                           select reduced;
            Func<IEnumerable<TInput>, IEnumerable<Tuple<TKey, TValue>>> applyMap =
                mapInputs => mapInputs.SelectMany(map);
            return applyReduce(applyMap(inputs));
        }
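
The usual demonstration of this shape is a word count, so here is a rough sketch of one.  WordCountExample and CountWords are hypothetical names, and MapReduce3 is rewritten in method syntax purely so the snippet stands alone; it is intended to behave the same as the version above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WordCountExample
{
    // Same minimal key/value pair as in the post.
    public class Tuple<TKey, TValue>
    {
        public TKey Key;
        public TValue Value;
    }

    // Condensed copy of MapReduce3: map, group values by key, reduce each group.
    public static IEnumerable<Tuple<TKey, TValue>> MapReduce3<TKey, TValue, TInput>(
        Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
        Func<TKey, IEnumerable<TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
        IEnumerable<TInput> inputs)
    {
        return inputs
            .SelectMany(map)
            .GroupBy(pair => pair.Key, pair => pair.Value)
            .SelectMany(grouped => reduce(grouped.Key, grouped));
    }

    // Hypothetical usage: count how often each word occurs.
    public static Dictionary<string, int> CountWords(IEnumerable<string> lines)
    {
        return MapReduce3<string, int, string>(
                // map: emit (word, 1) for every word on the line
                line => line
                    .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .Select(word => new Tuple<string, int> { Key = word, Value = 1 }),
                // reduce: sum the ones collected for a single word
                (word, ones) => new[] { new Tuple<string, int> { Key = word, Value = ones.Sum() } },
                lines)
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }
}
```

For the lines "a b a" and "b c", the map emits five (word, 1) tuples, the grouping collects them under three keys, and the reduce yields a = 2, b = 2, c = 1.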

The important bit is the way we’ve redefined the reduce operation.  Now the reduce operation operates on a list of values for a particular key (it can still return whatever it likes).  The applyReduce function demonstrates how this concept of reduce maps onto the old concept of reduce.

The LINQ syntax obscures one thing we’ve overlooked so far: how the grouping actually works.  The Hadoop implementation makes this far from explicit as well.  Hadoop does it by requiring all keys to implement “WritableComparable”.  The direct translation would be to require TKey to implement IComparable.  However, we’ll go with a more .NET-like way of doing things using IEqualityComparer<TKey>.  Here’s version 4, which is version 3 with an IEqualityComparer.

        public static IEnumerable<Tuple<TKey, TValue>> MapReduce4
            <TKey, TValue, TInput>
            (
            Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
            Func<TKey, IEnumerable<TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
            IEqualityComparer<TKey> groupRule,
            IEnumerable<TInput> inputs) {
            Func<IEnumerable<Tuple<TKey, TValue>>, IEnumerable<Tuple<TKey, TValue>>> applyReduce =
                results => results
                            .GroupBy(result => result.Key, result => result.Value, groupRule)
                            .SelectMany(grouped => reduce(grouped.Key, grouped));
            Func<IEnumerable<TInput>, IEnumerable<Tuple<TKey, TValue>>> applyMap =
                mapInputs => mapInputs.SelectMany(map);
            return applyReduce(applyMap(inputs));
        }
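
To see what the comparer buys us, here is a sketch of a word count that treats “The” and “the” as the same key by passing StringComparer.OrdinalIgnoreCase as the groupRule.  MapReduce4Example and CountWordsIgnoringCase are hypothetical names; MapReduce4 is repeated in condensed form so the snippet compiles by itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduce4Example
{
    public class Tuple<TKey, TValue>
    {
        public TKey Key;
        public TValue Value;
    }

    // Condensed copy of MapReduce4: the comparer decides which keys are "the same".
    public static IEnumerable<Tuple<TKey, TValue>> MapReduce4<TKey, TValue, TInput>(
        Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
        Func<TKey, IEnumerable<TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
        IEqualityComparer<TKey> groupRule,
        IEnumerable<TInput> inputs)
    {
        return inputs
            .SelectMany(map)
            .GroupBy(pair => pair.Key, pair => pair.Value, groupRule)
            .SelectMany(grouped => reduce(grouped.Key, grouped));
    }

    // Hypothetical usage: a word count that ignores case.
    public static Dictionary<string, int> CountWordsIgnoringCase(IEnumerable<string> lines)
    {
        return MapReduce4<string, int, string>(
                line => line
                    .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .Select(word => new Tuple<string, int> { Key = word, Value = 1 }),
                (word, ones) => new[] { new Tuple<string, int> { Key = word, Value = ones.Sum() } },
                StringComparer.OrdinalIgnoreCase,   // groupRule: "The" and "the" share a group
                lines)
            // Use the same comparer for the result so lookups ignore case too.
            .ToDictionary(pair => pair.Key, pair => pair.Value, StringComparer.OrdinalIgnoreCase);
    }
}
```

Swapping in StringComparer.Ordinal would split those groups apart again without touching the map or the reduce, which is the point of making the grouping rule a parameter.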

Now, I’ve tried to avoid talking about distribution concerns in this post, but here we’re forced into it.  The results of maps will potentially be transmitted across the network.  Therefore, it makes sense for the grouping to actually occur during the map.  Again, you might not see this in the Hadoop examples as the grouping is actually performed by the OutputCollector.  While we’re here, we’ll observe that the Hadoop standard of taking two inputs to the reduce function doesn’t make much sense in an environment in which IGrouping is a standard concept.  Thus, we can move the grouping call to the map as follows:

        public static IEnumerable<Tuple<TKey, TValue>> MapReduce5
            <TKey, TValue, TInput>
            (
            Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
            Func<IGrouping<TKey, TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
            IEqualityComparer<TKey> groupRule,
            IEnumerable<TInput> inputs) {
            Func<IEnumerable<IGrouping<TKey, TValue>>, IEnumerable<Tuple<TKey, TValue>>> applyReduce =
                results => results.SelectMany(reduce);
            Func<IEnumerable<TInput>, IEnumerable<IGrouping<TKey, TValue>>> applyMap =
                mapInputs => mapInputs
                    .SelectMany(map)
                    .GroupBy(result => result.Key, result => result.Value, groupRule);
            return applyReduce(applyMap(inputs));
        }

The problem with writing it out like this is that the Func definitions get to be most of the code.  Let’s see it again, simplified:

        public static IEnumerable<Tuple<TKey, TValue>> MapReduce6
            <TKey, TValue, TInput>
            (
                Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
                Func<IGrouping<TKey, TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
                IEqualityComparer<TKey> groupRule,
                IEnumerable<TInput> inputs) {
            Func<IEnumerable<Tuple<TKey, TValue>>, IEnumerable<IGrouping<TKey, TValue>>> collectOutput =
                mapped => mapped.GroupBy(result => result.Key, result => result.Value, groupRule);
            return collectOutput(inputs.SelectMany(map)).SelectMany(reduce);
        }
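
Here is a sketch of what a caller looks like once the reduce takes an IGrouping directly.  MapReduce6Example and SumByParity are hypothetical names invented for the illustration, and MapReduce6 is inlined a little further than above so the snippet is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduce6Example
{
    public class Tuple<TKey, TValue>
    {
        public TKey Key;
        public TValue Value;
    }

    // Condensed copy of MapReduce6: group straight after the map, reduce each group.
    public static IEnumerable<Tuple<TKey, TValue>> MapReduce6<TKey, TValue, TInput>(
        Func<TInput, IEnumerable<Tuple<TKey, TValue>>> map,
        Func<IGrouping<TKey, TValue>, IEnumerable<Tuple<TKey, TValue>>> reduce,
        IEqualityComparer<TKey> groupRule,
        IEnumerable<TInput> inputs)
    {
        return inputs
            .SelectMany(map)
            .GroupBy(pair => pair.Key, pair => pair.Value, groupRule)
            .SelectMany(reduce);
    }

    // Hypothetical usage: sum the odd and the even numbers separately.
    public static Dictionary<string, int> SumByParity(IEnumerable<int> numbers)
    {
        return MapReduce6<string, int, int>(
                // map: tag each number with "even" or "odd"
                n => new[] { new Tuple<string, int> { Key = n % 2 == 0 ? "even" : "odd", Value = n } },
                // reduce: the grouping carries both the key and its values
                grouped => new[] { new Tuple<string, int> { Key = grouped.Key, Value = grouped.Sum() } },
                EqualityComparer<string>.Default,
                numbers)
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }
}
```

For the inputs 1, 2, 3, 4 this gives odd = 4 and even = 6.  The reduce no longer needs a separate key argument, because the IGrouping supplies it.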

Now, Hadoop goes one stage further by insisting that the inputs also be tuples.  It then has a file handling system for generating those tuples from files.  Let us just, for the moment, observe that actually generating the list of inputs may be an expensive operation in itself.  So, we need to be able to deal with batches of inputs.  We’ll leave that problem until next time.

Published by

Julian Birch

Full time dad, does a bit of coding on the side.
