Source code for eztable.table

import csv
import logging
import six.moves
import itertools
from collections import OrderedDict
from six import string_types
from weakref import WeakValueDictionary, WeakSet

from .columns import DerivedColumn, Column, DerivedTableColumn, StaticColumn, JoinColumn, ArrayColumn, describe_column, \
    NormalizedColumn, StandardizedColumn
from .row import TableRow
from .exceptions import InvalidData, InvalidJoinMode
from .index import Index
from .aggregation import Aggregation

log = logging.getLogger(__name__)

class Table(object):
    """The basic table class.

    Table objects contain any Python data type, however some features may
    be unavailable if the types are non-hashable.
    """

    def __init__(self, schema, data=None):
        """Create a table from a schema and, optionally, some initial rows.

        Every Table object has a schema. In its simplest form, the schema
        can be nothing more than a list of string column-names. Specifying
        a schema this way will produce a non-typed table, in which any
        Python type can be stored in any column.

        Alternatively the schema can include type information. Instead of
        specifying the schema item as a string, use (column_name, type),
        where type is a python type or class object, for example int, str.
        When the type is itself given as a string, an ArrayColumn is
        created for that column instead of a plain Column.

        It is expected that most of the values stored in the table will be
        simple objects or native types such as numbers and strings, however
        it is also possible to store any python object as long as they are
        hashable.

        :param schema: Column names as a sequence of strings, or
            ('col_name', type)
        :type schema: list
        :param data: Optional rows of data to initialize the table.
        :type data: list of lists
        """
        self._columns = []
        # Only weak references are kept, so callers own the Index objects.
        self.indexes = WeakValueDictionary()
        self._listeners = WeakSet()

        for item in schema:
            if isinstance(item, string_types):
                self._columns.append(Column(item))
                continue
            name, typ = item
            if isinstance(typ, str):
                col = ArrayColumn(name, column_type=typ)
            else:
                col = Column(name, column_type=typ)
            self._columns.append(col)

        for row in data or []:
            self.append(row)

    def append(self, row):
        """Append a single row to this table.

        The row must match the table's schema, typically this means that
        the row should have the same number of items, however if types
        were specified then the type of each positional element must
        conform to the required type of the corresponding schema column.

        Every value is validated before any column is modified, so a
        rejected row leaves the table unchanged.

        :param row: A single table row to be added
        :type row: List of objects, types must correspond with schema
        :raises InvalidData: if the row has the wrong number of items or a
            value is rejected by its column's ``validate`` method.
        """
        expected = len(self._columns)
        if len(row) != expected:
            # Expected = schema width, got = width of the offending row.
            raise InvalidData(
                "Expected %d columns, got %d" % (expected, len(row))
            )

        zipped = list(six.moves.zip(row, self._columns))

        for v, c in zipped:
            if not c.validate(v):
                raise InvalidData(
                    '%r is incompatible with type %s for column %s' % (
                        v, c.column_type, c.name
                    )
                )

        for v, c in zipped:
            c.append(v)

        for listener in self._listeners:
            listener.notify('append', len(self) - 1)

    def extend(self, iterable):
        """Append all rows in iterable to this table.

        Each row must conform to this table's schema.

        :param iterable: Iterator from which to extract rows
        :type iterable: iterable
        """
        for row in iterable:
            self.append(row)

    @property
    def schema(self):
        """Get the table's schema.

        This is a list of (name (string), type) tuples.
        """
        return [(c.name, c.column_type) for c in self._columns]

    @property
    def column_names(self):
        """Get the table's column names as a list of strings."""
        return [c.name for c in self._columns]

    @property
    def column_types(self):
        """Get the table's column types as a list of types."""
        return [c.column_type for c in self._columns]

    @property
    def _column_descriptions(self):
        """Get the ``description`` of every column as a list.

        :raises RuntimeError: if any column has no ``description``
            attribute; the original error is logged first.
        """
        try:
            return [c.description for c in self._columns]
        except AttributeError as ae:
            # Exceptions are not subscriptable on Python 3, so use str().
            message = str(ae)
            log.exception(message)
            raise RuntimeError(message)

    def __iter__(self):
        """Iterate through the rows of this table.

        Each row behaves like a namedtuple.
        """
        s = self._tablerow_schema()
        for r in six.moves.zip(*self._columns):
            yield TableRow(r, s)

    def _tablerow_schema(self):
        """Return an ordered mapping of column name to column position."""
        return OrderedDict((c.name, i) for i, c in enumerate(self._columns))

    def __getslice__(self, start, stop):
        """Required to support slicing on Python 2.x"""
        return self.__getitem__(slice(start, stop, 1))

    def _get_column(self, name):
        """Get a single Column object by name.

        Columns are list-like sequences.

        :raises KeyError: if no column has that name.
        """
        for c in self._columns:
            if c.name == name:
                return c
        raise KeyError(name)

    def _string_col_names(self, col_names, method_name):
        """Normalise the arguments of project() / anti_project().

        Accepts either positional string arguments or a single sequence of
        strings and returns the sequence of names.

        :raises TypeError: if no names were given or any name is not a
            string.
        """
        if len(col_names) and not isinstance(col_names[0], string_types):
            col_names = col_names[0]

        if (not col_names or
                not all(isinstance(c, string_types) for c in col_names)):
            raise TypeError(
                "%s() takes either a list of strings, or positional "
                "arguments of type string" % method_name
            )
        return col_names

    def anti_project(self, *col_names):
        """Returns a new DerivedTable in which the named columns have been
        removed.

        Unless the new table is materialised it shares the same data as
        the table it was made from. Ordering of the original columns will
        be retained, except that the specified columns will no longer be
        accessible.

        :param col_names: List of column names to remove
        :type col_names: List of strings
        :raises TypeError: if the names are missing or not strings.
        """
        excluded = set(self._string_col_names(col_names, 'anti_project'))
        keep_cols = [c for c in self.column_names if c not in excluded]
        return self._project(keep_cols)

    def project(self, *col_names):
        """Returns a new DerivedTable in which only the named columns
        remain in the order specified by col_names.

        :param col_names: List of column names to keep
        :type col_names: List of strings
        :raises TypeError: if the names are missing or not strings.
        """
        return self._project(self._string_col_names(col_names, 'project'))

    def _indices_func(self):
        """Internal function: This specifies the order in which the
        __iter__ method retrieves rows.
        """
        return six.moves.range(len(self))

    def rename(self, old_names, new_names):
        """Rename columns in the table.

        Does not affect the order of columns. The two lists are paired
        positionally with ``zip``, so surplus entries in the longer list
        are ignored.

        :param old_names: list of column names to rename.
        :param new_names: list of the new names to assign to the renamed
            columns.
        """
        rename_dict = dict(zip(old_names, new_names))
        return self._project(
            col_names=self.column_names,
            rename_dict=rename_dict
        )

    def _project(self, col_names, rename_dict=None):
        """Implementation of project, anti_project and rename function"""
        rename_dict = rename_dict or {}
        cols = [self._get_column(c) for c in col_names]
        return DerivedTable(self._indices_func, cols, rename_dict=rename_dict)

    def expand_const(self, name, value, type=object):
        """Returns a new DerivedTable in which a single column of static
        data has been added.

        :param name: The name of the new column to be added
        :type name: str
        :param value: The constant value of the new column
        :type value: object
        :param type: Optional, specify a type constraint for the new column
        :type type: type
        """
        static = StaticColumn(name, value, self.__len__, type)
        return DerivedTable(self._indices_func, self._columns + [static])

    def expand(self, name, input_columns, fn, col_type=object):
        """Returns a new DerivedTable in which a new calculated column has
        been added.

        This column's value is determined by a function and a set of input
        columns.

        :param name: The name of the new derived column.
        :type name: str
        :param input_columns: The input column names.
        :type input_columns: list of str
        :param fn: A function or lambda
        :param col_type: Optionally, constrain the value of this column by
            type
        :raises KeyError: if an input column name is unknown.
        """
        incols = [self._get_column(c) for c in input_columns]
        derived = DerivedColumn(name, incols, fn, col_type)
        return DerivedTable(self._indices_func, self._columns + [derived])

    def hash(self, name, input_columns):
        """A convenience function that expands the table with a new hash
        column, computed with the builtin ``hash``.
        """
        # NOTE(review): builtin hash() accepts exactly one argument; confirm
        # how DerivedColumn invokes fn when several input columns are given.
        return self.expand(name, input_columns, hash, int)

    def copy(self):
        """Create a 'materialised' copy of this table.

        A new Table is built from this table's schema and filled with the
        rows this table currently yields, so the copy no longer depends on
        any derived columns.
        """
        t = Table(self.schema)
        t.extend(self)
        return t

    def add_index(self, cols):
        """Create a new index on a set of columns.

        Indexes are list-like objects which can be used to speed-up access
        to rows of data. Indexes improve the performance of operations
        (e.g. joins).

        The Table class only holds a weak-reference to this object, hence
        the user must retain a reference to the index in order to prevent
        it from being garbage collected. Asking for the same columns again
        returns the existing index while it is still alive.

        :param cols: Column names to be included into the index.
        :type cols: List of strings.
        """
        index_key = tuple(cols)
        if index_key in self.indexes:
            return self.indexes[index_key]
        i = Index(table=self, cols=cols)
        self.indexes[index_key] = i
        self._listeners.add(i)
        return i

    def split(self):
        """Return a 3-tuple in which every element is this same table.

        NOTE(review): no partitioning is performed here; confirm whether
        this is an intentional placeholder.
        """
        return self, self, self

    def left_join(self, keys, other, other_keys=None):
        """Left join the other table onto this, return a table.

        :param keys: List of column names which will be matched.
        :param other: the other table to join on to this table.
        :param other_keys: Optional list of foreign keys, defaults to keys
        """
        other_keys = other_keys or keys
        return self._join(
            keys=keys,
            other=other,
            other_keys=other_keys,
            mode='left'
        )

    def inner_join(self, keys, other, other_keys=None):
        """Inner join the other table onto this, return a table.

        :param keys: List of column names which will be matched.
        :param other: the other table to join on to this table.
        :param other_keys: Optional list of foreign keys, defaults to keys
        """
        other_keys = other_keys or keys
        return self._join(
            keys=keys,
            other=other,
            other_keys=other_keys,
            mode='inner'
        )

    def _join(self, keys, other, other_keys=None, mode='left'):
        """Build the JoinTable shared by left_join and inner_join."""
        other_keys = other_keys or keys
        return JoinTable(
            indices_func=self._indices_func,
            left_columns=self._columns,
            keys=keys,
            other=other,
            other_keys=other_keys,
            mode=mode
        )

    def restrict(self, col_names, fn=None):
        """Return a new DerivedTable object in which all visible rows
        satisfy some kind of logical constraint given by fn.

        :param col_names: List of column names to feed into fn
        :type col_names: list of strings
        :param fn: Called with one positional value per named column;
            should return True for any retained row.
        :type fn: function or lambda
        :raises TypeError: if fn is not callable.
        :raises KeyError: if a column name is unknown.
        """
        # Fail here rather than with an obscure error on first iteration.
        if not callable(fn):
            raise TypeError(
                "restrict() requires a callable fn, got %r" % (fn,)
            )
        cols = [self._get_column(cn) for cn in col_names]

        def indices_func():
            for i in self._indices_func():
                if fn(*[c[i] for c in cols]):
                    yield i

        return DerivedTable(
            indices_func=indices_func,
            columns=self._columns
        )

    def __getitem__(self, key):
        """Return a row for an integer key, or a DerivedTable for a slice."""
        if not isinstance(key, slice):
            return self.get_row(key)

        # islice rejects every negative argument, not just a negative step,
        # so any negative bound falls back to slicing a materialised list.
        needs_list = any(
            bound is not None and bound < 0
            for bound in (key.start, key.stop, key.step)
        )

        if needs_list:
            def sliced_indices():
                return list(self._indices_func())[key]
        else:
            def sliced_indices():
                return itertools.islice(
                    self._indices_func(),
                    key.start,
                    key.stop,
                    key.step or None
                )

        return DerivedTable(sliced_indices, self._columns[:])

    def __getattr__(self, attr):
        """Expose columns as attributes, e.g. ``table.price``.

        Only invoked when normal attribute lookup fails.

        :raises AttributeError: if there is no column with that name.
        """
        # _get_column reads self._columns; if that is what is missing (an
        # instance that has not been initialised yet) looking it up again
        # would recurse without end.
        if attr != '_columns':
            try:
                return self._get_column(attr)
            except KeyError:
                pass
        raise AttributeError(
            "%r object has no attribute %r" % (
                self.__class__.__name__, attr
            )
        )

    def get_row(self, key):
        """Get a single row from the table.

        :param key: Row index
        :type key: int
        """
        s = self._tablerow_schema()
        return TableRow([c[key] for c in self._columns], s)

    def __len__(self):
        """Number of rows, taken from the first column (0 if no columns)."""
        cols = self._columns
        return len(cols[0]) if cols else 0

    def __eq__(self, ano):
        """Tables are equal when their schemas and all rows are equal."""
        if not isinstance(ano, Table):
            return False

        if self.schema != ano.schema:
            return False

        for a, b in six.moves.zip_longest(self, ano):
            if a != b:
                return False

        return True

    def __ne__(self, ano):
        """Inverse of __eq__; Python 2 does not derive this automatically."""
        return not self.__eq__(ano)

    def _get_column_widths(self):
        """Get maximum column widths as a list of integers"""
        widths = [len(cd) for cd in self._column_descriptions]
        for row in self:
            for i, (width, cell) in enumerate(zip(widths, row)):
                cell_len = len(str(cell))
                if cell_len > width:
                    widths[i] = cell_len
        return widths

    def __repr__(self):
        """Produce a handy representation of the table.

        The first row will be column names and types (if specified),
        subsequent rows will be the actual data in the table. All columns
        will be separated by | symbols and padded with whitespace as
        required.
        """
        widths = self._get_column_widths()

        def format_row(r):
            cells = (str(c).ljust(w) for w, c in zip(widths, r))
            return '| %s |' % (' | '.join(cells))

        out = [format_row(self._column_descriptions)]
        out.extend(format_row(r) for r in self)
        return '\n'.join(out)

    def aggregate(self, keys, aggregations):
        """Summarize a table by grouping by one or more keys, and then
        apply aggregation functions to generate additional summarized
        columns.

        Aggregations are specified as a list of triples in the form:

            (column name (str), column type (type), column function (callable))

        The column function should be a function that returns the type
        specified in the 2nd column. Its input will be each of the
        sub-tables specified by the grouping keys.

        >>> from eztable import table_literal
        >>> t = table_literal('''
        ... | Attack(str)   | Pokemon(str) | Level Obtained(int) | Attack Type(str) |
        ... | Thunder Shock | Pikachu      | 1                   | Electric         |
        ... | Tackle        | Pikachu      | 1                   | Normal           |
        ... | Tail Whip     | Pikachu      | 1                   | Normal           |
        ... | Growl         | Pikachu      | 5                   | Normal           |
        ... | Quick Attack  | Pikachu      | 10                  | Normal           |
        ... | Thunder Wave  | Pikachu      | 13                  | Electric         |
        ... | Electro Ball  | Pikachu      | 18                  | Electric         |
        ... | Charm         | Pikachu      | 0                   | Fairy            |
        ... | Sweet Kiss    | Pikachu      | 0                   | Fairy            |
        ... ''')
        >>>
        >>> agg = t.aggregate(
        ...     keys=('Pokemon', 'Attack Type'),
        ...     aggregations = [
        ...         ('Count', int, lambda t:len(t))
        ...     ]
        ... )
        >>>
        >>> print(agg)
        | Pokemon (str) | Attack Type (str) | Count (int) |
        | Pikachu       | Normal            | 4           |
        | Pikachu       | Electric          | 3           |
        | Pikachu       | Fairy             | 2           |

        :param keys: List of column names to group by
        :type keys: List of strings
        :param aggregations: List of aggregations to calculate
        :type aggregations: list of tuples
        """
        i = self.add_index(keys).reindex()
        return AggregationTable(
            self,
            i,
            keys=keys,
            aggregations=aggregations
        )

    def to_csv(self, output_file, dialect="excel", descriptions=False):
        """Save this table to a file in CSV format (or any dialect
        variation supported by Python's CSV library).

        :param output_file: A file or file-like object (not a filename)
        :param dialect: Any previously registered CSV writer dialect name
        :param descriptions: Set to True if you want to include column
            descriptions rather than column-names
        :return: None
        """
        writer = csv.writer(output_file, dialect=dialect)
        if descriptions:
            header = self._column_descriptions
        else:
            header = self.column_names
        writer.writerow(header)
        for row in self:
            writer.writerow(row)

    def standardize(self, standardizations):
        """Return a version of the table with columns standardized.

        :param standardizations: dict keyed by column name; each value is
            handed to StandardizedColumn together with the column.
        :return: A derived eztable.Table with the standardizations applied.
        """
        def standardize_col(c):
            """Inner function, standardize a column c if required

            :param c: eztable.column.Column
            :return: either a standardized column or the original column
            """
            if c.name in standardizations:
                return StandardizedColumn(c, standardizations[c.name])
            return c

        return DerivedTable(
            indices_func=self._indices_func,
            columns=[standardize_col(c) for c in self._columns]
        )

    def normalize(self, normalizations):
        """Return a version of the table with columns normalized.

        :param normalizations: dict mapping column names to their
            normalized range (typically 1).
        :return: A derived eztable.Table with the normalizations applied.
        """
        def normalize_col(c):
            """Inner function, normalize a column c if required

            :param c: eztable.column.Column
            :return: either a normalized column or the original column
            """
            if c.name in normalizations:
                return NormalizedColumn(c, normalizations[c.name])
            return c

        return DerivedTable(
            indices_func=self._indices_func,
            columns=[normalize_col(c) for c in self._columns]
        )
