# Stages

A stage is a pipeline element that creates data for the pipeline (e.g., by reading it from a file), manipulates it (e.g., by calculating its absolute value), or consumes it (e.g., by writing it to a file, or by calculating its average). The first type is a source, the second is a filter, and the third is a sink.
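
The three roles can be sketched with plain Python generators, without dagpype. The function names below (read_numbers, absolute, average) are hypothetical and only illustrate the division of labour; dagpype's own stages are created as described in the rest of this page.

```python
def read_numbers():
    """Source: creates data for the pipeline."""
    for x in [-2, 1, -3]:
        yield x


def absolute(elements):
    """Filter: manipulates each element passing through."""
    for x in elements:
        yield abs(x)


def average(elements):
    """Sink: consumes the elements and produces a final result."""
    total, count = 0, 0
    for x in elements:
        total += x
        count += 1
    return float(total) / count


result = average(absolute(read_numbers()))
print(result)  # 2.0
```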

Note

• This page describes the different types of stages, and how to create new ones.
• Connecting Stages To DAGs describes how to connect stages to form DAGs.
• Reference describes the available stages.

## Sources

A source is a pipeline stage to which no elements are sent; it only sends elements on. A typical source reads text files, CSV files, a NumPy array, and so forth.

A source is formed either using the dagpype.source() function, or using the dagpype.sources() decorator.

### Creating A Source Via The dagpype.source() Function

The dagpype.source() function takes anything iterable. For example, to create a counter counting from 0 up to (but not including) 5, we can use:

    source(range(5))


or, more generally,

    from dagpype import source

    def counter(n):
        return source(range(n))


### Creating A Source Via The dagpype.sources() Decorator

The dagpype.sources() decorator transforms a generator function into a source. Thus another way to create a counter is:

    from dagpype import sources

    def count(n):
        @sources
        def _act():
            for i in range(n):
                yield i
        return _act


The latter method is more appropriate for sources whose logic is more complex and stateful. Here is the classic Fibonacci example:

    from dagpype import sources

    def fib(n):
        @sources
        def _act():
            a, b, counter = 0, 1, 0
            while True:
                # counter runs from 0 to n, so n + 1 values are yielded.
                if counter > n:
                    return
                yield a
                a, b = b, a + b
                counter += 1
        return _act
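
The generator logic can be checked in isolation, without dagpype. The sketch below copies the loop into a plain generator function (fib_values is a hypothetical name) and shows that, because the stopping test is counter > n, it yields n + 1 values rather than n.

```python
def fib_values(n):
    """Plain-generator copy of the loop inside fib's _act."""
    a, b, counter = 0, 1, 0
    while True:
        if counter > n:
            return
        yield a
        a, b = b, a + b
        counter += 1


values = list(fib_values(5))
print(values)       # [0, 1, 1, 2, 3, 5]
print(len(values))  # 6
```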


## Filters

A filter is a pipeline stage to which elements are sent, and which sends elements as well.

A filter, in general, is formed using the dagpype.filters() decorator. Many common filters can be formed using the dagpype.filt() function.

### Creating A Filter Via The dagpype.filters() Decorator

The dagpype.filters() decorator transforms a coroutine function into a filter. The coroutine obtains elements through (yield), and sends them on through its target's send method. The GeneratorExit exception indicates that no more elements are coming.

For example, to create an absolute-value filter, we can use

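    from dagpype import filters

    def abs_():
        @filters
        def _act(target):
            try:
                while True:
                    # Receive an element and send on its absolute value.
                    target.send(abs((yield)))
            except GeneratorExit:
                # No more elements are coming.
                target.close()
        return _act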
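
The coroutine protocol that a filter body relies on can be tried out in plain Python, without dagpype. In this sketch the decorator is omitted, so the coroutines have to be primed by hand with next(); collector is a hypothetical stand-in for whatever stage comes next in the pipeline.

```python
def collector(results):
    """Hypothetical downstream stage: stores every element it receives."""
    try:
        while True:
            results.append((yield))
    except GeneratorExit:
        pass


def abs_body(target):
    """Coroutine body of an absolute-value filter."""
    try:
        while True:
            target.send(abs((yield)))
    except GeneratorExit:
        # No more elements are coming; pass the signal downstream.
        target.close()


results = []
downstream = collector(results)
next(downstream)            # advance to the first (yield)
stage = abs_body(downstream)
next(stage)                 # advance to the first (yield)

for x in [-1, 2, -3]:
    stage.send(x)
stage.close()               # raises GeneratorExit inside abs_body

print(results)  # [1, 2, 3]
```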


A filter need not send exactly one element for each element sent to it. Here is a filter that appends an extra element at the end of the sequence:

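    from dagpype import filters

    def append(stuff):
        @filters
        def _act(target):
            try:
                while True:
                    # Pass each element through unchanged.
                    target.send((yield))
            except GeneratorExit:
                # Input is exhausted: send the extra element, then finish.
                target.send(stuff)
                target.close()
        return _act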


### Creating A Filter Via The dagpype.filt() Function

In the very common case where a filter sends at most one element for each element sent to it, possibly suppressing some of them, the helper function filt can be used:

    def filt(trans = None, pre = None, post = None):
        """
        Filter (transform elements and / or suppress them).

        Keyword arguments:
        trans = Transformation function for each element (default None).
        pre = Suppression function checked against each element before
            transformation function, if any (default None).
        post = Suppression function checked against each element after
            transformation function, if any (default None).
        """

The dagpype.abs_() filter, for example, can be written as:

    filt(lambda x : abs(x))
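
One possible reading of the three parameters is sketched below as a plain generator function; it is not dagpype's implementation. The sketch assumes that pre and post are predicates for which a true result means the element is kept, which the docstring above does not state explicitly, and filt_sketch is a hypothetical name.

```python
def filt_sketch(elements, trans=None, pre=None, post=None):
    """Illustrative only. ASSUMPTION: pre/post returning true keeps the element."""
    for x in elements:
        # Check the untransformed element first.
        if pre is not None and not pre(x):
            continue
        if trans is not None:
            x = trans(x)
        # Check the transformed element.
        if post is not None and not post(x):
            continue
        yield x


data = [-3, -2, 1, 4]

absolute = list(filt_sketch(data, trans=abs))
evens = list(filt_sketch(data, trans=abs, pre=lambda x: x % 2 == 0))
large = list(filt_sketch(data, trans=abs, post=lambda x: x > 2))

print(absolute)  # [3, 2, 1, 4]
print(evens)     # [2, 4]
print(large)     # [3, 4]
```

In each case the output holds at most one element per input element, which is the situation the helper is described as covering.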


## Sinks

A sink is a pipeline stage to which elements are sent for it to perform some final action.

A sink, in general, is formed using the dagpype.sinks() decorator. Many common sinks can be formed using the dagpype.sink() function.

### Creating A Sink Via The dagpype.sinks() Decorator

The dagpype.sinks() decorator transforms a coroutine function into a sink. The coroutine obtains elements through (yield). It sends its final output in the GeneratorExit handler.

For example, to create a sink determining if the sequence has even or odd length, we can use:

    from dagpype import sinks

    def odd_count():
        @sinks
        def _act(target):
            try:
                n = 0
                while True:
                    (yield)
                    n += 1
            except GeneratorExit:
                # This is the final result of the sink.
                target.send(n % 2 == 1)
                target.close()
        return _act
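
How a result sent from the GeneratorExit handler reaches its receiver can be seen in a plain-Python sketch that does not use dagpype. The coroutine body mirrors odd_count; result_holder and run are hypothetical helpers standing in for whatever dagpype uses to drive the sink and receive its output, which this page does not describe.

```python
def result_holder(results):
    """Hypothetical receiver: stores whatever the sink body sends on."""
    try:
        while True:
            results.append((yield))
    except GeneratorExit:
        pass


def odd_count_body(target):
    """Counts elements; sends whether the count is odd once input ends."""
    n = 0
    try:
        while True:
            (yield)
            n += 1
    except GeneratorExit:
        target.send(n % 2 == 1)
        target.close()


def run(elements):
    """Hypothetical driver: feeds elements to the body, returns its result."""
    results = []
    target = result_holder(results)
    next(target)                 # prime the receiver
    body = odd_count_body(target)
    next(body)                   # prime the sink body
    for e in elements:
        body.send(e)
    body.close()                 # triggers the GeneratorExit handler
    return results[0]


odd = run([1, 2, 3])
even = run([1, 2])
print(odd)   # True
print(even)  # False
```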


### Creating A Sink Via The dagpype.sink() Function

There are two common special cases in which a sink can be formed more easily using the dagpype.sink() function.

In the first case, the desired result is some Python object independent of the input sequence. Calling dagpype.sink() with any Python object other than a function forms a sink whose output is unconditionally that object. For example:

    >>> source([1, 2]) | sink('hello')
    'hello'

    >>> source([1, 2, 3]) | sink('hello')
    'hello'

In the second case, the desired result is the application of a function to the last element of the sequence. Calling dagpype.sink() with a function forms a sink whose output is the application of the function to the last element of the sequence. For example:

    >>> source([1, 2, 3]) | sink(lambda x : x ** 2)
    9
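
The two cases can be summarised in a plain-Python sketch. sink_result_sketch is a hypothetical helper, not part of dagpype, that computes the result the text above describes for a given argument and a non-empty input sequence; how an empty sequence is treated is not covered here.

```python
def sink_result_sketch(arg, elements):
    """Illustrative only: the result described for sink(arg) on elements.

    ASSUMPTION: elements is non-empty.
    """
    last = None
    for last in elements:
        pass  # consume the whole sequence, remembering the last element
    if callable(arg):
        # Second case: apply the function to the last element.
        return arg(last)
    # First case: the object itself, whatever the input was.
    return arg


constant = sink_result_sketch('hello', [1, 2])
squared = sink_result_sketch(lambda x: x ** 2, [1, 2, 3])
print(constant)  # hello
print(squared)   # 9
```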