Fundamentals

Installation

Install with pip:

pip install mrjob

or from a git clone of the source code:

python setup.py test && python setup.py install

Writing your first job

Open a file called mr_word_count.py and type this into it:

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Now go back to the command line, find your favorite body of text (such as mrjob’s README.rst, or even your new file mr_word_count.py), and try this:

$ python mr_word_count.py my_file.txt

You should see something like this:

"chars" 3654
"lines" 123
"words" 417

Congratulations! You’ve just written and run your first program with mrjob.

What’s happening

A job is defined by a class that inherits from MRJob. This class contains methods that define the steps of your job.

A “step” consists of a mapper, a combiner, and a reducer. All of those are optional, though you must have at least one. So you could have a step that’s just a mapper, or just a combiner and a reducer.

When you only have one step, all you have to do is write methods called mapper(), combiner(), and reducer().

The mapper() method takes a key and a value as arguments (in this case, the key is ignored and a single line of text input is the value) and yields as many key-value pairs as it likes. The reducer() method takes a key and an iterator of values and also yields as many key-value pairs as it likes. (In this case, it sums the values for each key, which represent the numbers of characters, words, and lines in the input.)
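To make the data flow concrete, here is a minimal sketch that imitates the map, group, and reduce phases in plain Python, without mrjob. The simulate() function and the sort-then-group shuffle are illustrative assumptions, not how mrjob runs a job internally; the mapper and reducer bodies are copied from the job above.

```python
from itertools import groupby
from operator import itemgetter


def mapper(_, line):
    # Same logic as MRWordFrequencyCount.mapper, written as a plain function.
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def reducer(key, values):
    # Same logic as MRWordFrequencyCount.reducer.
    yield key, sum(values)


def simulate(lines):
    # Hypothetical driver: stands in for the framework, for illustration only.
    # Map: each input line is a value; the key is unused (None).
    mapped = [pair for line in lines for pair in mapper(None, line)]
    # Shuffle: sort by key so that equal keys are adjacent, then group them.
    mapped.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        # Reduce: pass each key and an iterator over its values to the reducer.
        values = (value for _, value in group)
        results.extend(reducer(key, values))
    return results


if __name__ == "__main__":
    for key, total in simulate(["the quick brown fox", "jumps over"]):
        print(key, total)
```

The reducer never sees a list of everything at once, only one key and an iterator over that key's values, which is why it can be written as a one-line sum().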

Warning

Forgetting the following information will result in confusion.

The final required component of a job file is these two lines at the end of the file, every time:

if __name__ == '__main__':
    MRWordCounter.run()  # where MRWordCounter is your job class

These lines pass control over the command line arguments and execution to mrjob. Without them, your job will not work. For more information, see Hadoop Streaming and mrjob and Why can’t I put the job class and run code in the same file?.

Running your job different ways

The most basic way to run your job is on the command line:

$ python my_job.py input.txt

By default, output will be written to stdout.

You can pass input via stdin, but be aware that mrjob will just dump it to a file first:

$ python my_job.py < input.txt

You can pass multiple input files, mixed with stdin (using the - character):

$ python my_job.py input1.txt input2.txt - < input3.txt
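The - convention can be sketched in a few lines of Python. The helper below, iter_input_lines(), is a hypothetical name used only for illustration and is not part of mrjob; it reads each path in the order given and substitutes stdin wherever - appears.

```python
import sys


def iter_input_lines(paths, stdin):
    # Hypothetical helper: yield lines from each path in order.
    # A path of "-" means "read from the given stdin stream at this point".
    for path in paths:
        if path == "-":
            yield from stdin
        else:
            with open(path) as f:
                yield from f


if __name__ == "__main__":
    # Echo all input lines, e.g.:
    #   python cat_inputs.py input1.txt input2.txt - < input3.txt
    for line in iter_input_lines(sys.argv[1:], sys.stdin):
        sys.stdout.write(line)
```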

By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!

You change the way the job is run with the -r/--runner option. You can use -r inline (the default), -r local, -r hadoop, -r dataproc, or -r emr.

To run your job in multiple subprocesses with a few Hadoop features simulated, use -r local.

To run it on your Hadoop cluster, use -r hadoop.

If you have Dataproc configured (see Dataproc Quickstart), you can run it there with -r dataproc.

Your input files can come from HDFS if you’re using Hadoop, or GCS if you’re using Dataproc:

$ python my_job.py -r dataproc gs://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt

If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -r emr.

On EMR, your input files can come from S3:

$ python my_job.py -r emr s3://my-inputs/input.txt

If your code spans multiple files, see Uploading your source tree.

Writing your second job

Most of the time, you’ll need more than one step in your job. To define multiple steps, override steps() to return a list of MRSteps.

Here’s a job that finds the most commonly used word in the input:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences comes first so that Python's max() compares by count.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
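The way the two steps chain together can be traced with a plain-Python sketch that does not use mrjob. Here simulate() and group_by_key() are hypothetical helpers, each inner list of input_splits stands in for the input of one mapper task, and running the combiner exactly once per split is a simplifying assumption (a real cluster may run it more or fewer times).

```python
import re

WORD_RE = re.compile(r"[\w']+")


def group_by_key(pairs):
    # Hypothetical shuffle: collect values per key, preserving first-seen order.
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return groups.items()


def mapper_get_words(_, line):
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def combiner_count_words(word, counts):
    yield word, sum(counts)


def reducer_count_words(word, counts):
    yield None, (sum(counts), word)


def reducer_find_max_word(_, word_count_pairs):
    yield max(word_count_pairs)


def simulate(input_splits):
    # Step 1, map + combine: the combiner pre-sums counts within one split only.
    combined = []
    for split in input_splits:
        mapped = [p for line in split for p in mapper_get_words(None, line)]
        for word, counts in group_by_key(mapped):
            combined.extend(combiner_count_words(word, counts))
    # Step 1, reduce: partial counts for a word from all splits meet here.
    step1 = []
    for word, counts in group_by_key(combined):
        step1.extend(reducer_count_words(word, counts))
    # Step 2 has no mapper, so step 1's output pairs are grouped as they are.
    # Every key is None, so a single reducer call sees all (count, word) pairs.
    step2 = []
    for key, pairs in group_by_key(step1):
        step2.extend(reducer_find_max_word(key, pairs))
    return step2


if __name__ == "__main__":
    print(simulate([["the cat and the hat"], ["The end"]]))
```

Because the combiner only sees part of the data, the reducer in step 1 still has to sum the partial counts; dropping the combiner would change how much data is shuffled, not the result.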

Configuration

mrjob has an overflowing cornucopia of configuration options. You’ll want to specify some on the command line, some in a config file.

You can put a config file at /etc/mrjob.conf, ~/.mrjob.conf, or ./mrjob.conf for mrjob to find it without passing it via --conf-path.

Config files are interpreted as YAML if you have the yaml module installed. Otherwise, they are interpreted as JSON.
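The YAML-or-JSON fallback can be sketched as follows. parse_conf() is a hypothetical helper written for illustration, not mrjob's own loader, and it assumes the yaml module is the one provided by PyYAML.

```python
import json

try:
    import yaml  # optional; assumed to be PyYAML
except ImportError:
    yaml = None


def parse_conf(text):
    # Hypothetical helper: prefer YAML when the yaml module is importable,
    # otherwise fall back to JSON.
    if yaml is not None:
        return yaml.safe_load(text)
    return json.loads(text)


if __name__ == "__main__":
    # A JSON config is also readable by the YAML parser, so this text works
    # whether or not yaml is installed.
    text = '{"runners": {"inline": {"local_tmp_dir": "/tmp/mrjob"}}}'
    print(parse_conf(text)["runners"]["inline"]["local_tmp_dir"])
```

One practical consequence: a config file written as JSON stays readable on machines with or without yaml, while a file that uses YAML-only syntax needs the module installed.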

See Config file format and location for in-depth information. Here is an example file:

runners:
  emr:
    aws_region: us-west-1
    python_archives:
      - a_library_I_use_on_emr.tar.gz
  inline:
    local_tmp_dir: $HOME/.tmp