.. _tutorial:

Tutorial
========

Install
-------

::

    $ pip install dsq


You also need a running Redis server.


Register and push a task
------------------------

A task is a user-defined function whose execution can be postponed
by pushing its name and arguments into a queue.
You can have multiple queues; they are created on the fly.

::

    # tasks.py
    import sys
    import logging
    import dsq

    # dsq does not configure logging by itself,
    # so it must be done explicitly
    logging.basicConfig(level=logging.INFO)

    # uses Redis at 127.0.0.1:6379/0 by default
    manager = dsq.create_manager()

    def task(value):
        print value

    # tasks should be registered so workers can execute them
    manager.register('my-task', task)

    if __name__ == '__main__':
        # push my-task into the normal queue
        manager.push('normal', 'my-task', args=[sys.argv[1]])

Now push two tasks by running the script::

    $ python tasks.py Hello
    $ python tasks.py World

You can check queue sizes with the ``stat`` command::

    $ dsq stat -t tasks
    normal	2
    schedule	0

Start a worker for the ``normal`` queue::

    $ dsq worker -b -t tasks normal
    INFO:dsq.worker:Executing task(Hello)#CLCKs0nNRQqC4TKVkwDFRw
    Hello
    INFO:dsq.worker:Executing task(World)#LjCRG7yiQIqVKms-QfhmGg
    World

The ``-b`` flag stops the worker once the queue is empty.
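
To make the register, push and execute cycle concrete, here is a minimal
in-memory sketch. It is not dsq code: ``ToyManager`` and its ``process``
method are hypothetical names, and plain Python deques stand in for Redis.
The point it illustrates is that only the task name and arguments travel
through the queue, and the function is looked up by name at execution time::

    from collections import defaultdict, deque


    class ToyManager(object):
        """In-memory stand-in for a task manager (hypothetical, not dsq)."""

        def __init__(self):
            self.registry = {}                # task name -> function
            self.queues = defaultdict(deque)  # a queue appears on first use

        def register(self, name, func):
            self.registry[name] = func

        def push(self, queue, name, args=()):
            # Only the name and the arguments are stored, not the function.
            self.queues[queue].append({'name': name, 'args': list(args)})

        def process(self, queue):
            """Run every task waiting in ``queue`` and return the results."""
            results = []
            while self.queues[queue]:
                item = self.queues[queue].popleft()
                func = self.registry[item['name']]  # look up by name
                results.append(func(*item['args']))
            return results


    manager = ToyManager()
    manager.register('my-task', lambda value: value.upper())
    manager.push('normal', 'my-task', args=['Hello'])
    manager.push('normal', 'my-task', args=['World'])
    processed = manager.process('normal')

This also suggests why a worker has to import the module that registers the
tasks: without the registry entry it would have no function to call for a
given name.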


Task decorator
--------------

There is a shortcut for registering and pushing tasks: the
:py:meth:`dsq.manager.Manager.task` decorator::

    # tasks.py
    import sys
    import logging
    import dsq

    logging.basicConfig(level=logging.INFO)
    manager = dsq.create_manager()

    @manager.task(queue='normal')
    def task(value):
        print value

    if __name__ == '__main__':
        task(sys.argv[1])
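
The general idea behind such a decorator can be sketched in a few lines.
This is an illustration under assumptions, not the dsq implementation: the
``task`` factory, ``registry`` and ``queue_log`` below are hypothetical
names, and a list stands in for the Redis-backed queue. The decorator
registers the function under its own name and returns a wrapper that
records the call instead of running it::

    queue_log = []  # stands in for the real queue
    registry = {}   # task name -> original function


    def task(queue):
        """Hypothetical decorator factory: register and return a pusher."""
        def decorator(func):
            registry[func.__name__] = func

            def push(*args):
                # Calling the decorated name enqueues the call
                # instead of executing the function.
                queue_log.append((queue, func.__name__, args))
            return push
        return decorator


    @task(queue='normal')
    def greet(value):
        return 'hello ' + value


    greet('world')  # nothing runs yet, the call is only recorded
    queue_name, name, args = queue_log[0]
    outcome = registry[name](*args)  # what a worker would do later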


.. _queue-priorities:

Queue priorities
----------------

The worker's queue list is prioritized: the worker processes tasks from the
first queue, moves on to the second only when the first is empty, and so on::

    # tasks.py
    import logging
    import dsq

    logging.basicConfig(level=logging.INFO)
    manager = dsq.create_manager()

    @manager.task(queue='high')
    def high(value):
        print 'urgent', value

    @manager.task(queue='normal')
    def normal(value):
        print 'normal', value

    if __name__ == '__main__':
        normal(1)
        normal(2)
        normal(3)
        high(4)
        normal(5)
        high(6)

Pushing and processing::

    $ python tasks.py
    $ dsq stat -t tasks
    high	2
    normal	4
    schedule	0
    $ dsq worker -bt tasks high normal
    INFO:dsq.worker:Executing high(4)#w9RKVQ4oQoO9ivB8q198QA
    urgent 4
    INFO:dsq.worker:Executing high(6)#SEss1H0QQB2TAqLQjbBpmw
    urgent 6
    INFO:dsq.worker:Executing normal(1)#NY-e_Nu3QT-4zCDU9LvIvA
    normal 1
    INFO:dsq.worker:Executing normal(2)#yy44h7tcToe5yyTSUJ7dLw
    normal 2
    INFO:dsq.worker:Executing normal(3)#Hx3iau2MRW2xwwOFNinJIg
    normal 3
    INFO:dsq.worker:Executing normal(5)#DTDpF9xkSkaChwFURRCzDQ
    normal 5
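
The ordering in this output follows from always taking the next task from
the first non-empty queue in the list. A minimal sketch of that selection
rule, using in-memory deques and a hypothetical ``next_task`` helper rather
than dsq's actual worker code::

    from collections import deque


    def next_task(queues, order):
        """Return (queue_name, item) from the first non-empty queue."""
        for name in order:
            if queues[name]:
                return name, queues[name].popleft()
        return None


    queues = {'high': deque(), 'normal': deque()}
    # Same push order as in tasks.py above.
    for name, value in [('normal', 1), ('normal', 2), ('normal', 3),
                        ('high', 4), ('normal', 5), ('high', 6)]:
        queues[name].append(value)

    executed = []
    while True:
        found = next_task(queues, ['high', 'normal'])
        if found is None:
            break
        executed.append(found)

Here ``executed`` lists both ``high`` items first, followed by the
``normal`` items in the order they were pushed.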


.. _delayed-tasks:

Delayed tasks
-------------

You can use the ``eta`` or ``delay`` parameter to postpone a task::

    # tasks.py
    import sys
    import logging
    import dsq

    logging.basicConfig(level=logging.INFO)
    manager = dsq.create_manager()

    @manager.task(queue='normal')
    def task(value):
        print value

    if __name__ == '__main__':
        task.run_with(delay=30)(sys.argv[1])

You need to run the ``scheduler`` command to move such tasks into their queues::

    $ python tasks.py boo
    $ python tasks.py foo
    $ date
    Sun Jul 17 13:41:10 MSK 2016
    $ dsq stat -t tasks
    schedule	2
    $ dsq schedule -t tasks
    2016-07-17 13:41:32	normal	{"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
    2016-07-17 13:41:34	normal	{"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}

    # the next command waits until all tasks have been scheduled
    $ dsq scheduler -bt tasks
    $ dsq stat -t tasks
    normal	2
    schedule	0
    $ dsq queue -t tasks
    {"args": ["boo"], "id": "qWbsEnu2SRyjwIXga35yqA", "name": "task"}
    {"args": ["foo"], "id": "xVm3OyWjQB2XDiskTsCN4w", "name": "task"}
    $ dsq worker -bt tasks normal
    INFO:dsq.worker:Executing task(boo)#qWbsEnu2SRyjwIXga35yqA
    boo
    INFO:dsq.worker:Executing task(foo)#xVm3OyWjQB2XDiskTsCN4w
    foo

.. note::

    In production you need to start N workers and one scheduler to be able to
    process delayed tasks.
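
What a scheduler does can be pictured as moving items from a time-ordered
schedule into their target queues once they are due. The sketch below
assumes that ``delay`` is relative (in seconds) and ``eta`` is an absolute
time; ``ToySchedule`` and ``release_due`` are hypothetical names, times are
plain numbers, and nothing here reflects how dsq stores its schedule::

    import heapq


    class ToySchedule(object):
        """Hypothetical in-memory schedule ordered by due time."""

        def __init__(self):
            self.pending = []  # heap of (eta, sequence, queue, item)
            self.counter = 0   # tie-breaker, so items are never compared

        def push(self, queue, item, now, eta=None, delay=None):
            # ``delay`` is relative to ``now``; ``eta`` is absolute.
            if eta is None:
                eta = now + (delay or 0)
            heapq.heappush(self.pending, (eta, self.counter, queue, item))
            self.counter += 1

        def release_due(self, queues, now):
            """Move every item whose eta has passed into its queue."""
            moved = 0
            while self.pending and self.pending[0][0] <= now:
                _, _, queue, item = heapq.heappop(self.pending)
                queues.setdefault(queue, []).append(item)
                moved += 1
            return moved


    schedule = ToySchedule()
    queues = {}
    schedule.push('normal', 'boo', now=100, delay=30)  # due at 130
    schedule.push('normal', 'foo', now=102, delay=30)  # due at 132

    early = schedule.release_due(queues, now=120)  # nothing is due yet
    late = schedule.release_due(queues, now=140)   # both items are due

Until ``release_due`` runs, workers see nothing in the ``normal`` queue,
which mirrors why delayed tasks stay in ``schedule`` when no scheduler is
running.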


Task result
-----------

Pass the ``keep_result`` parameter to be able to fetch the task result later::

    # tasks.py
    import sys
    import logging
    import dsq

    logging.basicConfig(level=logging.INFO)
    manager = dsq.create_manager()

    @manager.task(queue='normal', keep_result=600)
    def div(a, b):
        return a/b

    if __name__ == '__main__':
        result = div(int(sys.argv[1]), int(sys.argv[2]))
        if result.ready(5):
            if result.error:
                print result.error, result.error_message
            else:
                print 'Result is: ', result.value
        else:
            print 'Result is not ready'

Run it::

    # start a worker in the background
    $ dsq worker -t tasks normal &
    [1] 6419
    $ python tasks.py 10 2
    INFO:dsq.worker:Executing div(10, 2)#6S_UlsECSxSddtluBLB6yQ
    Result is:  5
    $ python tasks.py 10 0
    INFO:dsq.worker:Executing div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
    ERROR:dsq.manager:Error during processing task div(10, 0)#_WQxcUDYQH6ZtqfSe1-0-Q
    Traceback (most recent call last):
      File "/home/bobrov/work/dsq/dsq/manager.py", line 242, in process
        result = func(*args, **kwargs)
      File "./tasks.py", line 11, in div
        return a/b
    ZeroDivisionError: integer division or modulo by zero
    ZeroDivisionError integer division or modulo by zero
    # kill worker
    $ kill %1
    [1]+  Done                    dsq worker -t tasks normal
    $ python tasks.py 10 1
    Result is not ready
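
The three outcomes above (a value, an error, or no result within the
timeout) can be modelled with a small result handle. This is a hedged
sketch, not dsq's result class: ``ToyResult`` and ``resolve`` are
hypothetical names, a ``threading.Event`` replaces the Redis round trip,
and result expiry (``keep_result``) is not modelled::

    import threading


    class ToyResult(object):
        """Hypothetical result handle backed by a threading.Event."""

        def __init__(self):
            self._done = threading.Event()
            self.value = None
            self.error = None
            self.error_message = None

        def ready(self, timeout=None):
            # Wait up to ``timeout`` seconds; True if a result arrived.
            return self._done.wait(timeout)

        def resolve(self, func, *args):
            # What a worker would do: run the task, record value or error.
            try:
                self.value = func(*args)
            except Exception as exc:
                self.error = type(exc).__name__
                self.error_message = str(exc)
            self._done.set()


    def div(a, b):
        return a // b


    ok = ToyResult()
    ok.resolve(div, 10, 2)

    failed = ToyResult()
    failed.resolve(div, 10, 0)

    orphan = ToyResult()  # no worker ever resolves this one
    orphan_ready = orphan.ready(0.01)

As in the session above, a caller should check ``ready()`` first, then
``error``, and only then read ``value``.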