Recovering from Catastrophes

What Might Go Wrong?

Sometimes bad things happen in the course of processing tasks. What might go wrong? How does zc.async handle these errors? What are your responsibilities?

First, what might go wrong?

  • zc.async could have a problem while polling for jobs. We’ll call this a “polling exception.”
  • zc.async could have a problem while performing a particular job. We’ll call this a “job-related exception.”

For the purpose of this discussion, we will omit the possibility that zc.async has a bug. That is certainly a possibility, but the recovery story is not predictable, and if we knew of a bug, we’d try to fix it, rather than discuss it here!

We’ll discuss both polling exceptions and job-related exceptions, then drill down into some specific scenarios. This will illuminate how your code and zc.async’s can work together to handle them.

Polling Exceptions

Polling exceptions are, at least in theory, the least of your worries. You shouldn’t have to worry about them; if you do, it is probably a basic configuration problem that you need to address, such as making sure that the dispatcher process has access to the needed databases and software, or making sure that the dispatcher process is run by daemonizing software that will restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or supervisor (http://supervisord.org/).

zc.async is largely responsible for dealing with polling exceptions. What does it have to handle?

  • The process running the poll ends, perhaps in the middle of a poll.
  • zc.async cannot commit a transaction during the poll, for instance because of a ConflictError, or because the database is unavailable.

What needs to happen to handle these problems?

Process Ends while Polling

If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.) needs to restart it. The ZODB will discard incomplete transaction data, if any.

The only thing a zc.async dispatcher needs to handle is cleanup.

  • Ideally it will be able to deactivate its record in the ZODB during the process shutdown.
  • Failing that, if it was a “hard crash” that didn’t allow deactivation, a sibling dispatcher will realize that the dispatcher is down and deactivate it.
  • Or, finally, if it was a hard crash without a sibling, and the daemon restarts a process for the original dispatcher instance, the new process needs to realize that the old process is dead, not a live competitor.

Transaction Error while Polling

If the poll gets a ConflictError, it should simply abort and retry the poll, forever, with a small backoff.

If the database goes away (perhaps the ZEO server goes down for a bit, and the ZEO client to which the dispatcher is connected is trying to reconnect) it should gracefully try to wait for the database to return, and resume when it does.

Other, more dramatic errors, such as POSKeyErrors, are generally considered to be outside zc.async’s domain and control. It should ideally keep trying to resume as long as the process is alive, in case the situation somehow improves, but this may be difficult, and the expectations for zc.async’s recovery are lower than with ConflictErrors and ClientDisconnected errors.
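
The abort-and-retry-with-backoff behavior described above can be sketched generically. This is an illustration of the strategy, not zc.async’s actual polling code; ``poll_once`` and ``is_transient`` are hypothetical stand-ins for a single polling attempt and for the transient-error test (ConflictError, ClientDisconnected).

```python
import time

def poll_with_backoff(poll_once, is_transient,
                      initial_delay=0.1, max_delay=5.0):
    """Retry poll_once forever on transient errors, with a small backoff."""
    delay = initial_delay
    while True:
        try:
            return poll_once()  # one polling attempt
        except Exception as exc:
            if not is_transient(exc):
                raise  # "more dramatic" errors are not handled here
            time.sleep(delay)  # small pause before retrying
            delay = min(delay * 2, max_delay)  # back off, but cap the delay
```

A real dispatcher would also abort the current transaction before each retry, so that the next attempt starts from a fresh view of the database.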

Summary of Polling Exceptions

To repeat, then, polling exceptions have two basic scenarios.

If a dispatcher process ends, it needs to deactivate its record in the ZODB, or let another process know to deactivate it.

If a ZODB.POSException.ConflictError occurs, retry forever with a small backoff; or if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a small backoff, waiting for the database to come back.

Almost anything else will ideally keep zc.async attempting to re-poll, but that may not happen: expectations are lower.

Your Responsibilities

As the author of a zc.async job, your responsibilities are to handle your own exceptions and to make sure that the retry policy for each job is appropriate. This is controlled with an IRetryPolicy, as shown below.

As someone configuring a running dispatcher, you need to make sure that you give the dispatcher the necessary access to databases and software to perform your jobs, and you need to review (and rotate!) your logs.

zc.async’s Responsibilities

zc.async needs to keep polling robust in the face of restarts, ConflictErrors, and ClientDisconnected errors. It needs to give your code a chance to decide what to do in these circumstances, and it needs to log your errors.

Retry Policies

The rest of the document uses scenarios to illustrate how zc.async handles errors, and how you might want to configure retry policies.

What is a retry policy? It is used in three circumstances.

  • When the job starts but fails to complete because the system is interrupted, the job will try to call retry_policy.interrupted() to get a boolean indicating whether the job should be retried.
  • When the code the job ran fails, the job will try to call retry_policy.jobError(failure, data_cache) to get a boolean indicating whether the job should be retried.
  • When the commit fails, the job will try to call retry_policy.commitError(failure, data_cache) to get a boolean indicating whether the job should be retried.
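
To make the three circumstances concrete, here is a toy driver showing where each hook might be consulted. This is a simplified illustration, not zc.async’s actual job-running code; the ``Interrupted`` exception and the ``commit`` argument are hypothetical stand-ins (zc.async passes a zc.twist Failure, not a bare exception, to the hooks).

```python
class Interrupted(Exception):
    """Stand-in for the system being interrupted mid-job."""

def perform(callable_, policy, commit):
    # Toy driver: run callable_, consulting the retry policy in each of
    # the three circumstances described above.
    data_cache = {}
    while True:
        try:
            result = callable_()
        except Interrupted:
            if policy.interrupted():  # system interruption
                continue
            raise
        except Exception as failure:
            if policy.jobError(failure, data_cache):  # error in job code
                continue
            raise
        try:
            commit(result)  # stand-in for the transaction commit
        except Exception as failure:
            if policy.commitError(failure, data_cache):  # commit failed
                continue
            raise
        return result
```

The ``data_cache`` dictionary lets a policy accumulate state (such as a retry count) across attempts.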

Why does this need to be a policy? Can’t it be a simpler arrangement?

The heart of the problem is that different jobs need different error resolutions.

In some cases, jobs may not be fully transactional. For instance, the job may be communicating with an external system, such as a credit card system. The retry policy here should typically be “never”: perhaps a callback should be in charge of determining what to do next.

If a job is fully transactional, it can be retried. But even then the desired behavior may differ.

  • In typical cases, some errors should simply cause a failure, while other errors, such as database conflict errors, should cause a limited number of retries.
  • In some jobs, conflict errors should be retried forever, because the job must be run to completion or else the system should fall over. Callbacks that try to handle errors themselves may take this approach, for instance.

zc.async currently ships with three retry policies.

  1. The default, appropriate for most fully transactional jobs, is zc.async.job.RetryCommonFourTimes. This retries ZEO disconnects forever, and interruptions and transaction errors such as ConflictErrors a set number of times.
  2. The other available (pre-written) option for transactional jobs is zc.async.job.RetryCommonForever. Callbacks get this policy by default. This retries ZEO disconnects, transaction errors such as ConflictErrors, interruptions, and anything that happens during the job’s commit, forever.
  3. The last retry policy is zc.async.job.NeverRetry. This is appropriate for non-transactional jobs. You’ll still typically need to handle errors in your callbacks.

If you look at these, you will see that it is trivial to write your own, if desired.
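
For instance, a custom policy might retry interruptions and commit errors a few times but never retry errors raised by the job’s own code. The sketch below uses the hook names from the IRetryPolicy discussion above; the constructor signature (receiving the job) is an assumption about the interface, so check it against the shipped policies before relying on it.

```python
# Hypothetical custom retry policy, not one of the three shipped with
# zc.async: retry interruptions and commit errors up to five times,
# but let errors in the job's own code fail immediately.
class RetryInterruptsFiveTimes(object):

    max_retries = 5

    def __init__(self, job):  # assumed signature: policies receive the job
        self.job = job
        self.count = 0

    def _retry(self):
        self.count += 1
        return self.count <= self.max_retries

    def interrupted(self):
        return self._retry()

    def jobError(self, failure, data_cache):
        return False  # a bug in the job itself should simply fail

    def commitError(self, failure, data_cache):
        return self._retry()
```

You would then arrange for a job to use this policy by setting its retry policy factory (in zc.async, an attribute along the lines of ``job.retry_policy_factory``; verify the exact name against your version).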

Scenarios

We’ll examine polling error scenarios and job error scenarios.

  • Polling errors
    • The system is polling and gets a ConflictError.
    • The system is polling and gets a ClientDisconnected error.
  • Job errors
    • A worker process is working on a job with the default retry policy. The process dies gracefully and restarts.
    • Like the previous scenario, a worker process is working on a job with the default retry policy. The process crashes hard (does not die gracefully) and restarts.
    • Like the previous scenario, a worker process is working on a job with the default retry policy. The process crashes hard (does not die gracefully) and a sibling notices and takes over.
    • A worker process is working on a job with the default retry policy and gets an error during the job or the commit.

Scenarios: Polling Errors

ConflictError

A common place for a conflict error is with two dispatchers trying to claim the same job from the queue. This example will mimic that situation.

Imagine we have a full set up with a dispatcher, agent, and queue. [1] We’ll actually replace the agent’s chooser with one that behaves badly: it blocks, waiting for our lock.

>>> import threading
>>> lock1 = threading.Lock()
>>> lock2 = threading.Lock()
>>> lock1.acquire()
True
>>> lock2.acquire()
True
>>> def acquireLockAndChooseFirst(agent):
...     res = agent.queue.claim()
...     if res is not None:
...         lock2.release()
...         lock1.acquire()
...     return res
...
>>> import zc.async.instanceuuid
>>> import zc.async.interfaces
>>> import zc.async.testing
>>> import zc.async.dispatcher
>>> import pprint
>>> dispatcher = zc.async.dispatcher.get()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> import transaction
>>> _ = transaction.begin()
>>> queues = root[zc.async.interfaces.KEY]
>>> queue = queues['']
>>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
>>> agent = da['main']
>>> agent.chooser = acquireLockAndChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()

Now, when the agent tries to get our job, we’ll start and commit another transaction that removes it from the queue. This will generate a conflict error for the poll’s thread and transaction, because it cannot also remove the same job.

>>> lock2.acquire()
True
>>> _ = transaction.begin()
>>> job is queue.pull()
True
>>> transaction.commit()
>>> lock1.release()

However, the ConflictError is handled, and polling continues.

>>> _ = transaction.begin()
>>> import zc.async.agent
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import zc.async.testing
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}

And if we put the job back, it will be performed.

>>> job is queue.put(job)
True
>>> transaction.commit()
>>> zc.async.testing.wait_for_result(job)
42

Client Disconnected

The story is very similar if the ZEO connection goes away for a while. We’ll mimic a ZEO ClientDisconnected error by monkeypatching transaction.TransactionManager.commit.

>>> lock1.locked()
True
>>> lock2.locked()
True
>>> agent.chooser = acquireLockAndChooseFirst
>>> job = queue.put(returnSomething)
>>> transaction.commit()
>>> lock2.acquire()
True
>>> import ZEO.Exceptions
>>> def commit(self):
...     raise ZEO.Exceptions.ClientDisconnected()
...
>>> import transaction
>>> old_commit = transaction.TransactionManager.commit
>>> transaction.TransactionManager.commit = commit
>>> import time
>>> sleep_requests = []
>>> def sleep(i):
...     sleep_requests.append(i)
...
>>> old_sleep = time.sleep
>>> time.sleep = sleep
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> lock1.release()
>>> info = zc.async.testing.get_poll(dispatcher)['']['main']
>>> len(info['active jobs'] + info['new jobs'])
1
>>> transaction.TransactionManager.commit = old_commit
>>> zc.async.testing.wait_for_result(job)
42
>>> bool(sleep_requests)
True

Here’s another variant that mimics being unable to read the storage during a poll, and then recovering.

>>> error_raised = False
>>> def raiseDisconnectedThenChooseFirst(agent):
...     global error_raised
...     if not error_raised:
...         error_raised = True
...         raise ZEO.Exceptions.ClientDisconnected()
...     return agent.queue.claim()
...
>>> agent.chooser = raiseDisconnectedThenChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
{'': {'main': {'active jobs': [],
               'error': <zc.twist.Failure ...ClientDisconnected>,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> zc.async.testing.wait_for_result(job)
42