A Mrs program is a user-specified class that is passed to mrs.main. Although the mrs.MapReduce class implements some common functionality, a MapReduce program is not required to inherit from this class. Any class that specifies __init__ and run methods with the appropriate signatures is a valid Mrs program. These two required methods are:
__init__(self, opts, args) (mandatory)
The __init__ method is called with an opts object (created by optparse) and an args list. These arguments are created on the master, but the __init__ method is called once on each slave (with identical arguments).
run(self, job) (mandatory)
The role of the run method is to submit datasets for evaluation and process their results. It is called with a job object, an instance of mrs.job.Job, which provides methods such as local_data, file_data, map_data, and reduce_data for creating dataset objects and submitting them for asynchronous evaluation. The job also provides wait and progress methods for determining whether datasets are complete.
Additional optional methods are:
update_parser(cls, parser) (class method)
The optional update_parser method, which must be a class method if specified, adds custom command-line arguments to the given option parser (optparse.OptionParser).
partition(self, key, serialized_key, n)
If specified, the partition method serves as the default partition function for map and reduce datasets. See the entry on the parter below for more information.
If the Bypass implementation is specified on the command line, then Mrs invokes the bypass method instead of the run method. This provides a convenient mechanism for including serial and parallel implementations of a program side by side.
The mrs.MapReduce class in the file mrs/mapreduce.py provides simple default implementations for each of the methods used by Mrs. It serves both as a reasonable default and also as an example. Programs can optionally inherit from this class and override any functions as needed.
In its basic form, the mrs.MapReduce class assumes that a MapReduce program consists of a single map phase followed by a single reduce phase. One or more filenames given on the command line name the text files that are read as input to the map function, and a path given on the command line names the directory where output is written in a human-readable file format.
The following additional methods can be overridden in a minimal MapReduce program that uses the mrs.MapReduce class:
map(self, key, value)
A generator which takes a key and a value and yields one or more key-value pairs. See the MapReduce paper for more information about the role of a map function.
reduce(self, key, values)
A generator which takes a key and a value iterator and yields one or more values associated with the key. See the MapReduce paper for more information about the role of a reduce function.
input_data(self, job)
Returns a dataset object that is used as the input to the map dataset. By default, it uses filenames specified on the command-line (all of the positional arguments except the last).
The output_dir method returns a path that is used for the output directory of the reduce dataset. By default, it uses a path specified on the command line (the last positional argument).
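The shape of the map and reduce generators described above can be illustrated with a WordCount sketch. This is not Mrs code: the functions are written as plain functions (in a Mrs program they would be methods taking self), and run_locally is a hypothetical serial driver that exists only so the sketch runs without Mrs installed.

```python
from collections import defaultdict


def wc_map(key, value):
    """Yield a (word, 1) pair for every word in one line of text."""
    for word in value.split():
        yield word, 1


def wc_reduce(key, values):
    """Yield the total count for one word, given an iterator of counts."""
    yield sum(values)


def run_locally(lines):
    """Hypothetical serial driver: map each line, group by key, reduce."""
    grouped = defaultdict(list)
    for line_number, line in enumerate(lines):
        for word, count in wc_map(line_number, line):
            grouped[word].append(count)
    # Each key maps to the list of values yielded by the reduce generator.
    return {word: list(wc_reduce(word, iter(counts)))
            for word, counts in grouped.items()}
```

Note that the map generator yields key-value pairs, while the reduce generator yields only values; the key is already known from the call.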
The following list details the default behavior for some of the methods provided by the mrs.MapReduce class:
update_parser(cls, parser) (class method)
Modifies and returns the given option parser (from the optparse module). By default, it adds a simple usage statement. Many programs add command-line options or change the usage statement.
run(self, job)
Submits datasets to the mrs.job.Job and optionally waits for and processes the completed datasets. By default, it assumes that there is a single map dataset using the map method and a single reduce dataset using the reduce method. The input to the map dataset is given by input_data, and the output from the reduce dataset is written using the human-readable TextWriter file format to the directory specified by output_dir. A program that is more complex than a simple text processor can override the run method to add any sequence of map and reduce datasets.
Of course, any of these methods may be overridden to customize behavior.
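As an illustration of overriding update_parser, here is a minimal sketch that uses only the standard optparse module. The class name CountProgram and the --min-count option are hypothetical, and the class deliberately does not inherit from mrs.MapReduce so that the sketch runs on its own.

```python
from optparse import OptionParser


class CountProgram(object):
    """Hypothetical program skeleton (not derived from mrs.MapReduce)."""

    def __init__(self, opts, args):
        # opts is the optparse values object; args is the positional list.
        self.opts = opts
        self.args = args

    @classmethod
    def update_parser(cls, parser):
        """Add a custom option and usage string, then return the parser."""
        parser.set_usage('%prog [options] input1 [input2 ...] output_dir')
        parser.add_option('--min-count', dest='min_count', type='int',
                          default=1,
                          help='hypothetical option: smallest count to report')
        return parser

    def run(self, job):
        # A real program would submit datasets to the job here.
        raise NotImplementedError('dataset submission is omitted in this sketch')
```

The parsed option is then available in __init__ as opts.min_count, and the positional arguments arrive in args.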
A dataset represents a set of key-value pairs distributed across a set of files. Datasets may be associated with a list of URLs, locally-produced data, or the output of map and reduce tasks executed in parallel. The run method of a MapReduce program submits datasets to the job for asynchronous evaluation. It may also download and process the results of completed datasets. The job provides a set of methods that create and submit datasets:
The file_data method creates a FileData dataset associated with a set of URLs.
The local_data method creates a dataset from a locally computed iterator, making the data available for parallel processing.
map_data(input, mapper)
Performs a map operation on an input dataset using the given map function, which is required to be a method of the MapReduce program.
reduce_data(input, reducer)
Performs a reduce operation on an input dataset using the given reduce function, which is required to be a method of the MapReduce program.
reducemap_data(input, reducer, mapper)
Performs a consolidated reduce-map operation on an input dataset using the given reduce and map functions, which are required to be methods of the MapReduce program.
Most types of datasets accept a variety of optional keyword arguments:
An integer indicating the number of output splits. For example, if a local dataset is used as the input for a map operation, then the number of splits of the local dataset dictates the number of map tasks in the subsequent map operation. Likewise, the number of splits of a map operation determines the number of subsequent reduce tasks.
A string specifying a directory where output data will be stored.
The parter is a method of the MapReduce program that is used to partition data to subsequent tasks. See the MapReduce paper for more information.
A partition function takes three arguments: key, serialized_key, and n. Although both the key object and the serialized key are available, it will generally use one or the other. For example, mod_partition expects an integer and uses the unserialized form, while hash_partition uses a hash function that operates on bytes, so it uses the serialized form. The argument n specifies the number of splits and is usually used as a modulus (e.g., return x % n).
combiner (for map and reducemap datasets)
A method of the MapReduce program that serves as a pre-reducer within a map task. See the MapReduce paper for more information.
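The three-argument partition signature described above (key, serialized_key, n) can be sketched as follows. The names mod_partition and hash_partition are taken from the text, but the bodies are illustrative assumptions: in particular, CRC32 is chosen here only because it operates on bytes, and Mrs may use a different hash. In a Mrs program these would be methods, so they would also take self.

```python
import zlib


def mod_partition(key, serialized_key, n):
    """Assign an integer key to a split using the unserialized key."""
    return key % n


def hash_partition(key, serialized_key, n):
    """Assign any key to a split by hashing its serialized bytes.

    CRC32 is an assumption made for this sketch, not necessarily the
    hash function that Mrs uses.
    """
    return zlib.crc32(serialized_key) % n
```

Both functions return a split index in the range 0 to n - 1, and each ignores the form of the key that it does not need.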
The job’s progress method reports the fraction of the given dataset that is complete, and its wait method returns when any of the given datasets has completed evaluation (or when the optional timeout expires).
The following methods of DataSet objects are useful for processing the results of completed datasets:
Closes the dataset to future use and allows any associated temporary files to be deleted as soon as other datasets no longer depend on them.
Downloads all of the key-value pairs associated with the dataset.
Iterates over all of the key-value pairs in the dataset.
The IterativeMR class implements a run method that gives a producer-consumer interface for creating iterative MapReduce programs. In this model, datasets are pipelined. The producer method is called when the number of runnable datasets is low, and the consumer method is called when each dataset completes. This model reduces complexity particularly for programs where processing on the master occurs concurrently with map and reduce tasks (for example, programs with convergence checks).
Nondeterministic results fundamentally make debugging difficult and testing impossible. Setting a random seed is a simple way to make stochastic algorithms deterministic. However, in a MapReduce program, setting a fixed random seed at the beginning of each map or reduce task would make all tasks use the same sequence of random numbers. The mrs.MapReduce class provides a random method that returns a random number generator. The method takes a variable number of integer arguments and ensures that the random number generator is unique for any particular combination of inputs. Because of the large size of the internal state of the Mersenne Twister, the random method can accept around 300 arguments that are each 64-bit integers. This makes it easy to generate a unique random number generator in each task or even to create identical random number generators in different tasks that need to duplicate specific calculations.
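The idea of deriving a distinct generator from a combination of integers can be sketched with the standard random module, which also uses the Mersenne Twister. The function task_random is hypothetical and is not the implementation of the random method in mrs.MapReduce; it packs the arguments into one large integer seed.

```python
import random


def task_random(*args):
    """Return a random.Random seeded by a combination of 64-bit integers."""
    seed = 1  # Leading sentinel bit so that (0,) and (0, 0) give different seeds.
    for arg in args:
        if not 0 <= arg < 2 ** 64:
            raise ValueError('each argument must fit in 64 bits')
        # Append the argument as the next 64 bits of the seed.
        seed = (seed << 64) | arg
    return random.Random(seed)
```

For example, a map task could call task_random(iteration, task_index) (both names hypothetical) to get a generator that is reproducible across runs but differs from that of every other task.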
By default, inputs and outputs for map and reduce functions are serialized using Python’s pickle module. In most cases, this is convenient, but in some cases, performance may be improved by using custom serializers and deserializers, or even no serializers at all. For many applications, custom serializers may have no noticeable effect on performance, so this is an optional advanced feature.
Each dataset accepts key_serializer and value_serializer keyword arguments that specify serializers to be used for outputting each key and value. Alternatively, the serializers can be specified directly on a map or reduce function using the mrs.output_serializers decorator. In each case, the arguments are either the name of a serializer attribute of the program or the attribute itself. A serializer must have both dumps and loads methods. The mrs.MapReduce class has three default serializers provided: raw_serializer (bytes), int_serializer, and str_serializer, so if you extend mrs.MapReduce, you automatically get access to these three serializers. Deserializers are not explicitly given because they are specified in the parent dataset.
The map function in the wordcount2.py example uses the str serializer for the output key and the int serializer for the output value:
    @mrs.output_serializers(key='str_serializer', value='int_serializer')
    def map(self, key, value):
        # do stuff
        yield new_key_str, new_value_int
Alternatively, we could have specified the serializers by using the actual attribute rather than the name (e.g. key=mrs.MapReduce.str_serializer).
To define your own custom serializer, add the serializer as an attribute to your program, and then pass it in as the argument to the decorator. Here is an example using the mrs.make_protobuf_serializer helper:
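    class MyProgram(mrs.MapReduce):
        my_serializer = mrs.make_protobuf_serializer(some_proto)

        # Refer to the serializer attribute by name in the decorator.
        @mrs.output_serializers(value='my_serializer')
        def map(self, key, value):
            # do stuff
            yield new_key, message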
For one final example, we use the mrs.make_struct_serializer helper to create a serializer that knows how to pack and unpack tuples of a specific type. In this case, we will deal with tuples that are pairs of longs:
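    class MyProgram(mrs.MapReduce):
        long_pair_serializer = mrs.make_struct_serializer('=LL')

        # Refer to the serializer attribute by name in the decorator.
        @mrs.output_serializers(value='long_pair_serializer')
        def map(self, key, value):
            # do stuff
            yield new_key, (long_value_1, long_value_2)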
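To show what the dumps and loads interface requires, here is a minimal sketch of a struct-based serializer written with the standard struct module. The class StructSerializer is hypothetical; it illustrates the interface only and is not the object that mrs.make_struct_serializer returns.

```python
import struct


class StructSerializer(object):
    """Hypothetical serializer that packs fixed-format tuples into bytes."""

    def __init__(self, fmt):
        # Compile the format once so that repeated calls are cheap.
        self._struct = struct.Struct(fmt)

    def dumps(self, obj):
        """Encode a tuple as bytes."""
        return self._struct.pack(*obj)

    def loads(self, data):
        """Decode bytes back into a tuple."""
        return self._struct.unpack(data)


# '=LL' means two unsigned longs in standard size (4 bytes each).
long_pair_serializer = StructSerializer('=LL')
```

With the '=LL' format each value must fit in an unsigned 32-bit integer; a format such as '=qq' would hold signed 64-bit values instead.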
You may use any serializer type you want, so long as the serializer has the loads and dumps methods which respectively decode and encode your data. There are a few helpers for common data formats already included:
Local temporary storage is specified with the --mrs-tmpdir option, which defaults to /var/tmp. Do not under any circumstances configure Mrs to store temporary files on a remote filesystem (e.g., NFS). Temporary files are shared between slaves using HTTP, and adding an extra layer of network communication is unnecessary and slow.
In some circumstances, it may be possible to store intermediate files on a remote filesystem. In such situations, the master’s --mrs-shared option can be used to configure a globally accessible directory. However, this is only suitable for CPU-bound programs; a central bottleneck is disastrous for I/O-bound programs. Even in the case of CPU-bound programs, a central --mrs-shared directory for intermediate files can be problematic. For example, if the Python files from Mrs are being run from a network filesystem, the extra load from --mrs-shared can cause the Python interpreter to hang for extended periods of time while waiting on remote bytecode files.
The --mrs-max-sort-size option determines the maximum amount of data that will be loaded for sorting at a time in a reduce task. Reduce tasks can have input that is larger than the maximum sort size, but large input data is split into smaller chunks for sorting. If more memory is available, performance may improve if data can be sorted in RAM. However, it is important not to allow a Mrs process to use more memory than is available. Sorting on disk is much faster than heavy swapping, and running out of memory can cause Mrs to crash.
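The general technique of sorting input that is larger than a memory limit can be sketched as a chunked sort followed by a merge. This is a generic external sort written for illustration, not the code Mrs uses: the limit here counts items rather than bytes, and the function names are hypothetical.

```python
import heapq
import pickle
import tempfile
from itertools import islice


def _write_run(sorted_chunk):
    """Write one sorted chunk to a temporary file and rewind it."""
    f = tempfile.TemporaryFile()
    for item in sorted_chunk:
        pickle.dump(item, f)
    f.seek(0)
    return f


def _read_run(f):
    """Lazily read back the items of one sorted run."""
    try:
        while True:
            yield pickle.load(f)
    except EOFError:
        f.close()


def sorted_in_chunks(items, max_chunk_len):
    """Sort an iterable while holding at most max_chunk_len items for sorting."""
    iterator = iter(items)
    runs = []
    while True:
        chunk = sorted(islice(iterator, max_chunk_len))
        if not chunk:
            break
        runs.append(_write_run(chunk))
    # Merge the sorted runs; only one item per run is in memory at a time.
    return heapq.merge(*[_read_run(f) for f in runs])
```

A larger chunk size means fewer runs to merge and less disk traffic, which is why raising the limit can help when RAM allows it.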
When choosing a maximum sort size, recognize that the amount of required memory is greater than the sort size. Specifically, Mrs must store a list of tuples containing the deserialized key, serialized key, and serialized value. On a 64-bit machine with Python 2.7, this adds the memory required by the deserialized key plus an overhead of about 162 bytes per key-value pair (8 bytes for a pointer to a tuple, 56 bytes for a tuple, 8*3 bytes for pointers in each tuple, plus 37*2 bytes for the strings). Due to this overhead, it’s important not to set --mrs-max-sort-size anywhere close to the total available memory.
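The per-pair overhead quoted above can be turned into a rough estimate of total memory. The figures are those given in the text for Python 2.7 on a 64-bit machine; the function estimated_sort_memory and its parameters are hypothetical and give only a back-of-the-envelope number.

```python
# Byte counts taken from the text (64-bit machine, Python 2.7).
LIST_POINTER = 8        # pointer from the list to each tuple
TUPLE_HEADER = 56       # the tuple object itself
TUPLE_SLOTS = 3 * 8     # three pointers inside each tuple
STRING_HEADERS = 2 * 37 # serialized key and serialized value strings

OVERHEAD_PER_PAIR = (LIST_POINTER + TUPLE_HEADER
                     + TUPLE_SLOTS + STRING_HEADERS)


def estimated_sort_memory(sort_size_bytes, num_pairs, avg_key_object_bytes):
    """Estimate the bytes needed to sort num_pairs key-value pairs.

    sort_size_bytes is the serialized data held for sorting;
    avg_key_object_bytes is the average size of a deserialized key.
    """
    per_pair = OVERHEAD_PER_PAIR + avg_key_object_bytes
    return sort_size_bytes + num_pairs * per_pair
```

Under these assumptions, a 100 MiB sort size holding one million small pairs with 40-byte deserialized keys needs roughly three times the sort size in memory.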
The examples directory includes several examples of MapReduce programs of varying levels of complexity.
Mrs includes two versions of WordCount:
The wordcount.py example demonstrates the simplest possible program in Mrs. The wordcount2.py example is a more full-featured version of WordCount that uses a combiner and explicitly sets a serializer (instead of automatically using pickle).
Hadoop includes a PiEstimator example program that estimates the value of pi by evaluating samples from a 2D Halton sequence and counting the proportion that fall within the unit circle. Mrs includes two versions of this program:
The pure_pi.py example is a simple Mrs program with a custom run method that demonstrates how a program can process data computed in a parallel dataset.
The c_pi.py example demonstrates how a function rewritten in C can remove the inherent performance penalty incurred by using a high-level programming language like Python. This fast implementation of the pi estimator uses ctypes to load a function from the halton.so file, which was compiled from halton.c using the included Makefile.
Note that c_pi.py in Mrs using Python and C is much faster than the equivalent optimized Pi.java program in Hadoop using Java.
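The estimation technique itself can be sketched serially in a few lines. This is not the code in pure_pi.py or c_pi.py; it is a sketch that assumes bases 2 and 3 for the two Halton dimensions and a circle of diameter 1 inscribed in the unit square.

```python
def halton(index, base):
    """Return element number index (starting at 1) of the Halton sequence."""
    result = 0.0
    fraction = 1.0 / base
    while index > 0:
        # Peel off the lowest digit of index in the given base.
        index, digit = divmod(index, base)
        result += digit * fraction
        fraction /= base
    return result


def estimate_pi(num_samples):
    """Estimate pi from the share of 2D Halton points inside the circle."""
    inside = 0
    for i in range(1, num_samples + 1):
        # Shift the point so that the circle is centered at the origin.
        x = halton(i, 2) - 0.5
        y = halton(i, 3) - 0.5
        if x * x + y * y <= 0.25:
            inside += 1
    # The circle covers pi/4 of the unit square.
    return 4.0 * inside / num_samples
```

Because each sample depends only on its index, ranges of indices can be evaluated independently, which is what makes the problem a natural fit for map tasks.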
The examples/contrib directory includes a few examples that may not be runnable without specific libraries or data but which show how Mrs is used in real-life problems.
The walk_analyzer.py program analyzes pregenerated lists of random walks through a graph. This program includes examples of using custom serializers to improve performance. It also serves as a helpful example of how to break down large input files.
A simpler variant, conditional_prob.py, which only computes conditional probabilities, is available for comparison.