# NumPy and High-Performance Chunking

• NumPy Array Sinks describes how to transform a pipeline's output into a NumPy array, and some points to note when doing so.
• High-Performance Chunking describes stages that chunk the data internally for high performance. Each such stage uses NumPy arrays internally.

The relevant stages are in the dagpype.np sub-package.
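
The examples below assume a setup sketch along these lines (the star import is an assumption based on DAGPype's usual convention, taken here to expose stream_vals, filt, sink, and the np sub-package):

>>> from dagpype import *
>>> import numpy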

## NumPy Array Sinks

Much of this library's purpose is preprocessing data for further processing by other Python libraries.

To build a pipe resulting in a NumPy array, we can do something like one of the following:

>>> a = stream_vals('rain.txt') | np.to_array()
>>> type(a)
<type 'numpy.ndarray'>
>>> a.shape
(61,)

>>> a = stream_vals('meteo.csv', ('wind', 'rain')) | np.to_array()
>>> type(a)
<type 'numpy.ndarray'>
>>> a.shape
(60, 2)

>>> a = stream_vals('meteo.csv', ('wind', 'rain')) | \
...     filt(pre = lambda (wind, rain) : wind < 10 and rain < 10) | \
...     np.to_array()
>>> type(a)
<type 'numpy.ndarray'>
>>> a.shape
(48, 2)


If the sole reason we're creating an array is to apply a NumPy function, we can chain sinks:

>>> print stream_vals('meteo.csv', 'rain') | (np.to_array() | sink(lambda a : numpy.median(a)))


and, of course, we can apply more than a single function to the array, like this:

>>> print stream_vals('meteo.csv', 'rain') | \
...     (np.to_array() | sink(lambda a : (numpy.median(a), numpy.std(a))))


or like this:

>>> print stream_vals('meteo.csv', 'rain') | \
...     (np.to_array() | sink(lambda a : numpy.median(a)) + sink(lambda a : numpy.std(a)))


Some aggregates, e.g., the median, cannot be calculated (or even approximated) using constant memory. This might cause a problem if the dataset is large. In such cases, we can use sub-sampling. The following samples approximately 1% of the elements, and uses them to find the median:

>>> stream_vals('meteo.csv', 'rain') | prob_rand_sample(0.01) | (np.to_array() | sink(lambda a : numpy.median(a)))


The following samples 100 of the elements (with replacement), then uses them to find the median:

>>> stream_vals('meteo.csv', 'rain') | size_rand_sample(100) | (np.to_array() | sink(lambda a : numpy.median(a)))


Other aggregates can be calculated using constant memory. In such cases, using a DAGPype stage is more efficient than first streaming into a NumPy array and then calculating the aggregate. For example, finding the mean through an array, like this:

>>> print stream_vals('meteo.csv', 'rain') | (np.to_array() | sink(lambda a : numpy.mean(a)))


can use much more memory than this version:

>>> print stream_vals('meteo.csv', 'rain') | ave()


## High-Performance Chunking

Modern numeric libraries process data more efficiently in chunks. Even if the original data is logically a sequence of individual elements, we can use stages that chunk it, then process the chunks. The appropriate chunk size depends on the system: chunks should be large enough to take advantage of the numerical library's vectorized performance, but not so large that they overburden system memory.
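
As a rough sizing sketch (the numbers are illustrative assumptions, not recommendations): with the default max_elems of 8192 rows (see the interface below), float64 elements, and 2 columns, each chunk occupies 8192 * 2 * 8 bytes = 128 KiB, small enough for most systems' memory while large enough for NumPy's vectorized operations to dominate the per-chunk Python overhead.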

See the Performance page for the effect.

E.g., the following code snippet shows how to calculate the correlation between two variables stored in a binary format:

print np.chunk_stream_bytes(_f_name, num_cols = 2) | np.corr()


The first stage streams chunks of data into arrays, in this case of 2 columns. The second stage calculates their correlation. If the file is in CSV format, we can do the following:

np.chunk_stream_vals('meteo.csv', ('day', 'wind')) | np.corr()


The first stage reads the ‘day’ and ‘wind’ columns from the CSV file, and emits tuples of chunks.

A stream of individual elements can be chunked into a stream of NumPy arrays using the dagpype.np.chunk() stage, then processed using other dagpype.np stages:

>>> source([1, 2, 3, 4]) | np.chunk() | np.mean()
2.5

Its complementary stage, dagpype.np.unchunk(), converts a stream of NumPy arrays back into a stream of individual elements.
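
For example, a round trip through dagpype.np.chunk() and dagpype.np.unchunk() restores a stream of individual elements, so a plain dagpype sink again sees the original elements (a minimal sketch):

>>> source([1, 2, 3, 4]) | np.chunk() | np.unchunk() | count()
4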

The dagpype.np stages that actively chunk data from a stream take the required chunk size as a parameter. For example, dagpype.np.chunk_stream_bytes() has the following interface:

def chunk_stream_bytes(stream, max_elems = 8192, dtype = numpy.float64, num_cols = 1):
    """
    Reads a binary file containing a numpy.array, and emits a series of chunks. Each chunk
    is a numpy array with num_cols columns.

    Arguments:
    stream -- Either the name of a file or a *binary* stream.

    Keyword Arguments:
    max_elems -- Number of rows per chunk (the last chunk may have fewer) (default 8192).
    dtype -- Underlying element type (default numpy.float64).
    num_cols -- Number of columns in the chunks' arrays (default 1).

    See Also:
    np.chunk_stream_vals
    np.chunks_to_stream_bytes

    Example:

    >>> # Reads from a binary file, and writes the cumulative average to a different one.
    >>> np.chunk_stream_bytes('foo.dat') | np.cum_ave() | np.chunks_to_stream_bytes('wind_ave.dat')
    """


A stream of chunks can be processed by stages in either dagpype.np or dagpype. The stages in dagpype.np, however, semantically deal with the elements composing the arrays, whereas those in dagpype treat the arrays themselves as the elements. For example:

>>> source([1, 2, 3, 4]) | np.chunk() | np.count()
4
>>> source([1, 2, 3, 4]) | np.chunk() | count()
1


In the above two examples, dagpype.np.chunk() happened to place all 4 elements in a single chunk. The first pipeline counted the total number of elements within the chunks (4); the second counted the chunks themselves (a single one).
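
To see more than one chunk, the chunk size can be made explicit. The following sketch assumes dagpype.np.chunk() accepts the same max_elems keyword as the other chunking stages described above; ten elements in chunks of at most four yield three chunks:

>>> source(range(10)) | np.chunk(max_elems = 4) | count()
3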

Given NumPy's wealth of ways to manipulate arrays, it is often possible to manipulate a chunked stream using dagpype's dagpype.filt() stage with NumPy constructs, instead of writing specialized chunk-aware stages. For example, to calculate the correlation while keeping only rows in which both values are below 10, we can use:

np.chunk_stream_bytes(_f_name, num_cols = 2) | \
filt(lambda a : a[numpy.logical_and(a[:, 0] < 10, a[:, 1] < 10), :]) | \
np.corr()


and to truncate outliers to 10, we can use:

np.chunk_stream_bytes(_f_name, num_cols = 2) | \
filt(lambda a : numpy.where(a < 10, a, 10)) | \
np.corr()