A stage is a pipeline element that creates data for the pipeline (e.g., by reading it from a file), manipulates it (e.g., by calculating its absolute value), or consumes it (e.g., by writing it to a file, or by calculating its average). The first type is a source, the second is a filter, and the third is a sink.
A source is a pipeline stage to which no elements are sent; it only sends elements on. Typical sources read text files, CSV files, NumPy arrays, and so forth.
The dagpype.source() function takes anything iterable. For example, to create a counter counting up from 0 and stopping before 5, we can use:

    source(range(5))

or, more generally,

    def counter(n):
        return source(range(n))

The dagpype.sources() decorator transforms a generator function to a source. Thus another way to create a counter is through

    def count(n):
        @sources
        def _act():
            for i in range(n):
                yield i
        return _act

The latter method is more appropriate for sources whose logic is more complex and stateful. Here is the classic Fibonacci example:

    def fib(n):
        @sources
        def _act():
            a, b, counter = 0, 1, 0
            while True:
                if (counter > n):
                    return
                yield a
                a, b = b, a + b
                counter += 1
        return _act

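To see which elements the Fibonacci source emits, the loop can be run as an ordinary generator outside dagpype. The following is a minimal sketch: fib_values is a hypothetical name, and the body is copied from _act above with the decorator left out.

```python
def fib_values(n):
    """Plain generator with the same loop as the _act body of fib(n)."""
    a, b, counter = 0, 1, 0
    while True:
        # The test is counter > n, so n + 1 elements are produced in total.
        if counter > n:
            return
        yield a
        a, b = b, a + b
        counter += 1


print(list(fib_values(5)))  # [0, 1, 1, 2, 3, 5]
```

Because the stopping test is counter > n rather than counter >= n, calling it with n = 5 produces six elements.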
A filter is a pipeline stage to which elements are sent, and which sends elements as well.
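The dagpype.filters() decorator transforms a coroutine function to a filter. The coroutine receives a target argument, which is the next stage in the pipeline. It obtains elements through (yield), and sends them on through target.send(). The GeneratorExit exception indicates that no more elements are coming; the coroutine should then close its target.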
For example, to create an absolute-value filter, we can use
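
    def abs_():
        @filters
        def _act(target):
            try:
                while True:
                    # Send on the absolute value of each received element.
                    target.send(abs((yield)))
            except GeneratorExit:
                # No more elements are coming; close the next stage.
                target.close()
        return _act
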
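The coroutine protocol a filter body relies on can be tried out with plain Python, without dagpype. The sketch below is an assumption-laden illustration, not dagpype's implementation: collector and run_stage are hypothetical helpers that stand in for the rest of the pipeline, and abs_body is an undecorated coroutine with the same shape as a filter body.

```python
def collector(out):
    """Hypothetical terminal stage: stores every received element in out."""
    while True:
        out.append((yield))


def abs_body(target):
    """Undecorated coroutine shaped like a filter body."""
    try:
        while True:
            # (yield) receives an element; target.send() passes it on.
            target.send(abs((yield)))
    except GeneratorExit:
        # Raised by close(): no more elements are coming.
        target.close()


def run_stage(values, body):
    """Hypothetical driver: feed values through body into a collector."""
    out = []
    target = collector(out)
    next(target)          # advance to the first (yield)
    stage = body(target)
    next(stage)           # advance to the first (yield)
    for v in values:
        stage.send(v)
    stage.close()         # raises GeneratorExit inside the coroutine
    return out


print(run_stage([-1, 2, -3], abs_body))  # [1, 2, 3]
```

Each coroutine must be advanced to its first (yield) with next() before it can accept send(). In this sketch the driver does that by hand.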
A filter need not send exactly one element for each element sent to it. Here is a filter that appends an extra element after the last one:
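
    def append(stuff):
        @filters
        def _act(target):
            try:
                while True:
                    # Pass each element through unchanged.
                    target.send((yield))
            except GeneratorExit:
                # The input is exhausted: send the extra element, then close.
                target.send(stuff)
                target.close()
        return _act
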
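The point that a filter may send from its GeneratorExit handler can be checked with plain coroutines. This is only a sketch under assumptions: append_body mirrors the _act body above without the dagpype decorator, and collect and run_append are hypothetical helpers invented for the illustration.

```python
def append_body(stuff, target):
    """Undecorated coroutine with the same logic as the appending filter."""
    try:
        while True:
            target.send((yield))
    except GeneratorExit:
        # Sending to the target is still allowed here, before closing it.
        target.send(stuff)
        target.close()


def collect(out):
    """Hypothetical terminal stage: stores every received element in out."""
    while True:
        out.append((yield))


def run_append(values, stuff):
    """Hypothetical driver: push values through append_body and collect them."""
    out = []
    target = collect(out)
    next(target)
    stage = append_body(stuff, target)
    next(stage)
    for v in values:
        stage.send(v)
    stage.close()
    return out


print(run_append([1, 2], 'end'))  # [1, 2, 'end']
```

The handler may send to its target but must not yield again, since Python raises RuntimeError when a generator yields while being closed.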
In the very common case where a filter sends at most one element for each element sent to it, possibly suppressing some of them, the helper function filt can be used:

    def filt(trans = None, pre = None, post = None):
        """
        Filter (transform elements and / or suppress them).

        Keyword arguments:
        trans = Transformation function for each element (default None).
        pre = Suppression function checked against each element before transformation function, if any (default None).
        post = Suppression function checked against each element after transformation function, if any (default None).
        """

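The order in which pre, trans, and post apply can be sketched with a plain generator. This is not dagpype's implementation: filt_sketch is a hypothetical function that takes the input iterable directly, and it assumes an element is kept when pre or post returns a true value, a polarity the docstring above does not spell out.

```python
def filt_sketch(iterable, trans=None, pre=None, post=None):
    """Hypothetical stand-in for filt that operates on an iterable."""
    for x in iterable:
        # Assumption: a false result from pre suppresses the element.
        if pre is not None and not pre(x):
            continue
        if trans is not None:
            x = trans(x)
        # Assumption: post is checked against the transformed element.
        if post is not None and not post(x):
            continue
        yield x


data = [-3, 1, -8, 4]
print(list(filt_sketch(data, trans=abs, pre=lambda x: x % 2 == 0)))  # [8, 4]
print(list(filt_sketch(data, trans=abs, post=lambda x: x > 2)))      # [3, 8, 4]
```

The two calls differ only in where the check happens: pre sees the raw elements, while post sees the values produced by trans.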
The dagpype.abs_() filter, for example, can be written as:
filt(lambda x : abs(x))
A sink is a pipeline stage to which elements are sent for it to perform some final action.
The dagpype.sinks() decorator transforms a coroutine function to a sink. The coroutine obtains elements through (yield). It sends its final output through target.send() in the GeneratorExit handler.
For example, to create a sink determining if the sequence has even or odd length, we can use:

    def odd_count():
        @sinks
        def _act(target):
            try:
                n = 0
                while True:
                    (yield)
                    n += 1
            except GeneratorExit:
                # This is the final result of the sink.
                target.send(n % 2 == 1)
                target.close()
        return _act

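How a sink delivers a single result when its input ends can be sketched with plain coroutines. The code below is an illustration under assumptions rather than dagpype's machinery: odd_count_body is the undecorated body, while result_holder and drive are hypothetical helpers that play the role of whatever receives the sink's output.

```python
def odd_count_body(target):
    """Undecorated coroutine with the same logic as the odd_count sink."""
    n = 0
    try:
        while True:
            (yield)      # the element itself is ignored; only the count matters
            n += 1
    except GeneratorExit:
        # The single result is sent only once the input has ended.
        target.send(n % 2 == 1)
        target.close()


def result_holder(box):
    """Hypothetical receiver: stores whatever the sink sends."""
    while True:
        box.append((yield))


def drive(values):
    """Hypothetical driver: feed values to the sink and return its result."""
    box = []
    target = result_holder(box)
    next(target)
    stage = odd_count_body(target)
    next(stage)
    for v in values:
        stage.send(v)
    stage.close()
    return box[0]


print(drive([1, 2, 3]))  # True
print(drive([1, 2]))     # False
```

Nothing reaches the receiver while elements are still arriving; the result appears only after close() is called on the sink.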
There are two common special cases in which a sink can be formed more easily using the dagpype.sink() function.
In the first case, the desired result is some Python object independent from the input sequence. Calling dagpype.sink() with a Python object (except a function) forms a sink whose output is unconditionally that object. For example:

    >>> source([1, 2]) | sink('hello')
    'hello'
    >>> source([1, 2, 3]) | sink('hello')
    'hello'

In the second case, the desired result is the application of a function to the last element of the sequence. Calling dagpype.sink() with a function forms a sink whose output is the application of the function to the last element of the sequence. For example:

    >>> source([1, 2, 3]) | sink(lambda x : x ** 2)
    9

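The two special cases can be summarized in a plain-Python sketch. sink_sketch is a hypothetical function, not the dagpype API: it takes the input sequence as a second argument instead of being piped to. Raising ValueError for an empty sequence is a choice made for this sketch, since the text above does not say what dagpype does in that case.

```python
_MISSING = object()  # sentinel marking "no element seen"


def sink_sketch(arg, iterable):
    """Hypothetical stand-in for the two behaviours of sink()."""
    last = _MISSING
    for last in iterable:
        pass                      # consume the input, remembering the last element
    if not callable(arg):
        return arg                # first case: a fixed object, independent of the input
    if last is _MISSING:
        # Sketch-only choice: the empty-input behaviour is not given in the text.
        raise ValueError("empty sequence has no last element")
    return arg(last)              # second case: function applied to the last element


print(sink_sketch('hello', [1, 2]))                # hello
print(sink_sketch(lambda x: x ** 2, [1, 2, 3]))    # 9
```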