Tutorial

Register and push task

A task is a user-defined function whose actual execution can be postponed by pushing its name and arguments into a queue. There can be multiple queues, and queues are created on the fly.
# tasks.py
import sys
import logging
import dsq

# dsq does not set up any logger by itself,
# so logging must be configured explicitly
logging.basicConfig(level=logging.INFO)

# uses redis at 127.0.0.1:6379/0 by default
manager = dsq.create_manager()

def task(value):
    print value

# tasks should be registered so workers can execute them
manager.register('my-task', task)

if __name__ == '__main__':
    # put my-task into the normal queue
    manager.push('normal', 'my-task', args=[sys.argv[1]])
Now push some tasks by executing:
$ python tasks.py Hello
$ python tasks.py World
You can see queue sizes via the stat command:
$ dsq stat -t tasks
normal 2
schedule 0
Start a worker for the normal queue:
$ dsq worker -b -t tasks normal
INFO:dsq.worker:Executing task(Hello)#CLCKs0nNRQqC4TKVkwDFRw
Hello
INFO:dsq.worker:Executing task(World)#LjCRG7yiQIqVKms-QfhmGg
World
-b stops the worker after the queue is empty.
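The mechanics behind register/push are easy to sketch without Redis: a registry maps task names to functions, a queue stores (name, args) pairs, and a worker pops and dispatches by name. A minimal in-memory illustration (this is the idea, not dsq's actual implementation):

```python
from collections import deque

registry = {}      # task name -> function
pending = deque()  # postponed (name, args) pairs

def register(name, func):
    registry[name] = func

def push(name, *args):
    # store only the name and arguments; execution is postponed
    pending.append((name, args))

def work():
    # pop tasks in FIFO order and dispatch by registered name
    results = []
    while pending:
        name, args = pending.popleft()
        results.append(registry[name](*args))
    return results

register('add', lambda a, b: a + b)
push('add', 1, 2)
push('add', 10, 20)
print(work())  # -> [3, 30]
```

The push side only needs the registry indirectly (to agree on names), which is why the producer and the worker can live in different processes.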
Task decorator

There is a shortcut to register tasks and push them: the dsq.manager.Manager.task() decorator:
# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task(sys.argv[1])
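The decorator trick is simple: instead of returning the function unchanged, it registers it and returns a proxy whose call enqueues the task instead of executing it. A hand-rolled sketch of that pattern (the real Manager.task() does more, e.g. queue binding and run_with):

```python
from collections import deque

registry = {}
pending = deque()  # (queue, name, args) triples

def task(queue):
    def decorator(func):
        # register under the function's own name
        registry[func.__name__] = func
        def proxy(*args):
            # calling the decorated name pushes instead of executing
            pending.append((queue, func.__name__, args))
        return proxy
    return decorator

@task(queue='normal')
def greet(value):
    return 'hello ' + value

greet('world')  # enqueued, not executed
qname, name, args = pending.popleft()
print(qname, name, args)      # -> normal greet ('world',)
print(registry[name](*args))  # -> hello world
```

Note that after decoration the original function is only reachable through the registry; the module-level name is the enqueueing proxy.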
Queue priorities

The worker's queue list is prioritized: it takes tasks from the first queue, then from the second when the first is empty, and so on:
# tasks.py
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='high')
def high(value):
    print 'urgent', value

@manager.task(queue='normal')
def normal(value):
    print 'normal', value

if __name__ == '__main__':
    normal(1)
    normal(2)
    normal(3)
    high(4)
    normal(5)
    high(6)
And processing:
$ python tasks.py
$ dsq stat -t tasks
high 2
normal 4
schedule 0
$ dsq worker -bt tasks high normal
INFO:dsq.worker:Executing high(4)#w9RKVQ4oQoO9ivB8q198QA
urgent 4
INFO:dsq.worker:Executing high(6)#SEss1H0QQB2TAqLQjbBpmw
urgent 6
INFO:dsq.worker:Executing normal(1)#NY-e_Nu3QT-4zCDU9LvIvA
normal 1
INFO:dsq.worker:Executing normal(2)#yy44h7tcToe5yyTSUJ7dLw
normal 2
INFO:dsq.worker:Executing normal(3)#Hx3iau2MRW2xwwOFNinJIg
normal 3
INFO:dsq.worker:Executing normal(5)#DTDpF9xkSkaChwFURRCzDQ
normal 5
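The prioritization above needs no special data structure: on each step the worker simply scans its queue list in order and takes a task from the first non-empty queue. A sketch of that loop, assuming plain FIFO queues (dsq keeps its queues in redis, not in memory):

```python
from collections import deque

queues = {'high': deque(), 'normal': deque()}

def pop_next(names):
    # scan queues in the given priority order and
    # return a task from the first non-empty one
    for name in names:
        if queues[name]:
            return queues[name].popleft()
    return None

for item in [('normal', 1), ('normal', 2), ('high', 4), ('high', 6)]:
    queues[item[0]].append(item)

order = []
while True:
    t = pop_next(['high', 'normal'])
    if t is None:
        break
    order.append(t)
# high tasks drain first:
print(order)  # -> [('high', 4), ('high', 6), ('normal', 1), ('normal', 2)]
```

Because the scan restarts every step, a high-priority task pushed while normal tasks are being processed still jumps ahead of the remaining normal ones.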
Delayed tasks

You can use the eta or delay parameter to postpone a task:
# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal')
def task(value):
    print value

if __name__ == '__main__':
    task.run_with(delay=30)(sys.argv[1])
You should use the scheduler command to queue such tasks:
$ python tasks.py boo
$ python tasks.py foo
$ date
Sun Jul 17 13:41:10 MSK 2016
$ dsq stat -t tasks
schedule 2
$ dsq schedule -t tasks
2016-07-17 13:41:32 normal {"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
2016-07-17 13:41:34 normal {"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
# the next command waits until all tasks are scheduled
$ dsq scheduler -bt tasks
$ dsq stat -t tasks
normal 2
schedule 0
$ dsq queue -t tasks
{"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
{"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
$ dsq worker -bt tasks normal
INFO:dsq.worker:Executing task(boo)#qWbsEnu2SRyjwIXga35yqA
boo
INFO:dsq.worker:Executing task(foo)#xVm3OyWjQB2XDiskTsCN4w
foo
Note
In production you need to start N workers and a single scheduler to process delayed tasks.
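The scheduler's job can be modeled with a heap keyed by eta: delayed tasks wait in the schedule until their time comes, then get moved into their ordinary queue. A simplified sketch of that contract (dsq actually keeps the schedule in redis, not a Python heap, and uses real timestamps):

```python
import heapq

schedule = []            # heap of (eta, queue, task)
queues = {'normal': []}  # ordinary FIFO queues

def push_delayed(queue, task, eta):
    heapq.heappush(schedule, (eta, queue, task))

def scheduler_tick(now):
    # move every task whose eta has passed into its queue
    while schedule and schedule[0][0] <= now:
        eta, queue, task = heapq.heappop(schedule)
        queues[queue].append(task)

push_delayed('normal', 'boo', eta=100)
push_delayed('normal', 'foo', eta=50)

scheduler_tick(now=60)   # only 'foo' is due
print(queues['normal'])  # -> ['foo']
scheduler_tick(now=120)  # now 'boo' is due too
print(queues['normal'])  # -> ['foo', 'boo']
```

This also shows why exactly one scheduler should run: the move from schedule to queue is a separate step, and workers never look at the schedule themselves.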
Task result

Provide the keep_result parameter to be able to fetch the task result later:
# tasks.py
import sys
import logging
import dsq

logging.basicConfig(level=logging.INFO)
manager = dsq.create_manager()

@manager.task(queue='normal', keep_result=600)
def div(a, b):
    return a/b

if __name__ == '__main__':
    result = div(int(sys.argv[1]), int(sys.argv[2]))
    if result.ready(5):  # wait up to 5 seconds for a result
        if result.error:
            print result.error, result.error_message
        else:
            print 'Result is: ', result.value
    else:
        print 'Result is not ready'
Process:
# start worker in background
$ dsq worker -t tasks normal &
[1] 6419
$ python tasks.py 10 2
INFO:dsq.worker:Executing div(10, 2)#6S_UlsECSxSddtluBLB6yQ
Result is: 5
$ python tasks.py 10 0
INFO:dsq.worker:Executing div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
ERROR:dsq.manager:Error during processing task div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
Traceback (most recent call last):
File "/home/bobrov/work/dsq/dsq/manager.py", line 242, in process
result = func(*args, **kwargs)
File "./tasks.py", line 11, in div
return a/b
ZeroDivisionError: integer division or modulo by zero
ZeroDivisionError integer division or modulo by zero
# kill worker
$ kill %1
[1]+ Done dsq worker -t tasks normal
$ python tasks.py 10 1
Result is not ready
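Under the hood a kept result is just a value (or an error) stored under the task id for keep_result seconds, and ready() polls for it until a timeout expires. A toy model of that contract (the store, payload keys, and polling interval here are assumptions for illustration, not dsq's wire format):

```python
import time

class Result(object):
    """Poll a shared store for a task's outcome by its id."""

    def __init__(self, store, task_id):
        self.store = store
        self.task_id = task_id
        self.error = None
        self.error_message = None
        self.value = None

    def ready(self, timeout=0):
        # poll until the worker has stored an outcome or timeout expires
        deadline = time.time() + timeout
        while True:
            if self.task_id in self.store:
                payload = self.store[self.task_id]
                self.error = payload.get('error')
                self.error_message = payload.get('error_message')
                self.value = payload.get('value')
                return True
            if time.time() >= deadline:
                return False
            time.sleep(0.01)

store = {}
r = Result(store, 'task-1')
print(r.ready(0.05))  # nothing stored yet -> False

# a "worker" finishes the task and stores the outcome
store['task-1'] = {'value': 5}
print(r.ready(1), r.value)  # -> True 5

store['task-2'] = {'error': 'ZeroDivisionError',
                   'error_message': 'integer division or modulo by zero'}
r2 = Result(store, 'task-2')
print(r2.ready(1), r2.error)  # -> True ZeroDivisionError
```

This explains the last transcript above: with no worker running, nothing ever lands in the store, so ready(5) times out and the script prints "Result is not ready".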