Package mrv :: Module thread
[hide private]
[frames] | no frames]

Source Code for Module mrv.thread

  1  # -*- coding: utf-8 -*- 
  2  """Module with threading utilities""" 
  3  __docformat__ = "restructuredtext" 
  4  import threading 
  5  import inspect 
  6  import Queue 
  7   
  8  #{ Decorators 
  9   
10 -def do_terminate_threads(whitelist=list()):
11 """Simple function which terminates all of our threads 12 :param whitelist: If whitelist is given, only the given threads will be terminated""" 13 for t in threading.enumerate(): 14 if not isinstance(t, TerminatableThread): 15 continue 16 if whitelist and t not in whitelist: 17 continue 18 t.schedule_termination() 19 if isinstance(t, WorkerThread): 20 t.inq.put(t.quit) 21 # END worker special handling 22 t.stop_and_join()
23 # END for each thread 24
25 -def terminate_threads( func ):
26 """Kills all worker threads the method has created by sending the quit signal. 27 This takes over in case of an error in the main function""" 28 def wrapper(*args, **kwargs): 29 cur_threads = set(threading.enumerate()) 30 try: 31 return func(*args, **kwargs) 32 finally: 33 do_terminate_threads(set(threading.enumerate()) - cur_threads)
34 # END finally shutdown threads 35 # END wrapper 36 wrapper.__name__ = func.__name__ 37 return wrapper 38 39 #} END decorators 40 41 #{ Classes 42
43 -class TerminatableThread(threading.Thread):
44 """A simple thread able to terminate itself on behalf of the user. 45 46 Terminate a thread as follows: 47 48 t.stop_and_join() 49 50 Derived classes call _should_terminate() to determine whether they should 51 abort gracefully 52 """ 53 __slots__ = '_terminate' 54
55 - def __init__(self, *args, **kwargs):
56 super(TerminatableThread, self).__init__(*args, **kwargs) 57 self._terminate = False
58 59 60 #{ Subclass Interface
61 - def _should_terminate(self):
62 """:return: True if this thread should terminate its operation immediately""" 63 return self._terminate
64
65 - def _terminated(self):
66 """Called once the thread terminated. Its called in the main thread 67 and may perform cleanup operations 68 :note: in the current implementation, this method will only be called if 69 the thread was stopped by ``stop_and_join``. If you have very important 70 cleanup to do, you should do it before you exit your run method""" 71 pass
72
73 - def start(self):
74 """Start the thread and return self""" 75 super(TerminatableThread, self).start() 76 return self
77 78 #} END subclass interface 79 80 #{ Interface 81
82 - def schedule_termination(self):
83 """Schedule this thread to be terminated as soon as possible. 84 :note: this method does not block.""" 85 self._terminate = True
86
87 - def stop_and_join(self):
88 """Ask the thread to stop its operation and wait for it to terminate 89 :note: Depending on the implenetation, this might block a moment""" 90 self._terminate = True 91 self.join() 92 self._terminated()
93 #} END interface 94 95
96 -class WorkerThread(TerminatableThread):
97 """ 98 This base allows to call functions on class instances natively and retrieve 99 their results asynchronously using a queue. 100 The thread runs forever unless it receives the terminate signal using 101 its task queue. 102 103 Tasks could be anything, but should usually be class methods and arguments to 104 allow the following: 105 106 inq = Queue() 107 outq = Queue() 108 w = WorkerThread(inq, outq) 109 w.start() 110 inq.put((WorkerThread.<method>, args, kwargs)) 111 res = outq.get() 112 113 finally we call quit to terminate asap. 114 115 alternatively, you can make a call more intuitively - the output is the output queue 116 allowing you to get the result right away or later 117 w.call(arg, kwarg='value').get() 118 119 inq.put(WorkerThread.quit) 120 w.join() 121 122 You may provide the following tuples as task: 123 t[0] = class method, function or instance method 124 t[1] = optional, tuple or list of arguments to pass to the routine 125 t[2] = optional, dictionary of keyword arguments to pass to the routine 126 """ 127 __slots__ = ('inq', 'outq') 128
129 - class InvalidRoutineError(Exception):
130 """Class sent as return value in case of an error"""
131
132 - def __init__(self, inq = None, outq = None):
133 super(WorkerThread, self).__init__() 134 self.inq = inq or Queue.Queue() 135 self.outq = outq or Queue.Queue()
136
137 - def call(self, function, *args, **kwargs):
138 """Method that makes the call to the worker using the input queue, 139 returning our output queue 140 141 :param funciton: can be a standalone function unrelated to this class, 142 a class method of this class or any instance method. 143 If it is a string, it will be considered a function residing on this instance 144 :param args: arguments to pass to function 145 :parma **kwargs: kwargs to pass to function""" 146 self.inq.put((function, args, kwargs)) 147 return self.outq
148
149 - def wait_until_idle(self):
150 """wait until the input queue is empty, in the meanwhile, take all 151 results off the output queue.""" 152 while not self.inq.empty(): 153 try: 154 self.outq.get(False) 155 except Queue.Empty: 156 continue
157 # END while there are tasks on the queue 158
159 - def stop_and_join(self):
160 """Send the stop signal to terminate, then join""" 161 self._terminate = True 162 self.inq.put(self.quit) 163 self.join() 164 self._terminated()
165
166 - def run(self):
167 """Process input tasks until we receive the quit signal""" 168 while True: 169 if self._should_terminate(): 170 break 171 # END check for stop request 172 routine = self.__class__.quit 173 args = tuple() 174 kwargs = dict() 175 tasktuple = self.inq.get() 176 177 if isinstance(tasktuple, (tuple, list)): 178 if len(tasktuple) == 3: 179 routine, args, kwargs = tasktuple 180 elif len(tasktuple) == 2: 181 routine, args = tasktuple 182 elif len(tasktuple) == 1: 183 routine = tasktuple[0] 184 # END tasktuple length check 185 elif inspect.isroutine(tasktuple): 186 routine = tasktuple 187 # END tasktuple handling 188 189 try: 190 rval = None 191 if inspect.ismethod(routine): 192 if routine.im_self is None: 193 rval = routine(self, *args, **kwargs) 194 else: 195 rval = routine(*args, **kwargs) 196 elif inspect.isroutine(routine): 197 rval = routine(*args, **kwargs) 198 elif isinstance(routine, basestring) and hasattr(self, routine): 199 rval = getattr(self, routine)(*args, **kwargs) 200 else: 201 # ignore unknown items 202 print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) 203 self.outq.put(self.InvalidRoutineError(routine)) 204 break 205 # END make routine call 206 self.outq.put(rval) 207 except StopIteration: 208 break 209 except Exception,e: 210 print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) 211 self.outq.put(e)
212 # END routine exception handling 213 # END endless loop 214
215 - def quit(self):
216 raise StopIteration
217 218 219 #} END classes 220