Welcome to pydruid’s documentation!

pydruid exposes a simple API to create, execute, and analyze Druid queries. pydruid can parse query results into Pandas DataFrame objects for subsequent data analysis, offering tight integration between Druid, the SciPy stack (for scientific computing), and scikit-learn (for machine learning). Additionally, pydruid can export query results to TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, or Excel.

Below is a reference for the PyDruid class, describing the functions to use for querying and exporting, complete with examples. For additional examples, see the pydruid README.
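
As a quick orientation, the whole workflow fits in a few lines. The sketch below is illustrative only; the broker URL, datasource, and column names are placeholders:

    >>> from pydruid.client import PyDruid
    >>> from pydruid.utils.aggregators import doublesum

    >>> client = PyDruid('http://localhost:8083', 'druid/v2/')
    >>> ts = client.timeseries(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-10-04/pt1h',
            aggregations={'tweets': doublesum('count')}
        )
    >>> df = ts.export_pandas()      # pandas DataFrame for the SciPy stack / scikit-learn
    >>> ts.export_tsv('tweets.tsv')  # TSV for R, Julia, Matlab, Excel, ...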

pydruid.client module

class pydruid.client.BaseDruidClient(url, endpoint)

Bases: object

export_pandas()

Export the current query result to a Pandas DataFrame object.

Deprecated: use the Query.export_pandas() method instead.

export_tsv(dest_path)

Export the current query result to a tsv file.

Deprecated: use the Query.export_tsv() method instead.

groupby(**kwargs)

A group-by query groups a result set (the requested aggregate metrics) by the specified dimension(s).

Required key/value pairs:

Parameters:
  • datasource (str) – Data source to query
  • granularity (str) – Time bucket to aggregate data by: hour, day, minute, etc.
  • intervals (str or list) – ISO-8601 intervals for which to run the query on
  • aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
  • dimensions (list) – The dimensions to group by
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters:
  • filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
  • having (pydruid.utils.having.Having) – Indicates which groups in the query's result set to keep
  • post_aggregations (dict) – A map from post-aggregator name to a pydruid.utils.postaggregator.Postaggregator expression
  • context (dict) – A dict of query context options
  • limit_spec (dict) – A dict of parameters defining how to limit the rows returned, as specified in the Druid API documentation

Example:

    >>> group = client.groupby(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-10-04/pt1h',
            dimensions=["user_name", "reply_to_name"],
            filter=~(Dimension("reply_to_name") == "Not A Reply"),
            aggregations={"count": doublesum("count")},
            context={"timeout": 1000},
            limit_spec={
                "type": "default",
                "limit": 50,
                "columns": ["count"]
            }
        )
    >>> for k in range(2):
    ...     print group[k]
    >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_1', 'reply_to_name': 'user_2'}}
    >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_2', 'reply_to_name': 'user_3'}}
segment_metadata(**kwargs)

A segment meta-data query returns per segment information about:

  • Cardinality of all the columns present
  • Column type
  • Estimated size in bytes
  • Estimated size in bytes of each column
  • Interval the segment covers
  • Segment ID

Required key/value pairs:

Parameters:
  • datasource (str) – Data source to query
  • intervals (str or list) – ISO-8601 intervals for which to run the query on

Optional key/value pairs:

Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

    >>> meta = client.segment_metadata(datasource='twitterstream', intervals = '2013-10-04/pt1h')
    >>> print meta[0].keys()
    >>> ['intervals', 'id', 'columns', 'size']
    >>> print meta[0]['columns']['tweet_length']
    >>> {'errorMessage': None, 'cardinality': None, 'type': 'FLOAT', 'size': 30908008}
select(**kwargs)

A select query returns raw Druid rows and supports pagination.

Required key/value pairs:

Parameters:
  • datasource (str) – Data source to query
  • granularity (str) – Time bucket to aggregate data by hour, day, minute, etc.
  • paging_spec (dict) – Indicates offsets into different scanned segments
  • intervals (str or list) – ISO-8601 intervals for which to run the query on

Optional key/value pairs:

Parameters:
  • filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
  • dimensions (list) – The list of dimensions to select. If left empty, all dimensions are returned
  • metrics (list) – The list of metrics to select. If left empty, all metrics are returned
  • context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

    >>> raw_data = client.select(
            datasource='twitterstream',
            granularity='all',
            intervals='2013-06-14/pt1h',
            paging_spec={'pagingIdentifiers': {}, 'threshold': 1},
            context={"timeout": 1000}
        )
    >>> print raw_data
    >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1}, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]
time_boundary(**kwargs)

A time boundary query returns the min and max timestamps present in a data source.

Required key/value pairs:

Parameters: datasource (str) – Data source to query

Optional key/value pairs:

Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query

Example:

    >>> bound = client.time_boundary(datasource='twitterstream')
    >>> print bound
    >>> [{'timestamp': '2011-09-14T15:00:00.000Z', 'result': {'minTime': '2011-09-14T15:00:00.000Z', 'maxTime': '2014-03-04T23:44:00.000Z'}}]
timeseries(**kwargs)

A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.

Required key/value pairs:

Parameters:
  • datasource (str) – Data source to query
  • granularity (str) – Time bucket to aggregate data by: hour, day, minute, etc.
  • intervals (str or list) – ISO-8601 intervals for which to run the query on
  • aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters:
  • filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
  • post_aggregations (dict) – A map from post-aggregator name to a pydruid.utils.postaggregator.Postaggregator expression
  • context (dict) – A dict of query context options

Example:

    >>> counts = client.timeseries(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-06-14/pt1h',
            aggregations={"count": doublesum("count"), "rows": count("rows")},
            post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)},
            context={"timeout": 1000}
        )
    >>> print counts
    >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]
topn(**kwargs)

A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.

Required key/value pairs:

Parameters:
  • datasource (str) – Data source to query
  • granularity (str) – Aggregate data by: hour, day, minute, etc.
  • intervals (str or list) – ISO-8601 intervals of data to query
  • aggregations (dict) – A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
  • dimension (str) – Dimension to run the query against
  • metric (str) – Metric over which to sort the specified dimension by
  • threshold (int) – How many of the top items to return
Returns: The query result
Return type: Query

Optional key/value pairs:

Parameters:
  • filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
  • post_aggregations (dict) – A map from post-aggregator name to a pydruid.utils.postaggregator.Postaggregator expression
  • context (dict) – A dict of query context options

Example:

    >>> top = client.topn(
                datasource='twitterstream',
                granularity='all',
                intervals='2013-06-14/pt1h',
                aggregations={"count": doublesum("count")},
                dimension='user_name',
                metric='count',
                filter=Dimension('user_lang') == 'en',
                threshold=1,
                context={"timeout": 1000}
            )
    >>> print top
    >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user_name': 'cool_user'}]}]
class pydruid.client.PyDruid(url, endpoint)

Bases: pydruid.client.BaseDruidClient

PyDruid contains the functions for creating and executing Druid queries. Returns Query objects that can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.

Parameters:
  • url (str) – URL of Broker node in the Druid cluster
  • endpoint (str) – Endpoint that Broker listens for queries on

Example

    >>> from pydruid.client import *
    >>> import json

    >>> query = PyDruid('http://localhost:8083', 'druid/v2/')

    >>> top = query.topn(
            datasource='twitterstream',
            granularity='all',
            intervals='2013-10-04/pt1h',
            aggregations={"count": doublesum("count")},
            dimension='user_name',
            filter = Dimension('user_lang') == 'en',
            metric='count',
            threshold=2
        )

    >>> print json.dumps(top.query_dict, indent=2)
    >>> {
          "metric": "count",
          "aggregations": [
            {
              "type": "doubleSum",
              "fieldName": "count",
              "name": "count"
            }
          ],
          "dimension": "user_name",
          "filter": {
            "type": "selector",
            "dimension": "user_lang",
            "value": "en"
          },
          "intervals": "2013-10-04/pt1h",
          "dataSource": "twitterstream",
          "granularity": "all",
          "threshold": 2,
          "queryType": "topN"
        }

    >>> print top.result
    >>> [{'timestamp': '2013-10-04T00:00:00.000Z',
        'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]

    >>> df = top.export_pandas()
    >>> print df
    >>>    count                 timestamp      user_name
        0      7  2013-10-04T00:00:00.000Z         user_1
        1      6  2013-10-04T00:00:00.000Z         user_2

pydruid.async_client module

class pydruid.async_client.AsyncPyDruid(url, endpoint)

Bases: pydruid.client.BaseDruidClient

Asynchronous PyDruid client that mirrors the functionality of the synchronous PyDruid client, but executes queries asynchronously (using Tornado's asynchronous HTTP client).

Returns Query objects that can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.

Parameters:
  • url (str) – URL of Broker node in the Druid cluster
  • endpoint (str) – Endpoint that Broker listens for queries on

Example

    >>> from pydruid.async_client import *
    >>> import json

    >>> query = AsyncPyDruid('http://localhost:8083', 'druid/v2/')

    >>> top = yield query.topn(
            datasource='twitterstream',
            granularity='all',
            intervals='2013-10-04/pt1h',
            aggregations={"count": doublesum("count")},
            dimension='user_name',
            filter = Dimension('user_lang') == 'en',
            metric='count',
            threshold=2
        )

    >>> print json.dumps(top.query_dict, indent=2)
    >>> {
          "metric": "count",
          "aggregations": [
            {
              "type": "doubleSum",
              "fieldName": "count",
              "name": "count"
            }
          ],
          "dimension": "user_name",
          "filter": {
            "type": "selector",
            "dimension": "user_lang",
            "value": "en"
          },
          "intervals": "2013-10-04/pt1h",
          "dataSource": "twitterstream",
          "granularity": "all",
          "threshold": 2,
          "queryType": "topN"
        }

    >>> print top.result
    >>> [{'timestamp': '2013-10-04T00:00:00.000Z',
        'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]

    >>> df = top.export_pandas()
    >>> print df
    >>>    count                 timestamp      user_name
        0      7  2013-10-04T00:00:00.000Z         user_1
        1      6  2013-10-04T00:00:00.000Z         user_2
groupby(**kwargs)
segment_metadata(**kwargs)
select(**kwargs)
time_boundary(**kwargs)
timeseries(**kwargs)
topn(**kwargs)

pydruid.query module

class pydruid.query.Query(query_dict, query_type)

Bases: collections.abc.MutableSequence

Query objects are produced by PyDruid clients and can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis. They also hold information about the issued query.

Query acts as a wrapper over the raw result list of dictionaries.

Variables:
  • result_json (str) – JSON object representing a query result. Initial value: None
  • result (list) – Query result parsed into a list of dicts. Initial value: None
  • query_type (str) – Name of most recently run query, e.g., topN. Initial value: None
  • query_dict (dict) – JSON object representing the query. Initial value: None
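
Because Query subclasses MutableSequence, a result can be inspected like an ordinary list. A brief sketch, assuming top is a Query returned by one of the topn calls shown above:

    >>> len(top)         # number of raw result rows
    >>> top[0]           # first result dict
    >>> for row in top:  # iterate as over any sequence
    ...     print row
    >>> top.query_dict   # the JSON body that was sent to Druid
    >>> top.result_json  # the raw JSON string returned by the broker
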
export_pandas()

Export the current query result to a Pandas DataFrame object.

Returns: The DataFrame representing the query result
Return type: DataFrame
Raises: NotImplementedError

Example

    >>> top = client.topn(
            datasource='twitterstream',
            granularity='all',
            intervals='2013-10-04/pt1h',
            aggregations={"count": doublesum("count")},
            dimension='user_name',
            filter = Dimension('user_lang') == 'en',
            metric='count',
            threshold=2
        )

    >>> df = top.export_pandas()
    >>> print df
    >>>    count                 timestamp      user_name
        0      7  2013-10-04T00:00:00.000Z         user_1
        1      6  2013-10-04T00:00:00.000Z         user_2
export_tsv(dest_path)

Export the current query result to a tsv file.

Parameters: dest_path (str) – file to write query results to
Raises: NotImplementedError

Example

    >>> top = client.topn(
            datasource='twitterstream',
            granularity='all',
            intervals='2013-10-04/pt1h',
            aggregations={"count": doublesum("count")},
            dimension='user_name',
            filter = Dimension('user_lang') == 'en',
            metric='count',
            threshold=2
        )

    >>> top.export_tsv('top.tsv')
    >>> !cat top.tsv
    >>> count       user_name       timestamp
        7.0 user_1  2013-10-04T00:00:00.000Z
        6.0 user_2  2013-10-04T00:00:00.000Z
insert(index, value)
parse(data)
class pydruid.query.QueryBuilder

Bases: object

build_query(query_type, args)

Build a query based on the given query type and arguments.

Parameters:
  • query_type (string) – a type of query
  • args (dict) – the dict of args to be sent
Returns: the resulting query
Return type: Query

groupby(args)

A group-by query groups a result set (the requested aggregate metrics) by the specified dimension(s).

Parameters: args (dict) – dict of args
Returns: group-by query
Return type: Query
search(args)

A search query returns dimension values that match the search specification.

Parameters: args (dict) – dict of args
Returns: search query
Return type: Query
segment_metadata(args)

A segment meta-data query returns per segment information about:

  • Cardinality of all the columns present
  • Column type
  • Estimated size in bytes
  • Estimated size in bytes of each column
  • Interval the segment covers
  • Segment ID

Parameters: args (dict) – dict of args
Returns: segment metadata query
Return type: Query
select(args)

A select query returns raw Druid rows and supports pagination.

Parameters: args (dict) – dict of args
Returns: select query
Return type: Query
time_boundary(args)

A time boundary query returns the min and max timestamps present in a data source.

Parameters: args (dict) – dict of args
Returns: time boundary query
Return type: Query
timeseries(args)

A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.

Parameters: args (dict) – dict of args
Returns: timeseries query
Return type: Query
topn(args)

A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.

Parameters: args (dict) – dict of arguments
Returns: topn query
Return type: Query
static validate_query(query_type, valid_parts, args)

Validate the query parts so only allowed objects are sent.

Each query type can have an optional 'context' object attached, which is used to set query context settings such as timeout or priority. Since every query may carry this object, callers do not need to list it explicitly; it is added to the valid parts here.

Parameters:
  • query_type (string) – a type of query
  • valid_parts (list) – a list of valid object names
  • args (dict) – the dict of args to be sent
Raises: ValueError – if an invalid object is given
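
As a rough illustration of the behavior described above (the valid_parts list here is made up for the example; the real lists live inside the individual query builders):

    >>> from pydruid.query import QueryBuilder

    >>> valid_parts = ['datasource', 'granularity', 'intervals', 'aggregations']
    >>> args = {
            'datasource': 'twitterstream',
            'granularity': 'all',
            'intervals': '2013-10-04/pt1h',
            'aggregations': {},
            'context': {'timeout': 1000}
        }
    >>> QueryBuilder.validate_query('timeseries', valid_parts, args)
    >>> QueryBuilder.validate_query('timeseries', valid_parts, {'bogus_key': 1})  # raises ValueError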

pydruid.utils.aggregators module

pydruid.utils.aggregators.build_aggregators(agg_input)
pydruid.utils.aggregators.cardinality(raw_column, by_row=False)
pydruid.utils.aggregators.count(raw_metric)
pydruid.utils.aggregators.doublesum(raw_metric)
pydruid.utils.aggregators.filtered(filter, agg)
pydruid.utils.aggregators.hyperunique(raw_metric)
pydruid.utils.aggregators.javascript(columns_list, fn_aggregate, fn_combine, fn_reset)
pydruid.utils.aggregators.longsum(raw_metric)
pydruid.utils.aggregators.max(raw_metric)
pydruid.utils.aggregators.min(raw_metric)
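
These helpers build the JSON aggregator specs that the query methods expect in their aggregations dict. A hedged usage sketch (client is a PyDruid instance as in the examples above; the datasource and column names are placeholders, and it is assumed that filtered() takes a Filter object as its first argument):

    >>> from pydruid.utils.aggregators import doublesum, longsum, count, hyperunique, filtered
    >>> from pydruid.utils.filters import Dimension

    >>> aggregations = {
            'tweets': doublesum('count'),                # sum of a double-typed metric
            'rows': count('rows'),                       # plain row count
            'unique_users': hyperunique('user_unique'),  # approximate distinct count
            'en_tweets': filtered(                       # aggregate only over rows matching a filter
                Dimension('user_lang') == 'en',
                longsum('count'))
        }
    >>> counts = client.timeseries(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-10-04/pt1h',
            aggregations=aggregations
        )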

pydruid.utils.dimensions module

class pydruid.utils.dimensions.BaseRegexExtraction(expr)

Bases: pydruid.utils.dimensions.ExtractionFunction

build()
class pydruid.utils.dimensions.DimensionSpec(dimension, output_name, extraction_function=None)

Bases: object

build()
class pydruid.utils.dimensions.ExtractionFunction

Bases: object

build()
extraction_type = None
class pydruid.utils.dimensions.JavascriptExtraction(func, injective=False)

Bases: pydruid.utils.dimensions.ExtractionFunction

build()
extraction_type = 'javascript'
class pydruid.utils.dimensions.LookupExtraction(retain_missing_values=False, replace_missing_values=None, injective=False)

Bases: pydruid.utils.dimensions.ExtractionFunction

build()
build_lookup()
extraction_type = 'lookup'
lookup_type = None
class pydruid.utils.dimensions.MapLookupExtraction(mapping, **kwargs)

Bases: pydruid.utils.dimensions.LookupExtraction

build_lookup()
lookup_type = 'map'
class pydruid.utils.dimensions.NamespaceLookupExtraction(namespace, **kwargs)

Bases: pydruid.utils.dimensions.LookupExtraction

build_lookup()
lookup_type = 'namespace'
class pydruid.utils.dimensions.PartialExtraction(expr)

Bases: pydruid.utils.dimensions.BaseRegexExtraction

extraction_type = 'partial'
class pydruid.utils.dimensions.RegexExtraction(expr)

Bases: pydruid.utils.dimensions.BaseRegexExtraction

extraction_type = 'regex'
class pydruid.utils.dimensions.TimeFormatExtraction(format, locale=None, time_zone=None)

Bases: pydruid.utils.dimensions.ExtractionFunction

build()
extraction_type = 'timeFormat'
pydruid.utils.dimensions.build_dimension(dim)
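
DimensionSpec and the extraction functions let a grouping dimension be renamed or transformed before aggregation; build_dimension turns either a plain string or a DimensionSpec into the JSON Druid expects. A hedged sketch (the regex, lookup table, and column names are illustrative, and it is assumed that groupby accepts DimensionSpec objects in its dimensions list, as build_dimension suggests):

    >>> from pydruid.utils.dimensions import DimensionSpec, RegexExtraction, MapLookupExtraction
    >>> from pydruid.utils.aggregators import doublesum

    >>> # group on the host part of a 'referrer' URL, output it as 'site'
    >>> site = DimensionSpec('referrer', 'site',
                             extraction_function=RegexExtraction(r'http://([\w.]+)/.*'))
    >>> # remap raw language codes through an inline lookup table
    >>> lang = DimensionSpec('user_lang', 'language',
                             extraction_function=MapLookupExtraction({'en': 'English', 'fr': 'French'}))

    >>> group = client.groupby(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-10-04/pt1h',
            dimensions=[site, lang],
            aggregations={'count': doublesum('count')}
        )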

pydruid.utils.filters module

class pydruid.utils.filters.Bound(dimension, lower, upper, lowerStrict=False, upperStrict=False, alphaNumeric=False)

Bases: pydruid.utils.filters.Filter

Bound filter can be used to filter by comparing dimension values to an upper value or/and a lower value.

Variables:
  • dimension (str) – Dimension to filter on.
  • lower (str) – Lower bound.
  • upper (str) – Upper bound.
  • lowerStrict (bool) – Strict lower inclusion. Initial value: False
  • upperStrict (bool) – Strict upper inclusion. Initial value: False
  • alphaNumeric (bool) – Numeric comparison. Initial value: False
class pydruid.utils.filters.Dimension(dim)

Bases: object

class pydruid.utils.filters.Filter(**args)

Bases: object

static build_filter(filter_obj)
show()
class pydruid.utils.filters.JavaScript(dim)

Bases: object
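
Filters compose with the &, |, and ~ operators, as the groupby and topn examples above already use for negation and equality. A hedged sketch of the common constructions (dimension names and bounds are placeholders):

    >>> from pydruid.utils.filters import Dimension, Bound, Filter

    >>> english = Dimension('user_lang') == 'en'         # selector filter
    >>> not_reply = ~(Dimension('reply_to_name') == 'Not A Reply')
    >>> mid_length = Bound('tweet_length', '50', '140')  # bound filter; bounds are strings

    >>> combined = english & not_reply & mid_length      # 'and' of the three filters
    >>> either = english | not_reply                     # 'or' filter
    >>> Filter.build_filter(combined)                    # inspect the JSON that will be sent to Druid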

pydruid.utils.having module

class pydruid.utils.having.Aggregation(agg)

Bases: object

class pydruid.utils.having.Having(**args)

Bases: object

static build_having(having_obj)
show()
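
Having specs restrict which groups a group-by query returns, analogous to SQL HAVING. A hedged sketch; it assumes Aggregation supports comparison operators the way Dimension does for filters:

    >>> from pydruid.utils.having import Aggregation
    >>> from pydruid.utils.aggregators import doublesum

    >>> popular = Aggregation('count') > 10   # keep only groups whose aggregated 'count' exceeds 10

    >>> group = client.groupby(
            datasource='twitterstream',
            granularity='hour',
            intervals='2013-10-04/pt1h',
            dimensions=['user_name'],
            aggregations={'count': doublesum('count')},
            having=popular
        )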

pydruid.utils.postaggregator module

class pydruid.utils.postaggregator.Const(value, output_name=None)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator.Field(name)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator.HyperUniqueCardinality(name)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator.Postaggregator(fn, fields, name)

Bases: object

static build_post_aggregators(postaggs)
fields(other)
class pydruid.utils.postaggregator.Quantile(name, probability)

Bases: pydruid.utils.postaggregator.Postaggregator

class pydruid.utils.postaggregator.Quantiles(name, probabilities)

Bases: pydruid.utils.postaggregator.Postaggregator
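
Post-aggregators are built with ordinary arithmetic on Field and Const objects, as the timeseries example above shows; Field refers to an aggregator by name and Const supplies a constant. A short hedged sketch (the aggregator names are placeholders, and Quantile assumes the referenced column was aggregated with an approximate histogram):

    >>> from pydruid.utils.postaggregator import Field, Const, Quantile

    >>> post_aggregations = {
            'avg_length': Field('length_sum') / Field('rows'),
            'percent_english': (Field('en_tweets') / Field('count')) * Const(100),
            'p95_length': Quantile('length_histogram', 0.95)
        }
    >>> # pass as post_aggregations=... to timeseries, topn, or groupby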
