Documentation for pulsar 0.4.6.
An asynchronous task queue built on top of the pulsar.Application framework. You create Job classes much as you would with celery, and this application gives you everything you need to run them with very little setup effort:
    from pulsar.apps import tasks

    tq = tasks.TaskQueue(tasks_path='path.to.tasks.*')
    tq.start()
The TaskQueue application adds the following remote actions to its workers:
addtask: add a new task to the task queue:
    send(taskqueue, 'addtask', jobname, task_extra, *args, **kwargs)
addtask_noack: same as addtask, but without acknowledging the sender:
    send(taskqueue, 'addtask_noack', jobname, task_extra, *args, **kwargs)
get_task: retrieve information about a task, whether or not it has already been executed. The implementation is left to the Task.get_task() method:
    send(taskqueue, 'get_task', id)
get_tasks: retrieve information about the tasks which satisfy the given filters. The implementation is left to the Task.get_tasks() method:
    send(taskqueue, 'get_tasks', **filters)
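The remote actions above are dispatched by name. The following is a minimal, self-contained sketch of that idea, not pulsar's implementation: RemoteQueue, its in-memory task dictionary and the local send() helper are hypothetical stand-ins, and the acknowledge/no-acknowledge difference is modelled simply as returning the new task id or None.

```python
import itertools


class RemoteQueue(object):
    """Hypothetical stand-in for a task queue exposing actions by name."""

    def __init__(self):
        self.tasks = {}
        self._ids = itertools.count(1)
        # Map remote action names to the handlers that implement them.
        self.actions = {
            'addtask': self.addtask,
            'addtask_noack': self.addtask_noack,
            'get_task': self.get_task,
            'get_tasks': self.get_tasks,
        }

    def addtask(self, jobname, task_extra, *args, **kwargs):
        task_id = str(next(self._ids))
        self.tasks[task_id] = {'id': task_id, 'name': jobname,
                               'extra': task_extra, 'args': args,
                               'kwargs': kwargs}
        # Acknowledge the sender by returning the new task id.
        return task_id

    def addtask_noack(self, jobname, task_extra, *args, **kwargs):
        self.addtask(jobname, task_extra, *args, **kwargs)
        # No acknowledgement for the sender.
        return None

    def get_task(self, id):
        return self.tasks.get(id)

    def get_tasks(self, **filters):
        return [t for t in self.tasks.values()
                if all(t.get(k) == v for k, v in filters.items())]


def send(queue, action, *args, **kwargs):
    """Dispatch an action by name, mimicking send(taskqueue, action, ...)."""
    return queue.actions[action](*args, **kwargs)
```

For example, send(q, 'addtask', 'addition', {}, 2, 3) returns the id of the new task, while the same call with 'addtask_noack' queues the task and returns None.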
Defining a job is simple: subclass Job and implement the job callable method, __call__:
    from pulsar.apps import tasks

    class Addition(tasks.Job):

        def __call__(self, consumer, a, b):
            "Add two numbers"
            return a+b

    class Sampler(tasks.Job):

        def __call__(self, consumer, sample, size=10):
            ...
The consumer, an instance of TaskConsumer, is passed by the TaskQueue and must always be the first positional argument of the callable. Any remaining positional and keyword arguments are whatever your job implementation needs.
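To see the calling convention in isolation, here is a minimal sketch that runs without pulsar. Job, FakeConsumer and run_job are hypothetical placeholders, not pulsar's classes; the point is only that the consumer comes first and the job's own arguments follow.

```python
class Job(object):
    """Hypothetical stand-in for pulsar.apps.tasks.Job (not the real class)."""

    def __call__(self, consumer, *args, **kwargs):
        raise NotImplementedError


class Addition(Job):

    def __call__(self, consumer, a, b):
        "Add two numbers"
        return a + b


class Greeter(Job):

    def __call__(self, consumer, name, greeting='Hello'):
        "Greet someone, reading the worker from the consumer."
        return '%s %s from %s' % (greeting, name, consumer.worker)


class FakeConsumer(object):
    """Hypothetical consumer; in pulsar this is a TaskConsumer."""

    def __init__(self, worker):
        self.worker = worker


def run_job(job, consumer, *args, **kwargs):
    # The consumer always goes first, then the job's own arguments.
    return job(consumer, *args, **kwargs)
```

Positional and keyword arguments after the consumer are passed through unchanged, so run_job(Addition(), consumer, a=1, b=4) works just like the positional form.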
By default, tasks are constructed using an in-memory implementation of Task. To use a different implementation, for example one that saves tasks on a database, subclass Task and pass the new class to the TaskQueue constructor:
    from pulsar.apps import tasks

    # save2db and taskfromdb stand for your own persistence helpers;
    # they are not part of pulsar.
    class TaskDatabase(tasks.Task):

        def on_created(self):
            return save2db(self)

        def on_received(self):
            return save2db(self)

        def on_start(self):
            return save2db(self)

        def on_finish(self):
            return save2db(self)

        @classmethod
        def get_task(cls, id, remove=False):
            return taskfromdb(id)

    tq = tasks.TaskQueue(task_class=TaskDatabase, tasks_path='path.to.tasks.*')
    tq.start()
When creating your own Task class, all you need to override are the four task callbacks, Task.on_created(), Task.on_received(), Task.on_start() and Task.on_finish(), and the Task.get_task() classmethod for retrieving task instances.
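The same pattern can be sketched without pulsar or a real database. In this hypothetical example, Task is a minimal stand-in base class (not pulsar's), and save2db and taskfromdb are implemented as operations on a plain dictionary so the code runs on its own. Treating remove=True as "delete the stored record" is an assumption.

```python
import uuid

_DB = {}  # hypothetical in-memory "database": task id -> snapshot dict


class Task(object):
    """Hypothetical minimal stand-in for pulsar.apps.tasks.Task."""

    def __init__(self, name, status='PENDING'):
        self.id = uuid.uuid4().hex
        self.name = name
        self.status = status

    # The four task callbacks; the base class does nothing.
    def on_created(self):
        pass

    def on_received(self):
        pass

    def on_start(self):
        pass

    def on_finish(self):
        pass


def save2db(task):
    # Store a copy so later changes to the task do not alter saved data.
    _DB[task.id] = dict(vars(task))
    return task


def taskfromdb(id):
    data = _DB.get(id)
    if data is None:
        return None
    # Rebuild an instance without calling __init__.
    task = TaskDatabase.__new__(TaskDatabase)
    task.__dict__.update(data)
    return task


class TaskDatabase(Task):

    def on_created(self):
        return save2db(self)

    def on_received(self):
        return save2db(self)

    def on_start(self):
        return save2db(self)

    def on_finish(self):
        return save2db(self)

    @classmethod
    def get_task(cls, id, remove=False):
        task = taskfromdb(id)
        # Assumption: remove=True also deletes the stored record.
        if task is not None and remove:
            del _DB[id]
        return task
```

Each callback saves a fresh snapshot, so get_task() always returns the state recorded at the most recent callback.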
By default, the queue is implemented using multiprocessing.Queue from the Python standard library. To specify a different queue, you can use the task-queue flag on the command line:
python myserverscript.py --task-queue dotted.path.to.callable
or set the task_queue_factory parameter in the config file or in the TaskQueue constructor.
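A sketch of a queue factory, assuming the dotted path points at a callable that can be called with no arguments and returns a queue object. Check the configuration reference for the exact signature pulsar expects. The function name bounded_queue and its module path are hypothetical. This factory returns a multiprocessing.Queue with a size limit, so producers block once maxsize items are waiting.

```python
import multiprocessing


def bounded_queue(maxsize=1000):
    """Hypothetical factory: a process-safe FIFO queue with a size limit."""
    return multiprocessing.Queue(maxsize=maxsize)
```

With this function saved in, say, a hypothetical module myproject/queues.py, the command line would read: python myserverscript.py --task-queue myproject.queues.bounded_queue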
A pulsar.CPUboundServer for consuming tasks and managing the scheduling of tasks.
A Scheduler which sends tasks to the task queue and produces periodic tasks according to their execution schedule.
Check the TaskQueue.monitor_task() callback for implementation.
The Job class which is used in a distributed task queue.
The unique name which defines the Job and which can be used to retrieve it from the job registry.
If False (the default is True), the Job needs to be registered manually with the Job registry.
Type of Job, either regular or periodic.
An instance of datetime.timedelta or None. If set, it is the time lag after which a task that has not yet started expires.
Boolean indicating whether this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable function.
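The two attributes above can be read as follows. This is a minimal sketch, not pulsar's code: expires_at and overlap_allowed are hypothetical helpers showing how a timedelta timeout turns into an expiry date-time (measured here from the task's creation time, which is an assumption) and how can_overlap may be either a boolean or a callable receiving the job callable's arguments.

```python
from datetime import datetime, timedelta


def expires_at(created, timeout):
    """Date-time after which a not-yet-started task expires, or None."""
    if timeout is None:
        return None
    return created + timeout


def overlap_allowed(can_overlap, *args, **kwargs):
    """Resolve can_overlap, which may be a bool or a callable."""
    if callable(can_overlap):
        # Pass through whatever the job callable would receive.
        return bool(can_overlap(*args, **kwargs))
    return bool(can_overlap)
```

A callable can_overlap lets the decision depend on the arguments, for example allowing two tasks to run concurrently only when they work on different inputs.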
The doc string syntax.
Level of task logging
an instance of a logger.
Get the unique identifier of a task. This can be overridden by Job implementations.
a native string.
Called by the TaskQueue.scheduler when creating a new task.
Callback invoked when a task has an id equal to that of a task already submitted to the task queue. By default it returns None and the task is aborted.
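One reason to override the identifier is to make identical submissions collide, so that the duplicate-id callback decides what happens. A minimal sketch, assuming an id derived from the job name and its arguments; deterministic_task_id, handle_duplicate, submit and the submitted dictionary are hypothetical names, not pulsar's API. Returning None from the duplicate handler aborts the new task, matching the default described above.

```python
import hashlib
import json


def deterministic_task_id(jobname, *args, **kwargs):
    """Same job name and arguments always produce the same id."""
    payload = json.dumps([jobname, args, kwargs], sort_keys=True,
                         default=str)
    # hexdigest() returns a native str.
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()


submitted = {}  # hypothetical registry of task ids already queued


def handle_duplicate(existing):
    # Default behaviour: return None, so the new task is aborted.
    return None


def submit(jobname, *args, **kwargs):
    task_id = deterministic_task_id(jobname, *args, **kwargs)
    if task_id in submitted:
        return handle_duplicate(submitted[task_id])
    submitted[task_id] = {'id': task_id, 'name': jobname}
    return submitted[task_id]
```

Because keyword arguments are serialized with sorted keys, calls that differ only in keyword order map to the same id.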
Return True if a Job task will acknowledge the task queue once the result is ready or an error has occurred. By default it returns True, but it can be overridden so that its behaviour can change at runtime.
A pulsar.ActorMessage if ack=True was passed in the keyword parameters, otherwise nothing.
A periodic Job implementation.
If specified it must be a datetime.datetime instance. It controls when the periodic Job is run.
Periodicity as a datetime.timedelta instance.
Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds, e.g.
(True, 20) means the Job should be run now, and the next time to run is in 20 seconds.
(False, 12) means the Job should be run in 12 seconds.
You can override this to decide the interval at runtime.
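For a job that runs every run_every, the tuple can be computed from the time of the last run. A minimal stand-alone sketch, assuming last_run_at and run_every (the periodicity timedelta described above) as inputs; this is a hypothetical function, not pulsar's method:

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now=None):
    """Return (is_due, seconds_until_next_run)."""
    if now is None:
        now = datetime.now()
    next_run = last_run_at + run_every
    remaining = (next_run - now).total_seconds()
    if remaining <= 0:
        # Due now; the following run is a full period away.
        return True, run_every.total_seconds()
    return False, remaining
```

With run_every of 20 seconds, calling it 8 seconds after the last run gives (False, 12.0), and calling it 20 or more seconds after gives (True, 20.0), matching the examples above.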
Interface for tasks which are produced by Job.
date-time when the task was executed.
date-time when the task calculation has started.
date-time when the task has finished.
optional date-time indicating when the task should expire.
A datetime or None indicating whether a timeout has occurred.
Called when the task has finished and a result is ready. It sets the time_end attribute if not already set (in case the Task was revoked) and determines whether it was successful or a failure. Once done, it invokes the task callback on_finish().
The task has been received by the scheduler. If its status is PENDING, switch to RECEIVED, save the task and return it. Otherwise it returns nothing.
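The two transitions above can be sketched as follows. This is a hypothetical model rather than pulsar's Task: only the PENDING and RECEIVED statuses come from the text, while the SUCCESS and FAILURE status names, the method names received() and finish(), and the rule that a recorded exception means failure are assumptions.

```python
from datetime import datetime


class SketchTask(object):
    """Hypothetical task model illustrating received/finished transitions."""

    def __init__(self):
        self.status = 'PENDING'
        self.time_end = None
        self.result = None
        self.exception = None

    def on_finish(self):
        # Task callback; persistence would go here.
        return self

    def received(self):
        # Only a PENDING task moves to RECEIVED; otherwise return nothing.
        if self.status == 'PENDING':
            self.status = 'RECEIVED'
            return self
        return None

    def finish(self, result=None, exception=None):
        # Keep an existing time_end, e.g. one set when the task was revoked.
        if self.time_end is None:
            self.time_end = datetime.now()
        self.result = result
        self.exception = exception
        self.status = 'FAILURE' if exception is not None else 'SUCCESS'
        return self.on_finish()
```

Calling received() a second time returns None, which is how a scheduler can tell that the task was already picked up.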
Return type: None or a DateTime.
Convert the task instance into a JSON-serializable dictionary.
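Date-time attributes are the usual obstacle to JSON serialization. A minimal sketch of such a conversion, assuming the task keeps its data in plain instance attributes and that date-times are rendered as ISO 8601 strings; task_to_json is a hypothetical helper, not pulsar's method:

```python
import json
from datetime import datetime


def task_to_json(task):
    """Return a JSON-serializable dict built from a task's public attributes."""
    data = {}
    for key, value in vars(task).items():
        if key.startswith('_'):
            continue  # skip private attributes
        if isinstance(value, datetime):
            value = value.isoformat()
        data[key] = value
    return data
```

The resulting dictionary can be passed straight to json.dumps(); values other than date-times are assumed to be JSON-friendly already.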
A task callback invoked when the task has been created.
Parameters: scheduler – the scheduler which created the task.
Implement the task logging emit method. By default it does nothing. It can be reimplemented to do something with the log record.
Given a task id, it retrieves a task instance, or None if not available.
Given filters it retrieves task instances which satisfy the filter criteria.
Wait for a task to have finished.
Parameters: taskqueue – instance or name of the pulsar.apps.tasks.TaskQueue which exposes the remote procedure calls.
Remote Procedure Calls
Return the list of Jobs registered with the task queue, with meta information. If a list of job names is given, only the jobs in that list are returned.
Return type: A list of dictionaries.
A dictionary containing information about the submitted Task.
Retrieve a task from its id. Returns None if the task is not available.
Retrieve a list of tasks which satisfy filters.
Wait for a task to have finished.
Internal function which returns a dictionary of parameters to be passed to the Task class constructor. This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.
The Scheduler is responsible for creating tasks and putting them into the distributed queue. It also schedules the run of periodic tasks, if enabled to do so. This class is the main driver of tasks and task scheduling.
Create a new Task which may or may not be queued.
an instance of Task
Run a tick, that is, one iteration of the scheduler. It executes all due tasks and calculates the time, in seconds, to wait before running a new tick(). For testing purposes, a datetime.datetime value now can be passed.
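A tick can be sketched as a loop over the periodic jobs: queue the due ones and return the shortest wait until the next one is due. This is a hypothetical stand-alone version, not pulsar's Scheduler. It assumes each job exposes an is_due(now) method returning (is_due, seconds) as described for periodic jobs, and that queue_task is a callback which puts a task on the queue.

```python
from datetime import datetime, timedelta


class PeriodicJobSketch(object):
    """Hypothetical periodic job: due every run_every after last_run_at."""

    def __init__(self, name, run_every, last_run_at):
        self.name = name
        self.run_every = run_every
        self.last_run_at = last_run_at

    def is_due(self, now):
        remaining = (self.last_run_at + self.run_every - now).total_seconds()
        if remaining <= 0:
            return True, self.run_every.total_seconds()
        return False, remaining


def tick(jobs, queue_task, now=None):
    """Queue all due jobs; return seconds to wait (None if no jobs)."""
    if now is None:
        now = datetime.now()
    wait = None
    for job in jobs:
        due, seconds = job.is_due(now)
        if due:
            queue_task(job.name)
            job.last_run_at = now
        wait = seconds if wait is None else min(wait, seconds)
    return wait
```

Accepting now as a parameter, as the Scheduler method does, makes the loop easy to test with fixed date-times.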
Remove all pending tasks
A context manager for consuming tasks. This is used by the Scheduler in a with block to correctly handle exceptions and optional logging.
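The pattern is an ordinary context manager that records the outcome of the job instead of letting an exception escape the with block. A minimal sketch with contextlib; consuming and the dictionary it yields are hypothetical names, not pulsar's TaskConsumer:

```python
import contextlib
import logging

logger = logging.getLogger('tasks.sketch')
logger.addHandler(logging.NullHandler())  # stay quiet unless configured


@contextlib.contextmanager
def consuming(task_id):
    """Yield a result holder; record any exception instead of raising it."""
    outcome = {'id': task_id, 'result': None, 'exception': None}
    try:
        yield outcome
    except Exception as exc:
        # Swallowing the exception here suppresses it for the with block.
        outcome['exception'] = exc
        logger.exception('task %s failed', task_id)
```

Because the generator catches the exception and does not re-raise it, code after the with block keeps running and can inspect outcome['exception'].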
the pulsar.apps.Worker running the process.
Site registry for tasks.
A generator of all regular jobs.
A generator of all periodic jobs.
Register a job in the job registry.
The job will be automatically instantiated if it is not already an instance.
Return a generator of all tasks of a specific type.
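A registry with these operations can be sketched as a dictionary keyed by job name. This is a hypothetical minimal version, not pulsar's job registry: the type values 'regular' and 'periodic' follow the Job type described earlier, while the class names and the rule that a job's name is its lowercased class name are assumptions.

```python
class JobBase(object):
    """Hypothetical job base class."""
    type = 'regular'

    @property
    def name(self):
        # Assumption: the job name is the lowercased class name.
        return self.__class__.__name__.lower()


class PeriodicJobBase(JobBase):
    type = 'periodic'


class Registry(dict):
    """Hypothetical job registry keyed by job name."""

    def register(self, job):
        # Instantiate the job if a class was passed instead of an instance.
        if isinstance(job, type):
            job = job()
        self[job.name] = job
        return job

    def filter_types(self, job_type):
        return (job for job in self.values() if job.type == job_type)

    def regular(self):
        return self.filter_types('regular')

    def periodic(self):
        return self.filter_types('periodic')
```

Subclassing dict keeps lookup by name trivial, and the generators mirror the "all regular jobs" and "all periodic jobs" accessors listed above.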
Metaclass for Jobs. It performs a little amount of magic by: