Datastores and Data Streams

For an introduction to data streams, see streams — Data Analysis and Processing Streams.

class brewery.ds.Field(name, storage_type='unknown', analytical_type='typeless', concrete_storage_type=None, missing_values=None, label=None)

Metadata - information about a field in a dataset or in a datastream.

Attributes:
  • name - field name
  • label - optional human-readable field label
  • storage_type - Normalized data storage type; the storage type is abstracted from the concrete, database-specific type (see concrete_storage_type)
  • concrete_storage_type (optional, recommended) - data store/database dependent storage type - the real name of the data type as used in the database the field comes from or where it is going to be created (this might be null if unknown)
  • analytical_type - data type used in data mining algorithms
  • missing_values (optional) - array of values that represent missing values in the dataset for the given field

to_dict()

Return dictionary representation of the field.
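
A minimal sketch of creating a field and serializing it (the type values here are illustrative):

import brewery.ds

field = brewery.ds.Field("amount",
                         storage_type="float",
                         analytical_type="range")
desc = field.to_dict()   # e.g. {"name": "amount", "storage_type": "float", ...}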

class brewery.ds.FieldList(fields=None)

Create a list of Field objects from a list of strings, dictionaries or tuples

How fields are constructed:

  • string: field name is set
  • tuple: (field_name, storage_type, analytical_type); field_name is mandatory, the rest is optional
  • dict: contains key-value pairs for initializing a Field object

For strings, and when a value is not explicitly specified in a tuple or a dict, the following rules apply (see the sketch after this list):

  • storage_type is set to unknown
  • analytical_type is set to typeless
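
For illustration, a field list combining all three forms (a sketch; names and types are arbitrary):

import brewery.ds

fields = brewery.ds.FieldList([
    "id",                                        # string: only the name is set
    ("amount", "float", "range"),                # tuple: name, storage and analytical type
    {"name": "note", "storage_type": "string"}   # dict: Field keyword arguments
])
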
append(field)

Appends a field to the list. This method requires field to be an instance of Field.

copy(fields=None)

Return a shallow copy of the list.

Parameters:
  • fields - list of fields to be copied

field(name)

Return the field with the given name.

fields(names=None)

Return a tuple with fields. names specifies which fields are returned; when names is None, all fields are returned.

index(field)

Return the index of a field.

indexes(fields)

Return a tuple with indexes of the given fields in a data row. fields should be a list of Field objects or strings.

This method is useful when it is more desirable to process data as rows (arrays), not as dictionaries, for example for performance purposes.
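
For example (a sketch; fields is a FieldList and src an initialized data source, both hypothetical):

idx_id, idx_amount = fields.indexes(["id", "amount"])

for row in src.rows():
    print(row[idx_id], row[idx_amount])   # positional access, no dict lookup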

names(indexes=None)

Return names of fields in the list.

Parameters:
  • indexes - list of indexes for which field names should be collected. If None, all field names are collected (the default behaviour).

retype(dictionary)

Retype fields according to the dictionary. The dictionary contains field names as keys and dictionaries of field attributes as values.
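
For example, a retyping sketch (field names are hypothetical):

fields.retype({
    "amount": {"storage_type": "float", "analytical_type": "range"},
    "id":     {"storage_type": "integer"}
})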

selectors(fields=None)

Return a list representing a field selector - which fields are selected from a row.

class brewery.ds.DataStream

A data stream object – abstract class.

The subclasses should provide:

  • fields

fields is a FieldList object representing the fields passed through the receiving stream - either read from a data source (DataSource.rows()) or written to a data target (DataTarget.append()).

Subclasses should populate the fields property (or implement an accessor).

The subclasses might override:

  • initialize()
  • finalize()

The class supports context management, for example:

from brewery import ds

with ds.CSVDataSource("output.csv") as s:
    for row in s.rows():
        print(row)

In this case, the initialize() and finalize() methods are called automatically.

finalize()

Subclasses might put finalisation code here, for example:

  • closing a file stream
  • sending data over network
  • writing a chart image to a file

Default implementation does nothing.

initialize()

Delayed stream initialisation code. Subclasses might override this method to implement file or handle opening, connecting to a database, doing web authentication, ... By default this method does nothing.

The method does not take any arguments; it expects a pre-configured object.

class brewery.ds.DataSource

Abstract class for data sources.

read_fields(limit=0, collapse=False)

Read field descriptions from the data source. Use this for datasets that do not provide metadata directly, such as CSV files, document-based databases or directories with structured files. It does nothing in relational databases, as fields are represented by table columns and table metadata can be obtained from the database easily.

Note that this method can be quite costly, as by default all records within dataset are read and analysed.

After executing this method, the stream's fields are set to the newly read field list and may be configured (for example, by setting more appropriate data types).

Arguments:
  • limit: read only specified number of records from dataset to guess field properties
  • collapse: whether records are collapsed into flat structure or not

Returns: a tuple with Field objects. The order of fields is datastore adapter specific.
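
For example (a sketch; src is a source without direct metadata, such as a CSV source):

fields = src.read_fields(limit=1000)   # sample at most 1000 records
print([f.name for f in fields])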

records()

Return an iterable object with dict objects. This is one of the two methods for reading from a data source. Subclasses should implement this method.

rows()

Return an iterable object with tuples. This is one of the two methods for reading from a data source. Subclasses should implement this method.

class brewery.ds.DataTarget

Abstract class for data targets.

append(object)

Append an object to the dataset. The object can be a tuple, array or dict. If a tuple or array is used, value positions should correspond to field positions in the field list; if a dict is used, the keys should be valid field names.
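
For example, given a target whose field list is ("id", "amount"), both of the following append the same record (the target object is hypothetical):

target.append((1, 100.0))                   # positions match the field list
target.append({"id": 1, "amount": 100.0})   # keys are field names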

class brewery.ds.CSVDataSource(resource, read_header=True, dialect=None, encoding=None, detect_header=False, sample_size=200, skip_rows=None, empty_as_null=True, fields=None, **reader_args)

Creates a CSV data source stream.

Attributes:
  • resource: file name, URL or a file handle with CSV data
  • read_header: flag determining whether the first line contains a header or not. True by default.
  • encoding: source character encoding; by default no conversion is performed.
  • detect_header: try to determine whether the data source has a header in the first row or not
  • sample_size: maximum bytes to be read when detecting encoding and header in the file. By default it is set to 200 bytes to prevent loading huge CSV files at once.
  • skip_rows: number of rows to be skipped. Default: None
  • empty_as_null: treat empty strings as Null values

Note: avoid auto-detection when you are reading from a remote URL stream.
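
A minimal reading sketch (the file name is hypothetical):

from brewery import ds

src = ds.CSVDataSource("data.csv", read_header=True, encoding="utf-8")
src.initialize()
for record in src.records():   # records() yields dicts keyed by field name
    print(record)
src.finalize()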

initialize()

Initialize CSV source stream:

  1. perform autodetection if required:
     1. detect encoding from sample data (if requested)
     2. detect whether the CSV has a header from sample data (if requested)
  2. create a CSV reader object
  3. read the CSV header if requested and initialize stream fields

If fields are explicitly set prior to initialization and header reading is requested, then the header row is simply skipped and the previously set fields are used. Do not set fields if you want to read the header.

All fields are set to storage_type = string and analytical_type = unknown.
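
A sketch of presetting the fields so that the header row is only skipped (names and types are illustrative):

from brewery import ds

src = ds.CSVDataSource("data.csv", read_header=True,
                       fields=ds.FieldList([("id", "integer"),
                                            ("amount", "float")]))
src.initialize()   # the header row is skipped, preset fields are used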

class brewery.ds.XLSDataSource(resource, sheet=None, encoding=None, skip_rows=None, read_header=True)

Creates a XLS spreadsheet data source stream.

Attributes:
  • resource: file name, URL or file-like object
  • sheet: sheet index number (as int) or sheet name (as str)
  • read_header: flag determining whether the first line contains a header or not. True by default.

initialize()

Initialize XLS source stream.

class brewery.ds.MongoDBDataSource(collection, database=None, host=None, port=None, expand=False, **mongo_args)

Creates a MongoDB data source stream.

Attributes:
  • collection: Mongo collection name
  • database: database name
  • host: Mongo database server host, default is localhost
  • port: Mongo port, default is 27017
  • expand: expand dictionary values and treat children as top-level keys with a dot (‘.’) separated key path to the child
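
For example, with expand=True a stored document such as {"address": {"city": "Prague"}} is streamed with the top-level key "address.city" (a sketch; collection and database names are hypothetical):

import brewery.ds

src = brewery.ds.MongoDBDataSource("contracts", database="crm",
                                   expand=True)
src.initialize()
for record in src.records():
    print(record)   # nested keys appear as "address.city", ...
src.finalize()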

initialize()

Initialize Mongo source stream.

class brewery.ds.ESDataSource(document_type, database=None, host=None, port=None, expand=False, **elasticsearch_args)

Creates a ElasticSearch data source stream.

Attributes:
  • document_type: ElasticSearch document_type name
  • database: database name
  • host: ElasticSearch database server host, default is localhost
  • port: ElasticSearch port, default is 27017
  • expand: expand dictionary values and treat children as top-level keys with a dot (‘.’) separated key path to the child

initialize()

Initialize ElasticSearch source stream.

class brewery.ds.GoogleSpreadsheetDataSource(spreadsheet_key=None, spreadsheet_name=None, worksheet_id=None, worksheet_name=None, query_string='', username=None, password=None)

Creates a Google Spreadsheet data source stream.

Attributes:
  • spreadsheet_key: the unique key for the spreadsheet, usually in the form ‘pk23...We’ or ‘o23...423.12,,,3’
  • spreadsheet_name: the title of the spreadsheet
  • worksheet_id: ID of a worksheet
  • worksheet_name: name of a worksheet
  • query_string: optional query string for row selection
  • username: Google account user name
  • password: Google account password

You should provide either spreadsheet_key or spreadsheet_name; if more than one spreadsheet with the given name is found, the first in the list returned by Google is used.

For worksheet selection you should provide either worksheet_id or worksheet_name. If more than one worksheet with the given name is found, the first in the list returned by Google is used. If neither worksheet_id nor worksheet_name is provided, the first worksheet in the workbook is used.

For details on the query string syntax see the section on sq at http://code.google.com/apis/spreadsheets/reference.html#list_Parameters
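
A selection sketch (account and document names are hypothetical):

import brewery.ds

src = brewery.ds.GoogleSpreadsheetDataSource(spreadsheet_name="Budget",
                                             worksheet_name="2011",
                                             username="user@gmail.com",
                                             password="secret")
src.initialize()   # connects to Google and authenticates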

initialize()

Connect to the Google documents, authenticate.

class brewery.ds.YamlDirectoryDataSource(path, extension='yml', expand=False, filename_field=None)

Creates a YAML directory data source stream.

The data source reads files from a directory and treats each file as a single record. For example, the following directory contains 3 records:

data/
    contract_0.yml
    contract_1.yml
    contract_2.yml

Optionally, one can specify a field in which the file name will be stored.

Attributes:
  • path: directory with YAML files
  • extension: file extension to look for; default is yml. If None is given, then all regular files in the directory are read.
  • expand: expand dictionary values and treat children as top-level keys with a dot (‘.’) separated key path to the child. Default: False
  • filename_field: if present, then the file name is streamed in a field with the given name; if a record is requested, the file name will be in the first field.
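
A sketch reading the directory shown above, keeping the file name in a field (the field name is arbitrary):

import brewery.ds

src = brewery.ds.YamlDirectoryDataSource("data/", filename_field="file")
src.initialize()
for record in src.records():
    print(record["file"])   # name of the file the record came from
src.finalize()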

class brewery.ds.YamlDirectoryDataTarget(path, filename_template='record_${__index}.yml', expand=False, filename_start_index=0, truncate=False)

Creates a directory data target with YAML files as records.

Attributes:
  • path: directory with YAML files
  • extension: file extension to use
  • expand: expand dictionary values and treat children as top-level keys with a dot (‘.’) separated key path to the child. Default: False
  • filename_template: template string used for creating file names. ${key} is replaced with the record value for key. __index is used for an auto-generated file index starting at filename_start_index. The default filename template is record_${__index}.yml, which results in file names record_0.yml, record_1.yml, ...
  • filename_start_index: first value of the __index filename template variable, by default 0
  • filename_field: if present, then the file name is taken from that field
  • truncate: remove all existing files in the directory. Default is False.
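
A writing sketch (the directory and the record are hypothetical):

import brewery.ds

target = brewery.ds.YamlDirectoryDataTarget(
        "data/",
        filename_template="contract_${__index}.yml",
        truncate=True)
target.initialize()
target.append({"id": 1, "amount": 100.0})   # written as contract_0.yml
target.finalize()
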
class brewery.ds.SQLDataSource(connection=None, url=None, table=None, statement=None, schema=None, autoinit=True, **options)

Creates a relational database data source stream.

Attributes:
  • url: SQLAlchemy URL - either this or connection should be specified
  • connection: SQLAlchemy database connection - either this or url should be specified
  • table: table name
  • statement: SQL statement to be used as a data source (not supported yet)
  • autoinit: initialize on creation; no explicit initialize() is needed
  • options: SQLAlchemy connect() options

initialize()

Initialize the source stream. If the fields are not initialized, they are read from the table.
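
A reading sketch (the database URL and table name are hypothetical):

import brewery.ds

src = brewery.ds.SQLDataSource(url="postgresql://localhost/crm",
                               table="contracts")
for row in src.rows():   # autoinit=True, so no explicit initialize()
    print(row)
src.finalize()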

class brewery.ds.SQLDataTarget(connection=None, url=None, table=None, schema=None, truncate=False, create=False, replace=False, add_id_key=False, id_key_name=None, buffer_size=None, fields=None, concrete_type_map=None, **options)

Creates a relational database data target stream.

Attributes:
  • url: SQLAlchemy URL - either this or connection should be specified
  • connection: SQLAlchemy database connection - either this or url should be specified
  • table: table name
  • truncate: whether to truncate the table or not
  • create: whether to create the table on initialize() or not
  • replace: set to True if creation should replace an existing table; otherwise initialization will fail on an attempt to create a table which already exists
  • options: other SQLAlchemy connect() options
  • add_id_key: whether to add an auto-increment key column or not. Works only if create is True.
  • id_key_name: name of the auto-increment key. Default is ‘id’.
  • buffer_size: size of the INSERT buffer - how many records are collected before they are inserted using a multi-insert statement. Default is 1000.
  • fields: field list for a new table

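A loading sketch (the URL is hypothetical; src is an initialized data source, e.g. the SQLDataSource above):

import brewery.ds

target = brewery.ds.SQLDataTarget(url="sqlite:///out.sqlite",
                                  table="contracts",
                                  create=True, replace=True,
                                  fields=src.fields,
                                  buffer_size=1000)
target.initialize()
for row in src.rows():
    target.append(row)
target.finalize()   # flushes the remaining INSERT buffer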

finalize()

Close the stream and flush buffered data.

initialize()

Initialize the target stream.

class brewery.ds.StreamAuditor(distinct_threshold=10)

Target stream for auditing data values from a stream. For more information about probed value properties, refer to brewery.dq.FieldStatistics.

append(obj)

Probe row or record and update statistics.

field_statistics

Return field statistics as a dictionary: keys are field names, values are brewery.dq.FieldStatistics objects.
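
An auditing sketch (src is an initialized data source; wiring the auditor's fields from the source is an assumption of this sketch):

import brewery.ds

auditor = brewery.ds.StreamAuditor()
auditor.fields = src.fields   # audit the same fields the source provides
auditor.initialize()
for row in src.rows():
    auditor.append(row)
auditor.finalize()

for name, stats in auditor.field_statistics.items():
    print(name, stats)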
