Source code for fito.operations.decorate

from __future__ import print_function

import inspect
from functools import wraps

from fito.operations.operation import Operation
from fito.specs.fields import UnboundField, PrimitiveField, BaseSpecField, KwargsField, SpecField
from fito.specs.base import get_import_path, Spec

try:
    import cPickle
except ImportError:
    import pickle as cPickle


[docs]class GenericDecorator(Spec): """ Abstracts all the boilerplate required to build a decorator that works on functions, instance methods and class methods :param method_type: if is None, the decorated function is assumed to be a function, otherwise it is assumed to be a method. If method_type == 'instance' the function is assumed to be an instance method otherwise a classmethod """ method_type = PrimitiveField(0, default=None) def __get__(self, instance, owner): if (instance is None and self.method_type == 'instance') or \ (owner is None and self.method_type == 'class'): return self first_arg = instance if self.method_type == 'instance' else owner @wraps(self.func) def new_f(*args, **kwargs): return self.func(first_arg, *args, **kwargs) cls = type(instance) if instance is not None else owner assert cls is not None return self.create_decorated(self.func, new_f, inspect.getargspec(self.func), first_arg=first_arg) def __call__(self, func): if self.method_type: self.func = func return self else: return self.create_decorated(func, func) def create_decorated(self, to_wrap, func_to_execute, f_spec=None, first_arg=None): """ Abstract method that should be implemented in order to build a decorator The difference between `to_wrap` and `func_to_execute` is the fact that in the case of instance methods and class methods, `func_to_execute` has the first argument already binded. If `to_wrap` is just a function, then `to_wrap == func_to_execute` :param to_wrap: Original wrapped function :param func_to_execute: You should execute this function :param f_spec: The argspec of the function to be decorated, if None, it should be computed from to_wrap (TODO: remove this argument) :param first_arg: `self` if it's an instance method, `cls` if it's a classmethod, None otherwise """ raise NotImplementedError()
[docs]class as_operation(GenericDecorator): """ Creates an operation from a callable :param out_type: Base class of the operation to be built. Defaults to `Operation` :param out_name: Name of the class to be built, deafults to the decorated function name. """ out_type = PrimitiveField(default=Operation) out_name = PrimitiveField(default=None) cache_on = SpecField(default=None) args_specifications = KwargsField() def create_decorated(self, to_wrap, func_to_execute, f_spec=None, first_arg=None): f_spec = f_spec or inspect.getargspec(to_wrap) OperationClass = operation_from_func( to_wrap=to_wrap, func_to_execute=func_to_execute, out_type=self.out_type, out_name=self.out_name, args_specifications=self.args_specifications, f_spec=f_spec, method_type=self.method_type, first_arg=first_arg, cache_on=self.cache_on ) return OperationClass @staticmethod def get_current_operation(): """ Should be called inside a function decorated with as_operation """ # f_back brings you to the calling function, f_back brings you to the apply method of the # dynamically created operation frame = inspect.currentframe() try: res = frame.f_back.f_back.f_locals['self'] if not isinstance(res, Operation): raise RuntimeError( "This function should be called inside an operation created with the as_operation decorator" ) return res finally: # Avoid reference cycle del frame
[docs]def get_default_values(f_spec): """ Returns a mapping from a function spec (output of inspect.getargspec) """ if f_spec.defaults is None: default_values = {} else: args_with_defaults = f_spec.args[-len(f_spec.defaults):] default_values = dict(zip(args_with_defaults, f_spec.defaults)) return default_values
[docs]def operation_from_func(to_wrap, func_to_execute, out_type, out_name, args_specifications, f_spec=None, method_type=None, first_arg=None, cache_on=None): """ In the case of methods, to_wrap is not the same to func_to_execute :param to_wrap: See `GenericDecorator.create_decorated` for an explanation :param func_to_execute: See `GenericDecorator.create_decorated` for an explanation :param cache_on: A data store onto which the operation should be cached :return: """ f_spec = f_spec or inspect.getargspec(to_wrap) out_name = out_name or to_wrap.__name__ # TODO: find the first_arg where the method was defined if method_type == 'instance' and not isinstance(first_arg, Spec): # Only when it's an instance of Spec we can identify out_name = '{}@{}'.format(out_name, id(first_arg)) default_values = get_default_values(f_spec) attrs = {} binded_pos = 0 unbinded_pos = 0 for arg in f_spec.args: if method_type == 'instance' and arg == 'self': continue if method_type == 'class' and arg == 'cls': continue if arg in args_specifications: spec = args_specifications[arg] if inspect.isclass(spec) and issubclass(spec, Spec): spec = SpecField(base_type=spec) # It can be either a class, or the instance itself if inspect.isclass(spec) or inspect.isfunction(spec): spec = spec() if isinstance(spec, UnboundField): spec.pos = unbinded_pos unbinded_pos += 1 else: spec.pos = binded_pos binded_pos += 1 else: spec = PrimitiveField(binded_pos) binded_pos += 1 if arg in default_values: spec.default = default_values[arg] attrs[arg] = spec def get_this_args(self, runner=None): this_args = {} for k, v in attrs.iteritems(): value = getattr(self, k) if isinstance(v, BaseSpecField) and runner is not None and isinstance(value, Operation): value = runner.execute(value) this_args[k] = value return this_args def to_dict(self, include_all=False): res = super(out_type, self).to_dict(include_all=include_all) if method_type is not None: res['type'] = get_import_path(first_arg, func_to_execute.__name__) else: res['type'] = get_import_path(func_to_execute) return res def apply(self, runner): this_args = self.get_this_args(runner) return func_to_execute(**this_args) cls_attrs = attrs.copy() cls_attrs['func'] = staticmethod(func_to_execute) cls_attrs['apply'] = apply cls_attrs['get_this_args'] = get_this_args cls_attrs['to_dict'] = to_dict cls = Operation.type2spec_class(out_name) if cls is None: # if the class does not exist, create it cls = type(out_name, (out_type,), cls_attrs) else: # otherwise update it for k, v in cls_attrs.iteritems(): setattr(cls, k, v) if cache_on is not None: cls.default_data_store = cache_on else: cls.default_data_store = None cls.__module__ = to_wrap.__module__ return cls