# Chris Riederer
# 2016-02-17
"""Dplyr-style operations on top of pandas DataFrame."""
from functools import wraps
import itertools
import operator
import sys
import types
import warnings
warnings.simplefilter("once")
import six
from six.moves import range
import numpy as np
import pandas
from pandas import DataFrame
__version__ = "0.0.4"
class Manager(object):
    """Entry point for building delayed column references.

    An instance is normally bound to the global name ``X``. Looking something
    up on it does not touch any data; it hands back a ``Later`` that stands
    for the named column of whichever DataFrame it is eventually applied to.

    Two spellings are supported:

    1. attribute notation: ``X.foo``
    2. item notation: ``X["foo"]``

    Attribute notation is preferred. Item notation exists for column names
    that are not valid Python identifiers (spaces, periods, and so forth).
    """

    def __getitem__(self, key):
        # X["foo"] -> delayed reference to column "foo".
        return Later(key)

    def __getattr__(self, attr):
        # X.foo -> delayed reference to column "foo".
        return Later(attr)


# Shared module-level instance used to write delayed expressions.
X = Manager()
# [forward, reflected] dunder-name pairs for the binary operators, generated
# from the bare operator names so each pair is spelled only once.
reversible_operators = [
    ["__{0}__".format(op_name), "__r{0}__".format(op_name)]
    for op_name in (
        "add", "sub", "mul", "floordiv", "div", "truediv", "mod", "divmod",
        "pow", "lshift", "rshift", "and", "or", "xor",
    )
]
# Dunder names that get a generic "record the call, replay it later" hook.
# The list is generated from the bare names; order is unchanged.
normal_operators = [
    "__{0}__".format(bare_name)
    for bare_name in (
        # unary, comparison, container and in-place names
        "abs", "concat", "contains", "delitem", "delslice",
        "eq", "file", "ge", "getitem", "getslice", "gt",
        "iadd", "iand", "iconcat", "idiv", "ifloordiv",
        "ilshift", "imod", "imul", "index", "inv", "invert",
        "ior", "ipow", "irepeat", "irshift", "isub",
        "itruediv", "ixor", "le", "lt", "ne", "neg",
        "not", "package", "pos", "repeat", "setitem",
        "setslice",
        # reflected binary operators; "rrshift" is deliberately not listed
        "radd", "rsub", "rmul", "rfloordiv",
        "rdiv", "rtruediv", "rmod", "rdivmod", "rpow",
        "rlshift", "rand", "ror", "rxor",
    )
]
def create_reversible_func(func_name):
    """Build a ``Later`` method that records the binary operator ``func_name``.

    Args:
        func_name: Dunder name of the forward operator, e.g. ``"__add__"``.

    Returns:
        A function ``(self, arg)`` meant to be installed on ``Later``. Calling
        it appends one step to ``self.todo``, extends the textual form of the
        expression, and returns ``self``.
    """
    # Looked up once, when the hook is created. The operator module does not
    # provide a function for every dunder name handled here (it has no
    # ``__divmod__``, and ``__div__`` exists on Python 2 only), so a missing
    # entry is tolerated instead of raising AttributeError at replay time.
    operator_func = getattr(operator, func_name, None)

    def reversible_func(self, arg):
        self._UpdateStrAttr(func_name)
        self._UpdateStrCallArgs([arg], {})

        def use_operator(df):
            # A delayed right-hand operand is evaluated against the frame
            # this Later was most recently applied to.
            if isinstance(arg, Later):
                altered_arg = arg.applyFcns(self.origDf)
            else:
                altered_arg = arg
            if operator_func is not None:
                return operator_func(df, altered_arg)
            # No operator-module equivalent: call the dunder method on the
            # value itself.
            return getattr(df, func_name)(altered_arg)

        self.todo.append(use_operator)
        return self

    return reversible_func
