#!/usr/bin/env python2.5
#############################################################################
##
## file : objects.py
##
## description : see below
##
## project : Tango Control System
##
## $Author: srubio@cells.es, tcoutinho@cells.es, homs@esrf.fr $
##
##
## $Revision: 2008 $
##
## copyleft : ALBA Synchrotron Controls Section, CELLS
## Bellaterra
## Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################
"""
fandango.objects contains methods for loading python modules and objects "on the run",
as well as several advanced types used within the fandango library.
It includes 2 wonderful classes: Object (by Alejandro Homs) and Singleton (by Marc Santiago).
Other classes are borrowed from taurus.core.utils (by Tiago Coutinho).
"""
import __builtin__
from __builtin__ import object
from functional import *
from operator import isCallable
import Queue
import functools
try: from collections import namedtuple #Only available since python 2.6
except: pass
## Inspection methods
def dirModule(module):
    """
    List the names of the members that were defined by the given module itself.

    A member is kept when its ``__module__`` attribute equals ``module.__name__``;
    members lacking that attribute are compared as if their module name were ''.
    """
    owner = module.__name__
    names = []
    for name, member in module.__dict__.items():
        if getattr(member, '__module__', '') == owner:
            names.append(name)
    return names
def findModule(module):
    """
    Return the filesystem path where the named module would be imported from.

    :param module: top-level module name, as accepted by imp.find_module
    :return: the pathname reported by the import machinery
    :raises ImportError: if the module cannot be found
    """
    try:
        from imp import find_module
    except ImportError:
        # Interpreters without 'imp' (removed in Python 3.12): use importlib instead
        from importlib.util import find_spec
        spec = find_spec(module)
        if spec is None:
            raise ImportError('No module named %s' % module)
        # Packages: report the package directory, as imp.find_module does.
        # NOTE(review): for built-in/frozen modules spec.origin is a label, not a path.
        locations = spec.submodule_search_locations
        return locations[0] if locations else spec.origin
    handle, pathname, _description = find_module(module)
    # find_module hands back an open file for plain modules; the caller must close it
    if handle is not None:
        handle.close()
    return pathname
def loadModule(source, modulename=None):
    """
    Load a python module either from a source file or from an importable module name.

    :param source: path to a source file, or a plain module name
    :param modulename: name to give to the loaded module; when given, ``source``
        is always treated as a file path
    :return: the loaded module object

    ``source`` is handled as a file whenever ``modulename`` is set or ``source``
    contains '/' or '.'. If no ``modulename`` is passed the name is derived from
    the file name using replaceCl.
    NOTE(review): replaceCl comes from the ``functional`` star import; presumably it
    replaces '-' and '.' by '_' -- confirm there.
    """
    from imp import load_source, find_module, load_module
    if modulename or '/' in source or '.' in source:
        name = modulename or replaceCl(r'[-\.]', '_', source.split('/')[-1].split('.py')[0])
        return load_source(name, source)
    handle, pathname, description = find_module(source)
    try:
        return load_module(source, handle, pathname, description)
    finally:
        # The caller of find_module/load_module owns the file handle, even on failure
        if handle is not None:
            handle.close()
def dirClasses(module, owned=False):
    """
    List the names of the classes (instances of ``type``) found in a module namespace.

    :param owned: if True, only classes reported by dirModule(module) are returned,
        in the order dirModule yields them.
    """
    class_names = []
    for name, member in module.__dict__.items():
        if isinstance(member, type):
            class_names.append(name)
    if not owned:
        return class_names
    return [name for name in dirModule(module) if name in class_names]
def copy(obj):
    """
    Return a copy of a python primitive object.

    Strategy, in order:
     - use obj.copy() if the object provides it
     - try the ``type(obj)(other=obj)`` constructor
     - fall back to ``type(obj)(obj)``

    It will not work for class objects unless they implement one of those constructors.
    """
    if hasattr(obj, 'copy'):
        return obj.copy()
    klass = type(obj)
    try:
        return klass(other=obj)
    except Exception:
        # Only ordinary errors trigger the fallback; KeyboardInterrupt/SystemExit propagate
        return klass(obj)
