fandango.threads module¶
Contents
Description¶
## file : threads.py ## ## description : see below ## ## project : Tango Control System ## ## $Author: Sergi Rubio Manrique, srubio@cells.es $ ## ## $Revision: 2011 $ ## ## copyleft : ALBA Synchrotron Controls Section, CELLS ## Bellaterra ## Spain ## ############################################################################# ## ## This file is part of Tango Control System ## ## Tango Control System is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as published ## by the Free Software Foundation; either version 3 of the License, or ## (at your option) any later version. ## ## Tango Control System is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see <http://www.gnu.org/licenses/>. ###########################################################################
by Sergi Rubio, srubio@cells.es, 2010
Classes¶
Pool¶
- class fandango.threads.Pool(action=None, max_threads=5, start=False, mp=False)[source]¶
It creates a queue of tasks managed by a pool of threads. Each task can be a Callable or a Tuple containing args for the “action” class argument. If “action” is not defined the first element of the tuple can be a callable, and the rest will be arguments
- Usage:
p = Pool() for item in source(): p.add_task(item) p.start() while len(self.pending()):
time.sleep(1.)print ‘finished!’
CronTab¶
- class fandango.threads.CronTab(line='', task=None, start=False, process=False, keep=10, trace=False)[source]¶
Line Syntax: #Minutes Hour DayOfMonth(1-31) Month(1-12) DayOfWeek(0=Sunday-6) Task 00 */6 * * * /homelocal/sicilia/archiving/bin/cleanTdbFiles /tmp/archiving/tdb –no-prompt
ct = fandango.threads.CronTab(‘* 11 24 08 3 ./command &’) #command can be replaced by a callable task argument ct.match() #It will return True if actual time matches crontab condition, self.last_match stores last time check
True- ct.start()
In CronTab(* 11 24 08 3 date).start() CronTab thread started
In CronTab(* 11 24 08 3 date).do_task(<function <lambda> at 0x8cc4224>) CronTab(* 11 24 08 3 date).do_task() => 3
ct.stop()
WorkerException¶
- class fandango.threads.WorkerException¶
ThreadedObject¶
- class fandango.threads.ThreadedObject(target=None, period=1.0, nthreads=1, start=False, min_wait=1e-05, first=0)[source]¶
An Object with a thread pool that provides safe stop on exit.
It has a permanent thread running, that it’s just paused
Created to allow safe thread usage in Tango Device Servers
WARNING DO NOT CALL start()/stop() methods inside target or any hook, it may provoke unexpected behaviors.
Some arguments:
Target: function to be called Period=1: iteration frequency Nthreads=1: size of thread pool Min_wait=1e-5: max frequency allowed First=0: first iteration to start execution Start=False: start processing at object creation Statistics and execution hooks are provided:
Start_hook: launched after an start() command, may return args,kwargs for the target() method Loop_hook: like start_hook, but called at each iteration Stop_hook: called after an stop() call Wait_hook: called before each wait() - loop_hook(*args, **kwargs)[source]¶
redefine at convenience, it will return the arguments for target method
- set_queue(queue)[source]¶
A list of timestamps can be passed to the main loop to force a faster or slower processing. This list will override the current periodic execution, target() will be executed at each queued time instead.
WorkerThread¶
- class fandango.threads.WorkerThread(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]¶
This class allows to schedule tasks in a background thread or process
If no process() method is overriden, the tasks introduced in the internal queue using put(Task) method may be:
- dictionary of built-in types: {‘__target__’:callable or method_name,’__args__’:[],’__class_’:’‘,’__module’:’‘,’__class_args__’:[]}
- string to eval: eval(‘import $MODULE’ or ‘$VAR=code()’ or ‘code()’)
- list if list[0] is callable: value = list[0](*list[1:])
- callable: value = callable()
It also allows to pass a hook method to be called for every main method execution.
- Usage::
- wt = fandango.threads.WorkerThread(process=True) wt.start() wt.put(‘import fandango’) wt.put(“tc = fandango.device.TangoCommand(‘lab/15/vgct-01/sendcommand’)”) command = “tc.execute(feedback=’status’,args=[‘ver
- n’])”
- wt.put(“tc.execute(feedback=’status’,args=[‘ver
- n’])”)
- while not wt.getDone():
- wt.stopEvent.wait(1.) pile = dict(wt.flush())
result = pile[command]
SingletonWorker¶
WorkerProcess¶
- class fandango.threads.WorkerProcess(target=None, start=True, timeout=0, timewait=0)[source]¶
Class that provides a multiprocessing interface to process tasks in a background process and throw callbacks when finished.
See full description and usage in doc/recipes/Threading.rst
AsynchronousFunction¶
- class fandango.threads.AsynchronousFunction(function)[source]¶
This class executes a given function in a separate thread When finished it sets True to self.finished, a threading.Event object Whether the function is thread-safe or not is something that must be managed in the caller side. If you want to autoexecute the method with arguments just call: t = AsynchronousFunction(lambda:your_function(args),start=True) while True:
- if not t.isAlive():
- if t.exception: raise t.exception result = t.result break
print ‘waiting ...’ threading.Event().wait(0.1)
print ‘result = ‘,result
Functions¶
SubprocessMethod¶
- fandango.threads.SubprocessMethod(obj, method, *args, **kwargs)[source]¶
Method for executing reader.get_attribute_values in background with a timeout (30 s by default) In fact, it allows to call any object method passed by name; or just pass a callable as object. This method could be embedded in a thread with very high timeout to trigger a callback when data is received. This advanced behavior is not implemented yet.
example: reader,att = PyTangoArchiving.Reader(),’just/some/nice/attribute’ dates = ‘2014-06-23 00:00’,‘2014-06-30 00:00’ values = fandango.threads.SubprocessMethod(reader,’get_attribute_values’,att,*dates,timeout=10.)
or
def callback(v): print(‘>> received %d values’%len(v)) fandango.threads.SubprocessMethod(reader,’get_attribute_values’,att,*dates,timeout=10.,callback=callback)
>> received 414131 values
timed_range¶
- fandango.threads.timed_range(seconds, period, event=None)[source]¶
Method used to execute the content of a for loop at periodic intervals. For X seconds, this method will return each period fragment. event can be used to pass a threading.Event to abort the loop if needed.
Usage:
- for t in trange(15,0.1):
- method_executed_at_10Hz_for_15s()
raw autodoc¶
- class fandango.threads.AsynchronousFunction(function)[source]
Bases: threading.Thread
This class executes a given function in a separate thread When finished it sets True to self.finished, a threading.Event object Whether the function is thread-safe or not is something that must be managed in the caller side. If you want to autoexecute the method with arguments just call: t = AsynchronousFunction(lambda:your_function(args),start=True) while True:
- if not t.isAlive():
- if t.exception: raise t.exception result = t.result break
print ‘waiting ...’ threading.Event().wait(0.1)
print ‘result = ‘,result
- class fandango.threads.CronTab(line='', task=None, start=False, process=False, keep=10, trace=False)[source]
Bases: object
Line Syntax: #Minutes Hour DayOfMonth(1-31) Month(1-12) DayOfWeek(0=Sunday-6) Task 00 */6 * * * /homelocal/sicilia/archiving/bin/cleanTdbFiles /tmp/archiving/tdb –no-prompt
ct = fandango.threads.CronTab(‘* 11 24 08 3 ./command &’) #command can be replaced by a callable task argument ct.match() #It will return True if actual time matches crontab condition, self.last_match stores last time check
True- ct.start()
In CronTab(* 11 24 08 3 date).start() CronTab thread started
In CronTab(* 11 24 08 3 date).do_task(<function <lambda> at 0x8cc4224>) CronTab(* 11 24 08 3 date).do_task() => 3
ct.stop()
- changed(now=None)[source]
Checks if actual timestamp differs from last cron check
- do_task(task=None, trace=False)[source]
Executes an string or callable
- load(line)[source]
Crontab line parsing
- match(now=None)[source]
Returns True if actual timestamp matches cron configuration
- exception fandango.threads.DataExpired[source]
Bases: exceptions.Exception
- class fandango.threads.FakeLock[source]
Bases: object
Just for debugging, can replace a Lock when debugging a deadLock issue.
- class fandango.threads.Pool(action=None, max_threads=5, start=False, mp=False)[source]
Bases: object
It creates a queue of tasks managed by a pool of threads. Each task can be a Callable or a Tuple containing args for the “action” class argument. If “action” is not defined the first element of the tuple can be a callable, and the rest will be arguments
- Usage:
p = Pool() for item in source(): p.add_task(item) p.start() while len(self.pending()):
time.sleep(1.)print ‘finished!’
- add_task(item)[source]
Adds a new task to the queue :param task: a callable or a tuple with callable and arguments
- pending()[source]
returns a list of strings with the actions not finished yet
- start()[source]
Start all threads.
- class fandango.threads.ProcessedData(name, target=None, args=None, period=None, expire=None, callback=None)[source]
Bases: object
This struct stores named data with value,date associated and period,expire time constraints
- class fandango.threads.SingletonWorker(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]
Bases: fandango.threads.WorkerThread, fandango.objects.Singleton
- Usage::
- # ... same like WorkerThread, but command is required to get the result value command = “tc.execute(feedback=’status’,args=[‘ver
- n’])”
- sw.put(command) sw.get(command)
- get(target)[source]
It flushes the value stored for {target} task. The target argument is needed to avoid mixing up commands from different requestors.
- fandango.threads.SubprocessMethod(obj, method, *args, **kwargs)[source]
Method for executing reader.get_attribute_values in background with a timeout (30 s by default) In fact, it allows to call any object method passed by name; or just pass a callable as object. This method could be embedded in a thread with very high timeout to trigger a callback when data is received. This advanced behavior is not implemented yet.
example: reader,att = PyTangoArchiving.Reader(),’just/some/nice/attribute’ dates = ‘2014-06-23 00:00’,‘2014-06-30 00:00’ values = fandango.threads.SubprocessMethod(reader,’get_attribute_values’,att,*dates,timeout=10.)
or
def callback(v): print(‘>> received %d values’%len(v)) fandango.threads.SubprocessMethod(reader,’get_attribute_values’,att,*dates,timeout=10.,callback=callback)
>> received 414131 values
- class fandango.threads.ThreadedObject(target=None, period=1.0, nthreads=1, start=False, min_wait=1e-05, first=0)[source]
Bases: fandango.objects.Object
An Object with a thread pool that provides safe stop on exit.
It has a permanent thread running, that it’s just paused
Created to allow safe thread usage in Tango Device Servers
WARNING DO NOT CALL start()/stop() methods inside target or any hook, it may provoke unexpected behaviors.
Some arguments:
Target: function to be called Period=1: iteration frequency Nthreads=1: size of thread pool Min_wait=1e-5: max frequency allowed First=0: first iteration to start execution Start=False: start processing at object creation Statistics and execution hooks are provided:
Start_hook: launched after an start() command, may return args,kwargs for the target() method Loop_hook: like start_hook, but called at each iteration Stop_hook: called after an stop() call Wait_hook: called before each wait() - INSTANCES = []¶
- loop_hook(*args, **kwargs)[source]
redefine at convenience, it will return the arguments for target method
- set_queue(queue)[source]
A list of timestamps can be passed to the main loop to force a faster or slower processing. This list will override the current periodic execution, target() will be executed at each queued time instead.
- start_hook(*args, **kwargs)[source]
redefine at convenience, it will return the arguments for target method
- stop_hook(*args, **kwargs)[source]
redefine at convenience
- exception fandango.threads.WorkerException
Bases: exceptions.Exception
- class fandango.threads.WorkerProcess(target=None, start=True, timeout=0, timewait=0)[source]
Bases: fandango.objects.Object, fandango.objects.SingletonMap
Class that provides a multiprocessing interface to process tasks in a background process and throw callbacks when finished.
See full description and usage in doc/recipes/Threading.rst
- ALIVE_PERIOD = 15¶
- command(command, args=None)[source]
This method performs a synchronous command (no callback, no persistence), it doesn’t return until it is resolved
- static get_hash(d)[source]
This method just converts a dictionary into a hashable type
- send(key, target, args=None, callback=None)[source]
This method throws a new key,query,callback tuple to the process Pipe Queries may be: (key,) ; (key,args=None) ; (key,command,args=None)
- class fandango.threads.WorkerThread(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]
Bases: object
This class allows to schedule tasks in a background thread or process
If no process() method is overriden, the tasks introduced in the internal queue using put(Task) method may be:
- dictionary of built-in types: {‘__target__’:callable or method_name,’__args__’:[],’__class_’:’‘,’__module’:’‘,’__class_args__’:[]}
- string to eval: eval(‘import $MODULE’ or ‘$VAR=code()’ or ‘code()’)
- list if list[0] is callable: value = list[0](*list[1:])
- callable: value = callable()
It also allows to pass a hook method to be called for every main method execution.
- Usage::
- wt = fandango.threads.WorkerThread(process=True) wt.start() wt.put(‘import fandango’) wt.put(“tc = fandango.device.TangoCommand(‘lab/15/vgct-01/sendcommand’)”) command = “tc.execute(feedback=’status’,args=[‘ver
- n’])”
- wt.put(“tc.execute(feedback=’status’,args=[‘ver
- n’])”)
- while not wt.getDone():
- wt.stopEvent.wait(1.) pile = dict(wt.flush())
result = pile[command]
- SINGLETON = None¶
- flush()[source]
Getting all elements stored in the output queue in [(command,result)] format
- get()[source]
Getting the oldest element in the output queue in (command,result) format
- process(target)[source]
This method can be overriden in child classes to perform actions distinct from evalX
- put(target)[source]
Inserting a new object in the Queue.
- fandango.threads.getPickable(value)[source]
- fandango.threads.timed_range(seconds, period, event=None)[source]
Method used to execute the content of a for loop at periodic intervals. For X seconds, this method will return each period fragment. event can be used to pass a threading.Event to abort the loop if needed.
Usage:
- for t in trange(15,0.1):
- method_executed_at_10Hz_for_15s()
- fandango.threads.wait(seconds, event=True, hook=None)[source]
Parameters: - seconds – seconds to wait for
- event – if True (default) it uses a dummy Event, if False it uses time.sleep, if Event is passed then it calls event.wait(seconds)