Source code for fandango.dynamic

#!/usr/bin/env python
"""
#############################################################################
##
## project :     Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2008 $
##
## copyleft :    ALBA Synchrotron Controls Section, CELLS
##               Bellaterra
##               Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################

"""

__doc__ = """
@package dynamic
<pre>
CLASSES FOR TANGO DEVICE SERVER WITH DYNAMIC ATTRIBUTES
by Sergi Rubio Manrique, srubio@cells.es
ALBA Synchrotron Control Group
Created 5th November 2007

This Module includes several classes:
    DynamicDS : template for dynamic attributes and states.
    DynamicDSTypes : for managing TANGOs Attribute Types
    DynamicAttribute : allows operations between Tango attributes while maintaining quality and date values

    Usage:

    1.DON'T FORGET TO ADD THIS CALLS IN YOUR DEVICE SERVER
        from dynamic import *

    2.TO init_device:
        #It is necessary to avoid all devices to have the same property values, should be unnecessary with newest PyTango release
        self.get_DynDS_properties()

    3.TO always_executed_hook or read_attr_hardware:
        self.prepare_DynDS()
        #or
        DynamicDS.always_executed_hook(self)

    4.DECLARATION OF CLASSES SHOULD BE:
        #class PyPLC(PyTango.Device_3Impl):
        class PyPLC(DynamicDS):
        #class PyPLCClass(PyTango.PyDeviceClass):
        class PyPLCClass(DynamicDSClass):

        NOTE: To make your devices Pogoable again you can substitute this change with 2 calls to:
            addTangoInterface(PyPLC,DynamicDS) and addTangoInterface(PyPLCClass,DynamicDSClass)
        This calls must be inserted in the __main__ method of your Class.py file and also at the end of the file.

    5.AND THE __init__ METHOD:
    def __init__(self,cl, name):
        #PyTango.Device_3Impl.__init__(self,cl,name)
        DynamicDS.__init__(self,cl,name,_locals={},useDynStates=True)

        #The _locals dictionary allows to parse the commands of the class to be available in attributes declaration:
        DynamicDS.__init__(self,cl,name,_locals={
            'Command0': lambda argin: self.Command0(argin),
            'Command1': lambda _addr,val: self.Command1([_addr,val]), #typical Tango command that requires an array as argument
            'Command2': lambda argin,VALUE=None: self.Command1([argin,VALUE]), #typical write command, with VALUE defaulting to None only argin is used
            },useDynStates=False)
        # This will be stored in the self._locals dictionary, that could be extended later by calling eval_attr(...,_locals={...}).
        # useDynStates argument allows to switch between generating State using alarm/warning configuration of the attributes
        # or use the commands introduced in the DynamicDS property (it must be done in __init__ as cannot be changed later).
        ...

    IF YOU WANT TO USE YOUR OWN METHODS IN THE PROPERTY TEXTS YOU SHOULD CUSTOMIZE evalAttr(Atrr_Name) and evalState(expression)
        Copy it to your device server and edit it, the arguments isREAD and isWRITE allows to differentiate Read/Write access
        </pre>
"""

import PyTango,sys,threading,time,traceback,re,inspect
from PyTango import AttrQuality,DevState

import fandango,tango,objects
from excepts import *
from objects import self_locked
from dicts import SortedDict,CaselessDefaultDict,defaultdict,CaselessDict
from log import Logger,shortstr
import functional as fun

#The methods for reading/writing dynamic attributes must be Static for PyTango versions prior to 7.2.2
if getattr(PyTango,'__version_number__',0)<722:
    USE_STATIC_METHODS = True
    print('PyTango Version is %s: fandango.dynamic.USE_STATIC_METHODS = %s' % (PyTango.__version__,USE_STATIC_METHODS))
else: 
    USE_STATIC_METHODS = False

import os
MEM_CHECK = str(os.environ.get('PYMEMCHECK')).lower() in ('yes','true','1')
if MEM_CHECK and 'HEAPY' not in locals():
    try: 
        import guppy
        HEAPY = guppy.hpy()
        HEAPY.setrelheap()    
    except:
        MEM_CHECK=False
else: MEM_CHECK = HEAPY = guppy = None

GARBAGE_COLLECTION=False
GARBAGE,NEW_GARBAGE = None,None
if GARBAGE_COLLECTION:
    import gc
        
if 'Device_4Impl' not in dir(PyTango): PyTango.Device_4Impl = PyTango.Device_3Impl
if 'DeviceClass' not in dir(PyTango): PyTango.DeviceClass = PyTango.PyDeviceClass