def obj2dict(obj, type_check=True, class_check=False, fltr=None):
    """
    Converts a python object to a dictionary with all its members as python primitives.

    Callable members, names starting with '__' and 'inited_class_list' are skipped.

    :param type_check: if True, values whose type name is not a builtin name are
        converted: dict subclasses to a plain dict, anything else to str.
        Members whose conversion fails are dropped.
    :param class_check: if True, add '__class__', '__bases__' and '__base__' entries
        holding the class names.
    :param fltr: a callable(name):bool method; names for which it is false are skipped
    :return: the dictionary; errors are printed, not raised
    """
    dct = {}
    # Built once: the set of builtin names is the same for every member
    builtin_names = set(dir(__builtin__))
    try:
        for name in dir(obj):
            if fltr and not fltr(name):
                continue
            try:
                attr = getattr(obj, name)
                if hasattr(attr, '__call__'):
                    continue
                if name == 'inited_class_list' or name.startswith('__'):
                    continue
                if type_check:
                    try:
                        if type(attr).__name__ not in builtin_names:
                            if isinstance(attr, dict):
                                attr = dict((k, v) for k, v in attr.items())
                            else:
                                attr = str(attr)
                    except Exception:
                        # Value cannot be reduced to a primitive: leave it out
                        continue
                dct[name] = attr
            except Exception as e:
                print(e)
        if class_check:
            klass = obj.__class__
            if '__class__' not in dct:
                dct['__class__'] = klass.__name__
            if '__bases__' not in dct:
                dct['__bases__'] = [b.__name__ for b in klass.__bases__]
            if '__base__' not in dct:
                dct['__base__'] = klass.__base__.__name__
    except Exception as e:
        print(e)
    return dct
