Package pyworker :: Module pyworker :: Class Worker

Class Worker

object --+
         |
        Worker
Known Subclasses:
httpworker.HTTPWorker, jsonworker.JSONWorker, PollingWorker

Instance Methods
 
__init__(self, queue_stdin, queue_stdout=None, **kw)
Base class for all the workers.
 
parse_json_msg(self, msg, encoding='UTF-8')
 
run(self)
 
starttask(self, msg)
Implements a basic 'echo' worker - pointless, but illustrative.
 
endtask(self, msg, response)
Simple task end: acknowledges the message, consuming it, on a COMPLETE response.

Inherited from object: __delattr__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __str__

Properties

Inherited from object: __class__

Method Details

__init__(self, queue_stdin, queue_stdout=None, **kw)
(Constructor)

Base class for all the workers.

queue_stdin - the instance passed should implement a *blocking* .get(), .task_done() and __len__().
queue_stdout - the instance passed should implement a blocking .put() and a non-blocking __len__().

The standard-library module "Queue" (Python 2; renamed "queue" in Python 3) can produce suitable queue objects, e.g.:
>>> from Queue import Queue
>>> from threading import Thread
>>> q = Queue()
>>> for i in range(num_worker_threads):
...      t = Thread(target=Worker(q).run)
...      t.setDaemon(True)
...      t.start()
>>> for item in source():
...      q.put(item)
Queues produced by the companion module, AMQPQueue, will also work well:
>>> from amqpqueue import QueueFactory
>>> from threading import Thread
>>> # Make a queue factory and point it at the local AMQP server
>>> qf = QueueFactory('localhost:5672','guest','guest')
>>> producer = qf.Producer('my_queue')
>>> for i in range(num_worker_threads):
...      t = Thread(target=Worker(qf.Consumer('my_queue')).run)
...      t.setDaemon(True)
...      t.start()
>>> for item in source():
...      producer.put(item)
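
The queue interface described above can be sketched with a thin wrapper around the standard-library queue. This is only an illustration and not part of pyworker: LenQueue is a hypothetical name, and the block assumes Python 3's "queue" module (named "Queue" in Python 2). The standard queue exposes qsize() rather than __len__(), so the wrapper adds len() support in case a caller relies on it.

```python
import queue  # named "Queue" in Python 2


class LenQueue(queue.Queue):
    """Hypothetical helper: a standard queue that also supports len()."""

    def __len__(self):
        # qsize() is only approximate under concurrency; fine for a sketch.
        return self.qsize()


q_in = LenQueue()
q_in.put("job-1")
q_in.put("job-2")

pending_before = len(q_in)  # number of items waiting
item = q_in.get()           # blocking get(); returns at once here
q_in.task_done()            # acknowledge the item just consumed
pending_after = len(q_in)
```

The same object offers a blocking put(), so it could in principle serve as either the "in" or the "out" queue.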

Other keyword parameters can be passed to the workers as necessary, and these are accessible via the self.context dictionary, e.g.:
>>> w = MyWorker(q_in, q_out, foo='bar')
(In the worker instance:)
>>> if self.context.get('foo'):
...      print self.context['foo']
bar
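
How extra keyword arguments might end up in self.context can be sketched as follows. This is an assumption about the mechanism, not pyworker's actual source: ContextDemo and describe() are hypothetical names, and the class simply stores **kw as a plain dictionary.

```python
class ContextDemo(object):
    """Hypothetical stand-in, not the real Worker class."""

    def __init__(self, queue_stdin, queue_stdout=None, **kw):
        self.queue_stdin = queue_stdin
        self.queue_stdout = queue_stdout
        # Assumption: extra keyword arguments are kept in a plain dict.
        self.context = kw

    def describe(self):
        # .get() avoids a KeyError when the option was not supplied.
        if self.context.get('foo'):
            return self.context['foo']
        return None


w = ContextDemo(None, None, foo='bar')
value = w.describe()
missing = ContextDemo(None).describe()
```

Using .get() for the lookup, as the documented example does, keeps a worker usable when an optional setting is left out.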

In actual use, the starttask and/or endtask method should be overridden to perform the tasks necessary.

Override the .starttask(msg) method, which is passed the contents of the message from the queue. If this is the only method overridden, it must return a WorkerResponse object (or any object with a status attribute set to 0 for FAIL or 1 for COMPLETE).
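
The return contract for starttask can be illustrated without the library itself. In this sketch SimpleResponse and UppercaseTask are hypothetical names; a real worker would subclass Worker and would probably return a WorkerResponse, whose constructor is not documented here and is therefore not used.

```python
FAIL, COMPLETE = 0, 1  # status values named in the text above


class SimpleResponse(object):
    """Hypothetical stand-in for WorkerResponse: carries a status and a payload."""

    def __init__(self, status, payload=None):
        self.status = status
        self.payload = payload


class UppercaseTask(object):
    """Shows only the starttask contract; not a Worker subclass."""

    def starttask(self, msg):
        try:
            return SimpleResponse(COMPLETE, msg.upper())
        except AttributeError:
            # msg had no .upper(), i.e. it was not string-like.
            return SimpleResponse(FAIL)


ok = UppercaseTask().starttask("abc")
bad = UppercaseTask().starttask(None)
```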

endtask(msg, response) can likewise be overridden to perform tasks
  • BUT it must acknowledge the msg via a .task_done() call on the 'in' queue, self.queue_stdin.
endtask is typically the method to override for simple atomic-style operations.
Overrides: object.__init__
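
The acknowledgement requirement on endtask can be sketched as below. Response and AckingTask are hypothetical stand-ins rather than pyworker classes, the block assumes Python 3's "queue" module, and what should happen on a FAIL response is deliberately left open because the text above does not specify it.

```python
import queue  # named "Queue" in Python 2

COMPLETE = 1  # status value for a successful task, as described above


class Response(object):
    """Hypothetical stand-in for WorkerResponse."""

    def __init__(self, status):
        self.status = status


class AckingTask(object):
    """Hypothetical stand-in; a real worker would subclass Worker instead."""

    def __init__(self, queue_stdin):
        self.queue_stdin = queue_stdin
        self.finished = []

    def endtask(self, msg, response):
        if response.status == COMPLETE:
            self.finished.append(msg)
            # Acknowledge the message so the 'in' queue counts it as consumed.
            self.queue_stdin.task_done()


q_in = queue.Queue()
q_in.put("job-1")
task = AckingTask(q_in)
msg = q_in.get()
task.endtask(msg, Response(COMPLETE))
q_in.join()  # returns at once: the only item has been acknowledged
```

If the task_done() call were omitted, q_in.join() would block forever, which is why an overriding endtask has to keep the acknowledgement.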

starttask(self, msg)

Implements a basic 'echo' worker - pointless, but illustrative. This method should be overridden by a specific worker class.