Source code for gearman.worker

import logging
import random
import sys

from gearman.connection_manager import GearmanConnectionManager
from gearman.worker_handler import GearmanWorkerCommandHandler
from gearman.errors import ConnectionError

gearman_logger = logging.getLogger(__name__)

POLL_TIMEOUT_IN_SECONDS = 60.0

[docs]class GearmanWorker(GearmanConnectionManager): """ GearmanWorker :: Interface to accept jobs from a Gearman server """ command_handler_class = GearmanWorkerCommandHandler def __init__(self, host_list=None): super(GearmanWorker, self).__init__(host_list=host_list) self.randomized_connections = None self.worker_abilities = {} self.worker_client_id = None self.command_handler_holding_job_lock = None self._update_initial_state() def _update_initial_state(self): self.handler_initial_state['abilities'] = self.worker_abilities.keys() self.handler_initial_state['client_id'] = self.worker_client_id ######################################################## ##### Public methods for general GearmanWorker use ##### ########################################################
[docs] def register_task(self, task, callback_function): """Register a function with this worker def function_callback(calling_gearman_worker, current_job): return current_job.data """ self.worker_abilities[task] = callback_function self._update_initial_state() for command_handler in self.handler_to_connection_map.iterkeys(): command_handler.set_abilities(self.handler_initial_state['abilities']) return task
[docs] def unregister_task(self, task): """Unregister a function with worker""" self.worker_abilities.pop(task, None) self._update_initial_state() for command_handler in self.handler_to_connection_map.iterkeys(): command_handler.set_abilities(self.handler_initial_state['abilities']) return task
[docs] def set_client_id(self, client_id): """Notify the server that we should be identified as this client ID""" self.worker_client_id = client_id self._update_initial_state() for command_handler in self.handler_to_connection_map.iterkeys(): command_handler.set_client_id(self.handler_initial_state['client_id']) return client_id
[docs] def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS): """Loop indefinitely, complete tasks from all connections.""" continue_working = True worker_connections = [] def continue_while_connections_alive(any_activity): return self.after_poll(any_activity) # Shuffle our connections after the poll timeout while continue_working: worker_connections = self.establish_worker_connections() continue_working = self.poll_connections_until_stopped(worker_connections, continue_while_connections_alive, timeout=poll_timeout) # If we were kicked out of the worker loop, we should shutdown all our connections for current_connection in worker_connections: current_connection.close()
def shutdown(self): self.command_handler_holding_job_lock = None super(GearmanWorker, self).shutdown() ############################################################### ## Methods to override when dealing with connection polling ## ############################################################## def establish_worker_connections(self): """Return a shuffled list of connections that are alive, and try to reconnect to dead connections if necessary.""" self.randomized_connections = list(self.connection_list) random.shuffle(self.randomized_connections) output_connections = [] for current_connection in self.randomized_connections: try: valid_connection = self.establish_connection(current_connection) output_connections.append(valid_connection) except ConnectionError: pass return output_connections
[docs] def after_poll(self, any_activity): """Polling callback to notify any outside listeners whats going on with the GearmanWorker. Return True to continue polling, False to exit the work loop""" return True
def handle_error(self, current_connection): """If we discover that a connection has a problem, we better release the job lock""" current_handler = self.connection_to_handler_map.get(current_connection) if current_handler: self.set_job_lock(current_handler, lock=False) super(GearmanWorker, self).handle_error(current_connection) ############################################################# ## Public methods so Gearman jobs can send Gearman updates ## ############################################################# def _get_handler_for_job(self, current_job): return self.connection_to_handler_map[current_job.connection]
[docs] def send_job_status(self, current_job, numerator, denominator): """Send a Gearman JOB_STATUS update for an inflight job""" current_handler = self._get_handler_for_job(current_job) current_handler.send_job_status(current_job, numerator=numerator, denominator=denominator)
def send_job_complete(self, current_job, data): current_handler = self._get_handler_for_job(current_job) current_handler.send_job_complete(current_job, data=data) def send_job_failure(self, current_job): """Removes a job from the queue if its backgrounded""" current_handler = self._get_handler_for_job(current_job) current_handler.send_job_failure(current_job) def send_job_exception(self, current_job, data): """Removes a job from the queue if its backgrounded""" # Using GEARMAND_COMMAND_WORK_EXCEPTION is not recommended at time of this writing [2010-02-24] # http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe # current_handler = self._get_handler_for_job(current_job) current_handler.send_job_exception(current_job, data=data) current_handler.send_job_failure(current_job)
[docs] def send_job_data(self, current_job, data): """Send a Gearman JOB_DATA update for an inflight job""" current_handler = self._get_handler_for_job(current_job) current_handler.send_job_data(current_job, data=data)
[docs] def send_job_warning(self, current_job, data): """Send a Gearman JOB_WARNING update for an inflight job""" current_handler = self._get_handler_for_job(current_job) current_handler.send_job_warning(current_job, data=data) ##################################################### ##### Callback methods for GearmanWorkerHandler ##### #####################################################
def create_job(self, command_handler, job_handle, task, unique, data): """Create a new job using our self.job_class""" current_connection = self.handler_to_connection_map[command_handler] return self.job_class(current_connection, job_handle, task, unique, data) def on_job_execute(self, current_job): try: function_callback = self.worker_abilities[current_job.task] job_result = function_callback(self, current_job) except Exception: return self.on_job_exception(current_job, sys.exc_info()) return self.on_job_complete(current_job, job_result) def on_job_exception(self, current_job, exc_info): self.send_job_failure(current_job) return False def on_job_complete(self, current_job, job_result): self.send_job_complete(current_job, job_result) return True def set_job_lock(self, command_handler, lock): """Set a worker level job lock so we don't try to hold onto 2 jobs at anytime""" if command_handler not in self.handler_to_connection_map: return False failed_lock = bool(lock and self.command_handler_holding_job_lock is not None) failed_unlock = bool(not lock and self.command_handler_holding_job_lock != command_handler) # If we've already been locked, we should say the lock failed # If we're attempting to unlock something when we don't have a lock, we're in a bad state if failed_lock or failed_unlock: return False if lock: self.command_handler_holding_job_lock = command_handler else: self.command_handler_holding_job_lock = None return True def check_job_lock(self, command_handler): """Check to see if we hold the job lock""" return bool(self.command_handler_holding_job_lock == command_handler)