1.2. gearman.worker — Gearman worker

class gearman.worker.GearmanWorker(host_list=None)[source]

GearmanWorker :: Interface to accept jobs from a Gearman server

1.2.1. Job processing

GearmanWorker.set_client_id(client_id)[source]

Notify the server that we should be identified as this client ID

GearmanWorker.register_task(task, callback_function)[source]

Register a function with this worker

def function_callback(calling_gearman_worker, current_job):
return current_job.data
GearmanWorker.unregister_task(task)[source]

Unregister a function with worker

GearmanWorker.work(poll_timeout=60.0)[source]

Loop indefinitely, complete tasks from all connections.

Setting up a basic worker that reverses a given byte-string:

gm_worker = gearman.GearmanWorker(['localhost:4730'])

# See gearman/job.py to see attributes on the GearmanJob
# Send back a reversed version of the 'data' string
def task_listener_reverse(gearman_worker, gearman_job):
    return reversed(gearman_job.data)

# gm_worker.set_client_id is optional
gm_worker.set_client_id('your_worker_client_id_name')
gm_worker.register_task('reverse', task_listener_reverse)

# Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
gm_worker.work()

1.2.2. Sending in-flight job updates

GearmanWorker.send_job_data(current_job, data, poll_timeout=None)[source]

Send a Gearman JOB_DATA update for an inflight job

GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None)[source]

Send a Gearman JOB_STATUS update for an inflight job

GearmanWorker.send_job_warning(current_job, data, poll_timeout=None)[source]

Send a Gearman JOB_WARNING update for an inflight job

Callback function sending back inflight job updates:

gm_worker = gearman.GearmanWorker(['localhost:4730'])

# See gearman/job.py to see attributes on the GearmanJob
# Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE
def task_listener_reverse_inflight(gearman_worker, gearman_job):
    reversed_data = reversed(gearman_job.data)
    total_chars = len(reversed_data)

    for idx, character in enumerate(reversed_data):
        gearman_worker.send_job_data(gearman_job, str(character))
        gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)

    return None

# gm_worker.set_client_id is optional
gm_worker.register_task('reverse', task_listener_reverse_inflight)

# Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
gm_worker.work()

1.2.3. Extending the worker

GearmanWorker.data_encoder

Provide common object dumps for all communications over gearman

GearmanWorker.after_poll(any_activity)[source]

Polling callback to notify any outside listeners whats going on with the GearmanWorker.

Return True to continue polling, False to exit the work loop

Send/receive Python objects and do work between polls:

# By default, GearmanWorker's can only send off byte-strings
# If we want to be able to send out Python objects, we can specify a data encoder
# This will automatically convert byte strings <-> Python objects for ALL commands that have the 'data' field
#
# See http://gearman.org/index.php?id=protocol for Worker commands that send/receive 'opaque data'
#
import json # Or similarly styled library
class JSONDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return json.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return json.loads(decodable_string)

class DBRollbackJSONWorker(gearman.GearmanWorker):
    data_encoder = JSONDataEncoder

    def after_poll(self, any_activity):
        # After every select loop, let's rollback our DB connections just to be safe
        continue_working = True
        self.db_connections.rollback()
        return continue_working