Sometimes bad things happen in the course of processing tasks. What might go wrong? How does zc.async handle these errors? What are your responsibilities?
First, what might go wrong?
For the purpose of this discussion, we will omit the possibility that zc.async has a bug. That is certainly a possibility, but the recovery story is not predictable, and if we knew of a bug, we’d try to fix it, rather than discuss it here!
We’ll discuss both polling exceptions and job-related exceptions, then drill down into some specific scenarios. This will illuminate how your code and zc.async’s can work together to handle them.
Polling exceptions are, at least in theory, the least of your worries. You shouldn’t have to worry about them; and if you do, it is probably a basic configuration problem that you need to address, such as making sure that the dispatcher process has access to the needed databases and software, or making sure that the dispatcher process is run by daemonizing software that will restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or supervisor (http://supervisord.org/).
zc.async is largely responsible for dealing with polling exceptions. What does it have to handle, and what needs to happen to handle these problems?
- If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.) needs to restart it. The ZODB will discard incomplete transaction data, if any. The only thing the zc.async dispatcher itself needs to handle is cleanup.

- If the poll gets a conflict error, it should simply abort and retry the poll, forever, with a small back-off.

- If the database goes away (perhaps the ZEO server goes down for a bit, and the ZEO client to which the dispatcher is connected is trying to reconnect), it should gracefully wait for the database to return, and resume when it does.

- Other, more dramatic errors, such as POSKeyErrors, are generally considered to be outside zc.async’s domain and control. It should ideally keep trying to resume for as long as the process is alive, in case the situation somehow improves, but this may be difficult, and the expectations for zc.async’s recovery are lower than with ConflictErrors and ClientDisconnected errors.
To repeat, then, polling exceptions have two basic scenarios.

- If a dispatcher process ends, it needs to deactivate its record in the ZODB, or let another process know to deactivate it.

- If a ZODB.POSException.ConflictError occurs, retry forever with a small backoff; likewise, if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a small backoff, waiting for the database to come back.

Most anything else will ideally keep zc.async attempting to re-poll, but it may not happen: expectations are lower.
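The retry-forever-with-a-small-backoff behavior described above can be sketched in plain Python. This is only an illustration, not zc.async’s actual poll loop; `poll_forever`, `flaky_poll`, and the local `ConflictError` stand-in are hypothetical names introduced here:

```python
import time

class ConflictError(Exception):
    """Stand-in for ZODB.POSException.ConflictError."""

def poll_forever(poll, backoff=0.01, max_backoff=1.0, sleep=time.sleep):
    """Retry ``poll`` until it succeeds, aborting and backing off a
    little longer each time, up to a small cap."""
    while True:
        try:
            return poll()
        except ConflictError:
            sleep(backoff)
            backoff = min(backoff * 2, max_backoff)

attempts = []
def flaky_poll():
    """Fail twice with a conflict, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConflictError()
    return "polled"

print(poll_forever(flaky_poll, sleep=lambda s: None))  # prints: polled
```

The same loop shape covers ClientDisconnected: abort the failed attempt, wait a little, and try again until the database comes back.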
As the author of a zc.async job, your responsibilities, then, are to handle your own exceptions; and to make sure that the retry policy for each job is appropriate. This is controlled with an IRetryPolicy, as shown below.
As someone configuring a running dispatcher, you need to make sure that you give the dispatcher the necessary access to databases and software to perform your jobs, and you need to review (and rotate!) your logs.
zc.async needs to keep polling robust in the face of restarts, ConflictErrors, and ClientDisconnected errors; it needs to give your code a chance to decide what to do in these circumstances, and to log your errors.
The rest of the document uses scenarios to illustrate how zc.async handles errors, and how you might want to configure retry policies.
What is a retry policy? It is used in three circumstances.
Why does this need to be a policy? Can’t it be a simpler arrangement?
The heart of the problem is that different jobs need different error resolutions.
In some cases, jobs may not be fully transactional. For instance, the job may be communicating with an external system, such as a credit card system. The retry policy here should typically be “never”: perhaps a callback should be in charge of determining what to do next.
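For instance, a “never” policy paired with a callback might look like the following plain-Python sketch (this is not zc.async’s API; `run_once`, `charge_card`, `decide`, and the minimal `Failure` class are hypothetical names):

```python
class Failure:
    """Minimal stand-in for a failure object wrapping an exception."""
    def __init__(self, exc):
        self.exc = exc

def run_once(task, callback):
    """Run ``task`` exactly once (a "never" retry policy) and hand the
    result, or a Failure, to ``callback`` to decide what happens next."""
    try:
        result = task()
    except Exception as exc:
        result = Failure(exc)
    return callback(result)

def charge_card():
    # Simulate an external, non-transactional call going wrong.
    raise RuntimeError("gateway timeout")

def decide(outcome):
    # The callback, not an automatic retry, chooses the next step:
    # retrying blindly might double-charge the card.
    if isinstance(outcome, Failure):
        return "flag for manual review: %s" % (outcome.exc,)
    return "charged: %s" % (outcome,)

print(run_once(charge_card, decide))  # prints: flag for manual review: gateway timeout
```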
If a job is fully transactional, it can be retried. But even then the desired behavior may differ.
zc.async currently ships with three retry policies.
If you look at these, you will see that it is trivial to write your own, if desired.
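As a sketch of what writing your own might look like: the method names below follow the jobError/commitError/interrupted pattern that zc.async’s retry policies use, but treat this plain-Python class as illustrative under that assumption, not as a drop-in IRetryPolicy implementation:

```python
class RetryCommonLimited:
    """Illustrative retry policy: retry transaction-level problems
    forever, but give up on errors raised by the job's own code after
    ``limit`` attempts."""

    def __init__(self, job, limit=4):
        self.job = job
        self.limit = limit
        self.failures = 0

    def jobError(self, failure, data_cache):
        # The job's own code raised: retry only a limited number of times.
        self.failures += 1
        return self.failures < self.limit  # True means "retry"

    def commitError(self, failure, data_cache):
        # Committing the job's work failed (for example, a conflict):
        # a fully transactional job is always safe to retry here.
        return True

    def interrupted(self):
        # The dispatcher died while the job was in progress.
        return True
```

Each method simply answers “should this job be retried?” for one class of problem, which is why differing per-job needs are expressed as a policy object rather than a single flag.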
We’ll examine polling error scenarios and job error scenarios.
A common place for a conflict error is with two dispatchers trying to claim the same job from the queue. This example will mimic that situation.
Imagine we have a full set up with a dispatcher, agent, and queue. [1] We’ll actually replace the agent’s chooser with one that behaves badly: it blocks, waiting for our lock.
>>> import threading
>>> lock1 = threading.Lock()
>>> lock2 = threading.Lock()
>>> lock1.acquire()
True
>>> lock2.acquire()
True
>>> def acquireLockAndChooseFirst(agent):
...     res = agent.queue.claim()
...     if res is not None:
...         lock2.release()
...         lock1.acquire()
...     return res
...
>>> import zc.async.instanceuuid
>>> import zc.async.interfaces
>>> import zc.async.testing
>>> import zc.async.dispatcher
>>> import pprint
>>> dispatcher = zc.async.dispatcher.get()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> import transaction
>>> _ = transaction.begin()
>>> queues = root[zc.async.interfaces.KEY]
>>> queue = queues['']
>>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
>>> agent = da['main']
>>> agent.chooser = acquireLockAndChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
Now, when the agent tries to get our job, we’ll start and commit another transaction that removes it from the queue. This will generate a conflict error for the poll’s thread and transaction, because it cannot also remove the same job.
>>> lock2.acquire()
True
>>> _ = transaction.begin()
>>> job is queue.pull()
True
>>> transaction.commit()
>>> lock1.release()
However, the ConflictError is handled, and polling continues.
>>> _ = transaction.begin()
>>> import zc.async.agent
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import zc.async.testing
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
And if we put the job back, it will be performed.
>>> job is queue.put(job)
True
>>> transaction.commit()
>>> zc.async.testing.wait_for_result(job)
42
The story is very similar if the ZEO connection goes away for a while. We’ll mimic a ZEO ClientDisconnected error by monkeypatching transaction.TransactionManager.commit.
>>> lock1.locked()
True
>>> lock2.locked()
True

>>> agent.chooser = acquireLockAndChooseFirst
>>> job = queue.put(returnSomething)
>>> transaction.commit()

>>> lock2.acquire()
True
>>> import ZEO.Exceptions
>>> def commit(self):
...     raise ZEO.Exceptions.ClientDisconnected()
...
>>> import transaction
>>> old_commit = transaction.TransactionManager.commit
>>> transaction.TransactionManager.commit = commit
>>> import time
>>> sleep_requests = []
>>> def sleep(i):
...     sleep_requests.append(i)
...
>>> old_sleep = time.sleep
>>> time.sleep = sleep
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> lock1.release()
>>> info = zc.async.testing.get_poll(dispatcher)['']['main']
>>> len(info['active jobs'] + info['new jobs'])
1
>>> transaction.TransactionManager.commit = old_commit
>>> zc.async.testing.wait_for_result(job)
42
>>> bool(sleep_requests)
True
Here’s another variant that mimics being unable to read the storage during a poll, and then recovering.
>>> error_raised = False
>>> def raiseDisconnectedThenChooseFirst(agent):
...     global error_raised
...     if not error_raised:
...         error_raised = True
...         raise ZEO.Exceptions.ClientDisconnected()
...     return agent.queue.claim()
...
>>> agent.chooser = raiseDisconnectedThenChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
{'': {'main': {'active jobs': [],
               'error': <zc.twist.Failure ...ClientDisconnected>,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> zc.async.testing.wait_for_result(job)
42