fandango.threads module

Description

file: threads.py
description: see below
project: Tango Control System
Author: Sergi Rubio Manrique, srubio@cells.es
Revision: 2011
copyleft: ALBA Synchrotron Controls Section, CELLS, Bellaterra, Spain

This file is part of Tango Control System.

Tango Control System is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version.

Tango Control System is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program; if not, see <http://www.gnu.org/licenses/>.

by Sergi Rubio, srubio@cells.es, 2010

Classes

Pool

class fandango.threads.Pool(action=None, max_threads=5, start=False, mp=False)[source]

Creates a queue of tasks managed by a pool of threads. Each task can be a callable or a tuple containing the arguments for the “action” class argument. If “action” is not defined, the first element of the tuple can be a callable and the rest will be its arguments.

Usage:

    import time
    p = Pool()
    for item in source():
        p.add_task(item)
    p.start()
    while len(p.pending()):
        time.sleep(1.)
    print('finished!')
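The dispatch rule described above (a task is either a callable, or a tuple whose meaning depends on whether an action was given) can be sketched with the standard library alone. This is only an illustration under that reading of the docstring, not fandango's implementation; `MiniPool`, `_resolve` and `run` are hypothetical names.

```python
import queue
import threading


class MiniPool(object):
    """Hypothetical stand-in for a task pool; not fandango.threads.Pool."""

    def __init__(self, action=None, max_threads=5):
        self.action = action
        self.max_threads = max_threads
        self.tasks = queue.Queue()
        self.results = []
        self._lock = threading.Lock()

    def add_task(self, item):
        self.tasks.put(item)

    def _resolve(self, item):
        # A bare callable is run as is.
        if callable(item):
            return item, ()
        # With an action, the tuple holds only the action's arguments.
        if self.action is not None:
            return self.action, tuple(item)
        # Otherwise the first element is the callable, the rest its arguments.
        return item[0], tuple(item[1:])

    def _worker(self):
        # Each thread drains the queue until it is empty.
        while True:
            try:
                item = self.tasks.get_nowait()
            except queue.Empty:
                return
            func, args = self._resolve(item)
            value = func(*args)
            with self._lock:
                self.results.append(value)

    def run(self):
        threads = [threading.Thread(target=self._worker)
                   for _ in range(self.max_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return self.results


pool = MiniPool(action=pow, max_threads=2)
pool.add_task((2, 3))          # runs pow(2, 3)
pool.add_task(lambda: "done")  # bare callable
results = pool.run()
```

The order of `results` depends on thread scheduling, so callers that need ordering should key results by task.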

add_task(item)[source]

Adds a new task to the queue.
Parameters:
  • item – a callable or a tuple with callable and arguments

pending()[source]

Returns a list of strings with the actions not finished yet.

start()[source]

Start all threads.

CronTab

class fandango.threads.CronTab(line='', task=None, start=False, process=False, keep=10, trace=False)[source]

Line Syntax:

    #Minutes Hour DayOfMonth(1-31) Month(1-12) DayOfWeek(0=Sunday-6) Task
    00 */6 * * * /homelocal/sicilia/archiving/bin/cleanTdbFiles /tmp/archiving/tdb --no-prompt

    ct = fandango.threads.CronTab('* 11 24 08 3 ./command &') #command can be replaced by a callable task argument
    ct.match() #It will return True if the current time matches the crontab condition, self.last_match stores the last time check
    True
    ct.start()
    In CronTab(* 11 24 08 3 date).start()
    CronTab thread started
    In CronTab(* 11 24 08 3 date).do_task(<function <lambda> at 0x8cc4224>)
    CronTab(* 11 24 08 3 date).do_task() => 3
    ct.stop()

changed(now=None)[source]

Checks if the current timestamp differs from the last cron check.

do_task(task=None, trace=False)[source]

Executes a string or callable.

load(line)[source]

Crontab line parsing

match(now=None)[source]

Returns True if the current timestamp matches the cron configuration.
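How a timestamp can be matched against the five time fields of the line syntax is sketched below. It is a simplified illustration, not the CronTab code: `field_matches` and `cron_match` are hypothetical helpers, only `*`, `*/n`, plain numbers and comma lists are handled (no ranges such as `1-5`), and all five fields must match.

```python
from datetime import datetime


def field_matches(spec, value):
    """Check one cron field ('*', '*/n', 'a,b,c' or a number) against value."""
    for part in spec.split(','):
        if part == '*':
            return True
        if part.startswith('*/'):
            if value % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_match(line, now):
    """Return True if `now` satisfies the five time fields of `line`."""
    minute, hour, dom, month, dow = line.split()[:5]
    weekday = now.isoweekday() % 7  # 0 = Sunday, as in the line syntax
    return (field_matches(minute, now.minute)
            and field_matches(hour, now.hour)
            and field_matches(dom, now.day)
            and field_matches(month, now.month)
            and field_matches(dow, weekday))


# Every six hours, on the hour.
every_six = cron_match('00 */6 * * * some_task', datetime(2014, 6, 23, 12, 0))
# Any minute of 11:00-11:59 on a Wednesday, 24th of August.
one_day = cron_match('* 11 24 08 3 ./command &', datetime(2011, 8, 24, 11, 30))
```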

WorkerException

class fandango.threads.WorkerException

FakeLock

class fandango.threads.FakeLock[source]

Just for debugging; it can replace a Lock when debugging a deadlock issue.
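The idea of swapping a real lock for a non-blocking stand-in can be sketched as follows. `NoOpLock` is a hypothetical class written for this example; it only assumes the acquire()/release() interface and records calls so they can be inspected.

```python
import threading


class NoOpLock(object):
    """Hypothetical lock stand-in: same interface, never blocks."""

    def __init__(self):
        self.calls = []  # record of calls, useful when tracing a deadlock

    def acquire(self, blocking=True):
        self.calls.append('acquire')
        return True

    def release(self):
        self.calls.append('release')

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
        return False


def critical_section(lock, shared):
    # Works with any object offering the lock protocol.
    with lock:
        shared.append(len(shared))


shared = []
critical_section(threading.Lock(), shared)  # real lock
fake = NoOpLock()
critical_section(fake, shared)              # debugging stand-in
```

Because the stand-in never blocks, it removes the protection as well as the deadlock, so it is suited to diagnosis only.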

ThreadedObject

class fandango.threads.ThreadedObject(target=None, period=1.0, nthreads=1, start=False, min_wait=1e-05, first=0)[source]

An Object with a thread pool that provides safe stop on exit.

It has a permanent thread running, which is just paused.

Created to allow safe thread usage in Tango Device Servers

WARNING: do not call the start()/stop() methods inside target or any hook; it may provoke unexpected behavior.

Some arguments:

  • target – function to be called
  • period=1 – iteration frequency
  • nthreads=1 – size of thread pool
  • min_wait=1e-5 – max frequency allowed
  • first=0 – first iteration to start execution
  • start=False – start processing at object creation

Statistics and execution hooks are provided:

  • start_hook – launched after a start() command, may return args,kwargs for the target() method
  • loop_hook – like start_hook, but called at each iteration
  • stop_hook – called after a stop() call
  • wait_hook – called before each wait()

loop_hook(*args, **kwargs)[source]

Redefine at convenience; it returns the arguments for the target method.

set_queue(queue)[source]

A list of timestamps can be passed to the main loop to force faster or slower processing. This list overrides the current periodic execution; target() will be executed at each queued time instead.

start_hook(*args, **kwargs)[source]

Redefine at convenience; it returns the arguments for the target method.

stop_hook(*args, **kwargs)[source]

Redefine at convenience.
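The pattern described for this class (one permanent thread that is paused rather than destroyed, with a hook supplying the arguments of each call) can be sketched with threading.Event. This is a simplified illustration with a single thread; `PeriodicRunner` and its attributes are hypothetical and do not reproduce ThreadedObject.

```python
import threading
import time


class PeriodicRunner(object):
    """Hypothetical sketch: one permanent thread, paused between start/stop."""

    def __init__(self, target, period=1.0):
        self.target = target
        self.period = period
        self.count = 0
        self._running = threading.Event()   # set while iterations are allowed
        self._kill = threading.Event()      # set to end the thread for good
        self._thread = threading.Thread(target=self._loop)
        self._thread.daemon = True
        self._thread.start()

    def loop_hook(self):
        # Redefine to supply (args, kwargs) for each call of target.
        return (), {}

    def _loop(self):
        while not self._kill.is_set():
            # While paused, the thread only polls this event.
            if not self._running.wait(0.01):
                continue
            args, kwargs = self.loop_hook()
            self.target(*args, **kwargs)
            self.count += 1
            # Interruptible sleep until the next iteration.
            self._kill.wait(self.period)

    def start(self):
        self._running.set()

    def stop(self):
        self._running.clear()

    def kill(self):
        self._running.clear()
        self._kill.set()
        self._thread.join()


calls = []
runner = PeriodicRunner(calls.append, period=0.01)
runner.loop_hook = lambda: ((time.time(),), {})
runner.start()
time.sleep(0.1)
runner.stop()
runner.kill()
```

start() and stop() are called from the main thread here, never from the target or a hook, in line with the warning above.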

ProcessedData

class fandango.threads.ProcessedData(name, target=None, args=None, period=None, expire=None, callback=None)[source]

This struct stores named data with an associated value and date, plus period and expire time constraints.
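A value holder with a timestamp and an expiry constraint might look like the sketch below. `TimedValue` and `Expired` are hypothetical names, and the behavior on expiry (raising an exception from get()) is an assumption made for the example, not a statement about ProcessedData.

```python
import time


class Expired(Exception):
    """Hypothetical exception raised when a stored value is too old."""


class TimedValue(object):
    """Hypothetical named value with a timestamp and an expiry time."""

    def __init__(self, name, period=None, expire=None):
        self.name = name
        self.period = period    # seconds between refreshes, informative here
        self.expire = expire    # seconds after which the value is stale
        self.value = None
        self.date = 0

    def set(self, value, now=None):
        self.value = value
        self.date = time.time() if now is None else now

    def get(self, now=None):
        now = time.time() if now is None else now
        if self.expire and now - self.date > self.expire:
            raise Expired('%s is older than %s s' % (self.name, self.expire))
        return self.value


data = TimedValue('temperature', period=1.0, expire=5.0)
data.set(21.5, now=100.0)
fresh = data.get(now=103.0)      # 3 s old, within the expire window
try:
    data.get(now=110.0)          # 10 s old, beyond the expire window
    stale = False
except Expired:
    stale = True
```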

WorkerThread

class fandango.threads.WorkerThread(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]

This class allows scheduling tasks in a background thread or process.

If the process() method is not overridden, the tasks put in the internal queue using the put(Task) method may be:

  • dictionary of built-in types: {'__target__':callable or method_name,'__args__':[],'__class_':'','__module':'','__class_args__':[]}
  • string to eval: eval('import $MODULE' or '$VAR=code()' or 'code()')
  • list if list[0] is callable: value = list[0](*list[1:])
  • callable: value = callable()

It also allows passing a hook method to be called for every main method execution.
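The task forms listed above can be told apart by type. The sketch below covers the string, list and callable forms; the dictionary form is left out. `run_task` is a hypothetical helper, and the eval-then-exec fallback for strings is an assumption about how statements and expressions could be distinguished, not the module's own logic.

```python
def run_task(task, namespace):
    """Evaluate one task in `namespace` and return its value (or None)."""
    if isinstance(task, str):
        try:
            # Expressions such as 'code()' produce a value.
            return eval(task, namespace)
        except SyntaxError:
            # Statements such as 'import X' or 'VAR=code()' do not.
            exec(task, namespace)
            return None
    if isinstance(task, (list, tuple)) and task and callable(task[0]):
        return task[0](*task[1:])
    if callable(task):
        return task()
    raise TypeError('unsupported task: %r' % (task,))


namespace = {}
outputs = [(task, run_task(task, namespace)) for task in (
    'import math',
    'x = math.sqrt(16)',
    'x + 1',
    [max, 3, 7],
    lambda: 'called',
)]
```

Sharing one namespace between tasks is what lets a later string use a module or variable introduced by an earlier one.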

Usage:

    wt = fandango.threads.WorkerThread(process=True)
    wt.start()
    wt.put('import fandango')
    wt.put("tc = fandango.device.TangoCommand('lab/15/vgct-01/sendcommand')")
    command = "tc.execute(feedback='status',args=['ver\r\\n'])"
    wt.put("tc.execute(feedback='status',args=['ver\r\\n'])")
    while not wt.getDone():
        wt.stopEvent.wait(1.)
        pile = dict(wt.flush())
    result = pile[command]

flush()[source]

Gets all elements stored in the output queue in [(command,result)] format.

get()[source]

Gets the oldest element in the output queue in (command,result) format.

process(target)[source]

This method can be overridden in child classes to perform actions distinct from evalX.

put(target)[source]

Inserts a new object in the Queue.

SingletonWorker

class fandango.threads.SingletonWorker(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]

Usage:

    # ... same as WorkerThread, but command is required to get the result value
    command = "tc.execute(feedback='status',args=['ver\r\\n'])"
    sw.put(command)
    sw.get(command)

get(target)[source]

It flushes the value stored for {target} task. The target argument is needed to avoid mixing up commands from different requestors.

WorkerProcess

class fandango.threads.WorkerProcess(target=None, start=True, timeout=0, timewait=0)[source]

Class that provides a multiprocessing interface to process tasks in a background process and trigger callbacks when finished.

See full description and usage in doc/recipes/Threading.rst

command(command, args=None)[source]

This method performs a synchronous command (no callback, no persistence); it doesn’t return until it is resolved.

static get_hash(d)[source]

This method just converts a dictionary into a hashable type
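One common way to turn a dictionary into a hashable value is to convert it recursively into sorted tuples. The sketch below shows that approach; `to_hashable` is a hypothetical helper and may differ from what get_hash actually does.

```python
def to_hashable(value):
    """Recursively convert dicts, lists and sets into hashable values."""
    if isinstance(value, dict):
        # Sorting by key makes the result independent of insertion order.
        return tuple(sorted((k, to_hashable(v)) for k, v in value.items()))
    if isinstance(value, (list, tuple)):
        return tuple(to_hashable(v) for v in value)
    if isinstance(value, set):
        return frozenset(to_hashable(v) for v in value)
    return value


query = {'command': 'read', 'args': [1, 2, 3]}
key = to_hashable(query)
cache = {key: 'cached result'}  # the converted query can now be a dict key
```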

send(key, target, args=None, callback=None)[source]

This method sends a new (key, query, callback) tuple to the process Pipe. Queries may be: (key,) ; (key,args=None) ; (key,command,args=None)

AsynchronousFunction

class fandango.threads.AsynchronousFunction(function)[source]

This class executes a given function in a separate thread. When finished, it sets self.finished, a threading.Event object, to True. Whether the function is thread-safe or not must be managed on the caller side. If you want to autoexecute the method with arguments just call:

    t = AsynchronousFunction(lambda:your_function(args),start=True)
    while True:
        if not t.isAlive():
            if t.exception: raise t.exception
            result = t.result
            break
        print 'waiting ...'
        threading.Event().wait(0.1)
    print 'result = ',result
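The mechanism described above (a thread that keeps the function's result or exception and signals completion through an Event) can be sketched as follows. `AsyncCall` is a hypothetical class written for this example; waiting on `finished` replaces the polling loop.

```python
import threading


class AsyncCall(threading.Thread):
    """Hypothetical sketch: run a function in a thread, keep result/exception."""

    def __init__(self, function, start=False):
        threading.Thread.__init__(self)
        self.function = function
        self.result = None
        self.exception = None
        self.finished = threading.Event()
        if start:
            self.start()

    def run(self):
        try:
            self.result = self.function()
        except Exception as e:
            # Stored so the caller can re-raise it in its own thread.
            self.exception = e
        finally:
            self.finished.set()


ok = AsyncCall(lambda: sum(range(10)), start=True)
ok.finished.wait(5.0)

bad = AsyncCall(lambda: 1 / 0, start=True)
bad.finished.wait(5.0)
```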

DataExpired

class fandango.threads.DataExpired[source]

Functions

SubprocessMethod

fandango.threads.SubprocessMethod(obj, method, *args, **kwargs)[source]

Method for executing reader.get_attribute_values in background with a timeout (30 s by default). In fact, it allows calling any object method passed by name, or just passing a callable as object. This method could be embedded in a thread with a very high timeout to trigger a callback when data is received. This advanced behavior is not implemented yet.

Example:

    reader,att = PyTangoArchiving.Reader(),'just/some/nice/attribute'
    dates = '2014-06-23 00:00','2014-06-30 00:00'
    values = fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.)

or

    def callback(v):
        print('>> received %d values'%len(v))
    fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.,callback=callback)

    >> received 414131 values

getPickable

fandango.threads.getPickable(value)[source]

wait

fandango.threads.wait(seconds, event=True, hook=None)[source]
Parameters:
  • seconds – seconds to wait for
  • event – if True (default) it uses a dummy Event, if False it uses time.sleep, if Event is passed then it calls event.wait(seconds)
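The three behaviors of the event parameter can be sketched as below. `wait_for` is a hypothetical function written for this example and leaves the hook argument out, since its semantics are not described here.

```python
import threading
import time


def wait_for(seconds, event=True):
    """Wait `seconds` using a dummy Event, time.sleep, or a caller's Event."""
    if event is True:
        # A throwaway Event that nobody sets: a plain timed wait.
        threading.Event().wait(seconds)
    elif event is False:
        time.sleep(seconds)
    else:
        # A caller-supplied Event can cut the wait short when it is set.
        event.wait(seconds)


abort = threading.Event()
abort.set()
t0 = time.time()
wait_for(5.0, event=abort)   # abort is already set, so the wait ends early
elapsed = time.time() - t0
```

Passing an Event is what makes the wait interruptible from another thread, which time.sleep does not allow.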

timed_range

fandango.threads.timed_range(seconds, period, event=None)[source]

Method used to execute the content of a for loop at periodic intervals. For X seconds, this method will return each period fragment. event can be used to pass a threading.Event to abort the loop if needed.

Usage:

    for t in timed_range(15,0.1):
        method_executed_at_10Hz_for_15s()
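A generator with this behavior can be sketched with Event.wait as an interruptible sleep. `periodic_range` is a hypothetical name, and the yielded value (elapsed seconds) is an assumption made for the example.

```python
import threading
import time


def periodic_range(seconds, period, event=None):
    """Yield the elapsed time every `period` seconds, for `seconds` in total."""
    event = event or threading.Event()
    start = time.time()
    while not event.is_set():
        elapsed = time.time() - start
        if elapsed >= seconds:
            break
        yield elapsed
        # Returns early if another thread sets the event.
        event.wait(period)


ticks = list(periodic_range(0.05, 0.01))

abort = threading.Event()
abort.set()
aborted = list(periodic_range(0.05, 0.01, event=abort))  # aborted before the first tick
```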

raw autodoc

class fandango.threads.AsynchronousFunction(function)[source]

Bases: threading.Thread

This class executes a given function in a separate thread. When finished, it sets self.finished, a threading.Event object, to True. Whether the function is thread-safe or not must be managed on the caller side. If you want to autoexecute the method with arguments just call:

    t = AsynchronousFunction(lambda:your_function(args),start=True)
    while True:
        if not t.isAlive():
            if t.exception: raise t.exception
            result = t.result
            break
        print 'waiting ...'
        threading.Event().wait(0.1)
    print 'result = ',result

run()[source]
class fandango.threads.CronTab(line='', task=None, start=False, process=False, keep=10, trace=False)[source]

Bases: object

Line Syntax:

    #Minutes Hour DayOfMonth(1-31) Month(1-12) DayOfWeek(0=Sunday-6) Task
    00 */6 * * * /homelocal/sicilia/archiving/bin/cleanTdbFiles /tmp/archiving/tdb --no-prompt

    ct = fandango.threads.CronTab('* 11 24 08 3 ./command &') #command can be replaced by a callable task argument
    ct.match() #It will return True if the current time matches the crontab condition, self.last_match stores the last time check
    True
    ct.start()
    In CronTab(* 11 24 08 3 date).start()
    CronTab thread started
    In CronTab(* 11 24 08 3 date).do_task(<function <lambda> at 0x8cc4224>)
    CronTab(* 11 24 08 3 date).do_task() => 3
    ct.stop()

changed(now=None)[source]

Checks if the current timestamp differs from the last cron check.

do_task(task=None, trace=False)[source]

Executes a string or callable.

get()[source]
is_alive()[source]
load(line)[source]

Crontab line parsing

match(now=None)[source]

Returns True if the current timestamp matches the cron configuration.

start()[source]
stop()[source]
exception fandango.threads.DataExpired[source]

Bases: exceptions.Exception

class fandango.threads.FakeLock[source]

Bases: object

Just for debugging; it can replace a Lock when debugging a deadlock issue.

acquire()[source]
release()[source]
class fandango.threads.Pool(action=None, max_threads=5, start=False, mp=False)[source]

Bases: object

Creates a queue of tasks managed by a pool of threads. Each task can be a callable or a tuple containing the arguments for the “action” class argument. If “action” is not defined, the first element of the tuple can be a callable and the rest will be its arguments.

Usage:

    import time
    p = Pool()
    for item in source():
        p.add_task(item)
    p.start()
    while len(p.pending()):
        time.sleep(1.)
    print('finished!')

add_task(item)[source]

Adds a new task to the queue.
Parameters:
  • item – a callable or a tuple with callable and arguments

pending()[source]

Returns a list of strings with the actions not finished yet.

start()[source]

Start all threads.

stop()[source]
class fandango.threads.ProcessedData(name, target=None, args=None, period=None, expire=None, callback=None)[source]

Bases: object

This struct stores named data with an associated value and date, plus period and expire time constraints.

get()[source]
get_args()[source]
set(value)[source]
class fandango.threads.SingletonWorker(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]

Bases: fandango.threads.WorkerThread, fandango.objects.Singleton

Usage:

    # ... same as WorkerThread, but command is required to get the result value
    command = "tc.execute(feedback='status',args=['ver\r\\n'])"
    sw.put(command)
    sw.get(command)

done()[source]
flush()[source]
get(target)[source]

It flushes the value stored for {target} task. The target argument is needed to avoid mixing up commands from different requestors.

put(target)[source]
values()[source]
fandango.threads.SubprocessMethod(obj, method, *args, **kwargs)[source]

Method for executing reader.get_attribute_values in background with a timeout (30 s by default). In fact, it allows calling any object method passed by name, or just passing a callable as object. This method could be embedded in a thread with a very high timeout to trigger a callback when data is received. This advanced behavior is not implemented yet.

Example:

    reader,att = PyTangoArchiving.Reader(),'just/some/nice/attribute'
    dates = '2014-06-23 00:00','2014-06-30 00:00'
    values = fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.)

or

    def callback(v):
        print('>> received %d values'%len(v))
    fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.,callback=callback)

    >> received 414131 values

class fandango.threads.ThreadedObject(target=None, period=1.0, nthreads=1, start=False, min_wait=1e-05, first=0)[source]

Bases: fandango.objects.Object

An Object with a thread pool that provides safe stop on exit.

It has a permanent thread running, which is just paused.

Created to allow safe thread usage in Tango Device Servers

WARNING: do not call the start()/stop() methods inside target or any hook; it may provoke unexpected behavior.

Some arguments:

  • target – function to be called
  • period=1 – iteration frequency
  • nthreads=1 – size of thread pool
  • min_wait=1e-5 – max frequency allowed
  • first=0 – first iteration to start execution
  • start=False – start processing at object creation

Statistics and execution hooks are provided:

  • start_hook – launched after a start() command, may return args,kwargs for the target() method
  • loop_hook – like start_hook, but called at each iteration
  • stop_hook – called after a stop() call
  • wait_hook – called before each wait()

INSTANCES = []
clean_stats()[source]
get_acc_delay()[source]
get_avg_delay()[source]
get_count()[source]
get_delay()[source]
get_errors()[source]
get_last()[source]
get_next()[source]
get_nthreads()[source]
get_period()[source]
get_started()[source]
get_target()[source]
get_thread(i=0)[source]
get_usage()[source]
is_alive(i=0)[source]
kill(wait=3.0)[source]
static kill_all()[source]
loop()[source]
loop_hook(*args, **kwargs)[source]

Redefine at convenience; it returns the arguments for the target method.

print_exc(e='', msg='')[source]
set_loop_hook(target)[source]
set_period(period)[source]
set_queue(queue)[source]

A list of timestamps can be passed to the main loop to force faster or slower processing. This list overrides the current periodic execution; target() will be executed at each queued time instead.

set_start_hook(target)[source]
set_stop_hook(target)[source]
set_target(target)[source]
set_wait_hook(target)[source]
start()[source]
start_hook(*args, **kwargs)[source]

Redefine at convenience; it returns the arguments for the target method.

stop(wait=3.0)[source]
static stop_all()[source]
stop_hook(*args, **kwargs)[source]

Redefine at convenience.

exception fandango.threads.WorkerException

Bases: exceptions.Exception

class fandango.threads.WorkerProcess(target=None, start=True, timeout=0, timewait=0)[source]

Bases: fandango.objects.Object, fandango.objects.SingletonMap

Class that provides a multiprocessing interface to process tasks in a background process and trigger callbacks when finished.

See full description and usage in doc/recipes/Threading.rst

ALIVE_PERIOD = 15
add(key, target=None, args=None, period=0, expire=0, callback=None)[source]
bind(target, args=None)[source]
command(command, args=None)[source]

This method performs a synchronous command (no callback, no persistence); it doesn’t return until it is resolved.

get(key, default='__NULL', _raise=False)[source]
static get_callable(key, executor=None)[source]
static get_hash(d)[source]

This method just converts a dictionary into a hashable type

isAlive()[source]
keys()[source]
pause(timeout)[source]
pop(key)[source]
send(key, target, args=None, callback=None)[source]

This method sends a new (key, query, callback) tuple to the process Pipe. Queries may be: (key,) ; (key,args=None) ; (key,command,args=None)

start()[source]
stop()[source]
trace(msg, level=0)[source]
class fandango.threads.WorkerThread(name='', process=False, wait=0.01, target=None, hook=None, trace=False)[source]

Bases: object

This class allows scheduling tasks in a background thread or process.

If the process() method is not overridden, the tasks put in the internal queue using the put(Task) method may be:

  • dictionary of built-in types: {'__target__':callable or method_name,'__args__':[],'__class_':'','__module':'','__class_args__':[]}
  • string to eval: eval('import $MODULE' or '$VAR=code()' or 'code()')
  • list if list[0] is callable: value = list[0](*list[1:])
  • callable: value = callable()

It also allows passing a hook method to be called for every main method execution.

Usage:

    wt = fandango.threads.WorkerThread(process=True)
    wt.start()
    wt.put('import fandango')
    wt.put("tc = fandango.device.TangoCommand('lab/15/vgct-01/sendcommand')")
    command = "tc.execute(feedback='status',args=['ver\r\\n'])"
    wt.put("tc.execute(feedback='status',args=['ver\r\\n'])")
    while not wt.getDone():
        wt.stopEvent.wait(1.)
        pile = dict(wt.flush())
    result = pile[command]

SINGLETON = None
flush()[source]

Gets all elements stored in the output queue in [(command,result)] format.

get()[source]

Gets the oldest element in the output queue in (command,result) format.

getDone()[source]
getQueue()[source]
getSize()[source]
isAlive()[source]
locals(key=None)[source]
process(target)[source]

This method can be overridden in child classes to perform actions distinct from evalX.

put(target)[source]

Inserts a new object in the Queue.

run()[source]
start()[source]
stop()[source]
fandango.threads.getPickable(value)[source]
fandango.threads.timed_range(seconds, period, event=None)[source]

Method used to execute the content of a for loop at periodic intervals. For X seconds, this method will return each period fragment. event can be used to pass a threading.Event to abort the loop if needed.

Usage:

    for t in timed_range(15,0.1):
        method_executed_at_10Hz_for_15s()

fandango.threads.wait(seconds, event=True, hook=None)[source]
Parameters:
  • seconds – seconds to wait for
  • event – if True (default) it uses a dummy Event, if False it uses time.sleep, if Event is passed then it calls event.wait(seconds)