## Useful class objects
class Struct(object):
    """
    Metamorphic class to pass/retrieve data objects as object or dictionary

    s = Struct(name='obj1',value=3.0)
    s.keys() : ['name', 'value']
    s.to_str() : "fandango.Struct({'name': obj1,'value': 3.0,})"
    s.dict() : {'name': 'obj1', 'value': 3.0}
    s.cast(value='3.0') : the string converted by default_cast (str2type)

    All members are stored as instance attributes, so the dictionary-like
    methods below operate directly on the instance __dict__.
    """

    def __init__(self, *args, **kwargs):
        self.load(*args, **kwargs)

    def load(self, *args, **kwargs):
        """
        Set attributes from a single dictionary/items-list/sequence argument,
        from several positional arguments or, if no positional is given, from keywords.
        A plain sequence of names creates attributes set to None.
        """
        dct = args[0] if len(args) == 1 else (args or kwargs)
        # isDictionary also matches items lists
        if isSequence(dct) and not isDictionary(dct):
            dct = dict.fromkeys(dct)
        for k, v in (dct.items() if hasattr(dct, 'items') else dct):
            setattr(self, k, v)

    # Overriding dictionary methods
    def update(self, *args, **kwargs):
        return self.load(*args, **kwargs)

    def keys(self):
        return self.__dict__.keys()

    def values(self):
        return self.__dict__.values()

    def items(self):
        return self.__dict__.items()

    def dict(self):
        """ Returns the instance __dict__ itself (not a copy) """
        return self.__dict__

    def get(self, k, default=None):
        try:  # Some keys may raise exception
            return getattr(self, k, default)
        except Exception:
            return default

    def get_key(self, value):
        """ Reverse lookup: first key whose value equals the argument """
        for k, v in self.items():
            if v == value:
                return k
        raise Exception('NotFound!')

    def set(self, k, v):
        return setattr(self, k, v)

    def setdefault(self, v, default=None):
        """
        Same as dict.setdefault: ``v`` is the key; it is created with ``default``
        if missing and the stored value is returned.
        """
        return self.__dict__.setdefault(v, default)

    def pop(self, k):
        return self.__dict__.pop(k)

    def has_key(self, k):
        # 'in' works on every python version, dict.has_key() only on python 2
        return k in self.__dict__

    def __getitem__(self, k):
        return getattr(self, k)

    def __setitem__(self, k, v):
        return setattr(self, k, v)

    def __contains__(self, k):
        return hasattr(self, k)

    def __call__(self, *args, **kwargs):
        """getter with one string, setter if 2 are passed"""
        assert len(args) in (1, 2)
        if len(args) == 2:
            setattr(self, args[0], args[1])
        elif len(args) == 1 and isString(args[0]):
            return getattr(self, args[0])
        else:
            self.load(*args, **kwargs)

    def __repr__(self):
        return 'fandango.Struct({\n'+'\n'.join("\t'%s': %s,"%(k,v) for k,v in self.__dict__.items())+'\n\t})'

    def __str__(self):
        return self.__repr__().replace('\n','').replace('\t','')

    def to_str(self, order=None, sep=','):
        """
        This method provides a formatable string for sorting:
        str(self) if no order is given, else the values of the keys in ``order``
        joined by ``sep``.
        """
        if order is None:
            return self.__str__()
        return sep.join('%s' % self[k] for k in order)

    def default_cast(self, key=None, value=None):
        """
        This method checks if key is already defined.
        If it is, it will return value as an evaluable string.
        If it is not, then it will do same action on the passed value.
        Non-string values are returned untouched; strings go through str2type.
        """
        if key not in self.keys() and not value:
            key, value = None, key  # defaults to single argument mode
        value = notNone(value, key and self.get(key))
        if not isString(value):
            return value
        else:
            return str2type(value)

    def cast(self, key=None, value=None, method=None):
        """
        The cast() method is used to convert an struct to a pickable/json object.
        Pass ``method`` to override default_cast for this call.
        The cast method must accept both key and value arguments.
        """
        return (method or self.default_cast)(key, value)

    def cast_items(self, items=None, update=True):
        """
        Apply cast() to the value of every (key,value) pair.

        :param items: pairs to convert; defaults to all items of the struct
        :param update: if True, the converted values are stored back in the struct
        :return: list of (key, converted value)
        """
        items = items or self.items()
        # The list is fully built before any attribute is written back
        items = [(k, self.cast(value=v)) for k, v in items]
        if update:
            for k, v in items:
                self.set(k, v)
        return items
def _fget(self, var):
    """ Getter template: reads the attribute named ``var`` from the instance """
    return getattr(self, var)


def _fset(self, value, var):
    """ Setter template: writes ``value`` into the attribute named ``var`` """
    setattr(self, var, value)


def _fdel(self, var):
    """ Deleter template: removes the attribute named ``var`` """
    delattr(self, var)


def make_property(var, fget=_fget, fset=_fset, fdel=_fdel):
    """
    This Class is in Beta, not fully implemented yet

    Builds a property whose getter/setter/deleter are the given templates with
    their ``var`` keyword fixed to the passed name.
    With the default templates ``var`` must differ from the name the property is
    bound to in the class, as they access the attribute called ``var`` on the instance.
    """
    getter, setter, deleter = [partial(template, var=var) for template in (fget, fset, fdel)]
    return property(getter, setter, deleter, doc='%s property' % var)
#class NamedProperty(property):
#"""
#"""
#def __init__(self,name,fget=None,fset=None,fdel=None):
#self.name = name
#mname = '%s%s'%(name[0].upper(),name[1:])
#lname = '%s%s'%(name[0].lower(),name[1:])
#property.__init__(fget,fset,fdel,doc='NamedProperty(%s)'%self._name)
#def get_attribute_name(self):
#return '_%s'self.name
def NamedProperty(name, fget=None, fset=None, fdel=None):  # ,doc=None):
    """
    This Class is in Beta, not fully implemented yet

    It makes easier to declare name independent property's (descriptors) by using template methods like:

    def fget(self,var): # var is the identifier of the variable
        return getattr(self,var)
    def fset(self,value,var): # var is the identifier of the variable
        setattr(self,var,value)
    def fdel(self,var): # var is the identifier of the variable
        delattr(self,var)

    MyObject.X = NamedProperty('_X',fget,fset,fdel)

    Each template receives ``name`` through its ``var`` keyword; templates not
    provided are passed to property() as None. The property doc is ``name``.
    """
    def bind(template):
        if template:
            return partial(template, var=name)
        return None
    return property(bind(fget), bind(fset), bind(fdel), doc=name)
