__init__(self,
queue_stdin,
queue_stdout=None,
**kw)
(Constructor)
Base class for all the workers.
queue_stdin - the instance passed as queue_stdin should implement a
*blocking* .get(), plus .task_done() and __len__().
queue_stdout - the instance passed as queue_stdout should implement a
blocking .put() and a non-blocking __len__().
The built-in Python module "Queue" can produce suitable
queue objects, e.g.:
>>> from Queue import Queue
>>> from threading import Thread
>>> q = Queue()
# num_worker_threads and source() are supplied by the caller
>>> for i in range(num_worker_threads):
...     t = Thread(target=Worker(q))
...     t.setDaemon(True)
...     t.start()
>>> for item in source():
...     q.put(item)
Queues produced by the companion module, AMQPQueue, will also work
well:
>>> from amqpqueue import QueueFactory
>>> from threading import Thread
# Make a queue factory and point it at the local AMQP server
>>> qf = QueueFactory('localhost:5672','guest','guest')
>>> producer = qf.Producer('my_queue')
>>> for i in range(num_worker_threads):
...     t = Thread(target=Worker(qf.Consumer('my_queue')))
...     t.setDaemon(True)
...     t.start()
>>> for item in source():
...     producer.put(item)
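The queue interface described above asks for __len__(), while the standard library Queue class exposes its size through qsize(). The following is a minimal sketch, not part of this module, of one way to bridge that gap. The name LenQueue is hypothetical, and the import fallback only exists so the sketch runs on both Python 2 and Python 3.

```python
try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2


class LenQueue(Queue):
    """Hypothetical helper: a standard Queue that also answers len()."""

    def __len__(self):
        # qsize() is only approximate under concurrency, which is
        # usually acceptable for a non-blocking size check.
        return self.qsize()


q = LenQueue()
q.put("job-1")
q.put("job-2")
item = q.get()       # blocks until an item is available
q.task_done()        # acknowledge the item that was fetched
print(len(q))
```

A queue built this way could be passed as either queue_stdin or queue_stdout, assuming the worker relies only on the methods listed above.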
Other keyword parameters can be passed to the workers as necessary,
and these are accessible via a self.context dictionary, e.g.:
>>> w = MyWorker(q_in, q_out, foo=bar)
...
(In the worker instance:)
>>> if self.context.get('foo'):
...     print self.context['foo']
/'bar'/
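As an illustration of the keyword handling described above, here is a small sketch of how extra keyword arguments could end up in a context dictionary. ContextSketch and describe() are hypothetical names, and storing **kw directly is an assumption about the behaviour, not a copy of the base class's code.

```python
class ContextSketch(object):
    """Stand-in for a worker; not the library's actual base class."""

    def __init__(self, queue_stdin, queue_stdout=None, **kw):
        self.queue_stdin = queue_stdin
        self.queue_stdout = queue_stdout
        # Assumption: extra keyword arguments are kept as a plain dict.
        self.context = kw

    def describe(self):
        # Hypothetical method showing a guarded lookup with a default.
        return self.context.get('foo', 'no foo given')


w = ContextSketch(None, None, foo='bar')
print(w.describe())
```

Using .get() with a default keeps the worker usable when an optional keyword was not supplied at construction time.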
In actual use, the starttask and/or endtask methods should be
overridden to perform the tasks necessary.
Override the .starttask(msg) method, which is passed the contents of
the message from the queue. If this is the only method overridden, it
must return a WorkerResponse object (or any object with a status
attribute: obj.status => 0 for FAIL or 1 for COMPLETE).
endtask(msg, response) can likewise be overridden to perform tasks,
BUT it must then acknowledge the msg via a .task_done() on the 'in'
queue -> self.queue_stdin.
endtask is typically the method to override for simple atomic-style
operations.
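The override rules above can be sketched as follows. This is an illustration under stated assumptions, not the module's own code: BaseSketch stands in for the real base class, Response stands in for WorkerResponse, and handle_one() is a hypothetical driver, since the real worker's run loop is not shown here.

```python
try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2

FAIL, COMPLETE = 0, 1            # status values named in the text above


class Response(object):
    """Hypothetical stand-in for WorkerResponse; only .status is used."""

    def __init__(self, status):
        self.status = status


class BaseSketch(object):
    """Stand-in base class; the real worker's run loop may differ."""

    def __init__(self, queue_stdin, queue_stdout=None, **kw):
        self.queue_stdin = queue_stdin
        self.queue_stdout = queue_stdout
        self.context = kw

    def handle_one(self):
        # Hypothetical driver: fetch one message, then run both hooks.
        msg = self.queue_stdin.get()
        response = self.starttask(msg)
        self.endtask(msg, response)
        return response

    def starttask(self, msg):
        return Response(COMPLETE)

    def endtask(self, msg, response):
        # Default assumed here: simply acknowledge the message.
        self.queue_stdin.task_done()


class UpperWorker(BaseSketch):
    def starttask(self, msg):
        if not isinstance(msg, str):
            return Response(FAIL)
        self.result = msg.upper()
        return Response(COMPLETE)

    def endtask(self, msg, response):
        # Forward successful results, then acknowledge the input message.
        if response.status == COMPLETE and self.queue_stdout is not None:
            self.queue_stdout.put(self.result)
        self.queue_stdin.task_done()


q_in, q_out = Queue(), Queue()
q_in.put("hello")
worker = UpperWorker(q_in, q_out)
status = worker.handle_one().status
q_in.join()              # returns at once because task_done() was called
forwarded = q_out.get()
print(status)
print(forwarded)
```

If the endtask override forgot to call task_done(), the q_in.join() call in this sketch would block, which is why the acknowledgement is required.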
- Overrides:
object.__init__