#!/usr/bin/env python2.5
"""
#############################################################################
##
## file : callbacks..py
##
## description : This class manages a list of attributes subscribed to events that could have multiple receivers each one.
## It supplies the ATK AttributeList behaviour.
## device.DevChild and those inherited classes depends on that.
## Global objects are:
## _EventsList, _EventReceivers, _StatesList, _AttributesList, GlobalCallback
## ... _EventReceivers must be substituted by DevicesList
##
## project : Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2008 $
##
## copyleft : ALBA Synchrotron Controls Section, CELLS
## Bellaterra
## Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################
"""
import sys,os,time,re
import threading,weakref
from copy import *
from excepts import getLastException,exc2str
from objects import *
from functional import *
from dicts import *
from tango import PyTango,EventType,fakeAttributeValue,ProxiesDict
from tango import get_full_name,get_attribute_events
from threads import ThreadedObject,Queue,timed_range,wait,threading
from log import Logger,printf
"""
@package callbacks
@par Internal Variables
@li _EventsList[attr_name] the last event received for each attribute
@li _StatesList[dev_name] keeps the last State read for each device
@li _AttributesList[dev_name] keeps the last AttributeValue struct for each attribute.
@remarks Mostly used by PyStateComposer device
It provides persistent storage.
lock.acquire and lock.release should be used to prevent threading problems!,
Use of the lists inside push_event is safe
"""
###############################################################################
# Lightweight implementation of TaurusListener/TaurusAttribute
EVENT_TO_POLLING_EXCEPTIONS = (
'API_AttributePollingNotStarted',
'API_DSFailedRegisteringEvent',
'API_NotificationServiceFailed',
'API_EventChannelNotExported',
'API_EventTimeout',
'API_EventPropertiesNotSet', #Typical for archive and data ready event
'API_CommandNotFound',
'API_PollObjNotFound',
)
EVENT_CONF_EXCEPTIONS = (
'API_AttributePollingNotStarted',
'API_EventPropertiesNotSet', #Typical for archive and data ready event
)
EVENT_TYPES = [ 'attr_conf', 'data_ready', 'user_event', 'periodic', 'change', 'archive','quality']
[docs]class AttrCallback(Logger,Object):
#def __call__(self,*args,**kwargs):
#self.attr_read(args,kwargs)
[docs] def attr_read(self,*args,**kwargs):
try:
print('Callback at %s'%time2str())
print('Received %d arguments'%len(args))
for a in args:
print('\t%s'%str(a)[:80])
for k,v in a.__dict__.items():
print('\t%s: %s'%(k,v))
return
except:
traceback.print_exc()
[docs]class EventListener(Logger,Object): #Logger,
"""
The EventListener class accepts 3 event hooks:
self.set_event_hook()
self.set_error_hook()
self.set_value_hook()
All of them need a callable accepting three arguments: src,type,value
In the case of value_hook it will not pass the event object but event.value/rvalue
"""
def __init__(self, name, parent=None,source=False):
"""
Pass name and parent object for logging.
If source is True or an object, this listener will subscribe for source or parent events.
If False, then subscription will have to be done using EventSource.addListener(EventListener)
"""
self.name,self.parent,self.source = name,parent,source
Logger.__init__(self,type(self).__name__+'(%s)'%self.name)
self.last_event_time = 0
#self.call__init__(Logger,name=name, parent=parent)
self.set_event_hook()
self.set_error_hook()
self.set_value_hook()
hooks = [o for o in (parent,source) if source and hasattr(o,'addListener')]
if hooks: hooks[0].addListener(self)
[docs] def set_event_hook(self,callable=None): self.event_hook = callable
[docs] def set_error_hook(self,callable=None): self.error_hook = callable
[docs] def set_value_hook(self,callable=None): self.value_hook = callable
[docs] def eventReceived(self, src, type_, value):
"""
Method to implement the event notification
Source will be an object, type a PyTango EventType, evt_value an AttrValue
"""
t0 = time.time()
inc = t0 - self.last_event_time
delay = int(1e3*(t0 - (ctime2time(value.time) if hasattr(value,'time') else t0)))
if getattr(value,'error',False):
self.warning('%s: eventReceived(%s,ERROR,%s): +%2.1f ms (%2.1f delay)'%(self.name,src,value,1e3*inc,delay))
if self.error_hook is not None:
try: self.error_hook(src,type_,value)
except: self.warning(traceback.format_exc())
else:
value = getattr(value,'value',getattr(value,'rvalue',type(value)))
self.debug('%s: eventReceived(%s,%s,%s): +%2.1f ms (%2.1f delay)'%(self.name,src,type_,value,1e3*inc,delay))
if value is not None and self.value_hook is not None:
try:self.value_hook(src,type_,value)
except: self.error(traceback.format_exc())
if self.event_hook:
try: self.event_hook(src,type_,value)
except: self.error(traceback.format_exc())
self.last_event_time = t0
[docs]class EventThread(Logger,ThreadedObject):
"""
This class processes both Events and Polling.
All listeners should implement eventReceived method
All sources must have a listeners list
On Current EventSource implementation this is a SINGLETONE!!!, its configuration
will apply for the whole running process!
The filtered argument will just forward last event received
on each loop iteration for each source. It can be set to False, True or a factor
of latency.
The latency (ms) specifies a time condition to abort event checking
and proceed to callback execution.
The real latency of the system is provided by inherited get_delay
and get_avg_delay methods
"""
MinWait = 0.0001 #Event processing limited to 10KHz maximum (rest are queued)
EVENT_POLLING_RATIO = 1000 #How many events to process before checking polled attributes
def __init__(self,period_ms=None,filtered=False,latency=10.,loglevel='WARNING'):
self.queue = Queue.Queue()
self.sources = set()
self.event = threading.Event()
period = 1e-3*(period_ms or 0) or self.MinWait
ThreadedObject.__init__(self,target=self.process,period=period)
Logger.__init__(self,type(self).__name__)
self.filtered,self.latency = filtered,latency
self.setLogLevel(loglevel)
[docs] def setup(self,period_ms=None,filtered=None,latency=None,loglevel=None):
"""
This method allows to reconfigure an already running EventThread
"""
self.filtered = notNone(filtered,self.filtered)
self.latency = notNone(latency,self.latency)
if period_ms is not None:
period = 1e-3*(period_ms or 0) or self.MinWait
self.set_period(period)
if loglevel is not None:
self.setLogLevel(loglevel)
[docs] def set_period_ms(self,period):
self.set_period(period*1e-3)
[docs] def put(self,*args):
self.queue.put(*args)
[docs] def get(self,*args,**kwargs):
return self.queue.get(*args,**kwargs)
[docs] def wait(self,tout=None):
self.event.wait(tout or self.MinWait)
[docs] def register(self,source):
self.info('EventThread.register(%s)'%source)
if not self.get_started():
self.start()
if source not in self.sources:
#self.sources[source.full_name] = source
source.full_name = str(source.full_name).lower()
self.sources.add(source)
source.last_read_time = time.time()
[docs] def get_source_name(self,source):
if isString(source):
name = source
else:
name = getattr(source,'full_name',getattr(source,'model',None))
#self.sources[name] = source
return str(name).lower()
[docs] def has_source(self,source):
if source in self.sources: return True
source = self.get_source_name(source)
return [s.full_name == source for s in self.sources]
[docs] def get_sources(self,source='.*'):
return [s for s in self.sources if clmatch(source,s.full_name)]
[docs] def get_source_names(self,source='.*'):
return sorted(set(s.full_name for s in self.get_sources(source)))
#def get_object(self,source):
#return self.sources.get(source,None) if isString(source) else source
[docs] def get_pending(self):
return self.queue.qsize()
[docs] def process(self):
"""
Currently, this implementation will process 100 events for each polling
each time as max.
"""
#self.debug('Processing Events')
WAS_EMPTY = False
queue = CaselessSortedDict()
t0,evs,polls = now(),0,0
filtered = self.filtered >= True
#First, extract events from the queue
for i in range(self.EVENT_POLLING_RATIO):
try:
data = self.get(block=False)
evs+=1
if isSequence(data):
source,args = data[0],data[1:]
else:
source,args = data,[]
name = self.get_source_name(source)
if not self.has_source(name): self.register(source)
if name not in queue: queue[name] = []
queue[name].append((source,args))
except Queue.Empty,e:
WAS_EMPTY = True
break
except Exception,e:
#The queue is empty, long period attributes can be processed
if 'empty' not in str(e).lower():
self.error(traceback.format_exc())
break
# Enable filtering if the performance is compromisesd
if (False < self.filtered < True) and (
(time.time()-t0)>(self.filtered * 1e-3*self.latency) or
self.queue.qsize() < self.EVENT_POLLING_RATIO):
filtered = True
if now()>(t0+1e-3*self.latency):
break
#Execute the events for each source
#Sequential execution of events received is relatively guaranteed
for s,events in sorted(queue.items()):
#@TODO: Event Filters should be implemented here
#including removal of PERIODIC events if a change event is already in queue
if filtered:
events = events[-1:]
while events:
try:
e = events.pop(0)
source,args = e
if filtered: #propagate last event to all sources
sources = self.get_sources(self.get_source_name(s))
else:
sources = [source] #each source should push its own events
for source in sources:
source.last_read_time = now()
self.fireEvent(source,*args)
self.wait(self.MinWait/10.) #breathing
except:
self.error('%s:%s\n%s'%(s,e,traceback.format_exc()))
self.wait(0) #breathing
## print('Executing pollings')
t0 = now()
pollings = []
for s in self.sources.copy():
s.checkEvents(tdiff=2e-3*s.polling_period)
if s.getMode():
nxt = s.last_read_time+(s.polling_period if s.isPollingEnabled()
else s.KeepAlive)/1e3
pollings.append((nxt,s))
for nxt,source in reversed(sorted(pollings)):
if t0 > nxt and (s.isPollingActive() or WAS_EMPTY):
try:
if WAS_EMPTY:
self.info('KeepAlive(%s) after %s ms'%(source.full_name,source.KeepAlive))
source.poll()
polls+=1
except:
traceback.print_exc()
if WAS_EMPTY: break
source.last_read_time = now()
## print('Check pending HW asynch requests')
asynch_hw = [s[1] for s in pollings if getattr(s,'pending_request',None) is not None]
for source in randomize(asynch_hw):
try:
source.asynch_hook() #<<<< THIS SHOULD BE ASYNCHRONOUS!
except Exception,e:
traceback.print_exc()
source.last_read_time = now() #<<< DO NOT UPDATE THAT HERE; DO IT ON ASYNCH()
break
self.wait(0)
if evs or polls: self.debug('Processed %d events, %d pollings'%(evs,polls))
return
[docs] def fireEvent(self,source,*args):
"""
It just retriggers every EventSource fireEvent method
If there's no method nor listeners, a callable is launched
"""
try:
if hasattr(source,'fireEvent'):
source.fireEvent(*args) ## Event types are filtered at EventSource.fireEvent
elif hasattr(source,'listeners'):
listeners = source.listeners.keys()
for l in listeners:
try: getattr(l,'eventReceived',l)(*([source]+list(args)))
except: traceback.print_exc()
finally: self.event.wait(self.MinWait/len(source.listeners))
elif isCallable(source):
source(*data[1:0])
except:
self.warning('fireEvent',source,args)
self.warning(traceback.format_exc())
[docs]class EventSource(Logger,SingletonMap):
"""
Simplified implementation of Taurus Attribute/Model and fandango.CachedAttributeProxy classes
It implements CachedAttributeProxy methods but doesnt inherit from it;
this regression is due to the lack of reliability of AttributeProxy in PyTango 9.
Documentation at doc/recipes/EventsAndCallbacks.rst
Slow Polling will be always enabled, as a KeepAlive is always kept reading
the attribute values at an slow rate.
Well, will be always enabled as long there are Listeners or Forced is True. If not polling
will not be activated and it will be simply a CachedAttributeProxy.
In this implementation, just a faster polling will be enabled if the
attribute provides no events. But the slow polling will never be fully disabled.
If no events are received after EVENT_TIMEOUT, polling wil be also enabled.
It will also subscribe to all attribute events, not only CHANGE and CONFIG
All types are: 'CHANGE_EVENT,PERIODIC_EVENT,QUALITY_EVENT,ARCHIVE_EVENT,ATTR_CONF_EVENT,DATA_READY_EVENT,USER_EVENT'
Arguments to EventSource(...) are:
- name : attribute name (simple or full)
- parent : device name or proxy
- enablePolling (force polling by default)
- pollingPeriod (3000)
- keeptime (500 ms) min. time (in ms!) between HW reads
- tango_asynch = True/False ; to use asynchronous Tango reading
- listeners = a list of listeners to be added at startup
- persistent = if True, a dummy listener is added to enforce subscription
Arguments are supported in CamelCase and lowercase to keep compatibility
with previous apis (CachedProxy and TaurusAttribute).
Keep in mind that if tango_asynch=True; you will not get the latest value
when doing read(cache=False). You must use read(cache=False,asynch=False) instead.
@TODO: Listeners should be assignable to only one type of eventl.
@TODO: read(cache=False) should trigger fireEvent if not called from poll()
"""
EVENT_TIMEOUT = 900 # 10s
DEFAULT_LOG = 'WARNING'
DEFAULT_EVENTS = [ 'periodic', 'change', 'archive', 'quality' ] #'user_event',
VALUE_EVENTS = ['periodic','change','archive','quality','user_event']
TAURUS_EVENTS = ['change','attr_conf']
QUEUE = None
DefaultPolling = 3000.
KeepAlive = 15000.
INSTANCES = []
PROXIES = ProxiesDict()
DUMMY = EventListener('dummy') #Dummy listener for subscription persistence
#States
STATES = Struct({
'UNSUBSCRIBED': 0, #Not using events at all
'PENDING': 1, #Polled, waiting for first event
'SUBSCRIBING': 2, #In the process of subscribing
'SUBSCRIBED': 3, #Using events only
'FORCED': 4, #Polling forced, interlaced with events
'FAILED': -1, #Not working at all
})
def __init__(self, name, keeptime=1000., fake=False, parent=None, **kw):
""" Arguments: loglevel, tango_asynch, pollingPeriod, keeptime, enablePolling, use_events """
if 0 < name.replace('//','/').count('/') < name.count(':')+3:
name += '/state'
self.simple_name = name.split('/')[-1]
if not parent and '/' in name:
parent = name.rsplit('/',1)[0]
self.fake = kw.get('fake',False)
if isString(parent):
self.device = parent # just to keep it alive
try:
self.proxy = not self.fake and self.PROXIES[parent]
except:
self.proxy = not self.fake and PyTango.DeviceProxy(parent)
else:
try:
self.device = parent.name()
self.proxy = parent
except:
raise Exception('A valid device name is needed: %s'%
([name,self.simple_name,parent]))
self.full_name = get_full_name('/'.join((self.device,self.simple_name)))
self.normal_name = '/'.join(self.device.split('/')[-3:]+[self.simple_name])
assert self.fake or self.proxy,'A valid device name is needed'
##Set logging
#self.call__init__(Logger,self.full_name) ##This fails, for unknown reasons
Logger.__init__(self,self.full_name)
self.setLogLevel(kw.get('loglevel',kw.get('log_level',
kw.get('logLevel',self.DEFAULT_LOG))))
self.info('Init()')
self.listeners = defaultdict(set) #[]
self.event_ids = dict() # An {EventType:ID} dict
self.state = self.STATES.UNSUBSCRIBED
self.tango_asynch = kw.get('tango_asynch',False)
self.write_with_read = kw.get('write_with_read',False)
## Set polling configuration
# Indicates if the attribute is being polled periodically
# stores if polling has been forced by user API
self.forced = kw.get('enablePolling',kw.get('enable_polling',
kw.get('forcePolling',kw.get('force_polling',False))))
self.polled = self.forced #Forced is permanent, polled may change
# current polling period in milliseconds
self.polling_period = kw.get("pollingPeriod",kw.get('polling_period',
iif(isNumber,self.forced,self.DefaultPolling,float)))
self.keep_time = kw.get('keep_time',keeptime)
# force tango events usage
self.use_events = kw.get("use_events",[]) or []
if self.use_events is True:
self.use_events = self.DEFAULT_EVENTS
self.attr_value = None
self.event_lock = threading.Lock()
self.last_event = dict()
self.last_event_time = 0
self.last_error = None
self.last_read_time = 0
self.pending_request = None
self.stats = defaultdict(int)
self.stats['start'] = now()
EventSource.INSTANCES.append(weakref.ref(self))
if self.forced:
self.activatePolling()
listeners = toList(kw.get('listeners',[]))
if kw.get('persistent',False):
listeners.append(EventSource.DUMMY)
map(self.addListener,listeners)
def __del__(self):
self.cleanUp()
def __str__(self):
return 'EventSource(%s)'%(self.full_name)
def __repr__(self):
return str(self)
[docs] def getParent(self):
return self.device
[docs] def getParentObj(self):
return self.proxy
[docs] def cleanUp(self):
self.debug("cleanUp")
while self.listeners:
self.removeListener(self.listeners.popitem(),exclude='')
if self.isPollingEnabled():
self.deactivatePolling()
if self.checkState('SUBSCRIBED','PENDING'):
self.unsubscribeEvents()
[docs] def resetStats(self):
t0 = now()
self.stats = defaultdict(int)
self.stats['start'] = t0
return t0
@staticmethod
[docs] def thread():
"""
It returns the current EventThread INSTANCE
Use EventSource.thread().setup(...) to configure it
"""
if EventSource.QUEUE is None:
EventSource.QUEUE = EventThread()
return EventSource.QUEUE
@staticmethod
[docs] def start_thread():
th = EventSource.thread()
if not th.get_started():
th.start()
[docs] def setState(self,state):
try:
state = self.STATES.get(state,state)
if self.state != state:
for k,v in self.STATES.items():
if v==self.state: o = k
if v==state: n = k
self.state = state
self.info('setState(): %s => %s'%(o,n))
except:
self.error('UNKNOWN STATE: %s'%state)
[docs] def checkState(self,*states):
states = states or [self.state]
states = [self.STATES.get(s,s) for s in states]
if self.state in states:
return self.STATES.get_key(self.state)
else:
return False
[docs] def getMode(self):
"""
SYNCHRONOUS = 0
EVENTS = 1
POLLED = 2
"""
m = (self.use_events and 1) or (self.forced and 2) or (self.polled and 3)
return int(m)
[docs] def isUsingEvents(self):
""" This method doesnt tell if it wants to use_events but if it is actually receiving them """
return self.checkState('SUBSCRIBED')
[docs] def enablePolling(self,force=False):
if force: self.activatePolling(force = force)
[docs] def forcePolling(self, period = None):
if period: self.activatePolling(period,force=True)
else: self.deactivatePolling()
[docs] def disablePolling(self):
self.deactivatePolling() # DON'T STOP THREADS HERE!
[docs] def isPollingEnabled(self):
""" It should be called isPollingAllowed instead """
return self.polled
[docs] def activatePolling(self,period = None, force = None):
#self.factory().addAttributeToPolling(self, self.getPollingPeriod())
self.polling_period = notNone(period,self.polling_period)
self.polled = True
self.info('activatePolling(%s,%s)'%(self.full_name,self.polling_period))
self.resetStats()
self.forced = notNone(force,self.forced)
self.thread().register(self)
[docs] def deactivatePolling(self):
#self.factory().removeAttributeFromPolling(self)
self.info('deactivatePolling(%s,%s)'%(self.full_name,self.state))
self.polled = False
self.forced = False
[docs] def isPollingActive(self):
return self.polled or self.forced
[docs] def isPollingForced(self):
return self.forced
[docs] def changePollingPeriod(self, period):
"""change polling period to period miliseconds """
self.polling_period = period
self.activatePolling()
[docs] def getPollingPeriod(self):
"""returns the polling period """
return self.polling_period
# LISTENER METHOD
def _listenerDied(self, weak_listener):
try:
self.listeners.pop(weak_listener)
except Exception, e:
pass
[docs] def addListener(self, listener,use_events=True,use_polling=False):
"""
Adds a Listener to this EventSource object.
use_events can be a boolean or a list of event types (change,attr_conf,periodic,archive)
"""
if not isCallable(listener) \
and not hasattr(listener,'eventReceived') \
and not hasattr(listener,'event_received'):
raise Exception('NotAValidListener!: %s'%listener)
if not use_events: use_events = []
elif use_events is True: use_events = self.use_events or self.DEFAULT_EVENTS
use_events = toList(use_events or self.use_events)
self.debug('addListener(%s,use_events=%s,polled=%s)'%(listener,use_events,use_polling))
self.forced = self.forced or use_polling
self.thread().register(self)
if use_events:
self.subscribeEvents(types=use_events,asynchronous=self.checkState('UNSUBSCRIBED'))
if self.forced and not self.polled:
self.activatePolling()
weak = weakref.ref(listener,self._listenerDied)
if weak not in self.listeners:
#This line is needed, as listeners may be polling only
self.listeners[weak] = set()
for e in use_events:
self.listeners[weak].add(e)
return True
[docs] def removeListener(self, listener, exclude='dummy'):
"""
Remove a listener object or callback.
:listener: can be object, weakref, sequence or '*'
"""
if listener == '*':
self.warning('Removing all listeners')
listener = [k for k in self.listeners.keys() if not k().name==exclude]
elif isString(listener):
listener = [k for k in self.listeners.keys() if k().name==listener]
if isSequence(listener):
while listener:
self.removeListener(listener.pop())
return
elif not isinstance(listener,weakref.ReferenceType):
listener = weakref.ref(listener,self._listenerDied)
try:
self.listeners.pop(listener)
except Exception, e:
return False
if not self.listeners:
self.unsubscribeEvents()
return True
[docs] def hasListeners(self):
""" returns True if anybody is listening to events from this attribute """
if not self.listeners:
return False
return len(self.listeners) > 0
[docs] def fireEvent(self, event_type, event_value, listeners=None):
"""
sends an event to all listeners or a specific one
event type filtering is done here
poll() events will be allowed to pass through
"""
pending = self.thread().get_pending()
if pending:
self.debug('fireEvent(%s), %d events still in queue'%(event_type,pending))
self.stats['fired']+=1
listeners = listeners or self.listeners.keys()
for l in listeners:
try:
#event filter will allow poll() events to pass through
if (event_type not in ('periodic',PyTango.EventType.PERIODIC_EVENT)
and l in self.listeners and event_type not in self.listeners[l]):
self.debug('%s event discarded'%event_type)
continue
if isinstance(l, weakref.ref):
l = l()
if hasattr(l,'eventReceived'):
self.debug('fireEvent(%s) => %s?' % (event_type,l))
l.eventReceived(self, event_type, event_value)
elif isCallable(l):
l(self, event_type, event_value)
except:
traceback.print_exc()
try:
vtime = ctime2time(getattr(event_value,'time',None))
self.stats['acc_latency'] += now()-vtime
self.stats['latency'] = self.stats['acc_latency']/self.stats['fired']
except: pass
# TANGO RELATED METHODS
[docs] def checkEventsReceived(self,types=None):
types = types or self.event_ids
if not types:
return False
for t in types:
if not any(clmatch(e,t) or clmatch(t,e) for e in self.last_event):
return False
return True
[docs] def subscribeEvents(self,types=None,asynchronous=True):
try:
self.event_lock.acquire()
t0 = self.resetStats()
types = toList(types) if types else []
if not isIterable(self.use_events): self.use_events = []
self.use_events = sorted(set((self.use_events+types) or self.DEFAULT_EVENTS))
if self.isUsingEvents() and self.checkEventsReceived(self.use_events):
self.warning('AlreadySubscribed!')
return False
self.info('subscribeEvents(%s,asynch=%s)'%(self.use_events,asynchronous))
if asynchronous:
# Subscription to be done by checkEvents()
#if self.checkState('SUBSCRIBING','PENDING'):
self.setState('UNSUBSCRIBED')
else:
self.last_event_time = now()
##self.thread().stop() #DONT DO THIS WITHIN THE LOOP HOOK!
try:
for k,type_ in EventType.names.items():
if any(clmatch(e,k) for e in self.use_events):
try:
if self.event_ids.get(type_) is not None:
self.debug('event %s already subscribed'%type_)
else:
self.info('SUBSCRIBING to %s events'%type_)
self.event_ids[type_] = self.proxy.subscribe_event(
self.simple_name,type_,self,[],True)
if not self.isUsingEvents():
self.setState('SUBSCRIBING')
except:
self.debug('event %s not subscribed'%type_)
## State will be kept in SUBSCRIBING until an event is received
## If nothing arrives after N seconds, polling will be activated
self.info('subscribeEvents() took %.2f seconds'%(now()-t0))
except:
self.error('subscribeEvents(): \n'+traceback.format_exc())
finally:
pass
#self.start_thread() #DONT DO THIS WITHIN THE LOOP HOOK!
self.thread().register(self)
except:
self.error('subscribeEvents(): \n'+traceback.format_exc())
finally:
self.event_lock.release()
return True
[docs] def checkEvents(self,tdiff=None,vdiff=None):
"""
tdiff: max time difference allowed between last_event_time and current time
vdiff: difference between last event value and current hw value
"""
#self.debug('checkEvents(...)')
delta = now()-self.last_event_time
if delta > (tdiff or self.EVENT_TIMEOUT): tdiff = delta
## @TODO: vdiff should be compared against event config
vdiff = vdiff if not isSequence(vdiff) else any(vdiff)
r = True
if self.use_events:
if self.checkState('UNSUBSCRIBED'):
self.subscribeEvents(asynchronous=False)
if tdiff and (self.checkState('SUBSCRIBING') or (vdiff and self.isUsingEvents())):
if not self.isUsingEvents():
self.warning('Event subscribing failed (tdiff=%s,vdiff=%s) switching to polling'%(tdiff,vdiff))
self.setState('PENDING')
else:
self.warning('Values differ and no event received (check sardana/attr_conf)! (tdiff=%s,vdiff=%s)'%(tdiff,vdiff))
r = False
if self.listeners:
if not self.polled and (self.forced or self.use_events and not self.isUsingEvents()):
self.info('checkEvents(): events not subscribed, enabling polling')
self.activatePolling()
else:
if self.polled and not self.forced:
self.info('checkEvents(): no clients, stop polling')
self.deactivatePolling()
# Do not unsubscribe events if not called explicitly
#if self.isUsingEvents():
#self.info('checkEvents(): no clients, disabling events')
#self.unsubscribeEvents()
return r
[docs] def unsubscribeEvents(self):
self.info('unsubscribeEvents(...)')
self.use_events = []
for type_,ID in self.event_ids.items():
try:
self.proxy.unsubscribe_event(ID)
self.event_ids.pop(type_)
except Exception,e:
self.debug('Error unsubscribing %s events: %s'%(type_,e))
if not self.hasListeners() and not self.isPollingForced():
self.deactivatePolling()
self.setState('UNSUBSCRIBED')
[docs] def decodeAttrInfoEx(self,attr_conf):
pass
[docs] def set_cache(self,value,t=None):
"""
set_cache and fake are used by PyAlarm.update_locals
It's used to emulate alarm state reading from other devices
"""
self.last_read_time = t or time.time()
self.attr_value = hasattr(value,'value') and value or fakeAttributeValue(self.full_name,value,error=False)
self.fireEvent(EventType.PERIODIC_EVENT,self.attr_value)
[docs] def write(self, value, with_read=None):
"""
The with_read argument will trigger a read() call and an update in all listeners
"""
with_read = with_read if with_read is not None else self.write_with_read
self.debug('write(...,with_read=%s)'%with_read)
self.stats['write']+=1
self.proxy.write_attribute(self.simple_name,value)
if with_read:
return self.read(cache=False)
[docs] def read(self, cache=None,asynch=None,_raise=True):
"""
Read last value acquired,
if cache = False or not polled it will trigger
a proxy.read_attribute() call.
If asynch=True/False, self.tango_asynch will be overriden for this call.
"""
self.debug('read(cache=%s,asynch=%s)'%(cache,asynch))
asynch = notNone(asynch,self.tango_asynch)
t0 = now()
# If it was just updated, return cache
if cache is None:
vtime = ctime2time(getattr(self.attr_value,'time',None))
if self.fake or (t0 < (vtime + self.keep_time*1e-3)):
cache = True
# If not polled, force HW reading
elif not self.getMode():
cache = False
else:
cache = True
self.asynch_hook() # Check for pending asynchronous results
if not cache or self.attr_value is None:
if not self.checkState('UNSUBSCRIBED') and not self.last_event:
self.info('Attribute first reading (subscribed but no events received yet)')
self.stats['read']+=1
self.debug('%s.read_attribute(%s,%s,%s)'%(self.device,self.simple_name,self.tango_asynch,self.pending_request))
try:
## Do not merge these IF's, order matters
if asynch:
if self.pending_request is not None:
self.attr_value = notNone(self.asynch_hook(),self.attr_value)
else:
self.pending_request = self.proxy.read_attribute_asynch(self.simple_name),now()
self.attr_value = notNone(self.asynch_hook(),self.attr_value)
else:
self.attr_value = self.proxy.read_attribute(self.simple_name)
except Exception,e:
# fakeAttributeValue initialized with full_name
print('EventSource.read(%s) failed!:\n%s'%(self.full_name,exc2str(e)))#traceback.format_exc().split('desc')[-1][:80]))
self.attr_value = fakeAttributeValue(self.full_name,value=e,error=e)
self.last_read_time = t0
self.fireEvent(EventType.PERIODIC_EVENT,self.attr_value)
if _raise and getattr(self.attr_value,'error',False):
raise self.attr_value.value
else:
return self.attr_value
[docs] def asynch_hook(self):
if self.pending_request is None:
return None
self.debug('asynch_hook(%s)'%str(self.pending_request))
try:
self.attr_value = self.proxy.read_attribute_reply(self.pending_request[0])
self.pending_request = None
return self.attr_value
except PyTango.AsynReplyNotArrived,e:
return None
#except PyTango.CommunicationFailed,e:
##Device killed or restarted
#self.pending_request = None
#return None
#except PyTango.AsynCall
##raise Exception,'CHECK HERE FOR EXPIRED REQUEST OR REQUEST NO VALID ANYMORE (e.g. DEVICE RESTARTED)'
#self.pending_request = None
#return None
except Exception, e:
self.pending_request = None
self.attr_value = e
return None
[docs] def getValueObj(self):
v = self.read(cache=True)
try: return v.value
except: return v
[docs] def poll(self):
t0 = now()
self.debug('poll(+%s): %s'%(t0-self.last_read_time,self.stats['poll']))
self.stats['poll']+=1
if self.checkState('SUBSCRIBING'):
## While subscribing, polling is ignored and resumed on SUBSCRIBED/PENDING state
self.thread().lasts[self.full_name] = self.last_read_time = t0
return
try:
prev = self.attr_value and self.attr_value.value
## The read() call will trigger a fireEvent()
self.attr_value = self.read(cache=False) #(self.attr_value is not None))
av = getattr(self.attr_value,'value',self.attr_value)
diff = prev != av
r = self.checkEvents(vdiff=diff)
if diff and self.isUsingEvents():
self.info('Polled a subscribed attribute; diff: %s!=%s'%(prev,av))
except Exception,e:
self.debug('poll(%s): %s'%(self.full_name,exc2str(e)))
##FIRE EVENT IS ALREADY DONE IN read() METHOD!
[docs] def push_event(self,event):
try:
REG_FAILED = 'API_DSFailedRegisteringEvent'
NOT_READY = 'API_AttributeNotDataReadyEnabled'
NOT_PROPS = 'API_EventPropertiesNotSet'
self.stats['total_events']+=1
t0 = now()
type_ = event.event
self.stats[type_]+=1
et = ctime2time(event.reception_date)
if et<1e9:
if not event.err:
self.warning('%s event time is 0!'%type_)
et = t0
if event.err:
self.last_error = event
has_evs,is_polled = self.isUsingEvents(),self.isPollingActive()
value = event.errors[0]
reason = event.errors[0].reason
(self.info if (self.last_event_time < time.time()-5*self.KeepAlive) else self.debug)(
'push_event(%s,err=%s,has_events=%s,polled=%s)'%(type_,reason,has_evs,is_polled))
if reason == 'API_EventPropertiesNotSet' and self.isUsingEvents():
#Nothing to do, other event types are already subscribed
return
elif reason in EVENT_TO_POLLING_EXCEPTIONS:
if reason in EVENT_CONF_EXCEPTIONS and any(self.last_event.values()):
#Discard config exceptions if some other event is already working well
return
elif self.use_events and not is_polled:
self.warning('EVENTS_FAILED! (%s,%s,%s,%s): reactivating Polling'%(type_,reason,has_evs,is_polled))
self.warning(str([et,type_,event.err,]))
self.setState('PENDING')
self.activatePolling()
return
else:
if reason not in (NOT_PROPS,NOT_READY,REG_FAILED):
# If push_event is executed, attributes are being received
self.warning('ERROR in %s(%s): %s(%s)'%(self.full_name,type_,type(value),reason))
self.last_event_time = et or time.time() #A valid error is a valid event
self.setState('SUBSCRIBED')
else:
#Ignore the event
return
elif isinstance(event,PyTango.AttrConfEventData):
self.debug('push_event(%s)'%str(type_))
value = event.attr_conf
self.decodeAttrInfoEx(value)
#(Taurus sends here a read cache=False instead of AttrConf)
else:
(self.debug if self.last_event.get(type_,None) else self.info)(
'push_event(%s,err=%s,has_events=%s)'%(type_,event.err,self.isUsingEvents()))
self.setState('SUBSCRIBED')
try:
self.attr_value = value = event.attr_value
except:
self.warning('push_event(%s): no value in %s'%(type_,dir(event)))
self.attr_value = value = None
if str(event.event)=='periodic' and (t0-self.last_event_time)<0.01:
#Dropping unnecessary periodic events
self.last_event[type_] = event
return
self.last_event_time = et or time.time()
if self.isPollingActive() and not self.isPollingForced():
self.info('push_event(): Event received, deactivating polling')
self.warning(str([et,type_,event.attr_value]))
self.deactivatePolling()
delay = t0-self.last_event_time
if delay > 1e3 : self.warning('push_event was %f seconds late'%delay)
self.last_event[type_] = event
#Instead of firingEvent, I return and pass the value to the queue
self.thread().put((self,type_,value))
except:
self.error(type(event),dir(event))
traceback.print_exc()
###############################################################################
[docs]class TangoListener(EventListener):
__doc__ = EventListener.__doc__
pass
[docs]class TangoAttribute(EventSource):
__doc__ = EventSource.__doc__
pass
#For backwards compatibility
import fandango.tango
[docs]class CachedAttributeProxy(EventSource):
pass
setattr(fandango.tango,'CachedAttributeProxy',CachedAttributeProxy)
[docs]def get_test_objects(model='controls02:10000/test/sim/tonto/Pressure',period=1.):
"""
v,tl = fandango.callbacks.get_test_objects()
"""
tv = TangoAttribute(model)
tl = TangoListener('test')
tv.setLogLevel('DEBUG')
tl.setLogLevel('DEBUG')
tv.thread().set_period(period)
tv.thread().setLogLevel('DEBUG')
#tv.addListener(tl,use_events=False)
return tv,tl
###############################################################################
# OLD API, DEPRECATED
_EventsList = {}
_EventReceivers = {}
_StatesList = {}
_AttributesList = {}
[docs]class EventStruct():
name = ''
event = None
receivers = []
[docs]class TAttr(EventStruct):
"""
This class is used to keep information about events received,
example of usage inside device.DevChild
"""
def __init__(self,name,dev_name='',proxy=None,event_id=None):
self.name=name
self.event=None #This is the last event received
self.event_id=event_id #This is the ID received when subscribing
self.dp=proxy #This is the device proxy
self.dev_name=dev_name
self.receivers=[] #This is the list of composers that must receive this event
self.attr_value=None
self.State=PyTango.DevState.UNKNOWN
def __str__(self):
return str(name)+","+str(self.event)+","+TangoStates[self.State]+";"
[docs] def set(self, event):
self.event=event#copy(event)
self.attr_value=self.event.attr_value
#def command_queue(cmd_list,args_list=[],timeout=5000,threads=10):
#''' executes a set of commands asynchronously with the specified timeout
#'''
#from threading import Thread
#from Queue import Queue
#if args_list and len(cmd_list)!=len(args_list):
#raise Exception,'cmd_list and args_list lengths differ!'
#num_threads = max(len(cmd_list),max_threads)
#queue = Queue()
#results = {}
##wraps system ping command
#def pinger(i, q, r):
#"""Pings subnet"""
#wait = threading.Event().wait
#while True:
#wait(.3)
#ip = q.get()
#r[ip] = (not ret)
#q.task_done()
##Spawn thread pool
#for i in range(num_threads):
#worker = Thread(target=pinger, args=(i, queue,results))
#worker.setDaemon(True)
#worker.start()
##Place work in queue
#for ip in ips:
#queue.put(ip)
##Wait until worker threads are done to exit
#queue.join()
#return results
###############################################################################
# OLD API, DEPRECATED
[docs]class EventCallback():
"""
It provides persistent storage.
lock.acquire and lock.release should be used to prevent threading problems!,
Use of the lists inside push_event is safe
"""
def __init__(self):
self.lock=threading.RLock()
self.TimeOutErrors=0
self.NotifdExceptions=['OutOfSync','EventTimeout','NotificationServiceFailed']
[docs] def push_event(self,event):
self.lock.acquire()
try:
#Pruning tango:$TANGO_HOST and other tags
attr_name = '/'.join(event.attr_name.split('/')[-4:])
dev_name = hasattr(event.device,'name') and event.device.name() or event.device
print "in EventCallback.push_event(",dev_name,": ",attr_name,")"
if not event.err and event.attr_value is not None:
print "in EventCallback.push_event(...): ",attr_name,"=", event.attr_value.value
self.TimeOutErrors=0
_EventsList[attr_name.lower()].set(event)
if attr_name.lower().endswith('/state'):
_StatesList[dev_name.lower()]=event.attr_value.value
_AttributesList[event.attr_name.lower()]=event.attr_value
else:
print 'in EventCallback.push_event(...): Received an Error Event!: ',event.errors
_EventsList[attr_name.lower()].set(event)
#if 'OutOfSync' in event.errors[0]['reason']: or 'API_EventTimeout' in event.errors[0]['reason']:
#if [e for e in event.errors if hasattr(e,'keys') and 'reason' in e.keys() and any(re.search(exp,e['reason']) for exp in self.NotifdExceptions)]:
reasons = [getattr(e,'reason',e.get('reason',str(e)) if hasattr(e,'get') else str(e)) for e in event.errors] #Needed to catch both PyTango3 and PyTango7 exceptions
if any(n in r for n in self.NotifdExceptions for r in reasons):
print 'callbacks=> DISCARDED EVENT FOR NOTIFD REASONS!!! %s(%s)' \
%(dev_name,reasons)
self.TimeOutErrors+=1
self.lock.release()
return
else:
self.TimeOutErrors=0
if attr_name.lower().endswith('/state'):
_StatesList[dev_name.lower()]=None #An unreaded State cannot be UNKNOWN, it must be None to notify that an exception occurred!
_AttributesList[attr_name.lower()]=None
#Launching Device.push_event()
for rec in _EventsList[attr_name].receivers:
if rec in _EventReceivers.keys(): _EventReceivers[rec].push_event(event)
elif hasattr(rec,'push_event'): rec.push_event(_event)
elif isinstance(rec,threading.Event): rec.set()
elif callable(rec): rec()
else: raise 'UnknownEventReceiverType'
except Exception,e:
print 'exception in EventCallback.push_event(): ',e, ";", getLastException()
self.lock.release()
###############################################################################
# OLD API, DEPRECATED
#THIS IS THE EVENTS CALLBACK SINGLETONE:
GlobalCallback = EventCallback()
## @TODO ... implemented in addTAttr and addReceiver ... missing a dp attribute to finish the work
#def subscribeToAttribute(subscriber,att_name):
#"""
#subscriber: a DeviceImpl object or the name of an already subscribed object
#attribute: the FULL_NAME of the attribute to subscribe
#"""
#if att_name.count('/')<3: raise 'subscribeToAttribute_IncompleteAttributeName'
#if isinstance(subscriber,PyTango.DeviceImpl):
#EventReceivers[subscriber.get_name()]=subscriber
#elif isinstance(subscriber,str):
#subscriber=_EventReceivers[subscriber]
#else: raise 'subscribeToAttribute_UnknownSubscriberException'
#if not att_name in _EventsList.keys():
#print 'subscribeToAttribute(%s,%s)'%(subscriber.get_name(),att_name)
#EventsList[att_name] = TAttr(att_name)
#EventsList[att_name].receivers.append(subscriber)
#EventsList[att_name].event_id = self.dp.subscribe_event(att,PyTango.EventType.CHANGE_EVENT,GlobalCallback,[],True)
#EventsList[att_name].dev_name = att_name.rsplit('/',0)
#AttributesList[att_name]=None #it could be done inside _EventsList?!?!? ... or could AttributeList substitute _EventsList?
##It will not be initialized here ... as it differs in DevChild an DevsList
##EventsList[att_name].dp = self.dp
#if att=='State': #DevsList should substitute that
#StatesList[_EventsList[att_name].dev_name]=PyTango.DevState.UNKNOWN
#print "In ", self.get_name(), "::check_dp_attributes()", ": Listing Device/Attributes in _EventsList:"
#for a,t in _EventsList.items(): print "\tAttribute: ",a,"\tDevice: ",t.dev_name,"\n"
#else:
#print "In ", self.get_name(), "::check_dp_attributes(",att_name,")", ": This attribute is already in the list, adding composer to receivers list."
#if not subscriber.get_name() in _EventsList[att_name].receivers and not subscriber in _EventsList[att_name].receivers:
#EventsList[att_name].receivers.append(subscriber)
#pass5
###############################################################################
# OLD API, DEPRECATED
[docs]def inStatesList(devname):
print 'In callbacks.inStatesList ...'
GlobalCallback.lock.acquire()
print 'Checking if %s in %s.'%(devname,str(_StatesList.keys()))
value=bool(devname.lower() in _StatesList.keys())
GlobalCallback.lock.release()
return bool
[docs]def getStateFor(devname):
print 'In callbacks.getStateFor ...'
GlobalCallback.lock.acquire()
state = _StatesList[devname.lower()] if devname.lower() in _StatesList.keys() else None
GlobalCallback.lock.release()
return state
[docs]def setStateFor(devname,state):
print 'In callbacks.setStateFor ...'
GlobalCallback.lock.acquire()
_StatesList[devname.lower()]=state
GlobalCallback.lock.release()
return state
[docs]def setAttributeValue(attr_name,attr_value):
print 'In callbacks.setAttributeValue(%s)'%attr_name
GlobalCallback.lock.acquire()
_AttributesList[attr_name.lower()]=attr_value
GlobalCallback.lock.release()
return attr_value
[docs]def inAttributesList(attname):
GlobalCallback.lock.acquire()
value=bool(attname.lower() in _AttributesList.keys())
GlobalCallback.lock.release()
return bool
[docs]def getAttrValueFor(attname):
GlobalCallback.lock.acquire()
value=_AttributesList[attname.lower()]
GlobalCallback.lock.release()
return value
[docs]def inEventsList(attname):
GlobalCallback.lock.acquire()
value=bool(attname.lower() in _EventsList.keys())
GlobalCallback.lock.release()
return value
[docs]def getEventFor(attname):
GlobalCallback.lock.acquire()
event=_EventsList[attname.lower()]
GlobalCallback.lock.release()
return event
[docs]def getEventItems():
GlobalCallback.lock.acquire()
result = _EventsList.items()
GlobalCallback.lock.release()
return result
[docs]def getSubscribedItems(receiver):
'''It returns a list with all devices managed by callbacks to which this receiver is effectively subscribed'''
GlobalCallback.lock.acquire()
result = []
for ev in _EventsList.values():
if receiver in ev.receivers:
result.append (ev.dev_name)
GlobalCallback.lock.release()
return result
###############################################################################
# OLD API, DEPRECATED
[docs]def addTAttr(tattr):
try:
GlobalCallback.lock.acquire()
att_name = tattr.name.lower()
_EventsList[att_name] = tattr
_AttributesList[att_name]=None
if att_name.endswith=='/state':
_StatesList[tattr.dev_name.lower()]=None
except: print traceback.format_exc()
finally: GlobalCallback.lock.release()
return
[docs]def addReceiver(attribute,receiver):
try:
GlobalCallback.lock.acquire()
if not receiver.lower() in _EventsList[attribute.lower()].receivers:
_EventsList[attribute.lower()].receivers.append(receiver.lower())
except: print traceback.format_exc()
finally: GlobalCallback.lock.release()
return
[docs]def subscribeDeviceAttributes(self,dev_name,attrs):
"""
This is how attributes were registered in the old PyStateComposer
"""
dev = PyTango.DeviceProxy(dev_name)
dev.ping()
# Initializing lists
if dev_name not in callbacks._StatesList: callbacks._StatesList[dev_name] = PyTango.DevState.UNKNOWN
if dev_name not in callbacks._AttributesList: callbacks._AttributesList[dev_name] = None
for att in attrs:
att_name = (dev_name+'/'+att).lower()
if att not in dev.get_attribute_list():
continue
if not dev.is_attribute_polled(att) and self.ForcePolling:
self.info('::AddDevice(): forcing %s polling to %s' % (att,argin))
period = dev.get_attribute_poll_period(att) or 3000
dev.poll_attribute(att,period)
self.debug("%s.poll_attribute(%s,%s)" % (argin,att,period))
#cb = self
cb = GlobalCallback
if not att_name in callbacks._EventsList.keys():
callbacks._EventsList[att_name] = self.TAttr(att_name)
callbacks._EventsList[att_name].receivers.append(self.get_name())
self.info('AddDevice: subscribing event for %s' % att_name)
event_id = dev.subscribe_event(att,PyTango.EventType.CHANGE,cb,[],True)
# Global List
callbacks._EventsList[att_name].dp = dev
callbacks._EventsList[att_name].event_id = event_id
callbacks._EventsList[att_name].dev_name = dev_name
print "In ", self.get_name(), "::AddDevice()", ": Listing Device/Attributes in _EventsList:"
for a,t in callbacks._EventsList.items(): print "\tAttribute: ",a,"\tDevice: ",t.dev_name,"\n"
else:
self.warning("::AddDevice(%s): This attribute is already in the list, adding composer to receivers list." % att_name)
if not dev_name in callbacks._EventsList[att_name].receivers:
callbacks._EventsList[att_name].receivers.append(self.get_name())
if callbacks._EventsList[att_name].attr_value is not None:
if att is 'State':
callbacks._StatesList[dev_name]=_EventsList[att_name].attr_value.value
else:
callbacks._AttributesList[dev_name]=_EventsList[att_name].attr_value.value
return
###############################################################################
# TESTING
[docs]def __test__(args):
import fandango.callbacks as fb
a = args and args[0] or 'test/events/1/currentime'
es = fb.EventSource(a)
es.thread().set_period_ms(1000.)
es.thread().setLogLevel('DEBUG')
es.thread().filtered = True
#es.setLogLevel('DEBUG')
es.KeepAlive = 10000.
el = fb.EventListener('listener')
el.setLogLevel('DEBUG')
es.addListener(el)
el.set_value_hook(
lambda s,t,v:
printf('Current: %s'%s.read().value)
)
if 'ipython' not in str(sys.argv):
while (1):
try: threading.Event().wait(1.)
except: break
if __name__ == '__main__':
#__test__(sys.argv[1:])
pass
from . import doc
__doc__ = doc.get_fn_autodoc(__name__,vars())