Tutorial

Install

$ pip install dsq

You also need a running Redis server.

Register and push a task

A task is a user-defined function whose actual execution can be postponed by pushing its name and arguments into a queue. You can have multiple queues, and queues are created on the fly.
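The mechanics can be sketched in a few lines of dependency-free Python (a simplification using an in-memory dict; dsq itself keeps its queues in Redis):

```python
import json

queues = {}  # stand-in for Redis, where dsq actually stores queues

def push(queue, name, args):
    # a queued task is just the registered name plus its arguments,
    # serialized so any worker process can pick it up later
    queues.setdefault(queue, []).append(json.dumps({'name': name, 'args': args}))

push('normal', 'my-task', ['Hello'])
push('normal', 'my-task', ['World'])
print(len(queues['normal']))  # 2 tasks wait until a worker pops them
```

Nothing runs at push time; the function is looked up by its registered name when a worker executes the task.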

# tasks.py
import sys
import logging
import dsq

# dsq does not configure logging by itself,
# so you must do it explicitly
logging.basicConfig(level=logging.INFO)

# using 127.0.0.1:6379/0 redis by default
manager = dsq.create_manager()

def task(value):
    print(value)

# tasks should be registered so workers can execute them
manager.register('my-task', task)

if __name__ == '__main__':
    # put my-task into normal queue
    manager.push('normal', 'my-task', args=[sys.argv[1]])

Now push a couple of tasks:

$ python tasks.py Hello
$ python tasks.py World

You can see queue sizes with the stat command:

$ dsq stat -t tasks
normal      2
schedule    0

Start a worker for the normal queue:

$ dsq worker -b -t tasks normal
INFO:dsq.worker:Executing task(Hello)#CLCKs0nNRQqC4TKVkwDFRw
Hello
INFO:dsq.worker:Executing task(World)#LjCRG7yiQIqVKms-QfhmGg
World

The -b flag stops the worker once its queues are empty.

Task decorator

There is a shortcut to register tasks and push them: the dsq.manager.Manager.task() decorator:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print(value)

if __name__ == '__main__':
    task(sys.argv[1])
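Under the hood the decorator does two things: it registers the function and replaces direct calls with pushes. A rough dependency-free sketch (hypothetical names, not dsq's internals):

```python
registry, queues = {}, {}  # in-memory stand-ins for dsq's state

def task(queue):
    def decorator(fn):
        registry[fn.__name__] = fn  # workers look the function up by name
        def push_proxy(*args):
            # calling the decorated function queues it instead of running it
            queues.setdefault(queue, []).append((fn.__name__, args))
        return push_proxy
    return decorator

@task(queue='normal')
def greet(value):
    print(value)

greet('Hello')           # nothing is printed: the call was queued
print(queues['normal'])  # [('greet', ('Hello',))]
```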

Queue priorities

The worker's queue list is prioritized: it processes tasks from the first queue, then from the second when the first is empty, and so on:

# tasks.py
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='high')
def high(value):
    print('urgent', value)

@manager.task(queue='normal')
def normal(value):
    print('normal', value)

if __name__ == '__main__':
    normal(1)
    normal(2)
    normal(3)
    high(4)
    normal(5)
    high(6)

And processing:

$ python tasks.py
$ dsq stat -t tasks
high        2
normal      4
schedule    0
$ dsq worker -bt tasks high normal
INFO:dsq.worker:Executing high(4)#w9RKVQ4oQoO9ivB8q198QA
urgent 4
INFO:dsq.worker:Executing high(6)#SEss1H0QQB2TAqLQjbBpmw
urgent 6
INFO:dsq.worker:Executing normal(1)#NY-e_Nu3QT-4zCDU9LvIvA
normal 1
INFO:dsq.worker:Executing normal(2)#yy44h7tcToe5yyTSUJ7dLw
normal 2
INFO:dsq.worker:Executing normal(3)#Hx3iau2MRW2xwwOFNinJIg
normal 3
INFO:dsq.worker:Executing normal(5)#DTDpF9xkSkaChwFURRCzDQ
normal 5
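This ordering can be sketched as a plain loop (a simplification of the real worker): on every iteration the worker takes a task from the first non-empty queue in its list, so high drains completely before normal is touched.

```python
def burst_worker(queues, priority_order):
    # burst mode (-b): run until every queue in the list is empty
    executed = []
    while True:
        for name in priority_order:
            if queues[name]:
                executed.append(queues[name].pop(0))
                break  # go back and re-check the highest priority queue
        else:
            return executed  # no queue had a task: the burst worker exits

queues = {'high': [4, 6], 'normal': [1, 2, 3, 5]}
print(burst_worker(queues, ['high', 'normal']))  # [4, 6, 1, 2, 3, 5]
```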

Delayed tasks

You can use the eta or delay parameter to postpone a task:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print(value)

if __name__ == '__main__':
    task.run_with(delay=30)(sys.argv[1])

Use the scheduler command to queue such tasks when they become due:

$ python tasks.py boo
$ python tasks.py foo
$ date
Sun Jul 17 13:41:10 MSK 2016
$ dsq stat -t tasks
schedule    2
$ dsq schedule -t tasks
2016-07-17 13:41:32 normal  {"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
2016-07-17 13:41:34 normal  {"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}

# the next command waits until all delayed tasks are queued
$ dsq scheduler -bt tasks
$ dsq stat -t tasks
normal      2
schedule    0
$ dsq queue -t tasks
{"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
{"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
$ dsq worker -bt tasks normal
INFO:dsq.worker:Executing task(boo)#qWbsEnu2SRyjwIXga35yqA
boo
INFO:dsq.worker:Executing task(foo)#xVm3OyWjQB2XDiskTsCN4w
foo
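What the scheduler does can be sketched like this (a simplification; treating eta as an absolute timestamp and delay as shorthand for now + delay is an assumption here): delayed tasks wait in the schedule, and each scheduler pass moves every task whose eta has passed into its target queue.

```python
def schedule_task(schedule, queue, name, now, delay=None, eta=None):
    # delay is relative to now; eta is an absolute timestamp (assumed semantics)
    schedule.append((now + delay if eta is None else eta, queue, name))

def scheduler_pass(schedule, queues, now):
    # move every due task out of the schedule into its queue, in eta order
    due = sorted(item for item in schedule if item[0] <= now)
    for item in due:
        schedule.remove(item)
        queues.setdefault(item[1], []).append(item[2])

schedule, queues = [], {}
schedule_task(schedule, 'normal', 'boo', now=100, delay=30)  # due at t=130
schedule_task(schedule, 'normal', 'foo', now=100, eta=125)   # due at t=125
scheduler_pass(schedule, queues, now=126)
print(queues['normal'], len(schedule))  # ['foo'] 1 -- 'boo' is still waiting
```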

Note

In production you need to start N workers and one scheduler process to be able to process delayed tasks.

Task result

Provide the keep_result parameter to be able to fetch the task result later:

# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal', keep_result=600)
def div(a, b):
    return a/b

if __name__ == '__main__':
    result = div(int(sys.argv[1]), int(sys.argv[2]))
    if result.ready(5):
        if result.error:
            print(result.error, result.error_message)
        else:
            print('Result is: ', result.value)
    else:
        print 'Result is not ready'

Processing:

# start worker in background
$ dsq worker -t tasks normal &
[1] 6419
$ python tasks.py 10 2
INFO:dsq.worker:Executing div(10, 2)#6S_UlsECSxSddtluBLB6yQ
Result is:  5
$ python tasks.py 10 0
INFO:dsq.worker:Executing div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
ERROR:dsq.manager:Error during processing task div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
Traceback (most recent call last):
  File "/home/bobrov/work/dsq/dsq/manager.py", line 242, in process
    result = func(*args, **kwargs)
  File "./tasks.py", line 11, in div
    return a/b
ZeroDivisionError: integer division or modulo by zero
ZeroDivisionError integer division or modulo by zero
# kill worker
$ kill %1
[1]+  Done                    dsq worker -t tasks normal
$ python tasks.py 10 1
Result is not ready
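The result machinery can be sketched the same way (hypothetical helper names, not dsq's internals): the worker stores the outcome under the task id for keep_result seconds, and ready(timeout) polls that store until a result appears or the timeout expires, which is why the last run above reports nothing once no worker is running.

```python
import time

results = {}  # task id -> (expires_at, error, value); stands in for Redis

def store_result(task_id, value, error=None, keep_result=600):
    # worker side: keep the outcome around for keep_result seconds
    results[task_id] = (time.time() + keep_result, error, value)

def wait_ready(task_id, timeout, poll=0.01):
    # client side: poll until the result shows up or the timeout passes
    deadline = time.time() + timeout
    while time.time() < deadline:
        entry = results.get(task_id)
        if entry and entry[0] > time.time():  # present and not yet expired
            return entry[1], entry[2]         # (error, value)
        time.sleep(poll)
    return None  # no worker delivered a result in time

store_result('div-task', 5)
print(wait_ready('div-task', timeout=0.1))  # (None, 5)
print(wait_ready('missing', timeout=0.05))  # None
```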