[docs]class DynamicDS(PyTango.Device_4Impl,Logger): ''' Check fandango.dynamic.__doc__ for more information ...''' ###################################################################################################### # INTERNAL DYNAMIC DEVICE SERVER METHODS ###################################################################################################### dyn_comms = {} # To be defined here, not at __init__ nor init_device as it is shared by all instances def __init__(self,cl=None,name=None,_globals=None,_locals=None, useDynStates=True): print('> '+'~'*78) self.call__init__('Device_4Impl' in dir(PyTango) and PyTango.Device_4Impl or PyTango.Device_3Impl,cl,name) # Logger must be called after init to use Tango logs properly self.call__init__(Logger,name,format='%(levelname)-8s %(asctime)s %(name)s: %(message)s',level='INFO') self.warning( ' in DynamicDS(%s).__init__ ...'%str(name)) self.trace=False #Tango Properties self.DynamicAttributes = [] self.DynamicStates = [] self.DynamicStatus = [] self.CheckDependencies = True #Internals self.dyn_attrs = {} self.dyn_types = {} self.dyn_states = SortedDict() self.dyn_values = {} #<- That's the main cache used for attribute management self.dyn_qualities = {} #<- It keeps the dynamic qualities variables self.variables = {} self.state_lock = threading.Lock() self.DEFAULT_POLLING_PERIOD = 3000. #Setting default values for Class/Device properties for d in (DynamicDSClass.class_property_list,DynamicDSClass.device_property_list): for prop,value in d.items(): if not hasattr(self,prop): setattr(self,prop,(value[-1] if 'Array' in str(value[0]) else (value[-1][0] if (value and fun.isSequence(value[-1])) else value and value[-1])) ) print('UseTaurus = %s'%getattr(self,'UseTaurus',False)) if getattr(self,'UseTaurus',False): self.UseTaurus = bool(tango.loadTaurus()) # Internal object references self.__prepared = False self.myClass = None ## This dictionary stores XAttr valid arguments and AttributeProxy/TauAttribute objects self._external_attributes = CaselessDict() self._external_listeners = CaselessDict() #CaselessDefaultDict(set) self._external_commands = CaselessDict() self.time0 = time.time() #<< Used by child devices to check startup time self.simulationMode = False #If it is enabled the ForceAttr command overwrites the values of dyn_values self.clientLock=False #TODO: This flag allows clients to control the device edition, using isLocked(), Lock() and Unlock() self.lastAttributeValue = None #TODO: This variable will be used to keep value/time/quality of the last attribute read using a DeviceProxy self.last_state_exception = '' self.last_attr_exception = None self._init_count = 0 self._hook_epoch = 0 self._cycle_start = 0 self._total_usage = 0 self._last_period = {} self._last_read = {} self._read_times = {} self._read_count = defaultdict(int) self._eval_times = {} self._polled_attr_ = {} self.GARBAGE = [] ##Local variables and methods to be bound for the eval methods self._globals={} #globals().copy() if _globals: self._globals.update(_globals) self._locals = {} # Methods to access other device servers self._locals['Attr'] = lambda _name: self.getAttr(_name) self._locals['ATTR'] = lambda _name: self.getAttr(_name) self._locals['XAttr'] = lambda _name,default=None: self.getXAttr(_name,default=default) self._locals['XATTR'] = lambda _name,default=None: self.getXAttr(_name,default=default) self._locals['WATTR'] = lambda _name,value: self.getXAttr(_name,wvalue=value,write=True) self._locals['COMM'] = lambda _name,_argin=None,feedback=None,expected=None: self.getXCommand(_name,_argin,feedback,expected) self._locals['XDEV'] = lambda _name: self.getXDevice(_name) # Methods to manipulate internal variables self._locals['ForceAttr'] = lambda a,v=None: self.ForceAttr(a,v) self._locals['VAR'] = (lambda a=None,v=None,default=None,WRITE=None: self.ForceVar((a or self._locals.get('ATTRIBUTE')),VALUE=v,default=default,WRITE=WRITE)) self._locals['VARS'] = self.variables self._locals['GET'] = (lambda a,default=None: self.ForceVar(a,default=default) if (a in self.variables or default is not None) else self._locals.get(a,default)) self._locals['SET'] = lambda a,v: self.ForceVar(a,v) #self._locals['RWVAR'] = (lambda read_exp=(lambda arg=NAME:self.ForceVar(arg)), #write_exp=(lambda arg=NAME,val=VALUE:self.ForceVar(arg,val)), #aname=NAME, #_read=READ: #(_read and read_exp or write_exp)) self._locals['SetStatus'] = lambda status: [True,self.set_status(status)] self._locals['Add2Status'] = lambda status: [True,self.set_status(self.get_status()+status)] # Advance methods for evaluating formulas self._locals['self'] = self self._locals['EVAL'] = lambda formula: self.evaluateFormula(formula) self._locals['MATCH'] = lambda expr,cad: fun.matchCl(expr,cad) self._locals['DELAY'] = lambda secs: fandango.wait(secs) self._locals['FILE'] = lambda filename: DynamicDS.open_file(filename,device=self) #This command will allow to setup attributes from config files self._locals['FORMULA'] = self.get_attr_formula self._locals['MODELS'] = self.get_attr_models self._locals['time2str'] = fandango.time2str self._locals['ctime2time'] = fandango.ctime2time self._locals['now'] = fandango.now for k in dir(fandango.functional): if '2' in k or k.startswith('to') or k.startswith('is'): self._locals[k] = getattr(fandango.functional,k) # Methods for getting/writing properties self._locals['PROPERTY'] = lambda property,update=False: self.get_device_property(property,update) self._locals['WPROPERTY'] = lambda property,value: (self._db.put_device_property(self.get_name(),{property:[value]}),setattr(self,property,value)) # Methods for managing attribute types self._locals['DYN'] = DynamicAttribute self._locals['SCALAR'] = lambda t,v: castDynamicType(dims=0,klass=t,value=v) self._locals['SPECTRUM'] = lambda t,v: castDynamicType(dims=1,klass=t,value=v) self._locals['IMAGE'] = lambda t,v: castDynamicType(dims=2,klass=t,value=v) if MEM_CHECK: self._locals['guppy'] = guppy self._locals['heapy'] = HEAPY [self._locals.__setitem__(str(quality),quality) for quality in AttrQuality.values.values()] [self._locals.__setitem__(k,dst.pytype) for k,dst in DynamicDSTypes.items()] #Adding states for convenience evaluation self.TangoStates = dict((str(v),v) for k,v in PyTango.DevState.values.items()) self._locals.update(self.TangoStates) if _locals: self._locals.update(_locals) #New submitted methods have priority over the old ones self.useDynStates = useDynStates if self.useDynStates: self.warning('useDynamicStates is set, disabling automatic State generation by attribute config.'+ 'States fixed with set/get_state will continue working.') self.State = self.rawState self.dev_state = self.rawState print('< '+'~'*78)
[docs] def init_device(self): self.info( 'DynamicDS.init_device(%d)'%(self.get_init_count())) try: if not type(self) is DynamicDS and not self.get_init_count(): self.get_DynDS_properties() else: self.updateDynamicAttributes() for c,f in self.dyn_comms.items(): k = c.split('/')[-1] if self.get_name().lower() in c.lower() and k not in self._locals: self._locals.update({k:(lambda argin,cmd=k:self.evalCommand(cmd,argin))}) except: print(traceback.format_exc()) #self.warning(traceback.format_exc()) self._init_count +=1
[docs] def delete_device(self): self.warning( 'DynamicDS.delete_device(): ... ') ('Device_4Impl' in dir(PyTango) and PyTango.Device_4Impl or PyTango.Device_3Impl).delete_device(self)
[docs] def locals(self,key=None): return self._locals if key is None else self._locals.get(key)
[docs] def get_init_count(self): return self._init_count
[docs] def get_parent_class(self): return type(self).mro()[type(self).mro().index(DynamicDS)+1]
[docs] def prepare_DynDS(self): """ This code is placed here because its proper execution cannot be guaranteed during init_device(). """ try: if self.myClass is None: self.myClass = self.get_device_class() #Check polled to be repeated here but using admin (not allowed at Init()); not needed with Tango8 but needed again in Tango9!! (sigh) #if getattr(PyTango,'__version_number__',0) < 804: #self.check_polled_attributes(use_admin=True) ###@TODO REMOVE!!! IT IS CRASHING DEVICES IN TANGO9!! except Exception,e: print('prepare_DynDS failed!: %s' % str(e).replace('\n',';')) #traceback.format_exc() finally: self.__prepared = True return
@self_locked
[docs] def always_executed_hook(self): self.debug("In DynamicDS::always_executed_hook(TAU=%s)"%tango.TAU) try: self._hook_epoch = time.time() #Internal debugging if not self.__prepared: self.prepare_DynDS() #This code is placed here because its proper execution cannot be guaranteed during init_device() self.myClass.DynDev=self #VITAL: It tells the admin class which device attributes are going to be read if self.dyn_states: self.check_state() if self.DynamicStatus: self.check_status() except: self.last_state_exception = 'Exception in DynamicDS::always_executed_hook():\n'+str(traceback.format_exc()) self.error(self.last_state_exception) return
[docs] def read_attr_hardware(self,data): self.debug("In DynDS::read_attr_hardware()") attrs = self.get_device_attr() read_attrs = [attrs.get_attr_by_ind(d).get_name() for d in data] for a in read_attrs: self._read_count[a]+=1 return read_attrs #self.info("read_attr_hardware([%d]=%s)"%(len(data),str(read_attrs)[:80])) ## Edit this code in child classes if needed #try: #attrs = self.get_device_attr() #for d in data: #a_name = attrs.get_attr_by_ind(d).get_name() #if a_name in self.dyn_attrs: #pass #except Exception,e: #self.last_state_exception = 'Exception in read_attr_hardware: %s'%str(e) #self.error('Exception in read_attr_hardware: %s'%str(e))
[docs] def parseStaticAttributes(self,add=True,keep=True): """ Parsing StaticAttributes if defined. """ attrs = [] if hasattr(self,'StaticAttributes'): for a in self.StaticAttributes: aname = a.split('#')[0].split('=')[0].strip() if aname: attrs.append(aname) if any(d.startswith(aname) for d in self.DynamicAttributes): self.info('StaticAttribute %s overriden by DynamicAttributes Property'%(aname)) else: self.debug('Adding StaticAttribute %s to DynamicAttributes list'%(aname)) self.DynamicAttributes.append(a) if keep: #Adds to KeepAttributes if default value is overriden, works if dyn_attr is called after init_device (Tango7) if not any(k.lower()==aname.lower() for k in self.KeepAttributes): self.KeepAttributes.append(aname) self.KeepAttributes = [k for k in self.KeepAttributes if 'no'!=k.lower().strip()] return attrs else: return []
@staticmethod
[docs] def get_db(): DynamicDS._db = getattr(DynamicDS,'_db',None) or PyTango.Util.instance().get_database() return DynamicDS._db
[docs] def get_DynDS_properties(self,db=None): """ It forces DynamicDevice properties reading using the Database device. It has to be used as subclasses may not include all Dynamic* properties in their class generation. It is used by self.updateDynamicAttributes() and required in PyTango<3.0.4 """ self.info('In get_DynDS_properties(): updating DynamicDS properties from Database') ## THIS FUNCTION IS USED FROM updateDynamicAttributes self.get_device_properties(self.get_device_class()) #It will reload all subclass specific properties (with its own default values) self._db = db or self.get_db() if self.LogLevel: try: self.setLogLevel(self.LogLevel) except: self.warning('Unable to setLogLevel(%s)'%self.LogLevel) #Loading DynamicDS specific properties (from Class and Device) for method,target,config in ( (self._db.get_class_property,self.get_device_class().get_name(),DynamicDSClass.class_property_list), (self._db.get_device_property,self.get_name(),dict(list(DynamicDSClass.device_property_list.items())+[('polled_attr',[PyTango.DevVarStringArray,[]])])), ): #Default property values already loaded in __init__; here we are just updating props = [(prop,value) for prop,value in method(target,list(config)).items() if value] for prop,value in props: #Type of each property is adapted to python types dstype = DynamicDSTypes.get(str(config[prop][0]),DynamicDSType('','',pytype=list)) try: value = dstype.pytype(value if dstype.dimx>1 or dstype.dimy>1 else value[0]) except Exception,e: self.info('In get_DynDS_properties: %s(%s).%s property parsing failed: %s -> %s' % (type(self),self.get_name(),value,e)) value = config[prop][-1] if dstype.dimx>1 or dstype.dimy>1 else config[prop][-1][0] if prop=='polled_attr': self._polled_attr_ = tango.get_polled_attrs(value) else: #APPLYING @COPY/@FILE property extensions setattr(self,prop,self.check_property_extensions(prop,value)) self.info('In get_DynDS_properties: %s(%s) properties updated were: %s' % (type(self),self.get_name(),[t[0] for t in props])) [self.info('\t'+self.get_name()+'.'+str(p)+'='+str(getattr(self,p,None))) for p in config] if self.UseTaurus: self.UseTaurus = (tango.TAU or tango.loadTaurus()) and self.UseTaurus if self.LoadFromFile: DynamicDS.load_from_file(device=self) #Adding Static Attributes if defined in the SubClass if getattr(self,'StaticAttributes',None): self.parseStaticAttributes(add=True,keep=True) return
[docs] def get_device_property(self,property,update=False): if update or not hasattr(self,property): setattr(self,property,self._db.get_device_property(self.get_name(),[property])[property]) value = getattr(self,property) return value[0] if fandango.isSequence(value) and len(value)==1 else value
@staticmethod
[docs] def open_file(filename,device=None): print('DynamicDS().open_file(%s)'%(filename)) r = [] try: if device: if not hasattr(device,'PATH'): device.PATH = device.get_device_property('PATH') or '' if device.PATH: filename = device.PATH+'/'+filename f = open(filename) r = f.readlines() f.close() except: print(traceback.format_exc()) return r
@staticmethod
[docs] def load_from_file(filename=None,device=None): """ This line is put in a separate method to allow subclasses to override this behavior""" filename = filename or device.LoadFromFile data = DynamicDS.open_file(filename,device=device) if filename.lower().strip() not in ('no','false','') else [] if data and device: device.DynamicAttributes = list(data)+list(device.DynamicAttributes) return data
EXTENSIONS = dict(list(tango.EXTENSIONS.items())) @staticmethod
[docs] def check_property_extensions(prop,value,db=None,extensions=EXTENSIONS): return tango.check_property_extensions(prop,value,extensions=DynamicDS.EXTENSIONS,db=DynamicDS.get_db(),filters=DynamicDSClass.device_property_list)
[docs] def get_polled_attrs(self,load=False): #@TODO: Tango8 has its own get_polled_attr method; check for incompatibilities if load or not getattr(self,'_polled_attr_',None): self._polled_attr_ = tango.get_polled_attrs(self) return self._polled_attr_
[docs] def check_polled_attributes(self,db=None,new_attr={},use_admin=False): ''' If a PolledAttribute is removed of the Attributes declaration it can lead to SegmentationFault at Device Startup. polled_attr property must be verified to avoid that. The method .get_device_class() cannot be called to get the attr_list value for this class, therefore new_attr must be used to add to the valid attributes any attribute added by subclasses Polling configuration configured through properties has preference over the hard-coded values; but it seems that Tango does not always update that and polling periods have to be updated. Must be called twice (solved in PyTango8) - at dyn_attr to remove unwanted attributes from polled_attr - at prepareDynDS to update polling periods using the admin device ''' self.warning('In check_polled_attributes(%s,use_admin=%s)'%(new_attr,use_admin)) my_name = self.get_name() if use_admin: U = PyTango.Util.instance() admin = U.get_dserver_device() else: self._db = getattr(self,'_db',None) or PyTango.Database() new_attr = dict.fromkeys(new_attr,self.DEFAULT_POLLING_PERIOD) if isinstance(new_attr,list) else new_attr dyn_attrs = list(set(map(str.lower,['state','status']+self.dyn_attrs.keys()+new_attr.keys()))) pattrs = self.get_polled_attrs() npattrs = [] self.info('Already polled: '+str(pattrs)) #First: propagate all polled_attrs if they appear in the new attribute list or remove them if don't for att,period in pattrs.items(): if att in npattrs: continue #remove duplicated elif att.lower() in dyn_attrs: (npattrs.append(att.lower()),npattrs.append(period)) else: self.info('Removing Attribute %s from %s.polled_attr Property' % (att,my_name)) if use_admin: try: admin.rem_obj_polling([my_name,'attribute',att]) except: print(traceback.format_exc()) #Second: add new attributes to the list of attributes to configure; attributes where value is None will not be polled for n,v in new_attr.iteritems(): if n.lower() not in npattrs and v: (npattrs.append(n.lower()),npattrs.append(v)) self.info('Attribute %s added to %s.polled_attr Property' % (n,my_name)) #Third: apply the new configuration if use_admin: for i in range(len(npattrs))[::2]: try: att,period = npattrs[i],npattrs[i+1] if att not in pattrs: admin.add_obj_polling([[int(period)],[my_name,'attribute',att]]) else: admin.upd_obj_polling_period([[int(period)],[my_name,'attribute',att]]) except: print('Unable to set %s polling' % (npattrs[i])) print(traceback.format_exc()) else: self.info('Updating polled_attr: %s'%(npattrs,)) self._db.put_device_property(my_name,{'polled_attr':npattrs}) self.info('Out of check_polled_attributes ...')
[docs] def attribute_polling_report(self): self.debug('\n'+'-'*80) try: now = time.time() self._cycle_start = now-self._cycle_start if 'POLL' in self.dyn_values: self.debug('dyn_values[POLL] = %s ; locals[POLL] = %s' % (self.dyn_values['POLL'].value,self._locals['POLL'])) self.info('Last complete reading cycle took: %f seconds' % self._cycle_start) self.info('There were %d attribute readings.'%(sum(self._read_count.values() or [0]))) head = '%24s\t\t%10s\t\ttype\tinterval\tread_count\tread_time\teval_time\tcpu'%('Attribute','value') lines = [] target = list(t[-1] for t in reversed(sorted((v,k) for k,v in self._read_times.items())))[:7] target.extend(list(t[-1] for t in reversed(sorted((v,k) for k,v in self._read_count.items())) if t not in target)[:7]) for key in target: value = (self.dyn_values[key].value if key in self.dyn_values else 'NotKept') value = str(value)[:16-3]+'...' if len(str(value))>16 else str(value) lines.append('\t'.join([ '%24s'%key[:24], '\t%10s'%value[:16], '%s'%type(value).__name__ if value is not None else '...', '%d'%int(1e3*self._last_period[key]), '%d'%self._read_count[key], '%1.2e'%self._read_times[key], '%1.2e'%self._eval_times[key], '%1.2f%%'%(100*self._eval_times[key]/(self._total_usage or 1.))])) print(head) print('-'*max(len(l)+4*l.count('\t') for l in lines)) print('\n'.join(lines)) print('') self.info('%f s empty seconds in total; %f of CPU Usage' % (self._cycle_start-self._total_usage,self._total_usage/self._cycle_start)) self.info('%f of time used in expressions evaluation' % (sum(self._eval_times.values())/(sum(self._read_times.values()) or 1))) if False: #GARBAGE_COLLECTION: if not gc.isenabled(): gc.enable() gc.collect() grb = gc.get_objects() self.info('%d objects in garbage collector, %d objects are uncollectable.'%(len(grb),len(gc.garbage))) try: if self.GARBAGE: NEW_GARBAGE = [o for o in grb if o not in self.GARBAGE] self.info('New objects added to garbage are: %s' % ([str(o) for o in NEW_GARBAGE],)) except: print(traceback.format_exc()) self.GARBAGE = grb #if MEM_CHECK: #self._locals['heap'] = h = HEAPY.heap() #self.info(str(h)) for a in self._read_count: self._read_count[a] = 0 self._cycle_start = now self._total_usage = 0 except: self.error('DynamicDS.attribute_polling_report() failed!\n%s'%traceback.format_exc()) self.info('-'*80)
[docs] def check_attribute_events(self,aname,poll=False): self.UseEvents = [u.lower().strip() for u in self.UseEvents] self.debug('check_attribute_events(%s,%s,%s)'%(aname,poll,self.UseEvents)) if not len(self.UseEvents): return False elif self.UseEvents[0].lower().strip() in ('','no','false'): return False elif aname.lower().strip() == 'state': return True elif any(fun.matchCl(s,aname) for s in self.UseEvents): return True #Attrs explicitly set doesn't need event config elif self.UseEvents[0] in ('yes','true') and any(self.check_events_config(aname)): return True else: return False
[docs] def check_events_config(self,aname): cabs,crel = 0,0 try: ac = self.get_attribute_config_3(aname)[0] try: cabs = float(ac.event_prop.ch_event.abs_change) except: pass try: crel = float(ac.event_prop.ch_event.rel_change) except: pass except:pass return cabs,crel
[docs] def check_changed_event(self,aname,new_value): if aname not in self.dyn_values: return False try: v = self.dyn_values[aname].value new_value = getattr(new_value,'value',new_value) cabs,crel = self.check_events_config(aname) self.debug('In check_changed_event(%s): %s!=%s > (%s,%s)?'%(aname,shortstr(v),shortstr(new_value),cabs,crel)) if v is None: self.info('In check_changed_event(%s,%s): first value read!'%(aname,shortstr(new_value))) return True elif fun.isSequence(new_value) or fun.isSequence(v): if cabs>0 or crel>0: self.info('In check_changed_event(%s,%s): list changed!'%(aname,shortstr(new_value))) return bool(v!=new_value) else: try: v,new_value = (float(v) if v is not None else None),float(new_value) except Exception,e: self.info('In check_changed_event(%s): values not evaluable (%s,%s): %s'%(aname,shortstr(v),shortstr(new_value),e)) return bool((cabs>0 or crel>0) and v!=new_value) #False if cabs>0 and not v-cabs<new_value<v+cabs: self.info('In check_changed_event(%s,%s): absolute change!'%(aname,shortstr(new_value))) return True elif crel>0 and not v*(1-crel/100.)<new_value<v*(1+crel/100.): self.info('In check_changed_event(%s,%s): relative change!'%(aname,shortstr(new_value))) return True else: self.debug('In check_changed_event(%s,%s): nothing changed'%(aname,shortstr(new_value))) return False except: #Needed to prevent fails if attribute_config_3 is not available print(traceback.format_exc()) return False #------------------------------------------------------------------------------------------------------ # Attribute creators and read_/write_ related methods #------------------------------------------------------------------------------------------------------ ## Dynamic Attributes Creator
[docs] def dyn_attr(self): """ Dynamic Attributes Creator: It initializes the device from DynamicAttributes and DynamicStates properties. It is called by all DeviceNameClass classes that inherit from DynamicDSClass. It MUST be an static method, to bound dynamic attributes and avoid being read by other devices from the same server. This is why Read/Write attributes are staticmethods also. (in other devices lambda is used) """ self.info('\n'+'='*80+'\n'+'DynamicDS.dyn_attr( ... ), entering ...'+'\n'+'='*80) self.KeepAttributes = [s.lower() for s in self.KeepAttributes] ## Lambdas not used because using staticmethods; @TODO: test how it works in PyTango 7.1.2! #read_method = lambda attr,fire_event=True,s=self: s.read_dyn_attr(attr,fire_event) #write_method = lambda attr,fire_event=True,s=self: s.write_dyn_attr(attr,fire_event) if not hasattr(self,'DynamicStates'): self.error('DynamicDS property NOT INITIALIZED!') if self.DynamicStates: self.dyn_states = SortedDict() def add_state_formula(st,formula): self.dyn_states[st]={'formula':formula,'compiled':compile(formula,'<string>','eval')} self.info(self.get_name()+".dyn_attr(): new DynamicState '"+ st+"' = '"+formula+"'") for line in self.DynamicStates: # The character '#' is used for comments if not line.strip() or line.strip().startswith('#'): continue fields = (line.split('#')[0]).split('=',1) if not fields or len(fields) is 1 or '' in fields: self.debug( self.get_name()+".dyn.attr(): wrong format in DynamicStates Property!, "+line) continue st,formula = fields[0].upper().strip(),fields[1].strip() if st in self.TangoStates: add_state_formula(st,formula) elif st == 'STATE': [add_state_formula(s,'int(%s)==int(%s)'%(s,formula)) for s in self.TangoStates if not any(l.startswith(s) for l in self.DynamicStates)] else: self.debug( self.get_name()+".dyn.attr(): Unknown State: %s"%(line,)) #Attributes may be added to polling if having Events new_polled_attrs = set(self.get_polled_attrs().keys()) self.info('In %s.dyn_attr(): inspecting %d attributes ...' %(self.get_name(),len(self.DynamicAttributes))) for line in self.DynamicAttributes: if not line.strip() or line.strip().startswith('#'): continue fields=[] # The character '#' is used for comments in Attributes specification if '#' in line: fields = (line.split('#')[0]).split('=',1) else: fields = line.split('=',1) if fields is None or len(fields) is 1 or '' in fields: self.warning(self.get_name()+".dyn_attr(): wrong format in DynamicAttributes Property!, "+line) continue else: aname,formula=fields[0].strip(),fields[1].strip() self.info(self.get_name()+".dyn_attr(): new Attribute '"+aname+"' = '"+formula+"'") ## How to compare existing formulas? # Strip the typename from the beginning (so all typenames must be identified!) # .strip() spaces and compare # if the code is the same ... don't touch anything # if the code or the type changes ... remove the attribute! # dyn_attr() will create the attributes only if they don't exist. if aname not in self.dyn_values: create = True self.dyn_values[aname]=DynamicAttribute() self.dyn_values[aname].keep = self.KeepAttributes and (not 'no' in self.KeepAttributes) and any(q.lower() in self.KeepAttributes for q in [aname,'*','yes','true']) self.dyn_types[aname]=None else: self.info('\tAttribute %s already exists'%(aname,)) create = False #These 3 options may vary depending on PyTango>3; don't remove them from the code yet if 0: #Works on PyTango7 is_allowed = (lambda s,req_type,a=aname: type(self).is_dyn_allowed(s,req_type,a)) elif 0: is_allowed = self.is_dyn_allowed #May trigger exceptions because attr_name is None else: #Works on PyTango8 def is_allowed(req_type, attr_name=aname,s=self): return s.is_dyn_allowed(req_type,attr_name) is_allowed.__name__ = 'is_%s_allowed'%aname.lower() setattr(self,is_allowed.__name__,is_allowed) max_size = hasattr(self,'DynamicSpectrumSize') and self.DynamicSpectrumSize AttrType = PyTango.AttrWriteType.READ_WRITE if 'WRITE' in fun.re.split('[\[\(\]\)\ =,\+]',formula) else PyTango.AttrWriteType.READ for typename,dyntype in DynamicDSTypes.items(): if dyntype.match(formula): self.debug(self.get_name()+".dyn_attr(): '"+line+ "' matches " + typename + "=" + str(dyntype.labels)) if formula.startswith(typename+'('): #DevULong should not match DevULong64 formula=formula.lstrip(typename) self.debug('Creating attribute (%s,%s,dimx=%s,%s)'%(aname,dyntype.tangotype,dyntype.dimx,AttrType)) if dyntype.dimx==1: if (create): self.add_attribute(PyTango.Attr(aname,dyntype.tangotype, AttrType), \ self.read_dyn_attr,self.write_dyn_attr,is_allowed) #self.read_dyn_attr,self.write_dyn_attr,self.is_dyn_allowed) elif dyntype.dimy==1: if (create): self.add_attribute(PyTango.SpectrumAttr(aname,dyntype.tangotype, AttrType,max_size or dyntype.dimx), \ self.read_dyn_attr,self.write_dyn_attr,is_allowed) #self.read_dyn_attr,self.write_dyn_attr,self.is_dyn_allowed) else: if (create): self.add_attribute(PyTango.ImageAttr(aname,dyntype.tangotype, AttrType,max_size or dyntype.dimx,max_size or dyntype.dimy), \ self.read_dyn_attr,self.write_dyn_attr,is_allowed) #self.read_dyn_attr,self.write_dyn_attr,self.is_dyn_allowed) self.dyn_types[aname]=dyntype break if not self.dyn_types[aname]: self.debug("DynamicDS.dyn_attr(...): Type not matched for '"+line+"', using DevDouble by default" ) if max(map(formula.startswith,['list(','['])): dyntype = DynamicDSTypes['DevVarDoubleArray'] self.info('Creating attribute (%s,%s,dimx=%s,%s)'%(aname,dyntype.tangotype,dyntype.dimx,AttrType)) if (create): self.add_attribute(PyTango.SpectrumAttr(aname,dyntype.tangotype, AttrType,max_size or dyntype.dimx), \ self.read_dyn_attr,self.write_dyn_attr,is_allowed) #self.read_dyn_attr,self.write_dyn_attr,self.is_dyn_allowed) else: dyntype = DynamicDSTypes['DevDouble'] self.debug('Creating attribute (%s,%s,dimx=%s,%s)'%(aname,dyntype.tangotype,dyntype.dimx,AttrType)) if (create): self.add_attribute(PyTango.Attr(aname,PyTango.ArgType.DevDouble, AttrType), \ self.read_dyn_attr,self.write_dyn_attr,is_allowed) #self.read_dyn_attr,self.write_dyn_attr,self.is_dyn_allowed) self.dyn_types[aname]=dyntype #print 'Type of Dynamic Attribute "',aname,'=',formula,'" is "',str(self.dyn_types[aname].labels),'"' self.dyn_attrs[aname]=formula #TODO: Some day self.dyn_values should substitute both dyn_attrs and dyn_types self.dyn_values[aname].formula=formula try: self.dyn_values[aname].compiled=compile(formula.strip(),'<string>','eval') except: self.error(traceback.format_exc()) self.dyn_values[aname].type=self.dyn_types[aname] #Adding attributes to DynamicStates queue: for k,v in self.dyn_states.items(): if aname in v['formula']: #self.dyn_values[aname]=None self.dyn_values[aname].states_queue.append(k) self.info("DynamicDS.dyn_attr(...): Attribute %s added to attributes queue for State %s"%(aname,k)) #Setting up change events: if self.check_attribute_events(aname): self._locals[aname] = None if aname.lower() in new_polled_attrs: #THIS IS CONFIGURING EVENTS TO BE "MANUAL", pushed and unfiltered by Jive settings self.set_change_event(aname,True,False) else: new_polled_attrs.add(aname.lower()) elif self.dyn_values[aname].keep: self._locals[aname] = None if hasattr(self,'DynamicQualities') and self.DynamicQualities: ## DynamicQualities: (*)_VAL = ALARM if $_ALRM else VALID self.info('Updating Dynamic Qualities ........') import re self.dyn_qualities,exprs = {},{} vals = [(line.split('=')[0].strip(),line.split('=')[1].strip()) for line in self.DynamicQualities if '=' in line and not line.startswith('#')] for exp,value in vals: if '*' in exp and '.*' not in exp: exp=exp.replace('*','.*') if not exp.endswith('$'): exp+='$' exprs[exp] = value for aname in self.dyn_values.keys(): for exp,value in exprs.items(): try: match = re.match(exp,aname) if match: self.info('There is a Quality for this attribute!: '+str((aname,exp,shortstr(value)))) for group in (match.groups()): #It will replace $ in the formula for all strings matched by .* in the expression value=value.replace('$',group,1) #e.g: (.*)_VAL=ATTR_ALARM if ATTR('$_ALRM') ... => RF_VAL=ATTR_ALARM if ATTR('RF_ALRM') ... self.dyn_qualities[aname.lower()] = value except Exception,e: self.warning('In dyn_attr(qualities), re.match(%s(%s),%s(%s)) failed' % (type(exp),exp,type(aname),aname)) print(traceback.format_exc()) #Setting up state events: #THESE SETTINGS MAY BE NO LONGER NEEDED IN TANGO >= 7 if self.check_attribute_events('State'): if 'state' in new_polled_attrs: #Already polled self.warning('State events will be managed always by polling') self.set_change_event('State',True,False) #Implemented, don't check conditions to push else: new_polled_attrs.add('state') #To be added at next restart try: self.check_polled_attributes(new_attr=dict.fromkeys(new_polled_attrs,self.DEFAULT_POLLING_PERIOD)) except: print('DynamicDS.dyn_attr( ... ), unable to set polling for (%s): \n%s'%(new_polled_attrs,traceback.format_exc())) print('DynamicDS.dyn_attr( ... ), finished. Attributes ready to accept request ...') #dyn_attr MUST be an static method, to avoid attribute mismatching (self will be always passed as argument)
dyn_attr=staticmethod(dyn_attr)
[docs] def get_dyn_attr_list(self): """Gets all dynamic attribute names.""" return self.dyn_attrs.keys()
[docs] def get_attr_formula(self,aname,full=False): """ Returns the formula for the given attribute The as_tuple flag will return an attr,formula,compiled tuple """ if aname in self.dyn_values: formula = self.dyn_values[aname].formula compiled = self.dyn_values[aname].compiled else: #Getting a caseless attribute that match try: aname,formula,compiled = ((k,self.dyn_values[k].formula, self.dyn_values[k].compiled) for k in self.dyn_values if k.lower()==aname.lower()).next() except: self.warning('DynamicDS.evalAttr: %s doesnt match any Attribute name,' ' trying to evaluate ...'%(aname,)) formula,compiled=aname,None if full: return aname,formula,compiled else: #If no attribute is matching, attribute name is returned return formula
[docs] def is_dyn_allowed(self,req_type,attr_name=''): return (time.time()-self.time0) > 1e-3*self.StartupDelay #@Catched #Catched decorator is not compatible with PyTango_Throw_Exception
@self_locked
[docs] def read_dyn_attr(self,attr,fire_event=True): #if not USE_STATIC_METHODS: self = self.myClass.DynDev aname = attr.get_name() tstart=time.time() keep = aname in self.dyn_values and self.dyn_values[aname].keep or self.check_attribute_events(aname) self.debug('>'*80) self.debug("DynamicDS("+self.get_name()+")::read_dyn_atr("+attr.get_name()+"), entering at "+time.ctime()+"="+str(tstart)+"...") try: if keep and self.KeepTime and self._last_read.get(aname,0) and time.time()<(self._last_read[aname]+(self.KeepTime/1e3)): v = self.dyn_values[aname] self.debug('Returning cached (%s) value for %s: %s(%s)'%(time.ctime(self._last_read[aname]),aname,type(v.value),shortstr(v.value))) return attr.set_value_date_quality(v.value,v.date,v.quality) except Exception,e: self.warning('Unable to reload %s kept values, %s'%(aname,str(e))) try: result = self.evalAttr(aname) quality = getattr(result,'quality',self.get_quality_for_attribute(aname,result)) date = self.get_date_for_attribute(aname,result) result = self.dyn_types[aname].pytype(result) if hasattr(attr,'set_value_date_quality'): attr.set_value_date_quality(result,date,quality) else: #PyTango<7 if type(result) in (list,set,tuple): attr_DynAttr_read = [] for r in result: attr_DynAttr_read.append(r) try: PyTango.set_attribute_value_date_quality(attr_DynAttr_read,date,quality,len(attr_DynAttr_read),0) except: attr.set_value(attr_DynAttr_read) else: try: PyTango.set_attribute_value_date_quality(attr,result,date,quality) except: attr.set_value(result) text_result = (type(result) is list and result and '%s[%s]'%(type(result[0]),len(result))) or str(result) now=time.time() self._last_period[aname]=now-self._last_read.get(aname,0) self._last_read[aname]=now self._read_times[aname]=now-self._hook_epoch self._eval_times[aname]=now-tstart self._total_usage += now-self._hook_epoch self.debug('DynamicDS('+self.get_name()+").read_dyn_attr("+aname+")="+text_result+ ", ellapsed %1.2e"%(self._eval_times[aname])+" seconds.\n") #", finished at "+time.ctime(now)+"="+str(now)+", timestamp is %s"%str(date)+", difference is "+str(now-date)) if 'debug' in str(self.getLogLevel()) and (time.time()>(self._cycle_start+self.PollingCycle*1e-3) if hasattr(self,'PollingCycle') else aname==sorted(self.dyn_values.keys())[-1]): self.attribute_polling_report() except Exception, e: now=time.time() self.dyn_values[aname].update(e,now,PyTango.AttrQuality.ATTR_INVALID) #Exceptions always kept! self._last_period[aname]=now-self._last_read.get(aname,0) self._last_read[aname]=now self._read_times[aname]=now-self._hook_epoch #Internal debugging self._eval_times[aname]=now-tstart #Internal debugging if aname==self.dyn_values.keys()[-1]: self._cycle_start = now last_exc = str(e) self.error('DynamicDS_read_%s_Exception: %s' % (aname,last_exc)) if not isinstance(e,RethrownException): print(traceback.format_exc()) raise Exception('DynamicDS_read_%s_Exception: %s' % (aname,last_exc)) ##This hook has been used to force self to be passed always as argument and avoid dynattr missmatching
if USE_STATIC_METHODS: read_dyn_attr=staticmethod(read_dyn_attr) #@Catched @self_locked
[docs] def write_dyn_attr(self,attr,fire_event=True): aname = attr.get_name() self.debug("DynamicDS("+self.get_name()+")::write_dyn_atr("+aname+"), entering at "+time.ctime()+"...") #THIS CHANGE MUST BE TESTED AGAINST PYTANGO7!!!! For Scalar/Spectrum/Image R/W Attrs!! try: #PyTango8 data = attr.get_write_value() except: data = [] attr.get_write_value(data) if fun.isSequence(data) and self.dyn_types[aname].dimx==1: data = data[0] elif self.dyn_types[aname].dimy!=1: x = attr.get_max_dim_x() data = [data[i:i+x] for i in range(len(data))[::x]] self.setAttr(aname,data) #self.dyn_values[aname].update(result,time.time(),PyTango.AttrQuality.ATTR_VALID) ##if fire_event: self.fireAttrEvent(aname,data) ##This hook has been used to force self to be passed always as argument and avoid dynattr missmatching
if USE_STATIC_METHODS: write_dyn_attr=staticmethod(write_dyn_attr) #------------------------------------------------------------------------------------------------------ # Attributes and State Evaluation Methods #------------------------------------------------------------------------------------------------------ # DYNAMIC ATTRIBUTE EVALUATION ... Copy it to your device and add any method you will need
[docs] def evalAttr(self,aname,WRITE=False,VALUE=None,_locals=None): ''' SPECIFIC METHODS DEFINITION DONE IN self._locals!!! @remark Generators don't work inside eval!, use lists instead ''' self.debug("DynamicDS("+self.get_name()+ ")::evalAttr("+aname+"): ... last value was %s"%shortstr(getattr(self.dyn_values.get(aname,None),'value',None))) tstart = time.time() aname,formula,compiled = self.get_attr_formula(aname,full=True) try: #Checking attribute dependencies if self.CheckDependencies and aname in self.dyn_values: if self.dyn_values[aname].dependencies is None: self.debug("In evalAttr ... setting dependencies") self.dyn_values[aname].dependencies = set() a = aname.lower().strip() fs = (formula+'\n'+self.dyn_qualities.get(a,'')).lower() for k,v in self.dyn_values.items(): ks = k.lower().strip() if a==ks: continue if ks in fun.re.split("[^'\"_0-9a-zA-Z]",fs): #fun.searchCl("(^|[^'\"_0-9a-z])%s($|[^'\"_0-9a-z])"%k,formula): self.dyn_values[aname].dependencies.add(k) #Dependencies are case sensitive self.dyn_values[k].keep = True #Updating Last Attribute Values if self.dyn_values[aname].dependencies: now = time.time() for k in (self.dyn_values[aname].dependencies or []): self.debug("In evalAttr(%s) ... updating dependencies (%s,last read at %s, KeepTime is %s)"%(aname,k,self._last_read.get(k,0),self.KeepTime)) if self.KeepTime and (not self._last_read.get(k,0) or now>(self._last_read[k]+(self.KeepTime/1e3))): self.debug("In evalAttr ... updating %s value"%(k,)) if USE_STATIC_METHODS: self.read_dyn_attr(self,tango.fakeAttributeValue(k)) else: self.read_dyn_attr(tango.fakeAttributeValue(k)) v = self.dyn_values[k] if k.lower().strip()!=aname.lower().strip() and isinstance(v.value,Exception): self.warning('evalAttr(%s): An exception is rethrowed from attribute %s'%(aname,k)) raise RethrownException(v.value) #Exceptions are passed to dependent attributes else: self._locals[k]=v.value #.value else: self.debug("In evalAttr ... updating locals from dyn_values") for k,v in self.dyn_values.items(): if v.keep and k in formula: self._locals[k]=v.value self.debug("In evalAttr ... updating locals defaults") try: self._locals.update({ 't':time.time()-self.time0, 'WRITE':WRITE, 'READ':bool(not WRITE), 'ATTRIBUTE':aname, 'NAME':self.get_name(), 'VALUE':VALUE if VALUE is None or aname not in self.dyn_types else self.dyn_types[aname].pytype(VALUE), 'STATE':self.get_state(), 'LOCALS':self._locals, #'ATTRIBUTES':dict((a,getattr(self.dyn_values[a],'value',None)) for a in self.dyn_values if a in self._locals), 'ATTRIBUTES':sorted(self.dyn_values.keys()), 'XATTRS':self._external_attributes, }) #It is important to keep this values persistent; becoming available for quality/date/state/status management if _locals is not None: self._locals.update(_locals) #High Priority: variables passed as argument except Exception,e: self.error('<'*80) self.error(traceback.format_exc()) for t in (VALUE,type(VALUE),aname,self.dyn_types.get(aname,None),aname in self.dyn_types and self.dyn_types[aname].pytype): self.warning(str(t)) self.error('<'*80) raise e if WRITE: self.debug('%s::evalAttr(WRITE): Attribute=%s; formula=%s; VALUE=%s'%(self.get_name(),aname,formula,shortstr(VALUE))) elif aname in self.dyn_values: self.debug('%s::evalAttr(READ): Attribute=%s; formula=%s;'%(self.get_name(),aname,formula,)) else: self.info('%s::evalAttr(COMMAND): formula=%s;'%(self.get_name(),formula,)) result = eval(compiled or formula,self._globals,self._locals) #<<<<< EVAL! self.debug('eval result: '+str(result)) #Push/Keep Read Attributes if not WRITE and aname in self.dyn_values: quality = self.get_quality_for_attribute(aname,result) if hasattr(result,'quality'): result.quality = quality date = self.get_date_for_attribute(aname,result) has_events = self.check_attribute_events(aname) value = self.dyn_types[aname].pytype(result) #Events must be checked before updating the cache if has_events and self.check_changed_event(aname,result): self.info('>'*80) self.info('Pushing %s event!: %s(%s)'%(aname,type(result),shortstr(result))) self.push_change_event(aname,value,date,quality) #Updating the cache: keep = self.dyn_values[aname].keep or has_events if keep: old = self.dyn_values[aname].value self.dyn_values[aname].update(value,date,quality) self._locals[aname] = value (self.debug if self.KeepAttributes[0] in ('yes','true') else self.info)('evalAttr(%s): Value will be kept for later reuse' % (aname,)) #Updating state if needed: try: if old!=value and self.dyn_values.get(aname).states_queue: self.check_state() except: self.warning('Unable to check state!') self.warning(traceback.format_exc()) return result except PyTango.DevFailed, e: if self.trace: print('-'*80) print('\n'.join(['DynamicDS_evalAttr(%s)_WrongFormulaException:'%aname,'\t"%s"'%(formula,),str(traceback.format_exc())])) print('\n'.join([str(e.args[0])]) + '\n'+'*'*80) print('-'*80) err = e.args[0] self.error(e) raise e #Exception,';'.join([err.origin,err.reason,err.desc]) except Exception,e: if self.last_attr_exception and self.last_attr_exception[0]>tstart: e = self.last_attr_exception[-1] if 1: print('\n'.join(['DynamicDS_evalAttr_WrongFormulaException','%s is not a valid expression!'%(formula,),])) s = traceback.format_exc() self.error(s) raise Exception(s) finally: self._locals['ATTRIBUTE'] = '' # DYNAMIC STATE EVALUATION
[docs] def evalState(self,formula,_locals={}): """ Overloading the eval method to be used to evaluate State expressions ... To customize it: copy it to your device and add any method you will need @remark Generators don't work inside eval!, use lists instead The main difference with evalAttr is that evalState will not Check Dependencies nor push events """ self.debug('DynamicDS.evalState/evaluateFormula(%s)'%(isinstance(formula,str) and formula or 'code')) #MODIFIIED!! to use pure DynamicAttributes #Attr = lambda a: self.dyn_values[a].value t = time.time()-self.time0 for k,v in self.dyn_values.items(): self._locals[k]=v#.value #Updating Last Attribute Values __locals = {}#__locals=locals().copy() #Low priority: local variables __locals.update(self._locals) #Second priority: object statements __locals.update(_locals) #High Priority: variables passed as argument __locals.update( {'STATE':self.get_state(),'t':time.time()-self.time0,'NAME': self.get_name(), 'ATTRIBUTES':sorted(self.dyn_values.keys()),#'ATTRIBUTES':dict((a,getattr(self.dyn_values[a],'value',None)) for a in self.dyn_values if a in self._locals), 'FORMULAS':dict((k,v.formula) for k,v in self.dyn_values.items()), 'WRITE':False,'VALUE':None, }) return eval(formula,self._globals,__locals)
[docs] def evalCommand(self,cmd,argin=None): """This method will execute a command declared using DynamicCommands property""" k = cmd if '/' in cmd else self.get_name()+'/'+cmd assert k in self.dyn_comms.keys(),('%s command not declared in properties!'%(k,)) return self.evalAttr(self.dyn_comms[k],_locals={'ARGS':argin}) #------------------------------------------------------------------------------------------------------ # Methods usable inside Attributes declaration #------------------------------------------------------------------------------------------------------
[docs] def getAttr(self,aname,default=None,write=False,wvalue=None): """Evaluates an attribute and returns its Read value.""" try: al = aname.lower() if '/' in aname: value = self.getXAttr(aname,default,write,wvalue) elif al=='state': value = self.get_state() elif al=='status': value = self.get_status() elif al in map(str.lower,self.dyn_values.keys()): value = self.evalAttr(aname,WRITE=write,VALUE=fun.notNone(wvalue,default)) else: #Getting an Static attribute that match: method = getattr(self,'read_%s'%aname,getattr(self,'read_%s'%al,None)) if method is not None: self.warning('DynamicDS.getAttr: %s is an static attribute ...'%(aname,)) attr = tango.fakeAttributeValue(aname) method(attr) value = attr.value else: self.warning('DynamicDS.getAttr: %s doesnt match any Attribute name, trying to evaluate ...'%(aname,)) value = default return value except Exception,e: if default is not None: return default else: traceback.print_exc() raise e
[docs] def setAttr(self,aname,VALUE): """Evaluates the WRITE part of an Attribute, passing a VALUE.""" self.evalAttr(aname,WRITE=True,VALUE=VALUE)
[docs] def event_received(self,source,type_,attr_value): def log(prio,s,obj=self): #,level=self.log_obj.level): if obj.getLogLevel(prio)>=obj.log_obj.level: print('%s(%s) %s %s: %s' % ( prio.upper(), (obj.getLogLevel(prio),obj.log_obj.level), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()), obj.get_name(),s)) if type_ == tango.fakeEventType.Config: log('debug','In DynamicDS.event_received(%s(%s),%s,%s): Config Event Not Implemented!'%( type(source).__name__,source,tango.fakeEventType[type_],type(attr_value).__name__,#getattr(attr_value,'value',attr_value) )) else: log('info','In DynamicDS.event_received(%s(%s),%s,%s)'%( type(source).__name__,source,tango.fakeEventType[type_],type(attr_value).__name__) ) try: if type_ in ('Error',tango.fakeEventType['Error']): log('error','Error received from %s: %s'%(source, attr_value)) full_name = tango.get_model_name(source) #.get_full_name() if full_name not in self._external_listeners: self.debug('%s does not trigger any dynamic attribute event'%full_name) elif self._external_listeners[full_name]: log('info','\t%s.listeners: %s'%(full_name,self._external_listeners[full_name])) for aname in self._external_listeners[full_name]: if self._locals['ATTRIBUTE'] == aname: #Variable already being evaluated continue else: log('info','\tforwarding event to %s ...'%aname) self.evalAttr(aname) except: print(traceback.format_exc()) return
[docs] def get_attr_models(self,attribute): """ Given a dynamic attribute name or formula, it will return a list of tango models appearing on it """ formula = self.get_attr_formula(attribute) matches = re.findall(tango.retango,formula) #Matches are models split in parts, need to be joined return ['/'.join(filter(bool,s)) for s in matches]
[docs] def getXDevice(self,dname): """ This method returns a DeviceProxy to the given attribute. """ if self.UseTaurus: return tango.TAU.Device(dname) else: return PyTango.DeviceProxy(dname)
[docs] def getXAttr(self,aname,default=None,write=False,wvalue=None): """ Performs an external Attribute reading, using a DeviceProxy to read own attributes. Argument could be: [attr_name] or [device_name](=State) or [device_name/attr_name] :returns: Attribute value or None """ params = tango.parse_tango_model(aname,use_host=False) #Device will contain TANGO_HOST only if differs from current if params: device,aname = params.get('device',None),params.get('attribute',aname) else: device,aname = aname.rsplit('/',1) if '/' in aname else '',aname (self.info if write else self.debug)("DynamicDS.getXAttr(%s,%s,write=%s): ..."%(device or self.get_name(),aname,write and '%s(%s)'%(type(wvalue),wvalue))) result = default #Returning an empty list because it is a False iterable value that can be converted to boolean (and False or None cannot be converted to iterable) try: if not device: self.info('getXAttr accessing to device itself ... using getAttr instead') if write: self.setAttr(aname,wvalue) result = wvalue else: result = self.getAttr(aname) else: devs_in_server = self.myClass and self.myClass.get_devs_in_server() or [] if device in devs_in_server: #READING FROM AN INTERNAL DEVICE self.debug('getXAttr accessing a device in the same server ... using getAttr') if aname.lower()=='state': result = devs_in_server[device].get_state() elif aname.lower()=='status': result = devs_in_server[device].get_status() elif write: devs_in_server[device].setAttr(aname,wvalue) result = wvalue else: result = devs_in_server[device].getAttr(aname) else: #READING FROM AN EXTERNAL DEVICE full_name = (device or self.get_name())+'/'+aname if full_name not in self._external_attributes: self.debug('%s.getXAttr: creating %s proxy to %s' % (self._locals.get('ATTRIBUTE'),'taurus' if self.UseTaurus else 'PyTango',full_name)) if self.UseTaurus: #USING TAURUS+EVENTS = CACHED VALUES a = tango.TAU.Attribute(full_name) #full_name = tango.get_model_name(a) #If the host is external it must be specified in the formula self._external_attributes[full_name] = a self._external_attributes[full_name].changePollingPeriod(self.DEFAULT_POLLING_PERIOD) if len(self._external_attributes) == 1: tango.TAU_LOGGER.disableLogOutput() if self._locals.get('ATTRIBUTE') and self.check_attribute_events(self._locals.get('ATTRIBUTE')): #If Attribute has events evalAttr() will be called at every event_received #If there's no events, then will be not necessary self.info('\t%s.addListener(%s)'%(full_name,self._locals['ATTRIBUTE'])) if tango.get_model_name(a) not in self._external_listeners: self._external_listeners[tango.get_model_name(a)]=set() self._external_listeners[tango.get_model_name(a)].add(self._locals['ATTRIBUTE']) self._external_attributes[full_name].addListener(self.event_received) else: #USING PLAIN PYTANGO (POLLING UNCACHED VALUES) self._external_attributes[full_name] = tango.CachedAttributeProxy(full_name,max((100,self.KeepTime)))#keeptime=self.KeepTime) else: self.debug('%s.getXAttr: using %s proxy to %s' % (self._locals.get('ATTRIBUTE'),'taurus' if self.UseTaurus else 'PyTango',full_name)) if write: self.info('getXAttr(Write): %s(%s)'%(type(wvalue),wvalue)) self._external_attributes[full_name].write(wvalue) result = wvalue else: attrval = self._external_attributes[full_name].read() result = attrval.value self.debug('%s.read() = %s ...'%(full_name,str(result)[:40])) except Exception,e: msg = 'Unable to read attribute %s from device %s: \n%s' % (str(aname),str(device),traceback.format_exc()) print(msg) self.error(msg) self.last_attr_exception = (time.time(),msg,e) #Exceptions are not re_thrown to allow other commands to be evaluated if this fails. finally: if hasattr(self,'myClass') and self.myClass: self.myClass.DynDev=self #NOT REDUNDANT: If a call to another device in the same server occurs this pointer could have been modified. #Check added to prevent exceptions due to empty arrays if hasattr(result,'__len__') and not len(result): result = default if hasattr(default,'__len__') else [] elif result is None: result = default self.debug('Out of getXAttr(%s)'%shortstr(result,40)) return result
[docs] def getXCommand(self,cmd,args=None,feedback=None,expected=None): """ Performs an external Command reading, using a DeviceProxy :param cmd_name: a/b/c/cmd """ self.info("DynamicDS(%s)::getXComm(%s,%s,%s,%s): ..."%(self.get_name(),cmd,args,feedback,expected)) if feedback is None: device,cmd = cmd.rsplit('/',1) if '/' in cmd else (self.get_name(),cmd) full_name = device+'/'+cmd result = None try: if device == self.get_name(): self.info('getXCommand accessing to device itself ...') result = getattr(self,cmd)(args) else: devs_in_server = self.myClass.get_devs_in_server() if device in devs_in_server: self.debug('getXCommand accessing a device in the same server ...') if cmd.lower()=='state': result = devs_in_server[device].get_state() elif cmd.lower()=='status': result = devs_in_server[device].get_status() else: result = getattr(devs_in_server[device],cmd)(argin) else: self.debug('getXCommand calling a proxy to %s' % (device,)) if full_name not in self._external_commands: if self.UseTaurus: self._external_commands[full_name] = tango.TAU.Device(device) if len(self._external_commands)==1: tango.TAU_LOGGER.disableLogOutput() else: self._external_commands[full_name] = PyTango.DeviceProxy(device) self.debug('getXCommand(%s(%s))'%(full_name,args)) if args in (None,[],()): result = self._external_commands[full_name].command_inout(cmd) else: result = self._external_commands[full_name].command_inout(cmd,args) #result = self._external_commands[full_name].command_inout(*([cmd,argin] if argin is not None else [cmd])) except Exception,e: msg = 'Unable to execute %s(%s): %s' % (full_name,args,traceback.format_exc()) self.last_attr_exception = (time.time(),msg,e) self.error(msg) #Exceptions are not re_thrown to allow other commands to be evaluated if this fails. finally: if hasattr(self,'myClass') and self.myClass: self.myClass.DynDev=self #NOT REDUNDANT: If a call to another device in the same server occurs this pointer could have been modified. return result else: if fun.isString(cmd): if '/' not in cmd: device = self.get_name() else: device,cmd = cmd.rsplit('/',1) else: device = self.get_name() if fun.isString(feedback) and '/' not in feedback: feedback = device+'/'+feedback return tango.TangoCommand(command=cmd,device=device,feedback=feedback,timeout=10.,wait=10.).execute(args,expected=expected)
[docs] def get_quality_for_attribute(self,aname,attr_value): self.debug('In get_quality_for_attribute(%s,%s)' % (aname,shortstr(attr_value,15)[:10])) formula = self.dyn_qualities.get(aname.lower()) or 'Not specified' try: if aname.lower() in self.dyn_qualities: self._locals['VALUE'] = getattr(attr_value,'value',attr_value) self._locals['DEFAULT'] = getattr(attr_value,'quality',PyTango.AttrQuality.ATTR_VALID) quality = eval(formula,{},self._locals) or PyTango.AttrQuality.ATTR_VALID else: quality = getattr(attr_value,'quality',PyTango.AttrQuality.ATTR_VALID) self.debug('\t%s.quality = %s'%(aname,quality)) return quality except Exception,e: self.error('Unable to generate quality for attribute %s: %s\n%s'%(aname,formula,traceback.format_exc())) return PyTango.AttrQuality.ATTR_VALID
[docs] def get_date_for_attribute(self,aname,value): if type(value) is DynamicAttribute: return value.date return time.time()
[docs] def ForceAttr(self,argin,VALUE=None): ''' Description: The arguments are AttributeName and an optional Value.<br> This command will force the value of the Attribute or will return the last forced value (if only one argument is passed). ''' if type(argin) is not list: argin = [argin] if len(argin)<1: raise Exception('At least 1 argument required (AttributeName)') if len(argin)<2: value=VALUE else: value=argin[1] aname = argin[0] if aname not in self.dyn_values.keys(): raise Exception('Unknown State or Attribute : ', aname) elif value is not None: self.dyn_values[aname].forced=value if self.simulationMode: self.dyn_values[aname].value=value else: value = self.dyn_values[aname].forced return value
[docs] def ForceVar(self,argin,VALUE=None,default=None,WRITE=None): ''' Management of "Forced Variables" in dynamic servers Description: The arguments are VariableName and an optional Value.<br> This command will force the value of a Variable or will return the last forced value (if only one argument is passed). There are several syntaxes that can be used to call variables. - VAR("MyVariable",default=0) : return Value if initialized, else 0 - VAR("MyVariable",VALUE) : Writes Value into variable - VAR("MyVariable",WRITE=True) : Writes VALUE as passed from a write_attribute() call; reads otherwise - This syntax replaces (VAR("MyVar",default=0) if not WRITE else VAR("MyVar",VALUE)) - GET("MyVariable") : helper to read variable - SET("MyVariable",VALUE) : helper to write variable :param default: value to initialize variable at first read :param WRITE: if True, VALUE env variable will overwrite VALUE argument ''' if type(argin) is not list: argin = [argin] if len(argin)<1: raise Exception('At least 1 argument required (AttributeName)') if len(argin)<2: value=VALUE else: value=argin[1] aname = argin[0] is_write = self._locals.get('WRITE') self.debug('VAR(%s,%s,%s,%s & %s)'%(aname,value,default,WRITE,is_write)) if WRITE and is_write: value = fun.notNone(self._locals.get('VALUE'),value) self.debug('Writing %s into %s'%(value,aname)) if value is not None: self.variables[aname] = value elif self.variables.get(aname) is None: self.variables[aname] = default value = self.variables[aname] return value #------------------------------------------------------------------------------------------------------ # State related methods #------------------------------------------------------------------------------------------------------
[docs] def rawState(self): self.debug('In DynamicDS.rawState(), overriding attribute-based State.') state = self.get_state() self.debug('In DynamicDS.State()='+str(state)) return state
[docs] def set_state(self,state,push=False): self._locals['STATE']=state try: if push and self.check_attribute_events('state'): self.info('DynamicDS(%s.set_state(): pushing new state event'%(self.get_name())) try: self.push_change_event('State',state,time.time(),PyTango.AttrQuality.ATTR_VALID) except Exception,e: self.warning('DynamicDS.push_event(State=%s) failed!: %s'%(state,e)) except Exception,e: self.warning('DynamicDS.check_attribute_events(State=%s) failed!: %s'%(state,e)) DynamicDS.get_parent_class(self).set_state(self,state)
[docs] def check_state(self,set_state=True,current=None): ''' The thread automatically close if there's no activity for 5 minutes, an always_executed_hook call or a new event will restart the thread. ''' new_state = self.get_state() try: if self.state_lock.locked(): self.debug('In DynamicDS.check_state(): lock already acquired') return new_state self.state_lock.acquire() if self.dyn_states: self.debug('In DynamicDS.check_state()') old_state = new_state if current is None else current ## @remarks: the device state is not changed if none of the DynamicStates evaluates to True #self.set_state(PyTango.DevState.UNKNOWN) self.last_state_exception = '' for state,value in self.dyn_states.items(): nstate,formula,code=state,value['formula'],value['compiled'] if nstate not in self.TangoStates: continue result=None try: result=self.evalState(code) #Use of self.evalState allows to overload it except Exception,e: self.error('DynamicDS(%s).check_state(): Exception in evalState(%s): %s'%(self.get_name(),formula,str(traceback.format_exc()))) self.last_state_exception += '\n'+time.ctime()+': '+str(traceback.format_exc()) self.info('In DynamicDS.check_state(): %s : %s ==> %s' % (state,value['formula'],result)) if result: new_state = self.TangoStates[nstate] if new_state!= old_state: self.info('DynamicDS(%s.check_state(): New State is %s := %s'%(self.get_name(),nstate,formula)) if set_state:self.set_state(new_state,push=True) break except Exception,e: print(traceback.format_exc()) raise e finally: if self.state_lock.locked(): self.state_lock.release() return new_state
[docs] def set_status(self,status,save=True): if save: #not any('STATUS' in s for s in self.DynamicStatus): #adds STATUS to locals only if not used in DynamicStatus? self._locals['STATUS']=status self.debug('STATUS: %s'%(status,)) DynamicDS.get_parent_class(self).set_status(self,status)
[docs] def set_full_status(self,status,set=True): if self.last_state_exception: status += '\nLast DynamicStateException was:\n\t'+self.last_state_exception if self.last_attr_exception: status += '\nLast DynamicAttributeException was:\n\t%s:%s'%(time.ctime(self.last_attr_exception[0]),str(self.last_attr_exception[1])) if set: self.set_status(status) return status
[docs] def check_status(self,set=True): status = self.get_status() if self.DynamicStatus: self.debug('In DynamicDS.check_status') try: status = '' for s in self.DynamicStatus: try: t = s and self.evaluateFormula(s) or '' status += t+'\n' except Exception,x: self.warning('\tevaluateStatus(%s) failed: %s'%(s,traceback.format_exc())) if set: self.set_status(status,save=False) except Exception,e: self.warning('Unable to generate DynamicStatus:\n%s'%traceback.format_exc()) return status.strip() #------------------------------------------------------------------------------------------------------ # Lock/Unlock Methods #------------------------------------------------------------------------------------------------------
[docs] def isLocked(self): return self.clientLock
[docs] def Lock(self): self.clientLock=True
[docs] def Unlock(self): self.clientLock=False #------------------------------------------------------------------------------------------------------ # DynamicAttribute manipulation methods #------------------------------------------------------------------------------------------------------
[docs] def Help(self,str_format='text'): """This command returns help for this device class and its parents""" return tango.get_device_help(self,str_format)
[docs] def updateDynamicAttributes(self): """Forces dynamic attributes update from properties. @warning : It will DELETE all attributes that does not appear in DynamicAttributes property or StaticAttributes list! """ self.warning('In updateDynamicAttributes(): reloading DynamicDS properties from Database') self.get_DynDS_properties() ##All attributes managed with dyn_attr() that does not appear in DynamicAttributes or StaticAttributes list will be removed! attrs_list = [name.split('=',1)[0].strip() for name in (self.DynamicAttributes + (getattr(self,'StaticAttributes',None) or []))] for a in self.dyn_attrs: if a not in attrs_list: self.warning('DynamicDS.updateDynamicAttributes(): Removing Attribute!: %sn not in [%s]' % (a,attrs_list)) try: self.remove_attribute(a) except Exception,e: self.error('Unable to remove attribute %s: %s' % (a,str(e))) DynamicDS.dyn_attr(self) #Updating DynamicCommands (just update of formulas) try: CreateDynamicCommands(type(self),type(self.get_device_class())) except: print('CreateDynamicCommands failed: %s'%traceback.format_exc()) #------------------------------------------------------------------ # EvaluateFormula command: # # Description: This execute eval(Expression), just to check if its sintax is adequate or not. # # argin: DevString PyTango Expression to evaluate # argout: DevString #------------------------------------------------------------------ #Methods started with underscore could be inherited by child device servers for debugging purposes
[docs] def evaluateFormula(self,argin): t0 = time.time() self.debug('\tevaluateFormula(%s)'%(argin,)) e = self.evalState(str(argin)) argout=str(e) self.debug('\tevaluateFormula took %s seconds'%(time.time()-t0)) return argout #------------------------------------------------------------------ # GetDynamicConfig command: # # Description: Return current property values # # argin: DevVoid # argout: DevString Return current property values #------------------------------------------------------------------ #Methods started with underscore could be inherited by child device servers for debugging purposes
[docs] def getDynamicConfig(self): exclude = 'DynamicAttributes','DynamicCommands','DynamicStates','DynamicStatus' return '\n'.join(sorted('%s: %s'%(k,getattr(self,k,None)) for l in (DynamicDSClass.class_property_list,DynamicDSClass.device_property_list) for k in l if k not in exclude)) #------------------------------------------------------------------ # GetMemUsage command: # # Description: Returns own process RSS memory usage (Mb). # # argin: DevVoid # argout: DevString Returns own process RSS memory usage (Mb) #------------------------------------------------------------------ #Methods started with underscore could be inherited by child device servers for debugging purposes
[docs] def getMemUsage(self): return fandango.linos.get_memory()/1e3 #------------------------------------------------------------------ # Read MemUsage attribute #------------------------------------------------------------------
[docs] def read_MemUsage(self, attr): self.debug("In read_MemUsage()") # Add your own code here attr.set_value(self.getMemUsage())
""" #THE FOLLOWING COMMANDS HAVE BEEN COPIED FROM PySignalSimulator AND ARE NOT STILL PORTED TO THIS CLASS BEHAVIOUR! #------------------------------------------------------------------ # SetExpression command: # # Description: The arguments are AttributeName,Expression.<br>This command will set the value of the Attribute to eval(Expression) each time that read_attribute is executed. # # argin: DevVarStringArray AttributeName,Expression #------------------------------------------------------------------ def SetExpression(self,argin): try: if len(argin)<2: raise Exception() aname = argin[0] ; exp = argin[1] if aname in self.dyn_attrs.keys(): def Attr(_name): return self.getAttr(_name) t = time.time()-self.time0 eval(exp) self.dyn_attrs[aname]=exp elif aname in self.TangoStates: i=-1 for s in self.SimStates: if aname in s: i = self.SimStates.index(s) self.SimStates[i]=str(aname+'='+exp) if i<0: self.SimStates.append(str(aname+'='+exp)) else: raise Exception('Unknown State or Attribute : ', aname) except Exception, e: PyTango.Except.throw_exception(str(e),'This is not a valid expression!','PySignalSimulator.SetExpression('+argin.__repr__()+')') """ """ #------------------------------------------------------------------ # StoreExpressions command: # # Description: This command forces the actual expressions for each attribute to be stored in the database, by writing the SimAttributes Property. #------------------------------------------------------------------ def StoreExpressions(self): prop=[] for att,val in self.dyn_attrs.items(): prop.append(str(att+'='+val)) db = PyTango.Database() db.put_device_property(self.get_name(),{'SimAttributes': prop, 'SimStates': self.SimStates}) #------------------------------------------------------------------ # ListExpressions command: # # Description: It returns a list with the expressions actually set for each attribute. # argout: DevString #------------------------------------------------------------------ def ListExpressions(self): argout='SimAttributes:\n' for att,val in self.dyn_attrs.items(): argout=argout+'\t'+str(att+'='+val)+'\n' argout=argout+'\nSimStates:\n' for s in self.SimStates: argout=argout+'\t'+s+'\n' return argout """ #------------------------------------------------------------------------------------------------------ # End Of DynamicDS class #------------------------------------------------------------------------------------------------------
[docs]class DynamicDSClass(PyTango.DeviceClass): #This device will point to the device actually being readed; it is set by read_attr_hardware() method; it should be thread safe DynDev = None # Class Properties class_property_list = { 'DynamicSpectrumSize': [PyTango.DevLong, "It will fix the maximum size for all Dynamic Attributes.", [ 4096 ] ], } # Device Properties device_property_list = { 'DynamicAttributes': [PyTango.DevVarStringArray, "Attributes and formulas to create for this device.\n"\ "This Tango Attributes will be generated dynamically using this syntax:\n"\ "\tT3=int(SomeCommand(7007)/10.)\n\n"\ "See the class description to know how to make any method available in attributes declaration.\n"\ "NOTE:Python generators dont work here, use comprehension lists instead.", [ '#Write here your Attribute formulas' ] ], 'DynamicStates': [PyTango.DevVarStringArray, "This property will allow to declare new States dinamically based on\n"\ "dynamic attributes changes. The function Attr will allow to use the\n"\ "value of attributes in formulas.\n\n\n\nALARM=Attr(T1)>70\nOK=1", [ '#Write here your State formulas' ] ], 'DynamicCommands': [PyTango.DevVarStringArray, "This property will allow to declare new Commands at startup with formulas like: \n"\ "\tSendStrings=DevLong(WATTR(NAME+'/Channel',SPECTRUM(str,ARGS)))", [ '#Write here your Command formulas' ] ], 'DynamicQualities': [PyTango.DevVarStringArray, "This property will allow to declare formulas for Attribute Qualities.", [] ], 'DynamicStatus': [PyTango.DevVarStringArray, "Each line generated by this property code will be added to status", [] ], 'LoadFromFile': [PyTango.DevString, "If not empty, a file where additional attribute formulas can be declared. It will be parsed BEFORE DynamicAttributes", ['no'] ], 'KeepAttributes': [PyTango.DevVarStringArray, "This property can be used to store the values of only needed attributes; values are 'yes', 'no' or a list of attribute names", ['yes'] ], 'KeepTime': [PyTango.DevDouble, "The kept value will be returned if a kept value is re-asked within this milliseconds time (Cache).", [ 200 ] ], 'StartupDelay': [PyTango.DevDouble, "The device server will wait this time in milliseconds before starting.", [ 1000 ] ], 'CheckDependencies': [PyTango.DevBoolean, "This property manages if dependencies between attributes are used to check readability.", [True] ], 'UseEvents': [PyTango.DevVarStringArray, "Value of this property will be yes/true,no/false or a list of attributes that will trigger push_event (if configured from jive)", ['false'] ], 'UseTaurus': [PyTango.DevBoolean, "This property manages if Taurus or PyTango will be used to read external attributes.", [False] ], 'LogLevel': [PyTango.DevString, "This property selects the log level (DEBUG/INFO/WARNING/ERROR)", ['INFO'] ], } # Command definitions cmd_list = { 'updateDynamicAttributes': [[PyTango.DevVoid, "Reloads properties and updates attributes"], [PyTango.DevVoid, "Reloads properties and updates attributes"], { 'Display level':PyTango.DispLevel.EXPERT, } ], 'evaluateFormula': [[PyTango.DevString, "formula to evaluate"], [PyTango.DevString, "formula to evaluate"], { 'Display level':PyTango.DispLevel.EXPERT, } ], 'Help': [[PyTango.DevVoid, ""], [PyTango.DevString, "python docstring"],], 'getDynamicConfig': [[PyTango.DevVoid, "Print current property values"], [PyTango.DevString, "Print current property values"], { 'Display level':PyTango.DispLevel.EXPERT, } ], 'getMemUsage': [[PyTango.DevVoid, "Returns own process RSS memory usage (Kb)"], [PyTango.DevDouble, "Returns own process RSS memory usage (Kb)"], { 'Display level':PyTango.DispLevel.EXPERT, } ], } # Attribute definitions attr_list = { 'MemUsage': [[PyTango.DevDouble, PyTango.SCALAR, PyTango.READ]], } @staticmethod def __new__(cls,*args,**kwargs): """ Adding own Properties/Commands to subclasses """ print('In DynamicDSClass.__new__(%s): updating properties'%(cls,)) dicts = ('class_property_list','device_property_list','cmd_list','attr_list') for d in dicts: dct = getattr(cls,d) for p,v in getattr(DynamicDSClass,d).items(): if p not in dct: dct[p] = v #cls = cls if cls is not DynamicDSClass else PyTango.DeviceClass instance = PyTango.DeviceClass.__new__(cls,*args,**kwargs) return instance
[docs] def dyn_attr(self,dev_list): print('In DynamicDSClass.dyn_attr(%s)'%(dev_list,)) for dev in dev_list: DynamicDS.dyn_attr(dev)
[docs] def get_devs_in_server(self,MyClass=None): """ Method for getting a dictionary with all the devices running in this server """ MyClass = MyClass or DynamicDS if not hasattr(MyClass,'_devs_in_server'): MyClass._devs_in_server = {} #This dict will keep an access to the class objects instantiated in this Tango server if not MyClass._devs_in_server: U = PyTango.Util.instance() for klass in U.get_class_list(): for dev in U.get_device_list_by_class(klass.get_name()): if isinstance(dev,DynamicDS): MyClass._devs_in_server[dev.get_name()]=dev return MyClass._devs_in_server #====================================================================================================== # # END OF DynamicDS AND DynamicDSClass CLASSES DECLARATION # #====================================================================================================== #====================================================================================================== # # Additional Classes for Attribute types management ... # #======================================================================================================
[docs]class DynamicDSType(object): """ Allows to parse all the Tango types for Attributes """ def __init__(self,tangotype,labels,pytype,dimx=1,dimy=1): self.tangotype=tangotype self.name=labels[0] if labels else '' self.labels=labels self.pytype=pytype self.dimx=dimx self.dimy=dimy
[docs] def match(self,expr): for l in self.labels: if re.match(l.replace('(','\(').replace('[','\[')+'[\(,]',expr): return True return False #: Dictionary that contains the definition of all types available in DynamicDS attributes #: Tango Type casting in formulas is done using int(value), SPECTRUM(int,value), IMAGE(int,value) for bool,int,float,str types
DynamicDSTypes={ #Labels will be matched at the beginning of formulas using re.match(label+'[(,]',formula) 'DevState':DynamicDSType(PyTango.ArgType.DevState,['DevState',],int), 'DevLong':DynamicDSType(PyTango.ArgType.DevLong,['DevLong','DevULong','int','SCALAR(int','DYN(int'],int), 'DevLong64':DynamicDSType(PyTango.ArgType.DevLong,['DevLong64','DevULong64','long','SCALAR(long'],long), 'DevShort':DynamicDSType(PyTango.ArgType.DevShort,['DevShort','DevUShort','short'],int), 'DevString':DynamicDSType(PyTango.ArgType.DevString,['DevString','str','SCALAR(str'],lambda x:str(x or '')), 'DevBoolean':DynamicDSType(PyTango.ArgType.DevBoolean,['DevBoolean','bit','bool','Bit','Flag','SCALAR(bool'],lambda x:False if str(x).strip().lower() in ('','0','none','false','no') else bool(x)), 'DevDouble':DynamicDSType(PyTango.ArgType.DevDouble,['DevDouble','DevDouble64','float','double','DevFloat','IeeeFloat','SCALAR(float'],float), 'DevVarLongArray':DynamicDSType(PyTango.ArgType.DevLong,['DevVarLongArray','DevVarULongArray','DevVarLong64Array','DevVarULong64Array','SPECTRUM(int','list(long','[long','list(int','[int'],lambda l:[int(i) for i in ([],l)[hasattr(l,'__iter__')]],4096,1), 'DevVarShortArray':DynamicDSType(PyTango.ArgType.DevShort,['DevVarShortArray','DevVarUShortArray','list(short','[short'],lambda l:[int(i) for i in ([],l)[hasattr(l,'__iter__')]],4096,1), 'DevVarStringArray':DynamicDSType(PyTango.ArgType.DevString,['DevVarStringArray','SPECTRUM(str','list(str','[str'],lambda l:[str(i) for i in ([],l)[hasattr(l,'__iter__')]],4096,1), 'DevVarBooleanArray':DynamicDSType(PyTango.ArgType.DevShort,['DevVarBooleanArray','SPECTRUM(bool','list(bool','[bool'],lambda l:[bool(i) for i in ([],l)[hasattr(l,'__iter__')]],4096,1), 'DevVarDoubleArray':DynamicDSType(PyTango.ArgType.DevDouble,['DevVarDoubleArray','SPECTRUM(float','DevVarFloatArray','list(double','[double','list(float','[float'],lambda l:[float(i) for i in ([],l)[hasattr(l,'__iter__')]],4096,1), 'DevVarLongImage':DynamicDSType(PyTango.ArgType.DevLong,['DevVarLongImage','IMAGE(int,'],lambda l:[map(int,i) for i in ([],l)[hasattr(l,'__iter__')]],4096,4096), 'DevVarShortImage':DynamicDSType(PyTango.ArgType.DevShort,['DevVarShortImage',],lambda l:[map(int,i) for i in ([],l)[hasattr(l,'__iter__')]],4096,4096), 'DevVarStringImage':DynamicDSType(PyTango.ArgType.DevString,['DevVarStringImage','IMAGE(str,'],lambda l:[map(str,i) for i in ([],l)[hasattr(l,'__iter__')]],4096,4096), 'DevVarBooleanImage':DynamicDSType(PyTango.ArgType.DevShort,['DevVarBooleanImage','IMAGE(bool,'],lambda l:[map(bool,i) for i in ([],l)[hasattr(l,'__iter__')]],4096,4096), 'DevVarDoubleImage':DynamicDSType(PyTango.ArgType.DevDouble,['DevVarDoubleImage','IMAGE(float,'],lambda l:[map(float,i) for i in ([],l)[hasattr(l,'__iter__')]],4096,4096), } DynamicDSTypes['DevVarFloatArray'] = DynamicDSTypes['DevVarDoubleArray'] DynamicDSTypes['DevULong'] = DynamicDSTypes['DevLong'] DynamicDSTypes['DevULong64'] = DynamicDSTypes['DevLong64'] DynamicDSTypes['DevUShort'] = DynamicDSTypes['DevShort'] DynamicDSTypes['DevDouble64'] = DynamicDSTypes['DevDouble']
[docs]def isTypeSupported(ttype,n_dim=None): if n_dim is not None and n_dim not in (0,1): return False ttype = getattr(ttype,'name',str(ttype)) return any(ttype in t.labels for t in DynamicDSTypes.values())
[docs]def castDynamicType(dims,klass,value): t = {(0,int):'DevLong',(0,float):'DevDouble',(0,bool):'DevBoolean',(0,str):'DevString', (1,int):'DevVarLongArray',(1,float):'DevVarDoubleArray',(1,bool):'DevVarBooleanArray',(1,str):'DevVarStringArray', (2,int):'DevVarLongImage',(2,float):'DevVarDoubleImage',(2,bool):'DevVarBooleanImage',(2,str):'DevVarStringImage',} return DynamicDSTypes[t[dims,klass]].pytype(value)
[docs]def CreateDynamicCommands(ds,ds_class): """ By convention all dynamic commands have argin=DevVarStringArray, argout=DevVarStringArray This function will check all dynamic devices declared within this server @todo an special declaration should allow to redefine that! DevComm(typein,typeout,code) The code to add a new command will be something like: #srubio: it has been added for backward compatibility PyPLC.WriteBit,PyPLCClass.cmd_list['WriteBit']=PyPLC.WriteFlag,[[PyTango.DevVarShortArray, "DEPRECATED, Use WriteFlag instead"], [PyTango.DevVoid, "DEPRECATED, Use WriteFlag instead"]] """ U = PyTango.Util.instance() server = U.get_ds_name() print('In DynamicDS.CreateDynamicCommands(%s)'%(server,)) db = U.get_database() #devices = DynamicDSClass('DynamicDS').get_devs_in_server() classes = list(db.get_device_class_list(server)) print('class = %s; classes = %s' % (ds.__name__,classes)) devs = [classes[i] for i in range(len(classes)-1) if classes[i+1]==ds.__name__] print('devs = %s'%(devs,)) if not hasattr(ds,'dyn_comms'): ds.dyn_comms = CaselessDict() for dev in devs: prop = db.get_device_property(dev,['DynamicCommands'])['DynamicCommands'] print('In DynamicDS.CreateDynamicCommands(%s.%s): %s'%(server,dev,prop)) prop = DynamicDS.check_property_extensions('DynamicCommands',prop) lines = [(dev+'/'+l.split('=',1)[0].strip(),l.split('=',1)[1].strip()) for l in [d.split('#')[0].strip() for d in prop if d] if l] ds.dyn_comms.update(lines) for name,formula in lines: #ds.dyn_comms.items(): name = name.rsplit('/',1)[-1] if name.lower() in [s.lower() for s in dir(ds)]: print('Dynamic Command %s.%s Already Exists, skipping!!!'%(type(ds),name)) continue name = ([n for n in ds_class.cmd_list.keys() if n.lower()==name.lower()] or [name])[0] return_type = PyTango.CmdArgType.names.get(formula.split('(')[0],PyTango.DevString) itype = ('SCALAR(int,ARGS)' in formula and PyTango.DevLong or 'SCALAR(float,ARGS)' in formula and PyTango.DevDouble or 'SCALAR(str,ARGS)' in formula and PyTango.DevString or 'SCALAR(bool,ARGS)' in formula and PyTango.DevBoolean or 'SPECTRUM(int,ARGS)' in formula and PyTango.DevVarLongArray or 'SPECTRUM(float,ARGS)' in formula and PyTango.DevVarDoubleArray or 'SPECTRUM(str,ARGS)' in formula and PyTango.DevVarStringArray or 'SPECTRUM(bool,ARGS)' in formula and PyTango.DevVarBooleanArray or 'ARGS' in formula and PyTango.DevVarStringArray or PyTango.DevVoid) ds_class.cmd_list[name] = [[itype, "ARGS"],[return_type, "result"],] #USING STATIC METHODS; THIS PART MAY BE SENSIBLE TO PyTANGO UPGRADES setattr(ds,name,lambda obj,argin=None,cmd_name=name:obj.evalCommand(cmd_name,argin)) #lambda obj,argin=None,cmd_name=name: (obj._locals.update((('ARGS',argin),)),obj.evalAttr(ds.dyn_comms[obj.get_name()+'/'+cmd_name]))[-1]) print('Out of DynamicDS.CreateDynamicCommands(%s)'%(server,)) return
[docs]class DynamicAttribute(object): ''' This class provides a background for dynamic attribute management and interoperativity Future subclasses could override the operands of the class to manage quality and date modifications ''' qualityOrder = [PyTango.AttrQuality.ATTR_VALID,PyTango.AttrQuality.ATTR_CHANGING,PyTango.AttrQuality.ATTR_WARNING, PyTango.AttrQuality.ATTR_ALARM,PyTango.AttrQuality.ATTR_INVALID] def __init__(self,value=None,date=0.,quality=AttrQuality.ATTR_VALID): self.value=value self.max_peak=(value if not hasattr(value,'__len__') else None,0) self.min_peak=(value if not hasattr(value,'__len__') else None,0) self.forced=None self.date=date self.quality=quality self.formula=None self.compiled = None self.states_queue=[] self.type=None self.keep = True self.dependencies = None #it will be initialized to set() in evalAttr self.primeOlder=False # #self.__add__ = lambda self,other: self.value.__add__(other)
[docs] def getItem(self,index=None): if type(self.value) is list or list in self.value.__class__.__bases__: return self.value.__getitem__(index) elif not index: return value else: raise Exception,'InvalidIndex%sFor%s'%(str(index),str(value))
[docs] def update(self,value,date,quality): self.value=value self.date=date self.quality=quality try: if value is not None and not hasattr(value,'__len__') and not isinstance(value,Exception): if self.max_peak[0] is None or self.value>self.max_peak[0]: self.max_peak = (value,date) if self.min_peak[0] is None or self.value<self.min_peak[0]: self.min_peak = (value,date) except: pass
def __add__(self,other): result = DynamicAttribute() #This method is wrong, qualities are not ordered by criticity! result.update(self.value.__add__(other),min([self.date,other.date]),max([self.quality,other.quality])) return result.value def __repr__(self,klass='DynamicAttribute'): r='%s({'%klass r+='"%s": %s; '%('value',repr(self.value)) r+='"%s": "%s"; '%('date',time.ctime(self.date)) r+='"%s": %s; '%('quality',str(self.quality)) if self.type: r+='"%s": %s; '%('type',hasattr(self.type,'labels') and self.type.labels[0] or str(self.type)) r+='})' if len(r)>80*2: r = r.replace(';',',\n\t') else: r = r.replace(';',',') return r
[docs] def operator(self,op_name,other=None,unary=False,multipleargs=False): #print('operator() called for %s(%s).%s(%s)'%(self.__class__,str(type(self.value)),op_name,other and other.__class__ or '')) value = self.value if value is None: if op_name in ['__nonzero__','__int__','__float__','__long__','__complex__']: value = 0 elif op_name in ['__str__','__repr__']: return '' else: return None result = DynamicAttribute() result.quality,result.date,result.primeOlder=self.quality,self.date,self.primeOlder op_name = '__len__' if op_name == '__nonzero__' and type(value) is list else op_name if op_name in ['__eq__','__lt__','__gt__','__ne__','__le__','__ge__'] and '__cmp__' in dir(value): if op_name is '__eq__': method = lambda s,x: not bool(s.__cmp__(x)) if op_name is '__ne__': method = lambda s,x: bool(s.__cmp__(x)) if op_name is '__lt__': method = lambda s,x: (s.__cmp__(x)<0) if op_name is '__le__': method = lambda s,x: (s.__cmp__(x)<=0) if op_name is '__gt__': method = lambda s,x: (s.__cmp__(x)>0) if op_name is '__ge__': method = lambda s,x: (s.__cmp__(x)>=0) elif hasattr(type(value),op_name) and hasattr(value,op_name): #Be Careful, method from the class and from the instance don't get the same args method = getattr(type(value),op_name) #print('Got %s from %s: %s'%(op_name,type(value),method)) #elif op_name in value.__class__.__base__.__dict__: # method = value.__class__.__base__.__dict__[op_name] else: raise Exception,'DynamicAttribute_WrongMethod%sFor%sType==(%s)'% (op_name,str(type(value)),value) if unary: if value is None and op_name in ['__nonzero__','__int__','__float__','__long__','__complex__']: result.value = method(0) else: result.value = method(value) elif multipleargs: args=[value]+list(other) result.value = method(*args) elif isinstance(other,DynamicAttribute): #print(str(self),'.',op_name,'(',str(other),')') result.quality = self.quality if self.qualityOrder.index(self.quality)>self.qualityOrder.index(other.quality) else other.quality result.date = min([self.date,other.date]) if self.primeOlder else max([self.date,other.date]) result.value = method(value,other.value) else: #print('%s,%s(%s),%s(%s)' % (method,type(value),value,type(other),other)) result.value = method(value,other) if op_name in ['__nonzero__','__int__','__float__','__long__','__complex__','__index__','__len__','__str__', '__eq__','__lt__','__gt__','__ne__','__le__','__ge__']: return result.value else: return result
def __add__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __mul__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __pow__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __sub__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __mod__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __div__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rshift__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __lshift__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) #def __truediv__(self,other): return self.value.__class__.__dict__[inspect.currentframe().f_code.co_name](self.value,other) def __radd__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rmul__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rpow__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rsub__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rmod__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rdiv__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rrshift__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __rlshift__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) #def __rtruediv__(self,other): return self.value.__class__.__dict__[inspect.currentframe().f_code.co_name](self.value,other) def __complex__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __float__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __int__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __long__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __nonzero__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __len__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __str__(self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __lt__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other)#lower than def __le__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other)#lower/equal def __eq__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other)#equal def __ne__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other)#not equal def __gt__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __ge__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __cmp__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other)#returns like strcmp (neg=lower,0=equal,pos=greater) #List operations def __contains__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __getitem__(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) def __getslice__(self,*args): return self.operator(inspect.currentframe().f_code.co_name,args,multipleargs=True) def __iter__(self,*args): return self.operator(inspect.currentframe().f_code.co_name,unary=True)
[docs] def next(self,*args): return self.operator(inspect.currentframe().f_code.co_name,unary=True)
[docs] def index(self,other): return self.operator(inspect.currentframe().f_code.co_name,other)
[docs] def append(self,other): return self.operator(inspect.currentframe().f_code.co_name,other)
[docs] def count(self,other): return self.operator(inspect.currentframe().f_code.co_name,other)
[docs] def extend(self,other): return self.operator(inspect.currentframe().f_code.co_name,other)
[docs] def sort(self,other): return self.operator(inspect.currentframe().f_code.co_name,other) #Boolean operations
def __and__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __xor__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) def __or__( self, other): return self.operator(inspect.currentframe().f_code.co_name,other) #Called to implement the unary arithmetic operations (-, +, abs() and ~). def __neg__( self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __pos__( self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __abs__( self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) def __invert__( self): return self.operator(inspect.currentframe().f_code.co_name,unary=True) # #if op = add or sub or mul or div: # __rop__ (self,other): These methods are called to implement the binary arithmetic operations (+, -, *, /, %, divmod(), pow(), **, <<, >>, &, ^, |) with reflected (swapped) operands. These functions are only called if the left operand does not support the corresponding operation and the operands are of different types # # __iop__ (self,other): These methods are called to implement the augmented arithmetic operations (+=, -=, *=, /=, //=, %=, **=, <<=, >>=, &=, ^=, |=). #l.__add__ l.__doc__ l.__gt__ l.__le__ l.__reduce__ l.__setitem__ l.index #l.__class__ l.__eq__ l.__hash__ l.__len__ l.__reduce_ex__ l.__setslice__ l.insert #l.__contains__ l.__ge__ l.__iadd__ l.__lt__ l.__repr__ l.__str__ l.pop #l.__delattr__ l.__getattribute__ l.__imul__ l.__mul__ l.__reversed__ l.append l.remove #l.__delitem__ l.__getitem__ l.__init__ l.__ne__ l.__rmul__ l.count l.reverse #l.__delslice__ l.__getslice__ l.__iter__ l.__new__ l.__setattr__ l.extend l.sort #================================================================== # # Fandango DynamicDS Server main method # #==================================================================
[docs]class DynamicServer(object): """ The DynamicServer class provides .util .instance .db .classes to have access to Tango DS internals. To load your own custom classes you can override the load_class method to modify how classes are generated (see CopyCatDS as example) """ PROPERTY = 'PYTHON_CLASSPATH' def __init__(self,name='',classes={},add_debug=False,log='-v2',orb=[]): if not name: server,instance,logs,orb = self.parse_args() self.name = server+'/'+instance else: self.name,server,instance,logs = name,name.split('/')[0],name.split('/')[-1],log self.args = [server,instance,logs]+orb print('In DynamicServer(%s)'%(self.args,)) self.util = PyTango.Util(filter(bool,self.args)) self.instance = self.util.instance() self.db = self.instance.get_database() class_list = self.db.get_device_class_list(self.instance.get_ds_name()) #Device,Class object list self.classes = fandango.dicts.defaultdict(list) [self.classes[c].append(d) for d,c in zip(class_list[::2],class_list[1::2]) if c.lower()!='dserver'] for C,devs in classes.items(): for d in devs: if C not in self.classes or d not in self.classes[C]: fandango.tango.add_new_device(self.name,C,d) self.classes[C].append(d) self.path = (self.db.get_property(self.PROPERTY,['DeviceClasses'])['DeviceClasses'] or [''])[0] if self.path: sys.path.append(self.path) self.modules = {} [self.load_class(c) for c in self.classes] print('\nDynamicDS: %d classes loaded: %s'%(len(self.classes),','.join(self.classes))) if add_debug and 'DDebug' not in self.classes: from fandango.device import DDebug DDebug.addToServer(self.util,*(self.name.split('/')))
[docs] def parse_args(self,args=[]): import sys if not args:args = sys.argv assert len(args)>=2,'1 argument required!:\n\tpython dynamic.py instance [-vX]' print(args) server = args[0] if not fandango.re.match('^(.*[/])?dynamic.py$',args[0]) else 'DynamicServer' instance = args[1] logs,orb = '-v2' if instance!='-?' else '',[] for i in (2,3): if args[i:]: if args[i].startswith('-v'): logs = args[i] else: orb = args[i:i+2] break else: break ds_name = server+'/'+instance return (server,instance,logs,orb)
[docs] def load_class(self,c): try: if c in locals(): return locals[c] p = (self.db.get_property(self.PROPERTY,[c])[c] or [''])[0] print('\nLoading %s from %s' % (c,p or self.path)) if p: self.modules[c] = fandango.objects.loadModule(p) elif c in dir(fandango.device): self.modules[c] = fandango.device elif c in dir(fandango.interface): self.modules[c] = fandango.interface else: try: self.modules[c] = fandango.objects.loadModule(c) assert getattr(self.modules[c],c+'Class') except: m = self.path+'/%s/%s.py'%(c,c) print('\nLoading %s from %s' % (c,m)) self.modules[c] = fandango.objects.loadModule(m) self.util.add_TgClass(getattr(self.modules[c],c+'Class'),getattr(self.modules[c],c),c) except: traceback.print_exc() sys.exit(-1)
[docs] def main(self,args=None): #Args argument has no effect! @TODO print('DynamicDS.main(%s)'%(args or sys.argv,)) U = self.util.instance() U.server_init() U.server_run()
__doc__ = fandango.doc.get_fn_autodoc(__name__,vars(),module_vars=['DynamicDSTypes']) if __name__ == '__main__': print('.'*80) pyds = DynamicServer(add_debug=True) print('loaded ...') pyds.main() print('launched ...') #try: #py = PyTango.Util(sys.argv) ## Adding all commands/properties from fandango.DynamicDS #PySignalSimulator,PySignalSimulatorClass = FullTangoInheritance('PySignalSimulator',PySignalSimulator,PySignalSimulatorClass,DynamicDS,DynamicDSClass,ForceDevImpl=True) #py.add_TgClass(PySignalSimulatorClass,PySignalSimulator,'PySignalSimulator') #U = PyTango.Util.instance() #fandango.dynamic.CreateDynamicCommands(PySignalSimulator,PySignalSimulatorClass) #U.server_init() #U.server_run() #except PyTango.DevFailed,e: #print '-------> Received a DevFailed exception:',traceback.format_exc() #except Exception,e: #print '-------> An unforeseen exception occured....',traceback.format_exc() #else: ##Enabling subclassing #PySignalSimulator,PySignalSimulatorClass = FullTangoInheritance('PySignalSimulator',PySignalSimulator,PySignalSimulatorClass,DynamicDS,DynamicDSClass,ForceDevImpl=True)