Data Pipes and Data Processing Streams

For an introduction to streams, see /processing_streams

See also

For complete information about nodes, see Node Reference.

class brewery.streams.Stream(nodes=None, connections=None)

Creates a data stream.

Parameters:
  • nodes - dictionary with node names as keys and nodes as values
  • connections - list of two-item tuples. Each tuple contains either the source and target nodes or the source and target node names.
  • stream - another stream or
configure(config=None)

Configures node properties based on the configuration. Only named nodes can be configured at the moment.

config is a list of dictionaries with the keys node (node name), parameter (node parameter name) and value (parameter value).
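As an illustration, a config list of this shape could be applied as below. The helper apply_stream_config() and the SimpleNode class are hypothetical stand-ins written for this sketch, not part of brewery; with brewery itself, the same list would be passed to stream.configure(config).

```python
class SimpleNode:
    """Hypothetical stand-in for a named node; not a brewery class."""

    def __init__(self):
        self.resource = None
        self.read_header = False


def apply_stream_config(nodes, config):
    """Sketch: apply a list of {node, parameter, value} dicts to named nodes.

    `nodes` maps node names to node objects, mirroring the `nodes`
    dictionary accepted by Stream. Not brewery's implementation.
    """
    for entry in config or []:
        node = nodes[entry["node"]]  # only named nodes can be addressed
        setattr(node, entry["parameter"], entry["value"])


nodes = {"source": SimpleNode()}
config = [
    {"node": "source", "parameter": "resource", "value": "data.csv"},
    {"node": "source", "parameter": "read_header", "value": True},
]
apply_stream_config(nodes, config)
```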

fork()

Creates a construction fork of the stream. Used for constructing streams in functional fashion. Example:

from brewery.streams import Stream

stream = Stream()

fork = stream.fork()
fork.csv_source("fork.csv")
fork.formatted_printer()

stream.run()

A fork responds to node names as functions. The function arguments are the same as the node constructor (__init__ method) arguments. Each call appends a new node to the fork and connects it to the previous node in the fork.
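One way such name-as-function dispatch can work in Python is through __getattr__, which is only consulted for attributes that are not found normally. The sketch below is a hypothetical, self-contained model (MiniStream, MiniFork and the NODE_CLASSES registry are invented for illustration), not brewery's actual fork code.

```python
class CSVSourceNode:
    def __init__(self, resource=None):
        self.resource = resource


class FormattedPrinterNode:
    pass


# Hypothetical registry mapping node identifiers to node classes.
NODE_CLASSES = {
    "csv_source": CSVSourceNode,
    "formatted_printer": FormattedPrinterNode,
}


class MiniStream:
    def __init__(self):
        self.nodes = []
        self.connections = []  # (source, target) tuples

    def fork(self):
        return MiniFork(self)


class MiniFork:
    def __init__(self, stream, node=None):
        self.stream = stream
        self.node = node  # the most recently appended node

    def fork(self):
        # A fork of a fork continues from the current node.
        return MiniFork(self.stream, self.node)

    def __getattr__(self, name):
        # Only called for names not found as regular attributes,
        # so such names are looked up as node identifiers.
        if name not in NODE_CLASSES:
            raise AttributeError(name)
        node_class = NODE_CLASSES[name]

        def append_node(*args, **kwargs):
            new_node = node_class(*args, **kwargs)  # constructor arguments
            self.stream.nodes.append(new_node)
            if self.node is not None:
                self.stream.connections.append((self.node, new_node))
            self.node = new_node

        return append_node


stream = MiniStream()
fork = stream.fork()
fork.csv_source("fork.csv")
fork.formatted_printer()
```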

To configure the current node, use fork.node:

fork.csv_source("fork.csv")
fork.node.read_header = True

To set the node's name, use set_name():

fork.csv_source("fork.csv")
fork.set_name("source")

...

source_node = stream.node("source")

To fork a fork, call fork() on it.

run()

Run all nodes in the stream.

Each node is wrapped and run in a separate thread.

When an exception occurs, the stream is stopped and all caught exceptions are stored in the exceptions attribute.
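The wrap-and-collect part of this behaviour can be sketched with the standard threading module. The run_nodes() function below is hypothetical and only shows each node's run() executing in its own thread with failures recorded; it omits stopping the remaining nodes, which the real stream runner does.

```python
import threading


def run_nodes(nodes):
    """Sketch: run each node's run() in its own thread and collect failures.

    `nodes` is any iterable of objects with a run() method (hypothetical
    stand-ins, not brewery nodes). Returns the list of caught exceptions.
    """
    exceptions = []
    lock = threading.Lock()

    def wrapper(node):
        try:
            node.run()
        except Exception as exc:  # record the failure instead of losing it
            with lock:
                exceptions.append(exc)

    threads = [threading.Thread(target=wrapper, args=(node,)) for node in nodes]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return exceptions


class OkNode:
    def run(self):
        pass


class FailingNode:
    def run(self):
        raise ValueError("bad row")


errors = run_nodes([OkNode(), FailingNode()])
```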

update(nodes=None, connections=None)

Adds the nodes and connections specified in the dictionary. The dictionary may contain node names instead of actual classes. You can use this method, for example, to create a stream from a dictionary loaded from a JSON file.
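A sketch of preparing such input from JSON follows. The description schema (a "type" key naming the node, plus attribute keys) is an assumption based on the note under Node.configure that the type key is used for node creation; the exact layout brewery expects may differ. Since JSON has no tuples, connection pairs are converted explicitly to match the two-item tuples described for Stream.

```python
import json

# Hypothetical stream description; the schema is assumed, not brewery's spec.
description = json.loads("""
{
    "nodes": {
        "source": {"type": "csv_source", "resource": "data.csv"},
        "printer": {"type": "formatted_printer"}
    },
    "connections": [["source", "printer"]]
}
""")

nodes = description["nodes"]
# JSON arrays become lists; convert each pair to a two-item tuple.
connections = [tuple(pair) for pair in description["connections"]]
# With brewery, these would then be passed on, e.g. stream.update(nodes, connections)
```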

class brewery.streams.StreamRuntimeError(message=None, node=None, exception=None)

Exception raised when a node fails during the run() phase.

Attributes:
  • message: exception message
  • node: node where exception was raised
  • exception: exception that was raised while running the node
  • traceback: stack traceback
  • inputs: array of field lists for each input
  • output: output field list
print_exception(output=None)

Prints the exception and its details in human-readable form. You can pass a file-like object in the output parameter; by default, the text is printed to standard output.
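The output parameter follows the common Python convention of accepting any object with a write() method. The SketchRuntimeError class below is a hypothetical stand-in with a few of the attributes listed above, showing how a default of None can resolve to sys.stdout at call time; its output format is invented, not brewery's.

```python
import io
import sys


class SketchRuntimeError(Exception):
    """Hypothetical stand-in holding a few StreamRuntimeError-style attributes."""

    def __init__(self, message=None, node=None, exception=None):
        super().__init__(message)
        self.message = message
        self.node = node
        self.exception = exception

    def print_exception(self, output=None):
        # Look up sys.stdout at call time so redirection still works.
        if output is None:
            output = sys.stdout
        output.write(f"Stream failed: {self.message}\n")
        output.write(f"Node: {self.node}\n")
        if self.exception is not None:
            output.write(f"Reason: {self.exception!r}\n")


buffer = io.StringIO()
error = SketchRuntimeError("node failed", node="source",
                           exception=ValueError("bad value"))
error.print_exception(buffer)
```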

class brewery.streams.FieldError

Exception raised on field incompatibility or missing fields.

class brewery.streams.Pipe(buffer_size=1000)

Creates a unidirectional data pipe for passing data between two threads in batches of buffer_size objects.

If the receiving node is finished with the source data and does not want any more, it should call done_receiving() on the pipe. In most cases, the stream runner calls done_receiving() on all input pipes when the node's run() method finishes.

If the sending node is finished, it should call done_sending() on the pipe. This is usually not necessary, however, because the stream runner flushes outputs automatically when the node's run() method finishes.

closed()

Returns True if the pipe is closed, that is, if it is no longer sending or no longer receiving data.

done_receiving()

Closes the pipe from either side.

done_sending()

Closes the pipe from the sender side.

put(obj)

Puts a data object into the pipe buffer. When the buffer is full, it is enqueued and the receiving node can get all of the buffered data objects.

Putting objects into the pipe is not thread safe. Only one thread should write to the pipe.

rows()

Gets data objects from the pipe. If no buffer is ready, waits until the source sends some data.
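The batching behaviour described for Pipe can be modelled with the standard queue.Queue. SketchPipe below is a hypothetical, simplified model rather than brewery's implementation: put() buffers objects and enqueues full batches, done_sending() flushes the last partial batch plus an end marker, and rows() blocks until a batch is available. Unlike the real pipe, this sketch does not stop the sender after done_receiving().

```python
import queue
import threading


class SketchPipe:
    """Minimal batched pipe sketch; not brewery's implementation."""

    _END = object()  # sentinel marking the end of data

    def __init__(self, buffer_size=1000):
        self.buffer_size = buffer_size
        self._buffer = []
        self._queue = queue.Queue()  # thread-safe hand-off of whole batches
        self._sending = True
        self._receiving = True

    def put(self, obj):
        # Not thread safe: only one thread should write to the pipe.
        self._buffer.append(obj)
        if len(self._buffer) >= self.buffer_size:
            self._queue.put(self._buffer)
            self._buffer = []

    def done_sending(self):
        # Flush the last partial batch, then signal the end of data.
        if self._buffer:
            self._queue.put(self._buffer)
            self._buffer = []
        self._queue.put(self._END)
        self._sending = False

    def done_receiving(self):
        self._receiving = False

    def closed(self):
        return not self._sending or not self._receiving

    def rows(self):
        # Blocks on get() until the sender enqueues a batch or the end marker.
        while True:
            batch = self._queue.get()
            if batch is self._END:
                return
            yield from batch


pipe = SketchPipe(buffer_size=2)


def producer():
    for i in range(5):
        pipe.put(i)
    pipe.done_sending()


thread = threading.Thread(target=producer)
thread.start()
received = list(pipe.rows())
thread.join()
```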

class brewery.nodes.Node

Creates a new data processing node.

Attributes:
  • inputs: input pipes
  • outputs: output pipes
  • description: custom node annotation
configure(config, protected=False)

Configures the node.

Parameters:
  • config - a dictionary with node attribute names as keys and attribute values as values. The type key is ignored, as it is used for node creation.
  • protected - if set to True, only non-protected attributes can be set, and an attempt to set a protected attribute raises an exception. Use protected when configuring nodes through a user interface or a custom tool. The default is False: all attributes can be set.

If a key in the config dictionary does not refer to a node attribute specified in the node description, it is ignored.
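The rules above (skip the type key, ignore undescribed keys, reject protected attributes when protected=True) can be sketched as follows. The node_info layout with an "attributes" list and a "protected" flag is an assumption made for this example, and SketchNode is hypothetical; brewery's actual node description may be structured differently.

```python
class SketchNode:
    # Hypothetical attribute description; the real node_info layout may differ.
    node_info = {
        "attributes": [
            {"name": "resource"},
            {"name": "read_header"},
            {"name": "encoding", "protected": True},
        ]
    }

    def configure(self, config, protected=False):
        described = {a["name"]: a for a in self.node_info["attributes"]}
        for key, value in config.items():
            if key == "type":
                continue  # "type" selects the node class, it is not an attribute
            if key not in described:
                continue  # keys not described as node attributes are ignored
            if protected and described[key].get("protected", False):
                raise AttributeError(f"attribute '{key}' is protected")
            setattr(self, key, value)


node = SketchNode()
node.configure({"type": "csv_source", "resource": "data.csv", "unknown": 1})
```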

finalize()

Finalizes the node. The default implementation does nothing.

classmethod identifier()

Returns the identifier name of the node class. The identifier is used for constructing streams from dictionaries and for any other out-of-program construction.

The node identifier is specified in the node_info dictionary as name. If no explicit identifier is specified, the decamelized class name is used with the node suffix removed. For example, CSVSourceNode becomes csv_source.
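The naming rule can be sketched with two regular-expression substitutions that keep acronyms such as CSV together. decamelize() and node_identifier() are hypothetical helpers written for this example; brewery's own decamelization may handle edge cases differently.

```python
import re


def decamelize(name):
    """Convert CamelCase to snake_case, keeping acronyms together."""
    # "SourceNode" -> "Source_Node": split lowercase/digit followed by uppercase
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # "CSVSource" -> "CSV_Source": split an uppercase run before an Upper+lower pair
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    return name.lower()


def node_identifier(node_class):
    """Sketch of the identifier rule described above (not brewery's code)."""
    info = getattr(node_class, "node_info", None) or {}
    if "name" in info:
        return info["name"]  # an explicit identifier wins
    name = decamelize(node_class.__name__)
    if name.endswith("_node"):
        name = name[: -len("_node")]  # drop the node suffix
    return name


class CSVSourceNode:
    pass


class AuditNode:
    node_info = {"name": "audit_log"}
```

For example, node_identifier(CSVSourceNode) returns "csv_source", while AuditNode keeps its explicit name.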

initialize()

Initializes the node. Initialization is separate from creation. Put any Node subclass initialization in this method. The default implementation does nothing.

input

Returns the node's single input, if it exists. This is a convenience property for nodes that process only one input. Raises an exception if there are no inputs or more than one input.

input_fields

Returns the fields from the input pipe, if there is exactly one input pipe.

output_field_names

Convenience method for getting the names of the fields generated by the node. For more information, see brewery.nodes.Node.output_fields().

output_fields

Returns the fields passed to the output by the node.

Subclasses should override this method. The default implementation returns the same fields as the input and raises an exception if there is more than one input or no input connected.
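The relationship between input, input_fields, output_fields and output_field_names can be sketched with properties. Everything here (Field, FieldPipe, SketchNode, RowNumberNode) is a hypothetical stand-in; brewery uses its own field classes. The default output_fields passes input fields through, and a subclass overrides it to add a field.

```python
from dataclasses import dataclass


@dataclass
class Field:
    name: str


class FieldPipe:
    """Hypothetical pipe that only carries a field list."""

    def __init__(self, fields):
        self.fields = fields


class SketchNode:
    def __init__(self):
        self.inputs = []

    @property
    def input(self):
        # Exactly one input is required.
        if len(self.inputs) != 1:
            raise ValueError(f"expected exactly one input, found {len(self.inputs)}")
        return self.inputs[0]

    @property
    def input_fields(self):
        return self.input.fields

    @property
    def output_fields(self):
        # Default: pass the input fields through unchanged.
        return self.input_fields

    @property
    def output_field_names(self):
        return [field.name for field in self.output_fields]


class RowNumberNode(SketchNode):
    """Hypothetical node that appends a row_number field."""

    @property
    def output_fields(self):
        return self.input_fields + [Field("row_number")]


node = RowNumberNode()
node.inputs.append(FieldPipe([Field("id"), Field("name")]))
```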

put(obj)

Puts a row into all output pipes.

Raises a NodeFinished exception when the node's target nodes are no longer receiving data. In most cases this exception can be ignored, as it is handled by the node thread wrapper. If you need to perform clean-up in the run() method before exiting, handle this exception and then either re-raise it or simply return from run().

This method can be called only from node’s run() method. Do not call it from initialize() or finalize().
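The clean-up pattern can be sketched as follows. NodeFinished here is a locally defined stand-in, and ListPipe and CountingSourceNode are hypothetical; in this model put() raises NodeFinished once every output pipe is closed, and run() catches it, cleans up, and returns.

```python
class NodeFinished(Exception):
    """Hypothetical stand-in for brewery's NodeFinished exception."""


class ListPipe:
    """Hypothetical output pipe whose receiver stops after `limit` rows."""

    def __init__(self, limit):
        self.limit = limit
        self.received = []
        self.receiving = True

    def put(self, row):
        self.received.append(row)
        if len(self.received) >= self.limit:
            self.receiving = False  # receiver has had enough

    def closed(self):
        return not self.receiving


class CountingSourceNode:
    def __init__(self, outputs):
        self.outputs = outputs
        self.cleaned_up = False

    def put(self, row):
        # Put the row into all open output pipes; stop when none are left.
        open_pipes = [pipe for pipe in self.outputs if not pipe.closed()]
        if not open_pipes:
            raise NodeFinished()
        for pipe in open_pipes:
            pipe.put(row)

    def run(self):
        try:
            for i in range(100):
                self.put((i,))
        except NodeFinished:
            # Targets stopped receiving: clean up, then return from run().
            self.cleaned_up = True
            return


pipe = ListPipe(limit=3)
node = CountingSourceNode([pipe])
node.run()
```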

put_record(obj)

Puts a record into all output pipes. This is a convenience method; its use is not recommended.

Deprecated.

reset_type(name)

Removes all retype information for the field name.

retype(name, **attributes)

Retypes the output field name using the given field attributes.
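One plausible model of retype() and reset_type() keeps a per-field dictionary of attribute overrides that is applied when output fields are computed. This is a hypothetical sketch: SketchNode, the dict-based fields and the storage_type attribute are illustrative assumptions, and here each retype() call replaces earlier overrides for that field.

```python
class SketchNode:
    def __init__(self, input_fields):
        # Each field is a dict of attributes, e.g. {"name": ..., "storage_type": ...}
        self.input_fields = input_fields
        self._retype_dictionary = {}

    def retype(self, name, **attributes):
        # Store attribute overrides for one output field (replaces earlier ones).
        self._retype_dictionary[name] = attributes

    def reset_type(self, name):
        # Forget all overrides for this field.
        self._retype_dictionary.pop(name, None)

    @property
    def output_fields(self):
        # Input fields with any stored overrides applied on top.
        return [
            {**field, **self._retype_dictionary.get(field["name"], {})}
            for field in self.input_fields
        ]


node = SketchNode([
    {"name": "amount", "storage_type": "string"},
    {"name": "id", "storage_type": "integer"},
])
node.retype("amount", storage_type="float")
```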

run()

Main method for running the node code. Subclasses should implement this method.

class brewery.nodes.SourceNode

Abstract class for all source nodes.

All source nodes should provide an attribute or implement a property (@property) called output_fields.
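Because a source node has no input pipe, it cannot rely on the default output_fields behaviour, which reads fields from the single input. The sketch shows both options named above, a plain attribute and a @property, using a hypothetical SourceNode stand-in and plain field-name lists instead of brewery field objects.

```python
class SourceNode:
    """Hypothetical stand-in for an abstract source node base class."""

    def run(self):
        raise NotImplementedError


class StaticSourceNode(SourceNode):
    # Option 1: output_fields as a plain class attribute.
    output_fields = ["id", "name"]

    def __init__(self, rows):
        self.rows = rows


class ListSourceNode(SourceNode):
    # Option 2: output_fields computed by a property.
    def __init__(self, rows, field_names):
        self.rows = rows
        self.field_names = field_names

    @property
    def output_fields(self):
        return list(self.field_names)
```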

class brewery.nodes.TargetNode

Abstract class for all target nodes.
