Source code for servermgr.base

# -*- mode: python; tab-width:8; py-indent-offset:4; indent-tabs-mode:nil -*-

"""
Interface definition and base implementation for all subprocess managers.
"""

import socket
import time

#pylint: disable=R0921
#abstract class not implemented;

[docs]class ManagerInterface(object): """ This abstract class defines the interface implemented by all Managers. """
[docs] def health(self): """ Check the health of the worker subprocess. :raises: **WorkerError** if the worker subprocess is not responding. """ raise NotImplementedError
[docs] def start(self, wait=True, timeout=10.0): """ Start the worker subprocess. :param wait: If true, call self.ready_wait() after starting the subprocess. :param timeout: When calling ready_wait(), use this timeout value. :type timeout: float, number of seconds """ raise NotImplementedError
[docs] def ready_wait(self, timeout=10.0): """ Wait for the worker to be ready to handle requests. :param timeout: Give the server this many seconds to start. If the timeout expires before the server has started, kill the server and raise WorkerError. :type timeout: float, seconds :raises: **WorkerError** if the worker is not ready within the time limit. """ raise NotImplementedError
[docs] def stop(self, wait=True): """ Stop the worker subprocess. :param wait: If True, wait for the worker to exit. :type wait: boolean """ raise NotImplementedError
[docs] def wait(self): """Wait for the worker process to exit.""" raise NotImplementedError
[docs]class WorkerError(Exception): """Exception raised when a Worker fails to start or fails to respond. """ def __init__(self, exception, *args, **kwargs): super(WorkerError, self).__init__(*args, **kwargs) self.exception = exception def __str__(self): val = super(WorkerError, self).__str__() if self.exception == None: return val return val + " " + str(self.exception)
[docs]def address_in_use(host, port): """ Check to see if there is a listener on host:port. :param host: the interface to check; :type host: string :param port: the post to check; :type port: int :returns: **True** if there is a listener, else **False**. """ try: s__ = socket.create_connection((host, port)) s__.close() return True except socket.error: return False
[docs]def address_free_check(host, port): """ Raise an exception if host:port is in use. :param host: the interface to check; :type host: string :param port: the post to check; :type port: int :raises: **WorkerError** if the address is in use. """ if address_in_use(host, port): raise WorkerError(None, "address already in use", host, port)
class BaseProcess(object): """ Base class for class wrapper functionality used by SubprocessWrapper and MultiprocessingWrapper. """ def __init__(self, process_thing): self.process = process_thing def __getattr__(self, attrib): return getattr(self.process, attrib) class SubprocessWrapper(BaseProcess): """ This class is a wrapper around subprocess.Popen to unify the interface with multiprocssing.Process. """ def join(self, _unused_timeout=None): """Add join method to subprocess.Popen.""" self.process.wait() class MultiprocessingWrapper(BaseProcess): """ This class is a wrapper around multiprocessing.Process to unify the interface with subprocess.Popen. """ def wait(self): """Add wait method to multiprocessing.Process.""" return self.process.join(None) def poll(self): """Add poll method to multiprocessing.Process.""" if self.process.is_alive(): return None return True def wrap_process(process_thing): """Wrap the (Popen, Process) in the proper wrapper to provide uniform functionality.""" if hasattr(process_thing, "wait"): obj = SubprocessWrapper(process_thing) else: obj = MultiprocessingWrapper(process_thing) return obj #pylint: disable=R0922 #abstract class only referenced one time;
[docs]class Manager(object): """ :param name: Name of manager, displayed in error and log messages. :param kwargs: Any other keyword args will be stored in self.kwargs. This is the base implementation of the Manager class. Implementations are provided for the following methods: * ready_wait() * stop() * wait() The following methods must be implemented by the subclass: * health() * start() """ def __init__(self, name="<unnamed>", **kwargs): """Initialize the Manager object.""" self._process = None self.name = name self.kwargs = kwargs def __del__(self): self.stop() @property def process(self): "Return the process object." return self._process @process.setter
[docs] def process(self, process): "Wrap the (subprocess.Popen, multiprocessing.Process) with a proxy class." self._process = wrap_process(process) if process else None
[docs] def health(self): """Check the worker subprocess health. :raises: **NotImplementedError** must be implemented by subclass. """ raise NotImplementedError
[docs] def start(self, wait=True, timeout=10.0): """ Start the worker subprocess. :raises: **NotImplementedError** must be implemented by subclass. """ raise NotImplementedError
[docs] def ready_wait(self, timeout=10.0, verbose=False): """Wait until the worker subprocess is responding to requests. :param timeout: Give the server this many seconds to start. If the timeout expires before the server has started, kill the server and raise WorkerError. :type timeout: float, seconds :raises: **WorkerError** if the worker is not ready within the time limit. :raises: **WorkerError** if there is no subprocess. """ if not self.process: raise WorkerError(None, self.name + " No subprocess") start_time = time.time() time_delta = 0.5 status = self.process.poll() while status == None: # status == None => process is running; try: self.health() if verbose: print "server %s is ready." % self.name return except WorkerError: pass if time.time() - start_time > timeout: self.process.terminate() self.process.wait() self.process = None raise WorkerError(None, self.name + " Taking too long to start.") time.sleep(time_delta) status = self.process.poll() errors = None if hasattr(self.process, "stderr") and self.process.stderr: errors = self.process.stderr.read() self.process = None raise WorkerError(self.name, errors)
[docs] def stop(self, wait=True): """ Terminate the worker subprocess. :param wait: If True, wait for the worker to exit. :type wait: boolean """ if self.process: self.process.terminate() if wait: self.process.wait() self.process = None
[docs] def wait(self): """Wait for the worker process to exit.""" if self.process: self.process.wait() self.process = None