class AggregationTable(Table):
    """The result of Table.aggregate().

    Each row is one unique combination of key values from the index,
    followed by the result of applying each aggregation to the sub-table
    of source rows that share that combination. Rows are computed on
    demand; nothing is stored.
    """

    def __init__(self, table, index, keys, aggregations):
        """
        :param table: The source table being summarized.
        :param index: An index over ``keys`` on the source table.
        :param keys: Column names to group by.
        :param aggregations: Sequence of argument tuples, each unpacked
            into an Aggregation.
        """
        self.table = table
        self.i = index
        self.keys = keys
        self.aggregations = [Aggregation(*a) for a in aggregations]

    def _indices_func(self):
        """Row positions: one per unique key combination in the index."""
        return six.moves.range(len(self.i.unique_values()))

    def __len__(self):
        """Number of aggregated rows, i.e. unique key combinations."""
        return len(self.i.unique_values())

    def _iter_subtables(self):
        """Generator function that gives a sequence of (values, table)
        which represents this table if it were split by the unique values
        in the selected columns.
        """
        for uv in self.i.unique_values():
            iterfn = self.i._get_iterator_fn_for_value(uv)
            yield uv, DerivedTable(
                indices_func=iterfn,
                columns=self.table._columns
            )

    @property
    def column_names(self):
        """Key column names followed by the aggregation names."""
        return list(self.keys) + [a.name for a in self.aggregations]

    @property
    def column_types(self):
        """Types of the key columns followed by the aggregation types."""
        table_types = [
            self.table._get_column(cn).column_type for cn in self.keys
        ]
        aggregation_types = [a.column_type for a in self.aggregations]
        return table_types + aggregation_types

    def _make_row(self, row_keys, subtable):
        """Build the TableRow for one group: keys, then aggregate values."""
        # NOTE(review): the schema handed to TableRow here is a list of
        # names, whereas Table passes a name -> position mapping; confirm
        # TableRow accepts both.
        r = row_keys + tuple(a(subtable) for a in self.aggregations)
        return TableRow(r, self.column_names)

    def get_row(self, row):
        """Get a single aggregated row by position.

        :param row: Row index
        :type row: int
        :raises IndexError: if there is no row at that position.
        """
        groups = itertools.islice(self._iter_subtables(), row, row + 1)
        try:
            row_keys, subtable = next(groups)
        except StopIteration:
            # A leaked StopIteration would silently end any generator
            # that called us.
            raise IndexError(row)
        return self._make_row(row_keys, subtable)

    def __iter__(self):
        """Iterate over the aggregated rows."""
        for row_keys, subtable in self._iter_subtables():
            yield self._make_row(row_keys, subtable)

    @property
    def schema(self):
        """List of (name, type) tuples for keys and aggregations."""
        return list(zip(self.column_names, self.column_types))

    def __getattr__(self, key):
        """Expose aggregated columns as attributes.

        Returns the values of the named column as a list, one entry per
        aggregated row.

        :raises AttributeError: if ``key`` is not a column of this table.
        """
        # Read state through __dict__ so that a partially initialised
        # instance cannot re-enter __getattr__.
        state = self.__dict__
        if 'keys' in state and 'aggregations' in state and 'i' in state:
            names = self.column_names
            if key in names:
                pos = names.index(key)
                n_keys = len(state['keys'])
                if pos < n_keys:
                    return [uv[pos] for uv in state['i'].unique_values()]
                agg = state['aggregations'][pos - n_keys]
                return [agg(sub) for _, sub in self._iter_subtables()]
        raise AttributeError(
            "%r object has no attribute %r" % (
                self.__class__.__name__, key
            )
        )

    @property
    def _column_descriptions(self):
        """Column descriptions built from the (name, type) schema."""
        return [describe_column(name, typ) for (name, typ) in self.schema]


