Documentation for pulsar 0.5.0.

Source code for pulsar.apps.tasks

'''\
An asynchronous task queue built on top of the :class:`pulsar.Application`
framework. By creating :class:`Job` classes, much as you would for celery_,
this application gives you all you need to run them with very
little setup effort::

    from pulsar.apps import tasks

    tq = tasks.TaskQueue(tasks_path='path.to.tasks.*')
    tq.start()

.. _tasks-actions:

Tutorial
==============

Actions
~~~~~~~~~~~~~~~

The :class:`TaskQueue` application adds the following
:ref:`remote actions <api-remote_commands>` to its workers:

* **addtask** to add a new task to the task queue::

    send(taskqueue, 'addtask', jobname, task_extra, *args, **kwargs)

 * *jobname*: the name of the :class:`Job` to run.
 * *task_extra*: dictionary of extra parameters to pass to the :class:`Task`
   constructor. Usually an empty dictionary.
 * *args*: positional arguments for the :ref:`job callable <job-callable>`.
 * *kwargs*: key-valued arguments for the :ref:`job callable <job-callable>`.

* **addtask_noack** same as **addtask** but without acknowledging the sender::

    send(taskqueue, 'addtask_noack', jobname, task_extra, *args, **kwargs)
    
* **get_task** retrieves information about a task, whether or not it has
  already been executed.
  The implementation is left to the :meth:`Task.get_task` method::
  
    send(taskqueue, 'get_task', id)
    
* **get_tasks** retrieves information about the tasks which satisfy the
  given filters.
  The implementation is left to the :meth:`Task.get_tasks` method::
  
    send(taskqueue, 'get_tasks', **filters)
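
The difference between **addtask** and **addtask_noack** can be sketched
without pulsar. The snippet below is a toy model, not the pulsar API:
``ToyQueue`` and this local ``send`` are hypothetical stand-ins which only
mirror the argument order shown above and the ``ack`` flag that the
``addtask`` and ``addtask_noack`` commands of this module pass to
``_addtask``.

```python
# Toy model only: ToyQueue and this send() are hypothetical, not pulsar.
class ToyQueue:

    def __init__(self):
        self.tasks = []
        self._next_id = 0

    def queue_task(self, jobname, args, kwargs, **task_extra):
        # Build a plain dictionary in place of a real Task instance.
        self._next_id += 1
        task = {'id': self._next_id, 'name': jobname, 'args': args,
                'kwargs': kwargs, 'extra': task_extra}
        self.tasks.append(task)
        return task

    def add(self, jobname, task_extra, ack, args, kwargs):
        task = self.queue_task(jobname, args, kwargs, **task_extra)
        if ack:
            return task
        # Without acknowledgement nothing is returned to the sender.


def send(queue, action, jobname, task_extra, *args, **kwargs):
    # Dispatch the two actions; only the ack flag differs.
    if action == 'addtask':
        return queue.add(jobname, task_extra, True, args, kwargs)
    if action == 'addtask_noack':
        return queue.add(jobname, task_extra, False, args, kwargs)
    raise ValueError('unknown action: %s' % action)


queue = ToyQueue()
acked = send(queue, 'addtask', 'addition', {}, 1, 2)
silent = send(queue, 'addtask_noack', 'addition', {}, 3, 4)
```

In this model both calls queue a task, but only the first one hands the
task back to the caller.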
  
    
Jobs
~~~~~~~~~~~~~~~~

An application implements several :class:`Job`
classes which specify the way each :class:`Task` is run.
Each job class is a task factory, so a task is always associated
with exactly one job. A job can be of two types:

* standard (:class:`Job`)
* periodic (:class:`PeriodicJob`)

.. _job-callable:

Defining a job is simple: subclass :class:`Job` and implement the
**job callable method**::

    from pulsar.apps import tasks

    class Addition(tasks.Job):

        def __call__(self, consumer, a, b):
            "Add two numbers"
            return a+b
            
    class Sampler(tasks.Job):

        def __call__(self, consumer, sample, size=10):
            ...

The *consumer*, an instance of :class:`TaskConsumer`, is passed by the
:class:`TaskQueue` and should always be the first positional argument of the
callable.
The remaining positional and keyword arguments are the ones your job
implementation needs.
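
The calling convention can be tried out without a running task queue.
This is a minimal sketch under stated assumptions: ``Job`` and
``FakeConsumer`` are hypothetical stand-ins for :class:`Job` and
:class:`TaskConsumer`, and ``run_job`` only illustrates the argument order.
It is not how pulsar executes tasks internally.

```python
class Job:
    # Hypothetical stand-in for tasks.Job so the sketch runs without pulsar.
    pass


class Addition(Job):

    def __call__(self, consumer, a, b):
        "Add two numbers"
        return a + b


class FakeConsumer:
    # Hypothetical placeholder for the consumer passed by the task queue.
    pass


def run_job(job, consumer, args, kwargs):
    # The consumer always comes first; the task's own arguments follow.
    return job(consumer, *args, **kwargs)


result = run_job(Addition(), FakeConsumer(), (1, 2), {})
by_keyword = run_job(Addition(), FakeConsumer(), (), {'a': 2, 'b': 5})
```

Because the consumer is always the first positional argument, a job can
be exercised in a unit test by passing any placeholder object in its
place.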

Task Class
~~~~~~~~~~~~~~~~~

By default, tasks are constructed using an in-memory implementation of
:class:`Task`. To use a different implementation, for example one that
saves tasks on a database, subclass :class:`Task` and pass the new class
to the :class:`TaskQueue` constructor::

    from pulsar.apps import tasks

    class TaskDatabase(tasks.Task):

        def on_created(self):
            return save2db(self)

        def on_received(self):
            return save2db(self)

        def on_start(self):
            return save2db(self)

        def on_finish(self):
            return save2db(self)

        @classmethod
        def get_task(cls, id, remove=False):
            return taskfromdb(id)


    tq = tasks.TaskQueue(task_class=TaskDatabase, tasks_path='path.to.tasks.*')
    tq.start()
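
The ``save2db`` and ``taskfromdb`` helpers in the example above are left
undefined. One possible shape for them, assuming a task exposes ``id`` and
``status`` attributes and using an in-memory SQLite table, is sketched
below. The table layout and the ``FakeTask`` class are hypothetical and
only serve to make the sketch runnable.

```python
import sqlite3

# Hypothetical storage: only the names save2db and taskfromdb come from
# the example above; the schema is an assumption.
_db = sqlite3.connect(':memory:')
_db.execute('CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)')


def save2db(task):
    # Insert the task, or overwrite the row written by an earlier callback.
    _db.execute('INSERT OR REPLACE INTO tasks (id, status) VALUES (?, ?)',
                (task.id, task.status))
    _db.commit()
    return task


def taskfromdb(id):
    # Return the stored (id, status) row, or None for an unknown id.
    cursor = _db.execute('SELECT id, status FROM tasks WHERE id = ?', (id,))
    return cursor.fetchone()


class FakeTask:
    # Minimal stand-in carrying only the two attributes stored here.
    def __init__(self, id, status):
        self.id = id
        self.status = status


task = FakeTask('abc', 'PENDING')
save2db(task)              # what on_created would do
task.status = 'STARTED'
save2db(task)              # what on_start would do
```

Each callback writes the same row again, so the table always holds the
latest known status of a task.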


.. _tasks-callbacks:

Task callbacks
~~~~~~~~~~~~~~~~~~~

When creating your own :class:`Task` class, all you need to override are the
four task callbacks:

* :meth:`Task.on_created` called by the taskqueue when it creates a new task
  instance.
* :meth:`Task.on_received` called by a worker when it receives the task.
* :meth:`Task.on_start` called by a worker when it starts the task.
* :meth:`Task.on_finish` called by a worker when it ends the task.

together with the :meth:`Task.get_task` classmethod, which retrieves task
instances.

.. _task-state:

Task states
~~~~~~~~~~~~~

The :attr:`Task.status` of a :class:`Task` is one of the following strings:

* ``PENDING`` the task is waiting for execution, or its state is not yet known.
* ``RETRY`` the task is being retried.
* ``RECEIVED`` the task has been received by the task queue.
* ``STARTED`` task execution has started.
* ``REVOKED`` the task execution has been revoked. One possible reason is
  that the task has timed out.
* ``UNKNOWN`` the state of the task execution is unknown.
* ``FAILURE`` task execution has finished with failure.
* ``SUCCESS`` task execution has finished with success.


.. attribute:: FULL_RUN_STATES

    The set of states for which a :class:`Task` has run:
    ``FAILURE`` and ``SUCCESS``

.. attribute:: READY_STATES

    The set of states for which a :class:`Task` has finished:
    ``REVOKED``, ``FAILURE`` and ``SUCCESS``
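
The two sets can be written down directly from the descriptions above.
This is a sketch only: the constants are redefined locally so it runs on
its own, the real definitions in the states module may differ, and the
``has_run`` and ``has_finished`` helpers are hypothetical.

```python
# Local redefinition of the states listed above (assumed to be strings).
REVOKED = 'REVOKED'
FAILURE = 'FAILURE'
SUCCESS = 'SUCCESS'

FULL_RUN_STATES = frozenset((FAILURE, SUCCESS))
READY_STATES = frozenset((REVOKED, FAILURE, SUCCESS))


def has_run(status):
    # True only if the task callable actually ran to completion.
    return status in FULL_RUN_STATES


def has_finished(status):
    # True if the task will not change state any more.
    return status in READY_STATES
```

A revoked task has finished but has not run, which is why the two sets
are kept separate.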


Queue
~~~~~~~~~~~~~~

By default the queue is implemented using ``multiprocessing.Queue``
from the Python standard library. To specify a different queue you can
use the ``task-queue`` flag from the command line::

    python myserverscript.py --task-queue dotted.path.to.callable

or by setting the ``task_queue_factory`` parameter in the config file
or in the :class:`TaskQueue` constructor.
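
The value is a dotted path to a callable which returns a queue when called
with no arguments. How such a path can be resolved is sketched below with
the standard library only. ``resolve`` is a hypothetical helper, not the
function pulsar uses, and ``queue.Queue`` is chosen as the target purely
to keep the sketch runnable in a single process.

```python
import importlib
import queue


def resolve(dotted_path):
    # Split 'package.module.attr' into a module path and an attribute name.
    module_name, _, attr = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


factory = resolve('queue.Queue')
task_queue = factory()          # the factory is called with no arguments
task_queue.put('task-1')
```

Any importable callable with the same no-argument signature could be
named on the command line in the same way.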


.. _celery: http://celeryproject.org/
'''
import os
from datetime import datetime

import pulsar
from pulsar import to_string, safe_async
from pulsar.utils.importer import import_modules, module_attribute

from .exceptions import *
from .task import *
from .models import *
from .scheduler import Scheduler
from .states import *
from .rpc import *


class TaskQueueFactory(pulsar.Setting):
    app = 'cpubound'
    name = "task_queue_factory"
    section = "Task Consumer"
    flags = ["-q", "--task-queue"]
    default = "pulsar.Queue"
    desc = """The task queue factory to use."""

    def get(self):
        return module_attribute(self.value)


class TaskSetting(pulsar.Setting):
    virtual = True
    app = 'tasks'


class TaskPath(TaskSetting):
    name = "tasks_path"
    section = "Task Consumer"
    meta = "STRING"
    validator = pulsar.validate_list
    cli = ["--tasks-path"]
    default = ['pulsar.apps.tasks.testing']
    desc = """\
        List of python dotted paths where tasks are located.
        """


class CPUboundServer(pulsar.Application):
    '''A CPU-bound application server.'''
    _app_name = 'cpubound'

    def get_ioqueue(self):
        '''Return the distributed task queue which produces tasks to
be consumed by the workers.'''
        return self.cfg.task_queue_factory()

    def request_instance(self, worker, fd, request):
        return request

    def on_event(self, worker, fd, request):
        request = self.request_instance(worker, fd, request)
        if request is not None:
            c = self.local.current_requests
            if c is None:
                c = []
                self.local.current_requests = c
            c.append(request)
            yield safe_async(request.start, args=(worker,))
            try:
                c.remove(request)
            except ValueError:
                pass

#################################################    TASKQUEUE COMMANDS
taskqueue_cmnds = set()

@pulsar.command(internal=True, commands_set=taskqueue_cmnds)
def addtask(client, actor, caller, jobname, task_extra, *args, **kwargs):
    kwargs.pop('ack', None)
    return actor.app._addtask(actor, caller, jobname, task_extra, True,
                              args, kwargs)

@pulsar.command(internal=True, ack=False, commands_set=taskqueue_cmnds)
def addtask_noack(client, actor, caller, jobname, task_extra, *args, **kwargs):
    kwargs.pop('ack', None)
    return actor.app._addtask(actor, caller, jobname, task_extra, False,
                              args, kwargs)

@pulsar.command(internal=True, commands_set=taskqueue_cmnds)
def save_task(client, actor, caller, task):
    #import time
    #time.sleep(0.1)
    return actor.app.scheduler.save_task(task)

@pulsar.command(internal=True, commands_set=taskqueue_cmnds)
def delete_tasks(client, actor, caller, ids):
    return actor.app.scheduler.delete_tasks(ids)

@pulsar.command(commands_set=taskqueue_cmnds)
def get_task(client, actor, id):
    return actor.app.scheduler.get_task(id)

@pulsar.command(commands_set=taskqueue_cmnds)
def get_tasks(client, actor, **parameters):
    return actor.app.scheduler.get_tasks(**parameters)

@pulsar.command(commands_set=taskqueue_cmnds)
def job_list(client, actor, jobnames=None):
    return list(actor.app.job_list(jobnames=jobnames))

@pulsar.command(commands_set=taskqueue_cmnds)
def next_scheduled(client, actor, jobnames=None):
    return actor.app.scheduler.next_scheduled(jobnames=jobnames)

@pulsar.command(commands_set=taskqueue_cmnds)
def wait_for_task(client, actor, id, timeout=3600):
    # wait for a task to finish for at most timeout seconds
    scheduler = actor.app.scheduler
    return scheduler.task_class.wait_for_task(scheduler, id, timeout)


class TaskQueue(CPUboundServer):
    '''A :class:`pulsar.CPUboundServer` for consuming
    tasks and managing scheduling of tasks.

    .. attribute:: registry

        Instance of a :class:`JobRegistry` containing all
        registered :class:`Job` instances.
    '''
    _app_name = 'tasks'
    cfg_apps = ('cpubound',)
    cfg = {'timeout': '3600', 'backlog': 1}
    commands_set = taskqueue_cmnds
    task_class = TaskInMemory
    '''The :class:`Task` class for storing information about task execution.

    Default: :class:`TaskInMemory`
    '''
    scheduler_class = Scheduler
    '''The scheduler class. Default: :class:`Scheduler`.'''

    @property
    def scheduler(self):
        '''A :class:`Scheduler` which sends tasks to the task queue and
        produces periodic tasks according to their schedule of execution.

        At every event loop, the :class:`pulsar.ApplicationMonitor` running
        the :class:`TaskQueue` application invokes :meth:`Scheduler.tick`,
        which checks for tasks to be scheduled.

        Check the :meth:`TaskQueue.monitor_task` callback
        for the implementation.'''
        return self.local.scheduler

    def request_instance(self, worker, fd, request):
        return self.scheduler.get_task(request)

    def monitor_task(self, monitor):
        '''Override the :meth:`pulsar.Application.monitor_task` callback
        to check if the scheduler needs to perform a new run.'''
        s = self.scheduler
        if s:
            if s.next_run <= datetime.now():
                s.tick(monitor)

    def handler(self):
        # Load the application callable, the task consumer
        if self.callable:
            self.callable()
        import_modules(self.cfg.tasks_path)
        self.local.scheduler = Scheduler(self)
        return self

    def monitor_handler(self):
        return self.handler()

    def job_list(self, jobnames=None):
        return self.scheduler.job_list(jobnames=jobnames)

    @property
    def registry(self):
        global registry
        return registry

    # Internals
    def _addtask(self, monitor, caller, jobname, task_extra, ack,
                 args, kwargs):
        task = self.scheduler.queue_task(monitor, jobname, args, kwargs,
                                         **task_extra)
        if ack:
            return task