dataduct.steps package¶
Submodules¶
dataduct.steps.emr_streaming module¶
ETL step wrapper for EmrActivity, which runs a streaming job on an EMR resource
- class dataduct.steps.emr_streaming.EMRStreamingStep(mapper, reducer=None, input=None, hadoop_params=None, depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
EMR Streaming Step class that runs streaming mapper and reducer scripts on an EMR resource
- merge_s3_nodes(input_nodes)¶
Override the merge S3Node case for EMR Streaming Step
Parameters: input_nodes (dict) – Map of the form {‘node_name’: node}
Returns: output_node – List of the input nodes; depends_on (list) – Empty list
Return type: list of S3Node
- dataduct.steps.emr_streaming.create_command(mapper, reducer, input_uri, output, hadoop_params)¶
Create the command step string given the input to streaming step
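As a hedged illustration of the signature above, the sketch below constructs an EMR streaming step directly. In practice the enclosing ETL pipeline supplies the plumbing, so schedule, emr_cluster and the s3_* directories are hypothetical placeholders, and the script paths are made up for the example:

    from dataduct.steps.emr_streaming import EMRStreamingStep

    # schedule, emr_cluster and the s3_* directories are placeholders that a
    # built pipeline would normally pass in through **kwargs (see ETLStep).
    streaming_step = EMRStreamingStep(
        id='word_count',
        mapper='scripts/word_mapper.py',
        reducer='scripts/word_reducer.py',
        hadoop_params=['-D', 'mapred.reduce.tasks=1'],  # assumed list of extra args
        schedule=schedule,
        resource=emr_cluster,
        s3_data_dir=s3_data_dir,
        s3_log_dir=s3_log_dir,
        s3_source_dir=s3_source_dir,
    )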
dataduct.steps.etl_step module¶
Base class for an etl step
- class dataduct.steps.etl_step.ETLStep(id, s3_data_dir=None, s3_log_dir=None, s3_source_dir=None, schedule=None, resource=None, input_node=None, required_steps=None, max_retries=0)¶
Bases: object
ETL step class with activities and metadata.
An ETL Step is an abstraction over a set of pipeline objects. It represents a chunk of the pipeline with the following attributes:
- input
- activities
- output
Given the input parameters, the ETL Step is responsible for creating all necessary AWS pipeline objects, defining names for those objects, and defining S3 URIs for all dependent files.
- __str__()¶
Output the ETL step when typecast to a string
- activities¶
Get all AWS activities created for this step
Returns: result – All AWS activities created for this step
- add_required_steps(required_steps)¶
Add dependencies for this step
Parameters: required_steps (list of ETLStep) – dependencies of current step
- copy_s3(input_node, dest_uri)¶
Copy S3 node to new location in s3
Creates a copy activity. Instead of copying to just the output_node, we create an intermediate node. This node must point to a directory, not a file. Otherwise, AWS will try to copy the file line by line.
Parameters: - input_node (S3Node) – Source Node in S3
- dest_uri (S3Path) – Destination path in S3
Returns: activity – copy activity object
Return type: CopyActivity
- create_output_nodes(output_node, sub_dirs)¶
Create output nodes for all the subdirectories
Parameters: sub_dirs (list of str) – Names of the subdirectories
Returns: s3_output_nodes – Output nodes keyed by subdirectory
Return type: dict of S3Node
- create_pipeline_object(object_class, **kwargs)¶
Create a pipeline object associated with the step
Parameters: object_class (PipelineObject) – Class of the pipeline object to create
Returns: new_object – Object of the given class; its name is derived from its type and an index if not provided
Return type: PipelineObject
- create_s3_data_node(s3_object=None, **kwargs)¶
Create an S3 DataNode for s3_file or s3_path
Parameters: s3_object (S3File / S3Path) – Object used to create the node
Returns: s3_node – S3Node for the ETL step
Return type: S3Node
- create_script(s3_object)¶
Set the S3 path for the S3 object using the s3_source_dir
Parameters: s3_object (S3File) – S3File whose source directory is set
Returns: s3_object – S3File after the path is set
Return type: S3File
- depends_on¶
Get the dependent activities for the ETL step
Returns: result – Dependent activities for this ETL step
- input¶
Get the input node for the ETL step
Returns: result – Input node for this ETL step
Note: Input is represented as None, a single node, or a dict of nodes
- maximum_retries¶
Get the maximum number of retries for the ETL step
Returns: result – Maximum retries for this ETL step
- merge_s3_nodes(input_nodes)¶
Merge multiple S3 input nodes
We merge the multiple input nodes using a copy activity
Parameters: input_nodes (dict of S3Node) – Key-node pairs of the form {‘node_name’: node}
Returns: combined_node (S3Node) – New S3Node with the input nodes merged; new_depends_on (list of str) – New dependencies for the step
Return type: S3Node
- output¶
Get the output node for the ETL step
Returns: result – Output node for this ETL step
Note: The output is an S3 node, or multiple S3 nodes. If this step produces no S3 nodes, there is no output. A step that produces multiple outputs keys them by the output directories specified in the load-definition. For instance, the step defined as follows:
{ ..., "output": [ "output1":, "output2" ] }
will have output:
{ "output1": [s3 node pointing to .../output_1] "output2": [s3 node pointing to .../output_2] }
- pipeline_objects¶
Get all pipeline objects that are created for this step
Returns: result – All pipeline objects created for this step
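To show how these methods are meant to be combined, here is a minimal sketch of a custom step built on ETLStep. Only the documented methods are used; the commented activity class and its keyword arguments are hypothetical stand-ins, not real dataduct objects:

    from dataduct.steps.etl_step import ETLStep

    class PassthroughStep(ETLStep):
        """Hypothetical step that only declares an output S3 node."""

        def __init__(self, **kwargs):
            # id, schedule, resource and the s3_* directories arrive through
            # kwargs from whatever builds the pipeline.
            super(PassthroughStep, self).__init__(**kwargs)

            # Register an S3 data node; it becomes one of this step's
            # pipeline objects.
            output_node = self.create_s3_data_node()

            # A real step would attach an activity here, e.g.:
            #   self.create_pipeline_object(SomeActivity,
            #                               input_node=self.input,
            #                               output_node=output_node)
            # SomeActivity and its keyword arguments are stand-ins.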
dataduct.steps.extract_local module¶
ETL step wrapper for creating an S3 node for input from local files
- class dataduct.steps.extract_local.ExtractLocalStep(path, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
ExtractLocal Step class that creates an S3 input node from a local file
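A short construction sketch based on the signature above; schedule and the s3_* directories are placeholders the pipeline would normally provide, and the data path is invented for the example:

    from dataduct.steps.extract_local import ExtractLocalStep

    # The local file is staged in S3 and exposed as this step's output node.
    extract_step = ExtractLocalStep(
        id='extract_word_data',
        path='data/word_data.txt',
        schedule=schedule,
        s3_data_dir=s3_data_dir,
        s3_source_dir=s3_source_dir,
    )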
dataduct.steps.extract_rds module¶
ETL step wrapper to extract data from RDS to S3
- class dataduct.steps.extract_rds.ExtractRdsStep(table=None, sql=None, host_name=None, database=None, depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
Extract RDS Step class that extracts data from RDS into S3
- dataduct.steps.extract_rds.guess_input_tables(sql)¶
Guess input tables from the sql query
Returns: results – Tables used in the SQL statement
Return type: list of str
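As a small illustration of guess_input_tables, the snippet below inspects a join query; the value shown in the comment is what one would expect, not a guaranteed result:

    from dataduct.steps.extract_rds import guess_input_tables

    sql = ('SELECT o.id, c.name FROM orders o '
           'JOIN customers c ON o.customer_id = c.id')
    tables = guess_input_tables(sql)
    # Expected to contain the referenced tables, e.g. ['orders', 'customers'].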
dataduct.steps.extract_redshift module¶
ETL step wrapper for RedshiftCopyActivity to extract data to S3
- class dataduct.steps.extract_redshift.ExtractRedshiftStep(schema, table, redshift_database, insert_mode='TRUNCATE', depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
Extract Redshift Step class that extracts data from Redshift into S3
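A hedged construction sketch from the documented signature; redshift_database, schedule and ec2_resource are placeholders assumed to be created elsewhere in the pipeline:

    from dataduct.steps.extract_redshift import ExtractRedshiftStep

    unload_step = ExtractRedshiftStep(
        id='extract_prod_orders',
        schema='prod',
        table='orders',
        redshift_database=redshift_database,  # placeholder database object
        schedule=schedule,
        resource=ec2_resource,
        s3_data_dir=s3_data_dir,
    )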
dataduct.steps.extract_s3 module¶
ETL step wrapper for creating an S3 node for input
- class dataduct.steps.extract_s3.ExtractS3Step(uri, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
ExtractS3 Step class that creates an input node from an existing S3 location
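ExtractS3Step only needs a URI for an existing S3 location; a sketch, with the bucket name and pipeline plumbing as placeholders:

    from dataduct.steps.extract_s3 import ExtractS3Step

    s3_step = ExtractS3Step(
        id='extract_events',
        uri='s3://example-bucket/events/2014/01/01/',
        schedule=schedule,  # placeholder pipeline schedule
    )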
dataduct.steps.load_redshift module¶
ETL step wrapper for RedshiftCopyActivity to load data into Redshift
- class dataduct.steps.load_redshift.LoadRedshiftStep(schema, table, redshift_database, insert_mode='TRUNCATE', max_errors=None, replace_invalid_char=None, depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
Load Redshift Step class that loads data from S3 into a Redshift table
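A sketch of LoadRedshiftStep highlighting its Redshift-specific parameters; the upstream step, database object and pipeline plumbing are placeholders, and the insert_mode value simply echoes the documented default:

    from dataduct.steps.load_redshift import LoadRedshiftStep

    load_step = LoadRedshiftStep(
        id='load_orders',
        schema='prod',
        table='orders',
        insert_mode='TRUNCATE',                # documented default
        max_errors=5,
        replace_invalid_char='?',
        redshift_database=redshift_database,   # placeholder database object
        input_node=unload_step.output,         # output node of an upstream step
        schedule=schedule,
        resource=ec2_resource,
    )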
dataduct.steps.sql_command module¶
ETL step wrapper for SqlActivity, executed on an EC2 resource
- class dataduct.steps.sql_command.SqlCommandStep(redshift_database, script=None, script_arguments=None, queue=None, command=None, depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
SQL Command Step class that runs SQL scripts against a Redshift database
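A brief sketch of SqlCommandStep using an inline command; a script file plus script_arguments could be passed instead, per the signature above. The database object and pipeline plumbing are placeholders:

    from dataduct.steps.sql_command import SqlCommandStep

    sql_step = SqlCommandStep(
        id='refresh_daily_summary',
        command='DELETE FROM prod.daily_summary WHERE day = CURRENT_DATE;',
        redshift_database=redshift_database,  # placeholder database object
        schedule=schedule,
        resource=ec2_resource,
    )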
dataduct.steps.transform module¶
ETL step wrapper for ShellCommandActivity; can be executed on EC2 or EMR resources
- class dataduct.steps.transform.TransformStep(command=None, script=None, output=None, script_arguments=None, additional_s3_files=None, depends_on=None, **kwargs)¶
Bases: dataduct.steps.etl_step.ETLStep
Transform Step class that runs scripts or shell commands on an EC2 or EMR resource
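Finally, a sketch of TransformStep running a hypothetical script over an upstream node; the output names follow the keyed-output behaviour described under ETLStep.output, and every identifier below that is not in the signature is a placeholder:

    from dataduct.steps.transform import TransformStep

    transform_step = TransformStep(
        id='clean_events',
        script='scripts/clean_events.py',        # hypothetical local script
        script_arguments=['--date', '2014-01-01'],
        output=['clean', 'rejected'],             # yields nodes keyed 'clean'/'rejected'
        input_node=s3_step.output,                # output of an upstream step
        schedule=schedule,
        resource=ec2_resource,
        s3_source_dir=s3_source_dir,
    )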