Documentation for version: 0.25.2

Description

This package intends to offer a priority-based remote task queue solution using Redis as the transport and persistence layer, and JSON for a common interchange format.

Semantically, this module implements a 0/1 queue with optional retries. That is, it attempts to execute every task once. If the task raises an exception, it will not automatically retry, but you can manually retry the task and specify the maximum attempts. See the Retries section below.

Full documentation is available at https://pythonhosted.org/rpqueue/

Getting started

To execute tasks, you must do three things: make sure rpqueue knows about the tasks that can be executed (by importing the modules that define them), configure rpqueue to connect to your Redis server, and start the task execution daemon:

from mytasks import usertasks1, usertasks2, ...
import rpqueue

rpqueue.set_redis_connection_settings(host, port, db)
rpqueue.execute_tasks()

Alternatively, rpqueue offers a command-line interface to do the same, though you must provide the name of a module or package that imports all modules or packages that define tasks that you want to run. For example:

# tasks.py
from tasks import accounting, cleanup, ...
# any other imports or configuration necessary, put them here

# run from the command-line
python -m rpqueue.run --module=tasks --host=... --port=... --db=...

Example uses

Say that you have a module usertasks1 with a task to be executed called echo_to_stdout. Your module may look like the following:

from rpqueue import task

@task
def echo_to_stdout(message):
    print(message)

To call the above task, you would use:

echo_to_stdout.execute(...)
echo_to_stdout.execute(..., delay=delay_in_seconds)

You can also schedule a task to be repeatedly executed with the periodic_task decorator:

@periodic_task(25, queue="low")
def function1():
    # Will be executed every 25 seconds from within the 'low' queue.
    pass

Retries

Tasks accept an optional attempts argument, which specifies the total number of times the task will be attempted before it is considered failed. By default, attempts is 1:

@task(attempts=3)
def fail_until_zero(value, **kwargs):
    try:
        if value != 0:
            value -= 1
            raise Exception
    except Exception:
        # request a retry; this returns without retrying once attempts run out
        fail_until_zero.retry(value, **kwargs)
    else:
        print("succeeded")

If passed the value 3, “succeeded” will never be printed. Why? The first try has value=3, attempts=3, and fails. The second pass has value=2, attempts=2, and fails. The third pass has value=1, attempts=1, fails, and the retry returns without retrying. The attempts value is the total number of attempts, including the first, and all retries.
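
The attempt accounting above can be traced with a small standalone model. This is only a sketch of the bookkeeping described in this section, not rpqueue's implementation; simulate_fail_until_zero is a hypothetical helper that needs neither rpqueue nor Redis.

```python
def simulate_fail_until_zero(value, attempts):
    """Toy model of the retry accounting described above (not rpqueue code).

    Returns a (succeeded, passes_run) tuple.
    """
    passes = 0
    while attempts > 0:
        passes += 1
        if value == 0:
            # The task body runs without raising.
            return True, passes
        # The task body fails: it decrements value and asks for a retry.
        value -= 1
        # Every pass, including the first, consumes one attempt.
        attempts -= 1
    # Attempts are exhausted, so the last retry request does nothing.
    return False, passes
```

In this model, value=3 with attempts=3 runs three passes and never succeeds, while attempts=4 allows a fourth pass that sees value=0 and succeeds.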

Waiting for task execution

As of version 0.19, RPQueue offers the ability to wait on a task until it begins execution:

@task
def my_task():
    # do something
    pass

executing_task = my_task.execute()
if executing_task.wait(5):
    # task is either being executed, or it is done
    pass
else:
    # task has not started execution yet
    pass

Because you can wait for a task to begin execution, you can enforce deadlines by calling executing_task.cancel() in the else block above.
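
The deadline pattern described above can be wrapped in a small helper. This is a minimal sketch that relies only on the wait() and cancel() methods mentioned in this section; start_or_cancel is a hypothetical name, not part of rpqueue.

```python
def start_or_cancel(executing_task, timeout):
    """Wait up to `timeout` seconds for a task to start; cancel it otherwise.

    `executing_task` is assumed to be the object returned by `.execute()`,
    exposing the `.wait()` and `.cancel()` methods described above.
    Returns True if the task started (or finished) in time, else False.
    """
    if executing_task.wait(timeout):
        return True
    # Deadline missed: the task has not started yet, so cancel it.
    executing_task.cancel()
    return False
```

A caller might use it as start_or_cancel(my_task.execute(), 5) and treat a False return as a missed deadline.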

Automatically storing results of tasks

As of version 0.19, RPQueue offers the ability to store the result returned by a task as it completes:

@task(save_results=30)
def task_with_results():
    return 5

etask = task_with_results.execute()
if etask.wait(5):
    print(etask.result)  # should print 5

The save_results argument can be passed to tasks, periodic tasks, and cron tasks (described below). Its value is how long, in seconds, the result is stored in Redis. All results must be JSON-encodable.
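
Since results must be JSON-encodable, it can help to check a return value before relying on save_results. The following is a minimal sketch using only the standard json module; is_json_encodable is a hypothetical helper, not part of rpqueue, and it does not show how rpqueue itself encodes results.

```python
import json


def is_json_encodable(value):
    """Return True if `value` can be serialized by the standard json module."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        # TypeError: unsupported type (for example a set or a custom object).
        # ValueError: for example a circular reference.
        return False
    return True
```

Note that JSON has no tuple type, so json.loads(json.dumps((1, 2))) gives the list [1, 2]; a task that returns a tuple should expect a list back from a JSON round trip.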

Additional features

Support for cron_tasks using a crontab-like syntax requires the Python crontab module (http://pypi.python.org/pypi/crontab/), allowing for:

@cron_task('0 5 * * tue')
def function2():
    # Will be executed every Tuesday at 5AM.
    pass

Rpqueue module contents

rpqueue (Redis Priority Queue)

Originally written July 5, 2011. Copyright 2011-2016 Josiah Carlson. Released under the GNU LGPL v2.1, available at http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html

Other licenses may be available upon request.

rpqueue.new_rpqueue(name, pfix=None)

Creates a new rpqueue state for running separate rpqueue task systems in the same codebase. This simplifies configuration for multiple Redis servers, and allows for using the same Redis server without using queue names (provide a ‘prefix’ for all RPqueue-related keys).

rpqueue.set_key_prefix(pfix)

Run before starting any tasks or task runners; will set the prefix on keys in Redis, allowing for multiple parallel rpqueue executions in the same Redis without worrying about queue names.

rpqueue.set_redis_connection_settings(host='localhost', port=6379, db=0, password=None, socket_timeout=30, unix_socket_path=None)

Sets the global redis connection settings for the queue. If not called before use, will connect to localhost:6379 with no password and db 0.

rpqueue.set_redis_connection(conn)

Sets the global pooled connection to the provided connection object. Useful for environments where additional pooling or other options are desired or required.

rpqueue.task(*args, **kwargs)

Decorator to allow the transparent execution of a function as a task. Used via:

@task(queue='bar')
def function1(arg1, arg2, ...):
    "will execute from within the 'bar' queue."

@task
def function2(arg1, arg2, ...):
    "will execute from within the 'default' queue."

rpqueue.periodic_task(run_every, queue='default', never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0)

Decorator to allow the automatic repeated execution of a function every run_every seconds, which can be provided via int, long, float, or via a datetime.timedelta instance. Run from the context of the given queue. Used via:

@periodic_task(25)
def function1():
    "will be executed every 25 seconds from within the 'default' queue."

@periodic_task(timedelta(minutes=5), queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue."

If never_skip is provided and evaluates as true, as in:

@periodic_task(60, never_skip=True)
def function3():
    pass

... and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:16PM and 5 seconds, which is 60 seconds after the earlier scheduled time (it never skips a scheduled time). If you instead had the periodic task defined as:

@periodic_task(60, never_skip=False)
def function4():
    pass

... and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:26PM and 13 seconds, which is 60 seconds after the current time (it skips any missed scheduled time).
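
The two rescheduling rules above can be written out as a short calculation. This is a sketch of the arithmetic only, under the assumption that both the originally scheduled time and the actual run time are known; next_run is a hypothetical helper, not rpqueue's scheduler.

```python
from datetime import datetime, timedelta


def next_run(scheduled, now, run_every, never_skip):
    """Compute the next run time of a periodic task, per the rules above.

    scheduled  -- when the current run was supposed to happen
    now        -- when the current run actually happens
    run_every  -- period in seconds
    never_skip -- if true, step from the scheduled time; otherwise from now
    """
    base = scheduled if never_skip else now
    return base + timedelta(seconds=run_every)
```

With the times from the example (scheduled at 4:15:05PM, run at 4:25:13PM, run_every=60), never_skip=True yields 4:16:05PM and never_skip=False yields 4:26:13PM.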

rpqueue.cron_task(crontab, queue='default', never_skip=False, attempts=1, retry_delay=30, save_results=0)

Decorator to allow the automatic repeated execution of a function on a schedule with a crontab syntax. Crontab syntax is provided by the ‘crontab’ Python module (http://pypi.python.org/pypi/crontab/), which must also be installed to use this decorator.

Similar in use to the @periodic_task decorator:

@cron_task('* * * * *')
def function1():
    'will be executed every minute'

@cron_task('*/5 * * * *', queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue."

If never_skip is provided and is considered True, it will attempt to never skip a scheduled task, just like the @periodic_task decorator.

Please see the crontab package documentation or the crontab Wikipedia page for more information on the meaning of the schedule.

class rpqueue.Task(queue, name, function, delay=None, never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0)

An object that represents a task to be executed. These will replace functions when any of the @task, @periodic_task, or @cron_task decorators have been applied to a function.

execute(*args, **kwargs)

Invoke this task with the given arguments inside a task processor.

Optional arguments:

  • delay - how long to delay the execution of this task for, in seconds
  • taskid - override the taskid on this call, can be used to choose a destination key for the results (be careful!)
  • _queue - override the queue to be used in this call, which can be used to alter priorities of individual calls when coupled with queue priorities

next(now=None)

Calculates the next run time of recurring tasks.

retry(*args, **kwargs)

Invoke this task as a retry with the given arguments inside a task processor.

To retry, the task must accept _attempts as a parameter, either directly or via **kwargs.
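
The requirement above can be checked on a plain function before decorating it. This is a minimal sketch assuming Python 3 (it uses inspect.signature); accepts_attempts is a hypothetical helper, not part of rpqueue.

```python
import inspect


def accepts_attempts(func):
    """Return True if `func` can receive an `_attempts` keyword argument,
    either as a named parameter or through a **kwargs catch-all."""
    for param in inspect.signature(func).parameters.values():
        if param.name == "_attempts":
            return True
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            return True
    return False
```

For example, a function defined as def work(value, **kwargs) passes the check, while def work(value) does not.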

rpqueue.get_task(name)

Get a task dynamically by name. The task’s module must be loaded first.

rpqueue.result(taskid, conn=None)

Get the result of a remotely executing task from its taskid.

If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.

These two ways of fetching the result are equivalent:

>>> remote = task.execute()
>>> # some concurrent logic
>>> result = remote.result

>>> taskid = task.execute().taskid
>>> # some concurrent logic
>>> result = rpqueue.result(taskid)

Note

If you have more than one taskid whose results you want to fetch, check out rpqueue.results() below.

rpqueue.results(taskids, conn=None)

Get the results of remotely executing tasks from one or more taskids.

If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.

These two ways of fetching the result are equivalent:

>>> remote = [t.execute() for t in tasks]
>>> # some concurrent logic
>>> results = [r.result for r in remote]

>>> taskids = [t.execute().taskid for t in tasks]
>>> # some concurrent logic
>>> results = rpqueue.results(taskids)

rpqueue.set_priority(queue, qpri, conn=None)

Set the priority of a queue. Lower values mean higher priority. Queues with priorities come before queues without priorities.
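
The ordering rule above can be illustrated with a plain sort. This is a sketch of the stated rule only, not rpqueue's implementation; order_queues is a hypothetical helper, and the alphabetical order used among queues without a priority is an assumption made to keep the output deterministic.

```python
def order_queues(queues, priorities):
    """Order queue names by the rule described above.

    queues     -- iterable of queue names
    priorities -- dict mapping some queue names to numeric priorities
    """
    def sort_key(name):
        has_no_priority = name not in priorities
        # Queues with a priority sort first (False < True), lowest value first.
        # Assumption: ties and unprioritized queues fall back to name order.
        return (has_no_priority, priorities.get(name, 0), name)

    return sorted(queues, key=sort_key)
```

For instance, with priorities {'high': 1, 'low': 10}, the queues 'low', 'default', 'high', and 'misc' are ordered as high, low, default, misc.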

rpqueue.known_queues(conn=None)

Get a list of all known queues.

rpqueue.queue_sizes(conn=None)

Return a list of all known queues, their sizes, and the number of items that have been seen in the queue.

rpqueue.clear_queue(queue, conn=None, delete=False)

Delete all items in a given queue, optionally deleting the queue itself.

rpqueue.execute_tasks(queues=None, threads_per_process=1, processes=1, wait_per_thread=1, module=None)

Will execute tasks from the (optionally) provided queues until the first value in the global SHOULD_QUIT is considered false.

class rpqueue.SimpleLock(conn, name, duration=1)

This lock is dirt simple. You shouldn’t use it for anything unless you want it to fail fast when the lock is already held.

If Redis had a “setnxex key value ttl” that set the ‘key’ to ‘value’ if it wasn’t already set, and also set the expiration to ‘ttl’, this lock wouldn’t exist.

(Redis now has this functionality, but we need to support legacy)
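
For reference, the single-command form mentioned above looks roughly like the following. This is a sketch, not SimpleLock's implementation: it assumes a redis-py style connection whose set() method accepts the nx and ex keyword arguments, and try_acquire is a hypothetical helper.

```python
import uuid


def try_acquire(conn, name, ttl):
    """Try to take a lock without blocking.

    Returns a unique token if the key `name` was free, else None.
    `conn` is assumed to be a redis-py style connection.
    """
    token = str(uuid.uuid4())
    # SET name token NX EX ttl: set only if the key is absent, and attach
    # the expiration in the same command.
    if conn.set(name, token, nx=True, ex=ttl):
        return token
    return None
```

A caller that receives None can fail fast, which matches the intended use of SimpleLock described above.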

refresh()

Refreshes a lock

exception rpqueue.NoLock

Raised when a lock cannot be acquired

rpqueue.script_load(script)

Borrowed and updated from my book, Redis in Action: https://github.com/josiahcarlson/redis-in-action/blob/master/python/ch11_listing_source.py