dataduct.pipeline package¶
dataduct.pipeline.activity module¶
Base class for data pipeline activities
- class dataduct.pipeline.activity.Activity(dependsOn, maximumRetries, **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Base class for pipeline activities
- depends_on¶
Get the dependent activities for the activity
Returns: result – dependent activities for this activity
- input¶
Get the input node for the activity
Returns: result – input node for this activity
- maximum_retries¶
Get the maximum retries for the activity
Returns: result – maximum retries for this activity
- output¶
Get the output node for the activity
Returns: result – output node for this activity
dataduct.pipeline.copy_activity module¶
Pipeline object class for CopyActivity
- class dataduct.pipeline.copy_activity.CopyActivity(id, input_node, output_node, resource, schedule, max_retries=None, depends_on=None, **kwargs)¶
Bases: dataduct.pipeline.activity.Activity
CopyActivity class
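As a usage illustration only, here is a minimal sketch of wiring a CopyActivity together from the constructor signature above. The ids and bucket paths are placeholders, and passing s3_path as a plain string is an assumption (the library may expect an S3Path object).
    from dataduct.pipeline.copy_activity import CopyActivity
    from dataduct.pipeline.ec2_resource import Ec2Resource
    from dataduct.pipeline.s3_node import S3Node
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='DailySchedule', frequency='one-time')
    resource = Ec2Resource(id='CopyInstance', schedule=schedule)

    # Illustrative S3 locations; s3_path shown as a plain string
    input_node = S3Node(id='InputNode', schedule=schedule, s3_path='s3://example-bucket/input/')
    output_node = S3Node(id='OutputNode', schedule=schedule, s3_path='s3://example-bucket/output/')

    # Copy the input S3 node to the output S3 node on the EC2 resource
    copy = CopyActivity(
        id='CopyInputToOutput',
        input_node=input_node,
        output_node=output_node,
        resource=resource,
        schedule=schedule,
        max_retries=2,
    )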
dataduct.pipeline.data_pipeline module¶
Base class for data pipeline instance
- class dataduct.pipeline.data_pipeline.DataPipeline(unique_id=None, name=None, pipeline_id=None)¶
Bases: object
DataPipeline classes with objects and metadata.
The DataPipeline represents the data-pipeline, and is responsible for collecting all pipeline objects, validating the pipeline, and executing it.
- activate()¶
Activate the datapipeline
- add_object(pipeline_object)¶
Add an object to the datapipeline
- aws_format¶
Create a list of AWS-readable format dicts of all pipeline objects
Returns: result – list of AWS-readable dicts of all objects Return type: list of dict
- delete()¶
Deletes the datapipeline
- id¶
Fetch the id of the datapipeline
Returns: id – id of the datapipeline Return type: str
- instance_details()¶
List details of all the pipeline instances
Returns: result – Dictionary mapping run date to a list of pipeline instances combined per date Return type: dict of list
- update_pipeline_definition()¶
Updates the datapipeline definition
- validate_pipeline_definition()¶
Validate the current pipeline
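A hedged sketch of the lifecycle these methods imply: collect objects, validate, push the definition, then activate. The pipeline_objects list is a stand-in for whatever DefaultObject, Schedule, resources, nodes, and activities the pipeline actually needs, and boto AWS credentials are assumed to be configured.
    from dataduct.pipeline.data_pipeline import DataPipeline

    pipeline = DataPipeline(unique_id='daily_load', name='Daily Load')

    pipeline_objects = []  # populate with DefaultObject, Schedule, resources, nodes, activities
    for pipeline_object in pipeline_objects:
        pipeline.add_object(pipeline_object)

    pipeline.validate_pipeline_definition()  # validate the current definition
    pipeline.update_pipeline_definition()    # push the definition to AWS Data Pipeline
    pipeline.activate()                      # start execution

    print(pipeline.id)                  # id of the datapipeline
    print(pipeline.instance_details())  # run date -> list of pipeline instances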
dataduct.pipeline.default_object module¶
Pipeline object class for default metadata
- class dataduct.pipeline.default_object.DefaultObject(id='Default', sns=None, scheduleType='cron', failureAndRerunMode='CASCADE', **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Default object added to all pipelines
dataduct.pipeline.ec2_resource module¶
Pipeline object class for ec2 resource
- class dataduct.pipeline.ec2_resource.Ec2Resource(id, s3_log_dir=None, schedule=None, terminate_after='6 Hours', instance_type='m1.large', ami='ami-c65de9ae', security_group='analytics-etl', **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
EC2 Resource class
dataduct.pipeline.emr_activity module¶
Pipeline object class for EmrActivity
- class dataduct.pipeline.emr_activity.EmrActivity(id, resource, schedule, emr_step_string, output_node=None, additional_files=None, max_retries=None, depends_on=None)¶
Bases: dataduct.pipeline.activity.Activity
EMR Activity class
dataduct.pipeline.emr_resource module¶
Pipeline object class for emr resource
- class dataduct.pipeline.emr_resource.EmrResource(id, s3_log_dir, schedule, num_instances=3, instance_size='m1.large', bootstrap=None, num_task_instances=None, task_bid_price=None, task_instance_type='m1.large', master_instance_size='m1.large', terminate_after='6 Hours', hadoop_version=None, install_hive=None, install_pig=None, ami_version='2.4.7')¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
EMR Resource class
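A sketch combining the EmrResource above with the EmrActivity from the previous module. The emr_step_string follows the AWS EMR step syntax; the value shown, along with the ids and S3 paths, is purely illustrative.
    from dataduct.pipeline.emr_activity import EmrActivity
    from dataduct.pipeline.emr_resource import EmrResource
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='NightlySchedule', frequency='one-time')

    # EMR cluster with the documented defaults for instance counts and sizes
    cluster = EmrResource(
        id='EmrCluster',
        s3_log_dir='s3://example-bucket/emr-logs/',
        schedule=schedule,
        num_instances=3,
        instance_size='m1.large',
    )

    emr_step = EmrActivity(
        id='RunEmrStep',
        resource=cluster,
        schedule=schedule,
        emr_step_string='s3://example-bucket/jars/job.jar,arg1,arg2',  # illustrative step string
        max_retries=1,
    )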
dataduct.pipeline.mysql_node module¶
Pipeline object class for MysqlNode
- class dataduct.pipeline.mysql_node.MysqlNode(id, schedule, host, database, username, password, sql, table, depends_on=None)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
MySQL Data Node class
- database¶
Get the database name for the MySQL node
Returns: result – database name for this MySQL node Return type: str
- table¶
Get the table name for the MySQL node
Returns: result – table name for this MySQL node Return type: str
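A sketch of declaring a MysqlNode from the signature above and reading back its database and table properties; the hostname and credentials are placeholders.
    from dataduct.pipeline.mysql_node import MysqlNode
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='ExtractSchedule', frequency='one-time')

    mysql_node = MysqlNode(
        id='OrdersSource',
        schedule=schedule,
        host='mysql.example.com',        # placeholder host
        database='shop',
        username='reader',
        password='not-a-real-password',  # placeholder credential
        sql='SELECT * FROM orders',
        table='orders',
    )

    print(mysql_node.database)  # 'shop'
    print(mysql_node.table)     # 'orders'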
dataduct.pipeline.pipeline_object module¶
Base class for data pipeline objects
- class dataduct.pipeline.pipeline_object.PipelineObject(id, **kwargs)¶
Bases: object
Pipeline object class with fields and metadata.
The pipeline object has a one-to-one mapping with the AWS pipeline objects, which are described at the following url: http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-pipeline-objects.html The pipeline object acts much like a dictionary (with similar getters and setters), providing access to all AWS attributes.
- __delitem__(key)¶
Delete the key from object fields
Parameters: key (str) – Key of the item to be deleted
- __getitem__(key)¶
Fetch the items associated with a key
Note: This value will be a list if there are multiple values associated with one key. Otherwise, it will be the singleton item.
Parameters: key (str) – Key of the item to be fetched Returns: result – value(s) associated with the key Return type: list or singleton
- __setitem__(key, value)¶
Set a key-value field
Parameters: - key (str) – Key of the item to be set
- value – Value to be set for the key
- add_additional_files(new_files)¶
Add new s3 files
Parameters: new_files (list of S3File) – list of new S3 files for the activity
- aws_format()¶
Create the AWS-readable format of the object
Returns: result – AWS-readable dict format of the object Return type: dict
- id¶
Fetch the id of the pipeline object
Returns: id – id of the pipeline object Return type: str
- s3_files¶
Fetch the list of files associated with the pipeline object
Returns: result – List of files to be uploaded to s3 Return type: list of S3Files
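A sketch of the dictionary-style access described above, shown on an Ec2Resource (any PipelineObject subclass behaves the same way). The 'terminateAfter' field name is illustrative of the AWS attributes such an object carries.
    from dataduct.pipeline.ec2_resource import Ec2Resource
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='SomeSchedule', frequency='one-time')
    resource = Ec2Resource(id='EtlInstance', schedule=schedule)

    # Dictionary-style getters and setters over the underlying AWS fields
    resource['terminateAfter'] = '2 Hours'  # __setitem__
    print(resource['terminateAfter'])       # __getitem__
    del resource['terminateAfter']          # __delitem__

    print(resource.id)            # id of the pipeline object
    print(resource.aws_format())  # AWS-readable dict of the object
    print(resource.s3_files)      # S3File objects to be uploaded, if any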
dataduct.pipeline.precondition module¶
Pipeline object class for the precondition step
- class dataduct.pipeline.precondition.Precondition(id, is_directory=True, **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Precondition object added to all pipelines
dataduct.pipeline.redshift_copy_activity module¶
Pipeline object class for RedshiftCopyActivity
- class dataduct.pipeline.redshift_copy_activity.RedshiftCopyActivity(id, resource, schedule, input_node, output_node, insert_mode, command_options=None, max_retries=None, depends_on=None)¶
Bases: dataduct.pipeline.activity.Activity
RedshiftCopyActivity class
dataduct.pipeline.redshift_database module¶
Pipeline object class for redshift database
- class dataduct.pipeline.redshift_database.RedshiftDatabase(id, database_name='coursera', cluster_id='coursera', username='sbajaj', password='8YqCLvyr8hiPFQ')¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Redshift resource class
dataduct.pipeline.redshift_node module¶
Pipeline object class for RedshiftNode
- class dataduct.pipeline.redshift_node.RedshiftNode(id, schedule, redshift_database, schema_name, table_name)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Redshift Data Node class
- schema¶
Get the schema name for the redshift node
Returns: result – schema name for this redshift node Return type: str
- table¶
Get the table name for the redshift node
Returns: result – table name for this redshift node Return type: str
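A sketch tying a RedshiftNode to a RedshiftDatabase; the cluster, schema, and credential values are placeholders rather than working defaults.
    from dataduct.pipeline.redshift_database import RedshiftDatabase
    from dataduct.pipeline.redshift_node import RedshiftNode
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='LoadSchedule', frequency='one-time')

    database = RedshiftDatabase(
        id='WarehouseDb',
        database_name='warehouse',
        cluster_id='example-cluster',
        username='loader',
        password='not-a-real-password',  # placeholder credential
    )

    node = RedshiftNode(
        id='OrdersTarget',
        schedule=schedule,
        redshift_database=database,
        schema_name='prod',
        table_name='orders',
    )

    print(node.schema)  # 'prod'
    print(node.table)   # 'orders'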
dataduct.pipeline.s3_node module¶
Pipeline object class for S3Node
- class dataduct.pipeline.s3_node.S3Node(id, schedule, s3_path, precondition=None, format=None, **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
S3 Data Node class
- path()¶
Get the s3_path associated with the S3 data node
Returns: s3_path – The s3 path of the node; it can be a directory or a file Return type: S3Path
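A sketch of an S3Node guarded by the Precondition object from the earlier module; the bucket path is illustrative and s3_path is shown as a plain string (it may need to be an S3Path object).
    from dataduct.pipeline.precondition import Precondition
    from dataduct.pipeline.s3_node import S3Node
    from dataduct.pipeline.schedule import Schedule

    schedule = Schedule(id='InputSchedule', frequency='one-time')
    exists_check = Precondition(id='InputExists', is_directory=True)

    node = S3Node(
        id='RawEvents',
        schedule=schedule,
        s3_path='s3://example-bucket/raw/events/',  # illustrative; may need an S3Path object
        precondition=exists_check,
    )

    print(node.path())  # S3Path of the node (directory or file)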
dataduct.pipeline.schedule module¶
Pipeline object class for the schedule
- class dataduct.pipeline.schedule.Schedule(id, frequency='one-time', delay=None, load_hour=None, load_minutes=None, **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
Schedule object added to all pipelines
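A sketch of the Schedule parameters; the documented default frequency is 'one-time', and the 'daily' value below is an assumption about the other accepted frequency strings.
    from dataduct.pipeline.schedule import Schedule

    # 'daily' is assumed to be an accepted frequency; 'one-time' is the documented default
    nightly = Schedule(
        id='NightlyAt0230',
        frequency='daily',
        load_hour=2,
        load_minutes=30,
    )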
dataduct.pipeline.shell_command_activity module¶
Pipeline object class for ShellCommandActivity
- class dataduct.pipeline.shell_command_activity.ShellCommandActivity(id, input_node, output_node, resource, schedule, script_uri=None, script_arguments=None, command=None, max_retries=None, depends_on=None, additional_s3_files=None)¶
Bases: dataduct.pipeline.activity.Activity
ShellCommandActivity class
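A sketch of running an inline command over S3 input/output nodes on an EC2 resource; the ids, paths, and the command itself are illustrative, and the staging-directory variable is an AWS Data Pipeline convention rather than something this class defines.
    from dataduct.pipeline.ec2_resource import Ec2Resource
    from dataduct.pipeline.s3_node import S3Node
    from dataduct.pipeline.schedule import Schedule
    from dataduct.pipeline.shell_command_activity import ShellCommandActivity

    schedule = Schedule(id='TransformSchedule', frequency='one-time')
    resource = Ec2Resource(id='TransformInstance', schedule=schedule)
    input_node = S3Node(id='TransformInput', schedule=schedule, s3_path='s3://example-bucket/in/')
    output_node = S3Node(id='TransformOutput', schedule=schedule, s3_path='s3://example-bucket/out/')

    transform = ShellCommandActivity(
        id='GzipInput',
        input_node=input_node,
        output_node=output_node,
        resource=resource,
        schedule=schedule,
        command='gzip -r ${INPUT1_STAGING_DIR}',  # illustrative command
        max_retries=2,
    )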
dataduct.pipeline.sns_alarm module¶
Pipeline object class for sns
- class dataduct.pipeline.sns_alarm.SNSAlarm(id, pipeline_name=None, failure_message=None, **kwargs)¶
Bases: dataduct.pipeline.pipeline_object.PipelineObject
SNS object added to all pipelines
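A sketch showing an SNSAlarm passed into the DefaultObject's sns parameter from the earlier module; that this is how failure notifications get attached is an assumption based on the argument name.
    from dataduct.pipeline.default_object import DefaultObject
    from dataduct.pipeline.sns_alarm import SNSAlarm

    alarm = SNSAlarm(
        id='FailureAlarm',
        pipeline_name='daily_load',
        failure_message='daily_load pipeline failed',
    )

    # Assumption: the sns argument attaches the alarm as the pipeline's failure action
    default = DefaultObject(sns=alarm)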
dataduct.pipeline.sql_activity module¶
Pipeline object class for SqlActivity
- class dataduct.pipeline.sql_activity.SqlActivity(id, resource, schedule, script, database, script_arguments=None, queue=None, max_retries=None, depends_on=None)¶
Bases: dataduct.pipeline.activity.Activity
Sql Activity class
dataduct.pipeline.utils module¶
Shared utility functions
- dataduct.pipeline.utils.get_list_from_boto(func, response_key, *args, **kwargs)¶
Get a paginated list from boto
Parameters: - func (function) – Function to call
- response_key (str) – Key which points to a list
Returns: results – Aggregated list of items indicated by the response key
Return type: list
- dataduct.pipeline.utils.get_response_from_boto(fn, *args, **kwargs)¶
Exponentially decay sleep times between calls in case of failures
Note
If there is a rate limit error, sleep until the error goes away
Parameters: fn (function) – Function to call Returns: response – request response Return type: json
- dataduct.pipeline.utils.list_pipeline_instances(pipeline_id, conn=None, increment=25)¶
List details of all the pipeline instances
Parameters: - pipeline_id (str) – id of the pipeline
- conn (DataPipelineConnection) – boto connection to datapipeline
- increment (int) – rate of increments in API calls
Returns: instances – list of pipeline instances
Return type: list
- dataduct.pipeline.utils.list_pipelines(conn=None)¶
Fetch a list of all pipelines with boto
Parameters: conn (DataPipelineConnection) – boto connection to datapipeline Returns: pipelines – list of pipelines fetched with boto Return type: list
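A sketch of the utility functions above; it assumes boto credentials are configured, and the shape of the returned entries (plain boto response dicts) is boto's, not fixed by this module. The pipeline id below is hypothetical.
    from dataduct.pipeline.utils import list_pipeline_instances, list_pipelines

    # Uses the default boto DataPipelineConnection when conn is None
    for pipeline in list_pipelines():
        print(pipeline)  # boto response dict; exact keys are boto's

    instances = list_pipeline_instances('df-00000000EXAMPLE')  # hypothetical pipeline id
    print(len(instances))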