import threading
# Module-wide reentrant lock, used by locked() when no custom lock is passed
__lock__ = threading.RLock()


def locked(f, *args, **kwargs):
    """
    Executes f(*args,**kwargs) while holding a lock ("secure-locked" call).

    A key-argument _lock can be used to use a custom Lock object; it is removed
    from kwargs before calling f. By default the module __lock__ is used.

    :return: the result of f; if f raises an Exception it is printed and None
        is returned instead.
    """
    _lock = kwargs.pop('_lock', __lock__)
    # Acquired before entering the try block: if acquire() itself fails,
    # the finally clause must not release a lock that was never taken.
    _lock.acquire()
    try:
        return f(*args, **kwargs)
    except Exception as e:
        # Not every callable has a __name__ (e.g. partial objects)
        print('Exception in%s(*%s,**%s): %s' % (getattr(f, '__name__', f), args, kwargs, e))
    finally:
        _lock.release()
def self_locked(func, reentrant=True):
    '''
    Decorator to make thread-safe class members

    @deprecated
    @note see in tau.core.utils.containers

    The wrapped method runs while holding the ``lock`` attribute of its instance.
    If the instance has no ``lock`` attribute one is created on first call
    (RLock if reentrant, Lock if not); a ``trace`` attribute set to False is
    also added when missing.

    reentrant: CRITICAL:
        With Lock() this decorator should not be used to decorate nested functions; it will cause Deadlock!
        With RLock this problem is avoided ... but you should rely more on python threading.
    '''
    @functools.wraps(func)
    def guarded(self, *args, **kwargs):
        if not hasattr(self, 'lock'):
            self.lock = threading.RLock() if reentrant else threading.Lock()
        if not hasattr(self, 'trace'):
            self.trace = False
        self.lock.acquire()
        try:
            return func(self, *args, **kwargs)
        finally:
            self.lock.release()
    return guarded
###############################################################################
def NewClass(classname, classparent=None, classdict=None):
    """
    Creates a new class on demand:
    ReleaseNumber = NewClass('ReleaseNumber',tuple,{'__repr__':(lambda self:'.'.join(('%02d'%i for i in self)))})

    :param classparent: a base class or a sequence of them; (object,) if not given
    :param classdict: class namespace; empty if not given
    """
    if classparent and not isSequence(classparent):
        classparent = (classparent,)
    bases = classparent or (object,)
    namespace = classdict or {}
    return type(classname, bases, namespace)
