#!/usr/bin/env python2.5
"""
#############################################################################
##
## file : threads.py
##
## description : see below
##
## project : Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2011 $
##
## copyleft : ALBA Synchrotron Controls Section, CELLS
## Bellaterra
## Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################
by Sergi Rubio,
srubio@cells.es,
2010
"""
import time,threading,multiprocessing,traceback
import imp,__builtin__,pickle,re
from threading import Event,RLock,Thread
try: import Queue
except: import queue as Queue
from log import except2str,shortstr
from functional import *
from excepts import trial,Catched,CatchedArgs
from operator import isCallable
from objects import Singleton,Object,SingletonMap
try: from collections import namedtuple #Only available since python 2.6
except: pass
###############################################################################
_EVENT = threading.Event()
[docs]def wait(seconds,event=True,hook=None):
"""
:param seconds: seconds to wait for
:param event: if True (default) it uses a dummy Event, if False it uses time.sleep, if Event is passed then it calls event.wait(seconds)
"""
r = 0
try:
if hook and isCallable(hook):
Catched(hook)()
r+=1
if not event:
time.sleep(seconds)
elif hasattr(event,'wait'):
try:
event.wait(seconds)
except Exception,e:
raise e
else:
_EVENT and _EVENT.wait(seconds)
r+=2
except Exception,e:
## This method triggers unexpected exceptions on ipython exit
print('wait.hook failed!: %s,%s,%s,%s'%(event,event.wait,r,e))
if time: time.sleep(seconds)
[docs]def timed_range(seconds,period,event=None):
"""
Method used to execute the content of a for loop at periodic intervals.
For X seconds, this method will return each period fragment.
event can be used to pass a threading.Event to abort the loop if needed.
Usage:
for t in trange(15,0.1):
method_executed_at_10Hz_for_15s()
"""
t0 = time.time()
diff = 0
e = event or threading.Event()
while diff<seconds and not e.is_set():
e.wait(period)
diff = time.time()-t0
if not e.is_set:
yield diff
[docs]class FakeLock(object):
""" Just for debugging, can replace a Lock when debugging a deadLock issue. """
[docs] def release(self):pass
###############################################################################
[docs]class ThreadedObject(Object):
"""
An Object with a thread pool that provides safe stop on exit.
It has a permanent thread running, that it's just paused
Created to allow safe thread usage in Tango Device Servers
WARNING DO NOT CALL start()/stop() methods inside target or any hook,
it may provoke unexpected behaviors.
Some arguments:
:target: function to be called
:period=1: iteration frequency
:nthreads=1: size of thread pool
:min_wait=1e-5: max frequency allowed
:first=0: first iteration to start execution
:start=False: start processing at object creation
Statistics and execution hooks are provided:
:start_hook: launched after an start() command,
may return args,kwargs for the target() method
:loop_hook: like start_hook, but called at each iteration
:stop_hook: called after an stop() call
:wait_hook: called before each wait()
"""
INSTANCES = []
def __init__(self,target=None,period=1.,nthreads=1,start=False,min_wait=1e-5,first=0):
self._event = threading.Event()
self._stop = threading.Event()
self._done = threading.Event()
self._kill = threading.Event()
self._started = 0
self._min_wait = 1e-5
self._first = first
self._count = -1
self._errors = 0
self._delay = 0
self._acc_delay = 0
self._usage = 1.
self._next = time.time()+period
self._last = 0
self._queue = []
self._start_hook = self.start_hook
self._loop_hook = self.loop_hook
self._stop_hook = self.stop_hook
self._wait_hook = None
self._last_exc = ''
self._threads = []
if nthreads>1:
print('Warning: ThreadedObject.nthreads>1 Not Implemented Yet!')
for i in range(nthreads):
self._threads.append(threading.Thread(target=self.loop))
self._threads[i].daemon = True
self.set_period(period)
self.set_target(target)
self.stop(wait=False)
if start:
#Not starting threads, just reset flags
self.start()
for t in self._threads:
t.start()
ThreadedObject.INSTANCES.append(self)
def __del__(self):
self.kill()
@classmethod
def __test__(klass):
""" returns True if performs 2 successful cycles """
to = klass(period=.1)
to.set_target(
lambda o=to:setattr(o,'SUCCESS',
not getattr(o,'SUCCESS',True)))
to.start()
wait(2*to.get_period())
try: v = to.SUCCESS
except: v = False
to.stop()
return v
## HELPERS
[docs] def get_count(self): return self._count
[docs] def get_errors(self): return self._errors
[docs] def get_delay(self): return self._delay
[docs] def get_acc_delay(self): return self._acc_delay
[docs] def get_avg_delay(self): return self._acc_delay/(self._count or 1)
[docs] def get_usage(self): return self._usage
[docs] def get_next(self): return self._next
[docs] def get_last(self): return self._last
[docs] def get_started(self): return self._started
[docs] def get_thread(self,i=0): return self._threads[i]
[docs] def get_nthreads(self): return len(self._threads)
[docs] def is_alive(self,i=0): return self.get_thread(i).is_alive()
[docs] def set_period(self,period): self._timewait = period
[docs] def get_period(self): return self._timewait
[docs] def set_target(self,target): self._target = target
[docs] def get_target(self): return self._target
[docs] def set_start_hook(self,target): self._start_hook = target
[docs] def set_loop_hook(self,target): self._loop_hook = target
[docs] def set_stop_hook(self,target): self._stop_hook = target
[docs] def set_wait_hook(self,target): self._wait_hook = target
[docs] def print_exc(self,e='',msg=''):
if not e and traceback:
e = traceback.format_exc()
self._last_exc = str(e)
print(msg+':'+self._last_exc)
[docs] def set_queue(self,queue):
"""
A list of timestamps can be passed to the main loop to force
a faster or slower processing.
This list will override the current periodic execution,
target() will be executed at each queued time instead.
"""
self._queue = queue
## MAIN METHODS
[docs] def start(self):
if self._started:
self.stop()
else: #abort stop wait
self._event.clear()
self._done.clear()
self._stop.clear()
[docs] def stop(self,wait=3.):
self._event.set()
self._stop.set()
if not wait: wait = .1e-5
self._done.wait(wait)
self._done.clear()
self._event.clear()
@staticmethod
[docs] def stop_all():
for i in ThreadedObject.INSTANCES:
try: i.stop()
except: pass
[docs] def kill(self,wait=3.):
self._kill.set()
self.stop(wait)
@staticmethod
[docs] def kill_all():
for i in ThreadedObject.INSTANCES:
try: i.kill()
except: pass
[docs] def start_hook(self,*args,**kwargs):
""" redefine at convenience, it will return the arguments for target method """
return [],{}
print('Starting push_loop(%s)'%self._timewait)
print('Sending %d events in bunches of %d every %f seconds'%(
self.MaxEvents,self.ConsecutiveEvents,self._timewait))
t0,t1,ts,self.send_buffer = time.time(),0,0,[]
tnext = t0 + self._timewait
[docs] def loop_hook(self,*args,**kwargs):
""" redefine at convenience, it will return the arguments for target method """
return [],{}
[docs] def stop_hook(self,*args,**kwargs):
""" redefine at convenience """
pass
[docs] def clean_stats(self):
self._count = 0
self._errors = 0
self._delay = 0
self._acc_delay = 0
self._last = 0
[docs] def loop(self):
try:
self._done.set() #Will not be cleared until stop/start() are called
while not self._kill.isSet():
while self._stop.isSet():
wait(self._timewait,self._event,self._wait_hook)
self._done.clear()
self.clean_stats()
ts = time.time()
## Evaluate target() arguments
try:
args,kwargs = self._start_hook(ts)
except:
if self._errors < 10:
self.print_exc()
self._errors += 1
args,kwargs = [],{}
print('ThreadedObject.Start() ...')
self._started = time.time()
self._next = self._started + self._timewait
while not self._stop.isSet():
try:
self._event.clear()
## Task Execution
if count>=self._first:
try:
if self._target:
self._target(*args,**kwargs)
except:
if self._errors < 10:
self.print_exc()
self._errors += 1
## Obtain next scheduled execution
t1,tn = time.time(),ts+self._timewait
if self._queue:
while self._queue and self._queue[0]<self._next:
self._queue.pop(0)
if self._queue:
tn = self._queue[0]
## Wait and Calcullate statistics
self._next = tn
tw = self._next-t1
self._usage = (t1-ts)/self._timewait
#self.debug('waiting %s'%tw)
wait(max((tw,self._min_wait)),self._event,self._wait_hook)
ts = self._last = time.time()
self._delay = ts>self._next and ts-self._next or 0
self._acc_delay = self._acc_delay + self._delay
## Execute Loop Hook to reevaluate target Arguments
if count>=self._first:
try:
args,kwargs = self._loop_hook(ts)
except:
if self._errors < 10:
self.print_exc()
self._errors += 1
args,kwargs = [],{}
self._count += 1
except Exception,e:
self.print_exc(e if not traceback else traceback.format_exc(),
'ThreadObject stop!')
raise e
print('ThreadedObject.Stop(...)')
self._started = 0
self._done.set() #Will not be cleared until stop/start() are called
Catched(self._stop_hook)()
print('ThreadedObject.Kill() ...')
return #<< Should never get to this point
except Exception,e:
self.print_exc(e,'ThreadObject exit!')
###############################################################################
[docs]class CronTab(object):
"""
Line Syntax:
#Minutes Hour DayOfMonth(1-31) Month(1-12) DayOfWeek(0=Sunday-6) Task
00 */6 * * * /homelocal/sicilia/archiving/bin/cleanTdbFiles /tmp/archiving/tdb --no-prompt
ct = fandango.threads.CronTab('* 11 24 08 3 ./command &') #command can be replaced by a callable task argument
ct.match() #It will return True if actual time matches crontab condition, self.last_match stores last time check
True
ct.start()
In CronTab(* 11 24 08 3 date).start()
CronTab thread started
In CronTab(* 11 24 08 3 date).do_task(<function <lambda> at 0x8cc4224>)
CronTab(* 11 24 08 3 date).do_task() => 3
ct.stop()
"""
def __init__(self,line='',task=None,start=False,process=False,keep=10,trace=False):
if line: self.load(line)
if task is not None: self.task = task
self.last_match = 0
self.trace = trace
self.keep = keep
self.THREAD_CLASS = threading.Thread if not process else multiprocessing.Process
self.QUEUE_CLASS = Queue.Queue if not process else multiprocessing.Queue
self.EVENT_CLASS = threading.Event if not process else multiprocessing.Event
self.LOCK_CLASS = threading.RLock if not process else multiprocessing.RLock
self._thread = None
self.event = None
self._queue = self.QUEUE_CLASS(maxsize=int(self.keep or 10))
if start: self.start()
[docs] def load(self,line):
"""
Crontab line parsing
"""
print 'In CronTab().load(%s)'%line
vals = line.split()
if len(vals)<5: raise Exception('NotEnoughArguments')
self.minute,self.hour,self.day,self.month,self.weekday = vals[:5]
if vals[5:] or not getattr(self,'task',None): self.task = ' '.join(vals[5:])
self.line = line
def _check(self,cond,value):
if '*'==cond: return True
elif '*/' in cond: return not int(value)%int(cond.replace('*/',''))
else: return int(cond)==int(value)
[docs] def match(self,now=None):
"""
Returns True if actual timestamp matches cron configuration
"""
if now is None: now=time.time()
self.last_match = now-(now%60)
tt = time2tuple(now)
if all(self._check(c,v) for c,v in
zip([self.minute,self.hour,self.day,self.month,self.weekday],
[tt.tm_min,tt.tm_hour,tt.tm_mday,tt.tm_mon,tt.tm_wday+1])
):
return True
else:
return False
[docs] def changed(self,now=None):
"""
Checks if actual timestamp differs from last cron check
"""
if now is None: now=time.time()
return (now-(now%60))!=self.last_match
[docs] def do_task(self,task=None,trace=False):
"""
Executes an string or callable
"""
trace = trace or self.trace
task = task or self.task
if trace: print 'In CronTab(%s).do_task(%s)'%(self.line,task)
if isCallable(task):
ret = task()
elif isString(task):
from fandango.linos import shell_command
ret = shell_command(self.task)
else:
raise Exception('NotCallable/String')
if self.keep:
if self._queue.full(): self.get()
self._queue.put(ret,False)
if trace:
print 'CronTab(%s).do_task() => %s'%(self.line,ret)
[docs] def get(self):
return self._queue.get(False)
def _run(self):
print 'CronTab thread started'
from fandango.linos import shell_command
while not self.event.is_set():
now = time.time()
if self.changed(now) and self.match(now):
try:
self.do_task()
except:
print 'CronTab thread exception'
print traceback.format_exc()
self.event.wait(15)
print 'CronTab thread finished'
return
[docs] def start(self):
print 'In CronTab(%s).start()'%self.line
if self._thread and self._thread.is_alive:
self.stop()
self._thread = self.THREAD_CLASS(target=self._run)
self.event = self.EVENT_CLASS()
self._thread.daemon = True
self._thread.start()
[docs] def stop(self):
print 'In CronTab(%s).stop()'%self.line
if self._thread and self._thread.is_alive:
self.event.set()
self._thread.join()
[docs] def is_alive(self):
if not self._thread: return False
else: return self._thread.is_alive()
###############################################################################
WorkerException = type('WorkerException',(Exception,),{})
[docs]class WorkerThread(object):
"""
This class allows to schedule tasks in a background thread or process
If no process() method is overriden, the tasks introduced in the internal queue using put(Task) method may be:
- dictionary of built-in types: {'__target__':callable or method_name,'__args__':[],'__class_':'','__module':'','__class_args__':[]}
- string to eval: eval('import $MODULE' or '$VAR=code()' or 'code()')
- list if list[0] is callable: value = list[0](*list[1:])
- callable: value = callable()
It also allows to pass a hook method to be called for every main method execution.
Usage::
wt = fandango.threads.WorkerThread(process=True)
wt.start()
wt.put('import fandango')
wt.put("tc = fandango.device.TangoCommand('lab/15/vgct-01/sendcommand')")
command = "tc.execute(feedback='status',args=['ver\r\\n'])"
wt.put("tc.execute(feedback='status',args=['ver\r\\n'])")
while not wt.getDone():
wt.stopEvent.wait(1.)
pile = dict(wt.flush())
result = pile[command]
"""
SINGLETON = None
def __init__(self,name='',process=False,wait=.01,target=None,hook=None,trace=False):
self._name = name
self.wait = wait
self._process = process
self._trace = trace
self.hook=hook
self.THREAD_CLASS = threading.Thread if not process else multiprocessing.Process
self.QUEUE_CLASS = Queue.Queue if not process else multiprocessing.Queue
self.EVENT_CLASS = threading.Event if not process else multiprocessing.Event
self.LOCK_CLASS = threading.RLock if not process else multiprocessing.RLock
self.inQueue = self.QUEUE_CLASS()
self.outQueue = self.QUEUE_CLASS()
self.errorQueue = self.QUEUE_CLASS()
self.stopEvent = self.EVENT_CLASS()
if target is not None:
self.put(target)
self._thread = self.THREAD_CLASS(name='Worker',target=self.run)
self._thread.daemon = True
pass
def __del__(self):
try:
self.stop()
object.__del__(self)
except: pass
[docs] def locals(self,key=None):
return self._locals if key is None else self._locals.get(key)
[docs] def put(self,target):
"""Inserting a new object in the Queue."""
self.inQueue.put(target,False)
[docs] def get(self):
"""Getting the oldest element in the output queue in (command,result) format"""
try:
self.getDone()
try:
while True: print self.errorQueue.get(False)
except Queue.Empty:
pass
return self.outQueue.get(False)
except Queue.Empty:
#if self.outQueue.qsize():
#print('FATAL PickleError, output queue has been lost')
#self.outQueue = self.QUEUE_CLASS()
return None
[docs] def flush(self):
"""
Getting all elements stored in the output queue in [(command,result)] format
"""
result = []
try:
while True: result.append(self.outQueue.get(False))
except Queue.Empty:
pass
return result
[docs] def start(self):
self._thread.start()
[docs] def stop(self):
self.stopEvent.set()
self._thread.join()
[docs] def isAlive(self):
return self._thread.is_alive()
[docs] def getQueue(self):
return self.outQueue
[docs] def getSize(self):
return self.inQueue.qsize()
[docs] def getDone(self):
return not self.inQueue.qsize() and not self.outQueue.qsize()
[docs] def process(self,target):
"""
This method can be overriden in child classes to perform actions distinct from evalX
"""
self.modules = getattr(self,'modules',{})
self.instances = getattr(self,'instances',{})
self._locals = getattr(self,'_locals',{})
return evalX(target,
_locals=self._locals,modules=self.modules,instances=self.instances,
_trace=self._trace,_exception=WorkerException)
[docs] def run(self):
print 'WorkerThread(%s) started!'%self._name
logger = getattr(__builtin__,'print') if not self._process else (lambda s:(getattr(__builtin__,'print')(s),self.errorQueue.put(s)))
while not self.stopEvent.is_set():
try:
target,value = self.inQueue.get(True,timeout=self.wait),None
if self.stopEvent.is_set(): break
if target is not None:
try:
model = target #To avoid original target to be overriden in process()
value = self.process(target)
try: pickle.dumps(value)
except pickle.PickleError:
print traceback.format_exc()
raise WorkerException('UnpickableValue')
self.outQueue.put((model,value))
except Exception,e:
msg = 'Exception in WorkerThread(%s).run()\n%s'%(self._name,except2str())
print( msg)
self.outQueue.put((target,e))
finally:
if not self._process: self.inQueue.task_done()
if self.hook is not None:
try:
self.hook()
except:
print('Exception in WorkerThread(%s).hook()\n%s'%(self._name,except2str()))
except Queue.Empty:
pass
except:
print 'FATAL Exception in WorkerThread(%s).run()'%self._name
print except2str()
print 'WorkerThread(%s) finished!'%self._name
import objects
[docs]class SingletonWorker(WorkerThread,objects.Singleton):
"""
Usage::
# ... same like WorkerThread, but command is required to get the result value
command = "tc.execute(feedback='status',args=['ver\r\\n'])"
sw.put(command)
sw.get(command)
"""
[docs] def put(self,target):
if not hasattr(self,'_queued'): self._queued = []
self._queued.append(target)
WorkerThread.put(self,target)
[docs] def get(self,target):
"""
It flushes the value stored for {target} task.
The target argument is needed to avoid mixing up commands from different requestors.
"""
if not hasattr(self,'_values'): self._values = {}
self._values.update(self.flush())
return self._values.pop(target)
[docs] def flush(self):
#It just flushes received values
l = []
l.extend(getattr(self,'_values',{}).items())
l.extend(WorkerThread.flush(self))
[self._queued.remove(v) for v in l if v in self._queued]
return l
[docs] def values(self):
return self._values
[docs] def done(self):
return not bool(self._queued)
###############################################################################
[docs]class DataExpired(Exception):
def __str__(self):
return 'DataExpired(%s)'%Exception.__str__(self)
__repr__ = __str__
[docs]def getPickable(value):
try:
pickle.dumps(value)
except:# pickle.PickleError:
if isinstance(value,Exception):
return Exception(str(value))
else:
value = str(value)
return value
[docs]class ProcessedData(object):
""" This struct stores named data with value,date associated and period,expire time constraints """
def __init__(self,name,target=None,args=None,period=None,expire=None,callback=None):
self.name = name
self.target = target or name
self.args = args
self.period = period or 0
self.expire = expire or 0
self.callback = callback
self.date = -1
self.value = None
[docs] def get_args(self):
return (self.name,self.target,self.args,self.period,self.expire,None)
def __repr__(self):
return 'ProcessedData(%s)=(%s) at %s'%(str(self.get_args()),self.value,time2str(self.date))
def __str__(self):
return str(self.get_args())
[docs] def set(self,value):
self.date = time.time()
self.value = value
[docs] def get(self):
result = self.value
if self.date<=0:
raise DataExpired('%s s'%self.expire)
else:
now = time.time()
expired = self.date+max((self.expire,self.period)) < now
if self.expire>=0 and expired: #(self.expire==0 or self.date>0 and expired)):
self.date = -1
self.value = None
return result
[docs]class WorkerProcess(Object,SingletonMap): #,Object,SingletonMap):
"""
Class that provides a multiprocessing interface to process tasks in a background process and throw callbacks when finished.
See full description and usage in doc/recipes/Threading.rst
"""
ALIVE_PERIOD = 15
__ALIVE,__ADDKEY,__REMOVEKEY,__BIND,__NULL,__PAUSE = '__ALIVE','__ADDKEY','__REMOVEKEY','__BIND','__NULL','__PAUSE'
def __init__(self,target=None,start=True,timeout=0,timewait=0):
"""
:param target: If not None, target will be an object which methods could be targetted by queries.
"""
import multiprocessing
import threading
from collections import defaultdict
self.trace('__init__(%s)'%(target))
self.timeout = timeout or self.ALIVE_PERIOD #Maximum time between requests, process will die if exceeded
self.paused = 0
self.timewait = max((timewait,0.02)) #Time to wait between operations
self.data = {} #It will contain a {key:ProcessedData} dictionary
self.last_alive = 0
#Process Part
self._pipe1,self._pipe2 = multiprocessing.Pipe()
self._process_event,self._threading_event,self._command_event = multiprocessing.Event(),threading.Event(),threading.Event()
self._process = multiprocessing.Process(
target=self._run_process, #Callable for the process.main()
args=[self._pipe2,self._process_event,target] #List of target method arguments
)
#Thread part
self._receiver = threading.Thread(target=self._receive_data)
self._process.daemon,self._receiver.daemon = True,True
self.callbacks = defaultdict(list)
if start: self.start()
def __del__(self):
self.stop()
type(self).__base__.__del__(self)
[docs] def trace(self,msg,level=0):
print '%s, %s: %s'%(time2str(),type(self).__name__,str(msg))
[docs] def bind(self,target,args=None):
self.send(self.__BIND,target=target,args=args)
#Thread Management
[docs] def start(self):
#Launches the process and threads
self._receiver.start(),self._process.start()
[docs] def stop(self):
#This method stops all threads
self.trace('stop()')
self._process_event.set(),self._threading_event.set()
self._pipe1.close(),self._pipe2.close()
[docs] def isAlive(self):
return self._process.is_alive() and self._receiver.is_alive()
[docs] def keys(self):
return self.data.keys()
[docs] def add(self,key,target=None,args=None,period=0,expire=0,callback=None):
# Adds a command to be periodically executed
data = self.data[key] = ProcessedData(key,target=target,args=args,period=period,expire=expire,callback=callback)
self.send(self.__ADDKEY,target=data.get_args())
[docs] def get(self,key,default=__NULL,_raise=False):
# Returns a key value (or default if defined)
if key not in self.data and default!=self.__NULL: return default
result = self.data[key].get()
if _raise and isinstance(result,Exception): raise result
return result
[docs] def pop(self,key):
# Returns a key value and removes from dictionary
d = self.data.pop(key)
self.send(self.__REMOVEKEY,key)
return d
[docs] def pause(self,timeout):
# Stops for a while the execution of scheduled keys
self.paused = time.time()+timeout
self.send(self.__PAUSE,target=self.paused)
[docs] def send(self,key,target,args=None,callback=None):
"""
This method throws a new key,query,callback tuple to the process Pipe
Queries may be: (key,) ; (key,args=None) ; (key,command,args=None)
"""
keywords = (self.__BIND,self.__ADDKEY,self.__ALIVE,self.__REMOVEKEY,None)
if (key in keywords or key not in self.callbacks):
#self.trace('send(%s,%s,%s,%s)'%(key,target,args,callback))
if key not in keywords: self.callbacks[key] = [callback]
if args is not None: self._pipe1.send((key,target,args))
else: self._pipe1.send((key,target))
elif callback not in self.callbacks[key]:
#self.trace('send(%s,%s,%s,%s) => %s'%(key,target,args,callback,self.callbacks[key]))
self.callbacks[key].append(callback)
return
[docs] def command(self,command,args=None):
""" This method performs a synchronous command (no callback, no persistence),
it doesn't return until it is resolved """
self._return = None
self.send(key=str(command),target=command,args=args,callback=lambda q,e=self._command_event,s=self:(setattr(s,'_return',q),e.set()))
while not self._command_event.is_set(): self._command_event.wait(self.timewait)
self._command_event.clear()
return self._return
# Protected methods
@staticmethod
[docs] def get_hash(d):
"""This method just converts a dictionary into a hashable type"""
if isMapping(d): d = d.items()
if isSequence(d): d = sorted(d)
return str(d)
@staticmethod
[docs] def get_callable(key,executor=None):
try:
x = []
if isinstance(key,basestring):
trial(lambda:x.append(evalX(key)),lambda:x.append(None))
return first(a for a in (
key,
x and x[0],
isinstance(key,basestring) and getattr(executor,key,None), #key is a member of executor
getattr(executor,'process',None), #executor is a process object
executor, #executor is a callable
#isinstance(key,basestring) and evalX(key), # key may be name of function
) if a and isCallable(a))
except StopIteration,e:
return None
#@staticmethod
def _run_process(self,pipe,event,executor=None):
"""
This is the main loop of the background Process.
Queries sent to Process can be executed in different ways:
- Having a process(query) method given as argument.
- Having an object with a key(query) method: returns (key,result)
- If none of this, passing query to an evalX call.
Executor will be a callable or an object with 'target' methods
"""
last_alive,idle = time.time(),0 #Using NCycles count instead of raw time to avoid CPU influence
key,paused = None,0
scheduled = {} #It will be a {key:[data,period,last_read]} dictionary
locals_,modules,instances = {'executor':executor},{},{}
key = None
self.trace('.Process(%s) started'%str(executor or ''))
while not event.is_set() and (pipe.poll() or idle<(self.timeout/self.timewait)): #time.time()<(last_alive+self.timeout)):
try:
idle+=1
now = time.time()
if pipe.poll():
t = pipe.recv() #May be (key,) ; (key,args=None) ; (key,command,args)
key,target,args = [(None,None,None),(t[0],None,None),(t[0],t[1],None),t][len(t)]
if key!=self.__ALIVE: self.trace(shortstr('.Process: Received: %s => (%s,%s,%s)'%(str(t),key,target,args)))
last_alive,idle = time.time(),0 #Keep Alive Thread
elif scheduled and time.time()>paused:
data = first(sorted((v.date+v.period,v) for n,v in scheduled.items()))[-1]
if key not in scheduled and key is not None:
self.trace('Nothing in queue, checking scheduled tasks ...')
if (data.date+data.period)<=now:
data.date = now
key,target,args = data.name,data.target,data.args
else: #print '%s > %s - %s' % (time2str(now),time2str(next))
key = None
if key == self.__PAUSE:
# should delay scheduled commands but not freeze those synchronous (like ADDKEY or COMMAND)
paused = target
self.trace('.Process: Scheduled keys will be paused %s seconds.'%(paused-time.time()))
elif key == self.__ADDKEY: #(__ADDKEY,(args for ProcessedData(*)))
#Done here to evaluate not-periodic keys in the same turn that they are received
data = ProcessedData(*target)
if data.period>0:
scheduled[data.name]=data
self.trace('.Process: Added key: %s'%str(data))
else:
if data.name in scheduled:
self.trace('.Process: Removing %s key'%data.name)
scheduled.pop(data.name)
if time.time()>paused:
key,target,args = data.name,data.target,data.args #Added data will be immediately read
if key is not None:
try:
if key == self.__REMOVEKEY: #(__REMOVEKEY,key)
if target in scheduled: scheduled.pop(target)
self.trace(scheduled)
elif key == self.__BIND:
# Setting a new executor object
if isCallable(target): executor = target
elif isinstance(target,basestring): executor = evalX(target,locals_,modules,instances)
else: executor = target
if isCallable(executor) and args is not None:
if isMapping(args): executor = executor(**args)
elif isSequence(args): executor = executor(*args)
else: executor = executor(args)
locals_['executor'] = executor
self.trace('.Process: Bound executor object to %s'%(executor))
elif key!=self.__ALIVE:
if args is None and executor is not None and (isCallable(executor) or getattr(executor,'process',None)):
# Target is a set of arguments to Executor object
exec_ = getattr(executor,'process',executor)
args = target
elif isinstance(target,basestring):
# Target is a member of executor or an string to be evaluated
# e.g. getattr(Reader,'get_attribute_values')(*(attr,start,stop))
if hasattr(executor,target): exec_ = getattr(executor,target)
else: exec_ = evalX(target,locals_,modules,instances) #Executor bypassed if both target and args are sent
else:
#Target is a value or callable
exec_ = target
if isCallable(exec_):
# Executing
if key not in scheduled:
self.trace(shortstr('.Process: [%s] = %s(%s)(*%s)'%(key,exec_,target,args)))
if args is None: value = exec_()
elif isDictionary(args): value = exec_(**args)
elif isSequence(args): value = exec_(*args)
else: value = exec_(args)
else:
#target can be a an object member or eval(target) result
if key not in scheduled:
self.trace(shortstr('.Process: [%s] = %s(*%s)'%(key,target,args)))
value = exec_
pipe.send((key,getPickable(value)))
except Exception,e:
self.trace('.Process:\tError in %s process!\n%s\n%s\n%s'%(key,target,args,except2str(e)))
#print traceback.format_exc()
#print e
pipe.send((key,getPickable(e)))
except Exception,e:
self.trace('.Process:\tUnknown Error in process!\n%s'%traceback.format_exc())
key = None
event.wait(self.timewait)
print '!'*80
self.trace('.Process: exit_process: event=%s, thread not alive for %d s' % (event.is_set(),time.time()-last_alive))
def _receive_data(self):
"""
Main loop of the thread receiving data from the background Process (and launching callbacks when needed)
"""
while not self._threading_event.is_set():
try:
if self._pipe1.poll():
key,query = self._pipe1.recv()
if key not in self.data:
self.trace('.Thread: received %s data; pending: %s'%(key,self.callbacks.keys()))
pass
if key in self.keys():
self.data[key].set(query)
if self.data[key].callback:
try:
self.trace('.Thread:\tlaunching %s callback %s'%(key,callback))
self.data[key].callback(query)
except:
self.trace('.Thread:\tError in %s callback %s!'%(key,callback))
self.trace(except2str())
if key in self.callbacks:
for callback in self.callbacks[key]:
if callback is None: continue
try:
self.trace('.Thread:\tlaunching callback %s'%callback)
callback(query)
except:
self.trace('.Thread:\tError in %s callback %s!'%(key,callback))
self.trace(except2str())
self.callbacks.pop(key)
except: self.trace('.Thread,Exception:%s'%traceback.format_exc())
try:
if time.time() > self.last_alive+3:
self._pipe1.send((self.__ALIVE,None))
self.last_alive = time.time()
except: self.trace('.Thread.is_alive(),Exception:%s'%traceback.format_exc())
self._threading_event.wait(self.timewait)
print '!'*80
self.trace('.Thread: exit_data_thread')
self.trace('<'*80)
self.trace('<'*80)
###############################################################################
[docs]class Pool(object):
"""
It creates a queue of tasks managed by a pool of threads.
Each task can be a Callable or a Tuple containing args for the "action" class argument.
If "action" is not defined the first element of the tuple can be a callable, and the rest will be arguments
Usage:
p = Pool()
for item in source(): p.add_task(item)
p.start()
while len(self.pending()):
time.sleep(1.)
print 'finished!'
"""
def __init__(self,action=None,max_threads=5,start=False,mp=False):
import threading
if mp==True:
import multiprocessing
self._myThread = multiprocessing.Process
self._myQueue = multiprocessing.Queue
else:
import Queue
self._myThread = threading.Thread
self._myQueue = Queue.Queue
self._action = action
self._max_threads = max_threads
self._threads = []
self._pending = []
self._stop = threading.Event()
self._lock = threading.Lock()
self._locked = partial(locked,_lock=self._lock)
self._started = start
self._queue = self._myQueue()
[docs] def start(self):
"""
Start all threads.
"""
[t.start() for t in self._threads]
self._started = True
[docs] def stop(self):
self._stop.set()
[t.join(3.) for t in self._threads]
#while not self._queue.empty(): self._queue.get()
self._retire()
self._started = False
[docs] def add_task(self,item):#,block=False,timeout=None):
"""
Adds a new task to the queue
:param task: a callable or a tuple with callable and arguments
"""
self._locked(self._pending.append,str(item))
if self._started: self._retire()
if len(self._pending)>len(self._threads) and len(self._threads)<self._max_threads:
self._new_worker()
self._queue.put(item)#,block,timeout)
[docs] def pending(self):
""" returns a list of strings with the actions not finished yet"""
self._retire()
return self._pending
####################################################################################
#Protected methods
def _new_worker(self):
#Creates a new thread
t = self._myThread(target=self._worker)
self._locked(self._threads.append,t)
t.daemon = True
if self._started: t.start()
def _retire(self):
#Cleans dead threads
dead = [t for t in self._threads if not t.is_alive()]
for t in dead:
self._locked(self._threads.remove,t)
def _worker(self):
#Processing queue items
while not self._stop.is_set() and not self._queue.empty():
item = self._queue.get()
try:
if item is not None and isCallable(item):
item()
elif isSequence(item):
if self._action: self._action(*item)
elif isCallable(item[0]): item[0](*item[1:])
elif self._action:
self._action(item)
except:
import traceback
print('objects.Pool.worker(%s) failed: %s'%(str(item),traceback.format_exc()))
self._remove_task(item)
return
def _remove_task(self,item=None):
#Remove a finished task from the list
if str(item) in self._pending:
self._locked(self._pending.remove,str(item))
return getattr(self._queue,'task_done',lambda:None)()
pass
###############################################################################
[docs]def SubprocessMethod(obj,method,*args,**kwargs):
"""
Method for executing reader.get_attribute_values in background with a timeout (30 s by default)
In fact, it allows to call any object method passed by name; or just pass a callable as object.
This method could be embedded in a thread with very high timeout to trigger a callback when data is received.
This advanced behavior is not implemented yet.
example:
reader,att = PyTangoArchiving.Reader(),'just/some/nice/attribute'
dates = '2014-06-23 00:00','2014-06-30 00:00'
values = fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.)
or
def callback(v): print('>> received %d values'%len(v))
fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.,callback=callback)
>> received 414131 values
"""
timeout = kwargs.pop('timeout',30.)
callback = kwargs.pop('callback',None)
print args
print kwargs
local,remote = multiprocessing.Pipe(False)
def do_query(o,m,conn,*a,**k):
if None in (o,m): m = o or m
else: m = getattr(o,m)
print m,a,k
conn.send(m(*a,**k))
conn.close()
args = (obj,method,remote)+args
subproc = multiprocessing.Process(target=do_query,args=args,kwargs=kwargs)
subproc.start()
t0 = time.time()
result = None
while time.time()<t0+timeout:
if local.poll():
result = local.recv()
break
threading.Event().wait(.1)
local.close(),remote.close()
subproc.terminate(),subproc.join()
if time.time()>t0+timeout:
raise Exception('TimeOut(%s)!'%str(obj))
elif callback:
callback(result)
else:
return result
[docs]class AsynchronousFunction(threading.Thread):
'''This class executes a given function in a separate thread
When finished it sets True to self.finished, a threading.Event object
Whether the function is thread-safe or not is something that must be managed in the caller side.
If you want to autoexecute the method with arguments just call:
t = AsynchronousFunction(lambda:your_function(args),start=True)
while True:
if not t.isAlive():
if t.exception: raise t.exception
result = t.result
break
print 'waiting ...'
threading.Event().wait(0.1)
print 'result = ',result
'''
def __init__(self,function):
"""It just creates the function object, you must call function.start() afterwards"""
self.function = function
self.result = None
self.exception = None
self.finished = threading.Event()
self.finished.clear()
threading.Thread.__init__(self)
self.wait = self.finished.wait
self.daemon = False
[docs] def run(self):
try:
self.wait(0.01)
self.result = self.function()
except Exception,e:
self.result = None
self.exception = e
self.finished.set() #Not really needed, simply call AsynchronousFunction.isAlive() to know if it has finished
from . import doc
__doc__ = doc.get_fn_autodoc(__name__,vars())