class DerivedTable(Table):
    """A view on an actual table, can include a smaller number of rows or
    columns than the original. For performance reasons, certain functions
    are prohibited.
    """

    def __init__(self, indices_func, columns, rename_dict=None):
        """
        :param indices_func: Callable returning the row indices that are
            visible in this view, in order.
        :param columns: The column objects exposed by this view.
        :param rename_dict: Optional mapping of original column name to
            the name it is exposed under.
        """
        self._indices_func = indices_func
        self._columns = columns
        self._rename_dict = rename_dict or {}
        self._inv_rename_dict = {v: k for k, v in self._rename_dict.items()}
        # Table.__init__ is not called, so create the state that the
        # inherited add_index() / aggregate() rely on.
        self.indexes = WeakValueDictionary()
        self._listeners = WeakSet()

    @property
    def column_names(self):
        """Column names with any renames applied."""
        rd = self._rename_dict
        return [rd.get(c.name, c.name) for c in self._columns]

    def __iter__(self):
        """Iterate over the visible rows, in _indices_func order."""
        cs = self._columns
        # NOTE(review): row fields are keyed by the underlying column
        # names, not the renamed ones from column_names; confirm intended.
        s = dict((c.name, i) for i, c in enumerate(cs))
        cls = TableRow
        blank = (None,) * len(cs)
        for i in self._indices_func():
            # i can be None (because of broken joins). The values are
            # materialised straight away so that a row never depends on
            # the loop variable after the loop has moved on.
            r = blank if i is None else tuple(c[i] for c in cs)
            yield cls(r, s)

    def _get_column(self, name):
        """Get a column by its (possibly renamed) name, wrapped so that it
        only exposes the rows visible in this view.

        :raises KeyError: if no column has that name.
        """
        actual_name = self._inv_rename_dict.get(name, name)
        actual_col = Table._get_column(self, actual_name)
        return DerivedTableColumn(self._indices_func, actual_col)

    def append(self, row):
        """Not supported on a view; always raises TypeError."""
        raise TypeError("Cannot do append on a non-materialised table.")

    def extend(self, rows):
        """Not supported on a view; always raises TypeError."""
        raise TypeError("Cannot do extend on a non-materialised table.")

    def __len__(self):
        """Number of visible rows.

        Uses len() when the indices support it, otherwise counts them.
        """
        idxs = self._indices_func()
        try:
            return len(idxs)
        except (AttributeError, TypeError):
            return sum(1 for _ in idxs)

    def get_row(self, key):
        """Get a single visible row by position.

        :param key: Row index; negative values count from the end.
        :type key: int
        :raises IndexError: if there is no row at that position.
        """
        if key < 0:
            # islice cannot count from the end, so materialise instead.
            try:
                idx = list(self._indices_func())[key]
            except IndexError:
                raise IndexError(key)
        else:
            try:
                idx = next(itertools.islice(self._indices_func(), key, None))
            except StopIteration:
                raise IndexError(key)
        return Table.get_row(self, idx)


