Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Copyright (c) 2014, Facebook, Inc. All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. An additional grant # of patent rights can be found in the PATENTS file in the same directory. #
Tasks in sparts are a way to organize and delegate some sort of background or other synchronized processing. This module defines the most common features. """
"""The base class for all tasks. Needs to be subclassed to be useful.
Attributes: OPT_PREFIX - Overrides the prefix for any associated options LOOPLESS - True indicates this task should not spawn any threads DEPS - List of `VTask` subclasses that must be initialized first workers - Number of Threads that should execute the `_runloop`
"""
def name(self):
"""Task Constructor. requires a `service` VService instance
You should not need to override this. Override initTask isntead."""
"""Override this to do any task-specific initialization
Don't forget to call super(...).initTask(), or things may not run properly.""" else: threading.Thread(target=self._run, name=name))
"""Override thread-specific initialization for multi-threaded tasks"""
"""Called during bootstrap to spin up threads post-creation."""
"""Custom stopping logic for this task.
This is called by the main VService thread, after a graceful shutdown request has been received."""
"""Block, waiting for all child worker threads to finish."""
def running(self): """Returns True if task is still doing work.
This base implementation returns True if any child threads are alive""" for thread in self.threads: if thread.isAlive(): return True return False
# In general, you should not get here. So, we will shutdown the # server. It is better for your service to *completely* crash in # response to an unhandled error, than to continue on in some sort # of half-alive zombie state. Please catch your exceptions. # Consider throwing a TryLater if this task is a subclass of # QueueTask or PeriodicTask. # # I hate zombies. finally: threading.currentThread().name)
"""For normal (non-LOOPLESS) tasks, this MUST be implemented""" # TODO: May require some janky metaprogramming to make ABC enforce # this in a cleaner way. raise NotImplementedError()
def _loptName(cls, name):
def _optName(cls, name): name.replace('-', '_')]
self._optName(opt), default)
def register(cls): REGISTERED.register(cls)
"""Throw during initTask() to skip execution of this task.
Useful in case the task is missing configuration critical to its operation, but not critical to the overall program.
A good example might be a network-based logging task."""
"""Throw this in overridden tasks to defer execution.
Can be used to temporarily suspend and restart execution, which is useful for handling unexpected error conditions, or re-scheduling work."""
"""An abstraction used internally by various tasks to track work
Encapsulates common metrics for work that can be retried later, hooks for signalling completion, etc"""
"""Indicate that execution has started"""
"""Indicate that execution has completed"""
"""Indicate that execution has failed"""
def elapsed(self): """Convenience property. Returns timer duration."""
def _unhandledErrback(error, unhandled): """Fallback errback for deferred processing""" unhandled.append(error) return None
"""Collection class for dealing with service tasks.
Tasks can be accessed but accessing them (by name) as attributes, or via the get/require methods. """
self.register(t)
"""Register task_class with the collection""" # Recursively register dependencies
"""Register multiple `tasks` classes with the collection"""
"""Unregister `task_class` from the collection""" assert not self._did_create self._registered.remove(task_class) del(self._registered_names[task_class.__name__])
"""Create all registered tasks.
TODO: Handle SkipTask? """
"""Remove created `task` from the collection""" assert self._did_create self._created.remove(task) del(self._created_names[task.name])
"""Initialize all created tasks. Remove ones that throw SkipTask."""
# Keep track of SkipTasks so we can remove it from this # task collection self.logger.info("Skipping %s (%s)", t.name, e) skipped.append(t) # Log and track unhandled exceptions during init, so we can # fail later.
# Remove any tasks that should be skipped self.remove(t)
# Reraise a new exception, if any exceptions were thrown in init len(exceptions))
"""Start all the tasks, creating worker threads, etc"""
"""Returns the `task` or its class, if creation hasn't happened yet.""" else:
else: return self._registered_names.get(name)
"""Return the `task` instance or class, raising if not found.""" raise KeyError('%s not in tasks (%s|%s)' % (task, self.task_classes, self.tasks))
def task_classes(self): """Accessor for accessing a copy of registered task classes"""
def tasks(self): """Accessor for accessing a registered or instantiated task classes
Return value varies based on whether `create()` has been called.""" else:
"""Helper for accessing tasks using their name as an attribute."""
"""Iterates on created or registered tasks, as appropriate."""
"""Returns the number of created or registered tasks, as appropriate"""
"""Returns the created or registered task at the specified `index`"""
# This `Tasks` collection tracks globally registered tasks. |