Threading Classes in Fandango

All these classes more or less execute tasks in background and store the date in a cache.

So, to choose one:

  • Periodical function, same arguments: ThreadedObject
  • Periodical function, different arguments and caches: ThreadDict
  • Different functions, callbacks instead of cache: Worker*
  • PyQt application: QWorker
  • Single shot tasks: Tasklets


threads.ThreadedObject

Developed for managing callbacks.EventThread queue within device servers. It provides a persistent pool of threads running always in background with methods for clean starting/pausing/killing and debugging.

Use target=callable or set_target(callable) to pass a method to be started periodically.

Properties

target:task (callable) to be executed periodically
period:seconds between executions

It can be set at initialization or using setters/getters.

Other Options at init()

nthreads=1:NOT IMPLEMENTED: Split tasks in multiple threads.
start=False:start to process tasks at startup (bg threads are always started)
min_wait=1e-5:The internal threads will do its best to accomplish periods even if it is not allowed

by the current period config and task duration. In this case, min_wait will become the minimal time between executions.

Behaviour

Once started, the threaded object will execute its task periodically.

start():starts processing tasks (threads already running)
stop():pauses the threads, but not really stops them
kill():effectively end all threads; called at __del__

Diagnostics

ThreadedObject will try to automatically compensate task execution time reducing the delay between periods; but it will not trigger exception if late. It just provides methods to inspect its performance externally.

get_count:
get_errors:
get_delay:
get_acc_delay:
get_usage:
get_next:
get_last:
is_alive():tells you if it has been killed;
get_started():whether if it is processing data

Class methods

This class keeps an INSTANCES list with all instantiated objects. It enables stop_all() / kill_all() methods to switch off all running threads.


Polling Threads (dict-like)

dicts.ThreadDict

The ThreadDict class allows to create a dictionary which values are generated by a custom method, which can be set in periodical polling by a background thread.

This is a very naive example of how it works:

import fandango

data = {}

def writer(k,v):
  t = fandango.time2str()
  print('%s: %s => %s'%(t,v,k))
  data[k]=v
  return str('%s:%s'%(t,v))

def reader(k):
  d = data.get(k,'Empty!')
  print('%s <= %s'%(d,k))
  return str('%s:%s'%(fandango.time2str(),d))

td = fandango.dicts.ThreadDict(read_method=reader,write_method=writer)
td.start()

td = fandango.dicts.ThreadDict(read_method=reader,write_method=writer)
td.start()

  In ThreadDict.start(), keys are: []
  In ThreadDict.run()
  ThreadDict started!

td['test']
  KeyError: 'test'

td.append('test',period=3.)

  Empty! <= test
  Empty! <= test
  Empty! <= test

['test'] = 10

  2016-11-21 15:48:37: 10 => test

  10 <= test
  10 <= test

td['test10']

  Out[24]: '2016-11-21 15:48:44:10'

threads.Worker``*``

Classes that provide threaded/multiprocess queues to execute tasks in the background and throw callbacks when finished.

WorkerThread

The queries are sent between sender/receiver thread and worker process using tuples. Queries may be: (key,) ; (key,target) ; (key,target,args):

  • key is just an identifier to internally store data results and callbacks
  • if target is a callable it will be thrown with args as argument (use [] if target is a void function)
  • if it isn’t, then executor(target) will be called
  • executor can be fandango.evalX or other object/method assigned using WorkerProcess.bind(class,args)

By default fandango.evalX is used to perform tasks, a different executor can be defined as WorkerProcess argument or calling:

CP = WorkerProcess(targetClass(initArgs))
CP.bind(targetClass,initArgs)

Sending tasks to the process:

CP.send(key='A1',target)
# Returns immediately and executes target() or executor(*target) in a background process
CP.send('A1',target,args,callback=callback)
# Returns immediately, executes x=target(args) in background and launches callback(x) when ready

When a (key,target,args) tuple is received the procedure is:

  • obtain the exec_ method (executor if args is None,
  • obtain arguments (target if args is None, if args is map/sequence it is pre-parsed):
  • if args is None and there’s a valid executor: return executor(target)

How the executable method is obtained:

  • if args is None it tries to get a valid executor and target will be args.
  • if target is string first it tries to get executor.target
  • if failed, then it evals target (that may return an executable)
  • if args is not none and target is not string, target is used as executable if callable

Return value: - if a valid executable method is found it returns exec_([/*]args) - if not, it returns what has been found instead (evalX(target), executor.target or target)

To use it like a threadDict, allowing a fixed list of keys to be permanently updated:

CP.add(key,target,args,period,expire,callback)
#This call will add a key to dictionary, which target(args) method will be executed every period, value obtained will expire after X seconds.
#Optional Callback will be executed every time value is updated.

Throwing commands in a sequential way (it will return when everything already in the queue is done):

CP.command('comm') # Execute comm() and returns result
CP.command('comm',args=(,)) # Execute comm(*args) and returns result

Two different dictionaries will keep track of process results:

  • data : will store named data with and update period associated
  • callbacks : will store associated callbacks to throw-1 calls

Pool

...

CronTab

...