1
2 """Module with threading utilities"""
3 __docformat__ = "restructuredtext"
4 import threading
5 import inspect
6 import Queue
7
8
9
23
24
26 """Kills all worker threads the method has created by sending the quit signal.
27 This takes over in case of an error in the main function"""
def wrapper(*args, **kwargs):
    # Snapshot the threads alive before the wrapped call so that only the
    # threads appearing during the call are handed over for termination.
    threads_before = set(threading.enumerate())
    try:
        result = func(*args, **kwargs)
    finally:
        # Runs whether func returned normally or raised.
        threads_after = set(threading.enumerate())
        do_terminate_threads(threads_after - threads_before)
    return result
34
35
36 wrapper.__name__ = func.__name__
37 return wrapper
38
39
40
41
42
44 """A simple thread able to terminate itself on behalf of the user.
45
46 Terminate a thread as follows:
47
48 t.stop_and_join()
49
50 Derived classes call _should_terminate() to determine whether they should
51 abort gracefully
52 """
53 __slots__ = '_terminate'
54
58
59
60
62 """:return: True if this thread should terminate its operation immediately"""
63 return self._terminate
64
66 """Called once the thread terminated. It's called in the main thread
67 and may perform cleanup operations
68 :note: in the current implementation, this method will only be called if
69 the thread was stopped by ``stop_and_join``. If you have very important
70 cleanup to do, you should do it before you exit your run method"""
71 pass
72
77
78
79
80
81
83 """Schedule this thread to be terminated as soon as possible.
84 :note: this method does not block."""
85 self._terminate = True
86
88 """Ask the thread to stop its operation and wait for it to terminate
89 :note: Depending on the implementation, this might block a moment"""
90 self._terminate = True
91 self.join()
92 self._terminated()
93
94
95
97 """
98 This base allows calling functions on class instances natively and retrieving
99 their results asynchronously using a queue.
100 The thread runs forever unless it receives the terminate signal using
101 its task queue.
102
103 Tasks could be anything, but should usually be class methods and arguments to
104 allow the following:
105
106 inq = Queue()
107 outq = Queue()
108 w = WorkerThread(inq, outq)
109 w.start()
110 inq.put((WorkerThread.<method>, args, kwargs))
111 res = outq.get()
112
113 finally we call quit to terminate asap.
114
115 alternatively, you can make a call more intuitively - the output is the output queue
116 allowing you to get the result right away or later
117 w.call(function, arg, kwarg='value').get()
118
119 inq.put(WorkerThread.quit)
120 w.join()
121
122 You may provide the following tuples as task:
123 t[0] = class method, function or instance method
124 t[1] = optional, tuple or list of arguments to pass to the routine
125 t[2] = optional, dictionary of keyword arguments to pass to the routine
126 """
127 __slots__ = ('inq', 'outq')
128
130 """Class sent as return value in case of an error"""
131
132 - def __init__(self, inq = None, outq = None):
136
def call(self, function, *args, **kwargs):
    """Queue a call for the worker on the input queue and return the output queue.

    :param function: can be a standalone function unrelated to this class,
        a class method of this class or any instance method.
        If it is a string, it will be considered a function residing on this instance
    :param args: positional arguments to pass to function
    :param kwargs: keyword arguments to pass to function
    :return: our output queue"""
    # A task is the (routine, args, kwargs) triple placed on the input queue.
    task = (function, args, kwargs)
    self.inq.put(task)
    return self.outq
148
150 """wait until the input queue is empty, in the meanwhile, take all
151 results off the output queue."""
152 while not self.inq.empty():
153 try:
154 self.outq.get(False)
155 except Queue.Empty:
156 continue
157
158
160 """Send the stop signal to terminate, then join"""
161 self._terminate = True
162 self.inq.put(self.quit)
163 self.join()
164 self._terminated()
165
167 """Process input tasks until we receive the quit signal"""
168 while True:
169 if self._should_terminate():
170 break
171
172 routine = self.__class__.quit
173 args = tuple()
174 kwargs = dict()
175 tasktuple = self.inq.get()
176
177 if isinstance(tasktuple, (tuple, list)):
178 if len(tasktuple) == 3:
179 routine, args, kwargs = tasktuple
180 elif len(tasktuple) == 2:
181 routine, args = tasktuple
182 elif len(tasktuple) == 1:
183 routine = tasktuple[0]
184
185 elif inspect.isroutine(tasktuple):
186 routine = tasktuple
187
188
189 try:
190 rval = None
191 if inspect.ismethod(routine):
192 if routine.im_self is None:
193 rval = routine(self, *args, **kwargs)
194 else:
195 rval = routine(*args, **kwargs)
196 elif inspect.isroutine(routine):
197 rval = routine(*args, **kwargs)
198 elif isinstance(routine, basestring) and hasattr(self, routine):
199 rval = getattr(self, routine)(*args, **kwargs)
200 else:
201
202 print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
203 self.outq.put(self.InvalidRoutineError(routine))
204 break
205
206 self.outq.put(rval)
207 except StopIteration:
208 break
209 except Exception,e:
210 print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
211 self.outq.put(e)
212
213
214
217
218
219
220