def instrument_operator_hooks(cls):
    """Class decorator that installs delayed-operator methods on ``cls``.

    Every name in ``normal_operators`` gets a generic hook that records the
    call and replays it on the eventual value via ``getattr(value, name)``.
    Every forward name in ``reversible_operators`` gets the method produced by
    ``create_reversible_func``; those are installed last, so they win if a
    name appears in both lists.

    Args:
        cls: The class to instrument (used on ``Later``).

    Returns:
        The same class object, modified in place.
    """

    def add_hook(name):
        def op_hook(self, *args, **kwargs):
            self._UpdateStrAttr(name)
            self._UpdateStrCallArgs(args, kwargs)
            if args and isinstance(args[0], Later):
                def call_hook(df):
                    # Resolve the delayed first operand against the frame
                    # this Later is being applied to, then forward any
                    # remaining arguments unchanged instead of dropping them.
                    operand = args[0].applyFcns(self.origDf)
                    return getattr(df, name)(operand, *args[1:], **kwargs)
            else:
                def call_hook(df):
                    return getattr(df, name)(*args, **kwargs)
            self.todo.append(call_hook)
            return self

        try:
            setattr(cls, name, op_hook)
        except (AttributeError, TypeError):
            pass  # skip __name__ and __doc__ and the like

    for hook_name in normal_operators:
        add_hook(hook_name)
    # Only the forward name of each pair is installed here; the reflected
    # names are covered by normal_operators above.
    for func_name, _ in reversible_operators:
        setattr(cls, func_name, create_reversible_func(func_name))
    return cls
def _addQuotes(item):
return '"' + item + '"' if isinstance(item, str) else item
@instrument_operator_hooks
class Later(object):
    """Object which represents a computation to be carried out later.

    The Later object allows us to save computation that cannot currently be
    executed. It will later receive a DataFrame as an input, and all
    computation will be carried out upon this DataFrame object.

    Thus, we can refer to columns of the DataFrame as inputs to functions
    without having the DataFrame currently available:

    >>> diamonds >> sift(X.carat > 4) >> select(X.carat, X.price)
    Out:
           carat  price
    25998   4.01  15223
    25999   4.01  15223
    27130   4.13  17329
    27415   5.01  18018
    27630   4.50  18531

    The special Later name, ``"_"`` will refer to the entire DataFrame. For
    example,

    >>> diamonds >> sample_n(6) >> select(X.carat, X.price) >> X._.T
    Out:
             18966    19729   9445   49951    3087   33128
    carat     1.16     1.52     0.9    0.3     0.74    0.31
    price  7803.00  8299.00  4593.0  540.0  3315.00  816.00

    Attributes:
        name: The column name this object was created with.
        todo: List of one-argument callables, applied in order by
            ``applyFcns``; each receives the previous step's result.
        origDf: The DataFrame passed to the most recent ``applyFcns`` call
            (only set once ``applyFcns`` has run).
    """

    def __init__(self, name):
        self.name = name
        if name == "_":
            # "_" stands for the whole DataFrame rather than one column.
            self.todo = [lambda df: df]
        else:
            self.todo = [lambda df: df[self.name]]
        self._str = 'data["{0}"]'.format(name)

    def applyFcns(self, df):
        """Run every recorded step, starting from ``df``, and return the result."""
        self.origDf = df
        stmt = df
        for func in self.todo:
            stmt = func(stmt)
        return stmt

    def __str__(self):
        return "{0}".format(self._str)

    def __repr__(self):
        return "{0}".format(self._str)

    def __getattr__(self, attr):
        # Only reached for names normal lookup does not find: record an
        # attribute access to replay later and keep chaining on this object.
        self.todo.append(lambda df: getattr(df, attr))
        self._UpdateStrAttr(attr)
        return self

    def __call__(self, *args, **kwargs):
        # Record a call on whatever the previous step produces.
        self.todo.append(lambda foo: foo.__call__(*args, **kwargs))
        self._UpdateStrCallArgs(args, kwargs)
        return self

    def __rrshift__(self, df):
        # ``df >> later`` when df does not handle ``>>`` itself: evaluate
        # against a deep copy wrapped in a DplyFrame.
        otherDf = DplyFrame(df.copy(deep=True))
        return self.applyFcns(otherDf)

    def __nonzero__(self):
        raise ValueError("This python code evaluates if this Later is 'True' or "
                         "'False' immediately, instead of waiting for the values "
                         "to become available. This is ambiguous. Try writing your "
                         "code inside a DelayFunction or use if_else.")

    # Python 3 consults __bool__ rather than __nonzero__ for truth testing;
    # without this alias a Later would silently count as True there.
    __bool__ = __nonzero__

    def _UpdateStrAttr(self, attr):
        self._str += ".{0}".format(attr)

    def _UpdateStrCallArgs(self, args, kwargs):
        # We sort here because keyword arguments get arbitrary ordering inside
        # the function call. Support PEP 0468 to help fix this issue!
        # https://www.python.org/dev/peps/pep-0468/
        kwargs_strs = sorted(["{0}={1}".format(k, _addQuotes(v))
                              for k, v in kwargs.items()])
        input_strs = list(map(str, args)) + kwargs_strs
        input_str = ", ".join(input_strs)
        self._str += "({0})".format(input_str)
