The pipeline module contains classes and utilities for constructing data pipelines – linear constructs of operations that process input data, passing it through all pipeline stages.
Pipelines are represented by the Pipeline class, which is composed of a sequence of PipelineElement objects representing the processing stages. Astropysics includes some built-in pipeline elements, but PipelineElement can also easily be subclassed to provide customized pipeline functionality.
Once the pipeline is constructed, data can be queued into the first stage with the Pipeline.feed() method (or Pipeline.feedMany()); the type of data expected is determined by the PipelineElement objects.
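As a rough illustration of the feed-then-process flow, here is a minimal self-contained sketch that does not use astropysics. ToyPipeline, ToyElement, Doubler and AddOne are hypothetical names; feed, feedMany, process and plProcess echo the method names on this page, and extract is a hypothetical stand-in for the extraction method. Messages, repeats and interactive steps are left out.

```python
from collections import deque


class ToyElement:
    """Hypothetical stand-in for a PipelineElement (one processing stage)."""

    def plProcess(self, data, pipeline, elemi):
        raise NotImplementedError


class Doubler(ToyElement):
    def plProcess(self, data, pipeline, elemi):
        return data * 2


class AddOne(ToyElement):
    def plProcess(self, data, pipeline, elemi):
        return data + 1


class ToyPipeline:
    """Hypothetical minimal pipeline: one input queue per stage plus an output queue."""

    def __init__(self, elements):
        self.elements = list(elements)
        # queues[i] holds inputs waiting for stage i; queues[-1] holds results
        self.queues = [deque() for _ in range(len(self.elements) + 1)]

    def feed(self, datum):
        self.queues[0].append(datum)

    def feedMany(self, data):
        for datum in data:
            self.feed(datum)

    def process(self):
        # Drain each stage in order, handing non-None results to the next stage
        for i, elem in enumerate(self.elements):
            while self.queues[i]:
                result = elem.plProcess(self.queues[i].popleft(), self, i)
                if result is not None:
                    self.queues[i + 1].append(result)

    def extract(self):
        # popleft() on an empty deque raises IndexError, mirroring the documented behavior
        return self.queues[-1].popleft()


pl = ToyPipeline([Doubler(), AddOne()])
pl.feedMany([1, 2, 3])
pl.process()
print([pl.extract() for _ in range(3)])  # [3, 5, 7]
```

Each datum is doubled by stage 0 and incremented by stage 1, so the output order matches the input order.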
Special objects (subclasses of PipelineMessage) can also be fed into pipelines to tell pipeline elements to change how they behave. This allows a pipeline to adapt based on the data. For example, if images with two different exposure times are being processed in an image reduction pipeline, an AccumulateMessage can be fed in after the first set of images has been processed, providing a new set of darks before the second set of images enters the pipeline.
Bases: astropysics.pipeline.PipelineMessage
This object is a PipelineMessage that tells the target PipelineElement to accumulate the next naccum inputs from the previous stage as a list, and then either pass this list in as data to the next stage or set it as an attribute on the element.
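The accumulate-then-release behavior can be sketched in isolation. ToyAccumulator, arm() and accept() are hypothetical names; arm(naccum, attrname) merely stands in for receiving an AccumulateMessage, since the real constructor arguments are not reproduced here.

```python
class ToyAccumulator:
    """Hypothetical sketch of what an element does after an AccumulateMessage."""

    def __init__(self):
        self._remaining = 0
        self._buffer = []
        self._attrname = None

    def arm(self, naccum, attrname=None):
        # Stand-in for message delivery: collect the next naccum inputs
        self._remaining = naccum
        self._buffer = []
        self._attrname = attrname

    def accept(self, datum):
        """Return a value for the next stage, or None while still collecting."""
        if self._remaining <= 0:
            return datum  # not accumulating: pass the datum through unchanged
        self._buffer.append(datum)
        self._remaining -= 1
        if self._remaining > 0:
            return None  # waiting for more inputs
        batch, self._buffer = self._buffer, []
        if self._attrname is not None:
            setattr(self, self._attrname, batch)  # keep the list on the element
            return None
        return batch  # pass the whole list on as a single datum


acc = ToyAccumulator()
acc.arm(3)
print([acc.accept(x) for x in ['a', 'b', 'c']])  # [None, None, ['a', 'b', 'c']]
acc.arm(2, attrname='darks')
acc.accept('dark1')
acc.accept('dark2')
print(acc.darks)  # ['dark1', 'dark2']
```

The second call shows the attribute-setting variant, which matches the darks scenario in the module overview.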
Bases: astropysics.pipeline.PipelineMessage
This object is a PipelineMessage that calls a method on the target pipeline element.
The retval attribute stores the return values of the method calls as a list (in the order the calls occur, if the message is delivered to multiple elements).
Further arguments and keyword arguments will be passed in as the arguments and keyword arguments for the method call. Thus,
pipeline = Pipeline([apipelineelement])
msg = CallMethodMessage(apipelineelement,'meth',arg1,kw2=val2)
pipeline.feed(msg)
pipeline.process()
is equivalent to
pipeline = Pipeline([apipelineelement])
apipelineelement.meth(arg1,kw2=val2)
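A message of this kind essentially reduces to getattr plus a call, with each return value appended to retval. The sketch below is hypothetical (ToyCallMethodMessage, deliver and Scaler are not astropysics names) and delivers the message by hand rather than through a pipeline.

```python
class ToyCallMethodMessage:
    """Hypothetical sketch: call a named method on each element it reaches."""

    def __init__(self, target, methodname, *args, **kwargs):
        self.target = target
        self.methodname = methodname
        self.args = args
        self.kwargs = kwargs
        self.retval = []  # one return value per delivery, in delivery order

    def deliver(self, elem):
        method = getattr(elem, self.methodname)
        self.retval.append(method(*self.args, **self.kwargs))


class Scaler:
    """Hypothetical pipeline element with a method to be called via a message."""

    def __init__(self):
        self.factor = 1

    def setFactor(self, factor, offset=0):
        self.factor = factor + offset
        return self.factor


first, second = Scaler(), Scaler()
msg = ToyCallMethodMessage(Scaler, 'setFactor', 2, offset=1)
for elem in (first, second):
    msg.deliver(elem)  # same as elem.setFactor(2, offset=1)
print(first.factor, msg.retval)  # 3 [3, 3]
```

Because the target here is a class, both instances receive the call and retval holds two entries.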
Bases: object
This class represents a pipeline, composed of a linear sequence of PipelineElements joined together.
Note that where stage numbers are mentioned in the documentation and inputs, they are 0-indexed (e.g. the 1st stage is stage 0).
Parameters: elements (Sequence of PipelineElements or None) – Initial pipeline elements for this pipeline.
Raises TypeError: If the input is not a PipelineElement.
Insert a PipelineElement into the pipeline at the specified stage number. Later stages are pushed forward along with their current data.
Raises TypeError: If the input is not a PipelineElement.
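The bookkeeping behind insertion can be sketched with plain lists, assuming each stage owns one input queue (a design choice of this sketch, not necessarily of astropysics). insert_stage and ToyElement are hypothetical; the new stage starts with an empty queue, and each later stage carries its queued data forward with it.

```python
from collections import deque


class ToyElement:
    """Hypothetical stand-in for a PipelineElement."""

    def __init__(self, name):
        self.name = name


def insert_stage(elements, queues, elem, stagenum):
    """Insert elem at 0-indexed stagenum; queues[i] is stage i's input queue."""
    if not isinstance(elem, ToyElement):
        raise TypeError('input is not a pipeline element')
    elements.insert(stagenum, elem)
    # The new stage gets an empty queue; later queues shift along with their stages
    queues.insert(stagenum, deque())


elements = [ToyElement('bias'), ToyElement('flat')]
queues = [deque(['raw1']), deque(['debiased1'])]
insert_stage(elements, queues, ToyElement('dark'), 1)
print([e.name for e in elements])  # ['bias', 'dark', 'flat']
print([list(q) for q in queues])  # [['raw1'], [], ['debiased1']]
```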
Clears the inputs of the stage(s) requested.
Parameters: stages (integer, sequence of integers or None) – The stage(s) to be cleared. If None, all stages are cleared.
A tuple containing the pipeline elements.
Extract from the last stage of the pipeline.
Raises IndexError: If there is no data left to extract.
Feed an initial datum into the first stage of the pipeline.
Feed initial data into the first stage of the pipeline.
Process the pipeline to the end.
Parameters: repeat (bool or int) – If True, the pipeline will be processed continually. If False, only one step through the pipeline will be run. If it is an integer, the pipeline will be processed continually, and the integer is taken as the maximum number of times to attempt any given stage before a PipelineError is raised.
Raises PipelineError: If a stage is repeated repeat or more times.
Returns: A list with the return value of processStage() if repeat is False, or a list of lists if repeat is True.
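The bool-or-int convention for repeat can be sketched as a helper that retries a single stage. ToyPipelineError and run_stage_until_done are hypothetical; attempt() is assumed to return True once the stage completes. Note that bool is a subclass of int in Python, so True has to be distinguished from an integer limit explicitly.

```python
class ToyPipelineError(Exception):
    """Hypothetical stand-in for PipelineError."""


def run_stage_until_done(attempt, repeat):
    """Retry one stage according to a repeat argument (hypothetical helper).

    attempt() returns True when the stage completed. repeat=False means a
    single attempt, repeat=True means retry indefinitely, and an int caps the
    number of attempts before ToyPipelineError is raised.
    """
    if repeat is False:
        return [attempt()]
    results = []
    while True:
        results.append(attempt())
        if results[-1]:
            return results
        # bool is a subclass of int, so True must be excluded explicitly
        if repeat is not True and len(results) >= repeat:
            raise ToyPipelineError('stage repeated %d times' % len(results))


outcomes = iter([False, False, True])
print(run_stage_until_done(lambda: next(outcomes), 5))  # [False, False, True]
```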
Processes the input value through the pipeline, skipping over anything in the queue and repeating a processing stage until complete.
Raises PipelineError: If processinglimit is reached and the object has still not been processed. The data will be left at the front of the failed stage’s queue unless removefailed is True.
Returns: The result of the full pipeline (it will not be left in the final stage).
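Processing one value straight through, bypassing the queues, might look like the sketch below. process_single and ToyPipelineError are hypothetical; stages are plain callables that return None while incomplete, and each stage is retried up to processinglimit times before giving up.

```python
class ToyPipelineError(Exception):
    """Hypothetical stand-in for PipelineError."""


def process_single(stages, queues, value, processinglimit=10, removefailed=False):
    """Hypothetical sketch: push one value straight through every stage.

    stages[i] is a callable returning None while incomplete; queues[i] is a
    list holding stage i's pending inputs. Those queued items are skipped,
    because value is handed to each stage directly.
    """
    data = value
    for i, stage in enumerate(stages):
        for _ in range(processinglimit):
            result = stage(data)
            if result is not None:
                data = result
                break
        else:
            # Limit reached: unless removefailed, leave the datum at the front of the queue
            if not removefailed:
                queues[i].insert(0, data)
            raise ToyPipelineError('stage %d did not complete' % i)
    return data  # returned directly rather than left in the final stage


calls = []


def slow_stage(x):
    # Hypothetical stage that only succeeds on its third attempt
    calls.append(x)
    return x * 10 if len(calls) >= 3 else None


queues = [[], ['already queued']]
print(process_single([lambda x: x + 1, slow_stage], queues, 4))  # 50
print(queues)  # [[], ['already queued']]
```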
Process a particular stage once, possibly processing earlier stages if there is nothing in the requested stage’s input.
Parameters: stagenum (int) – Stage number to process.
Returns: False if the stage didn’t complete, True if it did, or the string ‘message’ if it delivered a message instead of data.
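The True/False/'message' convention, together with pulling input from earlier stages, can be sketched recursively. process_stage and ToyMessage are hypothetical, and the sketch simplifies message handling: a real message would be delivered to its target(s), whereas here it is only forwarded.

```python
from collections import deque


class ToyMessage:
    """Hypothetical marker object travelling through the queues like data."""


def process_stage(elements, queues, stagenum):
    """Hypothetical sketch of running one stage once.

    queues[i] is stage i's input queue and queues[-1] collects output. If the
    requested stage has no input, earlier stages are run first (recursively).
    Returns True, False, or 'message', following the documented convention.
    """
    if not queues[stagenum]:
        if stagenum == 0 or not process_stage(elements, queues, stagenum - 1):
            return False  # nothing available to process
    item = queues[stagenum].popleft()
    if isinstance(item, ToyMessage):
        # Simplification: a real message would be delivered; here it is only forwarded
        queues[stagenum + 1].append(item)
        return 'message'
    result = elements[stagenum](item)
    if result is None:
        return False
    queues[stagenum + 1].append(result)
    return True


elements = [lambda x: x + 1, lambda x: x * 2]
queues = [deque([1, ToyMessage()]), deque(), deque()]
print(process_stage(elements, queues, 1))  # True (stage 0 ran first)
print(list(queues[2]))  # [4]
print(process_stage(elements, queues, 1))  # message
print(process_stage(elements, queues, 1))  # False
```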
Processes the pipeline from the beginning up to the requested stage.
Returns: A list with the return value of processStage() if repeat is False, or a list of lists if repeat is True.
Raises PipelineError: If an element has not completed after repeat processing cycles have been run.
Remove the requested stage number. All later stages shift down by one to fill the gap.
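Removal is the mirror image of insertion. In this hypothetical sketch (remove_stage is not an astropysics name) each stage owns one input queue, and the removed stage's pending inputs are handed back to the caller; what the real method does with them is an assumption left open here.

```python
from collections import deque


def remove_stage(elements, queues, stagenum):
    """Hypothetical helper: delete 0-indexed stage stagenum.

    queues[i] is stage i's input queue. Later stages and their queues shift
    down by one. This sketch returns the removed stage's pending inputs.
    """
    del elements[stagenum]
    return queues.pop(stagenum)


elements = ['bias', 'dark', 'flat']
queues = [deque(), deque(['d1']), deque(['f1'])]
leftover = remove_stage(elements, queues, 1)
print(elements, [list(q) for q in queues], list(leftover))
# ['bias', 'flat'] [[], ['f1']] ['d1']
```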
Bases: object
This class represents an element in a Pipeline. The _plintype attribute may be set (on classes or instances) as a processing indicator. See check_type() for valid types to use (if it is None, no checking is performed).
Subclassing
Implementing classes must override the following method:
This method should be overridden if this stage of the pipeline involves an interactive step:
This method should be implemented if an interactive step is to be performed by this element. In that case, plProcess() should return None, and this will be called after plProcess completes.
Returns: The data to be passed into the next stage, or None if the interaction was not completed. In that case, the input will be run through plProcess() again.
This method performs whatever data processing steps that this element of the pipeline is supposed to do.
Returns: None if processing was incomplete for any reason; otherwise, the return value will be passed to the next stage.
Note
If this returns None and the data provided is not saved, the data will disappear: the next processing attempt will feed in the next piece of data. If this is not the desired behavior, this method should call resaveData(data,pipeline,elemi).
This returns the data so that the next time this element is processed, it will receive the same data. This is intended to be used inside plProcess() if it returns None and the data should be reprocessed the next time the pipeline stage is run.
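The effect of resaving can be sketched with a single queue. ToyStage, resave() and run_once() are hypothetical; resave() plays the role of resaveData(data,pipeline,elemi) by putting the datum back at the front of the stage's input, which is an assumption about how the saving works.

```python
from collections import deque


class ToyStage:
    """Hypothetical stage: an input queue plus a processing function."""

    def __init__(self, process):
        self.queue = deque()
        self.process = process

    def resave(self, datum):
        # Put the datum back at the front so the next run receives it again
        self.queue.appendleft(datum)

    def run_once(self):
        return self.process(self.queue.popleft(), self)


calibration = {'ready': False}


def subtract_dark(datum, stage):
    # Hypothetical processing step that cannot run until a dark level is known
    if not calibration['ready']:
        stage.resave(datum)  # without this, the datum would be lost
        return None
    return datum - calibration['dark']


stage = ToyStage(subtract_dark)
stage.queue.extend([1000, 2000])
print(stage.run_once(), list(stage.queue))  # None [1000, 2000]
calibration.update(ready=True, dark=100)
print(stage.run_once(), list(stage.queue))  # 900 [2000]
```

Dropping the resave() call would leave [2000] after the first run, which is the data loss the note above warns about.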
Bases: exceptions.Exception
This exception is used to indicate problems with the pipeline. It should NOT be used to indicate data-matching problems within PipelineElements; ValueError or TypeError should be used for that purpose.
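The split between the two kinds of error might look like the following. Both functions are hypothetical illustrations, not astropysics internals, and ToyPipelineError stands in for PipelineError.

```python
class ToyPipelineError(Exception):
    """Hypothetical stand-in for PipelineError (problems with the pipeline itself)."""


def subtract_bias(image, bias):
    # A mismatch in the data is a data problem, so ValueError is appropriate
    if len(image) != len(bias):
        raise ValueError('image and bias have different lengths')
    return [p - b for p, b in zip(image, bias)]


def get_stage(elements, stagenum):
    # Asking for a stage that does not exist is a problem with the pipeline
    if not 0 <= stagenum < len(elements):
        raise ToyPipelineError('no stage %d in a %d-stage pipeline' % (stagenum, len(elements)))
    return elements[stagenum]


print(subtract_bias([10, 12], [1, 2]))  # [9, 10]
```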
Bases: object
This class represents a message that is passed through the pipeline as though it were data, but is never actually processed by the elements; instead, it performs an action when it reaches its target(s).
Subclassing
Parameters: target – Either a specific object that this PipelineMessage should be delivered to, or a class. If it is a class, the message will be delivered to all instances of that class in the pipeline; otherwise it will only reach the actual object specified as the target.
This method must be overridden in subclasses, defining the action to be performed on the pipeline element this message is targeted at.
Parameters: elem (PipelineElement) – The element this message is to be delivered to. It is guaranteed to be an object that the isTarget() method accepts.
Identifies whether or not the supplied element is an intended target of this message.
Parameters: elem – The element to be tested.
Returns: bool indicating whether or not elem is the target. It can also be the string ‘continue’ if the message should be delivered to this target but also passed further down the pipeline.
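Targeting by instance versus by class can be sketched as below. ToyMessage, is_target and route are hypothetical; in particular, having a class target return 'continue' so that it reaches every instance is an assumption of this sketch about how the class case could be built on the ‘continue’ return value.

```python
class ToyMessage:
    """Hypothetical sketch of how a message might decide where it is delivered."""

    def __init__(self, target):
        self.target = target

    def is_target(self, elem):
        if isinstance(self.target, type):
            # Assumption: a class target matches every instance, so the message
            # returns 'continue' to keep travelling after each delivery
            return 'continue' if isinstance(elem, self.target) else False
        return elem is self.target  # object target: only that exact element


def route(msg, elements):
    """Return the elements msg is delivered to, walking stages in order."""
    delivered = []
    for elem in elements:
        verdict = msg.is_target(elem)
        if verdict:
            delivered.append(elem)
            if verdict != 'continue':
                break  # a plain True stops the message here
    return delivered


class DarkSubtract:
    pass


class FlatField:
    pass


stages = [DarkSubtract(), FlatField(), DarkSubtract()]
print(len(route(ToyMessage(DarkSubtract), stages)))  # 2
print(route(ToyMessage(stages[1]), stages) == [stages[1]])  # True
```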
Bases: astropysics.pipeline.PipelineMessage
This object is a PipelineMessage that sets a specified set of attributes when it reaches its target.
Parameters: target – The target object to receive this message.
Other keyword arguments are taken as the attributes to be set on the target. Thus,
pipeline = Pipeline([apipelineelement])
msg = SetAttributeMessage(apipelineelement,attr1=val1,attr2=val2)
pipeline.feed(msg)
pipeline.process()
is equivalent to
pipeline = Pipeline([apipelineelement])
apipelineelement.attr1 = val1
apipelineelement.attr2 = val2
A dictionary of attributes can also be unpacked as keyword arguments:
pipeline = Pipeline([apipelineelement])
attrs = {'attr1':val1,'attr2':val2}
msg = SetAttributeMessage(apipelineelement,**attrs)
pipeline.feed(msg)
pipeline.process()
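The delivery step of such a message amounts to a setattr loop over the stored keyword arguments. ToySetAttributeMessage, deliver and Stage are hypothetical names, and the message is delivered by hand rather than through a pipeline.

```python
class ToySetAttributeMessage:
    """Hypothetical sketch: set the given attributes on each element it reaches."""

    def __init__(self, target, **attrs):
        self.target = target
        self.attrs = attrs  # attribute name -> new value

    def deliver(self, elem):
        for name, value in self.attrs.items():
            setattr(elem, name, value)


class Stage:
    """Hypothetical pipeline element with a class-level default."""
    exptime = 30


elem = Stage()
attrs = {'exptime': 60, 'label': 'long'}
ToySetAttributeMessage(elem, **attrs).deliver(elem)
print(elem.exptime, elem.label)  # 60 long
```

Since setattr acts on the instance, the class-level default exptime = 30 is left untouched for other Stage objects.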