mrv.thread
Covered: 114 lines
Missed: 46 lines
Skipped 60 lines
Percent: 71 %
  2
"""Module with threading utilities"""
  3
__docformat__ = "restructuredtext"
  4
import threading
  5
import inspect
  6
import Queue
 10
def do_terminate_threads(whitelist=list()):
 11
	"""Simple function which terminates all of our threads
 12
	:param whitelist: If whitelist is given, only the given threads will be terminated"""
 13
	for t in threading.enumerate():
 14
		if not isinstance(t, TerminatableThread):
 15
			continue
 16
		if whitelist and t not in whitelist:
 17
			continue
 18
		t.schedule_termination()
 19
		if isinstance(t, WorkerThread):
 20
			t.inq.put(t.quit)
 22
		t.stop_and_join()
 25
def terminate_threads( func ):
 26
	"""Kills all worker threads the method has created by sending the quit signal.
 27
	This takes over in case of an error in the main function"""
 28
	def wrapper(*args, **kwargs):
 29
		cur_threads = set(threading.enumerate())
 30
		try:
 31
			return func(*args, **kwargs)
 32
		finally:
 33
			do_terminate_threads(set(threading.enumerate()) - cur_threads)
 36
	wrapper.__name__ = func.__name__
 37
	return wrapper
 43
class TerminatableThread(threading.Thread):
 44
	"""A simple thread able to terminate itself on behalf of the user.
 46
	Terminate a thread as follows:
 48
	t.stop_and_join()
 50
	Derived classes call _should_terminate() to determine whether they should 
 51
	abort gracefully
 52
	"""
 53
	__slots__ = '_terminate'
 55
	def __init__(self, *args, **kwargs):
 56
		super(TerminatableThread, self).__init__(*args, **kwargs)
 57
		self._terminate = False
 61
	def _should_terminate(self):
 62
		""":return: True if this thread should terminate its operation immediately"""
 63
		return self._terminate
 65
	def _terminated(self):
 66
		"""Called once the thread terminated. Its called in the main thread
 67
		and may perform cleanup operations
 68
		:note: in the current implementation, this method will only be called if 
 69
			the thread was stopped by ``stop_and_join``. If you have very important
 70
			cleanup to do, you should do it before you exit your run method"""
 71
		pass
 73
	def start(self):
 74
		"""Start the thread and return self"""
 75
		super(TerminatableThread, self).start()
 76
		return self
 82
	def schedule_termination(self):
 83
		"""Schedule this thread to be terminated as soon as possible.
 84
		:note: this method does not block."""
 85
		self._terminate = True
 87
	def stop_and_join(self):
 88
		"""Ask the thread to stop its operation and wait for it to terminate
 89
		:note: Depending on the implenetation, this might block a moment"""
 90
		self._terminate = True
 91
		self.join()
 92
		self._terminated()
 96
class WorkerThread(TerminatableThread):
 97
	"""
 98
	This base allows to call functions on class instances natively and retrieve
 99
	their results asynchronously using a queue.
100
	The thread runs forever unless it receives the terminate signal using 
101
	its task queue.
103
	Tasks could be anything, but should usually be class methods and arguments to
104
	allow the following:
106
	inq = Queue()
107
	outq = Queue()
108
	w = WorkerThread(inq, outq)
109
	w.start()
110
	inq.put((WorkerThread.<method>, args, kwargs))
111
	res = outq.get()
113
	finally we call quit to terminate asap.
115
	alternatively, you can make a call more intuitively - the output is the output queue
116
	allowing you to get the result right away or later
117
	w.call(arg, kwarg='value').get()
119
	inq.put(WorkerThread.quit)
120
	w.join()
122
	You may provide the following tuples as task:
123
	t[0] = class method, function or instance method
124
	t[1] = optional, tuple or list of arguments to pass to the routine
125
	t[2] = optional, dictionary of keyword arguments to pass to the routine
126
	"""
127
	__slots__ = ('inq', 'outq')
129
	class InvalidRoutineError(Exception):
130
		"""Class sent as return value in case of an error"""
132
	def __init__(self, inq = None, outq = None):
133
		super(WorkerThread, self).__init__()
134
		self.inq = inq or Queue.Queue()
135
		self.outq = outq or Queue.Queue()
137
	def call(self, function, *args, **kwargs):
138
		"""Method that makes the call to the worker using the input queue, 
139
		returning our output queue
141
		:param funciton: can be a standalone function unrelated to this class, 
142
			a class method of this class or any instance method.
143
			If it is a string, it will be considered a function residing on this instance
144
		:param args: arguments to pass to function
145
		:parma **kwargs: kwargs to pass to function"""
146
		self.inq.put((function, args, kwargs))
147
		return self.outq
149
	def wait_until_idle(self):
150
		"""wait until the input queue is empty, in the meanwhile, take all 
151
		results off the output queue."""
152
		while not self.inq.empty():
153
			try:
154
				self.outq.get(False)
155
			except Queue.Empty:
156
				continue
159
	def stop_and_join(self):
160
		"""Send the stop signal to terminate, then join"""
161
		self._terminate = True
162
		self.inq.put(self.quit)
163
		self.join()
164
		self._terminated()
166
	def run(self):
167
		"""Process input tasks until we receive the quit signal"""
168
		while True:
169
			if self._should_terminate():
170
				break
172
			routine = self.__class__.quit
173
			args = tuple()
174
			kwargs = dict()
175
			tasktuple = self.inq.get()
177
			if isinstance(tasktuple, (tuple, list)):
178
				if len(tasktuple) == 3:
179
					routine, args, kwargs = tasktuple
180
				elif len(tasktuple) == 2:
181
					routine, args = tasktuple
182
				elif len(tasktuple) == 1:
183
					routine = tasktuple[0]
185
			elif inspect.isroutine(tasktuple):
186
				routine = tasktuple
189
			try:
190
				rval = None
191
				if inspect.ismethod(routine):
192
					if routine.im_self is None:
193
						rval = routine(self, *args, **kwargs)
194
					else:
195
						rval = routine(*args, **kwargs)
196
				elif inspect.isroutine(routine):
197
					rval = routine(*args, **kwargs)
198
				elif isinstance(routine, basestring) and hasattr(self, routine):
199
					rval = getattr(self, routine)(*args, **kwargs)
200
				else:
202
					print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
203
					self.outq.put(self.InvalidRoutineError(routine))
204
					break
206
				self.outq.put(rval)
207
			except StopIteration:
208
				break
209
			except Exception,e:
210
				print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
211
				self.outq.put(e)
215
	def quit(self):
216
		raise StopIteration