pipeline – classes for data reduction and analysis pipelines

The pipeline module contains classes and utilities for constructing data pipelines – linear constructs of operations that process input data, passing it through all pipeline stages.

Pipelines are represented by the Pipeline class, which is composed of a sequence of PipelineElement objects representing the processing stages. Astropysics includes some built-in pipeline elements, but PipelineElement can also easily be subclassed to provide customized pipeline functionality.

Once the pipeline is constructed, data can be queued into the first stage with the Pipeline.feed() method (or Pipeline.feedMany()), where the type of the data expected is determined by the PipelineElement objects.

Special objects (subclasses of PipelineMessage) can also be fed into pipelines to tell particular pipeline elements to change how they behave. This allows a pipeline to adapt based on the data. For example, if two different exposure times are being processed in an image reduction pipeline, then after the images from the first set of exposure times are processed, an AccumulateMessage can be added to provide a new set of darks before the second set of images is fed into the pipeline.
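
The feed / process / extract flow described above can be pictured with a small self-contained toy model. This sketch does not use astropysics: ToyPipeline is a hypothetical stand-in in which each stage is a plain function with its own input queue, and it only mimics the general idea, not the library's actual behavior.

```python
from collections import deque


class ToyPipeline:
    """Hypothetical stand-in for a linear pipeline; not the astropysics class."""

    def __init__(self, stages):
        self.stages = list(stages)                     # one callable per stage
        self.queues = [deque() for _ in self.stages]   # input queue per stage
        self.output = deque()                          # results of the last stage

    def feed(self, datum):
        # Queue a single datum into the first stage.
        self.queues[0].append(datum)

    def process(self):
        # Push every queued datum through all stages, in order.
        for i, stage in enumerate(self.stages):
            last = i + 1 == len(self.stages)
            target = self.output if last else self.queues[i + 1]
            while self.queues[i]:
                target.append(stage(self.queues[i].popleft()))

    def extract(self):
        # Take the oldest finished result; IndexError if nothing is left.
        return self.output.popleft()


# Two toy stages: subtract a bias level, then scale.
toy = ToyPipeline([lambda x: x - 1.0, lambda x: x * 2.0])
toy.feed(3.0)
toy.feed(5.0)
toy.process()
results = [toy.extract(), toy.extract()]
```

Each datum passes through the stages in order, so the first value fed is also the first value extracted.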

Classes and Inheritance Structure

Inheritance diagram of astropysics.pipeline

Module API

class astropysics.pipeline.AccumulateMessage(target, naccum=1, setattrname=None, filterfunc=None)

Bases: astropysics.pipeline.PipelineMessage

This object is a PipelineMessage that tells the target PipelineElement to accumulate the next naccum inputs from the earlier stage as a list, and then either pass this list in as data to the next stage, or set it to an attribute on the element.

Parameters:
  • target – The target pipeline element to receive this message.
  • naccum (int) – The number of inputs to accumulate before continuing normal operation of the pipeline element.
  • setattrname – The name of the attribute that the result of the accumulation should be set to. Alternatively, it can be None, in which case the accumulated list will be passed in as the next set of input data.
  • filterfunc – A callable that is called as filterfunc(accum), where accum is the accumulated list of data, and should return the object to be set to the attribute setattrname. If None, this step is not performed (i.e. a list with the accumulated data is the result).
deliverMessage(elem)
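
The roles of naccum and filterfunc can be illustrated with a standalone sketch. The accumulate() helper below is hypothetical and is not part of astropysics; it only mirrors the documented idea of collecting the next naccum inputs and optionally reducing them with filterfunc. The choice of a median as the reduction is likewise just an example.

```python
from itertools import islice
from statistics import median


def accumulate(inputs, naccum=1, filterfunc=None):
    """Hypothetical helper: collect naccum items, then optionally reduce them."""
    stream = iter(inputs)
    accum = list(islice(stream, naccum))       # the next naccum inputs
    # With no filterfunc, the accumulated list itself is the result.
    result = accum if filterfunc is None else filterfunc(accum)
    return result, list(stream)                # result plus the untouched inputs


# Three dark levels followed by two science values (made-up numbers).
incoming = [10.0, 12.0, 11.0, 50.0, 60.0]
master_dark, science = accumulate(incoming, naccum=3, filterfunc=median)
```

Here master_dark plays the role of the value that would be stored under setattrname, while the remaining inputs continue through normal processing.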
class astropysics.pipeline.CallMethodMessage(target, methodname, *args, **kwargs)

Bases: astropysics.pipeline.PipelineMessage

This object is a PipelineMessage that calls a method on the target pipeline element.

The retval attribute stores the return value for the method call as a list (in the same order that the calls occur, if the message is passed to multiple elements).

Parameters:
  • target – The target pipeline element to receive this message.
  • methodname (string) – The name of the method that should be called.

Further arguments and keyword arguments will be passed in as the positional and keyword arguments for the method call. Thus,

pipeline = Pipeline([apipelineelement])
msg = CallMethodMessage(apipelineelement,'meth',arg1,kw2=val2)
pipeline.feed(msg)
pipeline.process()

is equivalent to

pipeline = Pipeline([apipelineelement])
apipelineelement.meth(arg1,kw2=val2)
deliverMessage(elem)
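
As a rough illustration of what delivery amounts to, the following standalone sketch looks up the named method on the element and appends its return value to retval. ToyCallMethodMessage and Scaler are hypothetical names used only for this example; the real class may differ in its details.

```python
class ToyCallMethodMessage:
    """Hypothetical stand-in that mimics the documented behavior only."""

    def __init__(self, target, methodname, *args, **kwargs):
        self.target = target
        self.methodname = methodname
        self.args = args
        self.kwargs = kwargs
        self.retval = []          # one entry per delivery, in call order

    def deliverMessage(self, elem):
        # Look the method up by name and call it with the stored arguments.
        method = getattr(elem, self.methodname)
        self.retval.append(method(*self.args, **self.kwargs))


class Scaler:
    """Hypothetical pipeline element with one configurable method."""

    def __init__(self):
        self.factor = 1.0

    def rescale(self, factor, offset=0.0):
        self.factor = factor
        return factor + offset


elem = Scaler()
msg = ToyCallMethodMessage(elem, 'rescale', 2.0, offset=0.5)
msg.deliverMessage(elem)
```

After delivery, the element's state has changed and the method's return value is available from msg.retval.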
class astropysics.pipeline.Pipeline(elements=None)

Bases: object

This class represents a pipeline, composed of a linear sequence of PipelineElements joined together.

Note that where stage numbers are mentioned in the documentation and inputs, they are 0-indexed (e.g. the 1st stage is stage 0).

Parameters:elements (Sequence of PipelineElements or None) – Initial pipeline elements for this pipeline.
Raises TypeError:
 If the input is not a PipelineElement
addElement(stagenum, element)

Insert a PipelineElement to the pipeline at the specified stage number. Later stages are pushed forward along with their current data.

Parameters:
  • stagenum (int) – The stage number for the new element.
  • element (PipelineElement) – The PipelineElement object to insert.
Raises TypeError:
 If the input is not a PipelineElement

clear(stages=None)

Clears the inputs of the stage(s) requested.

Parameters:stages (integer, sequence of integers or None) – The stage(s) to be cleared. If None, all will be cleared.
elements

A tuple containing the pipeline elements.

extract(extractall=False, autoprocess=False)

Extract from the last stage of the pipeline.

Parameters:
  • extractall (bool) – If True, a sequence of objects will be extracted from the final stage (combined with autoprocess, this will clear the pipeline).
  • autoprocess (bool) – If True, the stages will be repeatedly processed so that at least one item is in the final stage. (combined with extractall, this will clear the pipeline).
Raises IndexError:
 If there is no data left to extract

feed(data)

Feed an initial datum into the first stage of the pipeline.

feedMany(dataeq)

Feed initial data into the first stage of the pipeline.

process(repeat=False)

Process the pipeline to the end.

Parameters:repeat (bool or int) – If True, the pipeline will be processed continually. If False, only one step through the pipeline will be run. If it is an integer, the pipeline will be processed continually and the integer will be taken as the maximum number of times to attempt any given stage before a PipelineError is raised.
Raises PipelineError:
 If a stage is repeated repeat or more times.
Returns:A list with the return value of processStage() if repeat is False, or a list of lists if repeat is True.
processSingle(input, processinglimit=10, removefailed=True)

Processes the input value through the pipeline, skipping over anything in the queue and repeating a processing stage until complete.

Parameters:
  • input – The input value to be fed into the first stage.
  • processinglimit (int) – The maximum number of times to attempt to process any stage. If 0, processing continues indefinitely (or until completion).
  • removefailed (bool) – If True and a stage reaches the processinglimit, the data object will be removed from the pipeline. Otherwise, the object will be left in place where it got stuck.
Raises PipelineError:
 If processinglimit is reached and the object has still not been processed. The data will be left in the front of the failed stage’s queue unless removefailed is True.

Returns:

The result of the full pipeline (it will not be left in the final stage).
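
The retry behavior controlled by processinglimit can be sketched in isolation. The helper and exception below are hypothetical stand-ins, not astropysics code, and they assume the convention documented for plProcess() that a return value of None means the stage has not completed yet.

```python
class ToyPipelineError(Exception):
    """Hypothetical stand-in for PipelineError."""


def process_with_limit(stage, datum, processinglimit=10):
    """Retry stage(datum) until it returns something other than None."""
    attempts = 0
    # A limit of 0 means "keep trying until the stage completes".
    while processinglimit == 0 or attempts < processinglimit:
        result = stage(datum)
        if result is not None:
            return result
        attempts += 1
    raise ToyPipelineError('stage did not complete in %d attempts' % attempts)


calls = {'n': 0}


def flaky_stage(x):
    # Completes only on the third attempt.
    calls['n'] += 1
    return x * 2 if calls['n'] >= 3 else None


value = process_with_limit(flaky_stage, 21, processinglimit=5)
```

A stage that never completes would exhaust the limit and raise the error instead of returning.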

processStage(stagenum)

Process a particular stage once, possibly processing earlier stages if there is nothing in the requested stage’s input.

Parameters:stagenum (int) – Stage number to process
Returns:False if the stage didn’t complete, True if it did, or the string ‘message’ if it delivered a message instead of data.
processToStage(stagenum, repeat=False)

Processes the pipeline from the beginning up to the requested stage.

Parameters:
  • stagenum (int) – The stage to process to.
  • repeat (bool or int) – If True, processing will continue until all earlier stages are empty. If it is an integer, it will be taken as a maximum number of times to attempt any given stage before a PipelineError is raised.
Returns:

A list with the return value of processStage() if repeat is False, or a list of lists if repeat is True.

Raises PipelineError:
 If an element has not completed after repeat processing cycles have been run.

removeElement(stagenum, keepdata=False)

Remove the requested stage number. All later stages will move one stage left to fill the gap.

Parameters:
  • stagenum (int) – The stage number to remove.
  • keepdata (bool) – If True, any data waiting to be processed in the removed stage is added to the next stage. Otherwise, the data is lost.
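
The effect of keepdata can be pictured with per-stage queues. The remove_stage() helper below is a hypothetical illustration, not the library's implementation. In particular, appending the kept data to the back of the next stage's queue is an assumption made for this sketch.

```python
from collections import deque


def remove_stage(queues, stagenum, keepdata=False):
    """Hypothetical helper: drop one stage's queue from a list of queues."""
    removed = queues.pop(stagenum)        # later stages shift left by one
    if keepdata and stagenum < len(queues):
        # Assumption: kept data joins the back of the next stage's queue.
        queues[stagenum].extend(removed)
    return removed


queues = [deque(['a']), deque(['b', 'c']), deque(['d'])]
remove_stage(queues, 1, keepdata=True)
kept = [list(q) for q in queues]

queues = [deque(['a']), deque(['b', 'c']), deque(['d'])]
remove_stage(queues, 1, keepdata=False)
dropped = [list(q) for q in queues]
```

With keepdata=True the pending items 'b' and 'c' survive in the following stage; with keepdata=False they are discarded.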
class astropysics.pipeline.PipelineElement

Bases: object

This class represents an element in a Pipeline. The _plintype attribute may be set (on classes or instances) as a processing indicator. See check_type() for valid types to use (if it is None, no checking is performed).

Subclassing

Implementing classes must override the following method: plProcess()

This method should be overridden if this stage of the pipeline involves an interactive step: plInteract()

plInteract(data, pipeline, elemi)

This method should be implemented if an interactive step is to be performed by this element. In that case, plProcess() should return None, and this will be called after plProcess completes.

Parameters:
  • data – The data passed in by the previous element of the pipeline. Type and interpretation is left to this method to test.
  • pipeline (Pipeline) – The Pipeline object that called this element.
  • elemi (int) – The index of this stage of the pipeline.
Returns:

The data to be passed into the next stage or None if the interaction was not completed. In this case, the input will be run through plProcess() again.

plProcess(data, pipeline, elemi)

This method performs whatever data processing steps that this element of the pipeline is supposed to do.

Parameters:
  • data – The data passed in by the previous element of the pipeline. Type and interpretation is left to this method to test.
  • pipeline (Pipeline) – The Pipeline object that called this element.
  • elemi (int) – The index of this stage of the pipeline.
Returns:

None if processing was incomplete for any reason - otherwise, the return value will be passed to the next stage.

Note

If this returns None and the data provided is not saved, it will disappear: the next processing attempt will feed in the next piece of data. If this is not the desired behavior, this method should call resaveData(data,pipeline,elemi).

resaveData(data, pipeline, elemi)

This returns the data so that the next time this element is processed, it will receive the same data. This is intended to be used inside plProcess() if it returns None and the data should be reprocessed the next time the pipeline stage is run.
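
The interplay between an incomplete processing attempt and re-saving the data can be shown with a standalone sketch. WaitForCalibration is a hypothetical element with its own input queue, and its resave() and process_once() methods are illustrative names, not the astropysics API. The sketch only mirrors the documented idea that an unsaved datum is lost when processing returns None.

```python
from collections import deque


class WaitForCalibration:
    """Hypothetical element that cannot process until calibration is set."""

    def __init__(self):
        self.calibration = None
        self.queue = deque()

    def resave(self, datum):
        # Put the datum back at the front so the next attempt sees it again.
        self.queue.appendleft(datum)

    def process_once(self):
        datum = self.queue.popleft()
        if self.calibration is None:
            self.resave(datum)    # without this, the datum would be lost
            return None           # signal that processing is incomplete
        return datum - self.calibration


elem = WaitForCalibration()
elem.queue.append(100.0)
first = elem.process_once()       # not ready, datum is kept
elem.calibration = 3.0
second = elem.process_once()      # the same datum is processed now
```

The first attempt returns None but leaves the datum queued, so the second attempt succeeds once the calibration is available.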

exception astropysics.pipeline.PipelineError

Bases: exceptions.Exception

This exception is used to indicate problems with the pipeline. It should NOT be used to indicate data matching problems within PipelineElements – ValueError or TypeError should be used for that purpose.

class astropysics.pipeline.PipelineMessage(target)

Bases: object

This class represents a message that is passed through the pipeline as though it were data, but is never actually processed by the elements - instead, it performs an action when it reaches a particular target(s).

Subclassing

  • Subclasses must implement deliverMessage() (see deliverMessage() docs for details)
  • Subclasses should call PipelineMessage.__init__() if overriding __init__().
  • Subclasses can override isTarget() to change how the message determines what its target is. This can also be set to deliver the message to multiple elements.
Parameters:target – Either a specific object that this PipelineMessage should be delivered to, or a class. If it is a class, the message will be delivered to all instances of that class in the pipeline; otherwise it will only reach the actual object specified as the target.

deliverMessage(elem)

This method must be overridden in subclasses, defining the action to be performed on the pipeline element this message is targeted at.

Parameters:elem (PipelineElement) – The element this message is to be delivered to. It is guaranteed to be an object that the isTarget() method accepts.
isTarget(elem)

Identifies whether or not the supplied element is an intended target of this message.

Parameters:elem – The element to be tested.
Returns:bool indicating whether or not elem is the target. It can also be the string ‘continue’ if the message should be delivered to this target but also passed further down the pipeline.
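
The default targeting rule described for the target parameter (a class matches every instance, a specific object matches only itself) can be sketched as a plain function. The is_target() helper and the two element classes are hypothetical; the library's actual check may be implemented differently, and the 'continue' return value is not modeled here.

```python
import inspect


def is_target(target, elem):
    """Hypothetical helper mirroring the documented target rule."""
    if inspect.isclass(target):
        # A class targets every element that is an instance of it.
        return isinstance(elem, target)
    # Anything else targets only that exact object.
    return elem is target


class DarkSubtract:
    """Hypothetical pipeline element."""


class FlatField:
    """Hypothetical pipeline element."""


stages = [DarkSubtract(), FlatField(), DarkSubtract()]
by_class = [is_target(DarkSubtract, s) for s in stages]
by_object = [is_target(stages[0], s) for s in stages]
```

Targeting by class reaches both DarkSubtract stages, while targeting a specific instance reaches only that one.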
class astropysics.pipeline.SetAttributeMessage(target, **attrs)

Bases: astropysics.pipeline.PipelineMessage

This object is a PipelineMessage that sets a specified set of attributes when it reaches its target.

Parameters:target – The target object to receive this message.

Other keyword arguments are taken as the attributes to be set on the target. Thus,

pipeline = Pipeline([apipelineelement])
msg = SetAttributeMessage(apipelineelement,attr1=val1,attr2=val2)
pipeline.feed(msg)
pipeline.process()

is equivalent to

pipeline = Pipeline([apipelineelement])
apipelineelement.attr1 = val1
apipelineelement.attr2 = val2

A dictionary can also be passed in as kwargs:

pipeline = Pipeline([apipelineelement])
attrs = {'attr1':val1,'attr2':val2}
msg = SetAttributeMessage(apipelineelement,**attrs)
pipeline.feed(msg)
pipeline.process()
deliverMessage(elem)