mrjob.job - defining your job

class mrjob.job.MRJob(args=None)

The base class for all MapReduce jobs. See __init__() for details.

One-step jobs

MRJob.mapper(key, value)

Re-define this to define the mapper for a one-step job.

Yields zero or more tuples of (out_key, out_value).

Parameters:
  • key – A value parsed from input.
  • value – A value parsed from input.

If you don’t re-define this, your job will have a mapper that simply yields (key, value) as-is.

By default (if you don’t mess with Protocols):
  • key will be None
  • value will be the raw input line, with newline stripped.
  • out_key and out_value must be JSON-encodable: numeric, unicode, boolean, None, list, or dict whose keys are unicodes.
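
For example, a hypothetical word-count job (the class name MRWordCount is illustrative, not part of mrjob) could define its mapper like this:

from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # under the default protocols, the key is None and line is the raw input line
        for word in line.split():
            yield word, 1

if __name__ == '__main__':
    MRWordCount.run()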
MRJob.reducer(key, values)

Re-define this to define the reducer for a one-step job.

Yields one or more tuples of (out_key, out_value)

Parameters:
  • key – A key which was yielded by the mapper
  • values – A generator which yields all values yielded by the mapper which correspond to key.
By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and the values will have been decoded from JSON (so tuples will become lists).
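
Continuing the hypothetical word-count sketch from mapper() above, the matching reducer would simply sum the counts for each word:

def reducer(self, word, counts):
    # counts is a generator over every value the mappers yielded for this word
    yield word, sum(counts)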
MRJob.combiner(key, values)

Re-define this to define the combiner for a one-step job.

Yields one or more tuples of (out_key, out_value)

Parameters:
  • key – A key which was yielded by the mapper
  • values – A generator which yields all values yielded by one mapper task/node which correspond to key.
By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and the values will have been decoded from JSON (so tuples will become lists).
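
A combiner often applies the reducer’s logic to the output of a single mapper task; for the hypothetical word-count sketch above:

def combiner(self, word, counts):
    # sum the counts seen by this mapper task before they cross the network
    yield word, sum(counts)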
MRJob.mapper_init()

Re-define this to define an action to run before the mapper processes any input.

One use for this function is to initialize mapper-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.mapper_final()

Re-define this to define an action to run after the mapper reaches the end of input.

One way to use this is to store a total in an instance variable, and output it after reading all input data. See mrjob.examples for an example.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
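
A sketch of the pattern described above: accumulate totals in an instance attribute (the name word_counts is illustrative) and emit them only once the mapper has seen all of its input:

def mapper_init(self):
    self.word_counts = {}

def mapper(self, _, line):
    # nothing is yielded here; totals are emitted in mapper_final()
    for word in line.split():
        self.word_counts[word] = self.word_counts.get(word, 0) + 1

def mapper_final(self):
    for word, count in self.word_counts.items():
        yield word, count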

MRJob.reducer_init()

Re-define this to define an action to run before the reducer processes any input.

One use for this function is to initialize reducer-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.reducer_final()

Re-define this to define an action to run after the reducer reaches the end of input.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_init()

Re-define this to define an action to run before the combiner processes any input.

One use for this function is to initialize combiner-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_final()

Re-define this to define an action to run after the combiner reaches the end of input.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.mapper_cmd()

Re-define this to define the mapper for a one-step job as a shell command. If you define your mapper this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For important specifics, see Shell commands as steps.

Basic example:

def mapper_cmd(self):
    return 'cat'
MRJob.reducer_cmd()

Re-define this to define the reducer for a one-step job as a shell command. If you define your reducer this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

Basic example:

def reducer_cmd(self):
    return 'cat'
MRJob.combiner_cmd()

Re-define this to define the combiner for a one-step job as a shell command. If you define your combiner this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

Basic example:

def combiner_cmd(self):
    return 'cat'
MRJob.mapper_pre_filter()

Re-define this to specify a shell command to filter the mapper’s input before it gets to your job’s mapper in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def mapper_pre_filter(self):
    return 'grep "ponies"'
MRJob.reducer_pre_filter()

Re-define this to specify a shell command to filter the reducer’s input before it gets to your job’s reducer in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def reducer_pre_filter(self):
    return 'grep "ponies"'
MRJob.combiner_pre_filter()

Re-define this to specify a shell command to filter the combiner’s input before it gets to your job’s combiner in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def combiner_pre_filter(self):
    return 'grep "ponies"'

Multi-step jobs

MRJob.steps()

Re-define this to make a multi-step job.

If you don’t re-define this, we’ll automatically create a one-step job from any of the mapper, combiner, and reducer methods you’ve re-defined (mapper(), mapper_init(), mapper_final(), combiner(), reducer_init(), reducer_final(), reducer(), and so on). For example:

def steps(self):
    return [self.mr(mapper=self.transform_input,
                    reducer=self.consolidate_1),
            self.mr(reducer_init=self.log_mapper_init,
                    reducer=self.consolidate_2)]
Returns: a list of steps constructed with mr()
classmethod MRJob.mr(*args, **kwargs)

Define a Python step (mapper, reducer, and/or any combination of mapper_init, reducer_final, etc.) for your job.

Used by steps(). (Don’t re-define this, just call it!) See Multi-step jobs for sample usage.

Accepts the following keyword arguments.

Parameters:
  • mapper – function with same function signature as mapper(), or None for an identity mapper.
  • reducer – function with same function signature as reducer(), or None for no reducer.
  • combiner – function with same function signature as combiner(), or None for no combiner.
  • mapper_init – function with same function signature as mapper_init(), or None for no initial mapper action.
  • mapper_final – function with same function signature as mapper_final(), or None for no final mapper action.
  • reducer_init – function with same function signature as reducer_init(), or None for no initial reducer action.
  • reducer_final – function with same function signature as reducer_final(), or None for no final reducer action.
  • combiner_init – function with same function signature as combiner_init(), or None for no initial combiner action.
  • combiner_final – function with same function signature as combiner_final(), or None for no final combiner action.
  • jobconf – dictionary with custom jobconf arguments to pass to hadoop.

This is just a wrapper for MRStep, plus a little logic to support deprecated use of positional arguments.
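
Since mr() is just a wrapper, new code can construct the same steps with MRStep directly (a sketch assuming MRStep is importable from mrjob.step, reusing the illustrative method names from the steps() example above):

from mrjob.step import MRStep

def steps(self):
    return [MRStep(mapper=self.transform_input,
                   reducer=self.consolidate_1),
            MRStep(reducer_init=self.log_mapper_init,
                   reducer=self.consolidate_2)]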

classmethod MRJob.jar(*args, **kwargs)

Alias for JarStep.

Deprecated since version 0.4.2.

Running the job

classmethod MRJob.run()

Entry point for running job from the command-line.

This is also the entry point when a mapper or reducer is run by Hadoop Streaming.

Does one of:
  • print step information (--steps); see show_steps()
  • run a mapper (--mapper); see run_mapper()
  • run a combiner (--combiner); see run_combiner()
  • run a reducer (--reducer); see run_reducer()
  • run the entire job; see run_job()

MRJob.__init__(args=None)

Entry point for running your job from other Python code.

You can pass in command-line arguments, and the job will act the same way it would if it were run from the command line. For example, to run your job on EMR:

mr_job = MRYourJob(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    ...

Passing in None is the same as passing in [] (if you want to parse args from sys.argv, call MRJob.run()).

For a full list of command-line arguments, run: python -m mrjob.job --help

MRJob.make_runner()

Make a runner based on command-line arguments, so we can launch this job on EMR, on Hadoop, or locally.

Return type: mrjob.runner.MRJobRunner

Parsing output

MRJob.parse_output_line(line)

Parse a line from the final output of this MRJob into (key, value). Used extensively in tests like this:

runner.run()
for line in runner.stream_output():
    key, value = mr_job.parse_output_line(line)

Counters and status messages

MRJob.increment_counter(group, counter, amount=1)

Increment a counter in Hadoop streaming by printing to stderr. If the type of either group or counter is unicode, then the counter will be written as unicode. Otherwise, the counter will be written as ASCII. Although writing non-ASCII will succeed, the resulting counter names may not be displayed correctly at the end of the job.

Parameters:
  • group (str) – counter group
  • counter (str) – description of the counter
  • amount (int) – how much to increment the counter by

Commas in counter or group will be automatically replaced with semicolons (commas confuse Hadoop streaming).
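
For example, a mapper might count malformed records as it skips them (a sketch; the group and counter names are illustrative):

def mapper(self, _, line):
    fields = line.split('\t')
    if len(fields) != 3:
        self.increment_counter('Input', 'malformed lines')
        return  # skip this record
    yield fields[0], 1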

MRJob.set_status(msg)

Set the job status in hadoop streaming by printing to stderr.

This is also a good way of doing a keepalive for a job that goes a long time between outputs; Hadoop streaming usually times out jobs that give no output for longer than 10 minutes.

If the type of msg is unicode, then the message will be written as unicode. Otherwise, it will be written as ASCII.
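
For example, a reducer doing slow per-key work might emit a status message periodically as a keepalive (a sketch; the reporting interval is arbitrary):

def reducer(self, key, values):
    processed = 0
    for value in values:
        # ... slow per-value work goes here ...
        processed += 1
        if processed % 100000 == 0:
            self.set_status('processed %d values for %r' % (processed, key))
    yield key, processed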

Setting protocols

MRJob.INPUT_PROTOCOL = <class 'mrjob.protocol.RawValueProtocol'>

Protocol for reading input to the first mapper in your job. Default: RawValueProtocol.

For example, if you know your input data is in JSON format, you could set:

INPUT_PROTOCOL = JSONValueProtocol

in your class, and your initial mapper would receive decoded JSON values rather than raw strings.

See mrjob.protocol for the full list of protocols.

MRJob.INTERNAL_PROTOCOL = <class 'mrjob.protocol.JSONProtocol'>

Protocol for communication between steps and final output. Default: JSONProtocol.

For example, if your step output isn’t JSON-encodable, you could set:

INTERNAL_PROTOCOL = PickleProtocol

and step output would be encoded as string-escaped pickles.

See mrjob.protocol for the full list of protocols.

MRJob.OUTPUT_PROTOCOL = <class 'mrjob.protocol.JSONProtocol'>

Protocol to use for writing output. Default: JSONProtocol.

For example, if you wanted the final output in repr, you could set:

OUTPUT_PROTOCOL = ReprProtocol

See mrjob.protocol for the full list of protocols.
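
Putting these together, a hypothetical job that reads raw JSON lines, pickles data between steps, and writes repr output could declare:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol, PickleProtocol, ReprProtocol

class MRMyJob(MRJob):
    # the class name is illustrative; only the protocol settings matter here
    INPUT_PROTOCOL = JSONValueProtocol
    INTERNAL_PROTOCOL = PickleProtocol
    OUTPUT_PROTOCOL = ReprProtocol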

MRJob.input_protocol()

Instance of the protocol to use to convert input lines to Python objects. Default behavior is to return an instance of INPUT_PROTOCOL.

MRJob.internal_protocol()

Instance of the protocol to use to communicate between steps. Default behavior is to return an instance of INTERNAL_PROTOCOL.

MRJob.output_protocol()

Instance of the protocol to use to convert Python objects to output lines. Default behavior is to return an instance of OUTPUT_PROTOCOL.

MRJob.pick_protocols(step_num, step_type)

Pick the protocol classes to use for reading and writing for the given step.

Parameters:
  • step_num (int) – which step to run (e.g. 0 for the first step)
  • step_type (str) – one of ‘mapper’, ‘combiner’, or ‘reducer’
Returns: (read_function, write_function)

By default, we use one protocol for reading input, one internal protocol for communication between steps, and one protocol for final output (which is usually the same as the internal protocol). Protocols can be controlled by setting INPUT_PROTOCOL, INTERNAL_PROTOCOL, and OUTPUT_PROTOCOL.

Re-define this if you need fine control over which protocols are used by which steps.

Secondary sort

MRJob.SORT_VALUES = None

Set this to True if you would like reducers to receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.

This can be useful if you expect more values than you can fit in memory to be associated with one key, but you want to apply information in a small subset of these values to information in the other values. For example, you may want to convert counts to percentages, and to do this you first need to know the total count.

Even though values are sorted by their encoded value, most encodings will sort strings in order. For example, you could have values like: ['A', <total>], ['B', <count_name>, <count>], and the value containing the total should come first regardless of what protocol you’re using.

See jobconf() and partitioner() for more about how this works.

New in version 0.4.1.
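
A sketch of the counts-to-percentages pattern described above (the 'A'/'B' prefixes and value layouts are assumptions, chosen so the total sorts first under the default JSON encoding):

from mrjob.job import MRJob

class MRPercentages(MRJob):

    SORT_VALUES = True

    def reducer(self, key, values):
        total = None
        for value in values:
            if value[0] == 'A':
                # ['A', total] sorts before every ['B', name, count]
                total = value[1]
            else:
                _, name, count = value
                yield key, [name, 100.0 * count / total]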

Command-line options

See Defining command line options for information on adding command line options to your job. See Configuration quick reference for a complete list of all configuration options.

MRJob.configure_options()

Define the command-line options for this script. Called from __init__(), before load_options().

Re-define this (calling super() first) to add custom options; see add_passthrough_option() and add_file_option() below for examples.

MRJob.add_passthrough_option(*args, **kwargs)

Function to create options which both the job runner and the job itself respect (we use this for protocols, for example).

Use it like you would use optparse.OptionParser.add_option():

def configure_options(self):
    super(MRYourJob, self).configure_options()
    self.add_passthrough_option(
        '--max-ngram-size', type='int', default=4, help='...')

Specify an opt_group keyword argument to add the option to that OptionGroup rather than the top-level OptionParser.

If you want to pass files through to the mapper/reducer, use add_file_option() instead.
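
Passthrough options show up in self.options inside your tasks; for example, a mapper could use the --max-ngram-size option defined above like this (a sketch):

def mapper(self, _, line):
    words = line.split()
    max_size = self.options.max_ngram_size
    for size in range(1, max_size + 1):
        for i in range(len(words) - size + 1):
            yield ' '.join(words[i:i + size]), 1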

MRJob.add_file_option(*args, **kwargs)

Add a command-line option that sends an external file (e.g. a SQLite DB) to Hadoop:

def configure_options(self):
   super(MRYourJob, self).configure_options()
   self.add_file_option('--scoring-db', help=...)

This does the right thing: the file will be uploaded to the working dir of the script on Hadoop, and the script will be passed the same option, but with the local name of the file in the script’s working directory.

We suggest against sending Berkeley DBs to your job, as Berkeley DB is not forwards-compatible (so a Berkeley DB that you construct on your computer may not be readable from within Hadoop). Use SQLite databases instead. If all you need is an on-disk hash table, try out the sqlite3dbm module.
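
At run time, self.options holds the local path of the uploaded copy, so you could open the database from the example above in reducer_init() (a minimal sketch):

import sqlite3

def reducer_init(self):
    # --scoring-db now points at the copy in the task's working directory
    self.scoring_db = sqlite3.connect(self.options.scoring_db)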

MRJob.load_options(args)

Load command-line options into self.options.

Called from __init__() after configure_options().

Parameters: args (list of str) – a list of command line arguments. None will be treated the same as [].

Re-define if you want to post-process command-line arguments:

def load_options(self, args):
    super(MRYourJob, self).load_options(args)

    self.stop_words = self.options.stop_words.split(',')
    ...
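
The snippet above assumes a matching passthrough option; a corresponding configure_options() might look like this (a sketch; the option name and default are illustrative):

def configure_options(self):
    super(MRYourJob, self).configure_options()
    self.add_passthrough_option(
        '--stop-words', default='a,an,the', help='comma-separated stop words')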
MRJob.is_mapper_or_reducer()

True if this is a mapper/reducer.

This is mostly useful inside load_options(), to disable loading options when we aren’t running inside Hadoop Streaming.

MRJob.OPTION_CLASS = <class 'optparse.Option'>

Alias of optparse.Option: the option class used when building this job’s option parser.

Job runner configuration

MRJob.job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called.

Returns: map from arg name to value

Re-define this if you want finer control of runner initialization.

You might find mrjob.conf.combine_dicts() useful if you want to add or change lots of keyword arguments.
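
For example, to add a single keyword argument while keeping all the defaults (a sketch; the python_archives value is illustrative and assumes that runner option exists in your mrjob version):

from mrjob.conf import combine_dicts

def job_runner_kwargs(self):
    return combine_dicts(
        super(MRYourJob, self).job_runner_kwargs(),
        {'python_archives': ['mylib.tar.gz']})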

MRJob.local_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called and the job is run locally (-r local).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs locally.

MRJob.emr_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called and the job is run on EMR (-r emr).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs on EMR.

MRJob.hadoop_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called and the job is run on Hadoop (-r hadoop).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs on Hadoop.

MRJob.generate_passthrough_arguments()

Returns a list of arguments to pass to invocations of this script, whether they run on Hadoop or locally via subprocess.

These are passed to mrjob.runner.MRJobRunner.__init__() as extra_args.

MRJob.generate_file_upload_args()

Figure out file upload args to pass through to the job runner.

Instead of generating a list of args, we’re generating a list of tuples of ('--argname', path)

These are passed to mrjob.runner.MRJobRunner.__init__() as file_upload_args.

classmethod MRJob.mr_job_script()

Path of this script. This returns the file containing this class.

Running specific parts of jobs

MRJob.run_job()

Run all steps of the job, logging errors (and debugging output if --verbose is specified) to STDERR and streaming the output to STDOUT.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_mapper(step_num=0)

Run the mapper and final mapper action for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_reducer(step_num=0)

Run the reducer for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_combiner(step_num=0)

Run the combiner for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.show_steps()

Print information about how many steps there are, and whether they contain a mapper or reducer. Job runners (see Runners) use this to determine how Hadoop should call this script.

Called from run(). You’d probably only want to call this directly from automated tests.

We currently output something like MR M R, but expect this to change!

Hadoop configuration

MRJob.HADOOP_INPUT_FORMAT = None

Optional name of a Hadoop InputFormat class, e.g. 'org.apache.hadoop.mapred.lib.NLineInputFormat'.

Passed to Hadoop for the first step of this job via the -inputformat option.

If you require more sophisticated behavior, try hadoop_input_format() or the hadoop_input_format argument to mrjob.runner.MRJobRunner.__init__().

MRJob.hadoop_input_format()

Optional Hadoop InputFormat class to parse input for the first step of the job.

Normally, setting HADOOP_INPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.HADOOP_OUTPUT_FORMAT = None

Optional name of a Hadoop OutputFormat class, e.g. 'org.apache.hadoop.mapred.FileOutputFormat'.

Passed to Hadoop for the last step of this job via the -outputformat option.

If you require more sophisticated behavior, try hadoop_output_format() or the hadoop_output_format argument to mrjob.runner.MRJobRunner.__init__().

MRJob.hadoop_output_format()

Optional Hadoop OutputFormat class to write output for the last step of the job.

Normally, setting HADOOP_OUTPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.JOBCONF = {}

Optional jobconf arguments we should always pass to Hadoop. This is a map from property name to value. e.g.:

{'stream.num.map.output.key.fields': '4'}

It’s recommended that you only use this to hard-code things that affect the semantics of your job, and leave performance tweaks to the command line or whatever you use to launch your job.

MRJob.jobconf()

-jobconf args to pass to hadoop streaming. This should be a map from property name to value.

By default, this combines jobconf options from the command lines with JOBCONF, with command line arguments taking precedence.

If SORT_VALUES is set, we also set these jobconf values:

stream.num.map.output.key.fields=2
mapred.text.key.partitioner.options=-k1,1

We also blank out mapred.output.key.comparator.class and mapred.text.key.comparator.options to prevent interference from mrjob.conf.

SORT_VALUES can be overridden by JOBCONF, the command line, and step-specific jobconf values.

For example, if you know your values are numbers, and want to sort them in reverse, you could do:

SORT_VALUES = True

JOBCONF = {
  'mapred.output.key.comparator.class':
      'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
  'mapred.text.key.comparator.options': '-k1 -k2nr',
}

If you want to re-define this, it’s strongly recommended that you do something like this, so as not to inadvertently disable the jobconf option:

def jobconf(self):
    orig_jobconf = super(MyMRJobClass, self).jobconf()
    custom_jobconf = ...

    return mrjob.conf.combine_dicts(orig_jobconf, custom_jobconf)
MRJob.PARTITIONER = None

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers. For example: 'org.apache.hadoop.mapred.lib.HashPartitioner'.

If you require more sophisticated behavior, try partitioner().

MRJob.partitioner()

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers.

By default, returns whatever was passed to --partitioner; if that option isn’t used, PARTITIONER; and if that isn’t set either but SORT_VALUES is true, 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'.

You probably don’t need to re-define this; it’s just here for completeness.

Hooks for testing

MRJob.sandbox(stdin=None, stdout=None, stderr=None)

Redirect stdin, stdout, and stderr for automated testing.

You can set stdin, stdout, and stderr to file objects. By default, they’ll be set to empty StringIO objects. You can then access the job’s file handles through self.stdin, self.stdout, and self.stderr. See Testing jobs for more information about testing.

You may call sandbox multiple times (this will essentially clear the file handles).

stdin is empty by default. You can set it to anything that yields lines:

mr_job.sandbox(stdin=StringIO('some_data\n'))

or, equivalently:

mr_job.sandbox(stdin=['some_data\n'])

For convenience, sandbox() returns self, so you can do:

mr_job = MRJobClassToTest().sandbox()

Simple testing example:

mr_job = MRYourJob().sandbox()
assert_equal(list(mr_job.reducer('foo', ['bar', 'baz'])), [...])

More complex testing example:

from StringIO import StringIO

mr_job = MRYourJob(args=[...])

fake_input = '"foo"\t"bar"\n"foo"\t"baz"\n'
mr_job.sandbox(stdin=StringIO(fake_input))

mr_job.run_reducer(step_num=0)
assert_equal(mr_job.parse_output(), ...)
assert_equal(mr_job.parse_counters(), ...)
MRJob.parse_output(protocol=None)

Convenience method for parsing output from any mapper or reducer, all at once.

This helps you test individual mappers and reducers by calling run_mapper() or run_reducer(). For example:

mr_job.sandbox(stdin=your_input)
mr_job.run_mapper(step_num=0)
output = mr_job.parse_output()
Parameters: protocol (protocol) – A protocol instance to use (e.g. JSONProtocol()).

This only works in sandbox mode. This does not clear self.stdout.

MRJob.parse_counters(counters=None)

Convenience method for reading counters. This only works in sandbox mode. This does not clear self.stderr.

Returns: a map from counter group to counter name to amount.

To read everything from self.stderr (including status messages) use mrjob.parse.parse_mr_job_stderr().

When writing unit tests, you may find MRJobRunner.counters() more useful.