API Reference

Operators

Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. All operators derive from BaseOperator and inherit many attributes and methods that way. Refer to the BaseOperator documentation for more details.

There are 3 main types of operators:

  • Operators that perform an action, or tell another system to perform an action
  • Transfer operators move data from one system to another
  • Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

BaseOperator

All operators are derived from BaseOperator and acquire much functionality through inheritance. Since this is the core of the engine, it’s worth taking the time to understand the parameters of BaseOperator to understand the primitive features that can be leveraged in your DAGs.

class airflow.models.BaseOperator(task_id, owner='airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, *args, **kwargs)[source]

Abstract base class for all operators. Since operators create objects that become nodes in the DAG, BaseOperator contains many recursive methods for DAG crawling behavior. To derive from this class, you are expected to override the constructor as well as the ‘execute’ method.

Operators derived from this class should perform or trigger certain tasks synchronously (wait for completion). Examples include an operator that runs a Pig job (PigOperator), a sensor operator that waits for a partition to land in Hive (HiveSensorOperator), or one that moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these operators (tasks) target specific operations, running specific scripts, functions or data transfers.

This class is abstract and shouldn’t be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a node in DAG objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods.

Note that this class is derived from SQLAlchemy’s Base class, which allows us to push metadata regarding tasks to the database. Classes derived from this one need to implement the polymorphic specifics documented in SQLAlchemy. This should become clear while reading the code for other operators.

Parameters:
  • task_id (string) – a unique, meaningful id for the task
  • owner (string) – the owner of the task, using the unix username is recommended
  • retries (int) – the number of retries that should be performed before failing the task
  • retry_delay (timedelta) – delay between retries
  • retry_exponential_backoff (bool) – allow progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (the delay will be converted into seconds)
  • max_retry_delay (timedelta) – maximum delay interval between retries
  • start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest execution_date and adds the schedule_interval to determine the next execution_date. It is also very important to note that different tasks’ dependencies need to line up in time. If task A depends on task B and their start_dates are offset in a way that their execution_dates don’t line up, A’s dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into the TimeSensor and TimeDeltaSensor. We advise against using dynamic start_date and recommend using fixed ones. Read the FAQ entry about start_date for more information.
  • end_date (datetime) – if specified, the scheduler won’t go beyond this date
  • depends_on_past (bool) – when set to true, task instances will run sequentially, relying on the previous instance of the task (the previous schedule) having succeeded. The task instance for the start_date is allowed to run.
  • wait_for_downstream (bool) – when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used.
  • queue (str) – which queue to target when running this job. Not all executors implement queue management, the CeleryExecutor does support targeting specific queues.
  • dag (DAG) – a reference to the dag the task is attached to (if any)
  • priority_weight (int) – priority weight of this task against other tasks. This allows the executor to trigger higher priority tasks before others when things get backed up.
  • pool (str) – the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
  • sla (datetime.timedelta) – time by which the job is expected to succeed. Note that this represents the timedelta after the period is closed. For example, if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on 2016-01-02 if the 2016-01-01 instance has not succeeded yet. The scheduler pays special attention to jobs with an SLA and sends alert emails for SLA misses. SLA misses are also recorded in the database for future reference. All tasks that share the same SLA time get bundled in a single email, sent soon after that time. SLA notifications are sent once and only once for each task instance.
  • execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance; if it runs longer, the task will raise an exception and fail.
  • on_failure_callback (callable) – a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. The context contains references to objects related to the task instance and is documented under the macros section of the API.
  • on_retry_callback (callable) – much like the on_failure_callback except that it is executed when retries occur.
  • on_success_callback (callable) – much like the on_failure_callback except that it is executed when the task succeeds.
  • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy }. The default is all_success. Options can be set as a string or using the constants defined in the static class airflow.utils.TriggerRule
  • resources (dict) – A map of resource parameter names (the argument names of the Resources constructor) to their values.
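
The sketch below is a minimal, hypothetical example (the DAG id, task id and email address are made up) of how these BaseOperator arguments are typically supplied: shared defaults go in a DAG’s default_args, and per-task keyword arguments override them.

# A minimal sketch (hypothetical names) of common BaseOperator arguments.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator

default_args = {
    'owner': 'airflow',
    'email': ['alerts@example.com'],      # hypothetical address
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2016, 1, 1),
}

dag = DAG('example_base_operator_args', default_args=default_args,
          schedule_interval='@daily')

# Keyword arguments on the operator override the DAG's default_args.
run_this = BashOperator(
    task_id='print_date',
    bash_command='date',
    retries=0,                     # overrides the default of 2
    sla=timedelta(hours=1),        # alert if not done 1 hour after the period closes
    trigger_rule='all_success',    # the default trigger rule
    dag=dag)
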

BaseSensorOperator

All sensors are derived from BaseSensorOperator. All sensors inherit the timeout and poke_interval on top of the BaseOperator attributes.

class airflow.operators.sensors.BaseSensorOperator(poke_interval=60, timeout=604800, soft_fail=False, *args, **kwargs)[source]

Sensor operators are derived from this class and inherit these attributes.

Sensor operators keep executing at a time interval and succeed when a criterion is met, and fail if and when they time out.
Parameters:
  • soft_fail (bool) – Set to true to mark the task as SKIPPED on failure
  • poke_interval (int) – Time in seconds that the job should wait in between each try
  • timeout (int) – Time, in seconds, before the task times out and fails.

Operator API

Importer that dynamically loads a class and module from its parent. This allows Airflow to support from airflow.operators import BashOperator even though BashOperator is actually in airflow.operators.bash_operator.

The importer also takes over for the parent_module by wrapping it. This is required to support attribute-based usage:

from airflow import operators
operators.BashOperator(...)
class airflow.operators.BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a Bash script, command or set of commands.

Parameters:
  • bash_command (string) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed.
  • xcom_push (bool) – If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes.
  • env (dict) – If env is not None, it must be a mapping that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated)
execute(context)[source]

Execute the bash command in a temporary directory which will be cleaned afterwards
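
A usage sketch (hypothetical task ids, assuming a dag object as in the example above): the first task renders a jinja template in bash_command, the second pushes its last line of stdout to XCom.

from airflow.operators import BashOperator

templated = BashOperator(
    task_id='templated_command',
    bash_command='echo "run date: {{ ds }}"',   # jinja-templated
    dag=dag)

count_files = BashOperator(
    task_id='count_files',
    bash_command='ls /tmp | wc -l',
    xcom_push=True,                 # last line of stdout becomes an XCom value
    env={'LC_ALL': 'C'},            # replaces, not extends, the inherited environment
    dag=dag)
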

class airflow.operators.BranchPythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)[source]

Bases: python_operator.PythonOperator

Allows a workflow to “branch” or follow a single path following the execution of this task.

It derives the PythonOperator and expects a Python function that returns the task_id to follow. The task_id returned should point to a task directly downstream from {self}. All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

Note that using tasks with depends_on_past=True downstream from BranchPythonOperator is logically unsound, as the skipped status will invariably lead to blocking tasks that depend on their own past successes. skipped states propagate where all directly upstream tasks are skipped.
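
A branching sketch (hypothetical task ids, assuming a dag object): the callable returns the task_id of the single branch that should run; the other branch is skipped.

from airflow.operators import BranchPythonOperator, DummyOperator

def choose_branch(**kwargs):
    # e.g. route weekend runs to a lighter-weight path
    return 'weekend_path' if kwargs['execution_date'].weekday() >= 5 else 'weekday_path'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag)

weekday_path = DummyOperator(task_id='weekday_path', dag=dag)
weekend_path = DummyOperator(task_id='weekend_path', dag=dag)

branch.set_downstream(weekday_path)
branch.set_downstream(weekend_path)
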

class airflow.operators.TriggerDagRunOperator(trigger_dag_id, python_callable, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Triggers a DAG run for a specified dag_id if a criterion is met

Parameters:
  • trigger_dag_id (str) – the dag_id to trigger
  • python_callable (python callable) – a reference to a python function that will be called while passing it the context object and a placeholder object obj for your callable to fill and return if you want a DagRun created. This obj object contains a run_id and payload attribute that you can modify in your function. The run_id should be a unique identifier for that DAG run, and the payload has to be a picklable object that will be made available to your tasks while executing that DAG run. Your function header should look like def foo(context, dag_run_obj):
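
A sketch of a conditional trigger (hypothetical dag and task ids, assuming a dag object): the callable receives the context and a placeholder object; returning the object creates a DagRun of the target DAG, returning None skips the trigger.

from airflow.operators import TriggerDagRunOperator

def conditionally_trigger(context, dag_run_obj):
    if context['params'].get('should_trigger'):
        dag_run_obj.payload = {'source_run_id': context['run_id']}
        return dag_run_obj          # a DagRun for 'target_dag' will be created
    # returning None means no DagRun is created

trigger = TriggerDagRunOperator(
    task_id='trigger_target',
    trigger_dag_id='target_dag',
    python_callable=conditionally_trigger,
    params={'should_trigger': True},
    dag=dag)
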
class airflow.operators.DummyOperator(*args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator that does literally nothing. It can be used to group tasks in a DAG.

class airflow.operators.EmailOperator(to, subject, html_content, files=None, cc=None, bcc=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Sends an email.

Parameters:
  • to (list or string (comma or semicolon delimited)) – list of emails to send the email to
  • subject (string) – subject line for the email (templated)
  • html_content (string) – content of the email (templated), html markup is allowed
  • files (list) – file names to attach in email
  • cc (list or string (comma or semicolon delimited)) – list of recipients to be added in CC field
  • bcc (list or string (comma or semicolon delimited)) – list of recipients to be added in BCC field
class airflow.operators.ExternalTaskSensor(external_dag_id, external_task_id, allowed_states=None, execution_delta=None, execution_date_fn=None, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a task to complete in a different DAG

Parameters:
  • external_dag_id (string) – The dag_id that contains the task you want to wait for
  • external_task_id (string) – The task_id of the task you want to wait for
  • allowed_states (list) – list of allowed states, default is ['success']
  • execution_delta (datetime.timedelta) – time difference with the previous execution to look at, the default is the same execution_date as the current task. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
  • execution_date_fn (callable) – function that receives the current execution date and returns the desired execution date to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
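
A usage sketch (hypothetical dag and task ids, assuming a dag object): wait for yesterday's run of a task in another DAG before proceeding.

from datetime import timedelta

from airflow.operators import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream_load',
    external_dag_id='upstream_dag',
    external_task_id='load_table',
    execution_delta=timedelta(days=1),   # look at yesterday's execution_date
    allowed_states=['success'],
    poke_interval=300,
    timeout=6 * 60 * 60,
    dag=dag)
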
class airflow.operators.GenericTransfer(sql, destination_table, source_conn_id, destination_conn_id, preoperator=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from one connection to another, assuming that they both provide the required methods in their respective hooks. The source hook needs to expose a get_records method, and the destination an insert_rows method.

This is meant to be used on small-ish datasets that fit in memory.

Parameters:
  • sql (str) – SQL query to execute against the source database
  • destination_table (str) – target table
  • source_conn_id (str) – source connection
  • destination_conn_id (str) – destination connection
  • preoperator (str or list of str) – sql statement or list of statements to be executed prior to loading the data
class airflow.operators.HdfsSensor(filepath, hdfs_conn_id='hdfs_default', *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a file or folder to land in HDFS

class airflow.operators.Hive2SambaOperator(hql, destination_filepath, samba_conn_id='samba_default', hiveserver2_conn_id='hiveserver2_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes hql code in a specific Hive database and loads the results of the query as a csv to a Samba location.

Parameters:
  • hql (string) – the hql to be exported
  • hiveserver2_conn_id (string) – reference to the hiveserver2 service
  • samba_conn_id (string) – reference to the samba destination
class airflow.operators.HiveOperator(hql, hive_cli_conn_id='hive_cli_default', schema='default', hiveconf_jinja_translate=False, script_begin_tag=None, run_as_owner=False, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes hql code in a specific Hive database.

Parameters:
  • hql (string) – the hql to be executed
  • hive_cli_conn_id (string) – reference to the Hive database
  • hiveconf_jinja_translate (boolean) – when True, hiveconf-type templating ${var} gets translated into jinja-type templating {{ var }}. Note that you may want to use this along with the DAG(user_defined_macros=myargs) parameter. View the DAG object documentation for more details.
  • script_begin_tag (str) – If defined, the operator will get rid of the part of the script before the first occurrence of script_begin_tag
  • mapred_queue (string) – queue used by the Hadoop CapacityScheduler
  • mapred_queue_priority (string) – priority within CapacityScheduler queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
  • mapred_job_name (string) – This name will appear in the jobtracker. This can make monitoring easier.
class airflow.operators.HivePartitionSensor(table, partition="ds='{{ ds }}'", metastore_conn_id='metastore_default', schema='default', poke_interval=180, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a partition to show up in Hive.

Note: Because partition supports general logical operators, it can be inefficient. Consider using NamedHivePartitionSensor instead if you don’t need the full flexibility of HivePartitionSensor.

Parameters:
  • table (string) – The name of the table to wait for, supports the dot notation (my_database.my_table)
  • partition (string) – The partition clause to wait for. This is passed as is to the metastore Thrift client get_partitions_by_filter method, and apparently supports SQL like notation as in ds='2015-01-01' AND type='value' and comparison operators as in "ds>=2015-01-01"
  • metastore_conn_id (str) – reference to the metastore thrift service connection id
class airflow.operators.HiveToDruidTransfer(sql, druid_datasource, ts_dim, metric_spec=None, hive_cli_conn_id='hive_cli_default', druid_ingest_conn_id='druid_ingest_default', metastore_conn_id='metastore_default', hadoop_dependency_coordinates=None, intervals=None, num_shards=-1, target_partition_size=-1, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Hive to Druid.

Parameters:
  • sql (str) – SQL query to execute against the Druid database
  • druid_datasource (str) – the datasource you want to ingest into in druid
  • ts_dim (str) – the timestamp dimension
  • metric_spec (list) – the metrics you want to define for your data
  • hive_cli_conn_id (str) – the hive connection id
  • druid_ingest_conn_id (str) – the druid ingest connection id
  • metastore_conn_id (str) – the metastore connection id
  • hadoop_dependency_coordinates (list of str) – list of coordinates to squeeze into the ingest json
  • intervals (list) – list of time intervals that defines segments, this is passed as is to the json object
class airflow.operators.HiveToMySqlTransfer(sql, mysql_table, hiveserver2_conn_id='hiveserver2_default', mysql_conn_id='mysql_default', mysql_preoperator=None, mysql_postoperator=None, bulk_load=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Hive to MySQL, note that for now the data is loaded into memory before being pushed to MySQL, so this operator should be used for smallish amount of data.

Parameters:
  • sql (str) – SQL query to execute against the MySQL database
  • mysql_table (str) – target MySQL table, use dot notation to target a specific database
  • mysql_conn_id (str) – source mysql connection
  • hiveserver2_conn_id (str) – destination hive connection
  • mysql_preoperator (str) – sql statement to run against mysql prior to import, typically used to truncate or delete in place of the data coming in, allowing the task to be idempotent (running the task twice won’t double load data)
  • mysql_postoperator (str) – sql statement to run against mysql after the import, typically used to move data from staging to production and issue cleanup commands.
  • bulk_load (bool) – flag to use bulk_load option. This loads mysql directly from a tab-delimited text file using the LOAD DATA LOCAL INFILE command. This option requires an extra connection parameter for the destination MySQL connection: {‘local_infile’: true}.
class airflow.operators.SimpleHttpOperator(endpoint, method='POST', data=None, headers=None, response_check=None, extra_options=None, http_conn_id='http_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Calls an endpoint on an HTTP system to execute an action

Parameters:
  • http_conn_id (string) – The connection to run the sensor against
  • endpoint (string) – The relative part of the full url
  • method (string) – The HTTP method to use, default = “POST”
  • data (For POST/PUT, depends on the content-type parameter, for GET a dictionary of key/value string pairs) – The data to pass. POST-data in POST/PUT and params in the URL for a GET request.
  • headers (a dictionary of string key/value pairs) – The HTTP headers to be added to the GET request
  • response_check (A lambda or defined function.) – A check against the ‘requests’ response object. Returns True for ‘pass’ and False otherwise.
  • extra_options (A dictionary of options, where key is string and value depends on the option that's being modified.) – Extra options for the ‘requests’ library, see the ‘requests’ documentation (options to modify timeout, ssl, etc.)
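
A usage sketch (hypothetical endpoint and connection, assuming a dag object): POST a small JSON payload and fail the task unless the response looks healthy.

from airflow.operators import SimpleHttpOperator

post_event = SimpleHttpOperator(
    task_id='post_event',
    http_conn_id='http_default',
    endpoint='api/v1/events',
    method='POST',
    data='{"event": "run_finished"}',
    headers={'Content-Type': 'application/json'},
    response_check=lambda response: response.status_code == 200,
    extra_options={'timeout': 30},
    dag=dag)
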
class airflow.operators.HttpSensor(endpoint, http_conn_id='http_default', params=None, headers=None, response_check=None, extra_options=None, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Executes an HTTP GET statement and returns False on failure: a 404 Not Found response, or the response_check function returning False.
Parameters:
  • http_conn_id (string) – The connection to run the sensor against
  • endpoint (string) – The relative part of the full url
  • params (a dictionary of string key/value pairs) – The parameters to be added to the GET url
  • headers (a dictionary of string key/value pairs) – The HTTP headers to be added to the GET request
  • response_check (A lambda or defined function.) – A check against the ‘requests’ response object. Returns True for ‘pass’ and False otherwise.
  • extra_options (A dictionary of options, where key is string and value depends on the option that's being modified.) – Extra options for the ‘requests’ library, see the ‘requests’ documentation (options to modify timeout, ssl, etc.)
class airflow.operators.MetastorePartitionSensor(table, partition_name, schema='default', mysql_conn_id='metastore_mysql', *args, **kwargs)[source]

Bases: sensors.SqlSensor

An alternative to the HivePartitionSensor that talks directly to the MySQL db. This was created as a result of observing suboptimal queries generated by the Metastore thrift service when hitting subpartitioned tables. The Thrift service’s queries were written in a way that wouldn’t leverage the indexes.

Parameters:
  • schema (str) – the schema
  • table (str) – the table
  • partition_name (str) – the partition name, as defined in the PARTITIONS table of the Metastore. Order of the fields does matter. Examples: ds=2016-01-01 or ds=2016-01-01/sub=foo for a sub partitioned table
  • mysql_conn_id (str) – a reference to the MySQL conn_id for the metastore
class airflow.operators.MsSqlOperator(sql, mssql_conn_id='mssql_default', parameters=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Microsoft SQL database

Parameters:
  • mssql_conn_id (string) – reference to a specific mssql database
  • sql (string, or string pointing to a template file; the file must have a ‘.sql’ extension) – the sql code to be executed

class airflow.operators.MsSqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter=u'\x01', mssql_conn_id='mssql_default', hive_cli_conn_id='hive_cli_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Microsoft SQL Server to Hive. The operator runs your query against Microsoft SQL Server, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the Microsoft SQL Server database
  • hive_table (str) – target Hive table, use dot notation to target a specific database
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values
  • delimiter (str) – field delimiter in the file
  • mssql_conn_id (str) – source Microsoft SQL Server connection
  • hive_conn_id (str) – destination hive connection

class airflow.operators.MySqlOperator(sql, mysql_conn_id='mysql_default', parameters=None, autocommit=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific MySQL database

Parameters:
  • mysql_conn_id (string) – reference to a specific mysql database
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed
class airflow.operators.MySqlToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter=u'\x01', mysql_conn_id='mysql_default', hive_cli_conn_id='hive_cli_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from MySql to Hive. The operator runs your query against MySQL, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the MySQL database
  • hive_table (str) – target Hive table, use dot notation to target a specific database
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values
  • delimiter (str) – field delimiter in the file
  • mysql_conn_id (str) – source mysql connection
  • hive_conn_id (str) – destination hive connection
class airflow.operators.NamedHivePartitionSensor(partition_names, metastore_conn_id='metastore_default', poke_interval=180, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a set of partitions to show up in Hive.

Parameters:
  • partition_names (list of strings) – List of fully qualified names of the partitions to wait for. A fully qualified name is of the form schema.table/pk1=pv1/pk2=pv2, for example, default.users/ds=2016-01-01. This is passed as is to the metastore Thrift client get_partitions_by_name method. Note that you cannot use logical or comparison operators as in HivePartitionSensor.
  • metastore_conn_id (str) – reference to the metastore thrift service connection id
class airflow.operators.PostgresOperator(sql, postgres_conn_id='postgres_default', autocommit=False, parameters=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Postgres database

Parameters:
  • postgres_conn_id (string) – reference to a specific postgres database
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed
class airflow.operators.PrestoCheckOperator(sql, presto_conn_id='presto_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator

Performs checks against Presto. The PrestoCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex queries that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are within 3 standard deviations of the 7 day average.

This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you have the choice to stop the critical path, preventing the publication of dubious data, or to place it on the side and receive email alerts without stopping the progress of the DAG.

Parameters:
  • sql (string) – the sql to be executed
  • presto_conn_id (string) – reference to the Presto database
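
A data-quality sketch (hypothetical table and task id, assuming a dag object): fail the run when the current day's partition is empty, so downstream publishing tasks will not proceed.

from airflow.operators import PrestoCheckOperator

check_rows = PrestoCheckOperator(
    task_id='check_rows_loaded',
    sql="SELECT COUNT(*) FROM my_schema.events WHERE ds = '{{ ds }}'",
    presto_conn_id='presto_default',
    dag=dag)
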
class airflow.operators.PrestoIntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, presto_conn_id='presto_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.IntervalCheckOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_thresholds (dict) – a dictionary of ratios indexed by metrics
  • presto_conn_id (string) – reference to the Presto database
class airflow.operators.PrestoValueCheckOperator(sql, pass_value, tolerance=None, presto_conn_id='presto_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.ValueCheckOperator

Performs a simple value check using sql code.

Parameters:
  • sql (string) – the sql to be executed
  • presto_conn_id (string) – reference to the Presto database
class airflow.operators.PythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes a Python callable

Parameters:
  • python_callable (python callable) – A reference to an object that is callable
  • op_kwargs (dict) – a dictionary of keyword arguments that will get unpacked in your function
  • op_args (list) – a list of positional arguments that will get unpacked when calling your callable
  • provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
  • templates_dict (dict of str) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute, and are made available in your callable’s context after the template has been applied
  • templates_exts (list of str) – a list of file extensions to resolve while processing templated fields, for example ['.sql', '.hql']
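
A usage sketch (hypothetical function and task id, assuming a dag object): with provide_context=True the template context arrives as keyword arguments, and templates_dict is rendered before the callable runs.

from airflow.operators import PythonOperator

def process_day(ds, templates_dict=None, **kwargs):
    # 'ds' and the rest of the template context are passed as kwargs
    print('processing date: %s' % ds)
    if templates_dict:
        print('rendered sql path: %s' % templates_dict['sql_path'])

run_python = PythonOperator(
    task_id='process_day',
    python_callable=process_day,
    provide_context=True,
    templates_dict={'sql_path': 'queries/{{ ds }}.sql'},
    dag=dag)
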
class airflow.operators.S3KeySensor(bucket_key, bucket_name=None, wildcard_match=False, s3_conn_id='s3_default', *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a key (a file-like instance on S3) to be present in a S3 bucket. S3 being a key/value store, it does not support folders. The path is just a key to a resource.

Parameters:
  • bucket_key (str) – The key being waited on. Supports full s3:// style url or relative path from root level.
  • bucket_name (str) – Name of the S3 bucket
  • wildcard_match (bool) – whether the bucket_key should be interpreted as a Unix wildcard pattern
  • s3_conn_id (str) – a reference to the s3 connection
class airflow.operators.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=', ', create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, s3_conn_id='s3_default', hive_cli_conn_id='hive_cli_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata.

Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the tables gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • s3_key (str) – The key to be retrieved from S3
  • field_dict (dict) – A dictionary of the fields name in the file as keys and their Hive types as values
  • hive_table (str) – target Hive table, use dot notation to target a specific database
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values
  • headers (bool) – whether the file contains column names on the first line
  • check_headers (bool) – whether the column names on the first line should be checked against the keys of field_dict
  • wildcard_match (bool) – whether the s3_key should be interpreted as a Unix wildcard pattern
  • delimiter (str) – field delimiter in the file
  • s3_conn_id (str) – source s3 connection
  • hive_conn_id (str) – destination hive connection
class airflow.operators.ShortCircuitOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)[source]

Bases: python_operator.PythonOperator

Allows a workflow to continue only if a condition is met. Otherwise, the workflow “short-circuits” and downstream tasks are skipped.

The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”. If the condition is True, downstream tasks proceed as normal.

The condition is determined by the result of python_callable.
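
A usage sketch (hypothetical task ids, assuming a dag object): the gate task returns a boolean; a falsy value skips everything downstream.

from airflow.operators import ShortCircuitOperator, DummyOperator

def is_weekday(**kwargs):
    # replace with a real condition; a falsy return value skips downstream tasks
    return kwargs['execution_date'].weekday() < 5

gate = ShortCircuitOperator(
    task_id='only_on_weekdays',
    python_callable=is_weekday,
    provide_context=True,
    dag=dag)

process = DummyOperator(task_id='process_data', dag=dag)
gate.set_downstream(process)
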

class airflow.operators.SlackAPIOperator(token='unset', method='unset', api_params=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Base Slack Operator. The SlackAPIPostOperator is derived from this operator. In the future additional Slack API operators will be derived from this class as well.

Parameters:
  • token (string) – Slack API token
  • method (string) – the Slack API method to call (see https://api.slack.com/methods)
  • api_params (dict) – API method call parameters
construct_api_call_params()[source]

Used by the execute function. Allows templating on the source fields of the api_call_params dict before construction

Override in child classes. Each SlackAPIOperator child class is responsible for having a construct_api_call_params function which sets self.api_call_params with a dict of API call parameters (https://api.slack.com/methods)

execute(**kwargs)[source]

SlackAPIOperator calls will not fail even if the call is unsuccessful. They should not prevent a DAG from completing successfully.

class airflow.operators.SlackAPIPostOperator(channel='#general', username='Airflow', text='No message has been set.\nHere is a cat video instead\nhttps://www.youtube.com/watch?v=J---aiyznGQ', icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png', attachments=None, *args, **kwargs)[source]

Bases: slack_operator.SlackAPIOperator

Posts messages to a slack channel

Parameters:
  • channel (string) – channel in which to post message on slack name (#general) or ID (C12318391)
  • username (string) – Username that airflow will be posting to Slack as
  • text (string) – message to send to slack
  • icon_url (string) – url to icon used for this message
  • attachments (array of hashes) – extra formatting details - see https://api.slack.com/docs/attachments
class airflow.operators.SqlSensor(conn_id, sql, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Runs a sql statement until a criterion is met. It will keep trying while the sql returns no row, or while the first cell returned is in (0, ‘0’, ‘’).

Parameters:
  • conn_id (string) – The connection to run the sensor against
  • sql – The sql to run. To pass, it needs to return at least one cell that contains a non-zero / non-empty string value.
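
A usage sketch (hypothetical connection and table, assuming a dag object): keep poking until at least one row exists for the current execution date.

from airflow.operators import SqlSensor

wait_for_load = SqlSensor(
    task_id='wait_for_load_flag',
    conn_id='mysql_default',
    sql="SELECT COUNT(*) FROM load_flags WHERE ds = '{{ ds }}'",   # a 0 count keeps the sensor waiting
    poke_interval=120,
    timeout=60 * 60,
    dag=dag)
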
class airflow.operators.TimeSensor(target_time, *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits until the specified time of the day.

Parameters:target_time (datetime.time) – time after which the job succeeds
class airflow.operators.WebHdfsSensor(filepath, webhdfs_conn_id='webhdfs_default', *args, **kwargs)[source]

Bases: sensors.BaseSensorOperator

Waits for a file or folder to land in HDFS

class airflow.operators.docker_operator.DockerOperator(image, api_version=None, command=None, cpus=1.0, docker_url='unix://var/run/docker.sock', environment=None, force_pull=False, mem_limit=None, network_mode=None, tls_ca_cert=None, tls_client_cert=None, tls_client_key=None, tls_hostname=None, tls_ssl_version=None, tmp_dir='/tmp/airflow', user=None, volumes=None, xcom_push=False, xcom_all=False, *args, **kwargs)[source]

Execute a command inside a docker container.

A temporary directory is created on the host and mounted into a container to allow storing files that together exceed the default disk size of 10GB in a container. The path to the mounted directory can be accessed via the environment variable AIRFLOW_TMP_DIR.

Parameters:
  • image (str) – Docker image from which to create the container.
  • api_version (str) – Remote API version.
  • command (str or list) – Command to be run in the container.
  • cpus (float) – Number of CPUs to assign to the container. This value gets multiplied with 1024. See https://docs.docker.com/engine/reference/run/#cpu-share-constraint
  • docker_url (str) – URL of the host running the docker daemon.
  • environment (dict) – Environment variables to set in the container.
  • force_pull (bool) – Pull the docker image on every run.
  • mem_limit (float or str) – Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like 128m or 1g.
  • network_mode (str) – Network mode for the container.
  • tls_ca_cert (str) – Path to a PEM-encoded certificate authority to secure the docker connection.
  • tls_client_cert (str) – Path to the PEM-encoded certificate used to authenticate docker client.
  • tls_client_key (str) – Path to the PEM-encoded key used to authenticate docker client.
  • tls_hostname (str or bool) – Hostname to match against the docker server certificate or False to disable the check.
  • tls_ssl_version (str) – Version of SSL to use when communicating with docker daemon.
  • tmp_dir (str) – Mount point inside the container to a temporary directory created on the host by the operator. The path is also made available via the environment variable AIRFLOW_TMP_DIR inside the container.
  • user (int or str) – Default user inside the docker container.
  • volumes – List of volumes to mount into the container, e.g. ['/host/path:/container/path', '/host/path2:/container/path2:ro'].
  • xcom_push (bool) – Whether the stdout will be pushed to the next step using XCom. The default is False.
  • xcom_all (bool) – Push all the stdout or just the last line. The default is False (last line).
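
A usage sketch (hypothetical image, paths and task id, assuming a dag object): run a short command in a container and push its last stdout line to XCom.

from airflow.operators.docker_operator import DockerOperator

in_container = DockerOperator(
    task_id='transform_in_container',
    image='python:2.7-slim',
    command='echo "processed {{ ds }}"',
    environment={'DATA_DIR': '/data/input'},
    volumes=['/data/input:/data/input:ro'],
    force_pull=False,
    xcom_push=True,       # push stdout to XCom
    xcom_all=False,       # only the last line
    dag=dag)
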

Community-contributed Operators

Importer that dynamically loads a class and module from its parent. This allows Airflow to support from airflow.operators import BashOperator even though BashOperator is actually in airflow.operators.bash_operator.

The importer also takes over for the parent_module by wrapping it. This is required to support attribute-based usage:

from airflow import operators
operators.BashOperator(...)
class airflow.contrib.operators.SSHExecuteOperator(ssh_hook, bash_command, xcom_push=False, env=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a Bash script, command or set of commands at remote host.

Parameters:
  • ssh_hook (SSHHook) – A SSHHook that indicates the remote host on which you want to run the script
  • bash_command (string) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed.
  • env (dict) – If env is not None, it must be a mapping that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior.
class airflow.contrib.operators.VerticaOperator(sql, vertica_conn_id='vertica_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes sql code in a specific Vertica database

Parameters:
  • vertica_conn_id (string) – reference to a specific Vertica database
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed
class airflow.contrib.operators.VerticaToHiveTransfer(sql, hive_table, create=True, recreate=False, partition=None, delimiter=u'\x01', vertica_conn_id='vertica_default', hive_cli_conn_id='hive_cli_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from Vertica to Hive. The operator runs your query against Vertica, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata. Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • sql (str) – SQL query to execute against the Vertica database
  • hive_table (str) – target Hive table, use dot notation to target a specific database
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values
  • delimiter (str) – field delimiter in the file
  • vertica_conn_id (str) – source Vertica connection
  • hive_conn_id (str) – destination hive connection
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(bql, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=False, *args, **kwargs)[source]

Executes BigQuery SQL queries in a specific BigQuery database

class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=', ', print_header=True, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Transfers a BigQuery table to a Google Cloud Storage bucket.

class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(bucket, object, filename=False, store_to_xcom_key=False, google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, *args, **kwargs)[source]

Downloads a file from Google Cloud Storage.

class airflow.contrib.operators.QuboleOperator(qubole_conn_id='qubole_default', *args, **kwargs)[source]

Execute tasks (commands) on QDS (https://qubole.com).

Parameters:qubole_conn_id (str) – Connection id which consists of qds auth_token
kwargs:
  command_type: type of command to be executed, e.g. hivecmd, shellcmd, hadoopcmd
  tags: array of tags to be assigned with the command
  cluster_label: cluster label on which the command will be executed
  name: name to be given to the command

Arguments specific to command types

hivecmd:
  query: inline query statement
  script_location: s3 location containing the query statement
  sample_size: size of sample in bytes on which to run the query
  macros: macro values which were used in the query
prestocmd:
  query: inline query statement
  script_location: s3 location containing the query statement
  macros: macro values which were used in the query
hadoopcmd:
  sub_command: must be one of [“jar”, “s3distcp”, “streaming”], followed by 1 or more args
shellcmd:
  script: inline command with args
  script_location: s3 location containing the query statement
  files: list of files in an s3 bucket in file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.
  archives: list of archives in an s3 bucket in archive1,archive2 format. These will be unarchived into the working directory where the qubole command is being executed.
  parameters: any extra args which need to be passed to the script (only when script_location is supplied)
pigcmd:
  script: inline query statement (latin_statements)
  script_location: s3 location containing the pig query
  parameters: any extra args which need to be passed to the script (only when script_location is supplied)
sparkcmd:
  program: the complete Spark program in Scala, SQL, Command, R, or Python
  cmdline: spark-submit command line; all required information must be specified in the cmdline itself
  sql: inline sql query
  script_location: s3 location containing the query statement
  language: language of the program: Scala, SQL, Command, R, or Python
  app_id: ID of a Spark job server app
  arguments: spark-submit command line arguments
  user_program_arguments: arguments that the user program takes in
  macros: macro values which were used in the query
dbtapquerycmd:
  db_tap_id: data store ID of the target database, in Qubole
  query: inline query statement
  macros: macro values which were used in the query
dbexportcmd:
  mode: 1 (simple), 2 (advance)
  hive_table: name of the hive table
  partition_spec: partition specification for the hive table
  dbtap_id: data store ID of the target database, in Qubole
  db_table: name of the db table
  db_update_mode: allowinsert or updateonly
  db_update_keys: columns used to determine the uniqueness of rows
  export_dir: HDFS/S3 location from which data will be exported
  fields_terminated_by: hex of the char used as column separator in the dataset
dbimportcmd:
  mode: 1 (simple), 2 (advance)
  hive_table: name of the hive table
  dbtap_id: data store ID of the target database, in Qubole
  db_table: name of the db table
  where_clause: where clause, if any
  parallelism: number of parallel db connections to use for extracting data
  extract_query: SQL query to extract data from the db. $CONDITIONS must be part of the where clause.
  boundary_query: query to be used to get the range of row IDs to be extracted
  split_column: column used as row ID to split data into ranges (mode 2)

Note

The following fields are template-supported: query, script_location, sub_command, script, files, archives, program, cmdline, sql, where_clause, extract_query, boundary_query, macros, tags, name, parameters. You can also use .txt files for template driven use cases.
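
A usage sketch (hypothetical query, cluster label and tag, assuming a dag object): the command type and its arguments are passed as keyword arguments.

from airflow.contrib.operators import QuboleOperator

hive_show = QuboleOperator(
    task_id='hive_show_tables',
    command_type='hivecmd',
    query='show tables',
    cluster_label='default',
    tags='airflow_example',
    qubole_conn_id='qubole_default',
    dag=dag)
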

class airflow.contrib.operators.hipchat_operator.HipChatAPIOperator(token, base_url='https://api.hipchat.com/v2', *args, **kwargs)[source]

Base HipChat Operator. All derived HipChat operators reference from HipChat’s official REST API documentation at https://www.hipchat.com/docs/apiv2. Before using any HipChat API operators you need to get an authentication token at https://www.hipchat.com/docs/apiv2/auth. In the future additional HipChat operators will be derived from this class as well.

Parameters:
  • token (str) – HipChat REST API authentication token
  • base_url (str) – HipChat REST API base url.
class airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator(room_id, message, *args, **kwargs)[source]

Send notification to a specific HipChat room. More info: https://www.hipchat.com/docs/apiv2/method/send_room_notification

Parameters:
  • room_id (str) – Room in which to send notification on HipChat
  • message (str) – The message body
  • frm (str) – Label to be shown in addition to sender’s name
  • message_format (str) – How the notification is rendered: html or text
  • color (str) – Background color of the msg: yellow, green, red, purple, gray, or random
  • attach_to (str) – The message id to attach this notification to
  • notify (bool) – Whether this message should trigger a user notification
  • card (dict) – HipChat-defined card object

Macros

Here’s a list of variables and macros that can be used in templates

Default Variables

The Airflow engine passes a few variables by default that are accessible in all templates

Variable Description
{{ ds }} the execution date as YYYY-MM-DD
{{ ds_nodash }} the execution date as YYYYMMDD
{{ yesterday_ds }} yesterday’s date as YYYY-MM-DD
{{ yesterday_ds_nodash }} yesterday’s date as YYYYMMDD
{{ tomorrow_ds }} tomorrow’s date as YYYY-MM-DD
{{ tomorrow_ds_nodash }} tomorrow’s date as YYYYMMDD
{{ ts }} same as execution_date.isoformat()
{{ ts_nodash }} same as ts without - and :
{{ execution_date }} the execution_date, (datetime.datetime)
{{ dag }} the DAG object
{{ task }} the Task object
{{ macros }} a reference to the macros package, described below
{{ task_instance }} the task_instance object
{{ end_date }} same as {{ ds }}
{{ latest_date }} same as {{ ds }}
{{ ti }} same as {{ task_instance }}
{{ params }} a reference to the user-defined params dictionary
{{ var.value.my_var }} global defined variables represented as a dictionary
{{ var.json.my_var.path }} global defined variables represented as a dictionary with deserialized JSON object, append the path to the key within the JSON object
{{ task_instance_key_str }} a unique, human-readable key to the task instance formatted {dag_id}_{task_id}_{ds}
conf the full configuration object located at airflow.configuration.conf which represents the content of your airflow.cfg
run_id the run_id of the current DAG run
dag_run a reference to the DagRun object
test_mode whether the task instance was called using the CLI’s test subcommand

Note that you can access the object’s attributes and methods with simple dot notation. Here are some examples of what is possible: {{ task.owner }}, {{ task.task_id }}, {{ ti.hostname }}, ... Refer to the models documentation for more information on the objects’ attributes and methods.

The var template variable allows you to access variables defined in Airflow’s UI. You can access them as either plain-text or JSON. If you use JSON, you are also able to walk nested structures, such as dictionaries like: {{ var.json.my_dict_var.key1 }}
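
A templating sketch (hypothetical script and task id, assuming a dag object and an Airflow Variable named report_config): the bash_command field is rendered by jinja before execution.

from airflow.operators import BashOperator

daily_report = BashOperator(
    task_id='daily_report',
    bash_command=(
        './generate_report.sh '                          # hypothetical script
        '--date {{ ds }} '
        '--owner {{ task.owner }} '
        '--env {{ params.env }} '
        '--bucket {{ var.json.report_config.bucket }}'   # JSON Variable lookup
    ),
    params={'env': 'prod'},
    dag=dag)
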

Macros

Macros are a way to expose objects to your templates and live under the macros namespace in your templates.

A few commonly used libraries and methods are made available.

Variable Description
macros.datetime The standard lib’s datetime.datetime
macros.timedelta The standard lib’s datetime.timedelta
macros.dateutil A reference to the dateutil package
macros.time The standard lib’s time
macros.uuid The standard lib’s uuid
macros.random The standard lib’s random

Some airflow specific macros are also defined:

airflow.macros.ds_add(ds, days)[source]

Add or subtract days from a YYYY-MM-DD

Parameters:
  • ds (str) – anchor date in YYYY-MM-DD format to add to
  • days (int) – number of days to add to the ds, you can use negative values
>>> ds_add('2015-01-01', 5)
'2015-01-06'
>>> ds_add('2015-01-06', -5)
'2015-01-01'
airflow.macros.ds_format(ds, input_format, output_format)[source]

Takes an input string and outputs another string as specified in the output format

Parameters:
  • ds (str) – input string which contains a date
  • input_format (str) – input string format. E.g. %Y-%m-%d
  • output_format (str) – output string format E.g. %Y-%m-%d
>>> ds_format('2015-01-01', "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format('1/5/2015', "%m/%d/%Y",  "%Y-%m-%d")
'2015-01-05'
airflow.macros.random() → x in the interval [0, 1).
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default')[source]

This function finds the date in a list closest to the target date. An optional parameter can be given to get the closest before or after.

Parameters:
  • table (str) – A hive table name
  • ds (datetime.date list) – A datestamp %Y-%m-%d e.g. yyyy-mm-dd
  • before (bool or None) – closest before (True), after (False) or either side of ds
Returns:

The closest date

Return type:

str or None

>>> tbl = 'airflow.static_babynames_partitioned'
>>> closest_ds_partition(tbl, '2015-01-02')
'2015-01-01'
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default')[source]

Gets the max partition for a table.

Parameters:
  • schema (string) – The hive schema the table lives in
  • table (string) – The hive table you are interested in, supports the dot notation as in “my_database.my_table”, if a dot is found, the schema param is disregarded
  • hive_conn_id (string) – The hive connection you are interested in. If your default is set you don’t need to use this parameter.
  • filter (string) – filter on a subset of partition as in sub_part=’specific_value’
  • field – the field to get the max value from. If there’s only one partition field, this will be inferred
>>> max_partition('airflow.static_babynames_partitioned')
'2015-01-01'

Models

Models are built on top of the SQLAlchemy ORM Base class, and instances are persisted in the database.

class airflow.models.DAG(dag_id, schedule_interval=datetime.timedelta(1), start_date=None, end_date=None, full_filepath=None, template_searchpath=None, user_defined_macros=None, default_args=None, concurrency=16, max_active_runs=16, dagrun_timeout=None, sla_miss_callback=None, params=None)[source]

Bases: airflow.dag.base_dag.BaseDag, airflow.utils.logging.LoggingMixin

A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start date and an end date (optional). For each schedule (say daily or hourly), the DAG needs to run each individual task as its dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can’t run until their previous schedule (and upstream tasks) are completed.

DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.

Parameters:
  • dag_id (string) – The id of the DAG
  • schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule
  • start_date (datetime.datetime) – The timestamp from which the scheduler will attempt to backfill
  • end_date (datetime.datetime) – A date beyond which your DAG won’t run, leave to None for open ended scheduling
  • template_searchpath (string or list of strings) – This list of folders (non relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default
  • user_defined_macros (dict) – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.
  • default_args (dict) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and arguments passed explicitly to an operator take precedence over those defined here; if your dict contains ‘depends_on_past’: True here and the operator’s call sets ‘depends_on_past’: False, the actual value will be False.
  • params (dict) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
  • concurrency (int) – the number of task instances allowed to run concurrently
  • max_active_runs (int) – maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won’t create new active DAG runs
  • dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created
  • sla_miss_callback (types.FunctionType) – specify a function to call when reporting SLA timeouts.
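
A construction sketch (hypothetical ids and paths): a cron-style schedule_interval, default_args shared by all tasks, and a template search path.

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='nightly_reporting',
    schedule_interval='0 2 * * *',                # run daily at 02:00
    default_args=default_args,
    template_searchpath=['/srv/airflow/sql'],     # hypothetical path
    user_defined_macros={'team': 'analytics'},    # usable as {{ team }} in templates
    concurrency=8,
    max_active_runs=1,
    dagrun_timeout=timedelta(hours=4))
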
add_task(task)[source]

Add a task to the DAG

Parameters:task (task) – the task you want to add
add_tasks(tasks)[source]

Add a list of tasks to the DAG

Parameters:task (list of tasks) – a list of tasks you want to add
clear(start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, reset_dag_runs=True, dry_run=False)[source]

Clears a set of task instances associated with the current dag for a specified date range.

cli()[source]

Exposes a CLI specific to this DAG

concurrency_reached

Returns a boolean indicating whether the concurrency limit for this DAG has been reached

crawl_for_tasks(objects)[source]

Typically called at the end of a script by passing globals() as a parameter. This avoids having to explicitly add every single task to the dag.

create_dagrun(*args, **kwargs)[source]

Creates a dag run from this dag including the tasks associated with this dag. Returns the dag run.

Parameters:
  • run_id (string) – defines the run id for this dag run
  • execution_date (datetime) – the execution date of this dag run
  • state (State) – the state of the dag run
  • start_date (datetime) – the date this dag run should be evaluated
  • external_trigger (bool) – whether this dag run is externally triggered
  • session (Session) – database session

static deactivate_stale_dags(*args, **kwargs)[source]

Deactivate any DAGs that were last touched by the scheduler before the expiration date. These DAGs were likely deleted.

Parameters:expiration_date (datetime) – set inactive DAGs that were touched before this time
Returns:None

static deactivate_unknown_dags(*args, **kwargs)[source]

Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM

Parameters:active_dag_ids (list[unicode]) – list of DAG IDs that are active
Returns:None
filepath

File location of where the dag object is instantiated

folder

Folder location of where the dag object is instantiated

get_template_env()[source]

Returns a jinja2 Environment while taking into account the DAGs template_searchpath and user_defined_macros

is_paused

Returns a boolean indicating whether this DAG is paused

latest_execution_date

Returns the latest date for which at least one task instance exists

normalize_schedule(dttm)[source]

Returns dttm + interval, unless dttm is the first interval, in which case it returns dttm

run(start_date=None, end_date=None, mark_success=False, include_adhoc=False, local=False, executor=None, donot_pickle=False, ignore_dependencies=False, ignore_first_depends_on_past=False, pool=None)[source]

Runs the DAG.

set_dependency(upstream_task_id, downstream_task_id)[source]

Simple utility method to set dependency between two tasks that already have been added to the DAG using add_task()
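As a short sketch of how add_task() and set_dependency() combine (the task ids are hypothetical and dag is assumed to be an existing DAG object such as the one in the earlier sketch):

from airflow.operators import DummyOperator

extract = DummyOperator(task_id='extract')
load = DummyOperator(task_id='load')

dag.add_task(extract)
dag.add_task(load)
dag.set_dependency('extract', 'load')   # both task_ids must already be in the DAG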

sub_dag(task_regex, include_downstream=False, include_upstream=True)[source]

Returns a subset of the current dag as a deep copy of the current dag based on a regex that should match one or many tasks, and includes upstream and downstream neighbours based on the flag passed.

subdags

Returns a list of the subdag objects associated to this DAG

static sync_to_db(*args, **kwargs)[source]

Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

Parameters:
  • dag (DAG) – the DAG object to save to the DB
  • sync_time (datetime) – The time that the DAG should be marked as sync’ed
Returns:None

tree_view()[source]

Shows an ascii tree representation of the DAG

class airflow.models.BaseOperator(task_id, owner='airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, *args, **kwargs)[source]

Bases: future.types.newobject.newobject

Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive from this class, you are expected to override the constructor as well as the ‘execute’ method.

Operators derived from this class should perform or trigger certain tasks synchronously (wait for completion). Examples of operators could be an operator that runs a Pig job (PigOperator), a sensor operator that waits for a partition to land in Hive (HiveSensorOperator), or one that moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these operators (tasks) target specific operations, running specific scripts, functions or data transfers.

This class is abstract and shouldn’t be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a node in DAG objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods.

Note that this class is derived from SQLAlchemy’s Base class, which allows us to push metadata regarding tasks to the database. Classes deriving from this one need to implement the polymorphic specificities documented in SQLAlchemy. This should become clear while reading the code for other operators.

Parameters:
  • task_id (string) – a unique, meaningful id for the task
  • owner (string) – the owner of the task, using the unix username is recommended
  • retries (int) – the number of retries that should be performed before failing the task
  • retry_delay (timedelta) – delay between retries
  • retry_exponential_backoff (bool) – allow progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (delay will be converted into seconds)
  • max_retry_delay (timedelta) – maximum delay interval between retries
  • start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest execution_date and adds the schedule_interval to determine the next execution_date. It is also very important to note that different tasks’ dependencies need to line up in time. If task A depends on task B and their start_date are offset in a way that their execution_date don’t line up, A’s dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into the TimeSensor and TimeDeltaSensor. We advise against using dynamic start_date and recommend using fixed ones. Read the FAQ entry about start_date for more information.
  • end_date (datetime) – if specified, the scheduler won’t go beyond this date
  • depends_on_past (bool) – when set to true, task instances will run sequentially while relying on the previous task’s schedule to succeed. The task instance for the start_date is allowed to run.
  • wait_for_downstream (bool) – when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used.
  • queue (str) – which queue to target when running this job. Not all executors implement queue management; the CeleryExecutor does support targeting specific queues.
  • dag (DAG) – a reference to the dag the task is attached to (if any)
  • priority_weight (int) – priority weight of this task against other tasks. This allows the executor to trigger higher priority tasks before others when things get backed up.
  • pool (str) – the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks
  • sla (datetime.timedelta) – time by which the job is expected to succeed. Note that this represents the timedelta after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on 2016-01-02 if the 2016-01-01 instance has not succeeded yet. The scheduler pays special attention for jobs with an SLA and sends alert emails for sla misses. SLA misses are also recorded in the database for future reference. All tasks that share the same SLA time get bundled in a single email, sent soon after that time. SLA notifications are sent once and only once for each task instance.
  • execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.
  • on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
  • on_retry_callback – much like the on_failure_callback except that it is executed when retries occur.
  • on_success_callback (callable) – much like the on_failure_callback except that it is executed when the task succeeds.
  • trigger_rule (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}, default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule. A brief illustration follows this parameter list.
  • resources (dict) – A map of resource parameter names (the argument names of the Resources constructor) to their values.
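As a brief illustration of on_failure_callback and trigger_rule (the callback, command and task are hypothetical, and dag is assumed to be an existing DAG object):

from airflow.operators import BashOperator

def notify_failure(context):
    # context holds references to objects related to the task instance
    ti = context['task_instance']
    print('Task %s failed for %s' % (ti.task_id, context['execution_date']))

cleanup = BashOperator(
    task_id='cleanup',
    bash_command='echo cleaning up',
    trigger_rule='all_done',            # run once all upstream tasks are done, pass or fail
    on_failure_callback=notify_failure,
    dag=dag,
)
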
clear(start_date=None, end_date=None, upstream=False, downstream=False)[source]

Clears the state of task instances associated with the task, following the parameters specified.

dag

Returns the Operator’s DAG if set, otherwise raises an error

detect_downstream_cycle(task=None)[source]

When invoked, this routine will raise an exception if a cycle is detected downstream from self. It is invoked when tasks are added to the DAG to detect cycles.

downstream_list

@property: list of tasks directly downstream

execute(context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
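A minimal custom operator, overriding the constructor and execute() as described above, might look like this sketch (the operator name and behavior are hypothetical; in this era operators usually also apply the apply_defaults decorator so that a DAG’s default_args are honored, which is omitted here for brevity):

import logging

from airflow.models import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # context is the same dictionary used when rendering jinja templates
        message = 'Hello %s, running for %s' % (self.name, context['ds'])
        logging.info(message)
        return message   # returned values are pushed to XCom under the 'return_value' key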

get_direct_relatives(upstream=False)[source]

Get the direct relatives to the current task, upstream or downstream.

get_flat_relatives(upstream=False, l=None)[source]

Get a flat list of relatives, either upstream or downstream.

get_task_instances(session, start_date=None, end_date=None)[source]

Get a set of task instances related to this task for a specific date range.

has_dag()[source]

Returns True if the Operator has been assigned to a DAG.

on_kill()[source]

Override this method to cleanup subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.

post_execute(context)[source]

This is triggered right after self.execute, it’s mostly a hook for people deriving operators.

pre_execute(context)[source]

This is triggered right before self.execute, it’s mostly a hook for people deriving operators.

prepare_template()[source]

Hook that is triggered after the templated fields get replaced by their content. If you need your operator to alter the content of the file before the template is rendered, it should override this method to do so.

render_template(attr, content, context)[source]

Renders a template either from a file or directly in a field, and returns the rendered result.

render_template_from_field(attr, content, context, jinja_env)[source]

Renders a template from a field. If the field is a string, it will simply render the string and return the result. If it is a collection or nested set of collections, it will traverse the structure and render all strings in it.

run(start_date=None, end_date=None, ignore_dependencies=False, ignore_first_depends_on_past=False, force=False, mark_success=False)[source]

Run a set of task instances for a date range.

schedule_interval

The schedule interval of the DAG always wins over individual tasks so that tasks within a DAG always line up. The task still needs a schedule_interval as it may not be attached to a DAG.

set_downstream(task_or_task_list)[source]

Set a task or a list of tasks to be directly downstream from the current task.

set_upstream(task_or_task_list)[source]

Set a task or a list of tasks to be directly upstream from the current task.
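For example, with a few hypothetical tasks attached to an existing dag object:

from airflow.operators import DummyOperator

extract = DummyOperator(task_id='extract', dag=dag)
transform = DummyOperator(task_id='transform', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

transform.set_upstream(extract)          # same as extract.set_downstream(transform)
load.set_upstream([extract, transform])  # a list of tasks is also accepted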

upstream_list

@property: list of tasks directly upstream

xcom_pull(context, task_ids, dag_id=None, key=u'return_value', include_prior_dates=None)[source]

See TaskInstance.xcom_pull()

xcom_push(context, key, value, execution_date=None)[source]

See TaskInstance.xcom_push()

class airflow.models.TaskInstance(task, execution_date, state=None)[source]

Bases: sqlalchemy.ext.declarative.api.Base

Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the state they are in.

The SqlAlchemy model doesn’t have a SqlAlchemy foreign key to the task or dag model deliberately to have more control over transactions.

Database transactions on this table should guard against double triggers and any confusion around what task instances are or aren’t ready to run, even while multiple schedulers may be firing task instances.

are_dependencies_met(*args, **kwargs)[source]

Returns a boolean on whether the upstream tasks are in a SUCCESS state and considers depends_on_past and the previous run’s state.

Parameters:
  • flag_upstream_failed (boolean) – This is a hack to generate the upstream_failed state creation while checking to see whether the task instance is runnable. It was the shortest path to add the feature
  • ignore_depends_on_past (boolean) – if True, ignores depends_on_past dependencies. Defaults to False.
  • verbose (boolean) – verbose provides more logging in the case where the task instance is evaluated as a check right before being executed. In the case of the scheduler evaluating the dependencies, this logging would be way too verbose.
are_dependents_done(*args, **kwargs)[source]

Checks whether the dependents of this task instance have all succeeded. This is meant to be used by wait_for_downstream.

This is useful when you do not want to start processing the next schedule of a task until the dependents are done. For instance, if the task DROPs and recreates a table.

clear_xcom_data(*args, **kwargs)[source]

Clears all XCom data from the database for the task instance

command(mark_success=False, ignore_dependencies=False, ignore_depends_on_past=False, force=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None)[source]

Returns a command that can be executed anywhere where airflow is installed. This command is part of the message sent to executors by the orchestrator.

current_state(*args, **kwargs)[source]

Get the very latest state from the database. If a session is passed, we use it and looking up the state becomes part of the session; otherwise a new session is used.

error(*args, **kwargs)[source]

Forces the task instance’s state to FAILED in the database.

evaluate_trigger_rule(*args, **kwargs)[source]

Returns a boolean on whether the current task can be scheduled for execution based on its trigger_rule.

Parameters:
  • flag_upstream_failed (boolean) – This is a hack to generate the upstream_failed state creation while checking to see whether the task instance is runnable. It was the shortest path to add the feature
  • successes (int) – Number of successful upstream tasks
  • skipped (int) – Number of skipped upstream tasks
  • failed (int) – Number of failed upstream tasks
  • upstream_failed (int) – Number of upstream_failed upstream tasks
  • done (int) – Number of completed upstream tasks
static generate_command(dag_id, task_id, execution_date, mark_success=False, ignore_dependencies=False, ignore_depends_on_past=False, force=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None)[source]

Generates the shell command required to execute this task instance.

Parameters:
  • dag_id (unicode) – DAG ID
  • task_id (unicode) – Task ID
  • execution_date (datetime) – Execution date for the task
  • mark_success (bool) – Whether to mark the task as successful
  • ignore_dependencies (bool) – Whether to ignore the dependencies and run anyway
  • ignore_depends_on_past (bool) – Whether to ignore the depends_on_past setting and run anyway
  • force (bool) – Whether to force running - see TaskInstance.run()
  • local (bool) – Whether to run the task locally
  • pickle_id (unicode) – If the DAG was serialized to the DB, the ID associated with the pickled DAG
  • file_path – path to the file containing the DAG definition
  • raw – raw mode (needs more details)
  • job_id – job ID (needs more details)
  • pool (unicode) – the Airflow pool that the task should run in
Returns:shell command that can be used to run the task instance

is_premature()[source]

Returns whether a task is in UP_FOR_RETRY state and its retry interval has elapsed.

is_queueable(include_queued=False, ignore_depends_on_past=False, flag_upstream_failed=False)[source]

Returns a boolean on whether the task instance has met all dependencies and is ready to run. It considers the task’s state, the state of its dependencies, depends_on_past and makes sure the execution isn’t in the future. It doesn’t take into account whether the pool has a slot for it to run.

Parameters:
  • include_queued (boolean) – If True, tasks that have already been queued are included. Defaults to False.
  • ignore_depends_on_past (boolean) – if True, ignores depends_on_past dependencies. Defaults to False.
  • flag_upstream_failed (boolean) – This is a hack to generate the upstream_failed state creation while checking to see whether the task instance is runnable. It was the shortest path to add the feature
is_runnable(include_queued=False, ignore_depends_on_past=False, flag_upstream_failed=False)[source]

Returns whether a task is ready to run AND there’s room in the queue.

Parameters:
  • include_queued (boolean) – If True, tasks that are already QUEUED are considered “runnable”. Defaults to False.
  • ignore_depends_on_past (boolean) – if True, ignores depends_on_past dependencies. Defaults to False.
key

Returns a tuple that identifies the task instance uniquely

next_retry_datetime()[source]

Get datetime of the next retry if the task instance fails. For exponential backoff, retry_delay is used as base and will be converted to seconds.

pool_full(*args, **kwargs)[source]

Returns a boolean as to whether the slot pool has room for this task to run

ready_for_retry()[source]

Checks on whether the task instance is in the right state and timeframe to be retried.

refresh_from_db(*args, **kwargs)[source]

Refreshes the task instance from the database based on the primary key

Parameters:lock_for_update – if True, indicates that the database should lock the TaskInstance (issuing a FOR UPDATE clause) until the session is committed.
run(*args, **kwargs)[source]

Runs the task instance.

Parameters:
  • verbose (boolean) – whether to turn on more verbose logging
  • ignore_dependencies (boolean) – Doesn’t check for deps, just runs
  • ignore_depends_on_past (boolean) – Ignore depends_on_past but respect other dependencies
  • force (boolean) – Forces a run regardless of previous success
  • mark_success (boolean) – Don’t run the task, mark its state as success
  • test_mode (boolean) – Doesn’t record success or failure in the DB
  • pool (str) – specifies the pool to use to run the task instance
xcom_pull(task_ids, dag_id=None, key=u'return_value', include_prior_dates=False)[source]

Pull XComs that optionally meet certain criteria.

The default value for key limits the search to XComs that were returned by other tasks (as opposed to those that were pushed manually). To remove this filter, pass key=None (or any desired value).

If a single task_id string is provided, the result is the value of the most recent matching XCom from that task_id. If multiple task_ids are provided, a tuple of matching values is returned. None is returned whenever no matches are found.

Parameters:
  • key (string) – A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is ‘return_value’, also available as a constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass key=None.
  • task_ids (string or iterable of strings (representing task_ids)) – Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.
  • dag_id (string) – If provided, only pulls XComs from this DAG. If None (default), the DAG of the calling task is used.
  • include_prior_dates (bool) – If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
xcom_push(key, value, execution_date=None)[source]

Make an XCom available for tasks to pull.

Parameters:
  • key (string) – A key for the XCom
  • value (any pickleable object) – A value for the XCom. The value is pickled and stored in the database.
  • execution_date (datetime) – if provided, the XCom will not be visible until this date. This can be used, for example, to send a message to a task on a future date without it being immediately visible.
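As a hedged sketch of two tasks exchanging a value through XCom (the task ids and payload are hypothetical; the TaskInstance is taken from the context passed to the callables, and dag is assumed to be an existing DAG object):

from airflow.operators import PythonOperator

def producer(**context):
    context['ti'].xcom_push(key='row_count', value=42)

def consumer(**context):
    count = context['ti'].xcom_pull(task_ids='produce', key='row_count')
    print('upstream produced %s rows' % count)

produce = PythonOperator(task_id='produce', python_callable=producer,
                         provide_context=True, dag=dag)
consume = PythonOperator(task_id='consume', python_callable=consumer,
                         provide_context=True, dag=dag)
consume.set_upstream(produce)
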
class airflow.models.DagBag(dag_folder=None, executor=<airflow.executors.local_executor.LocalExecutor object>, include_examples=True)[source]

Bases: airflow.dag.base_dag.BaseDagBag, airflow.utils.logging.LoggingMixin

A dagbag is a collection of dags, parsed out of a folder tree, that has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks. This makes it easier to run distinct environments, say for production and development, for tests, or for different teams or security profiles. What would have been system-level settings are now dagbag-level, so that one system can run multiple, independent sets of settings.

Parameters:
  • dag_folder (unicode) – the folder to scan to find DAGs
  • executor – the executor to use when executing task instances in this DagBag
  • include_examples (bool) – whether to include the examples that ship with airflow or not
  • sync_to_db (bool) – whether to sync the properties of the DAGs to the metadata DB while finding them, typically should be done by the scheduler job only
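For example, to load DAGs from a folder and fetch one by id (the folder and dag_id are hypothetical):

from airflow.models import DagBag

dagbag = DagBag(dag_folder='/path/to/dags', include_examples=False)
print(dagbag.size())                  # number of dags collected
dag = dagbag.get_dag('example_dag')   # refreshed if the file changed on disk
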
bag_dag(dag, parent_dag, root_dag)[source]

Adds the DAG into the bag, recurses into sub dags.

collect_dags(dag_folder=None, only_if_updated=True)[source]

Given a file path or a folder, this method looks for python modules, imports them and adds them to the dagbag collection.

Note that if a .airflowignore file is found while processing the directory, it behaves much like a .gitignore, ignoring files that match any of the regex patterns specified in the file.

dagbag_report()[source]

Prints a report around DagBag loading stats

get_dag(dag_id)[source]

Gets the DAG out of the dictionary, and refreshes it if expired

kill_zombies(*args, **kwargs)[source]

Fails tasks that haven’t had a heartbeat in too long

process_file(filepath, only_if_updated=True, safe_mode=True)[source]

Given a path to a python module or zip file, this method imports the module and looks for dag objects within it.

size()[source]
Returns:the number of dags contained in this dagbag
class airflow.models.Connection(conn_id=None, conn_type=None, host=None, login=None, password=None, schema=None, port=None, extra=None, uri=None)[source]

Bases: sqlalchemy.ext.declarative.api.Base

Placeholder to store connection information about different database instances. The idea here is that scripts use references to database instances (conn_id) instead of hard coding hostnames, logins and passwords when using operators or hooks.

extra_dejson

Returns the extra property by deserializing json

Hooks

Importer that dynamically loads a class and module from its parent. This allows Airflow to support from airflow.operators import BashOperator even though BashOperator is actually in airflow.operators.bash_operator.

The importer also takes over for the parent_module by wrapping it. This is required to support attribute-based usage:

from airflow import operators
operators.BashOperator(...)
class airflow.hooks.DbApiHook(*args, **kwargs)[source]

Bases: airflow.hooks.base_hook.BaseHook

Abstract base class for sql hooks.

bulk_dump(table, tmp_file)[source]

Dumps a database table into a tab-delimited file

Parameters:
  • table (str) – The name of the source table
  • tmp_file (str) – The path of the target file
bulk_load(table, tmp_file)[source]

Loads a tab-delimited file into a database table

Parameters:
  • table (str) – The name of the target table
  • tmp_file (str) – The path of the file to load into the table
get_conn()[source]

Returns a connection object

get_cursor()[source]

Returns a cursor

get_first(sql, parameters=None)[source]

Executes the sql and returns the first resulting row.

Parameters:
  • sql (str or list) – the sql statement to be executed (str) or a list of sql statements to execute
  • parameters (mapping or iterable) – The parameters to render the SQL query with.
get_pandas_df(sql, parameters=None)[source]

Executes the sql and returns a pandas dataframe

Parameters:
  • sql (str or list) – the sql statement to be executed (str) or a list of sql statements to execute
  • parameters (mapping or iterable) – The parameters to render the SQL query with.
get_records(sql, parameters=None)[source]

Executes the sql and returns a set of records.

Parameters:
  • sql (str or list) – the sql statement to be executed (str) or a list of sql statements to execute
  • parameters (mapping or iterable) – The parameters to render the SQL query with.
insert_rows(table, rows, target_fields=None, commit_every=1000)[source]

A generic way to insert a set of tuples into a table; rows are committed in batches of commit_every rows (or as a single transaction if commit_every is 0).

Parameters:
  • table (str) – Name of the target table
  • rows (iterable of tuples) – The rows to insert into the table
  • target_fields (iterable of strings) – The names of the columns to fill in the table
  • commit_every (int) – The maximum number of rows to insert in one transaction. Set to 0 to insert all rows in one transaction.
run(sql, autocommit=False, parameters=None)[source]

Runs a command or a list of commands. Pass a list of sql statements to the sql parameter to get them to execute sequentially

Parameters:
  • sql (str or list) – the sql statement to be executed (str) or a list of sql statements to execute
  • autocommit (bool) – What to set the connection’s autocommit setting to before executing the query.
  • parameters (mapping or iterable) – The parameters to render the SQL query with.
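As a usage sketch, any DbApiHook subclass exposes the same interface; here PostgresHook is used with a hypothetical connection id, query and tables:

from airflow.hooks import PostgresHook

hook = PostgresHook(postgres_conn_id='my_postgres')
rows = hook.get_records('SELECT id, name FROM users WHERE created >= %(since)s',
                        parameters={'since': '2016-01-01'})
hook.insert_rows(table='users_archive', rows=rows,
                 target_fields=('id', 'name'), commit_every=1000)
hook.run('VACUUM users_archive', autocommit=True)
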
class airflow.hooks.HiveCliHook(hive_cli_conn_id='hive_cli_default', run_as=None, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None)[source]

Bases: airflow.hooks.base_hook.BaseHook

Simple wrapper around the hive CLI.

It also supports beeline, a lighter CLI that uses JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline param in the extra field of your connection as in { "use_beeline": true }

Note that you can also set default hive CLI parameters by adding hive_cli_params to the extra field of your connection, as in {"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}. Parameters passed here can be overridden by run_cli’s hive_conf param

The extra connection parameter auth gets passed into the jdbc connection string as-is.

Parameters:
  • mapred_queue (string) – queue used by the Hadoop Scheduler (Capacity or Fair)
  • mapred_queue_priority (string) – priority within the job queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
  • mapred_job_name (string) – This name will appear in the jobtracker. This can make monitoring easier.
load_file(filepath, table, delimiter=', ', field_dict=None, create=True, overwrite=True, partition=None, recreate=False)[source]

Loads a local file into Hive

Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • table (str) – target Hive table, use dot notation to target a specific database
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values
  • delimiter (str) – field delimiter in the file
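A hedged sketch of staging a local delimited file into Hive (the file, columns and partition are hypothetical; field_dict is assumed to map column names to Hive types, in order):

from collections import OrderedDict
from airflow.hooks import HiveCliHook

hh = HiveCliHook()
hh.load_file(
    filepath='/tmp/babynames.csv',
    table='airflow.static_babynames_partitioned',   # dot notation targets a database
    delimiter=',',
    field_dict=OrderedDict([('state', 'STRING'), ('name', 'STRING'), ('num', 'INT')]),
    partition={'ds': '2015-01-01'},
    create=True,
)
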
run_cli(hql, schema=None, verbose=True, hive_conf=None)[source]

Run an hql statement using the hive cli. If hive_conf is specified it should be a dict and the entries will be set as key/value pairs in HiveConf

Parameters:hive_conf (dict) – if specified these key value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.
>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
test_hql(hql)[source]

Test an hql statement using the hive cli and EXPLAIN

class airflow.hooks.HiveMetastoreHook(metastore_conn_id='metastore_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Wrapper to interact with the Hive Metastore

check_for_named_partition(schema, table, partition_name)[source]

Checks whether a partition with a given name exists

Parameters:
  • schema (string) – Name of hive schema (database) @table belongs to
  • table – Name of hive table @partition belongs to
  • partition_name (string) – Name of the partition to check for (eg a=b/c=d)
Return type:boolean

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
False
check_for_partition(schema, table, partition)[source]

Checks whether a partition exists

Parameters:
  • schema (string) – Name of hive schema (database) @table belongs to
  • table – Name of hive table @partition belongs to
  • partition (string) – Expression that matches the partitions to check for (eg a = ‘b’ AND c = ‘d’)
Return type:boolean

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
get_databases(pattern='*')[source]

Get a list of databases matching the pattern

get_metastore_client()[source]

Returns a Hive thrift client.

get_partitions(schema, table_name, filter=None)[source]

Returns a list of all partitions in a table. Works only for tables with less than 32767 partitions (java short max val). For subpartitioned tables, the number might easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
get_table(table_name, db='default')[source]

Get a metastore table object

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
get_tables(db, pattern='*')[source]

Get a list of metastore table objects matching the pattern

max_partition(schema, table_name, field=None, filter=None)[source]

Returns the maximum value for all partitions in a table. Works only for tables that have a single partition key. For subpartitioned tables, we recommend using signal tables.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow', table_name=t)
'2015-01-01'
table_exists(table_name, db='default')[source]

Check if table exists

>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db='airflow', table_name='static_babynames')
True
>>> hh.table_exists(db='airflow', table_name='does_not_exist')
False
class airflow.hooks.HiveServer2Hook(hiveserver2_conn_id='hiveserver2_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Wrapper around the impyla library

Note that the default authMechanism is PLAIN; to override it, specify the desired authMechanism in the extra field of your connection in the UI.

get_pandas_df(hql, schema='default')[source]

Get a pandas dataframe from a Hive query

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100
get_records(hql, schema='default')[source]

Get a set of records from a Hive query.

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
class airflow.hooks.HttpHook(method='POST', http_conn_id='http_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with HTTP servers.

get_conn(headers)[source]

Returns http session for use with requests

run(endpoint, data=None, headers=None, extra_options=None)[source]

Performs the request

run_and_check(session, prepped_request, extra_options)[source]

Grabs extra options like timeout and actually runs the request, checking for the result
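For example (the endpoint, headers and timeout are hypothetical; the base URL comes from the http_default connection, and run() is assumed to hand back the underlying requests response):

from airflow.hooks import HttpHook

hook = HttpHook(method='GET', http_conn_id='http_default')
response = hook.run('api/v1/status',
                    headers={'Accept': 'application/json'},
                    extra_options={'timeout': 30})
print(response.status_code)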

class airflow.hooks.DruidHook(druid_query_conn_id='druid_query_default', druid_ingest_conn_id='druid_ingest_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with druid.

construct_ingest_query(datasource, static_path, ts_dim, columns, metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None)[source]

Builds an ingest query for an HDFS TSV load.

Parameters:
  • datasource – target datasource in druid
  • columns – list of all columns in the TSV, in the right order
get_conn()[source]

Returns a druid connection object for query

load_from_hdfs(datasource, static_path, ts_dim, columns, intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None)[source]

Load data into Druid from HDFS

Parameters:
  • ts_dim – The column name to use as a timestamp
  • metric_spec – A list of dictionaries
class airflow.hooks.MsSqlHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with Microsoft SQL Server.

get_conn()[source]

Returns a mssql connection object

class airflow.hooks.MySqlHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with MySQL.

You can specify charset in the extra field of your connection as {"charset": "utf8"}. Also you can choose cursor as {"cursor": "SSCursor"}. Refer to the MySQLdb.cursors for more details.

bulk_load(table, tmp_file)[source]

Loads a tab-delimited file into a database table

get_conn()[source]

Returns a mysql connection object

class airflow.hooks.PostgresHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with Postgres. You can specify ssl parameters in the extra field of your connection as {"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}.

class airflow.hooks.PrestoHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with Presto through PyHive!

>>> ph = PrestoHook()
>>> sql = "SELECT count(1) AS num FROM airflow.static_babynames"
>>> ph.get_records(sql)
[[340698]]
get_conn()[source]

Returns a connection object

get_first(hql, parameters=None)[source]

Returns only the first row, regardless of how many rows the query returns.

get_pandas_df(hql, parameters=None)[source]

Get a pandas dataframe from a sql query.

get_records(hql, parameters=None)[source]

Get a set of records from Presto

run(hql, parameters=None)[source]

Execute the statement against Presto. Can be used to create views.

class airflow.hooks.S3Hook(s3_conn_id='s3_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with S3. This class is a wrapper around the boto library.

check_for_bucket(bucket_name)[source]

Check if bucket_name exists.

Parameters:bucket_name (str) – the name of the bucket
check_for_key(key, bucket_name=None)[source]

Checks that a key exists in a bucket

check_for_prefix(bucket_name, prefix, delimiter)[source]

Checks that a prefix exists in a bucket

check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

Checks that a key matching a wildcard expression exists in a bucket

get_bucket(bucket_name)[source]

Returns a boto.s3.bucket.Bucket object

Parameters:bucket_name (str) – the name of the bucket
get_conn()[source]

Returns the boto S3Connection object.

get_key(key, bucket_name=None)[source]

Returns a boto.s3.key.Key object

Parameters:
  • key (str) – the path to the key
  • bucket_name (str) – the name of the bucket
get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

Returns a boto.s3.key.Key object matching the wildcard expression

Parameters:
  • wildcard_key (str) – the path to the key
  • bucket_name (str) – the name of the bucket
list_keys(bucket_name, prefix='', delimiter='')[source]

Lists keys in a bucket under prefix and not containing delimiter

Parameters:
  • bucket_name (str) – the name of the bucket
  • prefix (str) – a key prefix
  • delimiter (str) – the delimiter marks key hierarchy.
list_prefixes(bucket_name, prefix='', delimiter='')[source]

Lists prefixes in a bucket under prefix

Parameters:
  • bucket_name (str) – the name of the bucket
  • prefix (str) – a key prefix
  • delimiter (str) – the delimiter marks key hierarchy.
load_file(filename, key, bucket_name=None, replace=False, multipart_bytes=5368709120)[source]

Loads a local file to S3

Parameters:
  • filename (str) – name of the file to load.
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which to store the file
  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists. If replace is False and the key exists, an error will be raised.
  • multipart_bytes (int) – If provided, the file is uploaded in parts of this size (minimum 5242880). The default value is 5GB, since S3 cannot accept non-multipart uploads for files larger than 5GB. If the file is smaller than the specified limit, the option will be ignored.
load_string(string_data, key, bucket_name=None, replace=False, encrypt=False)[source]

Loads a string to S3

This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to s3. It is currently using only a single part upload, and should not be used to move large files.

Parameters:
  • string_data (str) – string to set as content for the key.
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which to store the file
  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists
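A brief usage sketch (the bucket, keys and local file are hypothetical; s3_default must point to valid credentials):

from airflow.hooks import S3Hook

s3 = S3Hook(s3_conn_id='s3_default')
if not s3.check_for_key('data/2015-01-01/babynames.csv', bucket_name='my-bucket'):
    s3.load_file(filename='/tmp/babynames.csv',
                 key='data/2015-01-01/babynames.csv',
                 bucket_name='my-bucket')
s3.load_string('ok', key='data/2015-01-01/_SUCCESS',
               bucket_name='my-bucket', replace=True)
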
class airflow.hooks.SqliteHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with SQLite.

get_conn()[source]

Returns a sqlite connection object

class airflow.hooks.WebHDFSHook(webhdfs_conn_id='webhdfs_default', proxy_user=None)[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with HDFS. This class is a wrapper around the hdfscli library.

check_for_path(hdfs_path)[source]

Check for the existence of a path in HDFS by querying FileStatus.

get_conn()[source]

Returns a hdfscli InsecureClient object.

load_file(source, destination, overwrite=True, parallelism=1, **kwargs)[source]

Uploads a file to HDFS

Parameters:
  • source (str) – Local path to file or folder. If a folder, all the files inside of it will be uploaded (note that this implies that folders empty of files will not be created remotely).
  • destination (str) – Target HDFS path. If it already exists and is a directory, files will be uploaded inside.
  • overwrite (bool) – Overwrite any existing file or directory.
  • parallelism (int) – Number of threads to use for parallelization. A value of 0 (or negative) uses as many threads as there are files.
  • **kwargs – Keyword arguments forwarded to upload().
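For example (the paths are hypothetical; webhdfs_default must point to a reachable namenode):

from airflow.hooks import WebHDFSHook

hdfs = WebHDFSHook(webhdfs_conn_id='webhdfs_default')
if not hdfs.check_for_path('/data/babynames'):
    hdfs.load_file(source='/tmp/babynames.csv',
                   destination='/data/babynames',
                   overwrite=True,
                   parallelism=1)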

Community contributed hooks

Importer that dynamically loads a class and module from its parent. This allows Airflow to support from airflow.operators import BashOperator even though BashOperator is actually in airflow.operators.bash_operator.

The importer also takes over for the parent_module by wrapping it. This is required to support attribute-based usage:

from airflow import operators
operators.BashOperator(...)
class airflow.contrib.hooks.BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook, airflow.hooks.dbapi_hook.DbApiHook

Interact with BigQuery. This hook uses the Google Cloud Platform connection.

get_conn()[source]

Returns a BigQuery PEP 249 connection object.

get_pandas_df(bql, parameters=None)[source]

Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn’t support PEP 249 connections, except for SQLite. See:

https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900

Parameters:bql (string) – The BigQuery SQL to execute.
get_service()[source]

Returns a BigQuery service object.

insert_rows(table, rows, target_fields=None, commit_every=1000)[source]

Insertion is currently unsupported. Theoretically, you could use BigQuery’s streaming API to insert rows into a table, but this hasn’t been implemented.

class airflow.contrib.hooks.GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook

Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.

download(bucket, object, filename=False)[source]

Get a file from Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to fetch from.
  • object (string) – The object to fetch.
  • filename (string) – If set, a local file path where the file should be written to.
get_conn()[source]

Returns a Google Cloud Storage service object.

upload(bucket, object, filename, mime_type='application/octet-stream')[source]

Uploads a local file to Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to upload to.
  • object (string) – The object name to set when uploading the local file.
  • filename (string) – The local file path to the file to be uploaded.
  • mime_type (string) – The MIME type to set when uploading the file.
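For example (the bucket, object and local paths are hypothetical; the hook uses the configured Google Cloud Platform connection):

from airflow.contrib.hooks import GoogleCloudStorageHook

gcs = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_storage_default')
gcs.upload(bucket='my-bucket', object='reports/2016-01-01.csv',
           filename='/tmp/report.csv', mime_type='text/csv')
gcs.download(bucket='my-bucket', object='reports/2016-01-01.csv',
             filename='/tmp/report_copy.csv')
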
class airflow.contrib.hooks.VerticaHook(*args, **kwargs)[source]

Bases: airflow.hooks.dbapi_hook.DbApiHook

Interact with Vertica.

get_conn()[source]

Returns a vertica connection object

class airflow.contrib.hooks.FTPHook(ftp_conn_id='ftp_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with FTP.

Errors that may occur throughout but should be handled downstream.

close_conn()[source]

Closes the connection. An error will occur if the connection wasn’t ever opened.

create_directory(path)[source]

Creates a directory on the remote system.

Parameters:path (str) – full path to the remote directory to create
delete_directory(path)[source]

Deletes a directory on the remote system.

Parameters:path (str) – full path to the remote directory to delete
delete_file(path)[source]

Removes a file on the FTP Server.

Parameters:path (str) – full path to the remote file
describe_directory(path)[source]

Returns a dictionary of {filename: {attributes}} for all files on the remote system (where the MLSD command is supported).

Parameters:path (str) – full path to the remote directory
get_conn()[source]

Returns a FTP connection object

list_directory(path, nlst=False)[source]

Returns a list of files on the remote system.

Parameters:path (str) – full path to the remote directory to list
rename(from_name, to_name)[source]

Rename a file.

Parameters:
  • from_name – rename file from name
  • to_name – rename file to name
retrieve_file(remote_full_path, local_full_path_or_buffer)[source]

Transfers the remote file to a local location.

If local_full_path_or_buffer is a string path, the file will be put at that location; if it is a file-like buffer, the file will be written to the buffer but not closed.

Parameters:
  • remote_full_path (str) – full path to the remote file
  • local_full_path_or_buffer – full path to the local file or a file-like buffer
store_file(remote_full_path, local_full_path_or_buffer)[source]

Transfers a local file to the remote location.

If local_full_path_or_buffer is a string path, the file will be read from that location; if it is a file-like buffer, the file will be read from the buffer but not closed.

Parameters:
  • remote_full_path (str) – full path to the remote file
  • local_full_path_or_buffer (str or file-like buffer) – full path to the local file or a file-like buffer
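For example (the remote and local paths are hypothetical):

from airflow.contrib.hooks import FTPHook

ftp = FTPHook(ftp_conn_id='ftp_default')
for name in ftp.list_directory('/incoming'):
    print(name)
ftp.retrieve_file('/incoming/babynames.csv', '/tmp/babynames.csv')
ftp.store_file('/processed/babynames.csv', '/tmp/babynames.csv')
ftp.close_conn()
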
class airflow.contrib.hooks.SSHHook(conn_id='ssh_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Light-weight remote execution library and utilities.

This hook (which is just a convenience wrapper around subprocess) was created to let you stream data from a remotely stored file.

As a bonus, SSHHook also provides a really cool feature that lets you set up ssh tunnels super easily using a python context manager (there is an example in the integration part of unittests).

Parameters:
  • key_file (str) – Typically the SSHHook uses the keys that are used by the user airflow is running under. This sets the behavior to use another file instead.
  • connect_timeout (int) – sets the connection timeout for this connection.
  • no_host_key_check (bool) – whether to check the host key. If True, host keys will not be checked, but they are also not stored in the current user’s known_hosts file.
  • tty (bool) – allocate a tty.
  • sshpass (bool) – Use to non-interactively perform password authentication by using sshpass.
Popen(cmd, **kwargs)[source]

Remote Popen

Parameters:
  • cmd – command to remotely execute
  • kwargs – extra arguments to Popen (see subprocess.Popen)
Returns:

handle to subprocess

check_output(cmd)[source]

Executes a remote command and returns the stdout of the remote process. Simplified version of Popen for when you only want the output as a string and to detect any errors.

Parameters:cmd – command to remotely execute
Returns:stdout
tunnel(*args, **kwds)[source]

Creates a tunnel between two hosts. Like ssh -L <LOCAL_PORT>:host:<REMOTE_PORT>. Remember to close() the returned “tunnel” object in order to clean up after yourself when you are done with the tunnel.

Parameters:
  • local_port (int) –
  • remote_port (int) –
  • remote_host (str) –
Returns:
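
A hedged sketch of the hook in use (the connection id, command, host and ports are hypothetical; check_output is assumed to take the command as a list, and tunnel is assumed to work as the context manager mentioned in the class description):

from airflow.contrib.hooks import SSHHook

ssh = SSHHook(conn_id='ssh_default')
print(ssh.check_output(['ls', '/tmp']))   # stdout of the remote command

# forward local port 5432 to port 5432 on the remote host for the duration of the block
with ssh.tunnel(5432, 5432):
    pass  # talk to localhost:5432 here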

class airflow.contrib.hooks.CloudantHook(cloudant_conn_id='cloudant_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interact with Cloudant.

This class is a thin wrapper around the cloudant python library. See the documentation here.

db()[source]

Returns the Database object for this hook.

See the documentation for cloudant-python here https://github.com/cloudant-labs/cloudant-python.

class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None)[source]

Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.

Executors

Executors are the mechanism by which task instances get run.

class airflow.executors.LocalExecutor(parallelism=16)[source]

Bases: airflow.executors.base_executor.BaseExecutor

LocalExecutor executes tasks locally in parallel. It uses the multiprocessing Python library and queues to parallelize the execution of tasks.

class airflow.executors.CeleryExecutor(parallelism=16)[source]

Bases: airflow.executors.base_executor.BaseExecutor

CeleryExecutor is recommended for production use of Airflow. It allows distributing the execution of task instances to multiple worker nodes.

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

class airflow.executors.SequentialExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor

This executor will only run one task instance at a time and can be used for debugging. It is also the only executor that can be used with sqlite since sqlite doesn’t support multiple connections.

Since we want airflow to work out of the box, it defaults to this SequentialExecutor alongside sqlite when you first install it.

Community-contributed executors

class airflow.contrib.executors.mesos_executor.MesosExecutor(parallelism=16)[source]

MesosExecutor allows distributing the execution of task instances to multiple mesos workers.

Apache Mesos is a distributed systems kernel which abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively. See http://mesos.apache.org/