###############################################################################
class Object(object):
    """
    This class solves some problems when an object inherits from multiple classes
    and some of them inherit from the same 'grandparent' class
    """

    def __init__(self):
        """ default initializer
        @todo be more clever!
        """
        pass

    def call__init__(self, klass, *args, **kw):
        """
        Calls klass.__init__(self,...) only the first time it is requested for this
        instance; the classes already initialized are kept in self.inited_class_list
        """
        if 'inited_class_list' not in self.__dict__:
            self.inited_class_list = []
        if klass in self.inited_class_list:
            return
        self.inited_class_list.append(klass)
        klass.__init__(self, *args, **kw)

    def call_all__init__(self, klass, *_args, **_kw):
        ''' Call __init__ recursively, for multiple dynamic inheritance.
        @author srubio@cells.es

        This method should be called only if all arguments are keywords!!!
        Multiple __init__ calls with unnamed arguments is hard to manage:
        All the _args values will be assigned to non-keyword args
        e.g:
            from objects import Object
            class A(Object):
                def __init__(self,a=2):
                    print 'A.__init__',a
            class B(A):
                def __init__(self,b):
                    print 'B.__init__',b
            class C(B,A):
                def __init__(self,c):
                    print 'C.__init__',c
            class D(C,B):
                def __init__(self,d=1,*args,**kwargs):
                    self.call_all__init__(D,*args,**kwargs)
                    print 'D.__init__',d
            D(a=1,b=2,c=3,d=4)
        '''
        from inspect import getargspec
        for base in klass.__bases__:
            # Only bases providing call__init__ and not yet initialized are processed
            if 'call__init__' not in dir(base):
                continue
            if 'inited_class_list' in self.__dict__ and base in self.inited_class_list:
                continue
            nkw, i = {}, 0
            try:
                args, largs, kargs, vals = getargspec(base.__init__)
                # If the base accepts **kwargs it receives all the keywords
                if kargs:
                    nkw = dict(_kw)
                for arg in args:
                    if arg == 'self':
                        continue
                    if arg in _kw:
                        nkw[arg] = _kw[arg]
                    elif i < len(_args):
                        # Unnamed values are consumed in order by the arguments not found in _kw
                        nkw[arg], i = _args[i], i + 1
                # Grandparents are initialized before the base itself
                self.call_all__init__(base, *_args, **_kw)
                self.call__init__(base, **nkw)
            except Exception as e:
                print('Unable to execute %s.__init__!: %s' % (base.__name__, str(e)))
        return

    def getAttrDict(self):
        """ Returns obj2dict(self) """
        return obj2dict(self)

    def updateAttrDict(self, other):
        """ Updates this object __dict__ with the result of other.getAttrDict() """
        self.__dict__.update(other.getAttrDict())
###############################################################################
class Singleton(object):
    """This class allows Singleton objects overriding __new__ and renaming __init__ to init_single

    The __new__ method is overriden to force Singleton behaviour, the Singleton is created for the lowest subClass.
    @warning although __new__ is overriden __init__ is still being called for each instance=Singleton(),
    this is why the class __init__ is replaced by a dumb initializer once it has been stored as init_single
    """
    ## Singleton object
    __instance = None  # the one, true Singleton, private members cannot be read directly
    __dumb_init = (lambda self, *p, **k: None)

    def __new__(cls, *p, **k):
        # A subclass may inherit the instance of its parent; it is not valid for cls
        if cls != type(cls.__instance):
            instance = object.__new__(cls)
            #srubio: added init_single check to prevent redundant __init__ calls
            if hasattr(cls, '__init__') and cls.__init__ != cls.__dumb_init:
                setattr(cls, 'init_single', cls.__init__)
                setattr(cls, '__init__', cls.__dumb_init)  # Needed to avoid parent __init__ methods to be called
            if hasattr(cls, 'init_single'):
                # If no __init__ or init_single has been defined it may trigger an object.__init__ warning!
                cls.init_single(instance, *p, **k)
            cls.__instance = instance  # Done at the end to prevent failed __init__ to create singletons
        return cls.__instance

    @classmethod
    def get_singleton(cls, *p, **k):
        """
        Returns the singleton of this class, creating it with the given arguments if needed.
        The stored instance is reused only if it is an instance of cls itself; an
        instance inherited from a parent Singleton class is never returned.
        """
        instance = cls.__instance
        if cls != type(instance):
            return cls(*p, **k)
        return instance

    @classmethod
    def clear_singleton(cls):
        """ Forgets the current instance; next call will create a new one """
        cls.__instance = None