class JoinTable(DerivedTable):
    """The result of a table join operation.

    Join tables extend the _indices_func behavior of DerivedTable with
    _left_join_indices_func and _inner_join_indices_func, which provide a
    sequence of (left index, joined index) pairs.
    """

    def __init__(self, indices_func, left_columns, keys, other, other_keys,
                 mode='left'):
        """
        :param indices_func: Callable returning the left table's row
            indices.
        :param left_columns: Columns of the left table.
        :param keys: Key column names on the left table.
        :param other: The table being joined on.
        :param other_keys: Key column names on the other table.
        :param mode: 'left' or 'inner'.
        """
        self._indices_func = indices_func
        self._left_columns = left_columns
        self._keys = keys
        self._other = other
        self._other_keys = other_keys
        self._mode = mode
        # Neither parent __init__ is called, so create the state that the
        # inherited add_index() / aggregate() rely on.
        self.indexes = WeakValueDictionary()
        self._listeners = WeakSet()

        # Finally build an index
        self._join_index = other.add_index(
            cols=other_keys
        ).reindex()

    @property
    def _columns(self):
        """Left columns followed by the non-key columns of the other table."""
        return self._left_columns + self._join_columns

    def _join_pairs(self, keep_unmatched):
        """Yield (left index, joined index) for every left row.

        A left row whose key is absent from the join index is yielded with
        a joined index of None when ``keep_unmatched`` is true and skipped
        otherwise.
        """
        kcs = self._key_columns
        lookup = self._join_index.index
        for i in self._indices_func():
            key = tuple(kc[i] for kc in kcs)
            try:
                ji = lookup(key)[0]
            except KeyError:
                if not keep_unmatched:
                    continue
                ji = None
            yield i, ji

    def _left_join_indices_func(self):
        """Generator function which gives a sequence of pairs:

        The first value is the index for the row in this table. The second
        value is the index for the row in the joined table, or None when
        there is no match.
        """
        return self._join_pairs(keep_unmatched=True)

    def _inner_join_indices_func(self):
        """Generator function which provides the sequence of index pairs
        for an inner join; unmatched left rows are dropped.
        """
        return self._join_pairs(keep_unmatched=False)

    def _join_indices_func(self):
        """Generator function giving only the sequence of indices in the
        joined columns
        """
        for _, ji in self.get_indeces_function()():
            yield ji

    def get_indeces_function(self):
        """Return the pair-generating function for this table's join mode.

        :raises InvalidJoinMode: if the mode is neither 'left' nor 'inner'.
        """
        modes = {
            'left': self._left_join_indices_func,
            'inner': self._inner_join_indices_func,
        }
        try:
            return modes[self._mode]
        except KeyError:
            raise InvalidJoinMode(self._mode)

    def _get_column(self, name):
        """Get a single Column object by name.

        Columns are list-like sequences.

        :raises KeyError: if no column has that name.
        """
        # _columns already contains the join columns.
        for c in self._columns:
            if c.name == name:
                return c
        raise KeyError(name)

    @property
    def _key_columns(self):
        """The left-hand key columns, in key order."""
        return [self._get_column(k) for k in self._keys]

    @property
    def _join_columns(self):
        """Columns of the other table that are not keys on either side,
        wrapped as JoinColumn objects.
        """
        # Separate sets so that a tuple of keys and a list of other_keys
        # (or vice versa) can be combined.
        all_keys = set(self._keys) | set(self._other_keys)
        return [
            JoinColumn(indices_func=self._join_indices_func, column=c)
            for c in self._other._columns
            if c.name not in all_keys
        ]

    @property
    def column_names(self):
        """Get the table's column names as a list of strings."""
        return [c.name for c in (self._left_columns + self._join_columns)]

    @property
    def schema(self):
        """Get the table's schema.

        This is a list of (name (string), type) tuples. The method on
        Table is overridden because we need to get the schema from both
        the original and joined columns.
        """
        return [(c.name, c.column_type) for c in self._columns]

    def __len__(self):
        """Number of rows produced by the join.

        Counted from the join pairs so that rows dropped by an inner join
        are not included.
        """
        return sum(1 for _ in self.get_indeces_function()())

    @staticmethod
    def _row_values(i, ji, cs, jcs):
        """Values of one joined row as a tuple.

        Joined cells are None when the left row had no match.
        """
        left = tuple(c[i] for c in cs)
        if ji is None:
            # Literally none!
            return left + (None,) * len(jcs)
        return left + tuple(jc._column[ji] for jc in jcs)

    def __getitem__(self, key):
        """Get a single joined row by position.

        :param key: Row index
        :type key: int
        :raises IndexError: if there is no row at that position.
        """
        # NOTE(review): slice keys and negative indices are not handled
        # here; confirm whether they need to be.
        cs = self._left_columns
        jcs = self._join_columns
        try:
            i, ji = next(itertools.islice(
                self.get_indeces_function()(), key, key + 1))
        except StopIteration:
            raise IndexError(key)

        s = dict((c.name, pos) for pos, c in enumerate(self._columns))
        return TableRow(self._row_values(i, ji, cs, jcs), s)

    def __iter__(self):
        """Iterate over the joined rows."""
        cs = self._left_columns
        jcs = self._join_columns
        fn_i = self.get_indeces_function()
        s = dict((c.name, pos) for pos, c in enumerate(self._columns))

        for i, ji in fn_i():
            yield TableRow(self._row_values(i, ji, cs, jcs), s)