ETLPipeline

Class definition for ETLPipeline

class dataduct.etl_pipeline.ETLPipeline(name, frequency='one-time', ec2_resource_terminate_after='6 Hours', delay=None, emr_cluster_config=None, load_time=None, max_retries=0)

Bases: object

DataPipeline class with steps and metadata.

The class contains all the metadata for the pipeline and provides functionality for adding steps to it.
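
A minimal usage sketch, assuming the constructor signature shown above; the pipeline name is illustrative and 'one-time' is the documented default frequency:

    from dataduct.etl_pipeline import ETLPipeline

    # Minimal sketch using the documented constructor; the pipeline
    # name is illustrative.
    pipeline = ETLPipeline(
        name='user_activity_etl',
        frequency='one-time',
        ec2_resource_terminate_after='6 Hours',
        max_retries=2,
    )
    print(pipeline)  # __str__ (below) gives a formatted view of the pipeline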

__str__()

Formatted output when printing the pipeline object

Returns: output – formatted string output
Return type: str
activate()

Activate the given pipeline definition

Activates an existing data pipeline and uploads all required files to S3
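
One possible flow, sketched with validate() and delete_if_exists() from later in this section (pipeline is the ETLPipeline instance from the example above):

    # Validate first, clear any pipeline with the same name, then activate.
    errors = pipeline.validate()
    if not errors:
        pipeline.delete_if_exists()
        pipeline.activate()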

add_step(step, is_bootstrap=False)

Add a step to the pipeline

Parameters:
  • step (ETLStep) – Step object that should be added to the pipeline
  • is_bootstrap (bool) – flag indicating bootstrap steps
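
For example (extract_step and install_step are hypothetical ETLStep instances):

    pipeline.add_step(extract_step)
    pipeline.add_step(install_step, is_bootstrap=True)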
bootstrap_steps

Get the bootstrap_steps for the pipeline

Returns: result – bootstrap_steps for the pipeline
Return type: list of ETLStep
create_base_objects()

Create base pipeline objects

Create base pipeline objects, which are maintained by ETLPipeline. The remaining objects (all of which are accessible through pipeline_objects) are maintained by the ETLStep.

create_bootstrap_steps(resource_type)

Create the bootstrap steps for installation on all machines

Parameters: resource_type (enum of str) – type of resource being bootstrapped; can be ec2 or emr
create_pipeline_object(object_class, **kwargs)

Abstract factory for creating, naming, and storing pipeline objects

Parameters: object_class (PipelineObject) – a class of pipeline objects
Returns: new_object – an instance of the given class; if no name is provided, one is derived from the object's type and index
Return type: PipelineObject
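
A sketch, assuming Ec2Resource (the class behind the ec2_resource property below) is a PipelineObject subclass; the keyword argument is illustrative:

    # Create and register a pipeline object; a name is generated from the
    # object's type and index because none is provided.
    resource = pipeline.create_pipeline_object(
        object_class=Ec2Resource,
        terminate_after='6 Hours',  # illustrative kwarg
    )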
create_steps(steps_params, is_bootstrap=False)

Create pipeline steps and add appropriate dependencies

Note

Unless the input of a particular step is specified, it is assumed that its input is the preceding step.

Parameters:
  • steps_params (list of dict) – list of dictionaries of step parameters
  • is_bootstrap (bool) – flag indicating bootstrap steps
Returns: steps – list of ETL step objects
Return type: list of ETLStep
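
A sketch of creating steps from parameter dictionaries; the keys shown are illustrative and in practice come from the steps section of the pipeline's YAML definition:

    # Illustrative step parameters; real keys come from the YAML definition.
    steps_params = [
        {'step_type': 'extract-s3', 'name': 'extract'},
        # no input given, so its input is the preceding step (see note above)
        {'step_type': 'load-redshift', 'name': 'load'},
    ]
    steps = pipeline.create_steps(steps_params)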

delete_if_exists()

Delete any existing pipelines with the same name as the current pipeline

determine_step_class(type, step_args)

Determine the correct ETL step class from the given type string and step arguments

Parameters:
  • type (str) – string specifying the type of the object
  • step_args (dict) – dictionary of step arguments
Returns:
  • step_class (ETLStep) – class object for the specific step type
  • step_args (dict) – dictionary of step arguments
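
A sketch, assuming the method returns the class together with the step arguments, as the fields above suggest; the type string and arguments are illustrative:

    step_class, step_args = pipeline.determine_step_class(
        'transform', {'name': 'clean_rows'})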

ec2_resource

Get the ec2 resource associated with the pipeline

Note

This will create the object if it doesn’t exist

Returns: ec2_resource – lazily-constructed ec2_resource
Return type: Ec2Resource
emr_cluster

Get the emr resource associated with the pipeline

Note

This will create the object if it doesn’t exist

Returns: emr_resource – lazily-constructed emr_resource
Return type: EmrResource
name

Get the name of the pipeline

Returns: result – name of the pipeline
Return type: str
parse_step_args(type, **kwargs)

Parse step arguments from input into the correct ETL step types

Parameters: type (str) – string specifying the type of the object
Returns:
  • step_class (ETLStep) – class object for the specific step type
  • step_args (dict) – dictionary of parsed step arguments
pipeline_objects()

Get all pipeline objects associated with the ETL

Returns: result – all pipeline objects related to the ETL, i.e. all base objects as well as those owned by steps
Return type: list of PipelineObject
redshift_database

Get the redshift database associated with the pipeline

Note

This will create the object if it doesn’t exist

Returns: redshift_database – lazily-constructed redshift database
Return type: Object
s3_data_dir

Fetch the S3 data directory

Returns: s3_dir – directory where S3 data will be stored
Return type: S3Directory
s3_files()

Get all S3 files associated with the ETL

Returns: result – all S3 files related to the ETL
Return type: list of S3File
s3_log_dir

Fetch the S3 log directory

Returns: s3_dir – directory where S3 logs will be stored
Return type: S3Directory
s3_source_dir

Fetch the S3 source directory

Returns: s3_dir – directory where S3 source code will be stored
Return type: S3Directory
step(step_id)

Fetch a single step from the pipeline

Parameters: step_id (str) – id of the step to be fetched
Returns: step – step matching the step_id; if not found, None is returned
Return type: ETLStep
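
For example (the step id is illustrative):

    load_step = pipeline.step('load')
    if load_step is None:
        print('no step with id "load"')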
translate_input_nodes(input_node)

Translate names from YAML to input_nodes

For steps that may take S3 as input, check whether they require multiple inputs. These inputs are represented as a dictionary mapping step names to the filenames used in that step, e.g.

{
    "step1": "eventing_activity_table",
    "step2": "activity_type_table"
}

When this is the case, we translate it to a dictionary of the following form and pass that as the 'input_form':

{
    "eventing_activity_table": [node for step1],
    "activity_type_table": [node for step2]
}
Parameters: input_node (dict) – map of step name to filename string
Returns: output – map of filename string to S3Node
Return type: dict of S3Node
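
A sketch using the mapping from the example above:

    input_form = pipeline.translate_input_nodes({
        'step1': 'eventing_activity_table',
        'step2': 'activity_type_table',
    })
    # input_form now maps each filename to the S3Node produced by that step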
validate()

Validate the given pipeline definition by creating a pipeline

Returns: errors – list of errors in the pipeline; empty if there are no errors
Return type: list