class SingletonMap(object):
    """This class allows distinct Singleton objects for each args combination.

    The __new__ method is overriden to force Singleton behaviour, the Singleton is created for the lowest subClass.
    @warning although __new__ is overriden __init__ is still being called for each instance=Singleton(),
    this is why the class __init__ is replaced by a dumb initializer once it has been stored as init_single

    Instances are stored in a single class-level dictionary, indexed by the
    string returned by parse_instance_key (class name plus arguments).
    """
    ## Singleton object
    __instances = {}  # the one, true Singleton, private members cannot be read directly
    __dumb_init = (lambda self, *p, **k: None)

    def __new__(cls, *p, **k):
        key = cls.parse_instance_key(*p, **k)
        known = cls.__instances.get(key)
        if cls != type(known):
            instance = object.__new__(cls)
            instance.__instance_key = key
            #srubio: added init_single check to prevent redundant __init__ calls
            if hasattr(cls, '__init__') and cls.__init__ != cls.__dumb_init:
                setattr(cls, 'init_single', cls.__init__)
                setattr(cls, '__init__', cls.__dumb_init)  # Needed to avoid parent __init__ methods to be called
            if hasattr(cls, 'init_single'):
                # If no __init__ or init_single has been defined it may trigger an object.__init__ warning!
                cls.init_single(instance, *p, **k)
            cls.__instances[key] = instance
        return cls.__instances[key]

    @classmethod
    def get_singleton(cls, *p, **k):
        """ Returns the instance matching the arguments; cls(*p,**k) is always evaluated """
        key = cls.parse_instance_key(*p, **k)
        fallback = cls(*p, **k)
        return cls.__instances.get(key, fallback)

    @classmethod
    def get_singletons(cls):
        """ Returns the dictionary of instances itself (not a copy) """
        return cls.__instances

    @classmethod
    def clear_singleton(cls, *p, **k):
        """ Forgets the instance matching the arguments; KeyError if it does not exist """
        key = cls.parse_instance_key(*p, **k)
        cls.__instances.pop(key)

    @classmethod
    def clear_singletons(cls):
        cls.__instances.clear()

    @classmethod
    def parse_instance_key(cls, *p, **k):
        """ Builds the string key identifying a class/arguments combination """
        keywords = sorted(k.items())
        return '%s(*%s,**%s)' % (cls.__name__, list(p), keywords)

    def get_instance_key(self):
        return self.__instance_key
###############################################################################
from functools import wraps
class nullDecorator(object):
    """
    Empty decorator with null arguments, used to replace pyqtSignal,pyqtSlot

    Any positional arguments are accepted and ignored; applying the instance
    to a function returns that same function.
    """

    def __init__(self, *args):
        # Arguments are intentionally discarded
        pass

    def __call__(self, f):
        return f
def decorator_with_args(decorator):
    '''
    Decorator with Arguments must be used with parenthesis: @decorated()
    , even when arguments are not used!!!

    This method gets an d(f,args,kwargs) decorator and returns a new single-argument decorator that embeds the new call inside.

    But, this decorator disturbed stdout!!!!

    There are some issues when calling nested decorators; it is clearly better to use Decorator classes instead.
    '''
    def with_arguments(*args, **kwargs):
        # Second stage: receives the function and applies the original decorator
        def apply(func):
            return decorator(func, *args, **kwargs)
        return apply
    return with_arguments
class Decorated(object):
    """ Empty class; it defines no attributes or methods of its own """
class Decorator(object):
    """
    This generic class allows to differentiate decorators from common classes.
    Inherit from it and use issubclass(klass,Decorator) to know if a class is a decorator
    To add arguments to decorator reimplement __init__
    To modify your wrapper reimplement __call__

    A decorator __init__ with a single argument can be called like:

    @D
    def f(x):
        pass

    If you need a Decorator with arguments then __init__ will manage the
    arguments and __call__ will take the function and return a wrapper instead.

    @D(x,y)
    def f(z):
        pass

    Attributes set by __init__:
     f : the decorated function
     call : plain function forwarding to self.__call__, carrying the name and doc of f
    """

    def __init__(self, f):
        self.f = f

        def call(*args, **kwargs):
            # Dispatches through self, so __call__ reimplemented in subclasses is honoured
            return self(*args, **kwargs)
        try:
            # wraps(f) returns the decorator that copies f metadata onto its argument
            call = wraps(f)(call)
        except AttributeError:
            # f has no __name__/__doc__ to copy (e.g. None or partial objects on python 2)
            pass
        self.call = call

    def __call__(self, *args, **kwargs):
        return self.f(*args, **kwargs)
