Documentation for version: 0.25.2
Description
This package offers a priority-based remote task queue that uses Redis as the transport and persistence layer and JSON as a common interchange format.
Semantically, this module implements a 0/1 queue with optional retries. That is, it attempts to execute every task once. If the task raises an exception, it will not automatically retry, but you can manually retry the task and specify the maximum attempts. See the Retries section below.
Full documentation is available: https://pythonhosted.org/rpqueue/
Getting started
To execute tasks, you must do three things: import the modules that define your tasks so that rpqueue knows about them, configure rpqueue to connect to your Redis server, and start the task execution daemon:
from mytasks import usertasks1, usertasks2, ...
import rpqueue
rpqueue.set_redis_connection_settings(host, port, db)
rpqueue.execute_tasks()
Alternatively, rpqueue offers a command-line interface to do the same, though you must provide the name of a module or package that imports all modules or packages that define tasks that you want to run. For example:
# tasks.py
from tasks import accounting, cleanup, ...
# any other imports or configuration necessary, put them here
# run from the command-line
python -m rpqueue.run --module=tasks --host=... --port=... --db=...
Example uses
Say that you have a module usertasks1 with a task to be executed called echo_to_stdout. Your module may look like the following:
from rpqueue import task
@task
def echo_to_stdout(message):
    print(message)
To call the above task, you would use:
echo_to_stdout.execute(...)
echo_to_stdout.execute(..., delay=delay_in_seconds)
You can also schedule a task to be repeatedly executed with the periodic_task decorator:
from rpqueue import periodic_task
@periodic_task(25, queue="low")
def function1():
    # Will be executed every 25 seconds from within the 'low' queue.
    pass
Retries
Tasks may be given an optional attempts argument, which specifies the total number of times the task will be attempted before failing. By default, all tasks have attempts set to 1 unless otherwise specified:
@task(attempts=3)
def fail_until_zero(value, **kwargs):
    try:
        if value != 0:
            value -= 1
            raise Exception
    except Exception:
        # re-queue the task; each retry uses up one of the remaining attempts
        fail_until_zero.retry(value, **kwargs)
    else:
        print("succeeded")
If passed the value 3, “succeeded” will never be printed. Why? The first try has value=3, attempts=3, and fails. The second pass has value=2, attempts=2, and fails. The third pass has value=1, attempts=1, fails, and the retry returns without retrying. The attempts value is the total number of attempts, including the first, and all retries.
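The countdown described above can be traced with a small stand-alone simulation. This is only a sketch of the semantics as the text describes them, not rpqueue's implementation; the function name simulate_fail_until_zero and the tuple format of its log are made up for illustration, and no Redis connection is involved.

```python
def simulate_fail_until_zero(value, attempts):
    """Trace each pass of the example task as (value, attempts, outcome).

    Hypothetical helper: it mimics the described behaviour, where every
    retry consumes one attempt and the last failure is not re-queued.
    """
    log = []
    while True:
        if value == 0:
            log.append((value, attempts, "succeeded"))
            break
        log.append((value, attempts, "failed"))
        if attempts <= 1:
            # No attempts left: retry() returns without retrying.
            break
        value -= 1
        attempts -= 1
    return log


for entry in simulate_fail_until_zero(3, 3):
    print(entry)
```

Under these assumptions, starting with attempts=4 instead of 3 leaves one more pass available, which is the pass that sees value=0 and succeeds.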
Waiting for task execution
As of version 0.19, RPQueue offers the ability to wait on a task until it begins execution:
@task
def my_task():
    # do something
    pass

executing_task = my_task.execute()
if executing_task.wait(5):
    # task is either being executed, or it is done
    pass
else:
    # task has not started execution yet
    pass
Because you can wait on a task, you can add deadlines by inserting a call to executing_task.cancel() in the else block above.
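The deadline pattern can be wrapped in a small helper. This is a minimal sketch: start_or_cancel is a hypothetical name, and it only assumes that the object returned by execute() exposes wait(timeout) and cancel() as described in this section.

```python
def start_or_cancel(executing_task, timeout):
    """Wait up to `timeout` seconds for a task to start; cancel it otherwise.

    Hypothetical helper. `executing_task` is assumed to provide the
    wait(timeout) and cancel() methods described above.
    Returns True if the task started (or finished) within the deadline.
    """
    if executing_task.wait(timeout):
        return True
    # Deadline passed before the task started: ask for it to be cancelled.
    executing_task.cancel()
    return False
```

With a real task this would be called as start_or_cancel(my_task.execute(), 5).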
Automatically storing results of tasks
As of version 0.19, RPQueue offers the ability to store the result returned by a task as it completes:
@task(save_results=30)
def task_with_results():
    return 5

etask = task_with_results.execute()
if etask.wait(5):
    print(etask.result)  # should print 5
The save_results argument can be passed to tasks, periodic tasks, and even cron tasks (described below). The value passed will be how long the result is stored in Redis, in seconds. All results must be JSON-encodable.
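One way to check ahead of time whether a return value satisfies the JSON requirement is to try encoding it with the standard library. This is a sketch only: is_json_encodable is a hypothetical helper, and rpqueue's own serialization settings may differ in detail from a plain json.dumps call.

```python
import json


def is_json_encodable(value):
    """Return True if the standard json module can serialize `value`."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        # TypeError: unsupported type (e.g. set); ValueError: circular reference.
        return False
    return True
```

For example, integers, strings, lists, and dicts with string keys pass this check, while a set or an arbitrary object instance does not.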
Additional features
Support for cron_tasks using a crontab-like syntax requires the Python crontab module (http://pypi.python.org/pypi/crontab/), allowing for:
from rpqueue import cron_task
# fields: minute hour day-of-month month day-of-week
@cron_task('0 5 * * tue')
def function2():
    # Will be executed every Tuesday at 5AM.
    pass
Rpqueue module contents
rpqueue (Redis Priority Queue)
Originally written July 5, 2011.
Copyright 2011-2016 Josiah Carlson.
Released under the GNU LGPL v2.1, available at: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html
Other licenses may be available upon request.
rpqueue.new_rpqueue(name, pfix=None)
    Creates a new rpqueue state for running separate rpqueue task systems in the same codebase. This simplifies configuration for multiple Redis servers, and allows for using the same Redis server without using queue names (provide a ‘prefix’ for all RPqueue-related keys).
rpqueue.set_key_prefix(pfix)
    Run before starting any tasks or task runners; will set the prefix on keys in Redis, allowing for multiple parallel rpqueue executions in the same Redis without worrying about queue names.
rpqueue.set_redis_connection_settings(host='localhost', port=6379, db=0, password=None, socket_timeout=30, unix_socket_path=None)
    Sets the global redis connection settings for the queue. If not called before use, will connect to localhost:6379 with no password and db 0.
rpqueue.set_redis_connection(conn)
    Sets the global pooled connection to the provided connection object. Useful for environments where additional pooling or other options are desired or required.
rpqueue.task(*args, **kwargs)
    Decorator to allow the transparent execution of a function as a task. Used via:
        @task(queue='bar')
        def function1(arg1, arg2, ...):
            "will execute from within the 'bar' queue."

        @task
        def function2(arg1, arg2, ...):
            "will execute from within the 'default' queue."
rpqueue.periodic_task(run_every, queue='default', never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0)
    Decorator to allow the automatic repeated execution of a function every run_every seconds, which can be provided via int, long, float, or via a datetime.timedelta instance. Run from the context of the given queue. Used via:
        @periodic_task(25)
        def function1():
            "will be executed every 25 seconds from within the 'default' queue."

        @periodic_task(timedelta(minutes=5), queue='bar')
        def function2():
            "will be executed every 5 minutes from within the 'bar' queue."
    If never_skip is provided and is considered True like:
        @periodic_task(60, never_skip=True)
        def function3():
            pass
    ... and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:16PM and 5 seconds, which is 60 seconds after the earlier scheduled time (it never skips a scheduled time). If you instead had the periodic task defined as:
        @periodic_task(60, never_skip=False)
        def function4():
            pass
    ... and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:26PM and 13 seconds, which is 60 seconds after the current time (it skips any missed scheduled time).
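The two rescheduling rules for never_skip come down to which timestamp the interval is added to. The sketch below reproduces the arithmetic of the two examples with the standard datetime module; next_run is a hypothetical helper written for illustration, not a function from rpqueue, and the calendar date is arbitrary.

```python
from datetime import datetime, timedelta


def next_run(scheduled, now, run_every, never_skip):
    """Return the next run time for a periodic task (illustrative only).

    never_skip=True:  step forward from the previously scheduled time.
    never_skip=False: step forward from the current time, skipping missed runs.
    """
    base = scheduled if never_skip else now
    return base + run_every


scheduled = datetime(2016, 1, 1, 16, 15, 5)   # scheduled for 4:15:05 PM
actual = datetime(2016, 1, 1, 16, 25, 13)     # actually ran at 4:25:13 PM
every = timedelta(seconds=60)

print(next_run(scheduled, actual, every, never_skip=True))
print(next_run(scheduled, actual, every, never_skip=False))
```

With never_skip=True the task in this scenario is still about nine minutes behind its schedule after one run, so it would keep being rescheduled into the past until it catches up.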
rpqueue.cron_task(crontab, queue='default', never_skip=False, attempts=1, retry_delay=30, save_results=0)
    Decorator to allow the automatic repeated execution of a function on a schedule with a crontab syntax. Crontab syntax is provided by the ‘crontab’ Python module (http://pypi.python.org/pypi/crontab/), which must also be installed to use this decorator.
    Similar in use to the @periodic_task decorator:
        @cron_task('* * * * *')
        def function1():
            "will be executed every minute"

        @cron_task('*/5 * * * *', queue='bar')
        def function2():
            "will be executed every 5 minutes from within the 'bar' queue."
    If never_skip is provided and is considered True, it will attempt to never skip a scheduled task, just like the @periodic_task decorator.
    Please see the crontab package documentation or the crontab Wikipedia page for more information on the meaning of the schedule.
class rpqueue.Task(queue, name, function, delay=None, never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0)
    An object that represents a task to be executed. These will replace functions when any of the @task, @periodic_task, or @cron_task decorators have been applied to a function.
    execute(*args, **kwargs)
        Invoke this task with the given arguments inside a task processor.
        Optional arguments:
        - delay - how long to delay the execution of this task for, in seconds
        - taskid - override the taskid on this call, can be used to choose a destination key for the results (be careful!)
        - _queue - override the queue to be used in this call, which can be used to alter priorities of individual calls when coupled with queue priorities
    next(now=None)
        Calculates the next run time of recurring tasks.
    retry(*args, **kwargs)
        Invoke this task as a retry with the given arguments inside a task processor.
        To retry, the task must accept _attempts as a parameter, either directly or via **kwargs.
rpqueue.get_task(name)
    Get a task dynamically by name. The task’s module must be loaded first.
rpqueue.result(taskid, conn=None)
    Get the result of a remotely executed task from its taskid.
    If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.
    These two ways of fetching the result are equivalent:
        >>> remote = task.execute()
        >>> # some concurrent logic
        >>> result = remote.result

        >>> taskid = task.execute().taskid
        >>> # some concurrent logic
        >>> result = rpqueue.result(taskid)
    Note: If you have more than one taskid whose results you want to fetch, check out rpqueue.results() below.
rpqueue.results(taskids, conn=None)
    Get the results of remotely executing tasks from one or more taskids.
    If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.
    These two ways of fetching the result are equivalent:
        >>> remote = [t.execute() for t in tasks]
        >>> # some concurrent logic
        >>> results = [r.result for r in remote]

        >>> taskids = [t.execute().taskid for t in tasks]
        >>> # some concurrent logic
        >>> results = rpqueue.results(taskids)
rpqueue.set_priority(queue, qpri, conn=None)
    Set the priority of a queue. Lower values mean higher priorities. Queues with priorities come before queues without priorities.
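The ordering rule for queue priorities can be illustrated with a plain sort. This is a sketch of the rule as stated, not rpqueue's scheduling code: order_queues is a hypothetical helper, and keeping unprioritized queues in their input order is an assumption, since the text does not say how they are ordered among themselves.

```python
def order_queues(queues, priorities):
    """Order queue names by the stated rule (illustrative only).

    `priorities` maps queue name -> numeric priority; lower values sort
    first, and queues without an entry come after all prioritized queues.
    """
    def sort_key(name):
        if name in priorities:
            return (0, priorities[name])
        # Assumption: unprioritized queues tie, so the stable sort keeps
        # them in their original relative order.
        return (1, 0)

    return sorted(queues, key=sort_key)


print(order_queues(["default", "low", "high", "bulk"], {"high": 1, "low": 10}))
```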
rpqueue.known_queues(conn=None)
    Get a list of all known queues.
rpqueue.queue_sizes(conn=None)
    Return a list of all known queues, their sizes, and the number of items that have been seen in the queue.
rpqueue.clear_queue(queue, conn=None, delete=False)
    Delete all items in a given queue, optionally deleting the queue itself.
rpqueue.execute_tasks(queues=None, threads_per_process=1, processes=1, wait_per_thread=1, module=None)
    Will execute tasks from the (optionally) provided queues until the first value in the global SHOULD_QUIT is considered false.
class rpqueue.SimpleLock(conn, name, duration=1)
    This lock is dirt simple. You shouldn’t use it for anything unless you want it to fail fast when the lock is already held.
    If Redis had a “setnxex key value ttl” that set the ‘key’ to ‘value’ if it wasn’t already set, and also set the expiration to ‘ttl’, this lock wouldn’t exist.
    (Redis now has this functionality, but we need to support legacy)
    refresh()
        Refreshes a lock
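For comparison, the set-if-absent-with-expiration behaviour mentioned above can be expressed as a single SET with the NX and EX options. The sketch below is not SimpleLock's implementation: try_lock is a hypothetical helper, and conn is assumed to be a redis-py style client whose set() method accepts nx= and ex= keyword arguments.

```python
import uuid


def try_lock(conn, name, ttl):
    """Try to take a fail-fast lock; return a token, or None if already held.

    Hypothetical helper. `conn` is assumed to behave like a redis-py client:
    set(..., nx=True, ex=ttl) returns a true value only when the key was
    absent and has now been set with a `ttl`-second expiration.
    """
    token = str(uuid.uuid4())
    if conn.set(name, token, nx=True, ex=ttl):
        return token
    return None
```

Returning a unique token lets the holder later verify that the lock is still its own before releasing or refreshing it.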
exception rpqueue.NoLock
    Raised when a lock cannot be acquired
rpqueue.script_load(script)
    Borrowed and updated from my book, Redis in Action: https://github.com/josiahcarlson/redis-in-action/blob/master/python/ch11_listing_source.py