Documentation for pulsar 0.4.6.
An asynchronous task queue built on top of the pulsar.Application framework. You create Job classes much as you would for celery, and this application gives you everything you need to run them with very little setup effort:
```python
from pulsar.apps import tasks

tq = tasks.TaskQueue(tasks_path='path.to.tasks.*')
tq.start()
```
The TaskQueue application adds the following remote actions to its workers:
addtask adds a new task to the task queue:
```python
send(taskqueue, 'addtask', jobname, task_extra, *args, **kwargs)
```
- jobname: the name of the Job to run.
- task_extra: dictionary of extra parameters to pass to the Task constructor. Usually an empty dictionary.
- args: positional arguments for the job callable.
- kwargs: keyword arguments for the job callable.

addtask_noack is the same as addtask, but without acknowledging the sender:
```python
send(taskqueue, 'addtask_noack', jobname, task_extra, *args, **kwargs)
```
get_task retrieves task information. The task may or may not have been executed yet. The implementation is left to the Task.get_task() method:
```python
send(taskqueue, 'get_task', id)
```
get_tasks retrieves information for tasks which satisfy the filters. The implementation is left to the Task.get_tasks() method:
```python
send(taskqueue, 'get_tasks', **filters)
```
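To make the four remote actions more concrete, here is a minimal, pulsar-free sketch of an in-memory handler that dispatches on the action name. ToyTaskQueue, handle() and the dictionary task records are hypothetical; only the action names and the argument order (jobname, task_extra, *args, **kwargs) come from the list above. It is not how pulsar implements these actions.

```python
import itertools


class ToyTaskQueue:
    """Hypothetical in-memory stand-in for the TaskQueue remote actions."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.tasks = {}  # task id -> task record (a plain dict here)

    def handle(self, action, *args, **kwargs):
        # Dispatch on the action name, as a worker receiving a message might.
        handler = getattr(self, 'action_' + action)
        return handler(*args, **kwargs)

    def action_addtask(self, jobname, task_extra, *args, **kwargs):
        task_id = str(next(self._ids))
        self.tasks[task_id] = {'id': task_id, 'name': jobname,
                               'extra': task_extra, 'args': args,
                               'kwargs': kwargs, 'status': 'PENDING'}
        # Acknowledge the sender by returning the task information.
        return self.tasks[task_id]

    def action_addtask_noack(self, jobname, task_extra, *args, **kwargs):
        # Same as addtask, but nothing is returned to the sender.
        self.action_addtask(jobname, task_extra, *args, **kwargs)

    def action_get_task(self, task_id):
        return self.tasks.get(task_id)

    def action_get_tasks(self, **filters):
        return [task for task in self.tasks.values()
                if all(task.get(k) == v for k, v in filters.items())]


tq = ToyTaskQueue()
print(tq.handle('addtask', 'addition', {}, 2, 3)['id'])  # 1
```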
An application implements several Job classes which specify the way each Task is run. Each job class is a task factory; therefore, a task is always associated with one job, which can be of one of two types: regular or periodic.
Defining a job is simple: subclass Job and implement the job callable method:
```python
from pulsar.apps import tasks


class Addition(tasks.Job):

    def __call__(self, consumer, a, b):
        "Add two numbers"
        return a + b


class Sampler(tasks.Job):

    def __call__(self, consumer, sample, size=10):
        ...
```
The consumer, an instance of TaskConsumer, is passed by the TaskQueue and must always be the first positional argument of the callable. The remaining positional and keyword arguments are whatever your job implementation needs.
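A minimal sketch of this calling convention, without pulsar. The Job base class, FakeConsumer and run_job() below are hypothetical stand-ins for tasks.Job, TaskConsumer and the worker; the only point illustrated is that the consumer always comes first, followed by the job's own arguments.

```python
class Job:
    """Hypothetical stand-in for pulsar.apps.tasks.Job."""

    def __call__(self, consumer, *args, **kwargs):
        raise NotImplementedError


class FakeConsumer:
    """Hypothetical stand-in for TaskConsumer; carries per-task context."""

    def __init__(self, task_id):
        self.task_id = task_id


class Addition(Job):

    def __call__(self, consumer, a, b):
        "Add two numbers"
        return a + b


def run_job(job, task_id, *args, **kwargs):
    # The worker builds a consumer for the task and passes it first;
    # the remaining arguments are forwarded unchanged to the job.
    consumer = FakeConsumer(task_id)
    return job(consumer, *args, **kwargs)


print(run_job(Addition(), 'task-1', 2, 3))  # 5
```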
By default, tasks are constructed using an in-memory implementation of Task. To use a different implementation, for example one that saves tasks in a database, subclass Task and pass the new class to the TaskQueue constructor:
```python
from pulsar.apps import tasks

# save2db() and taskfromdb() are placeholders for your own persistence layer.


class TaskDatabase(tasks.Task):

    def on_created(self):
        return save2db(self)

    def on_received(self):
        return save2db(self)

    def on_start(self):
        return save2db(self)

    def on_finish(self):
        return save2db(self)

    @classmethod
    def get_task(cls, id, remove=False):
        return taskfromdb(id)


tq = tasks.TaskQueue(task_class=TaskDatabase, tasks_path='path.to.tasks.*')
tq.start()
```
When creating your own Task class, all you need to override are the four task callbacks, Task.on_created(), Task.on_received(), Task.on_start() and Task.on_finish(), and the Task.get_task() classmethod for retrieving task instances.
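The following pulsar-free sketch shows the same pattern with a dictionary standing in for the database. BaseTask, StoredTask, STORE and the lifecycle methods receive(), start() and finish() are hypothetical, and the terminal status 'SUCCESS' is an assumption; the point is only that every callback persists a snapshot and get_task() reads it back.

```python
import uuid


class BaseTask:
    """Hypothetical, pulsar-free stand-in for tasks.Task."""

    def __init__(self, name):
        self.id = uuid.uuid4().hex
        self.name = name
        self.status = 'PENDING'
        self.on_created()

    def receive(self):
        self.status = 'RECEIVED'
        self.on_received()

    def start(self):
        self.status = 'STARTED'
        self.on_start()

    def finish(self, result):
        self.result = result
        self.status = 'SUCCESS'  # assumed name of the success state
        self.on_finish()

    # The four task callbacks; no-ops by default.
    def on_created(self):
        pass

    def on_received(self):
        pass

    def on_start(self):
        pass

    def on_finish(self):
        pass


STORE = {}  # hypothetical "database": task id -> snapshot of the task


class StoredTask(BaseTask):

    def _save(self):
        STORE[self.id] = {'name': self.name, 'status': self.status}

    # Every callback persists the current state of the task.
    on_created = on_received = on_start = on_finish = _save

    @classmethod
    def get_task(cls, id, remove=False):
        return STORE.pop(id, None) if remove else STORE.get(id)
```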
A Task can have one of the following Task.status strings:
By default, the queue is implemented using multiprocessing.Queue from the Python standard library. To specify a different queue, use the task-queue flag on the command line:
```shell
python myserverscript.py --task-queue dotted.path.to.callable
```
or set the task_queue_factory parameter in the config file or in the TaskQueue constructor.
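The flag takes a dotted path to a callable. A common way to turn such a string into an object is importlib; the sketch below is an assumption about how such a path could be resolved, not pulsar's own code, and import_callable() is a hypothetical name. queue.Queue stands in for a custom factory because it needs no extra setup.

```python
import importlib


def import_callable(dotted_path):
    """Resolve 'package.module.attribute' to the named object."""
    module_path, _, attr = dotted_path.rpartition('.')
    if not module_path:
        raise ValueError('expected a dotted path, got %r' % dotted_path)
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# queue.Queue stands in for a custom queue factory in this example.
factory = import_callable('queue.Queue')
q = factory()
q.put('task')
print(q.get())  # task
```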
A pulsar.CPUboundServer for consuming tasks and managing their scheduling.
Instance of a JobRegistry containing all registered Job instances.
A Scheduler which sends tasks to the task queue and produces periodic tasks according to their schedule of execution.
At every iteration of the event loop, the pulsar.ApplicationMonitor running the TaskQueue application invokes Scheduler.tick(), which checks for tasks to be scheduled.
See the TaskQueue.monitor_task() callback for the implementation.
Override the pulsar.Application.monitor_task() callback to check if the scheduler needs to perform a new run.
The Job class which is used in a distributed task queue.
The unique name which defines the Job and which can be used to retrieve it from the job registry.
If set to True (default is False), the Job won’t be registered with the JobRegistry.
If False (default is True), the Job needs to be registered manually with the Job registry.
Type of Job, either regular or periodic.
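The registration rules above (a name used as the registry key, abstract classes skipped, can_register=False meaning manual registration, and a regular or periodic type) can be sketched with a metaclass. This is a hypothetical, pulsar-free illustration: JobRegistry, JobMetaClass and the rule of deriving the name from the lowercased class name are assumptions, not pulsar's implementation.

```python
class JobRegistry(dict):
    """Hypothetical registry mapping job names to job instances."""

    def register(self, job):
        # Instantiate the job if a class was passed.
        if isinstance(job, type):
            job = job()
        self[job.name] = job
        return job

    def filter_types(self, job_type):
        return (job for job in self.values() if job.type == job_type)


registry = JobRegistry()


class JobMetaClass(type):
    """Hypothetical metaclass: derive a name and auto-register jobs."""

    def __new__(cls, name, bases, attrs):
        # 'abstract' is not inherited: each class declares it explicitly.
        abstract = attrs.pop('abstract', False)
        attrs.setdefault('name', name.lower())
        new_class = super().__new__(cls, name, bases, attrs)
        if not abstract and new_class.can_register:
            registry.register(new_class)
        return new_class


class Job(metaclass=JobMetaClass):
    abstract = True
    can_register = True
    type = 'regular'


class PeriodicJob(Job):
    abstract = True
    type = 'periodic'


class Addition(Job):
    pass


class Cleanup(PeriodicJob):
    pass


class Manual(Job):
    can_register = False  # must be registered by hand


print(sorted(registry))  # ['addition', 'cleanup']
```

Keeping abstract out of inheritance means every concrete subclass is registered unless it opts out explicitly.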
An instance of a datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.
Default: None.
Boolean indicating whether this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable.
Default: True.
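A minimal sketch of how a scheduler might interpret these two attributes. allows_overlap(), expiry() and DailyReport are hypothetical; the attribute name timeout for the expiry time lag is an assumption; and passing the consumer first to a callable can_overlap follows the statement that it accepts the same parameters as the job callable.

```python
from datetime import datetime, timedelta


def allows_overlap(job, consumer, *args, **kwargs):
    """Resolve can_overlap, which may be a boolean or a callable."""
    value = job.can_overlap
    if callable(value):
        # Called with the same parameters as the job callable itself.
        return bool(value(consumer, *args, **kwargs))
    return bool(value)


def expiry(job, time_executed):
    """Date-time after which a not-yet-started task expires, or None."""
    if job.timeout is None:
        return None
    return time_executed + job.timeout


class DailyReport:
    """Hypothetical job: no two overlapping reports for the same day."""
    timeout = timedelta(minutes=5)

    def __init__(self):
        self.running_days = set()

    def can_overlap(self, consumer, day):
        return day not in self.running_days

    def __call__(self, consumer, day):
        ...


job = DailyReport()
print(expiry(job, datetime(2024, 1, 1, 12, 0)))  # 2024-01-01 12:05:00
```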
The doc string syntax.
Default: markdown
Level of task logging.
Default: None
An instance of a logger.
Get the task's unique identifier. This can be overridden by a Job implementation.
Return type: a native string.
Called by the TaskQueue.scheduler when creating a new task.
Callback invoked when a task has the same id as a task already submitted to the task queue. By default it returns None and the task is aborted.
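If a Job overrides the identifier generation to be deterministic, a resubmission with the same job name and arguments gets the same id, and the duplicate can be detected. The sketch below assumes JSON-serializable arguments; make_task_id(), submit() and the submitted dictionary are hypothetical and do not reflect pulsar's default identifier scheme.

```python
import hashlib
import json


def make_task_id(jobname, args, kwargs):
    """Hypothetical deterministic id: same job and arguments, same id."""
    # Assumes args and kwargs are JSON-serializable.
    payload = json.dumps([jobname, list(args), kwargs], sort_keys=True)
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()  # a native str


submitted = {}  # task id -> (jobname, args, kwargs)


def submit(jobname, *args, **kwargs):
    task_id = make_task_id(jobname, args, kwargs)
    if task_id in submitted:
        # Mirrors the documented default: return None, abort the duplicate.
        return None
    submitted[task_id] = (jobname, args, kwargs)
    return task_id
```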
Return True if a Job task will acknowledge the task queue once the result is ready or an error has occurred. By default it returns True, but it can be overridden so that its behaviour can change at runtime.
Send a new task request to the TaskQueue from within another Task. This allows tasks to act as task factories.
Return type: a pulsar.ActorMessage if ack=True was passed in the keyword parameters, otherwise nothing.
A periodic Job implementation.
If specified, it must be a datetime.datetime instance. It controls when the periodic Job is run.
Periodicity as a datetime.timedelta instance.
Returns a tuple of two items (is_due, next_time_to_run), where the next time to run is in seconds, e.g.:
- (True, 20) means the Job should be run now, and the next time to run is in 20 seconds.
- (False, 12) means the Job should be run in 12 seconds.
You can override this to decide the interval at runtime.
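The (is_due, next_time_to_run) contract can be sketched as a pure function of the last run time, the periodicity and the current time. This is a simplified illustration under the assumption that a job is due once a full period has elapsed since its last run; is_due() here is a standalone hypothetical function, not the PeriodicJob method itself.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now):
    """Return (is_due, next_time_to_run_in_seconds) for a periodic job."""
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # Due now; the next run is a full period away.
        return True, run_every.total_seconds()
    return False, remaining


start = datetime(2024, 1, 1, 12, 0, 0)
every = timedelta(seconds=20)
print(is_due(start, every, start + timedelta(seconds=8)))   # (False, 12.0)
print(is_due(start, every, start + timedelta(seconds=20)))  # (True, 20.0)
```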
Interface for tasks which are produced by Job.
The current status string of the task.
Date-time when the task was executed.
Date-time when the task calculation started.
Date-time when the task finished.
Optional date-time indicating when the task should expire.
A datetime or None indicating whether a timeout has occurred.
Optional Task.id of the Task which queued this Task. This is useful for monitoring the creation of tasks within other tasks.
Called by the pulsar.Worker when the task starts its execution. If no timeout has occurred, the task switches to the STARTED Task.status and invokes the on_start() callback.
Called when the task has finished and a result is ready. It sets the time_end attribute if not already set (in case the Task was revoked) and determines whether it was successful or a failure. Once done, it invokes the task callback on_finish().
The task has been received by the scheduler. If its status is PENDING, switch to RECEIVED, save the task and return it. Otherwise return nothing.
Called after to_queue(); it returns True if the task needs to be queued.
Return True if the Task has finished (its status is one of READY_STATES).
Try to revoke a task. It returns the timeout, which is different from None only when the Task has been revoked.
Return type: None or a datetime.
The Task duration. Only available if the task status is in FULL_RUN_STATES.
Convert the task instance into a JSON-serializable dictionary.
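A minimal sketch of how done(), the duration and JSON serialization relate to the status. TaskRecord is hypothetical, and the members of READY_STATES and FULL_RUN_STATES below are assumptions (only PENDING, RECEIVED and STARTED appear in the text); datetimes are emitted as ISO 8601 strings because the json module cannot serialize them directly.

```python
from datetime import datetime

# Assumed terminal states; pulsar's actual members may differ.
READY_STATES = frozenset({'SUCCESS', 'FAILURE', 'REVOKED'})
FULL_RUN_STATES = frozenset({'SUCCESS', 'FAILURE'})


class TaskRecord:
    """Hypothetical task record with status-dependent helpers."""

    def __init__(self, task_id, name):
        self.id = task_id
        self.name = name
        self.status = 'PENDING'
        self.time_start = None
        self.time_end = None

    def done(self):
        return self.status in READY_STATES

    def duration(self):
        # Only meaningful once the task has actually run to completion.
        if self.status in FULL_RUN_STATES:
            return self.time_end - self.time_start
        return None

    def tojson(self):
        def iso(dt):
            return dt.isoformat() if dt else None
        return {'id': self.id, 'name': self.name, 'status': self.status,
                'time_start': iso(self.time_start),
                'time_end': iso(self.time_end)}


task = TaskRecord('1', 'addition')
task.status, task.time_start = 'STARTED', datetime(2024, 1, 1, 12, 0, 0)
task.status, task.time_end = 'SUCCESS', datetime(2024, 1, 1, 12, 0, 3)
print(task.done(), task.duration())  # True 0:00:03
```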
A task callback when the task has been created.
Parameters: scheduler: the scheduler which created the task.
A task callback when the task has been received by the scheduler.
A task callback when the task starts its execution.
A task callback when the task has expired.
A task callback when the task finishes its execution.
Implement the task logging emit method. By default it does nothing. It can be reimplemented to do something with the log record.
Given a task id it retrieves a task instance or None if not available.
Given filters it retrieves task instances which satisfy the filter criteria.
Wait for a task to have finished.
A pulsar.apps.rpc.JSONRPC mixin for communicating with a TaskQueue. To use it, you need to have an RPC application and a task queue application installed in the pulsar.Arbiter.
Parameters: taskqueue: instance or name of the pulsar.apps.tasks.TaskQueue which exposes the remote procedure calls.
Remote Procedure Calls
Return the list of Jobs registered with the task queue, with meta information. If a list of jobnames is given, only jobs included in the list are returned.
Return type: a list of dictionaries.
Run a new Task in the task queue. The task can be of any type as long as it is registered in the Job registry.
Return type: a dictionary containing information about the submitted Task.
Retrieve a task from its id. Returns None if the task is not available.
Retrieve a list of tasks which satisfy filters.
Wait for a task to have finished.
Internal function which returns a dictionary of parameters to be passed to the Task class constructor. This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.
The Scheduler is responsible for creating tasks and putting them into the distributed queue. It also schedules the run of periodic tasks, if enabled to do so. This class is the main driver of tasks and task scheduling.
The TaskQueue.task_class for producing new Tasks.
A shortcut for queue_task().
Create a new Task which may or may not be queued.
Return type: an instance of Task.
Run a tick, that is, one iteration of the scheduler. It executes all due tasks and calculates the time in seconds to wait before running a new tick(). For testing purposes, a datetime.datetime value now can be passed.
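One way to picture a tick: ask every periodic job whether it is due at now, queue the due ones, and return the smallest "next time to run" so the caller knows how long to sleep. PeriodicEntry, Scheduler.queued and the bookkeeping below are hypothetical simplifications, not pulsar's Scheduler.

```python
from datetime import datetime, timedelta


class PeriodicEntry:
    """Hypothetical periodic job entry tracked by the scheduler."""

    def __init__(self, name, run_every, last_run_at):
        self.name = name
        self.run_every = run_every
        self.last_run_at = last_run_at

    def is_due(self, now):
        remaining = (self.last_run_at + self.run_every - now).total_seconds()
        if remaining <= 0:
            return True, self.run_every.total_seconds()
        return False, remaining


class Scheduler:
    def __init__(self, entries):
        self.entries = entries
        self.queued = []  # names of tasks sent to the queue, in order

    def tick(self, now=None):
        """Queue all due jobs; return seconds to wait before the next tick."""
        if now is None:
            now = datetime.now()
        wait = None
        for entry in self.entries:
            due, next_in = entry.is_due(now)
            if due:
                self.queued.append(entry.name)
                entry.last_run_at = now
            wait = next_in if wait is None else min(wait, next_in)
        return wait
```

Passing now explicitly, as the text suggests for testing, makes each tick deterministic.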
Remove all pending tasks
Site registry for tasks.
A generator of all regular jobs.
A generator of all periodic jobs.
Register a job in the job registry.
The job will be automatically instantiated if it is not already an instance.
Return a generator of all tasks of a specific type.
Metaclass for Jobs. It performs a small amount of magic by: