dataduct.steps package

Submodules

dataduct.steps.emr_streaming module

ETL step wrapper for EmrActivity that can be executed on EMR

class dataduct.steps.emr_streaming.EMRStreamingStep(mapper, reducer=None, input=None, hadoop_params=None, depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

EMR Streaming Step class that helps run streaming jobs on EMR resources

merge_s3_nodes(input_nodes)

Override the merge S3Node case for EMR Streaming Step

Parameters:input_nodes (dict) – Map of the form {‘node_name’: node}
Returns:output_node (list of S3Node) – list of input nodes; depends_on (list) – empty list
Return type:list of S3Node
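
A minimal construction sketch (hypothetical values; in practice dataduct builds steps from a pipeline definition, and ctx below stands in for the keyword arguments such as id, schedule, and resource that the pipeline supplies):

from dataduct.steps.emr_streaming import EMRStreamingStep

ctx = {'id': 'emr_streaming_step'}  # hypothetical; a real pipeline also passes schedule, resource, etc.
step = EMRStreamingStep(
    mapper='s3://my-bucket/scripts/mapper.py',    # assumed S3 URI for the mapper script
    reducer='s3://my-bucket/scripts/reducer.py',  # optional, per the signature above
    hadoop_params=['-D', 'mapreduce.job.reduces=10'],
    **ctx
)
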
dataduct.steps.emr_streaming.create_command(mapper, reducer, input_uri, output, hadoop_params)

Create the command step string given the input to streaming step
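
The exact command format is internal to dataduct; as an illustration of the idea, EmrActivity steps take a single comma-separated string of the streaming jar and its arguments, so a streaming command could be assembled roughly as follows (a sketch, not the library's implementation):

def sketch_create_command(mapper, reducer, input_uri, output, hadoop_params=None):
    # Illustrative only: EmrActivity expects one comma-separated string
    # of the streaming jar and its arguments.
    parts = ['/home/hadoop/contrib/streaming/hadoop-streaming.jar']
    parts.extend(hadoop_params or [])
    parts.extend(['-input', input_uri, '-output', output, '-mapper', mapper])
    if reducer:
        parts.extend(['-reducer', reducer])
    return ','.join(parts)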

dataduct.steps.etl_step module

Base class for an etl step

class dataduct.steps.etl_step.ETLStep(id, s3_data_dir=None, s3_log_dir=None, s3_source_dir=None, schedule=None, resource=None, input_node=None, required_steps=None, max_retries=0)

Bases: object

ETL step class with activities and metadata.

An ETL step is an abstraction over a set of pipeline objects. It represents a chunk of objects with the following attributes:

  • input
  • activities
  • output

Given the input parameters, the ETL step is responsible for creating all necessary AWS pipeline objects, defining names for those objects, and defining S3 URIs for all dependent files.
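
Concrete steps subclass ETLStep; a minimal sketch of the pattern (the subclass and its body are hypothetical, real steps build activities and output nodes with the helpers documented below):

from dataduct.steps.etl_step import ETLStep

class PassthroughStep(ETLStep):
    # Hypothetical step that re-exposes its input as its output.
    def __init__(self, **kwargs):
        super(PassthroughStep, self).__init__(**kwargs)
        # Real steps would call create_s3_data_node or
        # create_pipeline_object here; the attribute name below
        # is illustrative.
        self._output = self.input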

__str__()

Return the string representation of the ETL step

activities

Get all AWS activities that are created for this step

Returns:All AWS activities created for this step
Return type:list
add_required_steps(required_steps)

Add dependencies for this step

Parameters:required_steps (list of ETLStep) – dependencies of current step
copy_s3(input_node, dest_uri)

Copy an S3 node to a new location in S3

Creates a copy activity. Instead of copying to just the output_node, we create an intermediate node. This node must point to a directory, not a file. Otherwise, AWS will try to copy the file line by line.

Parameters:
  • input_node (S3Node) – Source Node in S3
  • dest_uri (S3Path) – Destination path in S3
Returns:

activity – copy activity object

Return type:

CopyActivity
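
A usage sketch, assuming step is an ETLStep with an input node and that S3Path can be imported from dataduct.s3 (import path and constructor argument assumed):

from dataduct.s3 import S3Path  # import path assumed

activity = step.copy_s3(
    input_node=step.input,                           # an existing S3Node
    dest_uri=S3Path(uri='s3://my-bucket/staging/'),  # must point to a directory, not a file
)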

create_output_nodes(output_node, sub_dirs)

Create output nodes for all the subdirectories

Parameters:
  • output_node (S3Node) – Base output node under which the subdirectory nodes are created
  • sub_dirs (list of str) – Names of the subdirectories
Returns:s3_output_nodes – Output nodes keyed by subdirectory
Return type:dict of S3Node
create_pipeline_object(object_class, **kwargs)

Create the pipeline objects associated with the step

Parameters:object_class (PipelineObject) – a class of pipeline objects
Returns:new_object – Object created from the given class. If a name is not provided, one is generated from the object’s type and index
Return type:PipelineObject
create_s3_data_node(s3_object=None, **kwargs)

Create an S3 DataNode for s3_file or s3_path

Parameters:s3_object (S3File / S3Path) – s3_object to be used to create the node
Returns:s3_node – S3Node for the etl step
Return type:S3Node
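
A usage sketch with an assumed S3Path constructor (S3File would work the same way, per the signature):

node = step.create_s3_data_node(
    s3_object=S3Path(uri='s3://my-bucket/data/input/')  # constructor argument assumed
)
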
create_script(s3_object)

Set the S3 path for the S3 object using s3_source_dir

Parameters:s3_object (S3File) – S3File for which the source directory is set
Returns:s3_object – S3File after the path is set
Return type:S3File
depends_on

Get the dependent activities for the etl step

Returns:dependent activities for this etl step
Return type:list
input

Get the input node for the etl step

Returns:Input node for this etl step
Return type:S3Node / dict of S3Node / None

Note

Input is represented as None, a single node, or a dict of nodes

maximum_retries

Get the maximum retries for the etl step

Returns:maximum retries for this etl step
Return type:int
merge_s3_nodes(input_nodes)

Merge multiple S3 input nodes

We merge the multiple input nodes by using a copy activity

Parameters:input_nodes (dict of S3Node) – Key-Node pair like {‘node_name’: node}
Returns:combined_node (S3Node) – new S3Node with the input nodes merged; new_depends_on (list of str) – new dependencies for the step
Return type:(S3Node, list of str)
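
The docstring lists both a combined node and new dependencies, which suggests a tuple return (assumed). With hypothetical nodes node_a and node_b:

combined_node, new_depends_on = step.merge_s3_nodes({
    'node_a': node_a,
    'node_b': node_b,
})
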
output

Get the output node for the etl step

Returns:output node for this etl step
Return type:S3Node / dict of S3Node

Note

An output S3 node, or multiple S3 nodes. If this step produces no S3 nodes, there will be no output. A step may produce multiple output nodes; these are keyed by the output directories specified in the load definition. For instance, a step defined as follows:

{
    ...,
    "output": [
        "output1",
        "output2"
    ]
}

will have output:

{
    "output1": [s3 node pointing to .../output1],
    "output2": [s3 node pointing to .../output2]
}
pipeline_objects

Get all pipeline objects that are created for this step

Returns:All pipeline objects that are created for this step
Return type:list of PipelineObject

dataduct.steps.extract_local module

ETL step wrapper for creating an S3 node for input from local files

class dataduct.steps.extract_local.ExtractLocalStep(path, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

ExtractLocal Step class that helps get data from a local file
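
A construction sketch with hypothetical values (ctx stands in for the pipeline-supplied keyword arguments, as in the earlier sketches):

from dataduct.steps.extract_local import ExtractLocalStep

step = ExtractLocalStep(path='data/input.tsv', **ctx)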

dataduct.steps.extract_rds module

ETL step wrapper to extract data from RDS to S3

class dataduct.steps.extract_rds.ExtractRdsStep(table=None, sql=None, host_name=None, database=None, depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

ExtractRds Step class that helps get data out of RDS
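
A construction sketch with hypothetical values (ctx as above):

from dataduct.steps.extract_rds import ExtractRdsStep

step = ExtractRdsStep(
    sql='SELECT id, name FROM users;',  # either sql or table, per the signature
    host_name='my-rds-host',            # hypothetical connection values
    database='production',
    **ctx
)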

dataduct.steps.extract_rds.guess_input_tables(sql)

Guess the input tables from the SQL query

Returns:results – tables used in the SQL statement
Return type:list of str
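
The parsing logic is internal to dataduct; as an illustration of the idea, a naive regex-based guess could look like this (a sketch, not the library's implementation):

import re

def sketch_guess_input_tables(sql):
    # Naively collect identifiers that follow FROM or JOIN.
    return sorted(set(re.findall(r'\b(?:FROM|JOIN)\s+([\w.]+)', sql, re.IGNORECASE)))

sketch_guess_input_tables('SELECT * FROM a JOIN b ON a.id = b.id')
# -> ['a', 'b']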

dataduct.steps.extract_redshift module

ETL step wrapper for RedshiftCopyActivity to extract data to S3

class dataduct.steps.extract_redshift.ExtractRedshiftStep(schema, table, redshift_database, insert_mode='TRUNCATE', depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

Extract Redshift Step class that helps get data out of Redshift
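
A construction sketch with hypothetical values (ctx as above; redshift_db stands for a RedshiftDatabase pipeline object):

from dataduct.steps.extract_redshift import ExtractRedshiftStep

step = ExtractRedshiftStep(
    schema='prod',
    table='events',
    redshift_database=redshift_db,
    **ctx
)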

dataduct.steps.extract_s3 module

ETL step wrapper for creating an S3 node for input

class dataduct.steps.extract_s3.ExtractS3Step(uri, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

ExtractS3 Step class that helps get data from S3
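
A construction sketch with hypothetical values (ctx as above):

from dataduct.steps.extract_s3 import ExtractS3Step

step = ExtractS3Step(uri='s3://my-bucket/input/data.tsv', **ctx)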

dataduct.steps.load_redshift module

ETL step wrapper for RedshiftCopyActivity to load data into Redshift

class dataduct.steps.load_redshift.LoadRedshiftStep(schema, table, redshift_database, insert_mode='TRUNCATE', max_errors=None, replace_invalid_char=None, depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

Load Redshift Step class that helps load data into Redshift
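
A construction sketch with hypothetical values (ctx and redshift_db as above):

from dataduct.steps.load_redshift import LoadRedshiftStep

step = LoadRedshiftStep(
    schema='prod',
    table='events',
    redshift_database=redshift_db,
    insert_mode='TRUNCATE',  # default, per the signature
    max_errors=5,
    **ctx
)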

dataduct.steps.sql_command module

ETL step wrapper for SqlActivity that can be executed on EC2

class dataduct.steps.sql_command.SqlCommandStep(redshift_database, script=None, script_arguments=None, queue=None, command=None, depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

SQL Command Step class that helps run SQL scripts on resources
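
A construction sketch with hypothetical values (ctx and redshift_db as above):

from dataduct.steps.sql_command import SqlCommandStep

step = SqlCommandStep(
    redshift_database=redshift_db,
    command='VACUUM prod.events;',  # either command or script, per the signature
    **ctx
)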

dataduct.steps.transform module

ETL step wrapper for ShellCommandActivity that can be executed on EC2 / EMR

class dataduct.steps.transform.TransformStep(command=None, script=None, output=None, script_arguments=None, additional_s3_files=None, depends_on=None, **kwargs)

Bases: dataduct.steps.etl_step.ETLStep

Transform Step class that helps run scripts on resources
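
A construction sketch with hypothetical values (ctx as above):

from dataduct.steps.transform import TransformStep

step = TransformStep(
    script='scripts/clean_data.py',             # either script or command, per the signature
    script_arguments=['--date', '2014-01-01'],
    output=['cleaned'],                         # output directory names, as described under ETLStep.output
    **ctx
)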

Module contents