provides tango utilities for fandango, like database search methods and emulated Attribute Event/Value types
This module is a light-weight set of utilities for PyTango.
Classes dedicated for device management will go to fandango.device
Methods for Astor-like management will go to fandango.servers
.. contents::
#python imports
import time,re,os,traceback
#pytango imports
import PyTango
from PyTango import AttrQuality,EventType,DevState,AttrDataFormat,AttrWriteType,CmdArgType
if 'Device_4Impl' not in dir(PyTango):
PyTango.Device_4Impl = PyTango.Device_3Impl
import fandango
import objects
import threading
from fandango.functional import *
from dicts import CaselessDefaultDict,CaselessDict
from objects import Object,Struct
from log import Logger,except2str,printf
from excepts import exc2str
__test__ = {}
#taurus imports, here USE_TAU is defined for all fandango
TAU,USE_TAU = None,False
[docs]def loadTaurus():
print '#'*80
print '%s fandango.tango.loadTaurus()'%time.ctime()
assert str(os.getenv('USE_TAU')).strip().lower() not in 'no,false,0'
import taurus
TAU = taurus
TAU_LOGGER = taurus.core.util.Logger
"""USE_TAU will be used to choose between taurus.Device and PyTango.DeviceProxy"""
print 'fandango.tango: USE_TAU disabled'
TAU = None
return bool(TAU)
[docs]def TGet(*args):
Universal fandango helper, it will return a matching Tango object depending on the arguments passed
Objects are: database (), server (*/*), attribute ((:/)?*/*/*/*), device (*)
if not args:
return get_database()
arg0 = args[0]
if arg0.count('/')==1:
return fandango.servers.ServersDict(arg0)
if arg0.count('/')>(2+(':' in arg0)):
return sorted(get_matching_attributes(arg0)) if isRegexp(arg0,WILDCARDS+' ') else check_attribute(arg0,brief=True)
return sorted(get_matching_devices(arg0)) if isRegexp(arg0,WILDCARDS+' ') else get_device(arg0)
__test__['fandango.tango.TGet'] = ('sys/database/2',['sys/database/2'],{})
'Lime White White Lime White Lime LightBlue Yellow Red Brown LightBlue Orange Magenta Grey'.split()
##@name Access Tango Devices and Database
##TangoDatabase singletone
global TangoDatabase,TangoDevice,TangoProxies
TangoDatabase,TangoDevice,TangoProxies = None,None,None
[docs]def get_tango_host(dev_name='',use_db=False):
If device is a tango model, it will extract the host from the model URL
If devicesis none, then environment variable or PyTango.Database are used to extract the host
If TANGO_HOST is not defined it will always fallback to PyTango.Database()
if dev_name:
if isString(dev_name):
m = matchCl(rehost,dev_name)
m,use_db = None,dev_name
return m.groups()[0] if m else get_tango_host(use_db=use_db)
elif use_db:
use_db = use_db if hasattr(use_db,'get_db_host') else get_database()
host,port = use_db.get_db_host(),int(use_db.get_db_port())
if (matchCl('.*[a-z].*',host.lower())
#and PyTango.__version_number__ < 800):
): #The bug is back!!
#Remove domain name
host = host.strip().split('.')[0]
return "%s:%d"%(host,port)
host = os.getenv('TANGO_HOST')
return host or get_tango_host(use_db=True)
print('ERROR: get_tango_host(): '+traceback.format_exc())
return 'localhost:10000'
[docs]def get_database(host='',port='',use_tau=False):
Method to get a singleton instance of the Tango Database
host/port can be a host,port tuple; a 'host:port' string or a taurus model.
@TODO: host/port is checked only at first creation, once initialized you can't change HOST
global TangoDatabase
global TAU,USE_TAU
if host in (True,False): use_tau,host,port = host,'','' #For backwards compatibility
if '/' in host:
# Parsing a taurus model
host = get_tango_host(host)
if ':' in host:
# Parsing a host:port string
host,port = host.split(':')
args = [host,int(port)] if host and port else []
if False and not args and TangoDatabase:
t = time.time()
#TangoDatabase.get_info() #TOO SLOW TO BE A CHECK!
print time.time()-t
return TangoDatabase
pass #defaulting to Taurus/PyTango
if use_tau and not TAU: TAU = loadTaurus()
db = (use_tau and TAU and TAU.Database(*args)) or PyTango.Database(*args)
if not args: TangoDatabase = db
return db
print traceback.format_exc()
[docs]def get_proxy(argin,use_tau=False,keep=False):
Returns attribute or device proxy depending on argin syntax
if argin.count('/')>(2+(':' in argin)):
return PyTango.AttributeProxy(argin)
return get_device(argin,use_tau,keep)
[docs]def get_device(dev,use_tau=False,keep=False):
if use_tau and not TAU: use_tau = loadTaurus()
if isinstance(dev,basestring):
if dev.count('/')==1: dev = 'dserver/'+dev
if use_tau and TAU:
return TAU.Device(dev)
global TangoProxies
if keep and TangoProxies is None:
TangoProxies = ProxiesDict(use_tau=use_tau)
if TangoProxies and (dev in TangoProxies or keep):
return TangoProxies[dev]
return PyTango.DeviceProxy(dev)
elif isinstance(dev,PyTango.DeviceProxy) or (use_tau and TAU and isinstance(dev,TAU.core.tango.TangoDevice)):
return dev
return None
[docs]def get_database_device(use_tau=False,db=None):
global TangoDevice
td = TangoDevice
if db is None and td is not None:
return td
dev_name = (db or get_database(use_tau=use_tau)).dev_name()
dev_name = get_tango_host(use_db=db)+'/'+dev_name if db else dev_name
td = get_device(dev_name,use_tau=use_tau)
if db is None:
TangoDevice = td
return td
[docs]def add_new_device(server,klass,device):
for c in (server+klass+device):
if re.match('[^a-zA-Z0-9\-\/_]',c):
raise Exception,"'%s' Character Not Allowed! (only a-z,A-Z,0-9,/,-,_)"%c
dev_info = PyTango.DbDevInfo()
dev_info.name = device
dev_info.klass = klass
dev_info.server = server
[docs]def get_device_info(dev,db=None):
This method provides an alternative to DeviceProxy.info() for those devices that are not running
#vals = PyTango.DeviceProxy('sys/database/2').DbGetDeviceInfo(dev)
vals = None
if ':' in dev:
model = fandango.Struct(parse_tango_model(dev))
db = get_database(model.host,model.port)
dev = '/'.join(model.device.split('/')[-3:])
dd = get_database_device(db=db)
vals = dd.DbGetDeviceInfo(dev)
di = Struct([(k,v) for k,v in zip(('name','ior','level','server','host','started','stopped'),vals[1])])
di.exported,di.PID = vals[0]
di.dev_class = dd.DbGetClassForDevice(dev)
return di
raise Exception('get_device_info(%s,%s,%s): %s'%(
[docs]def get_device_host(dev):
Asks the Database device server about the host of this device
return get_device_info(dev).host
[docs]def get_device_started(target):
""" Returns device started time """
return get_database_device().DbGetDeviceInfo(target)[-1][5]
[docs]def get_device_for_alias(alias):
""" returns the device name for a given alias """
try: return get_database().get_device_alias(alias)
except Exception,e:
if 'no device found' in str(e).lower(): return None
return None #raise e
[docs]def get_alias_for_device(dev):
""" return alias for this device """
try: return get_database().get_alias(dev) #.get_database_device().DbGetDeviceAlias(dev)
except Exception,e:
if 'no alias found' in str(e).lower(): return None
return None #raise e
[docs]def get_alias_dict(exp='*'):
returns an {alias:device} dictionary with all matching alias from Tango DB
:param exp:
tango = get_database()
return dict((k,tango.get_device_alias(k)) for k in tango.get_device_alias_list(exp))
[docs]def get_real_name(dev,attr=None):
It translate any device/attribute string by name/alias/label
:param device: Expected format is [host:port/][device][/attribute]; where device can be either a/b/c or alias
:param attr: optional, when passed it will be regexp matched against attributes/labels
if isString(dev):
if attr is None and dev.count('/') in (1,4 if ':' in dev else 3):
dev,attr = dev.rsplit('/',1)
if '/' not in dev:
dev = get_device_for_alias(dev)
if attr is None: return dev
for a in get_device_attributes(dev):
if matchCl(attr,a): return (dev+'/'+a)
if matchCl(attr,get_attribute_label(dev+'/'+a)): return (dev+'/'+a)
return None
[docs]def get_full_name(model):
""" Returns full schema name as needed by HDB++ api
if ':' not in model:
model = get_tango_host()+'/'+model
if not model.startswith('tango://'):
model = 'tango://'+model
return model
[docs]def get_device_commands(dev):
returns a list of device command names
return [c.cmd_name for c in get_device(dev).command_list_query()]
[docs]def get_device_attributes(dev,expressions='*'):
""" Given a device name it returns the attributes matching any of the given expressions """
expressions = map(toRegexp,toList(expressions))
al = (get_device(dev) if isString(dev) else dev).get_attribute_list()
result = [a for a in al for expr in expressions if matchCl(expr,a,terminate=True)]
return result
[docs]def get_device_labels(target,filters='*',brief=True):
Returns an {attr:label} dict for all attributes of this device
Filters will be a regular expression to apply to attr or label.
If brief is True (default) only those attributes with label are returned.
This method works offline, does not need device to be running
labels = {}
if isString(target): d = get_device(target)
else: d,target = target,target.name()
db = get_database()
attrlist = db.get_device_attribute_list(target,filters) if brief and hasattr(db,'get_device_attribute_list') else d.get_attribute_list()
for a in attrlist:
l = get_attribute_label(target+'/'+a,use_db=True) if brief else d.get_attribute_config(a).label
if (not filters or any(map(matchCl,(filters,filters),(a,l)))) and (not brief or l!=a):
labels[a] = l
return labels
[docs]def set_device_labels(target,labels):
Applies an {attr:label} dict for attributes of this device
labels = CaselessDict(labels)
d = get_device(target)
for a in d.get_attribute_list():
if a in labels:
ac = d.get_attribute_config(a)
ac.label = labels[a]
return labels
[docs]def get_matching_device_attribute_labels(device,attribute):
""" To get all gauge port labels: get_matching_device_attribute_labels('*vgct*','p*') """
devs = get_matching_devices(device)
return dict((t+'/'+a,l) for t in devs for a,l in get_device_labels(t,attribute).items() if check_device(t))
[docs]def get_attribute_info(device,attribute):
This method returns attribute info in the attr_list format
It parses values returned by PyTango.DeviceProxy(device).get_attribute_config(attribute)
return format:
PyTango.READ_WRITE, 2],
if isString(device): device = get_device(device)
ai = device.get_attribute_config(attribute)
types = [PyTango.CmdArgType.values.get(ai.data_type),ai.data_format,ai.writable]
if ai.max_dim_x>1: types.append(ai.max_dim_x)
if ai.max_dim_y>0: types.append(ai.max_dim_y)
formats = dict((k,getattr(ai,k))
for k,v in (('label',''),('unit','No unit'),('format',''))
if getattr(ai,k)!=v)
return [types,formats]
[docs]def get_attribute_config(target):
d,a = target.rsplit('/',1)
return get_device(d).get_attribute_config(a)
[docs]def get_attribute_events(target,polled=True,throw=False):
d,a = target.rsplit('/',1)
dp = get_device(d)
polling = dp.get_attribute_poll_period(a)
if polled and not polling:
return None
aei = dp.get_attribute_config(a).events
r = {'polling':polling}
for k,t in (
r[k],v = [None]*len(t),None
for i,p in enumerate(t):
v = str2float(getattr(getattr(aei,k),p))
r[k][i] = v
except: pass #print(k,i,p,v)
if not any(r[k]): r.pop(k)
return r
except Exception,e:
if throw: raise e
return None
[docs]def get_attribute_label(target,use_db=True):
dev,attr = target.rsplit('/',1)
if not use_db: #using AttributeProxy
if attr.lower() in ('state','status'):
return attr
cf = get_attribute_config(target)
return cf.label
else: #using DatabaseDevice
return (get_database().get_device_attribute_property(dev,[attr])[attr].get('label',[attr]) or [''])[0]
[docs]def set_attribute_label(target,label='',unit=''):
if target.lower().rsplit('/')[-1] in ('state','status'):
ap = PyTango.AttributeProxy(target)
cf = ap.get_config()
if label: cf.label = label
if unit: cf.unit = unit
[docs]def parse_db_command_array(data,keys=1,depth=2):
This command will parse data received from DbGetDeviceAttributeProperty2 command.
DB device commands return data in this format: X XSize Y YSize Z ZSize ZValue W WSize WValue
This corresponds to {X:{Y:{Z:[Zdata],W:[Wdata]}}}
Depth of the array is 2 by default
label_of_dev_test_attribute = parse_db_command_array(dbd.DbGetDeviceAttributeProperty2([dev,attr]).,keys=1,depth=2)[dev][attr]['label'][0]
dict = {}
#print '[%s]: %s' % (keys,data)
for x in range(keys):
key = data.pop(0)
try: length = data.pop(0)
except: return None
#print '%s,%s,%s => '%(key,length,depth)
if depth:
k,v = key,parse_db_command_array(data,keys=int(length),depth=depth-1)
length = int(length)
k,v = key,[data.pop(0) for y in range(length)]
k,v = key,[length]
#print '\t%s,%s'%(k,v)
return dict
[docs]def get_class_property(klass,property,db=None):
It returns class property value or just first item if value list has lenght==1
prop = (db or get_database()).get_class_property(klass,[property])[property]
return prop if len(prop)!=1 else prop[0]
[docs]def put_class_property(klass,property,value=None,db=None):
Two syntax are possible:
- put_class_property(class,{property:value})
- put_class_property(class,property,value)
if not isMapping(property):
if isSequence(value) and not isinstance(value,list):
value = list(value)
property = {property:value}
for p,v in property.items():
if isSequence(v):
if len(v)==1:
property[p] = v[0]
elif not isinstance(value,list):
property[p] = list(v)
return (db or get_database()).put_class_property(klass,property)
[docs]def get_device_property(device,property,db=None):
It returns device property value or just first item if value list has lenght==1
prop = (db or get_database()).get_device_property(device,[property])[property]
return prop if len(prop)!=1 else prop[0]
[docs]def put_device_property(device,property,value=None,db=None):
Two syntax are possible:
- put_device_property(device,{property:value})
- put_device_property(device,property,value)
if not isMapping(property):
if isSequence(value) and not isinstance(value,list):
value = list(value)
property = {property:value}
for p,v in property.items():
if isSequence(v):
if len(v)==1:
property[p] = v[0]
elif not isinstance(value,list):
property[p] = list(v)
return (db or get_database()).put_device_property(device,property)
[docs]def get_devices_properties(device_expr,properties,hosts=[],port=10000):
get_devices_properties('*alarms*',props,hosts=[get_bl_host(i) for i in bls])
props must be an string as passed to Database.get_device_property(); regexp are not enabled!
get_matching_device_properties enhanced with multi-host support
@TODO: Compare performance of this method with get_matching_device_properties
expr = device_expr
if not isSequence(properties): properties = [properties]
get_devs = lambda db, reg : [d for d in db.get_device_name('*','*') if not d.startswith('dserver') and matchCl(reg,d)]
if hosts: tango_dbs = dict(('%s:%s'%(h,port),PyTango.Database(h,port)) for h in hosts)
else: tango_dbs = {get_tango_host():get_database()}
return dict(('/'.join((host,d) if hosts else (d,)),db.get_device_property(d,properties))
for host,db in tango_dbs.items() for d in get_devs(db,expr))
[docs]def get_matching_device_properties(devs,props,hosts=[],exclude='*dserver*',port=10000,trace=False):
get_matching_device_properties enhanced with multi-host support
@props: regexp are enabled!
get_devices_properties('*alarms*',props,hosts=[get_bl_host(i) for i in bls])
@TODO: Compare performance of this method with get_devices_properties
db = get_database()
result = {}
if not isSequence(devs): devs = [devs]
if not isSequence(props): props = [props]
if hosts:
hosts = [h if ':' in h else '%s:%s'%(h,port) for h in hosts]
hosts = set(get_tango_host(d) for d in devs)
result = {}
for h in hosts:
result[h] = {}
db = get_database(h)
exps = [h+'/'+e if ':' not in e else e for e in devs]
if trace: print(exps)
hdevs = [d.replace(h+'/','') for d in get_matching_devices(exps,fullname=False)]
if trace: print('%s: %s vs %s'%(h,hdevs,props))
for d in hdevs:
if exclude and matchCl(exclude,d): continue
dprops = [p for p in db.get_device_property_list(d,'*') if matchAny(props,p)]
if not dprops: continue
if trace: print(d,dprops)
vals = db.get_device_property(d,dprops)
vals = dict((k,list(v) if isSequence(v) else v) for k,v in vals.items())
if len(hosts)==1 and len(hdevs)==1:
return vals
result[h][d] = vals
if len(hosts)==1:
return result[h]
return result
[docs]def property_undo(dev,prop,epoch):
db = get_database()
his = db.get_device_property_history(dev,prop)
valids = [h for h in his if str2time(h.get_date())<epoch]
news = [h for h in his if str2time(h.get_date())>epoch]
if valids and news:
print('Restoring property %s/%s to %s'%(dev,prop,valids[-1].get_date()))
elif not valids:print('No property values found for %s/%s before %s'%(dev,prop,time2str(epoch)))
elif not news: print('Property %s/%s not modified after %s'%(dev,prop,time2str(epoch)))
[docs]def get_property_history(dev,prop):
db = get_database()
his = db.get_device_property_history(dev,prop)
return [(str2time(h.get_date()),h.get_value()) for h in his]
[docs]def get_server_property(name,instance,prop):
name = clsub("(dserver/|.py)","",name)
return get_device_property('dserver/'+name+'/'+instance,prop)
# Property extensions
[docs]def get_extension_arg(x):
return x.split(':',1)[-1].split('#')[0]
[docs]def _copy_extension(prop,row,db=None):
#This extension will copy property contents from argument
db = db or get_database()
return db.get_device_property(get_extension_arg(row),[prop])[prop]
[docs]def _file_extension(prop,row,db=None):
#This extension will copy property contents from filename
f = open(get_extension_arg(row))
r = f.readlines()
return r
return []
[docs]def _attr_extension(prop,row,db=None):
This extension will replace the line by an attribute forwarding formula ($Arg = Type(ATTR('arg'))
Syntax is @ATTR:[alias=]model [+formula]
db = db or get_database()
args = row.split(':',1)[-1].split('#')[0].split('=')
s = args[-1].lower() #formula
model = (searchCl(retango,s).group())
ai = get_attribute_config(model) #config
t = cast_tango_type(ai.data_type).__name__ #pytype
f = str(ai.data_format) #format
a = args[0] if len(args)>1 else model.split('/')[-1]
s = s.replace(model,"ATTR('%s')"%model)
r = "%s=%s(%s,%s)"%(a,f,t,s)
return [r]
print('fandango.tango._attr_extension(%s,%s) failed!'%(prop,row))
return []
EXTENSIONS = {'@COPY:':_copy_extension,'@FILE:':_file_extension,'@ATTR:':_attr_extension}
[docs]def check_property_extensions(prop,value,db=None,extensions=EXTENSIONS,filters=[]):
db = db or get_database()
#print(prop in DynamicDSClass.device_property_list,fandango.isSequence(value),any(str(s).startswith(e) for e in DynamicDS._EXTENSIONS for s in value))
if (not filters or prop in filters) and fandango.isSequence(value) and any(str(s).startswith(e) for e in extensions for s in value):
parsed,get_arg = [],(lambda x:x.split(':',1)[-1].split('#')[0])
for v in value:
#if v.startswith('@COPY:'): parsed.extend(DynamicDS._copy_extension(prop,v))
#elif v.startswith('@FILE:'): parsed.extend(DynamicDS._file_extension(prop,v))
ext,f = first([(e,f) for e,f in extensions.items() if v.startswith(e)] or [(None,None)])
if ext: parsed.extend(f(prop,v))
else: parsed.append(v)
except: print('check_property_extensions(%s,%s): %s'%(prop,value,traceback.format_exc()))
return parsed
return value
##@name Methods for searching the database with regular expressions
#Regular Expressions
metachars = re.compile('([.][*])|([.][^*])|([$^+\-?{}\[\]|()])')
#alnum = '[a-zA-Z_\*][a-zA-Z0-9-_\*]*' #[a-zA-Z0-9-_]+ #Added wildcards
alnum = '(?:[a-zA-Z0-9-_\*]|(?:\.\*))(?:[a-zA-Z0-9-_\*]|(?:\.\*))*'
no_alnum = '[^a-zA-Z0-9-_]'
no_quotes = '(?:^|$|[^\'"a-zA-Z0-9_\./])'
rehost = '(?:(?P<host>'+alnum+'(?:\.'+alnum+')?'+'(?:\.'+alnum+')?'+'(?:\.'+alnum+')?'+'[\:][0-9]+)(?:/))' #(?:'+alnum+':[0-9]+/)?
redev = '(?P<device>'+'(?:'+'/'.join([alnum]*3)+'))' #It matches a device name
reattr = '(?:/(?P<attribute>'+alnum+')(?:(?:\\.)(?P<what>quality|time|value|exception|history))?)' #Matches attribute and extension
retango = '(?:tango://)?'+(rehost+'?')+redev+(reattr+'?')+'(?:\$?)'
[docs]def parse_labels(text):
if any(text.startswith(c[0]) and text.endswith(c[1]) for c in [('{','}'),('(',')'),('[',']')]):
labels = eval(text)
return labels
except Exception,e:
print 'ERROR! Unable to parse labels property: %s'%str(e)
return []
exprs = text.split(',')
if all(':' in ex for ex in exprs):
labels = [tuple(e.split(':',1)) for e in exprs]
labels = [(e,e) for e in exprs]
return labels
[docs]def get_model_name(model):
if isString(model):
m = searchCl(retango,str(model).lower())
return m.group() if m else str(model).lower()
model = model.getFullName()
model = model.getModelName()
print traceback.format_exc()
return str(model).lower()
[docs]def parse_tango_model(name,use_tau=False,use_host=False):
{'attributename': 'state',
'attribute': 'state',
'devicename': 'bo01/vc/ipct-01', #Always short name
'device': 'cts:10000/bo01/vc/ipct-01', #Will contain host if use_host or host!=TANGO_HOST
'host': 'cts',
'port': '10000',
'scheme': 'tango'}
values = {'scheme':'tango'}
values['host'],values['port'] = defhost = get_tango_host().split(':',1)
if not use_tau or not TAU: raise Exception('NotTau')
from taurus.core import tango as tctango
from taurus.core import AttributeNameValidator,DeviceNameValidator
validator = {tctango.TangoDevice:DeviceNameValidator,tctango.TangoAttribute:AttributeNameValidator}
values.update((k,v) for k,v in validator[tctango.TangoFactory().findObjectClass(name)]().getParams(name).items() if v)
name = str(name).replace('tango://','')
m = re.match(fandango.tango.retango,name)
if m:
gd = m.groupdict()
values['device'] = '/'.join([s for s in gd['device'].split('/') if ':' not in s])
if gd.get('attribute'): values['attribute'] = gd['attribute']
if gd.get('host'): values['host'],values['port'] = gd['host'].split(':',1)
if 'device' not in values:
return None
values['devicename'] = values['device']
values['model'] = '%s:%s/%s'%(values['host'],values['port'],values['device'])
if use_host or tuple(defhost) != (values['host'],values['port']):
values['device'] = values['model']
if 'attribute' in values:
values['attributename'] = values['attribute']
values['model'] = values['model']+'/'+values['attribute']
return values
TANGO_KEEPTIME = 60 #This variable controls how often the Tango Database will be queried
[docs]class get_all_devices(objects.SingletonMap):
_keeptime = TANGO_KEEPTIME
def __init__(self,exported=False,keeptime=None,host=''):
self._all_devs = []
self._last_call = 0
self._exported = exported
self._host = host and get_tango_host(host)
if keeptime: self.set_keeptime(keeptime)
[docs] def set_keeptime(klass,keeptime):
klass._keeptime = max(keeptime,60) #Only 1 query/minute to DB allowed
[docs] def get_all_devs(self):
now = time.time()
if not self._all_devs or now>(self._last_call+self._keeptime):
#print 'updating all_devs ...............................'
db = get_database(self._host)
self._all_devs = sorted(map(str.lower,
(db.get_device_exported('*') if self._exported
else db.get_device_name('*','*'))))
self._last_call = now
return self._all_devs
def __new__(cls,*p,**k):
instance = objects.SingletonMap.__new__(cls,*p,**k)
return instance.get_all_devs()
[docs]def get_class_devices(klass,db=None):
""" Returns all registered devices for a given class
if not db:
db = get_database()
if isString(db):
db = get_database(db)
return sorted(str(d).lower() for d in db.get_device_name('*',klass))
[docs]def get_matching_devices(expressions,limit=0,exported=False,fullname=False,trace=False):
Searches for devices matching expressions, if exported is True only running devices are returned
Tango host will be included in the matched name if fullname is True
if not isSequence(expressions): expressions = [expressions]
defhost = get_tango_host()
hosts = list(set((m.groups()[0] if m else None) for m in (matchCl(rehost,e) for e in expressions)))
fullname = fullname or ':' in str(expressions) or len(hosts)>1 or hosts[0] not in (defhost,None) #Dont count slashes, as regexps may be complex
all_devs = []
if trace: print(hosts,fullname)
for host in hosts:
if host in (None,defhost):
db_devs = get_all_devices(exported)
odb = PyTango.Database(*host.split(':'))
db_devs = odb.get_device_exported('*') if exported else odb.get_device_name('*','*')
prefix = '%s/'%(host or defhost)
all_devs.extend(prefix+d for d in db_devs)
expressions = map(toRegexp,toList(expressions))
if trace: print(expressions)
if not fullname: all_devs = [r.split('/',1)[-1] if matchCl(rehost,r) else r for r in all_devs]
condition = lambda d: any(matchCl("(%s/)?(%s)"%(defhost,e),d,terminate=True) for e in expressions)
result = sorted(filter(condition,all_devs))
return sorted(result[:limit] if limit else result)
[docs]def get_matching_servers(expressions,tango_host='',exported=False):
Return all servers in the given tango tango_host matching the given expressions.
:param exported: whether servers should be running or not
expressions = toSequence(expressions)
servers = get_database(tango_host).get_server_list()
servers = sorted(set(s for s in servers if matchAny(expressions,s)))
if exported:
exported = get_all_devices(exported=True,host=tango_host)
servers = [s for s in servers if ('dserver/'+s).lower() in exported]
return sorted(servers)
[docs]def find_devices(*args,**kwargs):
#A get_matching_devices() alias, just for backwards compatibility
return get_matching_devices(*args,**kwargs)
[docs]def get_matching_attributes(expressions,limit=0,fullname=None,trace=False):
Returns all matching device/attribute pairs.
regexp only allowed in attribute names
:param expressions: a list of expressions like [domain_wild/family_wild/member_wild/attribute_regexp]
attrs = []
def_host = get_tango_host()
matches = []
if not isSequence(expressions): expressions = [expressions]
fullname = any(matchCl(rehost,e) for e in expressions)
for e in expressions:
match = matchCl(retango,e,terminate=True)
if not match:
if '/' not in e:
host,dev,attr = def_host,e.rsplit('/',1)[0],'state'
#raise Exception('Expression must match domain/family/member/attribute shape!: %s'%e)
host,dev,attr = def_host,e.rsplit('/',1)[0],e.rsplit('/',1)[1]
host,dev,attr = [d[k] for k in ('host','device','attribute') for d in (match.groupdict(),)]
host,attr = host or def_host,attr or 'state'
if trace: print('get_matching_attributes(%s): match:%s,host:%s,dev:%s,attr:%s'%(e,bool(match),host,dev,attr))
fullname = fullname or any(m[0]!=def_host for m in matches)
for host,dev,attr in matches:
if fullname and host not in dev:
dev = host+'/'+dev
for d in get_matching_devices(dev,exported=True,fullname=fullname):
if matchCl(attr,'state',terminate=True):
if attr.lower().strip() != 'state':
ats = sorted(get_device_attributes(d,[attr]),key=str.lower)
attrs.extend([d+'/'+a for a in ats])
if limit and len(attrs)>limit: break
print 'Unable to get attributes for %s'%d
print traceback.format_exc()
result = sorted(set(attrs))
return result[:limit] if limit else result
[docs]def find_attributes(*args,**kwargs):
#A get_matching_attributes() alias, just for backwards compatibility
return get_matching_attributes(*args,**kwargs)
[docs]def get_all_models(expressions,limit=1000):
Customization of get_matching_attributes to be usable in Taurus widgets.
It returns all the available Tango attributes (exported!) matching any of a list of regular expressions.
if isinstance(expressions,str): #evaluating expressions ....
if any(re.match(s,expressions) for s in ('\{.*\}','\(.*\)','\[.*\]')): expressions = list(eval(expressions))
else: expressions = expressions.split(',')
types = [list,tuple,dict]
from PyQt4 import Qt
except: pass
if isinstance(expressions,types):
expressions = list(str(e) for e in expressions)
print 'In get_all_models(%s:"%s") ...' % (type(expressions),expressions)
db = get_database()
if 'SimulationDatabase' in str(type(db)): #used by TauWidgets displayable in QtDesigner
return expressions
return get_matching_attributes(expressions,limit)
## Methods for managing device/attribute lists
[docs]def attr2str(attr_value):
att_name = '%s='%attr_value.name if hasattr(attr_value,'name') else ''
if hasattr(attr_value,'value'):
return '%s%s(%s)' %(att_name,type(attr_value.value).__name__,attr_value.value)
return '%s%s(%s)' %(att_name,type(attr_value).__name__,attr_value)
[docs]def get_domain(model):
if model.count('/') in (2,3): return model.split['/'][0]
else: return ''
[docs]def get_family(model):
if model.count('/') in (2,3): return model.split['/'][1]
else: return ''
[docs]def get_member(model):
if model.count('/') in (2,3): return model.split['/'][2]
else: return ''
[docs]def get_distinct_devices(attrs):
""" It returns a list with the distinct device names appearing in a list """
return sorted(list(set(a.rsplit('/',1)[0] for a in attrs)))
[docs]def get_distinct_domains(attrs):
""" It returns a list with the distinc member names appearing in a list """
return sorted(list(set(a.split('/')[0].split('-')[0] for a in attrs)))
[docs]def get_distinct_families(attrs):
""" It returns a list with the distinc member names appearing in a list """
return sorted(list(set(a.split('/')[1].split('-')[0] for a in attrs)))
[docs]def get_distinct_members(attrs):
""" It returns a list with the distinc member names appearing in a list """
return sorted(list(set(a.split('/')[2].split('-')[0] for a in attrs)))
[docs]def get_distinct_attributes(attrs):
""" It returns a list with the distinc attribute names (excluding device) appearing in a list """
return sorted(list(set(a.rsplit('/',1)[-1] for a in attrs)))
[docs]def reduce_distinct(group1,group2):
""" It returns a list of (device,domain,family,member,attribute) keys that appear in group1 and not in group2 """
vals,rates = {},{}
target = 'devices'
k1,k2 = get_distinct_devices(group1),get_distinct_devices(group2)
vals[target] = [k for k in k1 if k not in k2]
rates[target] = float(len(vals[target]))/(len(k1))
except: vals[target],rates[target] = [],0
target = 'domains'
k1,k2 = get_distinct_domains(group1),get_distinct_domains(group2)
vals[target] = [k for k in k1 if k not in k2]
rates[target] = float(len(vals[target]))/(len(k1))
except: vals[target],rates[target] = [],0
target = 'families'
k1,k2 = get_distinct_families(group1),get_distinct_families(group2)
vals[target] = [k for k in k1 if k not in k2]
rates[target] = float(len(vals[target]))/(len(k1))
except: vals[target],rates[target] = [],0
target = 'members'
k1,k2 = get_distinct_members(group1),get_distinct_members(group2)
vals[target] = [k for k in k1 if k not in k2]
rates[target] = float(len(vals[target]))/(len(k1))
except: vals[target],rates[target] = [],0
target = 'attributes'
k1,k2 = get_distinct_attributes(group1),get_distinct_attributes(group2)
vals[target] = [k for k in k1 if k not in k2]
rates[target] = float(len(vals[target]))/(len(k1))
except: vals[target],rates[target] = [],0
return first((vals[k],rates[k]) for k,r in rates.items() if r == max(rates.values()))
## Methods to export device/attributes/properties to dictionaries
[docs]def export_attribute_to_dict(model,attribute=None,value=None,keep=False,as_struct=False):
get attribute config, format and value from Tango and return it as a dictionary:
:param model: can be a full tango model, a device name or a device proxy
keys: min_alarm,name,data_type,format,max_alarm,ch_event,data_format,value,
attr,proxy = Struct(),None
if not isString(model):
model,proxy = model.name(),model
model = parse_tango_model(model)
attr.device = model['device']
proxy = proxy or get_device(attr.device,keep=keep)
attr.database = '%s:%s'%(model['host'],model['port'])
attr.name = model.get('attribute',None) or attribute or 'state'
attr.model = '/'.join((attr.database,attr.device,attr.name))
attr.color = 'Lime'
attr.time = 0
def vrepr(v):
try: return str(attr.format)%(v)
except: return str(v)
def cleandict(d):
for k,v in d.items():
if v in ('Not specified','No %s'%k):
d[k] = ''
return d
v = value or check_attribute(attr.database+'/'+attr.device+'/'+attr.name)
if v and 'DevFailed' not in str(v):
ac = proxy.get_attribute_config(attr.name)
attr.description = ac.description if ac.description!='No description' else ''
attr.data_format = str(ac.data_format)
attr.data_type = str(PyTango.CmdArgType.values[ac.data_type])
attr.writable = str(ac.writable)
attr.label,attr.min_alarm,attr.max_alarm = ac.label,ac.min_alarm,ac.max_alarm
attr.unit,attr.format = ac.unit,ac.format
attr.standard_unit,attr.display_unit = ac.standard_unit,ac.display_unit
attr.ch_event = fandango.obj2dict(ac.events.ch_event)
attr.arch_event = fandango.obj2dict(ac.events.arch_event)
attr.alarms = fandango.obj2dict(ac.alarms)
attr.quality = str(v.quality)
attr.time = ctime2time(v.time)
if attr.data_format!='SCALAR':
attr.value = list(v.value if v.value is not None and v.dim_x else [])
sep = '\n' if attr.data_type == 'DevString' else ','
svalue = map(vrepr,attr.value)
attr.string = sep.join(svalue)
if 'numpy' in str(type(v.value)):
attr.value = map(fandango.str2type,svalue)
if attr.data_type in ('DevState','DevBoolean'):
attr.value = int(v.value)
attr.string = str(v.value)
attr.value = v.value
attr.string = vrepr(v.value)
if attr.unit.strip() not in ('','No unit'):
attr.string += ' %s'%(attr.unit)
attr.polling = proxy.get_attribute_poll_period(attr.name)
attr.value = None
attr.string = str(v)
if attr.value is None:
attr.data_type = None
elif attr.data_type == 'DevState':
attr.color = TANGO_STATE_COLORS.get(attr.string,'Grey')
elif 'ALARM' in attr.quality:
elif 'WARNING' in attr.quality:
elif 'INVALID' in attr.quality:
attr.color = TANGO_STATE_COLORS['OFF']
except Exception,e:
return dict(attr) if not as_struct else Struct(dict(attr))
[docs]def export_commands_to_dict(device,target='*'):
""" export all device commands config to a dictionary """
name,proxy = (device,get_device(device)) if isString(device) else (device.name(),device)
dct = {}
for c in proxy.command_list_query():
if not fandango.matchCl(target,c.cmd_name): continue
dct[c.cmd_name] = fandango.obj2dict(c)
dct[c.cmd_name]['device'] = name
dct[c.cmd_name]['name'] = c.cmd_name
return dct
[docs]def export_properties_to_dict(device,target='*'):
""" export device or class properties to dictionary """
if '/' in device:
return get_matching_device_properties(device,target)
db = get_database()
props = [p for p in db.get_class_property_list(device) if fandango.matchCl(target,p)]
return dict((k,v if isString(v) else list(v)) for k,v in db.get_class_property(device,props).items())
[docs]def export_device_to_dict(device,commands=True,properties=True):
This method can be used to export the current configuration of devices, attributes and properties to a file.
The dictionary will get properties, class properties, attributes, commands, attribute config, event config, alarm config and pollings.
.. code-block python:
data = dict((d,fandango.tango.export_device_to_dict(d)) for d in fandango.tango.get_matching_devices('*00/*/*'))
i = get_device_info(device)
dct = Struct(fandango.obj2dict(i,fltr=lambda n: n in 'dev_class host level name server'.split()))
dct.attributes,dct.commands = {},{}
if check_device(device):
proxy = get_device(device)
dct.attributes = dict((a,export_attribute_to_dict(proxy,a)) for a in proxy.get_attribute_list())
if commands:
dct.commands = export_commands_to_dict(proxy)
if properties:
dct.properties = export_properties_to_dict(device)
dct.class_properties = export_properties_to_dict(dct.dev_class)
return dict(dct)
## Methods for checking device/attribute availability
[docs]def get_server_pid(server):
return get_device_info('dserver/'+server).PID
print 'failed to use server Admin, using OS'
pid = fandango.linos.get_process_pid(server.replace('/','.py '))
if not pid: pid = fandango.linos.get_process_pid(server.replace('/',' '),exclude='javaArch|grep|screen|0:00')
return pid
[docs]def check_host(host):
Pings a hostname, returns False if unreachable
import fandango.linos
print 'Checking host %s'%host
return fandango.linos.ping(host)[host]
[docs]def check_starter(host):
Checks host's Starter server
if check_host(host):
return check_device('tango/admin/%s'%(host.split('.')[0]))
return False
[docs]def check_device(dev,attribute=None,command=None,full=False,admin=False,bad_state=False):
Command may be 'StateDetailed' for testing HdbArchivers
It will return True for devices ok, False for devices not running and None for unresponsive devices.
if full or admin:
info = get_device_info(dev)
if not info.exported:
return False
if full and not check_host(info.host):
return False
if not check_device('dserver/%s'%info.server,full=False):
return False
dp = PyTango.DeviceProxy(dev)
return False
if attribute: dp.read_attribute(attribute)
elif command: dp.command_inout(command)
s = dp.state()
if bad_state:
assert s not in bad_state and str(s) not in bad_state
return str(s) #True
return True
return None
[docs]def check_attribute(attr,readable=False,timeout=0,brief=False,trace=False):
""" checks if attribute is available.
:param readable: Whether if it's mandatory that the attribute returns a value or if it must simply exist.
:param timeout: Checks if the attribute value have been effectively updated (check zombie processes).
:param brief: return just .value instead of AttrValue object
if hasattr(attr,'read'):
proxy = attr
dev,att = attr.lower().rsplit('/',1)
assert att in [str(s).lower() for s in PyTango.DeviceProxy(dev).get_attribute_list()]
proxy = PyTango.AttributeProxy(attr)
attvalue = proxy.read()
if readable and attvalue.quality == PyTango.AttrQuality.ATTR_INVALID:
return None
elif timeout and attvalue.time.totime()<(time.time()-timeout):
return None
if not brief:
return attvalue
return getattr(attvalue,'value',getattr(attvalue,'rvalue',None))
except Exception,e:
if trace: traceback.print_exc()
return None if readable or brief else e
if trace: traceback.print_exc()
return None
[docs]def read_attribute(attr,timeout=0,full=False):
""" Alias to check_attribute(attr,brief=True)"""
return check_attribute(attr,timeout=timeout,brief=not full)
[docs]def check_device_list(devices,attribute=None,command=None):
This method will check a list of devices grouping them by host and server; minimizing the amount of pings to do.
result = {}
from collections import defaultdict
hosts = defaultdict(lambda:defaultdict(list))
for dev in devices:
info = get_device_info(dev)
if info.exported:
result[dev] = False
for host,servs in hosts.items():
if not check_host(host):
print 'Host %s failed, discarding %d devices'%(host,sum(len(s) for s in servs.values()))
result.update((d,False) for s in servs.values() for d in s)
for server,devs in servs.items():
if not check_device('dserver/%s'%server,full=False):
print 'Server %s failed, discarding %d devices'%(server,len(devs))
result.update((d,False) for d in devs)
for d in devs:
result[d] = check_device(d,attribute=attribute,command=command,full=False)
return result
## Methods usable from within DeviceImpl instances
[docs]def cast_tango_type(value_type):
""" Returns the python equivalent to a Tango type"""
if value_type in (PyTango.DevBoolean,):
return bool
elif value_type in (PyTango.DevDouble,PyTango.DevFloat):
return float
elif value_type in (PyTango.DevLong64,PyTango.DevULong64):
return long
elif value_type in (PyTango.DevState,PyTango.DevShort,PyTango.DevInt,PyTango.DevLong,PyTango.DevULong,PyTango.DevUShort,PyTango.DevUChar):
return int
return str
[docs]def get_device_help(self,str_format='text'):
"""This command returns help text for this device class and its parents"""
if str_format=='text' or True:
linesep,docsep,item = '\n','\n\n------------\n\n',' * %s'
doc = ''
parents = []
for b in type(self).__bases__:
if b.__doc__:
doc = b.__name__+':'+b.__doc__
docs = [type(self).__name__+'(%s)'%','.join(parents),doc]
for t,o in (
('Class Properties','class_property_list'),('Device Properties','device_property_list'),
c = self.get_device_class()
d = getattr(c,o).keys()
if d:
docs.append(t+linesep+linesep+linesep.join(item%k for k in d))
except Exception,e:
raise e
return docsep.join(docs)
[docs]def get_internal_devices():
""" Gets all devices declared in the current Tango server """
U = PyTango.Util.instance()
dct = fandango.CaselessDict()
for klass in U.get_class_list():
for dev in U.get_device_list_by_class(klass.get_name()):
return dct
return {}
[docs]def read_internal_attribute(device,attribute):
This method allows several things:
* If a device object (Impl or Proxy) is given, it is used to read the attribute
* If the attribute belongs to a device in the SAME SERVER, it accesses directly to the device object
* If the attribute belongs to an external SERVER, use PyTango proxies to read it
* It can manage dynamic attributes used within the same SERVER calling read_dyn_attr
device must be a DevImpl object or string, attribute must be an string
if the device is not internal this method will connect to a PyTango Proxy
the method will return a fakeAttributeValue object
print 'read_internal_attribute(%s,%s)'%(device,attribute)
import dynamic
if isString(device):
device = get_internal_devices().get(device,(getattr(attribute,'parent',None) or get_device(device,use_tau=False,keep=True)))
attr = attribute if isinstance(attribute,fakeAttributeValue) else fakeAttributeValue(name=attribute,parent=device)
isProxy, isDyn = isinstance(device,PyTango.DeviceProxy),hasattr(device,'read_dyn_attr')
aname = attr.name.lower()
if aname=='state':
if isProxy: attr.set_value(device.state())
elif hasattr(device,'last_state'): attr.set_value(device.last_state)
else: attr.set_value(device.get_state())
print '%s = %s' % (attr.name,attr.value)
attr.error = ''
if isProxy:
print ('fandango.read_internal_attribute(): calling DeviceProxy(%s).read_attribute(%s)'%(attr.device,attr.name))
val = device.read_attribute(attr.name)
allow_method,read_method = None,None
for s in dir(device):
if s.lower()=='is_%s_allowed'%aname: allow_method = s
if s.lower()=='read_%s'%aname: read_method = s
print ('fandango.read_internal_attribute(): calling %s.is_%s_allowed()'%(attr.device,attr.name))
is_allowed = (not allow_method) or getattr(device,allow_method)(PyTango.AttReqType.READ_REQ)
if not is_allowed:
attr.throw_exception('%s.read_%s method is not allowed!!!'%(device,aname))
elif not read_method:
if isDyn:
print ('fandango.read_internal_attribute(): calling %s(%s).read_dyn_attr(%s)'%(device.myClass,attr.device,attr.name))
if not device.myClass:
attr.throw_exception('\t%s is a dynamic device not initialized yet.'%attr.device)
#Returning valid values
device.myClass.DynDev = device
if dynamic.USE_STATIC_METHODS: device.read_dyn_attr(device,attr)
else: device.read_dyn_attr(attr)
print '%s = %s' % (attr.name,attr.value)
attr.error = ''
attr.throw_exception('%s.read_%s method not found!!!'%(device,aname,[d for d in dir(device)]))
#Returning valid values
msg = ('fandango.read_internal_attribute(): calling %s.read_%s()'%(attr.device,aname))
print msg
print '%s = %s' % (attr.name,attr.value)
attr.error = ''
return attr
[docs]def get_polled_attrs(device,others=None):
@TODO: Tango8 has its own get_polled_attr method; check for incompatibilities
if a device is passed, it returns the polled_attr property as a dictionary
if a list of values is passed, it converts to dictionary
others argument allows to get extra property values in a single DB call;
e.g others = ['polled_cmd'] would append the polled commands to the list
if isSequence(device):
return CaselessDict(zip(map(str.lower,device[::2]),map(float,device[1::2])))
elif isinstance(device,PyTango.DeviceProxy):
attrs = device.get_attribute_list()
periods = [(a.lower(),int(dp.get_attribute_poll_period(a))) for a in attrs]
return CaselessDict((a,p) for a,p in periods if p)
others = others or []
if isinstance(device,PyTango.DeviceImpl):
db = PyTango.Util.instance().get_database()
#polled_attrs = {}
#for st in self.get_admin_device().DevPollStatus(device.get_name()):
#lines = st.split('\n')
#try: polled_attrs[lines[0].split()[-1]]=lines[1].split()[-1]
#except: pass
#return polled_attrs
device = device.get_name()
db = fandango.get_database()
props = db.get_device_property(device,['polled_attr']+toSequence(others))
d = get_polled_attrs(props.pop('polled_attr'))
if others: d.update(props)
return d
## A useful fake attribute value and event class
[docs]class fakeAttributeValue(object):
This class simulates a modifiable AttributeValue object (not available in PyTango)
It is the class used to read values from Dev4Tango devices (valves, pseudos, composer, etc ...)
It also has a read(cache) method to be used as a TaurusAttribute or AttributeProxy (but it returns self if cache is not used)
The cache is controlled by keeptime variable (milliseconds)
:param parent: Apart of common Attribute arguments, parent will be used to keep a proxy to the parent object (a DeviceProxy or DeviceImpl)
def __init__(self,name,value=None,time_=0.,quality=PyTango.AttrQuality.ATTR_VALID,dim_x=1,dim_y=1,parent=None,device='',error=False,keeptime=0):
self.device=device or (self.name.rsplit('/',1)[0] if '/' in self.name else '')
self.set_date(time_ or time.time())
self.write_value = self.wvalue = None
self.error = error
self.keeptime = keeptime*1e3 if keeptime<10. else keeptime
self.lastread = 0
self.type = type(value)
def __repr__(self):
return 'fakeAttributeValue(%s,%s,%s,%s,error=%s)'%(self.name,fandango.log.shortstr(self.value),time.ctime(self.get_time()),self.quality,self.error)
__str__ = __repr__
[docs] def get_name(self): return self.name
[docs] def get_value(self): return self.value
[docs] def get_date(self): return self.time
[docs] def get_time(self): return self.time.totime()
[docs] def get_quality(self): return self.quality
[docs] def read(self,cache=True):
#Method to emulate AttributeProxy returning an AttributeValue
#print '\t\tfakeAttributeValue(%s/%s).read(%s)'%(self.device,self.name,cache)
if not self.parent:
self.parent = get_device(self.device,use_tau=False,keep=True)
if not cache or 0<self.keeptime<(time.time()-self.read()):
return read_internal_attribute(self.parent,self) #it's important to pass self as argument so values will be kept
return self
[docs] def throw_exception(self,msg=''):
self.error = msg or traceback.format_exc()
print 'fakeAttributeValue(%s).throw_exception(%s)'%(self.name,self.error)
#event_type = fakeEventType.lookup['Error']
raise Exception(self.error)
[docs] def set_value(self,value,dim_x=1,dim_y=1):
self.value = self.rvalue = value
if (dim_x,dim_y) == (1,1):
if isSequence(value):
dim_x = len(value)
if len(value)>1 and isSequence(value[0]):
dim_y = len(value[0])
self.dim_x = dim_x
self.dim_y = dim_y
self.lastread = time.time()
[docs] def set_date(self,timestamp):
if not isinstance(timestamp,PyTango.TimeVal):
[docs] def set_quality(self,quality):
[docs] def set_value_date(self,value,date):
[docs] def set_value_date_quality(self,value,date,quality):
[docs] def set_write_value(self,value):
self.write_value = self.wvalue = value
[docs] def get_write_value(self,data = None):
if data is None: data = []
if isSequence(self.write_value):
[data.append(v) for v in self.write_value]
return data
from dicts import Enumeration
fakeEventType = Enumeration(
'fakeEventType', (
[docs]class fakeEvent(object):
def __init__(self,device,attr_name,attr_value,err,errors):
## The ProxiesDict class, to manage DeviceProxy pools
[docs]class ProxiesDict(CaselessDefaultDict,Object): #class ProxiesDict(dict,log.Logger):
''' Dictionary that stores PyTango.DeviceProxies
It is like a normal dictionary but creates a new proxy each time that the "get" method is called
An earlier version is used in PyTangoArchiving.utils module
This class must be substituted by Tau.Core.TauManager().getFactory()()
def __init__(self,use_tau = False):
self.log = Logger('ProxiesDict')
self.use_tau = TAU and use_tau
def __default_factory__(self,dev_name):
Called by defaultdict_fromkey.__missing__ method
If a key doesn't exists this method is called and returns a proxy for a given device.
If the proxy caused an exception (usually because device doesn't exists) a None value is returned
if dev_name not in self.keys():
self.log.debug( 'Getting a Proxy for %s'%dev_name)
devklass,attrklass = (TAU.Device,TAU.Attribute) if self.use_tau else (PyTango.DeviceProxy,PyTango.AttributeProxy)
dev = (attrklass if str(dev_name).count('/')==(4 if ':' in dev_name else 3) else devklass)(dev_name)
except Exception,e:
print('ProxiesDict: %s doesnt exist!'%dev_name)
#print traceback.format_exc()
#raise e
dev = None
return dev
[docs] def get(self,dev_name):
return self[dev_name]
[docs] def get_admin(self,dev_name):
'''Adds to the dictionary the admin device for a given device name and returns a proxy to it.'''
dev = self[dev_name]
class_ = dev.info().dev_class
admin = dev.info().server_id
return self['dserver/'+admin]
[docs] def pop(self,dev_name):
'''Removes a device from the dict'''
if dev_name not in self.keys(): return
self.log.debug( 'Deleting the Proxy for %s'%dev_name)
return CaselessDefaultDict.pop(self,dev_name)
## Tango formula evaluation
import dicts
[docs]def get_attribute_time(value):
#Gets epoch for a Tango value
return getattr(value.time,'tv_sec',value.time)
[docs]class TangoedValue(object):
[docs]def getTangoValue(obj,device=None):
This method may be used to return objects from read_attribute or FIND() that are still computable and keep quality/time members
try to avoid spectrums.; this method doesn't work for numpy arrays so I have to convert them to less efficient lists.
p = parse_tango_model(obj if type(obj) is str else (device or ''))
if p: device,host = p['device'],p['host']+':'+p['port']
else: host = get_tango_host()
if type(obj) is str:
obj = PyTango.AttributeProxy(obj).read()
if hasattr(obj,'quality'):
value,quality,t,name,ty = obj.value,obj.quality,get_attribute_time(obj),obj.name,obj.type
if isSequence(value):
value = value.tolist() if hasattr(value,'tolist') else list(value)
value,quality,t,name,ty = obj,PyTango.AttrQuality.ATTR_VALID,time.time(),'',type(obj)
try: domain,family,member = device.split('/')[-3:]
except: domain,family,member = '','',''
Type = type(value)
Type = Type if (Type not in (bool,None,type(None)) and ty!=PyTango.CmdArgType.DevState) else int
nt = type('tangoed_'+Type.__name__,(Type,TangoedValue),{})
o = nt(value or 0)
[setattr(o,k,v) for k,v in (
return o
print traceback.format_exc()
return obj
def __repr__(self):
return 'v(%s)'%(self.value)
[docs]class TangoEval(object):
Class for Tango formula evaluation; used by Panic-like formulas
te = fandango.TangoEval(cache=3)
te.eval('test/sim/test-00/A * test/sim/test-00/S.delta')
Out: 2.6307095848792521 #A value multiplied by delta of S in its last 3 values
Attributes in the formulas may be (it is recommended to insert spaces between attribute names and operators):
.. code::
dom/fam/memb/attrib >= V1 #Will evaluate the attribute value
d/f/m/a1 > V2 and d/f/m/a2 == V3 #Comparing 2 attributes
d/f/m.quality != QALARM #Using attribute quality
d/f/m/State == UNKNOWN #States can be compared directly
d/f/m/A.exception #True if exception occurred
d/f/m/A.time #Attribute value time
d/f/m/A.value #Explicit value
d/f/m/A.delta #Increase/decrease of the value since the first value in cache (if cache and cache_depth are set)
d/f/m/A.all #Instead of just value will return an AttributeValue object
All tango-like variables are parsed. TangoEval.macros can be used to do regexp-based substitution. By default FIND(regexp) will be replaced by a list of matching attributes.
FIND([a-zA-Z0-9\/].*) macro allows to get any attribute matching a regular expression
Any variable in _locals is evaluated or explicitly replaced in the formula if matches $(); e.g. FIND($(VARNAME)/*/*)
T() < T('YYYY/MM/DD hh:mm') allow to compare actual time with any time
:use_events: will manage events using the callbacks.EventSource object. It will redirect
all events to TangoEval.eventReceived method. If an event_hook callback is passed as argument,
both TangoEval object and result of eval will be sent to it.
eval() will be triggered by events only if event_hook is True or a callable
FIND_EXP = 'FIND\(((?:[ \'\"])?[^)]*(?:[ \'\"])?)\)' #FIND( optional quotes and whatever is not ')' )
#operators = '[><=][=>]?|and|or|in|not in|not'
#l_split = re.split(operators,formula)#.replace(' ',''))
alnum = '[a-zA-Z0-9-_]+'
no_alnum = '[^a-zA-Z0-9-_]'
no_quotes = '(?:^|$|[^\'"a-zA-Z0-9_\./])'
redev = '(?P<device>(?:'+alnum+':[0-9]+/{1,2})?(?:'+'/'.join([alnum]*3)+'))' #It matches a device name
reattr = '(?:/(?P<attribute>'+alnum+')(?:(?:\\.)(?P<what>quality|time|value|exception|delta|all|hist))?)?' #Matches attribute and extension
retango = redev+reattr#+'(?!/)'
regexp = no_quotes + retango + no_quotes.replace('\.','').replace(':','=') #Excludes attr_names between quotes, accepts value type methods
def __init__(self,formula='',launch=True,timeout=1000,keeptime=100,
trace=False, proxies=None, attributes=None, cache=0, use_events = False,
event_hook = None,
self.formula = formula
self.source = ''
self.variables = []
self.timeout = timeout
self.keeptime = keeptime
self.proxies = proxies or ProxiesDict() #use_tau=self.use_tau)
self.use_events = use_events or kwargs.get('use_tau',False)
self.event_hook = event_hook
if attributes:
self.attributes = attributes
from callbacks import CachedAttributeProxy,EventListener
if self.use_events:
proxy = (lambda a: CachedAttributeProxy(a,keeptime=self.keeptime,
proxy = (lambda a: CachedAttributeProxy(a,keeptime=self.keeptime))
self.attributes = dicts.CaselessDefaultDict(proxy)
self.previous = dicts.CaselessDict() #Keeps last values for each variable
self.last = dicts.CaselessDict() #Keeps values from the last eval execution only
self.cache_depth = cache
self.cache = dicts.CaselessDefaultDict(lambda k:list()) if self.cache_depth else None#Keeps [cache]
self.result = None
self.macros = [('FIND(%s)',self.FIND_EXP,self.find_macro)]
self._trace = trace
self._defaults = dict([(str(v),v) for v in PyTango.DevState.values.values()]+[(str(q),q) for q in PyTango.AttrQuality.values.values()])
self._defaults['T'] = str2time
self._defaults['str2time'] = str2time
self._defaults['time'] = time
self._defaults['NOW'] = time.time
self._defaults['DEVICES'] = self.proxies
self._defaults['DEV'] = lambda x:self.proxies[x]
self._defaults['NAMES'] = lambda x: get_matching_devices(x) if x.count('/')<3 else get_matching_attributes(x)
self._defaults['CACHE'] = self.cache
self._defaults['PREV'] = self.previous
#For ComposerDS syntax compatibility
self._defaults['READ'] = self._defaults['ATTR'] = self._defaults['XATTR'] = self.read_attribute
#self._locals['now'] = time.time() #Updated at execution time
self._defaults.update((k,v) for k,v in {'get_domain':get_domain,'get_family':get_family,'get_member':get_member,'parse':parse_tango_model}.items())
#self._defaults.update((k,None) for k in ('os','sys',)) #Updating Not allowed models
self._locals = dict(self._defaults) #Having 2 dictionaries to reload defaults when needed
if self.formula and launch:
if not self._trace:
print 'TangoEval: result = %s' % self.result
[docs] def trace(self,msg):
if self._trace: print 'TangoEval: %s'%str(msg)
[docs] def set_timeout(self,timeout):
self.timeout = int(timeout)
self.trace('timeout: %s'%timeout)
[docs] def find_macro(self,target):
Gets a match of FIND_EXP and applies get_matching_attributes to the expresion found.
exp = target.replace('"','').replace("'",'').strip()
exp,sep,what = exp.partition('.')
res = str(sorted(d.lower()+sep+what for d in get_matching_attributes([exp],trace=self._trace)))
return res.replace('"','').replace("'",'')
[docs] def add_macro(self,macro_name,macro_expression,macro_function):
Add a new macro to be parsed by parse_formula. It will apply macro_function to the target found by macro_expression; the result will later replace macro_name%target
e.g: self.add_macro('FIND(%s)',self.FIND_EXP,self.find_macro) #where FIND_EXP = 'FIND\(((?:[ \'\"])?[^)]*(?:[ \'\"])?)\)'
[docs] def parse_variables(self,formula,_locals=None,parsed=False):
''' This method parses attributes declarated in formulas with the following formats:
TAG1: dom/fam/memb/attrib >= V1 #A comment
TAG2: d/f/m/a1 > V2 and d/f/m/a2 == V3
TAG3: d/f/m.quality != QALARM #Another comment
TAG4: d/f/m/State ##A description?, Why not
- a None value if the alarm is not parsable
- a list of (device,attribute,value/time/quality) tuples
#self.trace( regexp)
idev,iattr,ival = 0,1,2 #indexes of the expression matching device,attribute and value
if not parsed: formula = self.parse_formula(formula,_locals)
##@var all_vars list of tuples with (device,/attribute) name matches
#self.variables = [(s[idev],s[iattr],s[ival] or 'value') for s in re.findall(regexp,formula) if s[idev]]
variables = [s for s in re.findall(self.regexp,formula)]
self.trace('parse_variables(...): %s'%(variables))
return variables
[docs] def read_attribute(self,device,attribute='',what='',_raise=True, timeout=None):
Executes a read_attribute and returns the value requested
:param _raise: if attribute is empty or 'State' exceptions will be rethrown
timeout = timeout or self.timeout
if not attribute:
if device.split(':')[-1].count('/')>2:
device,attribute = device.rsplit('/',1)
attribute = 'state'
aname = (device+'/'+attribute).lower()
if aname not in self.attributes:
dp = self.proxies[device]
try: dp.set_timeout_millis(timeout)
except: self.trace('unable to set %s proxy timeout to %s ms: %s'%(device,timeout,except2str()))
# Disabled because we want DevFailed to be triggered
#attr_list = [a.name.lower() for a in dp.attribute_list_query()]
#if attribute.lower() not in attr_list: #raise Exception,'TangoEval_AttributeDoesntExist_%s'%attribute
value = self.attributes[aname].read()
if self.cache_depth and not any(get_attribute_time(v)==get_attribute_time(value) for v in self.cache[aname]):
while len(self.cache[aname])>=self.cache_depth: self.cache[aname].pop(-1)
if what == 'all':
if self.cache_depth:
try: setattr(value,'delta',self.get_delta(aname))
except: pass
elif what in ('value',''):
value = getTangoValue(value,device=device)
elif what == 'time': value = get_attribute_time(value)
elif what == 'exception': value = isinstance(getattr(value,'value',None),PyTango.DevFailed)
elif what == 'delta': value = self.get_delta(aname)
else: value = getattr(value,what)
self.trace('read_attribute(%s/%s.%s) => %s'%(device,attribute,what,value))
except Exception,e:
if isinstance(e,PyTango.DevFailed) and what=='exception':
return e
elif _raise and not isNaN(_raise):
raise e
self.trace('TangoEval: ERROR(%s.%s)! Unable to get %s for attribute %s/%s: %s' % (type(e),_raise,what,device,attribute,except2str(e)))
value = _raise
return value
[docs] def update_locals(self,dct=None):
if dct:
if not hasattr(dct,'keys'): dct = dict(dct)
self._locals['now'] = self._locals['t'] = time.time()
self._locals['formula'] = self.formula
return self._locals
[docs] def parse_tag(self,target,wildcard='_'):
return wildcard+target.replace('/',wildcard).replace('-',wildcard).replace('.',wildcard).replace(':',wildcard).replace('_',wildcard).lower()
[docs] def get_delta(self,target):
target = (device+'/'+attribute).lower() ; returns difference between first and last cached value
if self.cache and target in self.cache:
cache = self.cache.get(target)
var_name = self.parse_tag(target)
if var_name in self.previous:
device,attribute = self.parse_variables(target)[0][:2]
cache = [self.read_attribute(device,attribute,'all'),self.previous[var_name]]
cache = []
delta = 0 if not cache else (cache[0].value-cache[-1].value)
self.trace('get_delta(%s); cache[%d] = %s; delta = %s' % (target,len(cache),[v.value for v in cache],delta))
return delta
[docs] def eventReceived(self, src, type_, value):
Method to implement the event notification
Source will be an object, type a PyTango EventType, evt_value an AttrValue
Regarding PANIC, the eventReceived hook must be in the PanicAPI, not here
self.trace('eventReceived: %s.%s'%(src,type_))
if self.event_hook:
if type_ in ('change','archive','quality','user_event','periodic'):
if 1e3*(now()-self._locals['now'])>self.keeptime:
r = self.eval()
if isCallable(self.event_hook):
[docs] def eval(self,formula=None,previous=None,_locals=None ,_raise=FAILED_VALUE):
Evaluates the given formula.
Previous can be used to add extra local values, or predefined values for attributes ({'a/b/c/d':1} that would override its reading
Any variable in locals is evaluated or explicitly replaced in the formula if appearing with brackets (e.g. FIND({VARNAME}/*/*))
:param _raise: if attribute is empty or 'State' exceptions will be rethrown
self.formula = (formula or self.formula).strip()
previous = previous or {}
_locals = _locals or {}
for x in ['or','and','not','in','is','now']: #Check for case-dependent operators
self.formula = self.formula.replace(' '+x.upper()+' ',' '+x+' ')
if self.formula.startswith(x.upper()+' '):
self.formula = x+self.formula[len(x):]
self.formula = self.formula.replace(' || ',' or ')
self.formula = self.formula.replace(' && ',' and ')
#self.previous.update(previous or {}) #<<< Values passed as eval locals are persistent, do we really want that?!?
self.formula = self.parse_formula(self.formula) #Replacement of FIND(...), env variables and comments.
variables = self.parse_variables(self.formula,parsed=True) #Extract the list of tango variables
self.trace('eval(_raise=%s): variables in formula are %s' % (_raise,variables))
self.source = self.formula #It will be modified on each iteration
targets = [(device + (attribute and '/%s'%attribute) + (what and '.%s'%what),device,attribute,what) for device,attribute,what in variables]
## NOTE!: It is very important to keep the same order in which expressions were extracted
for target,device,attribute,what in targets:
var_name = self.parse_tag(target)
#self.trace('\t%s => %s'%(target,var_name))
#Reading or Overriding attribute value, if overriden value will not be kept for future iterations
r = _raise if not any(d==device and a==attribute and w=='exception' for t,d,a,w in targets) else FAILED_VALUE
self.previous[var_name] = previous.get(target,
self.read_attribute(device,attribute or 'State',what,_raise=r))
#Remove attr/name, keep only the variable tag
#Every occurrence of the attribute is managed separately, read_attribute already uses caches within polling intervals
self.source = self.source.replace(target,var_name,1)
#Used from alarm messages
self.last[target] = self.previous[var_name]
except Exception,e:
self.trace('eval(r=%s): Unable to obtain %s values'%(r,target))
self.last[target] = e
raise e
self.trace('formula = %s' % (self.source))
self.trace('previous.items():\n'+'\n'.join(str((str(k),str(i))) for k,i in self.previous.items()))
self.result = eval(self.source,dict(self.previous),self._locals)
self.trace('result = %s' % str(self.result))
return self.result
## Tango Command executions
[docs]class TangoCommand(object):
This class encapsulates a call to a Tango Command, it manages asynchronous commands in a background thread or process.
It also allows to setup a "feedback" condition to validate that the command has been executed.
The usage would be like::
tc = TangoCommand('move',DeviceProxy('just/a/motor'),asynch=True,process=False)
#In this example the value of the command will be returned once the state will change
result = tc(args,feedback='state',expected=PyTango.DevState.MOVING,timeout=10000)
#In this other example, it will be the value of the state what will be returned
result = tc(args,feedback='state',timeout=10000)
:param command: the name of a tango command; or a callable
:param device: a device that can be an string, a DeviceProxy or a TaurusDevice
:param timeout: when using asynchronous commands default timeout can be overriden
:param feedback: attribute, command or callable to be executed
:param expected: if not None, value that feedback must have to consider the command successful
:param wait: time to wait for feedback (once command has been executed)
:param asynch: to perform the wait in a different thread instead of blocking
:param process: whether to use a different process to execute the command (if CPU intensive or trhead-blocking)
[docs] class CommandException(Exception): pass
[docs] class CommandTimeout(Exception): pass
[docs] class BadResult(Exception): pass
[docs] class BadFeedback(Exception): pass
Proxies = ProxiesDict()
def __init__(self,command,device=None,timeout=None,feedback=None,expected=None,wait=3.,asynch=True,process=False):
self.device = device
self.proxy = TangoCommand.Proxies[self.device]
if isinstance(command,basestring):
if '/' in command:
d,self.command = command.rsplit('/',1)
if not self.device: self.device = d
else: self.command = command
self.info = self.proxy.command_query(self.command)
else: #May be a callable
self.command,self.info = command,None
self.timeout = timeout or 3.
self.feedback = feedback and self._parse_feedback(feedback)
self.expected = expected
self.wait = wait
self.asynch = asynch
self.process = process
self.event = threading.Event()
if process:
import fandango.threads
self.process = fandango.threads.WorkerThread(device+'/'+command,process=True)
self.process = None
[docs] def trace(self,msg,severity='DEBUG'):
print '%s %s fandango.TangoCommand: %s'%(severity,time.ctime(),msg)
def _parse_feedback(self,feedback):
if isCallable(feedback):
self.feedback = feedback
elif isinstance(feedback,basestring):
if '/' in feedback:
device,target = feedback.rsplit('/',1) if feedback.count('/')>=(4 if ':' in feedback else 3) else (feedback,state)
device,target = self.device,feedback
proxy = TangoCommand.Proxies[device]
attrs,comms = proxy.get_attribute_list(),[cmd.cmd_name for cmd in proxy.command_list_query()]
if inCl(target,comms):
self.feedback = (lambda d=device,c=target: TangoCommand.Proxies[d].command_inout(c))
elif inCl(target,attrs):
self.feedback = (lambda d=device,a=target: TangoCommand.Proxies[d].read_attribute(a).value)
raise TangoCommand.CommandException('UnknownFeedbackMethod_%s'%feedback)
return self.feedback
def __call__(self,*args,**kwargs):
[docs] def execute(self,args=None,timeout=None,feedback=None,expected=None,wait=None,asynch=None):
self.trace('%s/%s(%s)'%(self.device,self.command,args or ''))
#args = (args or []) #Not convinient
timeout = notNone(timeout,self.timeout)
if feedback is not None:
feedback = self._parse_feedback(feedback)
feedback = self.feedback
expected = notNone(expected,self.expected)
wait = notNone(wait,self.wait)
asynch = notNone(asynch,self.asynch)
t0 = time.time()
result = None
if isString(self.command):
if not asynch:
if args: result = self.proxy.command_inout(self.command,args)
else: result = self.proxy.command_inout(self.command)
self.trace('Using asynchronous commands')
if args: cid = self.proxy.command_inout_asynch(self.command,args)
else: cid = self.proxy.command_inout_asynch(self.command)
while timeout > (time.time()-t0):
result = self.proxy.command_inout_reply(cid)
except PyTango.DevFailed,e:
if 'AsynReplyNotArrived' in str(e):
#elif any(q in str(e) for q in ('DeviceTimedOut','BadAsynPollId')):
#BadAsynPollId is received once the command is discarded
raise TangoCommand.CommandException(str(e).replace('\n','')[:100])
elif isCallable(self.command):
result = self.command(args)
t1 = time.time()
if t1 > (t0+self.timeout):
raise TangoCommand.CommandTimeout(str(self.timeout*1000)+' ms')
if feedback is not None:
self.trace('Using feedback: %s'%feedback)
tt,tw = min((timeout,(t1-t0+wait))),max((0.025,wait/10.))
now,got = t1,None
while True:
now = time.time()
got = type(expected)(feedback())
if not wait or expected is None or got==expected:
self.trace('Feedback (%s) obtained after %s s'%(got,time.time()-t0))
if now > (t0+timeout):
raise TangoCommand.CommandTimeout(str(self.timeout*1000)+' ms')
if now > (t1+wait):
if expected is None:
return got
elif got==expected:
self.trace('Result (%s,%s==%s) verified after %s s'%(result,got,expected,time.time()-t0))
return (result if result is not None else got)
raise TangoCommand.BadFeedback('%s!=%s'%(got,expected))
elif expected is None or result == expected:
self.trace('Result obtained after %s s'%(time.time()-t0))
return result
raise TangoCommand.BadResult(str(result))
if self.timeout < time.time()-t0:
raise TangoCommand.CommandTimeout(str(self.timeout*1000)+' ms')
return result
