===================================
NumPy And High-Performance Chunking
===================================

This page contains two sections related to NumPy_:

.. _NumPy: http://www.numpy.org/

* `NumPy Array Sinks`_ describes how to transform the output of a pipeline into a NumPy array, and some points to note when doing so.

* `High Performance Chunking`_ describes stages that chunk the data internally for high performance. Each such stage uses NumPy arrays internally. The relevant stages are in the :py:mod:`dagpype.np` sub-package.

-----------------
NumPy Array Sinks
-----------------

Much of the purpose of this library is preprocessing data for further processing by other Python libraries. To build a pipe resulting in a NumPy array, we can do something like one of the following:

::

    >>> a = stream_vals('rain.txt') | np.to_array()
    >>> type(a)
    <type 'numpy.ndarray'>
    >>> a.shape
    (61,)

    >>> a = stream_vals('meteo.csv', ('wind', 'rain')) | np.to_array()
    >>> type(a)
    <type 'numpy.ndarray'>
    >>> a.shape
    (60, 2)

    >>> a = stream_vals('meteo.csv', ('wind', 'rain')) | \
    ...     filt(pre = lambda (wind, rain) : wind < 10 and rain < 10) | \
    ...     np.to_array()
    >>> type(a)
    <type 'numpy.ndarray'>
    >>> a.shape
    (48, 2)

If the sole reason we're creating an array is to apply a NumPy function to it, we can :ref:`chain sinks`:

::

    >>> print stream_vals('meteo.csv', 'rain') | (np.to_array() | sink(lambda a : numpy.median(a)))

and, of course, we can apply more than a single function to the array, like this:

::

    >>> print stream_vals('meteo.csv', 'rain') | \
    ...     (np.to_array() | sink(lambda a : (numpy.median(a), scipy.stats.kurtosis(a))))

or like this:

::

    >>> print stream_vals('meteo.csv', 'rain') | \
    ...     (np.to_array() | sink(lambda a : numpy.median(a)) + sink(lambda a : scipy.stats.kurtosis(a)))

(Note that the kurtosis lives in :py:mod:`scipy.stats`; NumPy itself has no ``kurtosis`` function.)

Some aggregates, e.g., the median, cannot be calculated exactly using constant memory. This might cause a problem if the dataset is large. In such cases, we can use sub-sampling.
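Outside of a pipeline, the sub-sampling idea can be sketched in plain NumPy. This is only an illustration of the statistical technique; ``approx_median`` is a hypothetical helper, not part of DAGPype:

```python
import numpy as np

def approx_median(stream, p=0.01, rng=None):
    # Bernoulli sub-sampling: keep each element independently with
    # probability p, then take the exact median of the retained sample.
    # This bounds memory at roughly p times the stream length.
    rng = rng if rng is not None else np.random.default_rng(0)
    sample = [x for x in stream if rng.random() < p]
    return float(np.median(sample)) if sample else float("nan")
```

The estimate is noisy for small ``p``, but its memory footprint shrinks proportionally, which is the trade-off the sampling stages below offer.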
The following samples approximately 1% of the elements, and uses them to find the median:

::

    >>> stream_vals('meteo.csv', 'rain') | prob_rand_sample(0.01) | (np.to_array() | sink(lambda a : numpy.median(a)))

The following samples (with replacement) 100 of the elements, then uses them to find the median:

::

    >>> stream_vals('meteo.csv', 'rain') | size_rand_sample(100) | (np.to_array() | sink(lambda a : numpy.median(a)))

Other aggregates can be calculated using constant memory. In such cases, using a DAGPype stage is more efficient than first streaming into a NumPy array and then calculating the aggregate. E.g., finding the mean through an array, like this:

::

    >>> print stream_vals('meteo.csv', 'rain') | (np.to_array() | sink(lambda a : numpy.mean(a)))

can use much more memory than this version:

::

    >>> print stream_vals('meteo.csv', 'rain') | ave()

-------------------------
High Performance Chunking
-------------------------

Modern numeric libraries process data more efficiently in chunks. Even if the original data is logically a sequence of individual elements, we can utilize stages that chunk it, then process these chunks. The size of the chunks depends on the system: they should be large enough to take advantage of the chunk performance of the numerical library, but not so large that they overburden system memory. See the :doc:`performance` page for the effect.

.. figure:: Chunking.png
   :scale: 50 %
   :alt: Chunking

E.g., the following code snippet shows how to calculate the correlation between two variables stored in a binary format:

::

    print np.chunk_stream_bytes(_f_name, num_cols = 2) | np.corr()

The first stage streams chunks of data into arrays, in this case with 2 columns. The second stage calculates their correlation.

If the file is in CSV format, we can do the following:

::

    np.chunk_stream_vals('meteo.csv', ('day', 'wind')) | np.corr()

The first stage reads the 'day' and 'wind' columns from the CSV file, and emits tuples of chunks.
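The arithmetic that lets a correlation be computed chunk-by-chunk in constant memory can be sketched in plain NumPy. This is an illustration of the running-sums technique, not DAGPype's actual implementation; ``chunked_corr`` is a hypothetical helper:

```python
import numpy as np

def chunked_corr(chunks):
    # Pearson correlation over a stream of (n, 2) chunks.
    # Only six scalars survive between chunks, regardless of data size.
    n = sx = sy = sxx = syy = sxy = 0.0
    for a in chunks:
        x, y = a[:, 0], a[:, 1]
        n += len(a)
        sx += x.sum()
        sy += y.sum()
        sxx += (x * x).sum()
        syy += (y * y).sum()
        sxy += (x * y).sum()
    cov = sxy / n - (sx / n) * (sy / n)
    var_x = sxx / n - (sx / n) ** 2
    var_y = syy / n - (sy / n) ** 2
    return cov / np.sqrt(var_x * var_y)
```

Feeding it two half-chunks of a perfectly linear relation, e.g. ``y = 2x + 1``, yields a correlation of 1 to within floating-point error.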
A stream of individual elements can be chunked into a stream of NumPy arrays using the :py:func:`dagpype.np.chunk` stage, then processed using other :py:mod:`dagpype.np` stages:

::

    >>> source([1, 2, 3, 4]) | np.chunk() | np.mean()

Its complementary stage is :py:func:`dagpype.np.unchunk`.

The stages in :py:mod:`dagpype.np` that actively chunk data from a stream take the required chunk size as a parameter. For example, :py:func:`dagpype.np.chunk_stream_bytes` has the following interface:

::

    def chunk_stream_bytes(stream, max_elems = 8192, dtype = numpy.float64, num_cols = 1):
        """
        Reads a binary file containing a numpy.array, and emits a series of chunks.
        Each chunk is a numpy array with num_cols columns.

        Arguments:
        stream -- Either the name of a file or a *binary* stream.

        Keyword Arguments:
        max_elems -- Number of rows per chunk (the last chunk might have fewer) (default 8192).
        dtype -- Underlying element type (default numpy.float64).
        num_cols -- Number of columns in the chunks' arrays (default 1).

        See Also:
            np.chunk_stream_vals
            np.chunks_to_stream_bytes

        Example:

        >>> # Reads from a binary file, and writes the cumulative average to a different one.
        >>> np.chunk_stream_bytes('foo.dat') | np.cum_ave() | np.chunks_to_stream_bytes('wind_ave.dat')
        """

A stream of chunks can be processed by stages from either :py:mod:`dagpype.np` or :py:mod:`dagpype`. However, the stages in :py:mod:`dagpype.np` semantically deal with the elements composing the arrays, whereas those in :py:mod:`dagpype` consider the arrays themselves to be the elements. For example:

::

    >>> source([1, 2, 3, 4]) | np.chunk() | np.count()
    4
    >>> source([1, 2, 3, 4]) | np.chunk() | count()
    1

In the above two examples, :py:func:`dagpype.np.chunk` happened to chunk the 4 elements into a single chunk. The first pipeline counted the total number of elements in the chunks, 4; the second pipeline counted a single chunk.
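The chunking behaviour described above can be sketched as a plain Python generator. This is a minimal stand-in for illustration, not the actual :py:func:`dagpype.np.chunk` implementation (which is pipeline-aware); the ``chunk`` name here is hypothetical:

```python
import numpy as np

def chunk(it, max_elems=8192, dtype=np.float64):
    # Accumulate scalars into a buffer, emitting a NumPy array each
    # time the buffer reaches max_elems rows; the final array may be
    # shorter, mirroring the max_elems semantics documented above.
    buf = []
    for x in it:
        buf.append(x)
        if len(buf) == max_elems:
            yield np.array(buf, dtype=dtype)
            buf = []
    if buf:
        yield np.array(buf, dtype=dtype)
```

Chunking ten elements with ``max_elems=4`` yields three arrays of sizes 4, 4, and 2, which matches the count-of-elements vs. count-of-chunks distinction drawn above.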
Given NumPy's wealth of ways to manipulate arrays, it is often possible to manipulate a chunked stream using :py:mod:`dagpype`'s :py:func:`dagpype.filt` function with NumPy constructs, instead of writing specialized chunk-aware stages. For example, to calculate the correlation while pruning out rows in which either value reaches 10, we can use:

::

    np.chunk_stream_bytes(_f_name, num_cols = 2) | \
        filt(lambda a : a[numpy.logical_and(a[:, 0] < 10, a[:, 1] < 10), :]) | \
        np.corr()

and to truncate outliers to 10, we can use:

::

    np.chunk_stream_bytes(_f_name, num_cols = 2) | \
        filt(lambda a : numpy.where(a < 10, a, 10)) | \
        np.corr()
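The two ``filt`` bodies above can be tried on a single chunk in isolation. The sketch below applies them to a small hand-made array (the data here is made up purely for illustration); note that :py:func:`numpy.where` takes three arguments, ``where(condition, x, y)``:

```python
import numpy as np

# A single 2-column chunk with two out-of-range values (12 and 15).
a = np.array([[1.0, 2.0], [12.0, 3.0], [4.0, 15.0], [5.0, 6.0]])

# Pruning: a boolean row mask keeps only rows where both columns are < 10,
# so the two offending rows are dropped entirely.
pruned = a[np.logical_and(a[:, 0] < 10, a[:, 1] < 10), :]

# Truncation: values below 10 pass through; anything else becomes 10,
# so the chunk keeps its shape but loses its outliers.
truncated = np.where(a < 10, a, 10.0)
```

Pruning shrinks the chunk to 2 rows, while truncation preserves all 4 rows and caps their values, which is why the two pipelines above can give different correlations on the same file.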