class ClassDecorator(Decorator):
    """ Decorator subclass that adds no behaviour of its own """
class BoundDecorator(Decorator):  # object):
    """
    Decorates class methods keeping the bound status of its members

    Inspired in https://wiki.python.org/moin/PythonDecoratorLibrary#Class_method_decorator_using_instance

    Class method decorator specific to the instance.
    It uses a descriptor to delay the definition of the
    method wrapper.

    To use it, just inherit from it and rewrite the wrapper method

    Example:

    from fandango.objects import BoundDecorator
    BoundDecorator().tracer = 1

    class X(object):
        def __init__(self,name):
            self.name = name
        def f(self,*args):
            return (self.name,args)

    class D(BoundDecorator):
        @staticmethod
        def wrapper(instance,f,*args,**kwargs):
            print('guess what?')
            v = f(instance,*args,**kwargs)
            return v[0]

    x = X('a')
    X.f = D()(X.f)
    x.f()
    """

    def __init__(self, f=None):
        # f is optional so the decorator can be created without arguments,
        # as in the D()(X.f) usage shown above; the function is given to __call__
        Decorator.__init__(self, f)

    @staticmethod
    def wrapper(instance, f, *args, **kwargs):
        """ Default wrapper: calls the method unmodified; override it in subclasses """
        return f(instance, *args, **kwargs)

    class _Tracer(object):
        """ Descriptor holding a trace flag; calling it prints the message if the flag is set """
        def __init__(self):
            self._trace = False

        def __get__(self, obj, klass=None):
            return self

        def __set__(self, obj, value):
            self._trace = value

        def __nonzero__(self):
            return bool(self._trace)

        # python 3 looks for __bool__ instead of __nonzero__
        __bool__ = __nonzero__

        def __call__(self, msg):
            if self:
                print(msg)

    tracer = _Tracer()  # NOTE: Giving a value to Tracer only works with instances; not from the class

    def __call__(this, f=None):
        """ Returns a descriptor that builds the wrapper of f when accessed from the class or an instance """

        class _Descriptor(BoundDecorator):
            # Inherits to get the wrapper from the BoundDecorator class and be able to exist "onDemand"
            def __init__(self, f):
                self.f = f

            def __get__(self, instance, klass):
                BoundDecorator.tracer('__get__(%s,%s)' % (instance, klass))
                if instance is None:
                    # Class method was requested
                    return self.make_unbound(klass)
                return self.make_bound(instance)

            def make_unbound(self, klass):
                BoundDecorator.tracer('make_unbound(%s)' % klass)

                @wraps(self.f)
                def wrapper(*args, **kwargs):
                    '''This documentation will disapear :)'''
                    BoundDecorator.tracer("Called the unbound method %s of %s" % (self.f.__name__, klass.__name__))
                    if not args:
                        raise TypeError('unbound method %s() must be called with %s instance '
                            'as first argument (got nothing instead)' % (self.f.__name__, klass.__name__))
                    # The first positional is the instance; f must be passed right after it,
                    # otherwise any extra positional argument would collide with f
                    return this.wrapper(args[0], f, *args[1:], **kwargs)
                return wrapper

            def make_bound(self, instance):
                BoundDecorator.tracer('make_bound(%s)' % instance)

                @wraps(self.f)
                def wrapper(*args, **kwargs):
                    '''This documentation will disapear :)'''
                    BoundDecorator.tracer("Called the decorated method %s of %s" % (self.f.__name__, instance))
                    return this.wrapper(instance, f, *args, **kwargs)
                # This instance does not need the descriptor anymore,
                # let it find the wrapper directly next time:
                setattr(instance, self.f.__name__, wrapper)
                return wrapper

        return _Descriptor(f)
from . import doc
# The module docstring is replaced by the value doc.get_fn_autodoc returns for this
# module name and its namespace at this point (everything defined above).
# NOTE(review): presumably it generates documentation for the module members -- confirm in the 'doc' module.
__doc__ = doc.get_fn_autodoc(__name__,vars())