Package pyworker :: Module pyworker
[hide private]
[frames | no frames]

Source Code for Module pyworker.pyworker

  1  #!/usr/bin/python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  # Most workers are expected to use JSON msgs 
  5  import simplejson 
  6   
  7  STATUSES = {'FAIL':0, 
  8              'COMPLETE':1, 
  9              } 
 10   
 11  # shorthand 
 12  FAIL = STATUSES['FAIL'] 
 13  COMPLETE = STATUSES['COMPLETE'] 
 14   
class WorkerResponse(object):
    """Outcome of a single task: a status code plus free-form context.

    status - the status code reported for the task.
    **kw   - any extra keyword arguments; they are kept, unmodified, as the
             ``context`` dictionary of the response.
    """

    def __init__(self, status, **kw):
        self.status, self.context = status, kw
19
class JsonMsgParseError(Exception):
    """Raised when a JSON message taken off the queue cannot be decoded."""
23
class WorkerException(Exception):
    """Raised when a task ends with a failing response.

    response - optional response object describing the failure. It is
               expected to expose a ``status`` attribute and may expose a
               ``context`` attribute; it is stored as ``self.response``.
    """

    def __init__(self, response=None):
        self.response = response

    def __str__(self):
        """Return a human-readable description of the failure."""
        if not self.response:
            return "Worker failed"

        status = self.response.status
        # STATUSES maps symbolic name -> numeric code, so the name has to be
        # found by reverse lookup; fall back to the raw value if it is unknown.
        label = status
        for name, code in STATUSES.items():
            if code == status:
                label = name
                break

        resp_string = "Worker failed with a status: %s" % (label,)
        # Responses are only required to carry .status, so .context is optional.
        context = getattr(self.response, 'context', None)
        if context:
            resp_string += "\n Context: %s" % (context,)
        return resp_string
34
class Worker(object):
    """Queue consumer that runs starttask/endtask for every message it gets."""

    def __init__(self, queue_stdin, queue_stdout=None, **kw):
        """Base class for all the workers.

        queue_stdin - the instance passed through queue_stdin should implement a
        *blocking* .get(), .task_done() and __len__().
        queue_stdout - the instance passed should implement a blocking .put() and
        non-blocking __len__()

        The built-in python module "Queue" can produce suitable queue objects, eg:

            >>> from Queue import Queue
            >>> from threading import Thread
            >>> q = Queue()
            >>> for i in range(num_worker_threads):
            ...     t = Thread(target=Worker(q).run)
            ...     t.setDaemon(True)
            ...     t.start()
            >>> for item in source():
            ...     q.put(item)

        Queues produced by the companion module, AMQPQueue, will also work well:

            >>> from amqpqueue import QueueFactory
            >>> from threading import Thread
            # Make a queue factory and point it at the local AMQP server
            >>> qf = QueueFactory('localhost:5672','guest','guest')
            >>> producer = qf.Producer('my_queue')
            >>> for i in range(num_worker_threads):
            ...     t = Thread(target=Worker(qf.Consumer('my_queue')).run)
            ...     t.setDaemon(True)
            ...     t.start()
            >>> for item in source():
            ...     producer.put(item)

        Other keyword parameters can be passed to the workers as necessary, and these are
        accessible via a self.context dictionary, eg:
            >>> w = MyWorker(q_in, q_out, foo=bar)
            ...
            (In the worker instance:)
            >>> if self.context.get('foo'):
            >>>     print self.context['foo']
            /'bar'/

        In actual use, the starttask and/or endtask method should be overridden to perform
        the tasks necessary.

        Overwrite the .starttask(msg) method, which is passed the contents of the message from
        the queue. If this is the only method overridden, it is necessary to return a
        WorkerResponse object (or any object with a obj.status => 0 for FAIL or 1 for COMPLETE)

        endtask(msg, response) can likewise be overridden to perform tasks
        - BUT this must acknowledge the msg via a .task_done() on the 'in' queue
        -> self.queue_stdin.

        endtask is typically the method to override for simple atomic-style operations.
        """
        self.queue_stdin = queue_stdin
        self.queue_stdout = queue_stdout
        self.context = kw
        self.stop = False
        # The mere presence of a 'start' keyword (whatever its value) starts
        # the processing loop; the constructor then returns only when run() does.
        if 'start' in kw:
            self.run()

    def parse_json_msg(self, msg, encoding="UTF-8"):
        """Decode a JSON message and return the resulting Python object.

        Raises JsonMsgParseError if the message cannot be decoded.
        """
        try:
            return simplejson.loads(msg, encoding=encoding)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not converted into parse errors.
            raise JsonMsgParseError

    def run(self):
        """Process messages from queue_stdin until self.stop is set.

        The stop flag is only examined between messages: .get() is expected
        to block, so a stop request takes effect once the current wait ends.
        """
        while not self.stop:
            # Blocking call:
            msg = self.queue_stdin.get()
            # TODO implement variable timeout on .starttask() method
            resp = self.starttask(msg)
            self.endtask(msg, resp)

    def starttask(self, msg):
        """Implements a basic 'echo' worker - pointless, but illustrative.
        This method should be overridden by a specific worker class."""
        return WorkerResponse(COMPLETE)

    def endtask(self, msg, response):
        """Simple task end, ack'ing the message consuming it on a COMPLETE response.

        On FAIL a WorkerException carrying the response is raised and the
        message is not acknowledged. On COMPLETE the message is forwarded to
        queue_stdout (when one was supplied) and then acknowledged on
        queue_stdin. Any other status is ignored.
        """
        if response.status == FAIL:
            raise WorkerException(response)
        if response.status == COMPLETE:
            # Compare against None instead of testing truthiness: an out-queue
            # implementing __len__ is falsy while empty and would be skipped.
            if self.queue_stdout is not None:
                self.queue_stdout.put(msg)
            self.queue_stdin.task_done()
130
class PollingWorker(Worker):
    """Worker variant that polls its in-queue instead of blocking on it."""

    def __init__(self, queue_stdin, queue_stdout=None, polling_wait=60, **kw):
        """PollingWorker is built on the base Worker class.

        The main difference is that it doesn't expect that the 'queue-in' object
        will provide a get method that blocks until it receives a message.

        It calls the get method and, if a message of any sort is received,
        passes it through the normal pattern of starttask/endtask. If the
        method returns a value that evaluates to False, nothing is processed.
        In both cases the worker is then put to sleep for 'polling_wait'
        (default: 60 seconds) before trying again.

        The wait is performed by "time.sleep(polling_wait)"
        """
        # Worker.__init__ is deliberately not called: it would start run()
        # for a 'start' keyword before polling_wait has been set.
        self.queue_stdin = queue_stdin
        self.queue_stdout = queue_stdout
        # Make sure that the time is in seconds:
        assert isinstance(polling_wait, (int, float)), "polling_wait must be either an integer or a float"
        self.polling_wait = polling_wait
        self.context = kw
        self.stop = False
        if 'start' in kw:
            self.run()

    def run(self):
        """Poll queue_stdin until self.stop is set, sleeping between polls."""
        # Imported here because the module has no top-level import of time;
        # the original called an undefined bare sleep().
        import time

        while not self.stop:
            # Non-blocking call:
            msg = self.queue_stdin.get()
            if msg:
                resp = self.starttask(msg)
                self.endtask(msg, resp)
            time.sleep(self.polling_wait)
167