Welcome to pydruid’s documentation!¶
pydruid exposes a simple API to create, execute, and analyze Druid queries. pydruid can parse query results into Pandas DataFrame objects for subsequent data analysis, which offers a tight integration between Druid, the SciPy stack (for scientific computing) and scikit-learn (for machine learning). Additionally, pydruid can export query results into TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, or Excel.
Below is a reference for the PyDruid class, describing the functions to use for querying and exporting, complete with examples. For additional examples, see the pydruid README.
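A minimal end-to-end sketch of that workflow (assuming a Druid Broker at http://localhost:8083 and a datasource named twitterstream, as in the examples below):
>>> from pydruid.client import PyDruid
>>> from pydruid.utils.aggregators import doublesum
>>> client = PyDruid('http://localhost:8083', 'druid/v2/')    # Broker URL and query endpoint
>>> ts = client.timeseries(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-10-04/pt1h',
        aggregations={'count': doublesum('count')})
>>> df = ts.export_pandas()      # pandas.DataFrame for further analysis
>>> ts.export_tsv('counts.tsv')  # or write the result to a TSV file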
pydruid.client module¶
class pydruid.client.BaseDruidClient(url, endpoint)¶
Bases: object
export_pandas()¶
Export the current query result to a Pandas DataFrame object.
Deprecated: use the Query.export_pandas() method instead.
export_tsv(dest_path)¶
Export the current query result to a tsv file.
Deprecated: use the Query.export_tsv() method instead.
groupby(**kwargs)¶
A group-by query groups a result set (the requested aggregate metrics) by the specified dimension(s).
Required key/value pairs:
Parameters: - datasource (str) – Data source to query
- granularity (str) – Time bucket to aggregate data by: hour, day, minute, etc.
- intervals (str or list) – ISO-8601 interval(s) over which to run the query
- aggregations (dict) – A map from aggregator name to one of the aggregators in pydruid.utils.aggregators, e.g., doublesum
- dimensions (list) – The dimensions to group by
Returns: The query result
Return type: Query
Optional key/value pairs:
Parameters: - filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
- having (pydruid.utils.having.Having) – Indicates which groups in the result set to keep
- post_aggregations (dict) – A dict with string key = ‘post_aggregator_name’ and value a pydruid.utils.postaggregator.Postaggregator
- context (dict) – A dict of query context options
- limit_spec (dict) – A dict of parameters defining how to limit the rows returned, as specified in the Druid API documentation
Example:
>>> group = client.groupby(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-10-04/pt1h',
        dimensions=["user_name", "reply_to_name"],
        filter=~(Dimension("reply_to_name") == "Not A Reply"),
        aggregations={"count": doublesum("count")},
        context={"timeout": 1000},
        limit_spec={
            "type": "default",
            "limit": 50,
            "columns": ["count"]
        }
    )
>>> for k in range(2):
...     print group[k]
>>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_1', 'reply_to_name': 'user_2'}}
>>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_2', 'reply_to_name': 'user_3'}}
segment_metadata(**kwargs)¶
A segment metadata query returns per-segment information about:
- Cardinality of all the columns present
- Column type
- Estimated size in bytes
- Estimated size in bytes of each column
- Interval the segment covers
- Segment ID
Required key/value pairs:
Parameters: - datasource (str) – Data source to query
- intervals (str or list) – ISO-8601 interval(s) over which to run the query
Optional key/value pairs:
Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query
Example:
>>> meta = client.segment_metadata(datasource='twitterstream', intervals='2013-10-04/pt1h')
>>> print meta[0].keys()
>>> ['intervals', 'id', 'columns', 'size']
>>> print meta[0]['columns']['tweet_length']
>>> {'errorMessage': None, 'cardinality': None, 'type': 'FLOAT', 'size': 30908008}
select(**kwargs)¶
A select query returns raw Druid rows and supports pagination.
Required key/value pairs:
Parameters: - datasource (str) – Data source to query
- granularity (str) – Time bucket to aggregate data by: hour, day, minute, etc.
- paging_spec (dict) – Indicates offsets into different scanned segments
- intervals (str or list) – ISO-8601 interval(s) over which to run the query
Optional key/value pairs:
Parameters: - filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
- dimensions (list) – The list of dimensions to select. If left empty, all dimensions are returned
- metrics (list) – The list of metrics to select. If left empty, all metrics are returned
- context (dict) – A dict of query context options
Returns: The query result
Return type: Query
Example:
>>> raw_data = client.select(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-06-14/pt1h',
        paging_spec={'pagingIdentifiers': {}, 'threshold': 1},
        context={"timeout": 1000}
    )
>>> print raw_data
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1}, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]
time_boundary(**kwargs)¶
A time boundary query returns the min and max timestamps present in a data source.
Required key/value pairs:
Parameters: datasource (str) – Data source to query
Optional key/value pairs:
Parameters: context (dict) – A dict of query context options
Returns: The query result
Return type: Query
Example:
>>> bound = client.time_boundary(datasource='twitterstream')
>>> print bound
>>> [{'timestamp': '2011-09-14T15:00:00.000Z', 'result': {'minTime': '2011-09-14T15:00:00.000Z', 'maxTime': '2014-03-04T23:44:00.000Z'}}]
timeseries(**kwargs)¶
A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.
Required key/value pairs:
Parameters: - datasource (str) – Data source to query
- granularity (str) – Time bucket to aggregate data by: hour, day, minute, etc.
- intervals (str or list) – ISO-8601 interval(s) over which to run the query
- aggregations (dict) – A map from aggregator name to one of the aggregators in pydruid.utils.aggregators, e.g., doublesum
Returns: The query result
Return type: Query
Optional key/value pairs:
Parameters: - filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
- post_aggregations (dict) – A dict with string key = ‘post_aggregator_name’ and value a pydruid.utils.postaggregator.Postaggregator
- context (dict) – A dict of query context options
Example:
>>> counts = client.timeseries(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-06-14/pt1h',
        aggregations={"count": doublesum("count"), "rows": count("rows")},
        post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)},
        context={"timeout": 1000}
    )
>>> print counts
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]
topn(**kwargs)¶
A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.
Required key/value pairs:
Parameters: - datasource (str) – Data source to query
- granularity (str) – Aggregate data by hour, day, minute, etc.
- intervals (str or list) – ISO-8601 intervals of data to query
- aggregations (dict) – A map from aggregator name to one of the aggregators in pydruid.utils.aggregators, e.g., doublesum
- dimension (str) – Dimension to run the query against
- metric (str) – Metric by which to sort the specified dimension
- threshold (int) – How many of the top items to return
Returns: The query result
Return type: Query
Optional key/value pairs:
Parameters: - filter (pydruid.utils.filters.Filter) – Indicates which rows of data to include in the query
- post_aggregations (dict) – A dict with string key = ‘post_aggregator_name’ and value a pydruid.utils.postaggregator.Postaggregator
- context (dict) – A dict of query context options
Example:
>>> top = client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-06-14/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        metric='count',
        filter=Dimension('user_lang') == 'en',
        threshold=1,
        context={"timeout": 1000}
    )
>>> print top
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user_name': 'cool_user'}]}]
class pydruid.client.PyDruid(url, endpoint)¶
Bases: pydruid.client.BaseDruidClient
PyDruid contains the functions for creating and executing Druid queries. Its query methods return Query objects, which can be used to export query results into TSV files or pandas.DataFrame objects for subsequent analysis.
Parameters: - url (str) – URL of Broker node in the Druid cluster
- endpoint (str) – Endpoint on which the Broker listens for queries
Example
>>> from pydruid.client import *
>>> import json
>>> query = PyDruid('http://localhost:8083', 'druid/v2/')
>>> top = query.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        filter=Dimension('user_lang') == 'en',
        metric='count',
        threshold=2
    )
>>> print json.dumps(top.query_dict, indent=2)
>>> {
      "metric": "count",
      "aggregations": [
        {
          "type": "doubleSum",
          "fieldName": "count",
          "name": "count"
        }
      ],
      "dimension": "user_name",
      "filter": {
        "type": "selector",
        "dimension": "user_lang",
        "value": "en"
      },
      "intervals": "2013-10-04/pt1h",
      "dataSource": "twitterstream",
      "granularity": "all",
      "threshold": 2,
      "queryType": "topN"
    }
>>> print top.result
>>> [{'timestamp': '2013-10-04T00:00:00.000Z', 'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]
>>> df = top.export_pandas()
>>> print df
>>>    count                  timestamp user_name
    0      7  2013-10-04T00:00:00.000Z    user_1
    1      6  2013-10-04T00:00:00.000Z    user_2
pydruid.async_client module¶
class pydruid.async_client.AsyncPyDruid(url, endpoint)¶
Bases: pydruid.client.BaseDruidClient
Asynchronous PyDruid client that mirrors the functionality of the synchronous PyDruid but executes queries asynchronously (using the asynchronous HTTP client from the Tornado framework).
Returns Query objects that can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.
Parameters: - url (str) – URL of Broker node in the Druid cluster
- endpoint (str) – Endpoint on which the Broker listens for queries
Example
>>> from pydruid.async_client import *
>>> import json
>>> query = AsyncPyDruid('http://localhost:8083', 'druid/v2/')
>>> top = yield query.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        filter=Dimension('user_lang') == 'en',
        metric='count',
        threshold=2
    )
>>> print json.dumps(top.query_dict, indent=2)
>>> {
      "metric": "count",
      "aggregations": [
        {
          "type": "doubleSum",
          "fieldName": "count",
          "name": "count"
        }
      ],
      "dimension": "user_name",
      "filter": {
        "type": "selector",
        "dimension": "user_lang",
        "value": "en"
      },
      "intervals": "2013-10-04/pt1h",
      "dataSource": "twitterstream",
      "granularity": "all",
      "threshold": 2,
      "queryType": "topN"
    }
>>> print top.result
>>> [{'timestamp': '2013-10-04T00:00:00.000Z', 'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]
>>> df = top.export_pandas()
>>> print df
>>>    count                  timestamp user_name
    0      7  2013-10-04T00:00:00.000Z    user_1
    1      6  2013-10-04T00:00:00.000Z    user_2
groupby(**kwargs)¶
segment_metadata(**kwargs)¶
select(**kwargs)¶
time_boundary(**kwargs)¶
timeseries(**kwargs)¶
topn(**kwargs)¶
pydruid.query module¶
class pydruid.query.Query(query_dict, query_type)¶
Bases: collections.abc.MutableSequence
Query objects are produced by PyDruid clients and can be used for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis. They also hold information about the issued query.
Query acts as a wrapper over the raw result list of dictionaries.
Variables: - result_json (str) – JSON object representing a query result. Initial value: None
- result (list) – Query result parsed into a list of dicts. Initial value: None
- query_type (str) – Name of most recently run query, e.g., topN. Initial value: None
- query_dict (dict) – JSON object representing the query. Initial value: None
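Because Query subclasses collections.abc.MutableSequence, a result can be indexed and iterated over directly. A minimal sketch (reusing the topn query from the examples below; the attribute access mirrors the variables listed above):
>>> top = client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        metric='count',
        threshold=2
    )
>>> len(top)           # number of entries in the parsed result list
>>> top[0]             # first raw result dict (one per timestamp bucket)
>>> top.query_type     # 'topN'
>>> top.result_json    # raw JSON string returned by Druid
>>> top.query_dict     # the query that was issued, as a dict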
export_pandas()¶
Export the current query result to a Pandas DataFrame object.
Returns: The DataFrame representing the query result
Return type: DataFrame
Raises: NotImplementedError
Example
>>> top = client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        filter=Dimension('user_lang') == 'en',
        metric='count',
        threshold=2
    )
>>> df = top.export_pandas()
>>> print df
>>>    count                  timestamp user_name
    0      7  2013-10-04T00:00:00.000Z    user_1
    1      6  2013-10-04T00:00:00.000Z    user_2
export_tsv(dest_path)¶
Export the current query result to a tsv file.
Parameters: dest_path (str) – file to write query results to
Raises: NotImplementedError
Example
>>> top = client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        filter=Dimension('user_lang') == 'en',
        metric='count',
        threshold=2
    )
>>> top.export_tsv('top.tsv')
>>> !cat top.tsv
>>> count   user_name   timestamp
    7.0     user_1      2013-10-04T00:00:00.000Z
    6.0     user_2      2013-10-04T00:00:00.000Z
insert(index, value)¶
parse(data)¶
class pydruid.query.QueryBuilder¶
Bases: object
build_query(query_type, args)¶
Build a query based on the given query type and arguments.
Parameters: - query_type (string) – a type of query
- args (dict) – the dict of args to be sent
Returns: the resulting query
Return type: Query
groupby(args)¶
A group-by query groups a result set (the requested aggregate metrics) by the specified dimension(s).
Parameters: args (dict) – dict of args
Returns: group by query
Return type: Query
search(args)¶
A search query returns dimension values that match the search specification.
Parameters: args (dict) – dict of args
Returns: search query
Return type: Query
segment_metadata(args)¶
A segment metadata query returns per-segment information about:
- Cardinality of all the columns present
- Column type
- Estimated size in bytes
- Estimated size in bytes of each column
- Interval the segment covers
- Segment ID
Parameters: args (dict) – dict of args
Returns: segment metadata query
Return type: Query
select(args)¶
A select query returns raw Druid rows and supports pagination.
Parameters: args (dict) – dict of args
Returns: select query
Return type: Query
time_boundary(args)¶
A time boundary query returns the min and max timestamps present in a data source.
Parameters: args (dict) – dict of args
Returns: time boundary query
Return type: Query
timeseries(args)¶
A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.
Parameters: args (dict) – dict of args
Returns: timeseries query
Return type: Query
topn(args)¶
A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are faster and more resource efficient than GroupBy for this use case.
Parameters: args (dict) – dict of arguments
Returns: topn query
Return type: Query
static validate_query(query_type, valid_parts, args)¶
Validate the query parts so only allowed objects are sent.
Each query type can have an optional ‘context’ object attached, which is used to set query context settings such as timeout or priority. Since every query can carry this object, it does not need to be listed in valid_parts; it is added here.
Parameters: - query_type (string) – a type of query
- valid_parts (list) – a list of valid object names
- args (dict) – the dict of args to be sent
Raises: ValueError – if an invalid object is given
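A minimal sketch of how this validation behaves (the valid_parts list below is an illustrative subset, not the exact list used internally):
>>> from pydruid.query import QueryBuilder
>>> valid_parts = ['datasource', 'granularity', 'intervals', 'aggregations']  # illustrative
>>> args = {'datasource': 'twitterstream', 'granularity': 'hour',
            'intervals': '2013-10-04/pt1h', 'aggregations': {}}
>>> QueryBuilder.validate_query('timeseries', valid_parts, args)  # all keys allowed: returns quietly
>>> QueryBuilder.validate_query('timeseries', valid_parts,
                                dict(args, dimension='user_name'))  # unexpected key: raises ValueError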
pydruid.utils.aggregators module¶
pydruid.utils.aggregators.build_aggregators(agg_input)¶
pydruid.utils.aggregators.cardinality(raw_column, by_row=False)¶
pydruid.utils.aggregators.count(raw_metric)¶
pydruid.utils.aggregators.doublesum(raw_metric)¶
pydruid.utils.aggregators.filtered(filter, agg)¶
pydruid.utils.aggregators.hyperunique(raw_metric)¶
pydruid.utils.aggregators.javascript(columns_list, fn_aggregate, fn_combine, fn_reset)¶
pydruid.utils.aggregators.longsum(raw_metric)¶
pydruid.utils.aggregators.max(raw_metric)¶
pydruid.utils.aggregators.min(raw_metric)¶
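These helpers build the aggregator specifications passed to a client's aggregations argument. A minimal sketch (the metric and dimension names are illustrative, and client is assumed to be a PyDruid instance):
>>> from pydruid.utils.aggregators import doublesum, count, hyperunique, filtered
>>> from pydruid.utils.filters import Dimension
>>> aggs = {
        "tweets": doublesum("count"),                  # sum of the 'count' metric
        "rows": count("rows"),                         # raw row count
        "unique_users": hyperunique("unique_users"),   # hyperUnique metric
        "en_tweets": filtered(Dimension("user_lang") == "en",   # count only English rows
                              doublesum("count")),
    }
>>> ts = client.timeseries(datasource='twitterstream', granularity='hour',
                           intervals='2013-10-04/pt1h', aggregations=aggs)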
pydruid.utils.dimensions module¶
class pydruid.utils.dimensions.BaseRegexExtraction(expr)¶
Bases: pydruid.utils.dimensions.ExtractionFunction
build()¶

class pydruid.utils.dimensions.DimensionSpec(dimension, output_name, extraction_function=None)¶
Bases: object
build()¶

class pydruid.utils.dimensions.JavascriptExtraction(func, injective=False)¶
Bases: pydruid.utils.dimensions.ExtractionFunction
build()¶
extraction_type = 'javascript'¶

class pydruid.utils.dimensions.LookupExtraction(retain_missing_values=False, replace_missing_values=None, injective=False)¶
Bases: pydruid.utils.dimensions.ExtractionFunction
build()¶
build_lookup()¶
extraction_type = 'lookup'¶
lookup_type = None¶

class pydruid.utils.dimensions.MapLookupExtraction(mapping, **kwargs)¶
Bases: pydruid.utils.dimensions.LookupExtraction
build_lookup()¶
lookup_type = 'map'¶

class pydruid.utils.dimensions.NamespaceLookupExtraction(namespace, **kwargs)¶
Bases: pydruid.utils.dimensions.LookupExtraction
build_lookup()¶
lookup_type = 'namespace'¶

class pydruid.utils.dimensions.PartialExtraction(expr)¶
Bases: pydruid.utils.dimensions.BaseRegexExtraction
extraction_type = 'partial'¶

class pydruid.utils.dimensions.RegexExtraction(expr)¶
Bases: pydruid.utils.dimensions.BaseRegexExtraction
extraction_type = 'regex'¶

class pydruid.utils.dimensions.TimeFormatExtraction(format, locale=None, time_zone=None)¶
Bases: pydruid.utils.dimensions.ExtractionFunction
build()¶
extraction_type = 'timeFormat'¶

pydruid.utils.dimensions.build_dimension(dim)¶
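A DimensionSpec wraps a dimension name, an output name, and optionally an extraction function; build_dimension() converts either a plain string or a DimensionSpec into the JSON Druid expects. A minimal sketch, assuming DimensionSpec objects are accepted in a group-by's dimensions list (the regex and mapping values are illustrative):
>>> from pydruid.utils.dimensions import DimensionSpec, RegexExtraction, MapLookupExtraction
>>> # keep only the part of user_name before the first underscore, output it as 'user'
>>> name_spec = DimensionSpec('user_name', 'user',
                              extraction_function=RegexExtraction(r'^([^_]+)'))
>>> # map raw language codes to readable labels
>>> lang_spec = DimensionSpec('user_lang', 'language',
                              extraction_function=MapLookupExtraction({'en': 'English', 'fr': 'French'}))
>>> group = client.groupby(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-10-04/pt1h',
        dimensions=[name_spec, lang_spec],
        aggregations={"count": doublesum("count")})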
pydruid.utils.filters module¶
class pydruid.utils.filters.Bound(dimension, lower, upper, lowerStrict=False, upperStrict=False, alphaNumeric=False)¶
Bases: pydruid.utils.filters.Filter
The Bound filter can be used to filter rows by comparing dimension values to an upper and/or a lower bound.
Variables: - dimension (str) – Dimension to filter on.
- lower (str) – Lower bound.
- upper (str) – Upper bound.
- lowerStrict (bool) – Use a strict (exclusive) comparison against the lower bound. Initial value: False
- upperStrict (bool) – Use a strict (exclusive) comparison against the upper bound. Initial value: False
- alphaNumeric (bool) – Use numeric rather than lexicographic comparison. Initial value: False
class pydruid.utils.filters.Dimension(dim)¶
Bases: object
class pydruid.utils.filters.JavaScript(dim)¶
Bases: object
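Filters are usually built from Dimension comparisons, as in the client examples above; Bound expresses range conditions. The sketch below additionally assumes that Filter objects can be combined with the &, | and ~ operators (the ~ form appears in the groupby example above); the dimension names are illustrative:
>>> from pydruid.utils.filters import Dimension, Bound
>>> english = Dimension('user_lang') == 'en'                   # selector filter
>>> not_a_reply = ~(Dimension('reply_to_name') == 'Not A Reply')
>>> medium_tweets = Bound('tweet_length', '100', '140', alphaNumeric=True)  # 100 <= length <= 140
>>> top = client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2013-10-04/pt1h',
        aggregations={"count": doublesum("count")},
        dimension='user_name',
        metric='count',
        filter=english & not_a_reply & medium_tweets,          # logical AND of the three filters
        threshold=5)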
pydruid.utils.postaggregator module¶
class pydruid.utils.postaggregator.Const(value, output_name=None)¶

class pydruid.utils.postaggregator.Field(name)¶

class pydruid.utils.postaggregator.HyperUniqueCardinality(name)¶

class pydruid.utils.postaggregator.Postaggregator(fn, fields, name)¶
Bases: object
static build_post_aggregators(postaggs)¶
fields(other)¶

class pydruid.utils.postaggregator.Quantile(name, probability)¶

class pydruid.utils.postaggregator.Quantiles(name, probabilities)¶
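Field and Const support arithmetic operators, so post-aggregations can be written as expressions, exactly as in the timeseries example above. A minimal sketch (the metric names are illustrative):
>>> from pydruid.utils.postaggregator import Field, Const
>>> from pydruid.utils.aggregators import doublesum, count
>>> counts = client.timeseries(
        datasource='twitterstream',
        granularity='hour',
        intervals='2013-10-04/pt1h',
        aggregations={'count': doublesum('count'), 'rows': count('rows')},
        # percent = count / rows * 100, computed by Druid for each time bucket
        post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)})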