def CreateLaterFunction(fcn, *args, **kwargs):
    """Wrap a call ``fcn(*args, **kwargs)`` in a ``Later`` for deferred evaluation.

    Args:
        fcn: The function to call once a DataFrame is available. Its
            ``__name__`` is used for the textual form of the expression.
        *args: Positional arguments; any that are ``Later`` objects are
            evaluated against the DataFrame before ``fcn`` is called.
        **kwargs: Keyword arguments, treated the same way as ``args``.

    Returns:
        A ``Later`` whose single step evaluates the delayed arguments and then
        calls ``fcn`` with the results.
    """
    laterFcn = Later(fcn.__name__)
    laterFcn.fcn = fcn
    laterFcn.args = args
    laterFcn.kwargs = kwargs

    def apply_function(self, df):
        self.origDf = df
        # isinstance (not an exact type match) so that this agrees with the
        # isinstance test DelayFunction uses when deciding to delay a call.
        resolved_args = [
            a.applyFcns(self.origDf) if isinstance(a, Later) else a
            for a in self.args
        ]
        resolved_kwargs = {
            k: v.applyFcns(self.origDf) if isinstance(v, Later) else v
            for k, v in self.kwargs.items()
        }
        return self.fcn(*resolved_args, **resolved_kwargs)

    # Replace the default "look up a column called <name>" step with the call.
    laterFcn.todo = [lambda df: apply_function(laterFcn, df)]
    laterFcn._str = '{0}'.format(fcn.__name__)
    laterFcn._UpdateStrCallArgs(args, kwargs)
    return laterFcn
def DelayFunction(fcn):
    """Decorator that lets ``fcn`` accept ``Later`` objects as arguments.

    Args:
        fcn: The function to wrap.

    Returns:
        A wrapper carrying ``fcn``'s name and docstring. Called without any
        ``Later`` among its positional or keyword arguments it simply calls
        ``fcn``; otherwise it returns the ``Later`` built by
        ``CreateLaterFunction``, so the real call happens once data exists.
    """
    @wraps(fcn)
    def DelayedFcnCall(*args, **kwargs):
        has_later = (any(isinstance(a, Later) for a in args) or
                     any(isinstance(v, Later) for v in kwargs.values()))
        if not has_later:
            return fcn(*args, **kwargs)
        return CreateLaterFunction(fcn, *args, **kwargs)
    return DelayedFcnCall
