======
Stages
======

A *stage* is a pipeline element that creates data for the pipeline (e.g., by reading it from a file), manipulates it (e.g., by calculating its absolute value), or consumes it (e.g., by writing it to a file, or by calculating its average). The first type is a source_, the second is a filter_, and the third is a sink_.

.. Note::

    * This page describes the different types of stages, and how to create new ones.
    * :doc:`connecting_stages` describes how to connect stages to form DAGs.
    * :doc:`reference` describes the available stages.

.. _source:

-------
Sources
-------

A *source* is a pipeline stage to which elements are not sent - it only sends elements on. A typical source reads from a text file, a CSV file, a NumPy array, and so forth.

A source is formed either using the :py:func:`dagpype.source` function, or using the :py:func:`dagpype.sources` decorator.

Creating A Source Via The :py:func:`dagpype.source` Function
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:func:`dagpype.source` function takes anything iterable. For example, to create a counter counting from 0 up to (but not including) 5, we can use: ::

    source(range(5))

or, more generally, ::

    def counter(n):
        return source(range(n))

Creating A Source Via The :py:func:`dagpype.sources` Decorator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:func:`dagpype.sources` decorator transforms a generator function into a source. Thus another way to create a counter is through ::

    def count(n):
        @sources
        def _act():
            for i in range(n):
                yield i
        return _act

The latter method is more appropriate for sources whose logic is more complex or stateful. Here is the classic Fibonacci example: ::

    def fib(n):
        @sources
        def _act():
            a, b, counter = 0, 1, 0
            while True:
                if counter > n:
                    return
                yield a
                a, b = b, a + b
                counter += 1
        return _act

.. _filter:

-------
Filters
-------

A *filter* is a pipeline stage to which elements are sent, and which sends elements on as well.
A filter, in general, is formed using the :py:func:`dagpype.filters` decorator. Many common filters can be formed using the :py:func:`dagpype.filt` function.

Creating A Filter Via The :py:func:`dagpype.filters` Decorator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:func:`dagpype.filters` decorator transforms a coroutine function into a filter. The coroutine obtains elements through ``(yield)``, and sends them on through ``send``. The :py:exc:`GeneratorExit` exception indicates that no more elements are coming. For example, to create an absolute-value filter, we can use ::

    def abs_():
        @filters
        def _act(target):
            try:
                while True:
                    # Receive an element and send on its absolute value.
                    target.send(abs((yield)))
            except GeneratorExit:
                target.close()
        return _act

A filter need not send exactly one element for each element sent to it. Here is a filter that appends an extra element at the end of the sequence: ::

    def append(stuff):
        @filters
        def _act(target):
            try:
                while True:
                    target.send((yield))
            except GeneratorExit:
                # No more input: send the extra element, then close.
                target.send(stuff)
                target.close()
        return _act

Creating A Filter Via The :py:func:`dagpype.filt` Function
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the very common case where a filter sends at most one element for each element sent to it, possibly suppressing some of them, the helper function :py:func:`dagpype.filt` can be used: ::

    def filt(trans = None, pre = None, post = None):
        """
        Filter (transform elements and / or suppress them).

        Keyword arguments:
        trans = Transformation function for each element (default None).
        pre = Suppression function checked against each element before
            transformation function, if any (default None).
        post = Suppression function checked against each element after
            transformation function, if any (default None).
        """

The :py:func:`dagpype.abs_` filter, for example, can be written as: ::

    filt(lambda x: abs(x))

.. _sink:

-----
Sinks
-----

A *sink* is a pipeline stage to which elements are sent for it to perform some final action.

A sink, in general, is formed using the :py:func:`dagpype.sinks` decorator.
Many common sinks can be formed using the :py:func:`dagpype.sink` function.

Creating A Sink Via The :py:func:`dagpype.sinks` Decorator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:func:`dagpype.sinks` decorator transforms a coroutine function into a sink. The coroutine obtains elements through ``(yield)``. It sends its final output in the :py:exc:`GeneratorExit` handler. For example, to create a sink determining whether the sequence has odd length, we can use: ::

    def odd_count():
        @sinks
        def _act(target):
            try:
                n = 0
                while True:
                    (yield)
                    n += 1
            except GeneratorExit:
                # This is the final result of the sink.
                target.send(n % 2 == 1)
                target.close()
        return _act

Creating A Sink Via The :py:func:`dagpype.sink` Function
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

There are two common special cases in which a sink can be formed more easily using the :py:func:`dagpype.sink` function.

In the first case, the desired result is some Python object independent of the input sequence. Calling :py:func:`dagpype.sink` with a Python object (other than a function) forms a sink whose output is unconditionally that object. For example: ::

    >>> source([1, 2]) | sink('hello')
    'hello'
    >>> source([1, 2, 3]) | sink('hello')
    'hello'

In the second case, the desired result is the application of a function to the last element of the sequence. Calling :py:func:`dagpype.sink` with a function forms a sink whose output is the application of the function to the last element of the sequence. For example: ::

    >>> source([1, 2, 3]) | sink(lambda x: x ** 2)
    9
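
The coroutine examples on this page rely on the decorators to drive the ``(yield)`` / ``send`` / ``close`` protocol. As a rough illustration only, the following sketch shows that push-style protocol in plain Python, without dagpype. The names ``coroutine``, ``abs_filter``, ``collect``, ``odd_count`` and ``run`` are hypothetical helpers written for this sketch; they are not part of the dagpype API, and the library's actual implementation may differ.

```python
def coroutine(func):
    """Hypothetical helper: create a generator-based coroutine and advance
    it to its first (yield), so that it is ready to receive send() calls."""
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return start


@coroutine
def abs_filter(target):
    """Filter-like stage: sends on the absolute value of each element."""
    try:
        while True:
            target.send(abs((yield)))
    except GeneratorExit:
        # No more elements are coming: propagate the shutdown downstream.
        target.close()


@coroutine
def collect(results):
    """Sink-like stage: appends every received element to a list."""
    try:
        while True:
            results.append((yield))
    except GeneratorExit:
        pass


@coroutine
def odd_count(results):
    """Sink-like stage: on shutdown, records whether the number of
    received elements was odd."""
    n = 0
    try:
        while True:
            (yield)
            n += 1
    except GeneratorExit:
        results.append(n % 2 == 1)


def run(iterable, first_stage):
    """Source-like driver: pushes each element into the first stage,
    then closes it to signal the end of the sequence."""
    for element in iterable:
        first_stage.send(element)
    first_stage.close()


collected = []
run([-1, 2, -3], abs_filter(collect(collected)))
print(collected)  # [1, 2, 3]
```

Closing the first stage raises :py:exc:`GeneratorExit` inside it, and each stage forwards the shutdown by closing its own target. This is why the final result of a sink is produced in the :py:exc:`GeneratorExit` handler.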