dataduct.pipeline package

dataduct.pipeline.activity module

Base class for data pipeline activities

class dataduct.pipeline.activity.Activity(dependsOn, maximumRetries, **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Base class for pipeline activities

depends_on

Get the dependent activities for the activity

Returns:result – dependent activities for this activity
input

Get the input node for the activity

Returns:result – input node for this activity
maximum_retries

Get the maximum retries for the activity

Returns:result – maximum retries for this activity
output

Get the output node for the activity

Returns:result – output node for this activity
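
A minimal sketch of reading these accessors from a concrete activity instance; the describe_activity helper is illustrative, not part of dataduct:

    def describe_activity(activity):
        """Print the read-only fields exposed by the Activity base class."""
        print(activity.depends_on)       # upstream activities this one waits on
        print(activity.maximum_retries)  # retry budget before the run is failed
        print(activity.input)            # input data node, if any
        print(activity.output)           # output data node, if any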

dataduct.pipeline.copy_activity module

Pipeline object class for CopyActivity

class dataduct.pipeline.copy_activity.CopyActivity(id, input_node, output_node, resource, schedule, max_retries=None, depends_on=None, **kwargs)

Bases: dataduct.pipeline.activity.Activity

CopyActivity class

dataduct.pipeline.data_pipeline module

Base class for data pipeline instance

class dataduct.pipeline.data_pipeline.DataPipeline(unique_id=None, name=None, pipeline_id=None)

Bases: object

DataPipeline class with objects and metadata.

The DataPipeline represents the data pipeline and is responsible for collecting all pipeline objects, validating the pipeline, and executing it; a usage sketch follows the method listing below.

activate()

Activate the datapipeline

add_object(pipeline_object)

Add an object to the datapipeline

aws_format

Create a list of AWS-readable format dicts of all pipeline objects

Returns:result – list of AWS-readable dict of all objects
Return type:list of dict
delete()

Delete the datapipeline

id

Fetch the id of the datapipeline

Returns:id – id of the datapipeline
Return type:str
instance_details()

List details of all the pipeline instances

Returns:result – dictionary mapping each run date to the list of pipeline instances for that date
Return type:dict of list
update_pipeline_definition()

Update the datapipeline definition

validate_pipeline_definition()

Validate the current pipeline
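
A minimal lifecycle sketch based on the methods above, assuming AWS credentials are configured for boto; the ids and the objects passed to add_object are illustrative:

    from dataduct.pipeline.data_pipeline import DataPipeline

    pipeline = DataPipeline(unique_id='example_etl', name='example_etl')

    # Register PipelineObject instances (schedule, resources, activities):
    # pipeline.add_object(some_pipeline_object)

    pipeline.validate_pipeline_definition()  # ask AWS to validate the definition
    pipeline.update_pipeline_definition()    # push the object graph to AWS
    pipeline.activate()                      # start execution

    print(pipeline.id)                  # AWS-assigned id of the datapipeline
    print(pipeline.instance_details())  # run date -> pipeline instances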

dataduct.pipeline.default_object module

Pipeline object class for default metadata

class dataduct.pipeline.default_object.DefaultObject(id='Default', sns=None, scheduleType='cron', failureAndRerunMode='CASCADE', **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Default object added to all pipelines

dataduct.pipeline.ec2_resource module

Pipeline object class for ec2 resource

class dataduct.pipeline.ec2_resource.Ec2Resource(id, s3_log_dir=None, schedule=None, terminate_after='6 Hours', instance_type='m1.large', ami='ami-c65de9ae', security_group='analytics-etl', **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

EC2 Resource class

dataduct.pipeline.emr_activity module

Pipeline object class for EmrActivity

class dataduct.pipeline.emr_activity.EmrActivity(id, resource, schedule, emr_step_string, output_node=None, additional_files=None, max_retries=None, depends_on=None)

Bases: dataduct.pipeline.activity.Activity

EMR Activity class

dataduct.pipeline.emr_resource module

Pipeline object class for emr resource

class dataduct.pipeline.emr_resource.EmrResource(id, s3_log_dir, schedule, num_instances=3, instance_size='m1.large', bootstrap=None, num_task_instances=None, task_bid_price=None, task_instance_type='m1.large', master_instance_size='m1.large', terminate_after='6 Hours', hadoop_version=None, install_hive=None, install_pig=None, ami_version='2.4.7')

Bases: dataduct.pipeline.pipeline_object.PipelineObject

EMR Resource class

dataduct.pipeline.mysql_node module

Pipeline object class for MysqlNode

class dataduct.pipeline.mysql_node.MysqlNode(id, schedule, host, database, username, password, sql, table, depends_on=None)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

MySQL Data Node class

database

Get the database name for the MySQL node

Returns:result – database name for this MySQL node
Return type:str
table

Get the table name for the MySQL node

Returns:result – table name for this MySQL node
Return type:str

dataduct.pipeline.pipeline_object module

Base class for data pipeline objects

class dataduct.pipeline.pipeline_object.PipelineObject(id, **kwargs)

Bases: object

PipelineObject class with fields and metadata.

The pipeline object has a one-to-one mapping with the AWS pipeline objects, which are described at http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-pipeline-objects.html. The pipeline object acts much like a dictionary (with similar getters and setters), providing access to all AWS attributes; a usage sketch follows the attribute listing below.

__delitem__(key)

Delete the key from object fields

Parameters:key (str) – Key of the item to be deleted
__getitem__(key)

Fetch the items associated with a key

Note: This value will be a list if there are multiple values
associated with one key. Otherwise, it will be the singleton item.
Parameters:key (str) – Key of the item to be fetched
Returns:result – value(s) associated with the key
Return type:list or singleton
__setitem__(key, value)

Set a key-value field

Parameters:
  • key (str) – Key of the item to be set
  • value – Value to associate with the key
add_additional_files(new_files)

Add new S3 files

Parameters:new_files (list of S3File) – list of new S3 files for the activity
aws_format()

Create the aws readable format of object

Returns:result – the AWS-readable dict format of the object
Return type:dict
id

Fetch the id of the pipeline object

Returns:id – id of the pipeline object
Return type:str
s3_files

Fetch the list of files associated with the pipeline object

Returns:result – List of files to be uploaded to s3
Return type:list of S3Files
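
Because the class implements __getitem__, __setitem__, and __delitem__, fields can be manipulated with subscript syntax. A minimal sketch; the 'type' and 'retryDelay' fields here are illustrative AWS attributes, not required ones:

    from dataduct.pipeline.pipeline_object import PipelineObject

    obj = PipelineObject(id='MyObject', type='ShellCommandActivity')

    obj['retryDelay'] = '10 Minutes'  # __setitem__: set a field
    print(obj['type'])                # __getitem__: single value or list
    del obj['retryDelay']             # __delitem__: remove the field

    print(obj.aws_format())           # AWS-readable dict of the object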

dataduct.pipeline.precondition module

Pipeline object class for the precondition step

class dataduct.pipeline.precondition.Precondition(id, is_directory=True, **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Precondition object added to all pipelines

dataduct.pipeline.redshift_copy_activity module

Pipeline object class for RedshiftCopyActivity

class dataduct.pipeline.redshift_copy_activity.RedshiftCopyActivity(id, resource, schedule, input_node, output_node, insert_mode, command_options=None, max_retries=None, depends_on=None)

Bases: dataduct.pipeline.activity.Activity

RedshiftCopyActivity class

dataduct.pipeline.redshift_database module

Pipeline object class for redshift database

class dataduct.pipeline.redshift_database.RedshiftDatabase(id, database_name='coursera', cluster_id='coursera', username='sbajaj', password='8YqCLvyr8hiPFQ')

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Redshift resource class

dataduct.pipeline.redshift_node module

Pipeline object class for RedshiftNode

class dataduct.pipeline.redshift_node.RedshiftNode(id, schedule, redshift_database, schema_name, table_name)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Redshift Data Node class

schema

Get the schema name for the redshift node

Returns:result – schema name for this redshift node
Return type:str
table

Get the table name for the redshift node

Returns:result – table name for this redshift node
Return type:str

dataduct.pipeline.s3_node module

Pipeline object class for S3Node

class dataduct.pipeline.s3_node.S3Node(id, schedule, s3_path, precondition=None, format=None, **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

S3 Data Node class

path()

Get the s3_path associated with the S3 data node

Returns:s3_path – the S3 path of the node; can be a directory or file
Return type:S3Path

dataduct.pipeline.schedule module

Pipeline object class for the schedule

class dataduct.pipeline.schedule.Schedule(id, frequency='one-time', delay=None, load_hour=None, load_minutes=None, **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

Schedule object added to all pipelines
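
A minimal instantiation sketch based on the signature above; the 'daily' frequency string and the load values are assumptions drawn from the parameter names, not confirmed option values:

    from dataduct.pipeline.schedule import Schedule

    one_off = Schedule(id='OneOffSchedule')  # defaults: frequency='one-time'
    daily = Schedule(id='DailySchedule', frequency='daily',
                     load_hour=2, load_minutes=30)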

dataduct.pipeline.shell_command_activity module

Pipeline object class for ShellCommandActivity

class dataduct.pipeline.shell_command_activity.ShellCommandActivity(id, input_node, output_node, resource, schedule, script_uri=None, script_arguments=None, command=None, max_retries=None, depends_on=None, additional_s3_files=None)

Bases: dataduct.pipeline.activity.Activity

ShellCommandActivity class
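
A sketch of wiring the objects above into a single shell step: an Ec2Resource to run on, a Schedule, and S3 nodes for input and output. All ids, paths, and the command are illustrative, and passing a raw string as s3_path is an assumption (dataduct may expect an S3Path instance):

    from dataduct.pipeline.ec2_resource import Ec2Resource
    from dataduct.pipeline.s3_node import S3Node
    from dataduct.pipeline.schedule import Schedule
    from dataduct.pipeline.shell_command_activity import ShellCommandActivity

    schedule = Schedule(id='DefaultSchedule')
    resource = Ec2Resource(id='DefaultEc2', schedule=schedule)

    input_node = S3Node(id='InputNode', schedule=schedule,
                        s3_path='s3://example-bucket/input/')
    output_node = S3Node(id='OutputNode', schedule=schedule,
                         s3_path='s3://example-bucket/output/')

    # ${INPUT1_STAGING_DIR} and ${OUTPUT1_STAGING_DIR} are standard
    # AWS Data Pipeline staging variables.
    step = ShellCommandActivity(
        id='CopyStep',
        input_node=input_node,
        output_node=output_node,
        resource=resource,
        schedule=schedule,
        command='cp -r ${INPUT1_STAGING_DIR}/* ${OUTPUT1_STAGING_DIR}/',
    )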

dataduct.pipeline.sns_alarm module

Pipeline object class for sns

class dataduct.pipeline.sns_alarm.SNSAlarm(id, pipeline_name=None, failure_message=None, **kwargs)

Bases: dataduct.pipeline.pipeline_object.PipelineObject

SNS object added to all pipelines

dataduct.pipeline.sql_activity module

Pipeline object class for SqlActivity

class dataduct.pipeline.sql_activity.SqlActivity(id, resource, schedule, script, database, script_arguments=None, queue=None, max_retries=None, depends_on=None)

Bases: dataduct.pipeline.activity.Activity

Sql Activity class

dataduct.pipeline.utils module

Shared utility functions

dataduct.pipeline.utils.get_list_from_boto(func, response_key, *args, **kwargs)

Get a paginated list from boto

Parameters:
  • func (function) – Function to call
  • response_key (str) – Key which points to a list
Returns:results – aggregated list of items indicated by the response key
Return type:list
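
A usage sketch, assuming boto credentials and a default region are configured; 'pipelineIdList' is the list-bearing key in boto's list_pipelines response:

    from boto.datapipeline.layer1 import DataPipelineConnection
    from dataduct.pipeline.utils import get_list_from_boto

    conn = DataPipelineConnection()

    # Aggregates every page returned by conn.list_pipelines into one list.
    pipelines = get_list_from_boto(conn.list_pipelines, 'pipelineIdList')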

dataduct.pipeline.utils.get_response_from_boto(fn, *args, **kwargs)

Exponentially back off (increase sleep times) between calls in case of failures

Note

If there is a rate limit error, sleep until the error goes away

Parameters:
  • fn (function) – Function to call
  • *args (optional) – positional arguments passed to fn
  • **kwargs (optional) – keyword arguments passed to fn
Returns:response – request response
Return type:json
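
The behaviour described above is the standard exponential-backoff retry pattern; a generic re-implementation sketch of that pattern (not dataduct's actual code):

    import time

    def call_with_backoff(func, *args, **kwargs):
        """Retry func, sleeping 1s, 2s, 4s, ... between failures.

        Generic sketch of the pattern described above, not the
        dataduct implementation itself.
        """
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_attempts - 1:
                    raise  # retry budget exhausted; propagate the last error
                time.sleep(2 ** attempt)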
dataduct.pipeline.utils.list_pipeline_instances(pipeline_id, conn=None, increment=25)

List details of all the pipeline instances

Parameters:
  • pipeline_id (str) – id of the pipeline
  • conn (DataPipelineConnection) – boto connection to datapipeline
  • increment (int) – number of instances to fetch per API call
Returns:instances – list of pipeline instances
Return type:list

dataduct.pipeline.utils.list_pipelines(conn=None)

Fetch a list of all pipelines with boto

Parameters:conn (DataPipelineConnection) – boto connection to datapipeline
Returns:pipelines – list of pipelines fetched with boto
Return type:list
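
A short sketch combining the two listing helpers; letting conn default to None assumes each helper creates its own connection, and the 'id' field per pipeline entry follows the AWS Data Pipeline API:

    from dataduct.pipeline.utils import list_pipeline_instances, list_pipelines

    for pipeline in list_pipelines():
        instances = list_pipeline_instances(pipeline['id'])
        print(pipeline['id'], len(instances))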

Module contents