Embarrassingly parallel for loops

Common usage

Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression, and convert it to parallel computing:

>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

can be spread over 2 CPUs using the following:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

Under the hood, the Parallel object creates a multiprocessing pool that forks the Python interpreter into multiple processes to execute each of the items of the list. The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
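
For illustration, calling the object returned by delayed records the function and its arguments without executing anything. A minimal doctest sketch (the tuple return value matches the trick described above, though exact types may vary across joblib versions):

>>> from math import sqrt
>>> from joblib import delayed
>>> func, args, kwargs = delayed(sqrt)(4)
>>> (func is sqrt, args, kwargs)
(True, (4,), {})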

Warning

Under Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses when using joblib.Parallel. In other words, you should be writing code like this:

import ....

def function1(...):
    ...

def function2(...):
    ...

...
if __name__ == '__main__':
    # do stuff with imports and functions defined above
    ...

No code should run outside of the "if __name__ == '__main__'" blocks, only imports and definitions.
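
For instance, a complete script following this pattern could look like the following sketch (the helper name process_item is only illustrative):

from math import sqrt
from joblib import Parallel, delayed

def process_item(i):
    return sqrt(i ** 2)

if __name__ == '__main__':
    # Protected by the __main__ guard: freshly spawned worker processes
    # importing this module will not re-execute the parallel loop.
    results = Parallel(n_jobs=2)(delayed(process_item)(i)
                                 for i in range(10))
    print(results)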

Using the threading backend

By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes.

If you know that the function you are calling is based on a compiled extension that releases the Python Global Interpreter Lock (GIL) during most of its computation, then it may be more efficient to use threads instead of Python processes as concurrent workers. For instance, this is the case if you write the CPU-intensive part of your code inside a with nogil block of a Cython function.

To use the threads, just pass "threading" as the value of the backend parameter of the Parallel constructor:

>>> Parallel(n_jobs=2, backend="threading")(
...     delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
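
For instance, large NumPy matrix products execute in compiled BLAS code that typically releases the GIL, so a sketch like the following can benefit from the threading backend (the array sizes are arbitrary and actual speedups depend on the BLAS implementation and hardware):

import numpy as np
from joblib import Parallel, delayed

rng = np.random.RandomState(42)
matrices = [rng.normal(size=(500, 500)) for _ in range(8)]

# np.dot runs in compiled code that usually releases the GIL, so the
# two threads can execute these products concurrently without copying
# the input arrays between processes.
results = Parallel(n_jobs=2, backend="threading")(
    delayed(np.dot)(m, m) for m in matrices)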

Reusing a pool of workers

Some algorithms require making several consecutive calls to a parallel function interleaved with processing of the intermediate results. Calling Parallel several times in a loop is sub-optimal because it will create and destroy a pool of workers (threads or processes) several times, which can cause a significant overhead.

For this case it is more efficient to use the context manager API of the Parallel class to re-use the same pool of workers for several calls to the Parallel object:

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1
...
>>> (accumulator, n_iter)                            
(1136.596..., 14)

Working with numerical data in shared memory (memmapping)

By default the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1. The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated n_jobs times by the workers.

As this problem can often occur in scientific computing with numpy based datastructures, joblib.Parallel provides special handling for large arrays: it automatically dumps them on the filesystem and passes a reference to the workers, which open them as a memory map on that file using the numpy.memmap subclass of numpy.ndarray. This makes it possible to share a segment of data between all the worker processes.

Note

The following only applies with the default "multiprocessing" backend. If your code can release the GIL, then using backend="threading" is even more efficient.

Automated array to memmap conversion

The automated array to memmap conversion is triggered by a configurable threshold on the size of the array:

>>> import numpy as np
>>> from joblib import Parallel, delayed
>>> from joblib.pool import has_shareable_memory

>>> Parallel(n_jobs=2, max_nbytes=1e6)(
...     delayed(has_shareable_memory)(np.ones(int(i)))
...     for i in [1e2, 1e4, 1e6])
[False, False, True]

By default the data is dumped to the /dev/shm shared-memory partition if it exists and is writable (typically the case under Linux). Otherwise the operating system's temporary folder is used. The location of the temporary data files can be customized by passing a temp_folder argument to the Parallel constructor.

Passing max_nbytes=None makes it possible to disable the automated array to memmap conversion.
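
As a minimal sketch combining both options (the folder path below is only illustrative and must be writable):

import numpy as np
from joblib import Parallel, delayed

# Raise the auto-memmapping threshold to 50 MB and store the temporary
# dumps under a custom location instead of /dev/shm or /tmp.
results = Parallel(n_jobs=2, max_nbytes='50M',
                   temp_folder='/tmp/joblib_example')(
    delayed(np.sum)(np.ones(1000)) for _ in range(3))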

Manual management of memmapped input data

For even finer tuning of the memory usage it is also possible to dump the array as a memmap directly from the parent process to free the memory before forking the worker processes. For instance let's allocate a large array in the memory of the parent process:

>>> large_array = np.ones(int(1e6))

Dump it to a local file for memmapping:

>>> import tempfile
>>> import os
>>> from joblib import load, dump

>>> temp_folder = tempfile.mkdtemp()
>>> filename = os.path.join(temp_folder, 'joblib_test.mmap')
>>> if os.path.exists(filename): os.unlink(filename)
>>> _ = dump(large_array, filename)
>>> large_memmap = load(filename, mmap_mode='r+')

The large_memmap variable points to a numpy.memmap instance:

>>> large_memmap.__class__.__name__, large_array.nbytes, large_array.shape
('memmap', 8000000, (1000000,))

>>> np.allclose(large_array, large_memmap)
True

We can free the original array from the main process memory:

>>> del large_array
>>> import gc
>>> _ = gc.collect()

It is possible to slice large_memmap into a smaller memmap:

>>> small_memmap = large_memmap[2:5]
>>> small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape
('memmap', 24, (3,))

Finally we can also take an np.ndarray view backed by that same memory-mapped file:

>>> small_array = np.asarray(small_memmap)
>>> small_array.__class__.__name__, small_array.nbytes, small_array.shape
('ndarray', 24, (3,))

All three of these data structures point to the same memory buffer, and this same buffer will also be reused directly by the worker processes of a Parallel call:

>>> Parallel(n_jobs=2, max_nbytes=None)(
...     delayed(has_shareable_memory)(a)
...     for a in [large_memmap, small_memmap, small_array])
[True, True, True]

Note that here we used max_nbytes=None to disable the auto-dumping feature of Parallel. small_array is still in shared memory in the worker processes because it was already backed by shared memory in the parent process. The pickling machinery of the Parallel multiprocessing queues is able to detect this situation and optimize it on the fly to limit the number of memory copies.

Writing parallel computation results in shared memory

If you open your data using the w+ or r+ mode in the main program, the worker will get r+ mode access. Thus the worker will be able to write its results directly to the original data, removing the need to serialize the results to send them back to the parent process.

Here is an example script of parallel processing with preallocated numpy.memmap data structures:

"""Demonstrate the usage of numpy.memmap with joblib.Parallel

This example shows how to preallocate data in memmap arrays both for input and
output of the parallel worker processes.

Sample output for this program::

    [Worker 93486] Sum for row 0 is -1599.756454
    [Worker 93487] Sum for row 1 is -243.253165
    [Worker 93488] Sum for row 3 is 610.201883
    [Worker 93489] Sum for row 2 is 187.982005
    [Worker 93489] Sum for row 7 is 326.381617
    [Worker 93486] Sum for row 4 is 137.324438
    [Worker 93489] Sum for row 8 is -198.225809
    [Worker 93487] Sum for row 5 is -1062.852066
    [Worker 93488] Sum for row 6 is 1666.334107
    [Worker 93486] Sum for row 9 is -463.711714
    Expected sums computed in the parent process:
    [-1599.75645426  -243.25316471   187.98200458   610.20188337   137.32443803
     -1062.85206633  1666.33410715   326.38161713  -198.22580876  -463.71171369]
    Actual sums computed by the worker processes:
    [-1599.75645426  -243.25316471   187.98200458   610.20188337   137.32443803
     -1062.85206633  1666.33410715   326.38161713  -198.22580876  -463.71171369]

"""
import tempfile
import shutil
import os
import numpy as np

from joblib import Parallel, delayed
from joblib import load, dump


def sum_row(input, output, i):
    """Compute the sum of a row in input and store it in output"""
    sum_ = input[i, :].sum()
    print("[Worker %d] Sum for row %d is %f" % (os.getpid(), i, sum_))
    output[i] = sum_

if __name__ == "__main__":
    rng = np.random.RandomState(42)
    folder = tempfile.mkdtemp()
    samples_name = os.path.join(folder, 'samples')
    sums_name = os.path.join(folder, 'sums')
    try:
        # Generate some data and allocate an output buffer
        samples = rng.normal(size=(10, int(1e6)))

        # Pre-allocate a writeable shared memory map as a container for the
        # results of the parallel computation
        sums = np.memmap(sums_name, dtype=samples.dtype,
                         shape=samples.shape[0], mode='w+')

        # Dump the input data to disk to free the memory
        dump(samples, samples_name)

        # Release the reference on the original in memory array and replace it
        # by a reference to the memmap array so that the garbage collector can
        # release the memory before forking. gc.collect() is internally called
        # in Parallel just before forking.
        samples = load(samples_name, mmap_mode='r')

        # Fork the worker processes to perform computation concurrently
        Parallel(n_jobs=4)(delayed(sum_row)(samples, sums, i)
                           for i in range(samples.shape[0]))

        # Compare the results from the output buffer with the ground truth
        print("Expected sums computed in the parent process:")
        expected_result = samples.sum(axis=1)
        print(expected_result)

        print("Actual sums computed by the worker processes:")
        print(sums)

        assert np.allclose(expected_result, sums)
    finally:
        try:
            shutil.rmtree(folder)
        except OSError:
            print("Failed to delete: " + folder)

Warning

Having concurrent workers write on overlapping shared memory data segments, for instance by using in-place operators and assignments on a numpy.memmap instance, can lead to data corruption because numpy does not offer atomic operations. The previous example does not risk that issue as each task is updating an exclusive segment of the shared result array.

Some C/C++ compilers offer lock-free atomic primitives such as add-and-fetch or compare-and-swap that could be exposed to Python via CFFI for instance. However providing numpy-aware atomic constructs is outside of the scope of the joblib project.

A final note: don’t forget to clean up any temporary folder when you are done with the computation:

>>> import shutil
>>> try:
...     shutil.rmtree(temp_folder)
... except OSError:
...     pass  # this can sometimes fail under Windows

Bad interaction of multiprocessing and third-party libraries

Prior to Python 3.4 the 'multiprocessing' backend of joblib can only use the fork strategy to create worker processes under non-Windows systems. This can cause some third-party libraries to crash or freeze. Such libraries include Apple vecLib / Accelerate (used by NumPy under OSX), old versions of OpenBLAS (prior to 0.2.10) and the OpenMP runtime implementation from GCC.

To avoid this problem joblib.Parallel can be configured to use the 'forkserver' start method on Python 3.4 and later. The start method has to be configured by setting the JOBLIB_START_METHOD environment variable to 'forkserver' instead of the default 'fork' start method. However the user should be aware that using the 'forkserver' method prevents joblib.Parallel from calling functions interactively defined in a shell session.
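
The most robust way to set the variable is in the shell before launching the Python program. As a hedged sketch, it can also be set from Python, assuming the assignment happens before joblib creates any pool of workers:

import os

# Must run before joblib spawns its first worker process; exporting
# JOBLIB_START_METHOD=forkserver in the shell environment before
# starting Python is the safer option.
os.environ['JOBLIB_START_METHOD'] = 'forkserver'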

You can read more on this topic in the multiprocessing documentation.

Under Windows the fork system call does not exist at all, so this problem does not arise (but multiprocessing has more overhead).

Custom backend API (experimental)

New in version 0.10.

Warning

The custom backend API is experimental and subject to change without going through a deprecation cycle.

Users can provide their own implementation of a parallel processing backend in addition to the 'multiprocessing' and 'threading' backends provided by default. A backend is registered with the joblib.register_parallel_backend() function by passing a name and a backend factory.

The backend factory can be any callable that returns an instance of ParallelBackendBase. Please refer to the default backends source code as a reference if you want to implement your own custom backend.

Note that it is possible to register a backend class that has some mandatory constructor parameters such as the network address and connection credentials for a remote cluster computing service:

class MyCustomBackend(ParallelBackendBase):

    def __init__(self, endpoint, api_key):
        self.endpoint = endpoint
        self.api_key = api_key

    ...
    # Do something with self.endpoint and self.api_key somewhere in
    # one of the methods of the class

register_parallel_backend('custom', MyCustomBackend)

The connection parameters can then be passed to the joblib.parallel_backend() context manager:

with parallel_backend('custom', endpoint='http://compute', api_key='42'):
    Parallel()(delayed(some_function)(i) for i in range(10))

Using the context manager can be helpful when using a third-party library that uses joblib.Parallel internally while not exposing the backend argument in its own API.

Parallel reference documentation

class joblib.Parallel(n_jobs=1, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=None, max_nbytes='1M', mmap_mode='r')

Helper class for readable parallel mapping.

Parameters:

n_jobs: int, default: 1

The maximum number of concurrently running jobs, such as the number of Python worker processes when backend="multiprocessing" or the size of the thread-pool when backend="threading". If -1, all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) workers are used: thus for n_jobs = -2, all CPUs but one are used, as in the worked sketch below.
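
As a worked example of the negative n_jobs rule (joblib exposes a cpu_count helper for checking the machine's CPU count):

from joblib import cpu_count

n_cpus = cpu_count()              # e.g. 4 on a quad-core machine
n_jobs = -2
n_workers = n_cpus + 1 + n_jobs   # 4 + 1 - 2 = 3: all CPUs but one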

backend: str, ParallelBackendBase instance or None, default: 'multiprocessing'

Specify the parallelization backend implementation. Supported backends are:

  • "multiprocessing" used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes.
  • "threading" is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. "threading" is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a "with nogil" block or an expensive call to a library such as NumPy).
  • finally, you can register backends by calling register_parallel_backend. This will allow you to implement a backend of your liking.

verbose: int, optional

The verbosity level: if non zero, progress messages are printed. Above 50, the output is sent to stdout. The frequency of the messages increases with the verbosity level. If it is more than 10, all iterations are reported.

timeout: float, optional

Timeout limit for each task to complete. If any task takes longer, a TimeOutError will be raised. Only applied when n_jobs != 1.

pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}

The number of batches (of tasks) to be pre-dispatched. Default is '2*n_jobs'. When batch_size="auto" this is a reasonable default and the multiprocessing workers should never starve.

batch_size: int or ‘auto’, default: ‘auto’

The number of atomic tasks to dispatch at once to each worker. When individual evaluations are very fast, multiprocessing can be slower than sequential computation because of the overhead. Batching fast computations together can mitigate this. The 'auto' strategy keeps track of the time it takes for a batch to complete, and dynamically adjusts the batch size to keep the time on the order of half a second, using a heuristic. The initial batch size is 1. batch_size="auto" with backend="threading" will dispatch batches of a single task at a time as the threading backend has very little overhead and using larger batch size has not proved to bring any gain in that case.
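
As an illustrative sketch, a fixed batch size can amortize the dispatch overhead for very cheap tasks (the value 100 is arbitrary):

from operator import neg
from joblib import Parallel, delayed

# Each call to neg is far cheaper than the communication needed to
# dispatch it, so grouping 100 tasks per batch amortizes the overhead.
results = Parallel(n_jobs=2, batch_size=100)(
    delayed(neg)(i) for i in range(10000))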

temp_folder: str, optional

Folder to be used by the pool for memmaping large arrays for sharing memory with worker processes. If None, this will try in order:

  • a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
  • /dev/shm if the folder exists and is writable: this is a RAMdisk filesystem available by default on modern Linux distributions,
  • the default system temporary folder that can be overridden with TMP, TMPDIR or TEMP environment variables, typically /tmp under Unix operating systems.

Only active when backend="multiprocessing".

max_nbytes: int, str, or None, optional, '1M' by default

Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte. Use None to disable memmapping of large arrays. Only active when backend="multiprocessing".

mmap_mode: {None, 'r+', 'r', 'w+', 'c'}

Memmapping mode for numpy arrays passed to workers. See the 'max_nbytes' parameter documentation for more details.

Notes

This object uses the multiprocessing module to compute in parallel the application of a function to many different arguments. The main features it brings, in addition to using the raw multiprocessing API, are (see examples for details):

  • More readable code, in particular since it avoids constructing lists of arguments.
  • Easier debugging:
    • informative tracebacks even when the error happens on the client side
    • using 'n_jobs=1' makes it possible to turn off parallel computing for debugging without changing the codepath
    • early capture of pickling errors
  • An optional progress meter.
  • Interruption of multiprocess jobs with 'Ctrl-C'.
  • Flexible pickling control for the communication to and from the worker processes.
  • Ability to use shared memory efficiently with worker processes for large numpy-based data structures.

Examples

A simple example:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

Reshaping the output when the function has several return values:

>>> from math import modf
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
>>> res, i = zip(*r)
>>> res
(0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
>>> i
(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)

The progress meter: the higher the value of verbose, the more messages:

>>> from time import sleep
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(10)) 
[Parallel(n_jobs=2)]: Done   1 out of  10 | elapsed:    0.1s remaining:    0.9s
[Parallel(n_jobs=2)]: Done   3 out of  10 | elapsed:    0.2s remaining:    0.5s
[Parallel(n_jobs=2)]: Done   6 out of  10 | elapsed:    0.3s remaining:    0.2s
[Parallel(n_jobs=2)]: Done   9 out of  10 | elapsed:    0.5s remaining:    0.1s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    0.5s finished

Traceback example: note how the line of the error is indicated, as well as the values of the parameters passed to the function that triggered the exception, even though the traceback happens in the child process:

>>> from heapq import nlargest
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 
#...
---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
TypeError                                          Mon Nov 12 11:37:46 2012
PID: 12934                                    Python 2.7.3: /usr/bin/python
...........................................................................
/usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
    419         if n >= size:
    420             return sorted(iterable, key=key, reverse=True)[:n]
    421
    422     # When key is none, use simpler decoration
    423     if key is None:
--> 424         it = izip(iterable, count(0,-1))                    # decorate
    425         result = _nlargest(n, it)
    426         return map(itemgetter(0), result)                   # undecorate
    427
    428     # General case, slowest method
 TypeError: izip argument #1 must support iteration
___________________________________________________________________________

Using pre_dispatch in a producer/consumer situation, where the data is generated on the fly. Note how the producer is first called 3 times before the parallel loop is initiated, and then called to generate new data on the fly. In this case the total number of iterations cannot be reported in the progress messages:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> def producer():
...     for i in range(6):
...         print('Produced %s' % i)
...         yield i
>>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
...                delayed(sqrt)(i) for i in producer()) 
Produced 0
Produced 1
Produced 2
[Parallel(n_jobs=2)]: Done 1 jobs     | elapsed:  0.0s
Produced 3
[Parallel(n_jobs=2)]: Done 2 jobs     | elapsed:  0.0s
Produced 4
[Parallel(n_jobs=2)]: Done 3 jobs     | elapsed:  0.0s
Produced 5
[Parallel(n_jobs=2)]: Done 4 jobs     | elapsed:  0.0s
[Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed:  0.0s remaining: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed:  0.0s finished

joblib.delayed(function, check_pickle=True)

Decorator used to capture the arguments of a function.

Pass check_pickle=False when:

  • the possibly repeated pickling check is too costly and has already been done once outside of the call to delayed, or
  • delayed is used in conjunction with Parallel(backend='threading'), as sketched below.
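
For instance (a minimal sketch; with the threading backend no pickling takes place, so skipping the check is safe):

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2, backend="threading")(
...     delayed(sqrt, check_pickle=False)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
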
joblib.register_parallel_backend(name, factory, make_default=False)

Register a new Parallel backend factory.

The new backend can then be selected by passing its name as the backend argument to the Parallel class. Moreover, the default backend can be overridden globally by setting make_default=True.

The factory can be any callable that takes no arguments and returns an instance of ParallelBackendBase.

Warning: this function is experimental and subject to change in a future version of joblib.

New in version 0.10.

joblib.parallel_backend(backend, n_jobs=-1, **backend_params)

Change the default backend used by Parallel inside a with block.

If backend is a string it must match a previously registered implementation using the register_parallel_backend function.

Alternatively backend can be passed directly as an instance.

By default all available workers will be used (n_jobs=-1) unless the caller passes an explicit value for the n_jobs parameter.

This is an alternative to passing a backend='backend_name' argument to the Parallel class constructor. It is particularly useful when calling into library code that uses joblib internally but does not expose the backend argument in its own API.

>>> from operator import neg
>>> with parallel_backend('threading'):
...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
...
[-1, -2, -3, -4, -5]

Warning: this function is experimental and subject to change in a future version of joblib.

New in version 0.10.