===================================
NumPy And High-Performance Chunking
===================================
This page contains two sections related to NumPy_:
.. _NumPy: http://www.numpy.org/
* `NumPy Array Sinks`_ describes how to transform the output of a pipeline to a NumPy array, and some points regarding this.
* `High Performance Chunking`_ describes stages that chunk the data internally for high performance. Each such stage uses NumPy arrays internally.
The relevant stages are in the :py:mod:`dagpype.np` sub-package.
-----------------
NumPy Array Sinks
-----------------
Much of the purpose of this library is for preprocessing data for further processing using other Python libraries.
To build a pipe resulting in a NumPy array, we can do something like one of the following:
::
>>> a = stream_vals('rain.txt') | np.to_array()
>>> type(a)
>>> a.shape
(61,)
>>> a = stream_vals('meteo.csv', ('wind', 'rain')) | np.to_array()
>>> type(a)
>>> a.shape
(60, 2)
>>> a = stream_vals('meteo.csv', ('wind', 'rain')) | \
... filt(pre = lambda (wind, rain) : wind < 10 and rain < 10) | \
... np.to_array()
>>> type(a)
>>> a.shape
(48, 2)
If the the sole reason we're creating an array is for applying a NumPy function, we can :ref:`chain sinks `:
::
>>> print stream_vals('meteo.csv', 'rain') | (to_array() | sink(lambda a : numpy.median(a)))
and, of course, we can apply more than a single function to the array, like this:
::
>>> print stream_vals('meteo.csv', 'rain') | \
... (to_array() | sink(lambda a : (numpy.median(a), numpy.kurtosis(a))))
or like this:
::
>>> print stream_vals('meteo.csv', 'rain') | \
... (to_array() | sink(lambda a : numpy.median(a)) + sink(lambda a : numpy.kurtosis(a)))
Some aggregates, e.g., the median, cannot be calculated (or even approximated) using constant memory. This might cause a problem if the dataset is large. In such cases, we can use sub-sampling. The following samples approximately 1%
of the elements, and uses them to find the median:
::
>>> stream_vals('meteo.csv', 'rain') | prob_rand_sample(0.01) | (to_array() | sink(lambda a : numpy.median(a))
The following samples (with replacement) 100 of the elements, then uses them to find the median:
::
>>> stream_vals('meteo.csv', 'rain') | size_rand_sample(100) | (to_array() | sink(lambda a : numpy.median(a))
Other aggregates can be calculated using constant memory. In this case, using a DAGPype stage is more efficient than first streaming into a NumPy array, then calculating the aggregate. E.g., finding the mean through an array, like this:
::
>>> print stream_vals('meteo.csv', 'rain') | (to_array() | sink(lambda a : numpy.mean(a)))
can use much more memory than this version
::
>>> print stream_vals('meteo.csv', 'rain') | ave()
-------------------------
High Performance Chunking
-------------------------
Modern numeric libraries process data more efficiently in chunks. Even if the original data is logically a sequence of individual elements, we can utilize stages that chunk it, then process these chunks. The size of the chunks depends on the system: they should be large enough to take advantage of the chunk performance of the numerical library, but not so large that they overburden system memory.
See the :doc:`performance` page for the effect.
.. figure:: Chunking.png
:scale: 50 %
:alt: Chunking
E.g., the following code snippet shows how to calculate the correlation between two variables stored in a binary format:
::
print np.chunk_stream_bytes(_f_name, num_cols = 2) | np.corr()
The first stage streams chunks of data into arrays, in this case of 2 columns. The second stage calculates their correlation. If the file is in CSV format, we can do the following:
::
np.chunk_stream_vals('meteo.csv', ('day', 'wind')) | np.corr()
The first stage reads the 'day' and 'wind' columns from the CSV file, and emits tuples of chunks.
A stream of individual elements can be chunked to a stream of NumPy arrays using the :py:func:`dagpype.np.chunk` stage, then processed using other :py:mod:`dagpype.np` stages:
::
>> source([1, 2, 3, 4]) | np.chunk() | np.mean()
its complementary stage is :py:func:`dagpype.np.unchunk`.
The stages that actively chunk data from a stream in :py:mod:`dagpype.np` take the required chunk size as a parameter. For example, :py:func:`dagpype.np.chunk_stream_bytes` has the following interface:
::
def chunk_stream_bytes(stream, max_elems = 8192, dtype = numpy.float64, num_cols = 1):
"""
Reads a binary file containing a numpy.array, and emits a series of chunks. Each chunk
is a numpy array with num_cols columns.
Arguments:
stream -- Either the name of a file or a *binary* stream.
Keyword Arguments:
max_elems -- Number of rows per chunk (last might have less) (default 8192).
dtype -- Underlying element type (default numpy.float64)
num_cols -- Number of columns in the chunks' arrays (default 1).
See Also:
np.chunk_stream_vals
np.chunks_to_stream_bytes
Example:
>>> # Reads from a binary file, and writes the cumulative average to a different one.
>>> np.chunk_stream_bytes('foo.dat') | np.cum_ave() | np.chunks_to_stream_bytes('wind_ave.dat')
"""
A stream of chunks can be processed by either stages in :py:mod:`dagpype.np` or :py:mod:`dagpype`, however, the stages in :py:mod:`dagpype.np` semantically deal with the elements composing the array, whereas those in :py:mod:`dagpype` consider the arrays the elements themselves. For example:
::
>>> source([1, 2, 3, 4]) | np.chunk() | np.count()
4
>>> source([1, 2, 3, 4]) | np.chunk() | count()
1
In the above two examples, :py:func:`dagpype.np.chunk` happened to chunk the 4 elements into a single chunk. The first pipeline counted the total number of elements in the chunks as 4, and the second pipeline counted a single chunk.
Given NumPy's wealth of ways to manipulate arrays, it is often possible to manipulate a chunked stream by using
:py:mod:`dagpype`'s :py:func:`dagpype.filt` function with NumPy constructs, instead of writing specialized chunk-aware stages. For example, to calculate the correlation, pruning out values greater than 10 in each of some data, we can use:
::
np.chunk_stream_bytes(_f_name, num_cols = 2) | \
filt(lambda a : a[logical_and(a[:, 0] < 10, a[:, 1] < 10), :]) | \
np.corr()
and to truncate outliers to 10, we can use:
::
np.chunk_stream_bytes(_f_name, num_cols = 2) | \
filt(lambda a : where(a, a < 10, a, 10)) | \
np.corr()