ETLPipeline¶
Class definition for ETLPipeline
- class dataduct.etl_pipeline.ETLPipeline(name, frequency='one-time', ec2_resource_terminate_after='6 Hours', delay=None, emr_cluster_config=None, load_time=None, max_retries=0)¶
Bases: object
ETLPipeline class with steps and metadata.
The ETLPipeline class contains all the metadata regarding the pipeline and provides functionality to add steps to it.
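A minimal construction sketch, assuming the module path shown in the class signature above; the name and frequency values are illustrative:

    from dataduct.etl_pipeline import ETLPipeline

    # Create a pipeline that runs once; the remaining arguments keep their defaults
    pipeline = ETLPipeline(name='example_pipeline', frequency='one-time')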
- __str__()¶
Formatted output when printing the pipeline object
Returns: output – Formatted string output
Return type: str
- activate()¶
Activate the given pipeline definition
Activates an existing data pipeline and uploads all required files to S3
- add_step(step, is_bootstrap=False)¶
Add a step to the pipeline
Parameters: - step (ETLStep) – Step object that should be added to the pipeline
- is_bootstrap (bool) – flag indicating bootstrap steps
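A brief usage sketch; extract_step and setup_step stand in for ETLStep instances constructed elsewhere:

    # Add a regular step to the pipeline
    pipeline.add_step(extract_step)

    # Flag a step as part of the pipeline bootstrap
    pipeline.add_step(setup_step, is_bootstrap=True)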
- bootstrap_steps¶
Get the bootstrap_steps for the pipeline
Returns: bootstrap_steps – Bootstrap steps for the pipeline
Return type: list of ETLStep
- create_base_objects()¶
Create base pipeline objects
Create base pipeline objects, which are maintained by ETLPipeline. The remaining objects (all of which are accessible through pipeline_objects) are maintained by the ETLStep.
- create_bootstrap_steps(resource_type)¶
Create the bootstrap steps for installation on all machines
Parameters: resource_type (enum of str) – type of resource being bootstrapped; can be ec2 or emr
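A one-line sketch, using one of the two resource types named above:

    # Create the bootstrap steps for the ec2 resource
    pipeline.create_bootstrap_steps(resource_type='ec2')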
- create_pipeline_object(object_class, **kwargs)¶
Abstract factory for creating, naming, and storing pipeline objects
Parameters: object_class (PipelineObject) – a class of pipeline objects
Returns: new_object – Object created from the given class. If a name is not provided, one is generated from the object's type and index
Return type: PipelineObject
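A hedged sketch of the factory call; SomePipelineObject is a hypothetical PipelineObject subclass and the keyword argument is illustrative:

    # kwargs are forwarded to the object's constructor; if no name is given,
    # one is generated from the object's type and index
    new_object = pipeline.create_pipeline_object(SomePipelineObject, some_param='value')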
- create_steps(steps_params, is_bootstrap=False)¶
Create pipeline steps and add appropriate dependencies
Note
Unless the input of a particular step is specified, it is assumed that its input is the preceding step.
Parameters: - steps_params (list of dict) – List of dictionaries of step parameters
- is_bootstrap (bool) – flag indicating bootstrap steps
Returns: steps – list of etl step objects
Return type: list of ETLStep
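A hedged sketch of the steps_params structure; the dictionary keys and step types follow dataduct's YAML step definitions but should be treated as illustrative:

    # Each dictionary describes one step; unless an input is specified,
    # a step consumes the output of the preceding step
    steps_params = [
        {'step_type': 'extract-s3', 'uri': 's3://bucket/path/data.tsv'},
        {'step_type': 'transform', 'script': 'scripts/clean.py'},
    ]
    steps = pipeline.create_steps(steps_params)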
- delete_if_exists()¶
Delete any existing pipelines with the same name as the current pipeline
- determine_step_class(type, step_args)¶
Determine step class from input to correct ETL step types
Parameters: - type (str) – string specifying the type of the objects
- step_args (dict) – dictionary of step arguments
Returns: - step_class (ETLStep) – Class object for the specific type
- step_args (dict) – dictionary of step arguments
Return type: ETLStep, dict
- ec2_resource¶
Get the ec2 resource associated with the pipeline
Note
This will create the object if it doesn’t exist
Returns: ec2_resource – lazily-constructed ec2_resource
Return type: Ec2Resource
- emr_cluster¶
Get the emr resource associated with the pipeline
Note
This will create the object if it doesn’t exist
Returns: emr_resource – lazily-constructed emr_resource
Return type: EmrResource
- name¶
Get the name of the pipeline
Returns: name – name of the pipeline
Return type: str
- parse_step_args(type, **kwargs)¶
Parse step arguments from input to correct ETL step types
Parameters: type (str) – string specifying the type of the objects
Returns: - step_class (ETLStep) – Class object for the specific type
- step_args (dict) – dictionary of step arguments
Return type: ETLStep, dict
- pipeline_objects()¶
Get all pipeline objects associated with the ETL
Returns: result – All objects related to the ETL, i.e. all base objects as well as ones owned by steps
Return type: list of PipelineObject
- redshift_database¶
Get the redshift database associated with the pipeline
Note
This will create the object if it doesn’t exist
Returns: redshift_database – lazily-constructed redshift database
Return type: Object
- s3_data_dir¶
Fetch the S3 data directory
Returns: s3_dir – Directory where S3 data will be stored
Return type: S3Directory
- s3_files()¶
Get all S3 files associated with the ETL
Returns: result – All S3 files related to the ETL
Return type: list of S3File
- s3_log_dir¶
Fetch the S3 log directory
Returns: s3_dir – Directory where S3 logs will be stored
Return type: S3Directory
- s3_source_dir¶
Fetch the S3 source directory
Returns: s3_dir – Directory where S3 source files will be stored
Return type: S3Directory
- step(step_id)¶
Fetch a single step from the pipeline
Parameters: step_id (str) – id of the step to be fetched
Returns: step – Step matching the step_id. If not found, None will be returned
Return type: ETLStep
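A short lookup sketch; 'extract_step' is a hypothetical step id:

    # Fetch a step by id; None is returned when no step matches
    step = pipeline.step('extract_step')
    if step is not None:
        print(step)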
- translate_input_nodes(input_node)¶
Translate names from YAML to input_nodes
For steps which may take S3 as input, check whether they require multiple inputs. These inputs will be represented as a dictionary mapping step names to the filenames used in each step, e.g.

    {
        "step1": "eventing_activity_table",
        "step2": "activity_type_table"
    }

When this is the case, we translate it to a dictionary of the following form and pass that as the 'input_form':

    {
        "eventing_activity_table": [node for step1],
        "activity_type_table": [node for step2]
    }

Parameters: input_node (dict) – map of step names to input node strings
Returns: output – map of string : S3Node
Return type: dict of S3Node
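A call sketch using the mapping shown above; the step names and filenames are illustrative:

    # Maps each filename to the S3Node output of the named step
    input_nodes = pipeline.translate_input_nodes({
        'step1': 'eventing_activity_table',
        'step2': 'activity_type_table',
    })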
- validate()¶
Validate the given pipeline definition by creating a pipeline
Returns: errors – list of errors in the pipeline, empty if no errors
Return type: list
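A minimal end-to-end sketch combining validate() and activate(); the error handling is illustrative:

    # Validate first; activate only when no errors are reported
    errors = pipeline.validate()
    if not errors:
        pipeline.activate()
    else:
        print(errors)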