About Mrs

Mrs (pronounced “missus”) is an implementation of MapReduce. Mrs arose out of the need for a MapReduce framework that is lightweight, easy to deploy, and efficient for computationally intensive programs. Most MapReduce frameworks are geared toward processing large amounts of data on dedicated clusters. Mrs, by contrast, is easy to deploy on non-dedicated hardware such as supercomputers or private clusters. Mrs is also well suited to scientific computation: in particular, it has many novel features that boost performance for iterative MapReduce programs. Finally, Mrs is designed to be a flexible and simple system that lets researchers quickly develop, test, and deploy new ideas without sacrificing performance.

Design Principles

Mrs is not the only open source MapReduce implementation. Hadoop, as its web page points out, is scalable and reliable. However, and without any intended disrespect to Hadoop, it is not particularly simple. While Hadoop is certainly an appropriate tool for large-scale data processing, Mrs aims to be more convenient for research and education.

Here are a few of the principles that guide the development of Mrs (which may be subconsciously inspired by the Zen of Python):

  • Keep it Simple

    One of the great things about the MapReduce model is that it simplifies parallel computation. Mrs tries to be Pythonic rather than Javariffic. The Mrs API requires as little mandatory complexity as possible.

  • Reduce Burden

    Many MapReduce frameworks are difficult to deploy and use. Mrs aims to be easy to deploy in a variety of environments. In particular, we want Mrs to be easy to use even in supercomputing or private cluster environments, as opposed to the dedicated MapReduce environments that other tools require.

  • Don’t Repeat Yourself

    The world has many fine job schedulers and filesystems, so Mrs does not try to provide its own. If Mrs were married to a particular environment, it would not be as flexible. A Mrs program is just a program, not a daemon.

WordCount

The original MapReduce paper presented WordCount as a “Hello, world” example. WordCount simply counts the number of occurrences of each word in the input. The following barebones WordCount examples demonstrate some differences between Hadoop and Mrs.

Mrs WordCount

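import mrs

class WordCount(mrs.MapReduce):
    def mapper(self, key, value):
        # Emit (word, 1) for every whitespace-separated word in the value.
        for word in value.split():
            yield (word, 1)

    def reducer(self, key, values):
        # Emit the total of the counts collected for this word.
        yield sum(values)

if __name__ == '__main__':
    # Hand the program class to Mrs, which runs the job.
    mrs.main(WordCount)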
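
To see what the mapper and reducer above compute together, the following sketch simulates the map, group, and reduce phases in plain Python. It does not use Mrs and says nothing about how Mrs schedules or distributes work. The `run_wordcount` driver, the sample lines, and the choice of the line number as the input key are assumptions made for illustration only.

```python
from collections import defaultdict


def mapper(key, value):
    # Same logic as WordCount.mapper above: emit (word, 1) per word.
    for word in value.split():
        yield (word, 1)


def reducer(key, values):
    # Same logic as WordCount.reducer above: emit the total for one word.
    yield sum(values)


def run_wordcount(lines):
    """Hypothetical single-process driver: map, group by key, then reduce."""
    grouped = defaultdict(list)
    # Map phase. Assumption: the key is the line number, the value is the text.
    for lineno, line in enumerate(lines):
        for word, count in mapper(lineno, line):
            grouped[word].append(count)
    # Reduce phase: each word's list of counts is reduced independently.
    counts = {}
    for word, values in grouped.items():
        for total in reducer(word, values):
            counts[word] = total
    return counts


if __name__ == '__main__':
    sample = ["the quick brown fox", "the lazy dog", "the end"]
    for word, total in sorted(run_wordcount(sample).items()):
        print(word, total)
```

Because each word's counts are reduced independently of every other word's, the reduce phase is the part a MapReduce framework can spread across workers; this sketch simply runs it in a loop.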

Hadoop WordCount

package org.apache.hadoop.examples;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.MapReduceBase;

public class WordCount {

  /**
   * Counts the words in each line.
   * For each line of input, break the line into words and emit them as
   * (word, 1).
   */
  public static class MapClass extends MapReduceBase implements Mapper {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(WritableComparable key, Writable value,
        OutputCollector output,
        Reporter reporter) throws IOException {
      String line = ((Text)value).toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase implements Reducer {

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output,
        Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable) values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  static void printUsage() {
    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
    System.exit(1);
  }

  /**
   * The main driver for word count map/reduce program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there are communication problems with the
   *                     job tracker.
   */
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    List other_args = new ArrayList();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        printUsage(); // exits
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
          other_args.size() + " instead of 2.");
      printUsage();
    }
    conf.setInputPath(new Path((String) other_args.get(0)));
    conf.setOutputPath(new Path((String) other_args.get(1)));

    // Uncomment to run locally in a single process
    // conf.set("mapred.job.tracker", "local");

    JobClient.runJob(conf);
  }

}