Thinking MapReduce with Hadoop

Maarten Winkels

Apache Hadoop promises "a software platform that lets one easily write and run applications that process vast amounts of data". Sure enough, when reading the documentation, descriptions like:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

Are simple enough to read and understand, but how do you apply MapReduce to a problem you face in a real-life project?

This blog tries to give some insight into how to apply MapReduce with Hadoop.

What is Hadoop?

Hadoop is basically two things:

  1. A distributed file system -- HDFS
  2. A MapReduce framework that allows algorithms to work on data in the distributed file system in parallel

The Hadoop Distributed File System (HDFS) is really the heart of Hadoop. It provides scalability, reliability and performance at a low cost. The system is designed to run on commodity hardware. Although the system is written in Java, there are other ways to access and use it.

MapReduce is a software framework that allows computation to run on a cluster. It uses HDFS for data proximity: the computation is distributed and run in parallel on the cluster, and each process accesses and processes data that is available locally on its node. This gives a major performance boost. Furthermore, the framework provides reliability, because processes that fail will automatically be restarted on other nodes.

When to apply MapReduce?

MapReduce is only useful for systems that process large amounts of data. There is an overhead for starting tasks, and there is always the network overhead. When talking about large amounts of data, we mean GBs and above rather than MBs.

Another important aspect is the data usage pattern in your application. If you need to 'randomly' read and write data, for example based on a certain request coming in, MapReduce cannot help you. MapReduce really shines when data is read in a batch-like streaming manner.

The final requirement for using MapReduce is that the algorithm can be described as a map-and-reduce process. In this blog I want to focus on this last aspect.

How to apply MapReduce? - Another example

So how do you describe an algorithm in a MapReduce manner? To illustrate, nothing works better than an example. As with every example that I have seen for Hadoop, it is a bit academic. What I'm trying to explain is how a MapReduce algorithm is different from a normal approach and how to go about designing that algorithm.

The main thing with a MapReduce algorithm is that it reasons about <key, value> pairs all along, from the input format to the output format, if necessary using synthetic keys. If your input is a simple flat file, Hadoop will by default split it at line ends and provide the byte offset into the file as the key and the line as the value. The main strength of the approach lies in the fact that, between the map and the reduce phase, the framework sorts the data by key. It then delivers all data with the same key to the same reducer instance. Any successful MapReduce algorithm should leverage this mechanism.
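To make the sort-and-group step more tangible, here is a minimal sketch in plain Java that imitates it inside a single process. It is not Hadoop code: the class name ShuffleSketch, both helper methods, and the choice of the line length as map output key are made up for illustration, and the offsets assume one byte per character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Local, single-process imitation of the sort-and-group step; not Hadoop code. */
class ShuffleSketch {

  /** Splits a flat text into <offset, line> pairs, mimicking the default input handling. */
  static Map<Long, String> toOffsetLinePairs(String text) {
    Map<Long, String> pairs = new TreeMap<>();
    long offset = 0;
    for (String line : text.split("\n")) {
      pairs.put(offset, line);
      offset += line.length() + 1; // +1 for the newline; assumes one byte per character
    }
    return pairs;
  }

  /** Sorts the map output by key and collects all values that share a key. */
  static Map<String, List<String>> groupByKey(List<String[]> mapOutput) {
    Map<String, List<String>> grouped = new TreeMap<>();
    for (String[] pair : mapOutput) {
      grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
    }
    return grouped;
  }

  public static void main(String[] args) {
    // Hypothetical "map" step: emit <line length, line> for every input line.
    List<String[]> mapOutput = new ArrayList<>();
    for (Map.Entry<Long, String> entry : toOffsetLinePairs("bb\na\ncc\nd").entrySet()) {
      String line = entry.getValue();
      mapOutput.add(new String[] {String.valueOf(line.length()), line});
    }
    // Each printed entry corresponds to one reduce call: a key plus all of its values.
    groupByKey(mapOutput).forEach((key, values) -> System.out.println(key + " -> " + values));
    // prints:
    // 1 -> [a, d]
    // 2 -> [bb, cc]
  }
}
```

In a real job the grouped values for one key may come from many nodes, but the reducer sees them the same way: one key, followed by every value that was emitted under it.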

The problem - Finding Anagrams
Say you have to find anagrams in a very large input file. How would you implement this?

I think a first attempt would have some sort of function like this:

  public static boolean isAnagram(String first, String second) {
    // Checks whether the two inputs are anagrams by verifying that they contain exactly the same characters.
    // Left as an exercise for the reader...
    throw new UnsupportedOperationException("left as an exercise");
  }

The application would have to somehow execute this function on all pairs of words in the input. However fast this method would be, the overall execution would still take quite some time.
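To get a feeling for "quite some time", the following sketch spells out the all-pairs loop and the number of comparisons it needs. The class AllPairsSketch, its method names, and the letter-counting check used in main are assumptions made for this illustration; any correct isAnagram could be plugged in instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

/** Brute-force sketch: applies a pairwise check to every pair of words. */
class AllPairsSketch {

  /** Number of comparisons needed to check every pair among n words. */
  static long pairCount(long n) {
    return n * (n - 1) / 2;
  }

  /** Applies the given check to every pair of words and counts the matching pairs. */
  static int countMatches(List<String> words, BiPredicate<String, String> isAnagram) {
    int matches = 0;
    for (int i = 0; i < words.size(); i++) {
      for (int j = i + 1; j < words.size(); j++) {
        if (isAnagram.test(words.get(i), words.get(j))) {
          matches++;
        }
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    // Stand-in check for illustration only: both words use each letter equally often.
    BiPredicate<String, String> sameLetters = (a, b) -> {
      if (a.length() != b.length()) {
        return false;
      }
      int[] counts = new int[Character.MAX_VALUE + 1];
      for (char c : a.toCharArray()) {
        counts[c]++;
      }
      for (char c : b.toCharArray()) {
        if (--counts[c] < 0) {
          return false;
        }
      }
      return true;
    };
    System.out.println(countMatches(Arrays.asList("aspired", "despair", "hadoop"), sameLetters)); // 1
    System.out.println(pairCount(1_000_000L)); // 499999500000
  }
}
```

The number of comparisons grows quadratically with the number of words, so a faster isAnagram only shaves a constant factor off a loop that is itself the problem.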

Hadoopifying...
How do you now design a MapReduce algorithm that will give the desired answer? The key lies in finding a function that will produce the same key for all words that are anagrams. Applying this in the map phase will use the power of the MapReduce framework to deliver all words that are anagrams to the same reducer. The solution, when found, is deceptively simple as usual:

  public static String sortCharacters(String input) {
    char[] cs = input.toCharArray();
    Arrays.sort(cs);
    return new String(cs);
  }

By sorting all the characters in all the words in the input, all anagrams will have the same key:

  aspired -> adeiprs
  despair -> adeiprs

Now the list of characters to the right has no meaning, but all anagrams will have exactly the same result for this function.
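Before involving Hadoop at all, the effect of this key can be tried out locally. The sketch below groups a handful of words by their sorted characters; the class AnagramKeySketch, the groupByKey helper, and the extra sample words "praised" and "hadoop" are assumptions added for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Groups words by their sorted characters, in memory and without Hadoop. */
class AnagramKeySketch {

  /** Same idea as sortCharacters above: anagrams end up with an identical key. */
  static String sortCharacters(String input) {
    char[] cs = input.toCharArray();
    Arrays.sort(cs);
    return new String(cs);
  }

  /** Collects the words under their key; sorted collections keep the output stable. */
  static Map<String, Set<String>> groupByKey(List<String> words) {
    Map<String, Set<String>> groups = new TreeMap<>();
    for (String word : words) {
      groups.computeIfAbsent(sortCharacters(word), k -> new TreeSet<>()).add(word);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("aspired", "hadoop", "despair", "praised");
    groupByKey(words).forEach((key, group) -> {
      if (group.size() > 1) { // a key with a single word has no anagram partner
        System.out.println(key + " -> " + group);
      }
    });
    // prints: adeiprs -> [aspired, despair, praised]
  }
}
```

Every word is touched once to compute its key, and the grouping does the rest. That grouping is exactly the part the MapReduce framework provides between the map and the reduce phase.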

Implementation
Once the algorithm is found the implementation using Hadoop is quite straightforward and simple (though pretty long...).

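import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AnagramFinder extends Configured implements Tool {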

  public static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {

    private Text sortedText = new Text();
    private Text outputValue = new Text();

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString(),
          " \t\n\r\f,.:()!?", false);
      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken().trim().toLowerCase();
        sortedText.set(sort(token));
        outputValue.set(token);
        context.write(sortedText, outputValue);
      }
    }

    protected String sort(String input) {
      char[] cs = input.toCharArray();
      Arrays.sort(cs);
      return new String(cs);
    }

  }

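  public static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      // Hadoop reuses the same Text instance for every value in the iteration,
      // so remember the words as Strings rather than storing the Text itself.
      Set<String> uniques = new HashSet<String>();
      for (Text value : values) {
        if (uniques.add(value.toString())) {
          context.write(key, value);
        }
      }
    }
  }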

  public static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, IntWritable, Text> {

    private IntWritable count = new IntWritable();
    private Text outputValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      // Track Strings, not Text objects: Hadoop reuses the Text instance between iterations.
      Set<String> uniques = new HashSet<String>();
      StringBuilder builder = new StringBuilder();
      for (Text value : values) {
        String word = value.toString();
        if (uniques.add(word)) {
          if (builder.length() > 0) {
            builder.append(',');
          }
          builder.append(word);
        }
      }

      // A key with a single distinct word has no anagrams, so only larger groups are written.
      if (uniques.size() > 1) {
        count.set(uniques.size());
        outputValue.set(builder.toString());
        context.write(count, outputValue);
      }
    }

  }

  public int run(String[] args) throws Exception {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Job job = new Job(getConf(), "Anagram Finder");

    job.setJarByClass(AnagramFinder.class);

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.setMapperClass(Mapper.class);
    job.setCombinerClass(Combiner.class);
    job.setReducerClass(Reducer.class);

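    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    // The Reducer emits <IntWritable, Text>, so declare the final output types as well.
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
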
    return job.waitForCompletion(false) ? 0 : -1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new AnagramFinder(), args));
  }
}

The main parts of the implementation are the following:

Mapper - Breaks up the input text into tokens (filtering some common punctuation marks) and applies the character sorting to arrive at the required key.
Combiner (optional) - Removes duplicate values from the input.
Reducer - Collects anagrams and outputs the number of anagrams (key) and all the words concatenated (value).
Main and Run - This code configures the job to run on the MapReduce framework.
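The Mapper and Reducer logic can also be tried without a cluster. The sketch below is a Hadoop-free imitation of those two steps: the class LocalAnagramSteps and its method signatures are made up for this illustration, the delimiter string is copied from the Mapper, and the TAB between count and words assumes the default text output of key and value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

/** Hadoop-free imitation of the map and reduce steps, for experimenting locally. */
class LocalAnagramSteps {

  /** Map step: one {sorted letters, word} pair per token, using the Mapper's delimiters. */
  static List<String[]> map(String line) {
    List<String[]> pairs = new ArrayList<>();
    StringTokenizer tokenizer = new StringTokenizer(line, " \t\n\r\f,.:()!?", false);
    while (tokenizer.hasMoreTokens()) {
      String token = tokenizer.nextToken().trim().toLowerCase();
      char[] cs = token.toCharArray();
      Arrays.sort(cs);
      pairs.add(new String[] {new String(cs), token});
    }
    return pairs;
  }

  /** Reduce step: "count<TAB>word,word,..." for two or more distinct words, otherwise null. */
  static String reduce(Iterable<String> values) {
    Set<String> uniques = new LinkedHashSet<>();
    for (String value : values) {
      uniques.add(value);
    }
    return uniques.size() > 1 ? uniques.size() + "\t" + String.join(",", uniques) : null;
  }

  public static void main(String[] args) {
    for (String[] pair : map("Despair, he aspired!")) {
      System.out.println(pair[0] + " <- " + pair[1]);
    }
    // prints:
    // adeiprs <- despair
    // eh <- he
    // adeiprs <- aspired
    System.out.println(reduce(Arrays.asList("despair", "aspired", "despair")));
    // prints: 2<TAB>despair,aspired
  }
}
```

Keeping the per-record logic in small methods like these is one way to unit test it separately from the job configuration.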

The Combiner is used to do some preprocessing for the Reducer. The main reason for this is that results from Mappers that run on different nodes will be processed on the same node and will thus have to travel over the network. To minimize network load, the Combiner can reduce the number of <key, value> pairs that have to be transferred, as shown here by filtering out duplicates. The process cannot, however, rely on all <key, value> pairs for a key being processed by a single Combiner, so the Reducer also has to remove duplicates.
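Why the Reducer cannot skip its own de-duplication is easiest to see with two mappers. The sketch below is a local imitation, not Hadoop code; the class CombinerSketch, its methods, and the sample words are assumptions for illustration. Note also that Hadoop treats the Combiner as an optimization that may run zero or more times, which is another reason not to depend on it for correctness.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Shows that per-mapper de-duplication still leaves duplicates for the reducer. */
class CombinerSketch {

  /** De-duplicates the values a single mapper produced for one key. */
  static List<String> combine(List<String> values) {
    return new ArrayList<>(new LinkedHashSet<>(values));
  }

  /** Concatenates the combined output of every mapper, as the reducer would receive it. */
  static List<String> reducerInput(List<List<String>> mapperOutputs) {
    List<String> merged = new ArrayList<>();
    for (List<String> output : mapperOutputs) {
      merged.addAll(combine(output));
    }
    return merged;
  }

  public static void main(String[] args) {
    // Values emitted for the key "adeiprs" by two mappers on different nodes.
    List<String> mapperA = Arrays.asList("despair", "despair", "aspired");
    List<String> mapperB = Arrays.asList("despair", "praised");

    List<String> input = reducerInput(Arrays.asList(mapperA, mapperB));
    System.out.println(input); // [despair, aspired, despair, praised]

    // "despair" arrives once per mapper, so the reducer de-duplicates again.
    Set<String> uniques = new LinkedHashSet<>(input);
    System.out.println(uniques); // [despair, aspired, praised]
  }
}
```

Five values were emitted but only four cross the network. The saving grows with the number of repeated words each mapper sees.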

It is interesting to see that the concept of an anagram doesn't materialize anywhere in this code. The fact that the code finds anagrams follows from the fact that all anagrams yield the same value from the sort function, and that value is used as the map output key. This might be quite confusing for readers.

Conclusion

The main challenge posed by Hadoop is coming up with a good algorithm for MapReduce applications. The algorithm will mostly be the result of the whole MapReduce process and might not be easy to understand from the code. This is because some of the functionality that the framework provides might be key to the algorithm. Good documentation that describes the whole process is vital to overcome this problem. Once an algorithm is designed, implementing it in Hadoop is quite straightforward.

Comments (3)

  1. Jonas Bandi

    July 2, 2009 at 11:27 am

    Thanks a lot for this nice introduction.
    I never had time or need to find my way into this topic. This was a nice kick-start.

  2. Lars Vonk

    July 3, 2009 at 9:07 am

    Good read, thanks Maarten.

  3. Tony

    June 16, 2011 at 11:23 am

    This is a good article and very helpful. Beside, I love a phrase in your article such as "Are simple enough to read and understand, but how do you apply MapReduce to a problem you face in a real-life project?"
    I agree with you that. This is also a problem I am meeting.

    Thank you very much for your article, Maarten.
