Documentation for pulsar 0.4.6.

Distributed Task Queues

An asynchronous task queue built on top of the pulsar.Application framework. By creating Job classes, in much the same way as you would for celery, this application gives you all you need to run them with very little setup effort:

from pulsar.apps import tasks

tq = tasks.TaskQueue(tasks_path='path.to.tasks.*')
tq.start()

Tutorial

Actions

The TaskQueue application adds the following remote actions to its workers:

  • addtask adds a new task to the task queue:

    send(taskqueue, 'addtask', jobname, task_extra, *args, **kwargs)

      • jobname: the name of the Job to run.
      • task_extra: dictionary of extra parameters to pass to the Task constructor. Usually an empty dictionary.
      • args: positional arguments for the job callable.
      • kwargs: key-valued arguments for the job callable.

  • addtask_noack is the same as addtask but without acknowledging the sender:

    send(taskqueue, 'addtask_noack', jobname, task_extra, *args, **kwargs)

  • get_task retrieves information about a task, whether or not it has already been executed. The implementation is left to the Task.get_task() method:

    send(taskqueue, 'get_task', id)

  • get_tasks retrieves information about the tasks which satisfy the given filters. The implementation is left to the Task.get_tasks() method:

    send(taskqueue, 'get_tasks', **filters)

Jobs

An application implements several Job classes which specify the way each Task is run. Each job class is a task factory, so a task is always associated with one job, which can be of two types: regular or periodic (see the Job.type attribute).

Defining a job is simple: subclass Job and implement the job callable method:

from pulsar.apps import tasks

class Addition(tasks.Job):

    def __call__(self, consumer, a, b):
        "Add two numbers"
        return a+b
        
class Sampler(tasks.Job):

    def __call__(self, consumer, sample, size=10):
        ...

The consumer, an instance of TaskConsumer, is passed by the TaskQueue and should always be the first positional argument of the callable. The remaining positional arguments and key-valued parameters are those needed by your job implementation.

Task Class

By default, tasks are constructed using an in-memory implementation of Task. To use a different implementation, for example one that saves tasks on a database, subclass Task and pass the new class to the TaskQueue constructor:

from pulsar.apps import tasks

class TaskDatabase(tasks.Task):

    def on_created(self):
        return save2db(self)

    def on_received(self):
        return save2db(self)

    def on_start(self):
        return save2db(self)

    def on_finish(self):
        return save2db(self)

    @classmethod
    def get_task(cls, id, remove=False):
        return taskfromdb(id)


tq = tasks.TaskQueue(task_class=TaskDatabase, tasks_path='path.to.tasks.*')
tq.start()

Task callbacks

When creating your own Task class, all you need to override are the four task callbacks:

  • Task.on_created()
  • Task.on_received()
  • Task.on_start()
  • Task.on_finish()

and the Task.get_task() classmethod for retrieving task instances.
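The order in which the callbacks on_created(), on_received(), on_start() and on_finish() fire can be illustrated with a small stand-in that does not import pulsar. The class SketchTask and its log attribute are hypothetical; only the callback names and status strings come from this page, and the transitions are an assumption based on the descriptions of to_queue(), start() and finish() in the API section.

```python
class SketchTask:
    """Hypothetical stand-in for a Task; not the pulsar implementation."""

    def __init__(self):
        self.status = 'PENDING'
        self.log = []          # records which callbacks were invoked
        self.on_created()      # assumed to fire once the task exists

    # The four callbacks a custom Task class would override.
    def on_created(self):
        self.log.append('on_created')

    def on_received(self):
        self.log.append('on_received')

    def on_start(self):
        self.log.append('on_start')

    def on_finish(self):
        self.log.append('on_finish')

    # Simplified state transitions (assumed, not taken from the source code).
    def to_queue(self):
        if self.status == 'PENDING':
            self.status = 'RECEIVED'
            self.on_received()
            return self

    def start(self):
        self.status = 'STARTED'
        self.on_start()

    def finish(self, failed=False):
        self.status = 'FAILURE' if failed else 'SUCCESS'
        self.on_finish()


task = SketchTask()
task.to_queue()
task.start()
task.finish()
print(task.status, task.log)
```

A database-backed implementation would replace each log.append call with a save operation, as in the TaskDatabase example above.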

Task states

A Task can have one of the following Task.status strings:

  • PENDING A task waiting for execution and unknown.
  • RETRY A task is retrying calculation.
  • RECEIVED when the task is received by the task queue.
  • STARTED task execution has started.
  • REVOKED the task execution has been revoked. One possible reason could be the task has timed out.
  • UNKNOWN task execution is unknown.
  • FAILURE task execution has finished with failure.
  • SUCCESS task execution has finished with success.
pulsar.apps.tasks.FULL_RUN_STATES

The set of states for which a Task has run: FAILURE and SUCCESS

pulsar.apps.tasks.READY_STATES

The set of states for which a Task has finished: REVOKED, FAILURE and SUCCESS
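The relationship between the two sets can be sketched with plain Python sets. The sets below are local copies built from the state names listed on this page, not the objects exported by pulsar.apps.tasks, and the helper names is_done and has_run are hypothetical.

```python
# Local copies of the two sets, built from the state names documented above.
FULL_RUN_STATES = frozenset(('FAILURE', 'SUCCESS'))
READY_STATES = frozenset(('REVOKED', 'FAILURE', 'SUCCESS'))


def is_done(status):
    """True when the task has finished, whether it ran or was revoked."""
    return status in READY_STATES


def has_run(status):
    """True only when the task actually ran to completion."""
    return status in FULL_RUN_STATES


for status in ('PENDING', 'REVOKED', 'SUCCESS'):
    print(status, is_done(status), has_run(status))
```

A revoked task is finished but has not run, which is why REVOKED belongs to READY_STATES only.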

Queue

By default the queue is implemented using multiprocessing.Queue from the Python standard library. To specify a different queue, use the task-queue flag on the command line:

python myserverscript.py --task-queue dotted.path.to.callable

or by setting the task_queue_factory parameter in the config file or in the TaskQueue constructor.
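The value given to the flag is a dotted path to a callable. The sketch below shows one way such a path can be resolved with the standard library. The function resolve_dotted_path is hypothetical and is not pulsar's own loader, and the assumption that the factory is called without arguments is ours.

```python
import importlib
import queue


def resolve_dotted_path(path):
    """Import 'package.module.attr' and return the attribute."""
    module_name, _, attr = path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# 'queue.Queue' is used here only because it ships with Python.
factory = resolve_dotted_path('queue.Queue')
q = factory()          # assumed: the factory takes no arguments
q.put('task-id')
print(q.get())
```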

API

Taskqueue

class pulsar.apps.tasks.TaskQueue(callable=None, description=None, name=None, epilog=None, argv=None, script=None, version=None, parse_console=True, commands_set=None, cfg=None, **kwargs)

A pulsar.CPUboundServer for consuming tasks and managing scheduling of tasks.

registry

Instance of a JobRegistry containing all registered Job instances.

task_class

The Task class for storing information about task execution.

Default: TaskInMemory

scheduler_class

The scheduler class. Default: Scheduler.

scheduler

A Scheduler which sends tasks to the task queue and produces periodic tasks according to their schedule of execution.

At every event loop, the pulsar.ApplicationMonitor running the TaskQueue application invokes Scheduler.tick(), which checks for tasks to be scheduled.

Check the TaskQueue.monitor_task() callback for implementation.

monitor_task(monitor)

Override the pulsar.Application.monitor_task() callback to check if the scheduler needs to perform a new run.

Job

class pulsar.apps.tasks.Job

The Job class which is used in a distributed task queue.

name

The unique name which defines the Job and which can be used to retrieve it from the job registry.

abstract

If set to True (default is False), the Job won’t be registered with the JobRegistry.

autoregister

If False (default is True), the Job needs to be registered manually with the Job registry.

type

Type of Job, one of regular and periodic.

timeout

An instance of a datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.

Default: None.

can_overlap

Boolean indicating whether this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable function.

Default: True.

doc_syntax

The doc string syntax.

Default: markdown

loglevel

Level of task logging

Default: None

logger

an instance of a logger.

make_task_id(args, kwargs)

Get the task unique identifier. This can be overridden by Job implementation.

Parameters:
  • args – tuple of positional arguments passed to the job callable.
  • kwargs – dictionary of key-valued parameters passed to the job callable.
Return type:

a native string.

Called by the TaskQueue.scheduler when creating a new task.
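As an illustration of what an override might compute, the sketch below derives a repeatable identifier from the job name and its inputs. It is not pulsar's default algorithm; the function name sketch_task_id and the choice of SHA-1 are assumptions made for the example.

```python
import hashlib


def sketch_task_id(jobname, args, kwargs):
    """Derive a repeatable identifier from the job name and its inputs."""
    # Sort keyword arguments so that their order does not change the id.
    payload = repr((jobname, tuple(args), sorted(kwargs.items())))
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()


first = sketch_task_id('addition', (1, 2), {})
again = sketch_task_id('addition', (1, 2), {})
other = sketch_task_id('addition', (1, 3), {})
print(first == again, first == other)
```

With a scheme like this, two identical requests produce the same id, which is the situation handled by on_same_id().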

on_same_id(task)

Callback invocked when a task has an id equal to a task already submitted to the task queue. By default return None and the task is aborted.

ack(args, kwargs)

Return True if a Job task will acknowledge the task queue once the result is ready or an error has occurred. By default it returns True, but it can be overridden so that its behaviour can change at runtime.

send_to_queue(consumer, jobname, *args, **kwargs)

Send a new task request to the TaskQueue from within another Task. This allows tasks to act as task factories.

Parameters:
Return type:

A pulsar.ActorMessage if ack=True was passed in the key-valued parameters, otherwise nothing.

Periodic Job

class pulsar.apps.tasks.PeriodicJob(run_every=None)

A periodic Job implementation.

anchor

If specified it must be a datetime.datetime instance. It controls when the periodic Job is run.

run_every

Periodicity as a datetime.timedelta instance.

is_due(last_run_at)

Returns tuple of two items (is_due, next_time_to_run), where next time to run is in seconds. e.g.

  • (True, 20) means the Job should be run now, and the next time to run is in 20 seconds.
  • (False, 12) means the Job should be run in 12 seconds.

You can override this to decide the interval at runtime.
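The return convention can be sketched as a standalone function. The name sketch_is_due and the explicit now argument are hypothetical, and the anchor attribute is ignored for simplicity; this is an illustration of the tuple format, not the PeriodicJob implementation.

```python
from datetime import datetime, timedelta


def sketch_is_due(last_run_at, run_every, now):
    """Return (is_due, next_time_to_run) with the time expressed in seconds."""
    period = run_every.total_seconds()
    elapsed = (now - last_run_at).total_seconds()
    if elapsed >= period:
        # Due now; the following run is one full period away.
        return True, period
    # Not due yet; report how long is left to wait.
    return False, period - elapsed


now = datetime(2012, 1, 1, 12, 0, 30)
run_every = timedelta(seconds=20)
due_now = sketch_is_due(datetime(2012, 1, 1, 12, 0, 0), run_every, now)
due_later = sketch_is_due(datetime(2012, 1, 1, 12, 0, 22), run_every, now)
print(due_now, due_later)
```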

Task

class pulsar.apps.tasks.Task

Interface for tasks which are produced by Job.

id

Task unique id.

name

Job name.

status

The current status string of task.

time_executed

date time when the task was executed.

time_start

date-time when the task calculation has started.

time_end

date-time when the task has finished.

expiry

optional date-time indicating when the task should expire.

timeout

A datetime or None indicating whether a timeout has occurred.

from_task

Optional Task.id for the Task which queued this Task. This is useful for monitoring the creation of tasks within other tasks.

status_code

Integer indicating status precedence. The lower the number, the higher the precedence.

start(worker)

Called by the pulsar.Worker when the task starts its execution. If no timeout has occurred, the task switches to a STARTED Task.status and invokes the on_start() callback.

finish(worker, result)

Called when the task has finished and a result is ready. It sets the time_end attribute if not already set (in case the Task was revoked) and determines whether the task was successful or a failure. Once done, it invokes the task callback on_finish().

to_queue(schedulter=None)

The task has been received by the scheduler. If its status is PENDING, switch to RECEIVED, save the task and return it. Otherwise return nothing.

needs_queuing()

Called after to_queue(). It returns True if the task needs to be queued.

done()

Return True if the Task has finished (its status is one of READY_STATES).

maybe_revoked()

Try to revoke a task. It returns the timeout which is different from None only when the Task has been revoked.

Return type: None or a DateTime.

duration()

The Task duration. Only available if the task status is in FULL_RUN_STATES.

tojson()

Convert the task instance into a JSON-serializable dictionary.

on_created(scheduler=None)

A task callback invoked when the task has been created.

Parameters: scheduler – the scheduler which created the task.

on_received(scheduler=None)

A task callback invoked when the task has been received by the scheduler.

on_start(worker=None)

A task callback invoked when the task starts its execution.

on_timeout(worker=None)

A task callback invoked when the task has expired.

on_finish(worker=None)

A task callback invoked when the task finishes its execution.

emit_log(record)

Implement the task logging emit method. By default it does nothing. It can be reimplemented to do something with the log record.

classmethod get_task(scheduler, id)

Given a task id it retrieves a task instance or None if not available.

classmethod get_tasks(scheduler, **filters)

Given filters it retrieves task instances which satisfy the filter criteria.

classmethod wait_for_task(scheduler, id, timeout)

Wait for a task to have finished.

TaskQueue Rpc Mixin

class pulsar.apps.tasks.TaskQueueRpcMixin(taskqueue, **kwargs)

A pulsar.apps.rpc.JSONRPC mixin for communicating with a TaskQueue. To use it, you need to have an RPC application and a task queue application installed in the pulsar.Arbiter.

Parameters: taskqueue – instance or name of the pulsar.apps.tasks.TaskQueue which exposes the remote procedure calls.

Remote Procedure Calls

job_list([jobnames=None])

Return the list of Jobs registered with the task queue, together with their meta information. If a list of jobnames is given, only the jobs included in that list are returned.

Return type: a list of dictionaries

run_new_task(jobname[, **kwargs])

Run a new Task in the task queue. The task can be of any type as long as it is registered in the Job registry.

Parameters:
  • jobname – the name of the Job to run.
  • kwargs – optional key-valued job parameters.
Return type:

a dictionary containing information about the Task submitted

get_task(id=task_id)

Retrieve a task from its id. Returns None if the task is not available.

get_tasks(**filters)

Retrieve a list of tasks which satisfy filters.

wait_for_task(id=task_id)

Wait for a task to have finished.

task_request_parameters(request)

Internal function which returns a dictionary of parameters to be passed to the Task class constructor. This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.

Scheduler

class pulsar.apps.tasks.Scheduler(queue, task_class, tasks_path=None, logger=None, schedule_periodic=False)

The Scheduler is responsible for creating tasks and putting them into the distributed queue. It also schedules the run of periodic tasks if enabled to do so. This class is the main driver of tasks and task scheduling.

task_class

The TaskQueue.task_class for producing new Task.

run(jobname, *args, **kwargs)

A shortcut for queue_task().

queue_task(jobname, targs=None, tkwargs=None, **params)

Create a new Task which may or may not be queued.

Parameters:
  • jobname – the name of a Job registered with the TaskQueue application.
  • targs – optional tuple used for the positional arguments in the task callable.
  • tkwargs – optional dictionary used for the key-valued arguments in the task callable.
  • params – Additional parameters to be passed to the Task constructor (not its callable function).
Return type:

an instance of Task

tick(now=None)

Run a tick, that is, one iteration of the scheduler. It executes all due tasks and calculates the time in seconds to wait before running a new tick(). For testing purposes, a datetime.datetime value now can be passed.
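The idea of one scheduler iteration can be sketched without pulsar. The class Entry and the function sketch_tick are hypothetical names, and the loop below is a simplified illustration of running due jobs and returning the shortest wait, not the Scheduler source.

```python
from datetime import datetime, timedelta


class Entry:
    """Hypothetical schedule entry for one periodic job."""

    def __init__(self, name, run_every, last_run_at):
        self.name = name
        self.run_every = run_every
        self.last_run_at = last_run_at

    def is_due(self, now):
        remaining = (self.last_run_at + self.run_every - now).total_seconds()
        if remaining <= 0:
            return True, self.run_every.total_seconds()
        return False, remaining


def sketch_tick(entries, run, now=None):
    """Run every due entry and return the seconds to wait before the next tick."""
    now = now or datetime.now()
    waits = []
    for entry in entries:
        due, next_time = entry.is_due(now)
        if due:
            run(entry.name)
            entry.last_run_at = now
        waits.append(next_time)
    return min(waits) if waits else None


now = datetime(2012, 1, 1, 12, 0, 30)
entries = [
    Entry('cleanup', timedelta(seconds=20), datetime(2012, 1, 1, 12, 0, 0)),
    Entry('report', timedelta(seconds=60), datetime(2012, 1, 1, 12, 0, 15)),
]
executed = []
wait = sketch_tick(entries, executed.append, now=now)
print(executed, wait)
```

Passing now explicitly, as the real tick() allows for testing, makes the outcome independent of the wall clock.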

flush()

Remove all pending tasks.

TaskConsumer

class pulsar.apps.tasks.TaskConsumer(task, worker, job)

A context manager for consuming tasks. This is used by the Scheduler in a with block to correctly handle exceptions and optional logging.

task

the Task being consumed.

job

the Job which generated the task.

worker

the pulsar.apps.Worker running the process.
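The context-manager pattern described here can be sketched as follows. SketchConsumer is a hypothetical class that stores the same three attributes; using a dictionary as the task and recording the outcome in __exit__ are assumptions made for the example, not the behaviour of pulsar's TaskConsumer.

```python
class SketchConsumer:
    """Hypothetical context manager; not pulsar's TaskConsumer."""

    def __init__(self, task, worker, job):
        self.task = task
        self.worker = worker
        self.job = job

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc is not None:
            # Record the failure on the task instead of letting it propagate.
            self.task['status'] = 'FAILURE'
            self.task['result'] = str(exc)
            return True
        self.task['status'] = 'SUCCESS'
        return False


def addition(consumer, a, b):
    """Job callable: the consumer is the first positional argument."""
    return a + b


task = {'status': 'STARTED', 'result': None}
with SketchConsumer(task, worker=None, job=addition) as consumer:
    task['result'] = consumer.job(consumer, 1, 2)
print(task)

failing = {'status': 'STARTED', 'result': None}
with SketchConsumer(failing, worker=None, job=addition) as consumer:
    consumer.job(consumer, 1, None)   # raises TypeError inside the block
print(failing['status'])
```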

Job Registry

class pulsar.apps.tasks.JobRegistry

Site registry for tasks.

regular()

A generator of all regular jobs.

periodic()

A generator of all periodic jobs.

register(job)

Register a job in the job registry.

The task will be automatically instantiated if not already an instance.

filter_types(type)

Return a generator of all tasks of a specific type.

Job meta Class

class pulsar.apps.tasks.JobMetaClass

Metaclass for Jobs. It performs a small amount of magic by:

  • Automatically registering Job instances with the global JobRegistry, unless the Job.abstract attribute is set to True.
  • Setting the Job.name attribute to the class name in lower case if none is provided.
  • Adding a logger instance with name given by 'job.name', where name is the same as above.
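The first two rules can be illustrated with a small metaclass. SketchJobMeta, SketchJob and the module-level registry dictionary are hypothetical stand-ins, and the logger rule is left out; this is a sketch of the technique, not the JobMetaClass source.

```python
registry = {}  # hypothetical stand-in for the global JobRegistry


class SketchJobMeta(type):
    """Hypothetical metaclass illustrating the registration rules."""

    def __new__(mcs, clsname, bases, attrs):
        # 'abstract' is read from the class body only, so it is not inherited.
        abstract = attrs.pop('abstract', False)
        # Default the name to the lower-cased class name.
        attrs.setdefault('name', clsname.lower())
        cls = super().__new__(mcs, clsname, bases, attrs)
        if not abstract:
            registry[cls.name] = cls()
        return cls


class SketchJob(metaclass=SketchJobMeta):
    abstract = True   # base class, never registered


class Addition(SketchJob):
    def __call__(self, consumer, a, b):
        return a + b


class Named(SketchJob):
    name = 'custom'


print(sorted(registry))
```

Registering an instance rather than the class mirrors the statement above that the registry contains Job instances.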