class DplyFrame(DataFrame):
    """A subclass of the pandas DataFrame with methods for function piping.

    This class implements two main features on top of the pandas DataFrame.
    First, dplyr-style groups. In contrast to SQL-style or pandas style
    groups, rows are not collapsed and replaced with a function value.

    Second, >> is overloaded on the DataFrame so that functions on the
    right-hand side of this equation are called on the object. For example,

    >>> df >> select(X.carat)

    will call a function (created from the "select" call) on df.

    Currently, these inputs need to be one of the following:

    * A "Later"
    * The "ungroup" function call
    * A function that returns a pandas DataFrame or DplyFrame.
    """

    # Names listed here are what _copy_attrs copies between frames.
    # Note that _current_group is not in this list.
    _metadata = ["_grouped_on", "_grouped_self"]

    def __init__(self, *args, **kwargs):
        super(DplyFrame, self).__init__(*args, **kwargs)
        self._grouped_on = None
        self._current_group = None
        self._grouped_self = None
        # Building from another DplyFrame keeps that frame's grouping.
        if len(args) == 1 and isinstance(args[0], DplyFrame):
            self._copy_attrs(args[0])

    def _copy_attrs(self, df):
        """Copy every attribute named in ``_metadata`` from ``df`` (None if absent)."""
        for attr in self._metadata:
            self.__dict__[attr] = getattr(df, attr, None)

    @property
    def _constructor(self):
        return DplyFrame

    def group_self(self, names):
        """Mark this frame as grouped on ``names`` and store the pandas groupby."""
        self._grouped_on = names
        self._grouped_self = self.groupby(names)

    def ungroup(self):
        """Drop any grouping recorded on this frame (in place)."""
        self._grouped_on = None
        self._grouped_self = None

    def apply_on_groups(self, delayedFcn):
        """Apply ``delayedFcn`` to each group and return the regrouped result."""
        outDf = self._grouped_self.apply(delayedFcn)
        # Remove multi-index created from grouping and applying
        for grouped_name in outDf.index.names[:-1]:
            if grouped_name in outDf:
                outDf.reset_index(level=0, drop=True, inplace=True)
            else:
                outDf.reset_index(level=0, inplace=True)
        # Drop all 0 index, created by summarize
        if (outDf.index == 0).all():
            outDf.reset_index(drop=True, inplace=True)
        outDf.group_self(self._grouped_on)
        return outDf

    def __rshift__(self, delayedFcn):
        """Implement ``frame >> step``.

        A ``Later`` is evaluated directly against this frame. ``UngroupDF``
        and, for frames without an active grouping, any other callable are
        called on a deep copy. With an active grouping the callable is applied
        per group through ``apply_on_groups``.
        """
        if isinstance(delayedFcn, Later):
            return delayedFcn.applyFcns(self)

        # Identity check: the question is whether this is the ungroup step
        # itself, so nothing here depends on how the operand implements ==.
        if delayedFcn is UngroupDF:
            otherDf = DplyFrame(self.copy(deep=True))
            return delayedFcn(otherDf)

        # Truthiness of the stored groupby object is kept as in the original
        # (NOTE(review): presumably false for None and for a groupby with no
        # groups -- confirm before tightening to an ``is not None`` test).
        if self._grouped_self:
            outDf = self.apply_on_groups(delayedFcn)
            return outDf
        else:
            otherDf = DplyFrame(self.copy(deep=True))
            return delayedFcn(otherDf)
def ApplyToDataframe(fcn):
    """Decorator allowing a verb to be called with or without the data.

    ``fcn`` is a factory: called with the verb's arguments it returns the step
    to pipe data into. The wrapper supports two call shapes:

    * ``verb(arg, ...)`` returns that step unchanged.
    * ``verb(df, arg, ...)``, where ``df`` is a pandas DataFrame, builds the
      step from the remaining arguments and returns ``df >> step``.
    """
    @wraps(fcn)
    def DplyrFcn(*args, **kwargs):
        data_given = len(args) > 0 and isinstance(args[0], pandas.DataFrame)
        if not data_given:
            return fcn(*args, **kwargs)
        # The frame itself is passed along; it is not copied here.
        frame, remaining = args[0], args[1:]
        step = fcn(*remaining, **kwargs)
        return frame >> step
    return DplyrFcn
@ApplyToDataframe
def sift(*args):
    """Filters rows of the data that meet input criteria.

    Giving multiple arguments to sift is equivalent to a logical "and".

    >>> df >> sift(X.carat > 4, X.cut == "Premium")
    # Out:
    # carat      cut color clarity  depth  table  price      x  ...
    #  4.01  Premium     I      I1   61.0     61  15223  10.14
    #  4.01  Premium     J      I1   62.5     62  15223  10.02

    As in pandas, use bitwise logical operators like ``|``, ``&``:

    >>> df >> sift((X.carat > 4) | (X.cut == "Ideal")) >> head(2)
    # Out:  carat    cut color clarity  depth ...
    #        0.23  Ideal     E     SI2   61.5
    #        0.23  Ideal     J     VS1   62.8

    Raises:
        Exception: If combining the criteria does not give a boolean mask.
    """
    def f(df):
        # All-True mask aligned to df's index. Built from an explicit bool
        # array so the dtype is boolean even when df has no rows; a mask
        # built from an empty Python list would not be.
        final_filter = pandas.Series(np.ones(len(df), dtype=bool),
                                     index=df.index)
        for arg in args:
            stmt = arg.applyFcns(df)
            final_filter = final_filter & stmt
            if final_filter.dtype != bool:
                raise Exception("Inputs to filter must be boolean")
        return df[final_filter]
    return f
