.. _user_guide:

**********
User Guide
**********

Structure of a Mrs Program
==========================

A Mrs program is a user-specified class that is passed to ``mrs.main``.
Although the ``mrs.MapReduce`` class implements some common functionality, a
MapReduce program is not required to inherit from this class.  Any class that
specifies ``__init__`` and ``run`` methods with the appropriate signatures is
a valid Mrs program.  These two required methods are:

- ``__init__(self, opts, args)`` (mandatory)

  The ``__init__`` method is called with an ``opts`` object (created by
  ``optparse``) and an ``args`` list.  These arguments are created on the
  master, but the ``__init__`` method is called once on each slave (with
  identical arguments).

- ``run(self, job)`` (mandatory)

  The role of the ``run`` method is to submit datasets for evaluation and to
  process their results.  It is called with a ``job`` object, an instance of
  ``mrs.job.Job``, which provides methods such as ``local_data``,
  ``file_data``, ``map_data``, and ``reduce_data`` for creating dataset
  objects and submitting them for asynchronous evaluation.  The job also
  provides ``wait`` and ``progress`` methods for determining whether datasets
  are complete.

Additional optional methods are:

- ``update_parser(cls, parser)`` (class method)

  The optional ``update_parser`` method, which must be a class method if
  specified, adds custom command-line arguments to the given option parser
  (an ``optparse.OptionParser``).

- ``partition(self, key, serialized_key, n)``

  If specified, the ``partition`` method serves as the default partition
  function for map and reduce datasets.  See the entry on ``parter`` below
  for more information.

- ``bypass(self)``

  If the ``Bypass`` implementation is specified on the command line, then
  Mrs invokes the ``bypass`` method instead of the ``run`` method.  This
  provides a convenient mechanism for including serial and parallel
  implementations of a program side by side.

The Default MapReduce Class
===========================

The ``mrs.MapReduce`` class in the file ``mrs/mapreduce.py`` provides simple
default implementations for each of the methods used by Mrs.  It serves both
as a reasonable default and as an example.  Programs can optionally inherit
from this class and override any methods as needed.

In its basic form, the ``mrs.MapReduce`` class assumes that a MapReduce
program consists of a single map phase followed by a single reduce phase.
One or more filenames specified on the command line identify text files whose
contents are fed to the map function, and a path given on the command line
indicates the output directory, where results are written in a human-readable
file format.

The following additional methods can be overridden in a minimal MapReduce
program that uses the ``mrs.MapReduce`` class:

- ``map(self, key, value)``

  A generator that takes a key and a value and yields one or more key-value
  pairs.  See the MapReduce paper for more information about the role of a
  map function.

- ``reduce(self, key, values)``

  A generator that takes a key and a value iterator and yields one or more
  values associated with the key.  See the MapReduce paper for more
  information about the role of a reduce function.

- ``input_data(self, job)``

  Returns a dataset object that is used as the input to the map dataset.  By
  default, it uses filenames specified on the command line (all of the
  positional arguments except the last).

- ``output_dir(self)``

  Returns a path that is used as the output directory of the reduce dataset.
  By default, it uses a path specified on the command line (the last
  positional argument).
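For example, a minimal word-count style program (a sketch for illustration,
not one of the bundled examples) can rely on the defaults provided by
``mrs.MapReduce`` and override only ``map`` and ``reduce``::

    import mrs

    class WordCount(mrs.MapReduce):
        def map(self, key, value):
            # value is assumed to be one line of input text.
            for word in value.split():
                yield word, 1

        def reduce(self, key, values):
            # Emit a single total count for each distinct word.
            yield sum(values)

    if __name__ == '__main__':
        mrs.main(WordCount)

Input filenames and an output directory would be given as positional
arguments on the command line, as described above.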
The following list details the default behavior of some of the methods
provided by the ``mrs.MapReduce`` class:

- ``update_parser(cls, parser)`` (class method)

  Modifies and returns the given option parser (from the ``optparse``
  module).  By default, it adds a simple usage statement.  Many programs add
  command-line options or change the usage statement.

- ``run(self, job)``

  Submits datasets to the ``mrs.job.Job`` and optionally waits for and
  processes the completed datasets.  By default, it assumes that there is a
  single map dataset using the ``map`` method and a single reduce dataset
  using the ``reduce`` method.  The input to the map dataset is given by
  ``input_data``, and the output from the reduce dataset is written in the
  human-readable ``TextWriter`` file format to the directory specified by
  ``output_dir``.  A program that is more complex than a simple text
  processor can override the ``run`` method to build any sequence of map and
  reduce datasets.

Of course, any of these methods may be overridden to customize behavior.

The Job and Datasets
====================

A dataset represents a set of key-value pairs distributed across a set of
files.  Datasets may be associated with a list of URLs, with locally produced
data, or with the output of map and reduce tasks executed in parallel.  The
``run`` method of a MapReduce program submits datasets to the job for
asynchronous evaluation.  It may also download and process the results of
completed datasets.

The job provides a set of methods that create and submit datasets:

- ``file_data(filenames)``

  Creates a ``FileData`` dataset associated with a set of URLs.

- ``local_data(itr)``

  Creates a dataset from a locally computed iterator, making the data
  available for parallel processing.

- ``map_data(input, mapper)``

  Performs a map operation on an input dataset using the given map function,
  which must be a method of the MapReduce program.

- ``reduce_data(input, reducer)``

  Performs a reduce operation on an input dataset using the given reduce
  function, which must be a method of the MapReduce program.

- ``reducemap_data(input, reducer, mapper)``

  Performs a consolidated reduce-map operation on an input dataset using the
  given reduce and map functions, which must be methods of the MapReduce
  program.

Most types of datasets accept a variety of optional keyword arguments:

- ``splits``

  An integer indicating the number of output splits.  For example, if a
  local dataset is used as the input to a map operation, then its number of
  splits dictates the number of map tasks in the subsequent map operation.
  Likewise, the number of splits of a map operation determines the number of
  subsequent reduce tasks.

- ``outdir``

  A string specifying a directory where output data will be stored.

- ``parter``

  A method of the MapReduce program that is used to partition data among
  subsequent tasks (see the MapReduce paper for more information).  A
  partition function takes three arguments: ``key``, ``serialized_key``, and
  ``n``.  Although both the key object and the serialized key are available,
  a partition function generally uses one or the other.  For example,
  ``mod_partition`` expects an integer key and uses the unserialized form,
  while ``hash_partition`` uses a hash function that operates on bytes, so it
  uses the serialized form.  The argument ``n`` specifies the number of
  splits and is usually used as a modulus (e.g., ``return x % n``).  A simple
  custom partition function is sketched after this list.

- ``combiner`` (for map and reducemap datasets)

  A method of the MapReduce program that serves as a pre-reducer within a map
  task.  See the MapReduce paper for more information.
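As a sketch of the ``parter`` argument (hypothetical code, not part of the
Mrs distribution), the following partition function assumes integer keys in a
known range and assigns them to contiguous splits, so that the reduce outputs
end up range-partitioned::

    import mrs

    class MyProgram(mrs.MapReduce):
        def range_partition(self, key, serialized_key, n):
            # Assumes integer keys in [0, 1000); keys outside that range are
            # clamped into the first or last split.
            return max(0, min(key * n // 1000, n - 1))

Such a method is passed to a dataset with the ``parter`` keyword argument
(e.g., ``job.map_data(source, self.map, parter=self.range_partition)``), or
it can simply be named ``partition`` to serve as the program-wide default
described earlier.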
The job's ``progress`` method reports the fraction of the given dataset that
is complete, and its ``wait`` method returns when any of the given datasets
have completed evaluation (or if the optional timeout has expired).

The following methods of ``DataSet`` objects are useful for processing the
results of completed datasets:

- ``close()``

  Closes the dataset to future use and allows any associated temporary files
  to be deleted as soon as other datasets no longer depend on them.

- ``fetchall()``

  Downloads all of the key-value pairs associated with the dataset.

- ``data()``

  Iterates over all of the key-value pairs in the dataset.
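Putting these pieces together, a custom ``run`` method might chain one map
dataset and one reduce dataset, wait for evaluation to finish, and then
process the output on the master.  The following is only a sketch; it assumes
the program also defines ``map`` and ``reduce`` methods and inherits
``input_data`` from ``mrs.MapReduce``::

    def run(self, job):
        # Build the pipeline: command-line input files -> map -> reduce.
        source = self.input_data(job)
        intermediate = job.map_data(source, self.map)
        source.close()
        output = job.reduce_data(intermediate, self.reduce)
        intermediate.close()

        # Block until the reduce dataset has finished evaluating.
        job.wait(output)

        # Download the key-value pairs and process them on the master.
        output.fetchall()
        for key, value in output.data():
            print(key, value)
        output.close()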
The IterativeMR Class
=====================

The ``IterativeMR`` class implements a ``run`` method that provides a
producer-consumer interface for creating iterative MapReduce programs.  In
this model, datasets are pipelined: the ``producer`` method is called when
the number of runnable datasets is low, and the ``consumer`` method is called
as each dataset completes.  This model reduces complexity, particularly for
programs where processing on the master occurs concurrently with map and
reduce tasks (for example, programs with convergence checks).

Advanced Features
=================

Pseudorandom Number Generation
------------------------------

Nondeterministic results make debugging difficult and testing nearly
impossible.  Setting a random seed is a simple way to make stochastic
algorithms deterministic.  However, in a MapReduce program, setting a fixed
random seed at the beginning of each map or reduce task would make all tasks
use the same sequence of random numbers.

The ``mrs.MapReduce`` class provides a ``random`` method that returns a
random number generator.  The method takes a variable number of integer
arguments and ensures that the random number generator is unique for any
particular combination of inputs.  Because of the large size of the internal
state of the Mersenne Twister, the ``random`` method can accept around 300
arguments that are each 64-bit integers.  This makes it easy to create a
unique random number generator in each task, or even to create identical
random number generators in different tasks that need to duplicate specific
calculations.

Custom Serializers
------------------

By default, inputs and outputs for map and reduce functions are serialized
using Python's `Pickle Module `_.  In most cases, this is convenient, but in
some cases, performance may be improved by using custom serializers and
deserializers, or even no serializers at all.  For many applications, custom
serializers may have no noticeable effect on performance, so this is an
optional advanced feature.

Each dataset accepts ``key_serializer`` and ``value_serializer`` keyword
arguments that specify serializers to be used for outputting each key and
value.  Alternatively, the serializers can be specified directly on a map or
reduce function using the ``mrs.output_serializers`` decorator.  In each
case, the arguments are either the name of a serializer attribute of the
program or the attribute itself.  A serializer must have both ``dumps`` and
``loads`` methods.  The ``mrs.MapReduce`` class provides three default
serializers: ``raw_serializer`` (bytes), ``int_serializer``, and
``str_serializer``, so if you extend ``mrs.MapReduce``, you automatically
have access to these three serializers.  Deserializers are not explicitly
given because they are specified in the parent dataset.

The map function in the ``wordcount2.py`` example uses the str serializer for
the output key and the int serializer for the output value::

    @mrs.output_serializers(key='str_serializer', value='int_serializer')
    def map(self, key, value):
        #do stuff
        yield new_key_str, new_value_int

Alternatively, we could have specified the serializers using the actual
attributes rather than their names (e.g.,
``key=mrs.MapReduce.str_serializer``).

To define your own custom serializer, add the serializer as an attribute of
your program, and then pass it as the argument to the decorator.  Here is an
example using the ``mrs.make_protobuf_serializer`` helper::

    class MyProgram(mrs.MapReduce):
        ...
        my_serializer = mrs.make_protobuf_serializer(some_proto)

        @mrs.output_serializers(value=my_serializer)
        def map(self, key, value):
            #do stuff
            yield new_key, message

For one final example, we use the ``mrs.make_struct_serializer`` helper to
create a serializer that knows how to pack and unpack tuples of a specific
type.  In this case, we deal with tuples that are pairs of longs::

    class MyProgram(mrs.MapReduce):
        ...
        long_pair_serializer = mrs.make_struct_serializer('=LL')

        @mrs.output_serializers(value=long_pair_serializer)
        def map(self, key, value):
            #do stuff
            yield new_key, (long_value_1, long_value_2)

You may use any serializer you want, as long as it has ``loads`` and
``dumps`` methods, which respectively decode and encode your data.  A few
helpers for common data formats are already included:

- ``mrs.make_struct_serializer``

  Creates a serializer for tuples of primitive types.  It takes a format
  string, as defined in the ``struct`` module, that specifies how the tuples
  used by the serializer are structured.

- ``mrs.make_primative_serializer``

  Creates a serializer for a single primitive type.  Like the struct
  serializer, it takes a struct format string, but it only allows formats
  with a single primitive type.  This is useful because the struct serializer
  always expects and returns tuples, even when the tuple has only one
  element.

- ``mrs.make_protobuf_serializer``

  Creates a serializer from a protocol buffer object.

- ``mrs.str_serializer``

  A pre-made serializer for Python string values.  It is also an attribute of
  ``mrs.MapReduce``, so you do not need to create an extra attribute in your
  own program.

- ``mrs.int_serializer``

  A pre-made serializer for Python integer values.  It is an attribute of
  ``mrs.MapReduce``, so you do not need to create an extra attribute in your
  own program.  It is not as fast as a primitive serializer, but it allows
  arbitrarily large integer values.

- ``mrs.raw_serializer``

  A pre-made serializer that is a no-op.  It is also an attribute of
  ``mrs.MapReduce``, so you do not need to create an extra attribute in your
  own program.  It is useful when you already have data in binary form and do
  not need to apply any serialization to it.
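Beyond these helpers, any object that provides ``dumps`` and ``loads`` can
serve as a serializer.  The following sketch (hypothetical code, not part of
the Mrs distribution) wraps the ``json`` module so that dictionary values are
stored as UTF-8 encoded JSON::

    import json

    import mrs

    class JsonSerializer(object):
        """Any object with ``dumps`` and ``loads`` methods will do."""

        @staticmethod
        def dumps(value):
            return json.dumps(value).encode('utf-8')

        @staticmethod
        def loads(data):
            return json.loads(data.decode('utf-8'))

    class MyProgram(mrs.MapReduce):
        json_serializer = JsonSerializer()

        @mrs.output_serializers(value='json_serializer')
        def map(self, key, value):
            #do stuff
            yield key, {'words': len(value.split())}

As with the built-in serializers, the decorator argument may be either the
attribute name (shown here) or the attribute itself.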
Tips
====

Network Filesystems
-------------------

Local temporary storage is specified with the ``--mrs-tmpdir`` option, which
defaults to ``/var/tmp``.  Do not, under any circumstances, configure Mrs to
store temporary files on a remote filesystem (e.g., NFS).  Temporary files
are shared between slaves using HTTP, and adding an extra layer of network
communication is unnecessary and slow.

In some circumstances, it may be possible to store intermediate files on a
remote filesystem.  In such situations, the master's ``--mrs-shared`` option
can be used to configure a globally accessible directory.  However, this is
only suitable for CPU-bound programs; a central bottleneck is disastrous for
I/O-bound programs.  Even for CPU-bound programs, a central ``--mrs-shared``
directory for intermediate files can be problematic.  For example, if the
Python files from Mrs are being run from a network filesystem, the extra load
from ``--mrs-shared`` can cause the Python interpreter to hang for extended
periods while waiting on remote bytecode files.

Memory for Sorting
------------------

The ``--mrs-max-sort-size`` option determines the maximum amount of data that
is loaded for sorting at a time in a reduce task.  Reduce tasks can have
input that is larger than the maximum sort size, but large input data is
split into smaller chunks for sorting.  If more memory is available,
performance may improve because more data can be sorted in RAM at once.
However, it is important not to allow a Mrs process to use more memory than
is available: sorting on disk is much faster than heavy swapping, and running
out of memory can cause Mrs to crash.

When choosing a maximum sort size, recognize that the amount of required
memory is greater than the sort size.  Specifically, Mrs must store a list of
tuples containing the deserialized key, the serialized key, and the
serialized value.  On a 64-bit machine with Python 2.7, this adds the memory
required by the deserialized key plus an overhead of about 162 bytes per
key-value pair (8 bytes for a pointer to a tuple, 56 bytes for a tuple, 8*3
bytes for pointers in each tuple, plus 37*2 bytes for the two strings).  Due
to this overhead, it is important not to set ``--mrs-max-sort-size`` anywhere
close to the total available memory.
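For a rough sense of scale, the following back-of-the-envelope calculation
(the pair count and key size are arbitrary assumptions) estimates the
bookkeeping memory for a single sort chunk::

    pairs = 10 * 1000 * 1000   # key-value pairs in one sort chunk
    key_size = 20              # assumed size of a deserialized key, in bytes
    overhead = 162             # per-pair overhead described above, in bytes

    # About 1.7 GiB of bookkeeping, in addition to the serialized data
    # that makes up the sort chunk itself.
    print(pairs * (overhead + key_size) / 2.0 ** 30)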
Examples
========

The ``examples`` directory includes several examples of MapReduce programs of
varying levels of complexity.

WordCount
---------

Mrs includes two versions of WordCount:

- `wordcount.py `_
- `wordcount2.py `_

The ``wordcount.py`` example demonstrates the simplest possible program in
Mrs.  The ``wordcount2.py`` example is a more full-featured version of
WordCount that uses a combiner and explicitly sets serializers (instead of
automatically using pickle).

Pi Estimator
------------

Hadoop includes a ``PiEstimator`` example program that estimates the value of
pi by evaluating samples from a 2D `Halton sequence `_ and counting the
proportion that fall within the unit circle.  Mrs includes two versions of
this program:

- `pi/pure_pi.py `_
- `pi/c_pi.py `_ with `pi/halton.c `_

The ``pure_pi.py`` example is a simple Mrs program with a custom ``run``
method that demonstrates how a program can process data computed in a
parallel dataset.  The ``c_pi.py`` example demonstrates how a function
rewritten in C can remove the performance penalty inherent in using a
high-level language like Python.  This fast implementation of the pi
estimator uses ``ctypes`` to load a function from the ``halton.so`` file,
which was compiled from ``halton.c`` using the included `Makefile `_.  Note
that ``c_pi.py``, written for Mrs in Python and C, is much faster than the
equivalent optimized `Pi.java `_ program written for Hadoop in Java.

Other Contributed Examples
--------------------------

The ``examples/contrib`` directory includes a few examples that may not be
runnable without specific libraries or data but which show how Mrs is used in
real-life problems.

- `walk_analyzer.py `_

  The ``walk_analyzer.py`` program analyzes pregenerated lists of random
  walks through a graph.  This program includes examples of using custom
  serializers to improve performance.  It also serves as a helpful example of
  how to break down large input files.  A simpler variant,
  `conditional_prob.py `_, which simply computes conditional probabilities,
  is available for comparison.

- `dependency_parse.py `_