2
"""Module with threading utilities"""
3
__docformat__ = "restructuredtext"
10
def do_terminate_threads(whitelist=list()):
11
"""Simple function which terminates all of our threads
12
:param whitelist: If whitelist is given, only the given threads will be terminated"""
13
for t in threading.enumerate():
14
if not isinstance(t, TerminatableThread):
16
if whitelist and t not in whitelist:
18
t.schedule_termination()
19
if isinstance(t, WorkerThread):
21
# END worker special handling
25
def terminate_threads( func ):
26
"""Kills all worker threads the method has created by sending the quit signal.
27
This takes over in case of an error in the main function"""
28
def wrapper(*args, **kwargs):
29
cur_threads = set(threading.enumerate())
31
return func(*args, **kwargs)
33
do_terminate_threads(set(threading.enumerate()) - cur_threads)
34
# END finally shutdown threads
36
wrapper.__name__ = func.__name__
43
class TerminatableThread(threading.Thread):
44
"""A simple thread able to terminate itself on behalf of the user.
46
Terminate a thread as follows:
50
Derived classes call _should_terminate() to determine whether they should
53
__slots__ = '_terminate'
55
def __init__(self, *args, **kwargs):
56
super(TerminatableThread, self).__init__(*args, **kwargs)
57
self._terminate = False
61
def _should_terminate(self):
62
""":return: True if this thread should terminate its operation immediately"""
63
return self._terminate
65
def _terminated(self):
66
"""Called once the thread terminated. Its called in the main thread
67
and may perform cleanup operations
68
:note: in the current implementation, this method will only be called if
69
the thread was stopped by ``stop_and_join``. If you have very important
70
cleanup to do, you should do it before you exit your run method"""
74
"""Start the thread and return self"""
75
super(TerminatableThread, self).start()
78
#} END subclass interface
82
def schedule_termination(self):
83
"""Schedule this thread to be terminated as soon as possible.
84
:note: this method does not block."""
85
self._terminate = True
87
def stop_and_join(self):
88
"""Ask the thread to stop its operation and wait for it to terminate
89
:note: Depending on the implenetation, this might block a moment"""
90
self._terminate = True
96
class WorkerThread(TerminatableThread):
98
This base allows to call functions on class instances natively and retrieve
99
their results asynchronously using a queue.
100
The thread runs forever unless it receives the terminate signal using
103
Tasks could be anything, but should usually be class methods and arguments to
108
w = WorkerThread(inq, outq)
110
inq.put((WorkerThread.<method>, args, kwargs))
113
finally we call quit to terminate asap.
115
alternatively, you can make a call more intuitively - the output is the output queue
116
allowing you to get the result right away or later
117
w.call(arg, kwarg='value').get()
119
inq.put(WorkerThread.quit)
122
You may provide the following tuples as task:
123
t[0] = class method, function or instance method
124
t[1] = optional, tuple or list of arguments to pass to the routine
125
t[2] = optional, dictionary of keyword arguments to pass to the routine
127
__slots__ = ('inq', 'outq')
129
class InvalidRoutineError(Exception):
130
"""Class sent as return value in case of an error"""
132
def __init__(self, inq = None, outq = None):
133
super(WorkerThread, self).__init__()
134
self.inq = inq or Queue.Queue()
135
self.outq = outq or Queue.Queue()
137
def call(self, function, *args, **kwargs):
138
"""Method that makes the call to the worker using the input queue,
139
returning our output queue
141
:param funciton: can be a standalone function unrelated to this class,
142
a class method of this class or any instance method.
143
If it is a string, it will be considered a function residing on this instance
144
:param args: arguments to pass to function
145
:parma **kwargs: kwargs to pass to function"""
146
self.inq.put((function, args, kwargs))
149
def wait_until_idle(self):
150
"""wait until the input queue is empty, in the meanwhile, take all
151
results off the output queue."""
152
while not self.inq.empty():
157
# END while there are tasks on the queue
159
def stop_and_join(self):
160
"""Send the stop signal to terminate, then join"""
161
self._terminate = True
162
self.inq.put(self.quit)
167
"""Process input tasks until we receive the quit signal"""
169
if self._should_terminate():
171
# END check for stop request
172
routine = self.__class__.quit
175
tasktuple = self.inq.get()
177
if isinstance(tasktuple, (tuple, list)):
178
if len(tasktuple) == 3:
179
routine, args, kwargs = tasktuple
180
elif len(tasktuple) == 2:
181
routine, args = tasktuple
182
elif len(tasktuple) == 1:
183
routine = tasktuple[0]
184
# END tasktuple length check
185
elif inspect.isroutine(tasktuple):
187
# END tasktuple handling
191
if inspect.ismethod(routine):
192
if routine.im_self is None:
193
rval = routine(self, *args, **kwargs)
195
rval = routine(*args, **kwargs)
196
elif inspect.isroutine(routine):
197
rval = routine(*args, **kwargs)
198
elif isinstance(routine, basestring) and hasattr(self, routine):
199
rval = getattr(self, routine)(*args, **kwargs)
201
# ignore unknown items
202
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
203
self.outq.put(self.InvalidRoutineError(routine))
205
# END make routine call
207
except StopIteration:
210
print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
212
# END routine exception handling