def dfilter(*args, **kwargs):
    """Deprecated alias for ``sift``; emits a DeprecationWarning and delegates."""
    # stacklevel=2 attributes the warning to the code calling dfilter rather
    # than to this wrapper.
    warnings.warn("'dfilter' is deprecated. Please use 'sift' instead.",
                  DeprecationWarning, stacklevel=2)
    return sift(*args, **kwargs)
@ApplyToDataframe
def select(*args):
    """Select specific columns from DataFrame.

    Output will be DplyFrame type. Order of columns will be the same as input
    into select.

    >>> diamonds >> select(X.color, X.carat) >> head(3)
    Out:
      color  carat
    0     E   0.23
    1     E   0.21
    2     E   0.23
    """
    # Resolve the column names once; the returned step reuses this list
    # instead of rebuilding it on every application.
    names = [column.name for column in args]
    return lambda df: df[names]
@ApplyToDataframe
def mutate(*args, **kwargs):
    """Adds a column to the DataFrame.

    This can use existing columns of the DataFrame as input.

    >>> (diamonds >>
            mutate(carat_bin=X.carat.round()) >>
            group_by(X.cut, X.carat_bin) >>
            summarize(avg_price=X.price.mean()))
    Out:
           avg_price  carat_bin    cut
    0     863.908535          0  Ideal
    1    4213.864948          1  Ideal
    2   12838.984078          2  Ideal
    ...
    27  13466.823529          3   Fair
    28  15842.666667          4   Fair
    29  18018.000000          5   Fair

    Positional arguments become columns named after ``str(arg)``. Keyword
    arguments become columns named after the keyword; they are added in
    sorted-name order unless the special keyword ``__order`` supplies an
    explicit sequence of the keyword names.

    Raises:
        ValueError: If ``__order`` and the other keyword names do not contain
            exactly the same names.
    """
    def addColumns(df):
        for arg in args:
            if isinstance(arg, Later):
                df[str(arg)] = arg.applyFcns(df)
            else:
                df[str(arg)] = arg

        # Work on a copy: this step can run more than once (for example once
        # per group), and popping "__order" from the shared kwargs would make
        # every run after the first fall back to sorted order.
        columns = dict(kwargs)
        ordered = columns.pop("__order", None)
        if ordered is not None:
            s1 = set(ordered)
            s2 = set(columns)
            missing_order = s1 - s2
            if missing_order:
                raise ValueError(", ".join(missing_order) +
                                 " in __order not found in keyword arguments")
            missing_kwargs = s2 - s1
            if missing_kwargs:
                raise ValueError(", ".join(missing_kwargs) +
                                 " not found in __order")
            kv = [(key, columns[key]) for key in ordered]
        else:
            kv = sorted(columns.items(), key=lambda e: e[0])

        for key, val in kv:
            if isinstance(val, Later):
                df[key] = val.applyFcns(df)
            else:
                df[key] = val
        return df
    return addColumns
@ApplyToDataframe
def group_by(*args, **kwargs):
    """Group the frame on plain columns (positional) and/or new columns (keyword).

    Positional arguments must be bare column references. Keyword arguments
    are first added to the frame via ``mutate`` and then grouped on by name.

    Raises:
        ValueError: If a positional argument has more than one recorded step.
    """
    def GroupDF(df):
        # A bare column reference has exactly one recorded step.
        step_counts = [len(arg.todo) for arg in args]
        if any(count > 1 for count in step_counts):
            raise ValueError(
                "Expressions not allowed as positional args. Use keyword args.")
        group_columns = [arg.name for arg in args] + list(kwargs.keys())
        if kwargs:
            df = df >> mutate(**kwargs)
        df.group_self(group_columns)
        return df
    return GroupDF
