1
2
3
4
5 import simplejson
6
# Numeric status codes a worker response can carry, keyed by symbolic name.
STATUSES = {
    'FAIL': 0,
    'COMPLETE': 1,
}

# Module-level aliases so callers can compare against FAIL / COMPLETE
# directly instead of indexing STATUSES every time.
FAIL = STATUSES['FAIL']
COMPLETE = STATUSES['COMPLETE']
14
17 self.status = status
18 self.context = kw
19
21 """JSON passed as a message over the queue couldn't be decoded."""
22 pass
23
26 self.response = response
28 if self.response:
29 resp_string = "Worker failed with a status: %s" % (STATUSES[self.response.status])
30 if self.response.context:
31 resp_string += "\n Context: %s" % self.response.context
32 else:
33 return "Worker failed"
34
def __init__(self, queue_stdin, queue_stdout=None, **kw):
    """Base class for all the workers.

    queue_stdin - the instance passed through queue_stdin should implement a
    *blocking* .get(), .task_done() and __len__().
    queue_stdout - the instance passed should implement a blocking .put() and
    non-blocking __len__()

    The built-in python module "Queue" can produce suitable queue objects, eg:

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> q = Queue()
    >>> for i in range(num_worker_threads):
    ...     t = Thread(target=Worker(q))
    ...     t.setDaemon(True)
    ...     t.start()
    >>> for item in source():
    ...     q.put(item)

    Queues produced by the companion module, AMQPQueue, will also work well:

    >>> from amqpqueue import QueueFactory
    >>> from threading import Thread
    # Make a queue factory and point it at the local AMQP server
    >>> qf = QueueFactory('localhost:5672','guest','guest')
    >>> producer = qf.Producer('my_queue')
    >>> for i in range(num_worker_threads):
    ...     t = Thread(target=Worker(qf.Consumer('my_queue')))
    ...     t.setDaemon(True)
    ...     t.start()
    >>> for item in source():
    ...     producer.put(item)

    Other keyword parameters can be passed to the workers as necessary, and these are
    accessible via a self.context dictionary, eg:
    >>> w = MyWorker(q_in, q_out, foo=bar)
    ...
    (In the worker instance:)
    >>> if self.context.get('foo'):
    ...     print self.context['foo']
    /'bar'/

    If a 'start' keyword is passed (with any value), self.run() is called
    from inside the constructor, before it returns.

    In actual use, the starttask and/or endtask method should be overridden to perform
    the tasks necessary.

    Overwrite the .starttask(msg) method, which is passed the contents of the message from
    the queue. If this is the only method overridden, it is necessary to return a
    WorkerResponse object (or any object with a obj.status => 0 for FAIL or 1 for COMPLETE)

    endtask(msg, response) can likewise be overridden to perform tasks
    - BUT this must acknowledge the msg via a .task_done() on the 'in' queue
    -> self.queue_stdin.

    endtask is typically the method to override for simple atomic-style operations.
    """
    self.queue_stdin = queue_stdin
    self.queue_stdout = queue_stdout
    # All extra keyword arguments are kept verbatim for subclasses to read.
    self.context = kw
    # Checked by the run loop; setting it to True makes the loop exit.
    self.stop = False
    # Only the presence of the key matters, not its value.
    if 'start' in kw:
        self.run()
97
99 try:
100 return simplejson.loads(msg,encoding=encoding)
101 except:
102 raise JsonMsgParseError
103
105 while (True):
106
107 if self.stop:
108 break
109 msg = self.queue_stdin.get()
110
111 resp = self.starttask(msg)
112 self.endtask(msg, resp)
113
115 """Implements a basic 'echo' worker - pointless, but illustrative.
116 This method should be overridden by a specific worker class."""
117 return WorkerResponse(COMPLETE)
118
120 """Simple task end, ack'ing the message consuming it on a COMPLETE response."""
# NOTE(review): the enclosing `def` line is not visible in this view; the
# names `msg` and `response` are the ones the visible statements already use.
if response.status == FAIL:
    # The original raised with `resp`, a name not defined in these lines,
    # which turns every FAIL into a NameError instead of a WorkerException.
    raise WorkerException(response)
elif response.status == COMPLETE:
    # Compare against None rather than relying on truthiness: the out-queue
    # is expected to implement __len__, so an *empty* queue would be falsy
    # and the message would be silently dropped.
    if self.queue_stdout is not None:
        self.queue_stdout.put(msg)
    # Acknowledge the message only once it has completed.
    # NOTE(review): source indentation was lost; this placement follows the
    # docstring ("ack'ing the message ... on a COMPLETE response") -- confirm.
    self.queue_stdin.task_done()
130
def __init__(self, queue_stdin, queue_stdout=None, polling_wait=60, **kw):
    """PollingWorker is built on the base Worker class.

    The main difference is that it doesn't expect that the 'queue-in' object
    will provide a get method that blocks until it receives a message.

    It will wait for 'polling_wait' (default: 60 seconds) and then call the get
    method. If the method returns a value that evaluates to False, then the worker
    is put to sleep for polling_wait again to try again.

    The wait is performed by "time.sleep(polling_wait)"

    If a message of any sort is received, then the message is passed through the normal
    pattern of starttask/endtask.

    queue_stdin - queue the messages are read from via .get().
    queue_stdout - optional queue for results; defaults to None.
    polling_wait - number of seconds (int or float) between polls.
    Any other keyword arguments are stored in self.context. If a 'start'
    keyword is passed (with any value), self.run() is called from inside the
    constructor, before it returns.
    """
    self.queue_stdin = queue_stdin
    self.queue_stdout = queue_stdout

    # Kept as an assert (same exception type and message as before) so that
    # existing callers observe identical behaviour on bad input.
    assert isinstance(polling_wait, (int, float)), "polling_wait must be either an integer or a float"
    self.polling_wait = polling_wait
    # All extra keyword arguments are kept verbatim for subclasses to read.
    self.context = kw
    # Checked by the run loop; setting it to True makes the loop exit.
    self.stop = False
    # Only the presence of the key matters, not its value.
    if 'start' in kw:
        self.run()
156
158 while (True):
159
160 if self.stop:
161 break
162 msg = self.queue_stdin.get()
163 if msg:
164 resp = self.starttask(msg)
165 self.endtask(msg, resp)
166 sleep(self.polling_wait)
167