# Connecting Stages To DAGs

Stages can be connected recursively to form any DAG (Directed Acyclic Graph). The | (pipe) operator chains stages one after another, and the + (fan) operator stacks stages side by side.

## | (Pipe)

1. <source> | <filter> forms a source.

For example,

source([1, 2, 3, 4]) | filt(lambda x : x ** 2)


is a source sending 1, then 4, then 9, then 16.
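In plain Python terms, the chain above behaves roughly like a generator expression over a list. This is only an analogy built from built-ins, not the library's implementation, and the names `squares` and `squares_list` are made up for illustration.

```python
# Plain-Python analogy (no pipeline library involved): the list plays the
# role of the source, and the generator expression plays the role of the filter.
squares = (x ** 2 for x in [1, 2, 3, 4])

# Draining the generator yields the elements in order.
squares_list = list(squares)  # [1, 4, 9, 16]
```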

2. <filter_1> | <filter_2> forms a filter.

For example,

select_inds((0, 1)) | cast((float, float))


is a filter that, when fed a sequence of tuples, keeps the first two fields of each tuple and casts them to float. For instance, it transforms ('1', '2', '3') to the tuple (1.0, 2.0).
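The effect of this chained filter on a single tuple can be sketched with an ordinary function. `select_and_cast` is a hypothetical helper written for this illustration; it assumes the first stage keeps fields 0 and 1 and the second converts each kept field to `float`.

```python
def select_and_cast(row):
    """Hypothetical helper: keep fields 0 and 1 of a tuple, then cast both to float."""
    selected = (row[0], row[1])               # the selection step
    return tuple(float(v) for v in selected)  # the casting step


converted = select_and_cast(('1', '2', '3'))  # (1.0, 2.0)
```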

3. <filter> | <sink> forms a sink.

For example,

prepend('results:') | to_stream('out.txt')

is a sink that writes to 'out.txt' first 'results:', then whatever is sent to it.

4. <source> | <sink> evaluates to the result of the sink, that is, the last object the sink sends on.

For example,

source(range(4)) | sum_()


is 6.
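The way the type of the right-hand operand decides what `|` produces can be sketched with Python's `__or__` hook. The classes `Source`, `Filter`, and `Sink` below are hypothetical stand-ins written for this illustration, not the library's own classes, and they assume one-to-one filters only.

```python
class Source:
    """Hypothetical source: wraps an iterable."""
    def __init__(self, iterable):
        self.iterable = iterable

    def __or__(self, other):
        if isinstance(other, Filter):
            # source | filter -> a new source of transformed elements
            return Source(other.fn(x) for x in self.iterable)
        if isinstance(other, Sink):
            # source | sink -> the sink's result, not a stage
            return other.fn(self.iterable)
        return NotImplemented


class Filter:
    """Hypothetical filter: wraps an element -> element function."""
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        if isinstance(other, Filter):
            # filter | filter -> a filter applying both functions in order
            return Filter(lambda x: other.fn(self.fn(x)))
        if isinstance(other, Sink):
            # filter | sink -> a sink that transforms elements before consuming them
            return Sink(lambda items: other.fn(self.fn(x) for x in items))
        return NotImplemented


class Sink:
    """Hypothetical sink: wraps an iterable -> result function."""
    def __init__(self, fn):
        self.fn = fn


squares = Source([1, 2, 3, 4]) | Filter(lambda x: x ** 2)  # still a source
squares_out = list(squares.iterable)                        # [1, 4, 9, 16]
total = Source(range(4)) | Sink(sum)                        # 6
sum_of_squares = Source([1, 2, 3, 4]) | (Filter(lambda x: x ** 2) | Sink(sum))  # 30
```

Returning `NotImplemented` for unsupported operands lets Python raise its usual `TypeError`, which is one plausible way to end up with errors of the kind shown in the note further down.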

5. <sink_1> | <sink_2> forms a sink.

For example,

to_array() | sink(lambda a : numpy.median(a))


is a sink that calculates a median of a sequence.
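The two-step shape of this sink (first collect everything, then reduce the collection) can be sketched without any pipeline library. The helpers `collect` and `median_sink` are hypothetical, and the standard library's `statistics.median` stands in for `numpy.median` to keep the sketch dependency-free.

```python
import statistics


def collect(iterable):
    """Hypothetical first sink: gather every element into one list."""
    return list(iterable)


def median_sink(iterable):
    """Hypothetical chained sink: pass the collected list on to a median step."""
    return statistics.median(collect(iterable))


median_value = median_sink(iter([1, 2, 3, 4, 5]))  # 3
```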

Note

Parentheses might be necessary for chaining sinks. For example,

>>> source([1, 2, 3, 4, 5]) | (to_array() | sink(lambda a : numpy.median(a)))
3.0

is fine, but

>>> source([1, 2, 3, 4, 5]) | to_array() | sink(lambda a : numpy.median(a))
TypeError: unsupported operand type(s) for |: 'int' and '_SnkPiped'

is not. Because | is evaluated left to right, source(...) | to_array() is computed first and already yields a result rather than a stage, so the expression is the same as

>>> numpy.array([1, 2, 3, 4, 5]) | sink(lambda a : numpy.median(a))
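The grouping behind this note comes from Python itself: `|` is left-associative, so `a | b | c` is parsed as `(a | b) | c`. The small `Expr` class below is a hypothetical helper that only records how Python groups the operands.

```python
class Expr:
    """Hypothetical helper that records how Python groups the | operator."""
    def __init__(self, text):
        self.text = text

    def __or__(self, other):
        # Wrap both operands in parentheses to make the grouping visible.
        return Expr("(%s | %s)" % (self.text, other.text))


a, b, c = Expr("source"), Expr("to_array"), Expr("sink")

implicit = (a | b | c).text    # '((source | to_array) | sink)'
explicit = (a | (b | c)).text  # '(source | (to_array | sink))'
```

Without parentheses, the source and the first sink are combined before the second sink is ever seen, which is why the two sinks have to be grouped explicitly.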

## + (Fan)

1. <source_1> + <source_2> + ... + <source_n> forms a source that sends tuples, each combining one element from every stacked source.

For example,

source([1, 2, 3]) + source([4, 5, 6])


sends the sequence of pairs (1, 4), (2, 5), (3, 6).

Similarly,

read_lines('wind.txt') + read_lines('rain.txt') + read_lines('hail.txt')


is a source that sends a sequence of triplets of lines, one line from each of the three files.
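Stacking sources resembles Python's built-in `zip`, as the sketch below shows. The lists `wind`, `rain`, and `hail` are hypothetical in-memory stand-ins for the lines of the three files. Note that `zip` stops at the shortest input; how stacked sources of unequal length behave is not stated here, so that part of the analogy is an assumption.

```python
# Two stacked sources behave roughly like zip over two sequences.
pairs = list(zip([1, 2, 3], [4, 5, 6]))  # [(1, 4), (2, 5), (3, 6)]

# Hypothetical in-memory stand-ins for the lines of three files.
wind = ["calm", "breezy"]
rain = ["dry", "drizzle"]
hail = ["none", "none"]

# Three stacked sources yield triplets, one element per source.
triplets = list(zip(wind, rain, hail))
```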

2. <filter_1> + <filter_2> + ... + <filter_n> forms a filter that duplicates whatever is sent to it along the n branches and combines what the branches send into tuples.

For example,

filt(lambda x : min(x, 5)) + filt(lambda x : min(x, 10))


forms a filter that transforms each element into a pair: the first value is the element capped at 5, and the second value is the element capped at 10.

Similarly,

skip_n(-5) + skip_n(5)


forms a filter that transforms a sequence into pairs of itself, with one copy shifted 5 elements into the past and the other 5 elements into the future.
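The duplicate-then-combine behaviour of stacked filters can be sketched for the capping example with a small helper. `fan` is a hypothetical function written for this illustration and handles only one-to-one filters; it does not model shifting filters such as `skip_n`.

```python
def fan(*functions):
    """Hypothetical helper: send one element down every branch and tuple the results."""
    def apply_all(x):
        return tuple(f(x) for f in functions)
    return apply_all


# Two branches: cap at 5 and cap at 10.
cap_both = fan(lambda x: min(x, 5), lambda x: min(x, 10))

capped = [cap_both(x) for x in [3, 7, 12]]  # [(3, 3), (5, 7), (5, 10)]
```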

3. <sink_1> + <sink_2> + ... + <sink_n> forms a sink whose result is the tuple of the branch results.

For example,

min_() + max_() + sum_()


is a sink that returns the triplet of the minimum, maximum, and sum of the sequence passed through it.
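One way to picture stacked sinks is a single pass over the sequence in which every branch updates its own running result. The function `min_max_sum` below is a hypothetical plain-Python sketch of that idea, not the library's implementation, and it assumes a non-empty input.

```python
def min_max_sum(iterable):
    """Hypothetical sketch: compute (min, max, sum) in one pass over a non-empty iterable."""
    it = iter(iterable)
    first = next(it)  # assumes at least one element
    lowest = highest = total = first
    for x in it:
        # Each "branch" sees the same element and updates its own state.
        lowest = min(lowest, x)
        highest = max(highest, x)
        total += x
    return lowest, highest, total


result = min_max_sum([3, 1, 4, 1, 5])  # (1, 5, 14)
```

A single pass matters when the input is a stream that cannot be replayed, such as lines read from a file.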