Datastores and Data Streams
For an introduction to data streams, see streams — Data Analysis and Processing Streams
-
class brewery.ds.Field(name, storage_type='unknown', analytical_type='typeless', concrete_storage_type=None, missing_values=None, label=None)
Metadata - information about a field in a dataset or in a data stream.
Attributes:
- name - field name
- label - optional human-readable field label
- storage_type - normalized data storage type, an abstraction over the
concrete database type
- concrete_storage_type (optional, recommended) - data store/database
dependent storage type - this is the real name of the data type as used
in the database the field comes from or where the field is going
to be created (might be null if unknown)
- analytical_type - data type used in data mining algorithms
- missing_values (optional) - array of values that represent missing
values in the dataset for the given field
-
to_dict()
Return a dictionary representation of the field.
-
class brewery.ds.FieldList(fields=None)
Create a list of Field objects from a list of strings, dictionaries or tuples.
How fields are constructed:
- string: field name is set
- tuple: (field_name, storage_type, analytical_type); field_name is
mandatory, the rest is optional
- dict: contains key-value pairs for initializing a Field object
For strings, and for tuple or dict entries that do not specify them
explicitly, the following defaults apply (see the sketch below):
- storage_type is set to unknown
- analytical_type is set to typeless
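A minimal construction sketch based on these rules (field names and types are illustrative):
    import brewery.ds as ds

    # Mix of the three accepted forms; unspecified attributes fall back to
    # storage_type "unknown" and analytical_type "typeless".
    fields = ds.FieldList([
        "id",                                            # string: name only
        ("amount", "float", "range"),                    # (name, storage_type, analytical_type)
        {"name": "category", "storage_type": "string"}   # attribute dict
    ])

    print(fields.names())   # ['id', 'amount', 'category']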
-
append(field)
Appends a field to the list. The field must be an instance of Field.
-
copy(fields=None)
Return a shallow copy of the list.
Parameters:
- fields - list of fields to be copied
-
field(name)
Return the field with the given name.
-
fields(names=None)
Return a tuple of fields. names specifies which fields are returned; when names is
None, all fields are returned.
-
index(field)
Return the index of a field.
-
indexes(fields)
Return a tuple with indexes of fields in a data row. fields
should be a list of Field objects or strings.
This method is useful when it is more desirable to process data as rows (arrays) rather than as
dictionaries, for example for performance reasons.
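A sketch of row-based processing with precomputed indexes (source stands for any initialized data source carrying these fields):
    import brewery.ds as ds

    fields = ds.FieldList(["id", "amount", "category"])
    # Resolve field positions once, outside of the row loop.
    amount_idx, category_idx = fields.indexes(["amount", "category"])

    total = 0.0
    for row in source.rows():
        total += float(row[amount_idx])   # index into the row tuple directly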
-
names(indexes=None)
Return names of fields in the list.
Parameters:
- indexes - list of indexes for which field names should be collected. If set to
None then all field names are collected - this is the default behaviour.
-
retype(dictionary)
Retype fields according to the dictionary. The dictionary contains
field names as keys and field attribute dictionaries as values.
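For example, a sketch that narrows the guessed types of a field (the attribute names follow the Field attributes listed above):
    fields = ds.FieldList(["id", "amount"])
    fields.retype({
        "amount": {"storage_type": "float", "analytical_type": "range"}
    })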
-
selectors(fields=None)
Return a list representing a field selector - which fields are
selected from a row.
-
class brewery.ds.DataStream
A data stream object – abstract class.
Subclasses should provide:
- fields - a FieldList object representing the fields passed
through the stream - either read from a data source
(DataSource.rows()) or written to a data target
(DataTarget.append()).
Subclasses should populate the fields property (or implement an
accessor).
Subclasses might override initialize() and finalize().
The class supports context management, for example:
    with ds.CSVDataSource("output.csv") as s:
        for row in s.rows():
            print(row)
In this case, the initialize() and finalize() methods are called
automatically.
-
finalize()
Subclasses might put finalisation code here, for example:
- closing a file stream
- sending data over network
- writing a chart image to a file
Default implementation does nothing.
-
initialize()
Delayed stream initialisation code. Subclasses might override this
method to implement file or handle opening, connecting to a database,
doing web authentication, ... By default this method does nothing.
The method does not take any arguments; it expects a pre-configured
object.
-
class brewery.ds.DataSource
Abstract class for data sources.
-
read_fields(limit=0, collapse=False)
Read field descriptions from the data source. You should use this for datasets that do not
provide metadata directly, such as CSV files, document-based databases or directories with
structured files. It is not needed for relational databases, where fields are represented by
table columns and table metadata can be obtained from the database easily.
Note that this method can be quite costly, as by default all records in the dataset are read
and analysed.
After executing this method, the stream's fields are set to the newly read field list and may
be further configured (for example, given more appropriate data types); see the sketch below.
Arguments:
- limit: read only the specified number of records from the dataset to guess field properties
- collapse: whether records are collapsed into a flat structure or not
Returns: tuple with Field objects. Order of fields is datastore adapter specific.
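A sketch of the intended workflow ("data.csv" is a hypothetical file):
    src = ds.CSVDataSource("data.csv")
    src.initialize()
    src.read_fields(limit=1000)   # sample at most 1000 records
    # The guessed fields may now be adjusted, e.g. with FieldList.retype().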
-
records()
Return an iterable of dict objects. This is one of two methods for reading from a
data source. Subclasses should implement this method.
-
rows()
Return an iterable of tuples. This is one of two methods for reading from a
data source. Subclasses should implement this method.
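The two read methods differ only in row representation; a sketch with illustrative values (src is an assumed initialized data source):
    for row in src.rows():        # tuples, e.g. ('1', 'books', '9.99')
        pass
    for record in src.records():  # dicts, e.g. {'id': '1', 'category': 'books'}
        pass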
-
class brewery.ds.DataTarget
Abstract class for data targets.
-
append(object)
Append an object to the dataset. The object can be a tuple, array or dict. If a tuple
or array is used, value positions should correspond to field positions in the field list;
if a dict is used, the keys should be valid field names. See the sketch below.
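A sketch, assuming target is an initialized data target whose field list is id, name, price:
    target.append((1, "widget", 9.99))                         # positional form
    target.append({"id": 2, "name": "gadget", "price": 4.25})  # keyed by field name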
-
class brewery.ds.CSVDataSource(resource, read_header=True, dialect=None, encoding=None, detect_header=False, sample_size=200, skip_rows=None, empty_as_null=True, fields=None, **reader_args)
Creates a CSV data source stream.
Attributes:
- resource: file name, URL or a file handle with CSV data
- read_header: flag determining whether the first line contains a header
or not. True by default.
- encoding: source character encoding; by default no conversion is
performed.
- detect_header: try to determine whether the data source has a header
in the first row or not
- sample_size: maximum bytes to be read when detecting encoding
and headers in the file. By default it is set to 200 bytes to
prevent loading huge CSV files at once.
- skip_rows: number of rows to be skipped. Default: None
- empty_as_null: treat empty strings as Null values
Note: avoid auto-detection when you are reading from a remote URL
stream.
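A usage sketch combining these options with the context-manager support described above ("sales.csv" is hypothetical):
    import brewery.ds as ds

    with ds.CSVDataSource("sales.csv", encoding="utf-8", skip_rows=1) as src:
        for row in src.rows():
            print(row)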
-
initialize()
Initialize the CSV source stream:
1. perform auto-detection if required:
   - detect encoding from sample data (if requested)
   - detect whether the CSV has headers from sample data (if requested)
2. create the CSV reader object
3. read CSV headers if requested and initialize stream fields
If fields are explicitly set prior to initialization, and header
reading is requested, then the header row is just skipped and the fields
that were set before are used. Do not set fields if you want to read
the header.
All fields are set to storage_type = string and
analytical_type = unknown.
-
class brewery.ds.XLSDataSource(resource, sheet=None, encoding=None, skip_rows=None, read_header=True)
Creates an XLS spreadsheet data source stream.
Attributes:
- resource: file name, URL or file-like object
- sheet: sheet index number (as int) or sheet name (as str)
- read_header: flag determining whether the first line contains a header or not.
True by default.
-
initialize()
Initialize the XLS source stream.
-
class brewery.ds.MongoDBDataSource(collection, database=None, host=None, port=None, expand=False, **mongo_args)
Creates a MongoDB data source stream.
Attributes:
- collection: Mongo collection name
- database: database name
- host: Mongo database server host, default is localhost
- port: Mongo port, default is 27017
- expand: expand dictionary values and treat children as top-level keys with a dot ‘.’
separated key path to the child.
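A sketch of the expand option (the collection, database and document shape are illustrative):
    src = ds.MongoDBDataSource("transactions", database="crm", expand=True)
    src.initialize()
    # With expand=True a document like {"customer": {"name": "Ann"}} is
    # streamed with a flattened key "customer.name" instead of a nested dict.
    for record in src.records():
        print(record.get("customer.name"))
    src.finalize()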
-
initialize()
Initialize the Mongo source stream.
-
class brewery.ds.ESDataSource(document_type, database=None, host=None, port=None, expand=False, **elasticsearch_args)
Creates an ElasticSearch data source stream.
Attributes:
- document_type: ElasticSearch document_type name
- database: database name
- host: ElasticSearch database server host, default is localhost
- port: ElasticSearch port, default is 9200
- expand: expand dictionary values and treat children as top-level keys with a dot ‘.’
separated key path to the child.
-
initialize()
Initialize the ElasticSearch source stream.
-
class brewery.ds.GoogleSpreadsheetDataSource(spreadsheet_key=None, spreadsheet_name=None, worksheet_id=None, worksheet_name=None, query_string='', username=None, password=None)
Creates a Google Spreadsheet data source stream.
Attributes:
- spreadsheet_key: the unique key for the spreadsheet; this is
usually in the form ‘pk23...We’ or ‘o23...423.12,,,3’.
- spreadsheet_name: the title of the spreadsheet.
- worksheet_id: ID of a worksheet
- worksheet_name: name of a worksheet
- query_string: optional query string for row selection
- username: Google account user name
- password: Google account password
You should provide either spreadsheet_key or spreadsheet_name; if more than one spreadsheet with
the given name is found, the first in the list returned by Google is used.
For worksheet selection you should provide either worksheet_id or worksheet_name. If more than
one worksheet with the given name is found, the first in the list returned by Google is used. If
neither worksheet_id nor worksheet_name is provided, the first worksheet in the workbook is used.
For details on query string syntax see the section on sq under
http://code.google.com/apis/spreadsheets/reference.html#list_Parameters
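A connection sketch (the spreadsheet and worksheet names, and the credentials, are placeholders):
    src = ds.GoogleSpreadsheetDataSource(spreadsheet_name="Budget",
                                         worksheet_name="Q1",
                                         username="user@example.com",
                                         password="secret")
    src.initialize()   # connects and authenticates
    for record in src.records():
        print(record)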
-
initialize()
Connect to Google documents and authenticate.
-
class brewery.ds.YamlDirectoryDataSource(path, extension='yml', expand=False, filename_field=None)
Creates a YAML directory data source stream.
The data source reads files from a directory and treats each file as a single record. For example,
the following directory contains 3 records:
data/
contract_0.yml
contract_1.yml
contract_2.yml
Optionally, one can specify a field in which the file name will be stored.
Attributes:
- path: directory with YAML files
- extension: file extension to look for, default is yml; if none is given, then
all regular files in the directory are read
- expand: expand dictionary values and treat children as top-level keys with a dot ‘.’
separated key path to the child. Default: False
- filename_field: if present, the file name is streamed in a field with the given name,
or, if a record is requested, the file name will be in the first field.
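A sketch reading the directory layout above, with the file name captured in a hypothetical file field:
    src = ds.YamlDirectoryDataSource("data", filename_field="file")
    src.initialize()
    for record in src.records():
        # each YAML file becomes one record; its file name is in record["file"]
        print(record["file"])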
-
class brewery.ds.YamlDirectoryDataTarget(path, filename_template='record_${__index}.yml', expand=False, filename_start_index=0, truncate=False)
Creates a directory data target with YAML files as records.
Attributes:
- path: directory with YAML files
- extension: file extension to use
- expand: expand dictionary values and treat children as top-level keys with a dot ‘.’
separated key path to the child. Default: False
- filename_template: template string used for creating file names. ${key} is replaced
with the record value for key. __index is used for an auto-generated file index starting from
filename_start_index. The default filename template is record_${__index}.yml, which
results in file names record_0.yml, record_1.yml, ...
- filename_start_index: first value of the __index filename template variable, by default 0
- filename_field: if present, the file name is taken from that field.
- truncate: remove all existing files in the directory. Default is False.
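A sketch using a ${key} based template (directory and field names are illustrative):
    target = ds.YamlDirectoryDataTarget("contracts",
                                        filename_template="contract_${id}.yml",
                                        truncate=True)
    target.initialize()
    target.append({"id": 0, "amount": 100})   # written to contracts/contract_0.yml
    target.finalize()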
-
class brewery.ds.SQLDataSource(connection=None, url=None, table=None, statement=None, schema=None, autoinit=True, **options)
Creates a relational database data source stream.
Attributes:
- url: SQLAlchemy URL - either this or connection should be specified
- connection: SQLAlchemy database connection - either this or url should be specified
- table: table name
- statement: SQL statement to be used as a data source (not supported yet)
- autoinit: initialize on creation, no explicit initialize() is
needed
- options: SQLAlchemy connect() options
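A sketch (the connection URL and table name are placeholders; autoinit defaults to True, so no explicit initialize() is needed):
    src = ds.SQLDataSource(url="postgresql://localhost/analytics", table="sales")
    for row in src.rows():
        print(row)
    src.finalize()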
-
initialize()
Initialize source stream. If the fields are not initialized, then
they are read from the table.
-
class brewery.ds.SQLDataTarget(connection=None, url=None, table=None, schema=None, truncate=False, create=False, replace=False, add_id_key=False, id_key_name=None, buffer_size=None, fields=None, concrete_type_map=None, **options)
Creates a relational database data target stream.
Attributes:
- url: SQLAlchemy URL - either this or connection should be specified
- connection: SQLAlchemy database connection - either this or url should be specified
- table: table name
- truncate: whether to truncate the table or not
- create: whether to create the table on initialize() or not
- replace: set to True if creation should replace an existing table; otherwise
initialization will fail on an attempt to create a table which already exists.
- options: other SQLAlchemy connect() options
- add_id_key: whether to add an auto-increment key column or not. Works only if create
is True
- id_key_name: name of the auto-increment key. Default is ‘id’
- buffer_size: size of the INSERT buffer - how many records are collected before they are
inserted using a multi-insert statement. Default is 1000
- fields: field list for a new table
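A sketch copying a source stream into a new table (src stands for any initialized data source; the URL and table name are placeholders):
    target = ds.SQLDataTarget(url="sqlite:///out.db", table="sales_copy",
                              create=True, replace=True,
                              add_id_key=True, buffer_size=500,
                              fields=src.fields)
    target.initialize()   # creates (or replaces) the table
    for row in src.rows():
        target.append(row)
    target.finalize()     # flushes the INSERT buffer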
-
finalize()
Close the stream and flush buffered data.
-
initialize()
Initialize the target stream.
-
class brewery.ds.StreamAuditor(distinct_threshold=10)
Target stream for auditing data values from a stream. For more information about probed value
properties, please refer to brewery.dq.FieldStatistics.
-
append(obj)
Probe row or record and update statistics.
-
field_statistics
Return field statistics as a dictionary: keys are field names, values are
brewery.dq.FieldStatistics objects.
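A sketch of auditing a source stream (assigning fields before initialization is an assumption following the DataStream contract above):
    auditor = ds.StreamAuditor(distinct_threshold=10)
    auditor.fields = src.fields   # assumed: the auditor needs the incoming field list
    auditor.initialize()
    for row in src.rows():
        auditor.append(row)
    auditor.finalize()
    stats = auditor.field_statistics   # field name -> brewery.dq.FieldStatistics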