@ApplyToDataframe
def summarize(**kwargs):
    """Collapse the frame to a single row of delayed expressions.

    Each keyword names an output column; its value is evaluated against the
    frame. If the frame has a truthy ``_current_group`` attribute, that
    mapping is merged into the output columns as well.

    Returns (from the produced step):
        A one-row ``DplyFrame`` indexed ``[0]``. With no keyword arguments it
        has no columns.
    """
    def CreateSummarizedDf(df):
        # Defined up front: the empty-kwargs branch below needs it as well
        # (it was previously read there before being assigned).
        index = [0]
        input_dict = {k: val.applyFcns(df) for k, val in kwargs.items()}
        if not input_dict:
            return DplyFrame({}, index=index)
        if hasattr(df, "_current_group") and df._current_group:
            input_dict.update(df._current_group)
        return DplyFrame(input_dict, index=index)
    return CreateSummarizedDf
def UngroupDF(df):
    """Clear the grouping on ``df`` by calling ``df.ungroup()`` and return ``df``."""
    df.ungroup()
    return df
@ApplyToDataframe
def ungroup():
    """Pipeline step that removes grouping; returns the ``UngroupDF`` function itself."""
    return UngroupDF
@ApplyToDataframe
def arrange(*args):
    """Sort DataFrame by the input column arguments.

    >>> diamonds >> sample_n(5) >> arrange(X.price) >> select(X.depth, X.price)
    Out:
           depth  price
    28547   61.0    675
    35132   59.1    889
    42526   61.3   1323
    3468    61.6   3392
    23829   62.0  11903
    """
    # Result is unused, but the lookup is kept: it runs when arrange() is
    # called, so an argument without a ``name`` attribute fails right here.
    names = [column.name for column in args]

    def f(df):
        # Materialise each sort key as a column named after its expression
        # text, sort on those columns, then reorder df by the resulting index.
        with_keys = df >> mutate(*args)
        key_columns = [str(arg) for arg in args]
        sorted_index = with_keys.sort_values(key_columns).index
        return df.loc[sorted_index]
    return f
@ApplyToDataframe
def head(*args, **kwargs):
    """Returns first n rows"""
    def take_head(df):
        # Arguments are forwarded untouched to the frame's own head().
        return df.head(*args, **kwargs)
    return take_head
@ApplyToDataframe
def sample_n(n):
    """Randomly sample n rows from the DataFrame"""
    def draw_rows(df):
        sampled = df.sample(n)
        return DplyFrame(sampled)
    return draw_rows
@ApplyToDataframe
def sample_frac(frac):
    """Randomly sample `frac` fraction of the DataFrame"""
    def draw_fraction(df):
        sampled = df.sample(frac=frac)
        return DplyFrame(sampled)
    return draw_fraction
@ApplyToDataframe
def sample(*args, **kwargs):
    """Convenience method that calls into pandas DataFrame's sample method"""
    def draw(df):
        # Arguments are forwarded untouched; the result is returned as-is.
        return df.sample(*args, **kwargs)
    return draw
@ApplyToDataframe
def nrow():
    """Pipeline step returning ``len(df)`` for the frame it is applied to."""
    def count_rows(df):
        return len(df)
    return count_rows
@DelayFunction
def PairwiseGreater(series1, series2):
    """Return a Series holding, position by position, the larger of the two inputs.

    Values are paired by position with ``zip``; the result carries
    ``series1``'s index.
    """
    larger_values = [max(left, right) for left, right in zip(series1, series2)]
    result = pandas.Series(larger_values)
    result.index = series1.index
    return result
@DelayFunction
def if_else(bool_series, series_true, series_false):
    """Choose, position by position, between two series based on a condition.

    Where ``bool_series`` is truthy the value comes from ``series_true``,
    otherwise from ``series_false``. The three inputs are paired by position
    with ``zip``; the result carries ``bool_series``'s index.
    """
    chosen = []
    for flag, when_true, when_false in zip(bool_series, series_true,
                                           series_false):
        chosen.append(when_true if flag else when_false)
    result = pandas.Series(chosen)
    result.index = bool_series.index
    return result