dispel4py package

Subpackages

Submodules

dispel4py.base module

Base PEs implementing typical processing patterns.

class dispel4py.base.BasePE(inputs=[], outputs=[], numInputs=0, numOutputs=0)

Bases: dispel4py.core.GenericPE

A basic implementation of a GenericPE that makes it easy to extend GenericPE with named inputs and outputs.

INPUT_NAME = 'input'
OUTPUT_NAME = 'output'
class dispel4py.base.CompositePE(create_graph=None, params=None)

Bases: dispel4py.workflow_graph.WorkflowGraph

Super class for composite PEs. When extending or instantiating this PE a function may be provided to populate the graph. For example:

def create_graph(graph):
    prod = Producer()
    cons = Consumer()
    graph.connect(prod, 'output', cons, 'input')
    graph._map_output('output', cons, 'output')


class MyCompositePE(CompositePE):
    def __init__(self):
        CompositePE.__init__(self, create_graph)

This composite PE can be created with the usual command:

comp = MyCompositePE()

It is also possible to pass parameters to the composite PE when instantiating:

def create_graph(graph, limit):
    prod = Producer(limit)
    cons = Consumer()
    graph.connect(prod, 'output', cons, 'input')
    graph._map_output('output', cons, 'output')

class ParameterisedCompositePE(CompositePE):
    def __init__(self, limit):
        CompositePE.__init__(self, create_graph, limit)
class dispel4py.base.ConsumerPE

Bases: dispel4py.base.BasePE

A PE that has one input named “input” and no outputs. Subclasses are expected to override _process() to implement processing.

INPUT_NAME = 'input'
process(inputs)

Calls the implementation of _process() of the subclass with the data read from the input stream.

class dispel4py.base.IterativePE

Bases: dispel4py.base.BasePE

An iterative PE that has one input and one output stream. When executed, this PE produces one output data block for each input block. Subclasses are expected to override _process() to implement processing.

INPUT_NAME = 'input'
OUTPUT_NAME = 'output'
process(inputs)

Calls the implementation of _process() of the subclass with the data read from the input stream and writes the return value to the output stream.
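The split between process() and _process() described above can be illustrated without dispel4py installed. The following is a minimal sketch, not the library's implementation: IterativeSketch and Doubler are hypothetical stand-ins that only mimic the documented contract (read from 'input', call _process(), write the return value to 'output'). Treating a None result as "no output block" is an assumption.

```python
class IterativeSketch(object):
    """Hypothetical stand-in for the documented IterativePE contract."""
    INPUT_NAME = 'input'
    OUTPUT_NAME = 'output'

    def process(self, inputs):
        # Hand the block read from the input stream to the subclass hook.
        result = self._process(inputs[self.INPUT_NAME])
        # Assumption: a None result means that no output block is produced.
        if result is None:
            return None
        return {self.OUTPUT_NAME: result}

    def _process(self, data):
        raise NotImplementedError("subclasses override _process()")


class Doubler(IterativeSketch):
    """Subclass that only supplies the per-block processing step."""

    def _process(self, data):
        return data * 2


pe = Doubler()
result = pe.process({'input': 21})
```

The subclass never touches stream names; it sees only the data block and returns the value to be written.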

class dispel4py.base.ProducerPE

Bases: dispel4py.base.BasePE

A PE that has no input and one output named “output”. Subclasses are expected to override _process() to implement processing.

OUTPUT_NAME = 'output'
process(inputs)

Calls the implementation of _process() of the subclass.

class dispel4py.base.SimpleFunctionPE(compute_fn=None, params={})

Bases: dispel4py.base.IterativePE

INPUT_NAME = 'input'
OUTPUT_NAME = 'output'
dispel4py.base.create_iterative_chain(functions, FunctionPE_class=<class 'dispel4py.base.SimpleFunctionPE'>, name_prefix='PE_', name_suffix='')

Creates a composite PE wrapping a pipeline that processes obspy streams.

Parameters:
  • chain – list of functions that process data iteratively. Each function accepts one input parameter, data, and returns an output data block (or None).
  • requestId – id of the request that the stream is associated with
  • controlParameters – environment parameters for the processing elements
Return type: dictionary (inputs and outputs of the composite PE that was created)
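The idea of chaining functions that each take one data block and return a block (or None) can be sketched in plain Python. This is not how create_iterative_chain is implemented. The helper run_chain and the three sample functions are hypothetical. The sketch also assumes that a None result drops the block, so later functions are not called for it.

```python
def run_chain(functions, data):
    """Apply each function in turn; stop early if one returns None."""
    for fn in functions:
        data = fn(data)
        if data is None:
            # Assumption: None means the block is dropped from the pipeline.
            return None
    return data


def add_one(data):
    return data + 1


def drop_odd(data):
    # Acts as a filter: odd values produce no output block.
    return data if data % 2 == 0 else None


def square(data):
    return data * data


chain = [add_one, drop_odd, square]
results = [run_chain(chain, x) for x in range(4)]
```

In a real workflow each function would presumably be wrapped in its own PE and connected output to input, rather than being called in a loop.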

dispel4py.core module

The core module for dispel4py.

class dispel4py.core.GenericPE(numprocesses=1)

Bases: object

Base class for Dispel4Py processing elements (PEs). Custom PEs are expected to extend this class and override the ‘process’ function.

Custom PEs must override __init__() to declare the inputs and outputs that can be connected within the workflow graph, by defining a NAME and possibly a TYPE. The type of a connection is specific to the enactment system. In the example below the target system is Storm and the type declares what kind of tuples are produced:

out1[TYPE] = ['timestamp', 'origin', 'streams']

In some cases the output types are determined dynamically and depend on the input types, for example in a filter that consumes blocks of any type and produces output of the same type as its input. The graph framework supports this by propagating types across the workflow before enactment and providing each PE with the input types that it can expect in the method:

setInputTypes(self, types)

which can be overridden to deduce output types from input types or to raise an error if the types are not acceptable. The PE may then override the method:

getOutputTypes(self)

to declare the output types that it produces. In the example of a filter PE this method would return the input types.
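The filter case described above can be sketched as follows. FilterSketch is a hypothetical stand-in class, not a subclass of the real GenericPE, and the stream names 'input' and 'output' are assumptions chosen for illustration. It records the input types it is given and reports them unchanged as its output types.

```python
class FilterSketch(object):
    """Hypothetical stand-in showing dynamic output types for a filter."""

    def __init__(self):
        self.input_types = {}

    def setInputTypes(self, types):
        # Reject type dictionaries that this PE cannot work with.
        if 'input' not in types:
            raise ValueError("expected types for stream 'input'")
        self.input_types = types

    def getOutputTypes(self):
        # A filter passes blocks through, so the output type is the input type.
        return {'output': self.input_types['input']}


pe = FilterSketch()
pe.setInputTypes({'input': ['timestamp', 'origin', 'streams']})
output_types = pe.getOutputTypes()
```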

Custom PEs may implement the method preprocess() to initialise variables or data before processing commences.

Example implementation:

import traceback
import cStringIO
import base64
from obspy.core import read,UTCDateTime,Stream,Trace
from dispel4py.core import GenericPE, NAME, TYPE

INPUT_NAME = 'input'
OUTPUT_NAME = 'output'

class AppendAndSynchronize(GenericPE):

    def __init__(self):
        GenericPE.__init__(self)
        self._add_input(INPUT_NAME)
        self._add_output(OUTPUT_NAME, ['timestamp', 'origin', 'streams'])

    def process(self, inputs):
        values = inputs[INPUT_NAME]
        parameters = values[0]
        origin = values[1]
        data = values[2:]
        if not data:
            self.error += "No Data"
            raise Exception("No Data!")

        streams = list()
        while data:
            streamItem = data.pop(0)
            streams.append(eval(streamItem["data"]))

        # Reads the first file
        st = read(self.rootpath + "%s" % (streams[0].pop(0),))

        # Reads the following files
        while streams[0]:
            ff = "%s" % (streams[0].pop(0),)
            st += read(self.rootpath + ff)

        starttime = "%s" % (parameters["starttime"])
        endtime = "%s" % (parameters["endtime"])

        st = st.slice(UTCDateTime(starttime), UTCDateTime(endtime))
        streamtransfer = {}
        if type(st) == Stream:
            memory_file = cStringIO.StringIO()
            st.write(memory_file, format="MSEED")
            streamtransfer = {"data": base64.b64encode(memory_file.getvalue())}
        output = [parameters, origin, streamtransfer]

        return { OUTPUT_NAME : output }
getOutputTypes()

Returns the output types of this PE, in the form of a dictionary. This method may be overridden if output types are not static and depend on input types.

Note

This method is only called after the input types have been initialised in setInputTypes().

Return type: a dictionary mapping each output name to its type

By default it returns a dictionary of the types defined in the ‘outputconnections’ instance variable.

Usage example:

def getOutputTypes(self):
    output = { 'output1' : myInputs['input1'],
               'output2' : [ 'comment' ] }
    return output
postprocess()

This method is called once after the last block has been processed and a terminate message was sent to this PE.

preprocess()

This method is called once before processing commences.

process(inputs)

The ‘inputs’ dictionary contains data from any or all of the streams that are connected to this PE, in any order. The return value of this function is a single output dictionary, with the names of the output streams as keys. PE base classes may override this method to allow preprocessing of data before calling _process() with the prepared data on the PE implementation. The data returned by the implementation may then be postprocessed before it is returned to the implementation framework.

Parameters: inputs (dictionary) – the input data for this iteration
Return type: a dictionary with the output data
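The process() contract described above (a dictionary of inputs in, a dictionary keyed by output stream name out) can be sketched with a PE-like class that routes values to one of two outputs. EvenOddSplitter and its stream names 'numbers', 'even' and 'odd' are hypothetical. The class does not extend the real GenericPE and models only the shape of the arguments and return value.

```python
class EvenOddSplitter(object):
    """Hypothetical PE-like class; only the process() contract is modelled."""

    def process(self, inputs):
        outputs = {}
        # 'inputs' may hold data from any subset of the connected streams,
        # so check for the stream before reading from it.
        if 'numbers' in inputs:
            value = inputs['numbers']
            key = 'even' if value % 2 == 0 else 'odd'
            outputs[key] = value
        # One dictionary is returned, keyed by output stream name.
        return outputs


splitter = EvenOddSplitter()
even_result = splitter.process({'numbers': 4})
odd_result = splitter.process({'numbers': 3})
empty_result = splitter.process({})
```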
setInputTypes(types)

Sets the input types of this PE, in the form of a dictionary. It is meant to be overridden, e.g. if output types depend on input.

Note

This method is always called before getOutputTypes().

Parameters: types (dictionary mapping input name to input type) – object types for each input stream

Usage example:

pe.setInputTypes({'input1':['t1', 't2', 't3'],
                  'input2':['t4', 't5']})
write(name, data, **kwargs)

Allows for preprocessing of data to be written to the output pipe. This method should be overridden by PE base classes.

class dispel4py.core.LockstepPE

Bases: dispel4py.core.GenericPE

Representation of a PE which consumes its input in lockstep. The inputs dictionary that is passed to the process() function is guaranteed to contain one data item from each of the connected input streams.
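One way to picture the lockstep guarantee is a buffer that holds items per input stream and releases a complete inputs dictionary only when every stream has at least one item waiting. The sketch below is an illustration of that idea, not the mechanism dispel4py uses. LockstepBuffer and its push() method are hypothetical names.

```python
from collections import deque


class LockstepBuffer(object):
    """Hypothetical helper that groups one item from each input stream."""

    def __init__(self, input_names):
        self.queues = {name: deque() for name in input_names}

    def push(self, name, item):
        """Queue an item; return a full inputs dictionary once one is ready."""
        self.queues[name].append(item)
        # An empty deque is falsy, so all() is true only when every
        # stream has at least one item waiting.
        if all(self.queues.values()):
            return {n: q.popleft() for n, q in self.queues.items()}
        return None


buf = LockstepBuffer(['a', 'b'])
first = buf.push('a', 1)    # 'b' is still empty
second = buf.push('a', 2)   # 'b' is still empty
third = buf.push('b', 10)   # pairs with the oldest item from 'a'
fourth = buf.push('b', 20)  # pairs with the next item from 'a'
```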

class dispel4py.core.SourcePE

Bases: dispel4py.core.GenericPE

Representation of a data-producing PE, i.e. a PE with no input connections.

dispel4py.utils module

Collection of dispel4py utilities.

class dispel4py.utils.DummyModule

Bases: module

dispel4py.utils.findWorkflowGraph(mod, attr)
dispel4py.utils.loadGraph(module_name, attr=None)

Loads a graph from the given module.

dispel4py.utils.loadGraphFromFile(module_name, path, attr=None)
dispel4py.utils.loadGraphFromSource(module_name, source, attr=None)
dispel4py.utils.loadGraphIgnoreImports(module_name, graph_var)

Loads a graph from the given module and ignores any import errors.

dispel4py.utils.loadIgnoreImports(module_name, attr_name, code)

Import a module from source and return the specified attribute.

Parameters:
  • module_name – name of the module to load
  • attr_name – name of the attribute within the module
  • code – source code of the module
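Loading an attribute from module source while tolerating failed imports can be sketched with the standard library alone. This is one possible approach under stated assumptions, not necessarily how dispel4py.utils does it. The function name load_attr_ignoring_imports is hypothetical, and a failed import is replaced here by an empty placeholder module. Only plain "import x" statements are covered; "from x import y" on a missing module would still fail.

```python
import builtins
import types


def load_attr_ignoring_imports(module_name, attr_name, code):
    """Execute 'code' as a new module and return one of its attributes."""
    real_import = builtins.__import__

    def tolerant_import(name, *args, **kwargs):
        try:
            return real_import(name, *args, **kwargs)
        except ImportError:
            # Assumption: an empty placeholder module is good enough.
            return types.ModuleType(name)

    module = types.ModuleType(module_name)
    builtins.__import__ = tolerant_import
    try:
        exec(code, module.__dict__)
    finally:
        # Always restore the real import hook, even if the code fails.
        builtins.__import__ = real_import
    return getattr(module, attr_name)


source = "import surely_missing_module_xyz\nvalue = 41 + 1\n"
loaded = load_attr_ignoring_imports('demo_module', 'value', source)
```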
dispel4py.utils.loadSource(module_name, path, attr_name)

Import a module from the given source file at ‘path’ and return the named attribute ‘attr_name’.

Parameters:
  • module_name – name of the module to load
  • path – location of the source file
  • attr_name – name of the attribute within the module
dispel4py.utils.loadSourceIgnoreImports(module_name, path, attr_name)

Import a module from the given source file at ‘path’ and return the named attribute ‘attr_name’. Any import errors are ignored.

Parameters:
  • module_name – name of the module to load
  • path – location of the source file
  • attr_name – name of the attribute within the module
dispel4py.utils.load_graph(graph_source, attr=None)
dispel4py.utils.make_hash(o)

Makes a hash from a dictionary, list, tuple or set nested to any level, provided it contains only other hashable types (including any lists, tuples, sets, and dictionaries).
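Hashing nested containers is commonly done by converting each level into a hashable equivalent and recursing. The following is a minimal sketch of that technique, not the code of dispel4py.utils.make_hash. The name make_hash_sketch is hypothetical.

```python
def make_hash_sketch(o):
    """Hash nested lists, tuples, sets and dictionaries recursively."""
    if isinstance(o, (list, tuple)):
        # Order matters for sequences. Note that in this sketch a list
        # and a tuple with the same elements hash alike.
        return hash(tuple(make_hash_sketch(e) for e in o))
    if isinstance(o, (set, frozenset)):
        # Order does not matter for sets.
        return hash(frozenset(make_hash_sketch(e) for e in o))
    if isinstance(o, dict):
        # Keys are hashable already; values are reduced to their hashes.
        return hash(frozenset((k, make_hash_sketch(v)) for k, v in o.items()))
    return hash(o)


a = {'x': [1, 2, {'y': {3, 4}}], 'z': (5,)}
b = {'z': (5,), 'x': [1, 2, {'y': {4, 3}}]}
hash_a = make_hash_sketch(a)
hash_b = make_hash_sketch(b)
```

Because dictionaries and sets are reduced to frozensets, two structures with the same content hash equally regardless of insertion order.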

dispel4py.utils.total_size(o, handlers={}, verbose=False)

From: http://code.activestate.com/recipes/577504/

Returns the approximate memory footprint of an object and all of its contents.

Automatically finds the contents of the following builtin containers and their subclasses: tuple, list, deque, dict, set and frozenset. To search other containers, add handlers to iterate over their contents:

handlers = {SomeContainerClass: iter,
            OtherContainerClass: OtherContainerClass.get_elements}
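The effect of a handler can be seen with a small self-contained sketch in the spirit of the referenced recipe. total_size_sketch is a simplified, hypothetical reimplementation rather than the function shipped in dispel4py.utils, and Bag is an invented container used only for illustration.

```python
from sys import getsizeof
from itertools import chain
from collections import deque


def total_size_sketch(o, handlers=None):
    """Approximate the size of an object and, via handlers, its contents."""
    all_handlers = {tuple: iter, list: iter, deque: iter,
                    dict: lambda d: chain.from_iterable(d.items()),
                    set: iter, frozenset: iter}
    all_handlers.update(handlers or {})
    seen = set()  # ids of objects already counted

    def sizeof(obj):
        if id(obj) in seen:
            return 0
        seen.add(id(obj))
        size = getsizeof(obj)
        for typ, handler in all_handlers.items():
            if isinstance(obj, typ):
                size += sum(sizeof(item) for item in handler(obj))
                break
        return size

    return sizeof(o)


class Bag(object):
    """Invented container that is not known to the default handlers."""

    def __init__(self, items):
        self._items = list(items)

    def get_elements(self):
        return iter(self._items)


bag = Bag(['a' * 100, 'b' * 100])
shallow = total_size_sketch(bag)
deep = total_size_sketch(bag, handlers={Bag: Bag.get_elements})
```

Without a handler only the Bag object itself is measured; with the handler the two strings it holds are counted as well.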

dispel4py.visualisation module

The IPython module for visualising a dispel4py graph using Graphviz dot.

For example, to visualise a graph named ‘pipeline’:

from dispel4py.visualisation import display
display(pipeline)
dispel4py.visualisation.display(graph)

Visualises the input graph.

dispel4py.workflow_graph module

The dispel4py workflow graph.

class dispel4py.workflow_graph.WorkflowGraph

Bases: object

A graph representing the workflow and related methods.

add(n)

Adds node n, which must be an instance of dispel4py.core.GenericPE, and returns the created workflow node.

Return type: WorkflowNode
connect(fromNode, fromConnection, toNode, toConnection)

Connect the two given nodes from the given output to the given input. If the nodes are not in the graph, they will be added.

Parameters:
  • fromNode – the source PE of the connection
  • fromConnection (String) – the name of the output of the source node ‘fromNode’
  • toNode – the destination PE of the connection
  • toConnection (String) – the name of the input of the destination node ‘toNode’

flatten()

Subgraphs contained within composite PEs are added to the top level workflow.

getContainedObjects()
propagate_types()

Propagates the types throughout the graph by retrieving the output types from each node, starting from the root, and providing them to connected consumers.
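The propagation step can be pictured as a walk over the nodes in dependency order, where each node first receives the output types of its upstream neighbours. The sketch below is an illustration under assumptions, not the WorkflowGraph implementation. NodeSketch, propagate_types_sketch and the edge tuple layout are hypothetical, and the nodes are assumed to be supplied in topological order.

```python
class NodeSketch(object):
    """Hypothetical node with static or pass-through output types."""

    def __init__(self, name, static_output=None):
        self.name = name
        self.static_output = static_output
        self.input_types = {}

    def setInputTypes(self, types):
        self.input_types = types

    def getOutputTypes(self):
        if self.static_output is not None:
            return {'output': self.static_output}
        # Pass-through: the output type mirrors the input type.
        return {'output': self.input_types.get('input')}


def propagate_types_sketch(ordered_nodes, edges):
    """edges holds (source, output_name, destination, input_name) tuples."""
    for node in ordered_nodes:
        incoming = {}
        for src, out_name, dst, in_name in edges:
            if dst is node:
                incoming[in_name] = src.getOutputTypes()[out_name]
        # Input types are set before this node's output types are read.
        node.setInputTypes(incoming)


producer = NodeSketch('producer', static_output=['number'])
passthrough = NodeSketch('filter')
sink = NodeSketch('sink')
edges = [(producer, 'output', passthrough, 'input'),
         (passthrough, 'output', sink, 'input')]
propagate_types_sketch([producer, passthrough, sink], edges)
```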

class dispel4py.workflow_graph.WorkflowNode(o)

Wrapper class for workflow nodes. It wraps objects whose classes denote PEs, that is, subclasses of GenericPE.

WORKFLOW_NODE_CP = 2
WORKFLOW_NODE_FN = 1
WORKFLOW_NODE_PE = 0
getContainedObject()

Returns the wrapped PE or function.

node_counter = 0
dispel4py.workflow_graph.draw(graph)

Creates a representation of the workflow graph in the dot language.
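For readers unfamiliar with the dot language, the sketch below shows the general shape of such a representation for a list of connections. It does not reproduce the output of draw(), which is not specified here. The function to_dot and the edge tuple layout are hypothetical.

```python
def to_dot(edges):
    """Render (source, output, destination, input) tuples as a DOT digraph."""
    lines = ['digraph workflow {']
    for src, out_name, dst, in_name in edges:
        # Each connection becomes a directed edge labelled with the
        # output and input names it links.
        lines.append('    "%s" -> "%s" [label="%s -> %s"];'
                     % (src, dst, out_name, in_name))
    lines.append('}')
    return '\n'.join(lines)


dot = to_dot([('Producer', 'output', 'Consumer', 'input')])
```

The resulting text can be passed to the Graphviz dot tool to produce an image.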

dispel4py.workflow_graph.drawDot(graph)

Draws the workflow as a graph and creates a PNG image using graphviz dot.

dispel4py.workflow_graph.getConnectedInputs(node, graph)

Module contents

The core packages for Dispel4Py.