# -*- coding: utf-8 -*-
# --------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2015 Jonathan Labéjof <jonathan.labejof@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --------------------------------------------------------------------
__all__ = [
'get_proxy', 'proxify_routine', 'proxify_elt', 'is_proxy', 'proxified_elt'
]
"""Module in charge of creating proxies like the design pattern ``proxy``.
A proxy is based on a callable element. It respects its signature but not the
implementation.
"""
from time import time
from functools import wraps
from types import MethodType, FunctionType
from opcode import opmap
from inspect import (
getmembers, isroutine, ismethod, getargspec, getfile, isbuiltin, isclass
)
from b3j0f.utils.version import PY2, PY3, basestring
from b3j0f.utils.path import lookup
# consts for interception loading
LOAD_GLOBAL = opmap['LOAD_GLOBAL']
LOAD_CONST = opmap['LOAD_CONST']
#: list of attributes to set after proxifying a function
WRAPPER_ASSIGNMENTS = ['__doc__', '__name__']
#: list of attributes to update after proxifying a function
WRAPPER_UPDATES = ['__dict__']
#: lambda function name
__LAMBDA_NAME__ = (lambda: None).__name__
#: proxy class name
__PROXY_CLASS__ = 'Proxy'
#: attribute name for proxified element
__PROXIFIED__ = '__proxified__'
[docs]def proxify_elt(elt, bases=None, _dict=None):
"""Proxify a class with input elt.
:param elt: elt to proxify.
:param bases: elt class base classes.
:param _dict: elt class content.
:return: proxified element.
:raises: TypeError if elt does not implement all routines of bases and
_dict.
"""
# ensure _dict is a dictionary
proxy_dict = {} if _dict is None else _dict.copy()
# set of proxified attribute names which are proxified during bases parsing
# and avoid to proxify them twice during _dict parsing
proxified_attribute_names = set()
# ensure bases is a tuple of types
if bases is None:
bases = ()
elif isinstance(bases, basestring):
bases = (lookup(bases),)
elif isclass(bases):
bases = (bases,)
else: # fill proxy_dict with routines of bases
bases = tuple(bases)
for base in bases:
for name, member in getmembers(base, lambda m: isroutine(m)):
if not hasattr(elt, name):
raise TypeError(
"Wrong elt {0}. Must implement {1} ({2}) of {3}".
format(elt, name, member, base)
)
if name in proxy_dict:
proxy_routine = proxy_dict[name]
routine_proxy = proxify_routine(proxy_routine)
proxy_dict[name] = routine_proxy
proxified_attribute_names.add(name)
else:
proxy_dict[name] = member
# proxify proxy_dict
for name in proxy_dict:
value = proxy_dict[name]
if not hasattr(elt, name):
raise TypeError(
"Wrong elt {0}. Must implement {1} ({2})".format(
elt, name, value
)
)
if isroutine(value):
# if member has not already been proxified
if name not in proxified_attribute_names:
# proxify it
value = proxify_routine(value)
proxy_dict[name] = value
# generate a new proxy class
cls = type('Proxy', bases, proxy_dict)
# delete initialization methods
try:
delattr(cls, '__new__')
except AttributeError:
pass
try:
delattr(cls, '__init__')
except AttributeError:
pass
# instantiate proxy cls
result = cls()
# bind elt to proxy
setattr(result, __PROXIFIED__, elt)
return result
[docs]def proxify_routine(routine, impl=None):
"""Proxify a routine with input impl.
:param routine: routine to proxify.
:param impl: new impl to use. If None, use routine.
"""
# init impl
impl = routine if impl is None else impl
try:
__file__ = getfile(routine)
except TypeError:
__file__ = '<string>'
isMethod = ismethod(routine)
if isMethod:
function = routine.__func__
else:
function = routine
# flag which indicates that the function is not a pure python function
# and has to be wrapped
wrap_function = not hasattr(function, '__code__')
try:
# get params from routine
args, varargs, kwargs, _ = getargspec(function)
except TypeError:
# in case of error, wrap the function
wrap_function = True
if wrap_function:
# if function is not pure python, create a generic one
# with assignments
assigned = []
for wrapper_assignment in WRAPPER_ASSIGNMENTS:
if hasattr(function, wrapper_assignment):
assigned.append(wrapper_assignment)
# and updates
updated = []
for wrapper_update in WRAPPER_UPDATES:
if hasattr(function, wrapper_update):
updated.append(wrapper_update)
@wraps(function, assigned=assigned, updated=updated)
def function(*args, **kwargs):
pass
# get params from routine wrapper
args, varargs, kwargs, _ = getargspec(function)
# get params from routine
name = function.__name__
# flag for lambda function
isLambda = __LAMBDA_NAME__ == function.__name__
if isLambda:
name = '_{0}'.format(int(time()))
# get join method for reducing concatenation time execution
join = "".join
# default indentation
indent = ' '
if isLambda:
newcodestr = "{0} = lambda ".format(name)
else:
newcodestr = "def {0}(".format(name)
if args:
newcodestr = join((newcodestr, "{0}".format(args[0])))
for arg in args[1:]:
newcodestr = join((newcodestr, ", {0}".format(arg)))
if varargs is not None:
if args:
newcodestr = join((newcodestr, ", "))
newcodestr = join((newcodestr, "*{0}".format(varargs)))
if kwargs is not None:
if args or varargs is not None:
newcodestr = join((newcodestr, ", "))
newcodestr = join((newcodestr, "**{0}".format(kwargs)))
# insert impl call
if isLambda:
newcodestr = join((newcodestr, ": impl("))
else:
newcodestr = join((
newcodestr,
"):\n{0}return impl(".format(indent))
)
if args:
newcodestr = join((newcodestr, "{0}".format(args[0])))
for arg in args[1:]:
newcodestr = join((newcodestr, ", {0}".format(arg)))
if varargs is not None:
if args:
newcodestr = join((newcodestr, ", "))
newcodestr = join((newcodestr, "*{0}".format(varargs)))
if kwargs is not None:
if args or varargs is not None:
newcodestr = join((newcodestr, ", "))
newcodestr = join((newcodestr, "**{0}".format(kwargs)))
newcodestr = join((newcodestr, ")\n"))
# compile newcodestr
code = compile(newcodestr, __file__, 'single')
# define the code with the new function
_globals = {}
exec(code, _globals)
# get new code
newco = _globals[name].__code__
# get new consts list
newconsts = list(newco.co_consts)
if PY3:
newcode = list(newco.co_code)
else:
newcode = map(ord, newco.co_code)
consts_values = {'impl': impl}
# change LOAD_GLOBAL to LOAD_CONST
index = 0
newcodelen = len(newcode)
while index < newcodelen:
if newcode[index] == LOAD_GLOBAL:
oparg = newcode[index + 1] + (newcode[index + 2] << 8)
name = newco.co_names[oparg]
if name in consts_values:
const_value = consts_values[name]
if const_value in newconsts:
pos = newconsts.index(const_value)
else:
pos = len(newconsts)
newconsts.append(consts_values[name])
newcode[index] = LOAD_CONST
newcode[index + 1] = pos & 0xFF
newcode[index + 2] = pos >> 8
index += 1
# get code string
codestr = bytes(newcode) if PY3 else join(map(chr, newcode))
# get vargs
vargs = [
newco.co_argcount, newco.co_nlocals, newco.co_stacksize,
newco.co_flags, codestr, tuple(newconsts), newco.co_names,
newco.co_varnames, newco.co_filename, newco.co_name,
newco.co_firstlineno, newco.co_lnotab,
function.__code__.co_freevars,
newco.co_cellvars
]
if PY3:
vargs.insert(1, newco.co_kwonlyargcount)
# instanciate a new code object
codeobj = type(newco)(*vargs)
# instanciate a new function
if function is None or isbuiltin(function):
result = FunctionType(codeobj, {})
else:
result = type(function)(
codeobj, function.__globals__, function.__name__,
function.__defaults__, function.__closure__
)
# set wrapping assignments
for wrapper_assignment in WRAPPER_ASSIGNMENTS:
try:
value = getattr(function, wrapper_assignment)
except AttributeError:
pass
else:
setattr(result, wrapper_assignment, value)
# set proxy module
result.__module__ = proxify_routine.__module__
# update wrapping updating
for wrapper_update in WRAPPER_UPDATES:
try:
value = getattr(function, wrapper_update)
except AttributeError:
pass
else:
getattr(result, wrapper_update).update(value)
# set proxyfied element on proxy
setattr(result, __PROXIFIED__, routine)
if isMethod: # create a new method
args = [result, routine.__self__]
if PY2:
args.append(routine.im_class)
result = MethodType(*args)
return result
[docs]def get_proxy(elt, bases=None, _dict=None):
"""Get proxy from an elt.
:param elt: elt to proxify.
:type elt: object or function/method
:param bases: base types to enrich in the result cls if not None.
:param _dict: class members to proxify if not None.
"""
if isroutine(elt):
result = proxify_routine(elt)
else: # in case of object, result is a Proxy
result = proxify_elt(elt, bases=bases, _dict=_dict)
return result
[docs]def proxified_elt(proxy):
"""Get proxified element.
:param proxy: proxy element from where get proxified element.
:return: proxified element. None if proxy is not proxified.
"""
if ismethod(proxy):
proxy = proxy.__func__
result = getattr(proxy, __PROXIFIED__, None)
return result
[docs]def is_proxy(elt):
"""Return True if elt is a proxy.
:param elt: elt to check such as a proxy.
:return: True iif elt is a proxy.
:rtype: bool
"""
if ismethod(elt):
elt = elt.__func__
result = hasattr(elt, __PROXIFIED__)
return result