.. _recovering-from-catastrophes:

Recovering from Catastrophes
============================

--------------------
What Might Go Wrong?
--------------------

Sometimes bad things happen in the course of processing tasks. What might go
wrong? How does zc.async handle these errors? What are your responsibilities?

First, what might go wrong?

- zc.async could have a problem while polling for jobs. We'll call this a
  "polling exception."

- zc.async could have a problem while performing a particular job. We'll call
  this a "job-related exception."

For the purpose of this discussion, we will omit the possibility that zc.async
has a bug. That is certainly a possibility, but the recovery story is not
predictable, and if we knew of a bug, we'd try to fix it, rather than discuss
it here!

We'll discuss both polling exceptions and job-related exceptions, then drill
down into some specific scenarios. This will illuminate how your code and
zc.async's can work together to handle them.

Polling Exceptions
------------------

Polling exceptions are, at least in theory, the least of your worries. You
shouldn't have to worry about them; and if you do, it is probably a basic
configuration problem that you need to address, such as making sure that the
dispatcher process has access to the needed databases and software, or making
sure that the dispatcher process is run by daemonizing software that will
restart it if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon)
or supervisor (http://supervisord.org/).

zc.async is largely responsible for dealing with polling exceptions. What does
it have to handle?

- The process running the poll ends, perhaps in the middle of a poll.

- zc.async cannot commit a transaction during the poll, for instance because
  of a ConflictError, or because the database is unavailable.

What needs to happen to handle these problems?

Process Ends while Polling
..........................

If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.)
needs to restart it. The ZODB will discard incomplete transaction data, if
any. The only thing a zc.async dispatcher needs to handle is clean up.

- Ideally it will be able to deactivate its record in the ZODB during the
  process shutdown.

- Failing that, if it was a "hard" crash that didn't allow deactivation, a
  sibling dispatcher will realize that the dispatcher is down and deactivate
  it.

- Or, finally, if it was a hard crash without a sibling, and the daemon
  restarts a process for the original dispatcher instance, the new process
  needs to realize that the old process is dead, not competing with it.

Transaction Error while Polling
...............................

If the poll gets a conflict error, it should simply abort and retry the poll,
forever, with a small back-off.

If the database goes away (perhaps the ZEO server goes down for a bit, and
the ZEO client to which the dispatcher is connected is trying to reconnect),
it should gracefully wait for the database to return, and resume when it
does.

Other, more dramatic errors, such as POSKey errors, are generally considered
to be out of zc.async's domain and control. It should ideally continue to try
to resume as long as the process is alive, in case somehow the situation
improves, but this may be difficult, and the expectations for zc.async's
recovery are lower than with ConflictErrors and ClientDisconnected errors.

Summary of Polling Exceptions
.............................

To repeat, then, polling exceptions have two basic scenarios.
If a dispatcher process ends, it needs to deactivate its record in the ZODB,
or let another process know to deactivate it.

If a ZODB.POSException.ConflictError occurs, retry forever with a small
backoff; or, if ZEO.Exceptions.ClientDisconnected occurs, retry forever with
a small backoff, waiting for the database to come back.

Most anything else will ideally keep zc.async attempting to re-poll, but it
may not happen: expectations are lower.

Job-Related Exceptions
----------------------

What about job-related exceptions?

Responsibility for handling job-related exceptions is shared between your
code and zc.async's. What might happen?

- Your job might fail internally.

- The process running your task might end before completing your task.

- zc.async might not be able to commit a transaction after your task
  completes, for instance because of a ConflictError, or because the database
  is unavailable.

What should occur to handle these problems?

Job Fails
.........

As discussed elsewhere, if your job fails in your own code, this is mostly
your responsibility. You should handle possible errors both within your job's
code and in callbacks, as appropriate.

The other tool at your disposal for this situation, as with others below, is
a retry policy. Retry policies let you determine what zc.async should do when
your job fails. The default retry policy for job failures (as well as for
commit failures, below) is that transaction errors, such as conflict errors,
are retried five times, and a ZEO ClientDisconnected error is retried forever
with a backoff. You can customize these.

Other than supporting these tools, zc.async's only other responsibility is to
report. By default, zc.async will log a failure of a job entered in a queue
at the "ERROR" level in the ``zc.async.events`` log, and it will log a
failure of a callback or other internal job at the "CRITICAL" level. This can
be controlled per-process and per-job, as we'll see below. These tracebacks
include information about the local and global variables for each frame in
the stack, which can be useful for deducing the problem that occurred.

zc.async also includes a ``Failure`` object on the job as a result, to let
you react to the problem in a callback and analyze it later. This is
discussed in detail in other documents.

Process Ends During Job
.......................

If a process ends while it is performing a job, that is similar, in large
part, to the possibility of the process ending the polling job: we need to
restart the process, and realize that we had started the job. But should we
restart the job, or abort it?

Answering this question is a matter of policy, and requires knowing what each
job does.

Generally, if a job is fully transactional, such as writing something to the
ZODB, and the job has not timed out yet, you'll want to restart it. You might
want to restart some reasonably large number of times, and then suspect that,
since you can't seem to finish the job, maybe the job is causing the process
to die, and you should abort. Or perhaps you want to restart forever.

If the job isn't transactional, such as communicating with an external
service, you might want to abort the job and set up some callbacks to handle
the fallout.

As we'll see below, zc.async defaults to guessing that jobs placed directly
in a queue are transactional and can be tried up to ten times; and that jobs
used as callbacks are also transactional and can be tried until they succeed.
The defaults can be changed, and the behavior of an individual job can be
changed.
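For instance, to opt a single job out of automatic retries entirely, you
might point it at one of the pre-written policies described later in this
document. The following is only a sketch, not an excerpt from the test suite:
``send_to_credit_card_processor`` is a hypothetical callable, and it assumes
that a job's ``retry_policy_factory`` attribute is the per-job hook for
choosing a policy. ::

    import zc.async.job

    # Hypothetical job that talks to an external, non-transactional system.
    job = queue.put(send_to_credit_card_processor)
    # Assumption: retry_policy_factory is the per-job hook; NeverRetry is one
    # of the policies shipped with zc.async (see "Retry Policies" below).
    job.retry_policy_factory = zc.async.job.NeverRetry
    transaction.commit()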
These settings are controlled with a RetryPolicy, discussed below.

Transaction Error During Job
............................

Handling transaction errors after processing a job is also similar to the
handling of transaction errors for polling exceptions. ConflictErrors and
ClientDisconnected errors should often cause jobs to be aborted and
restarted.

However, if the job is not transactional, such as communicating with an
external service, a simple abort and retry may be hazardous. Also, many jobs
should be stopped if they retry on ConflictError more than some number of
times--a heuristic bellwether--with the logic that they may simply be doing
something too problematic, and they are blocking other tasks from starting.
But other jobs should be retried until they complete.

As mentioned above, zc.async defaults to guessing that jobs are
transactional. ClientDisconnected errors are retried forever, with a small
backoff. Jobs placed in a queue retry transaction errors, such as
ConflictErrors, four times, while callbacks retry them forever. The defaults
can be changed, and the behavior of an individual job can be changed, using
the RetryPolicy described below.

Summary of Job-Related Exceptions
.................................

If an exception occurs in your job's code, zc.async will log it as an ERROR
if it is a main queue job and as CRITICAL if it is a callback; and it will
make the result of the call a ``Failure`` with error information, as shown
elsewhere. Everything else is your responsibility, to be handled with
try:except or try:finally blocks in your code, callbacks, or custom
RetryPolicies.

Process death, conflict errors, and ``ClientDisconnected`` errors all may
need to be handled differently for different jobs. zc.async has a default
policy for jobs placed in a queue, and another for callback jobs. The default
policy, a RetryPolicy, can be changed and can be set explicitly per-job.

Your Responsibilities
---------------------

As the author of a zc.async job, your responsibilities, then, are to handle
your own exceptions and to make sure that the retry policy for each job is
appropriate. This is controlled with an IRetryPolicy, as shown below.

As someone configuring a running dispatcher, you need to make sure that you
give the dispatcher the necessary access to databases and software to perform
your jobs, and you need to review (and rotate!) your logs.

zc.async's Responsibilities
---------------------------

zc.async needs to keep polling robust in the face of restarts,
ConflictErrors, and ClientDisconnected errors. It needs to give your code a
chance to decide what to do in these circumstances, and to log your errors.

Retry Policies
--------------

The rest of the document uses scenarios to illustrate how zc.async handles
errors, and how you might want to configure retry policies.

What is a retry policy? It is used in three circumstances.

- When the job starts but fails to complete because the system is
  interrupted, the job will try to call ``retry_policy.interrupted()`` to get
  a boolean as to whether the job should be retried.

- When the code the job ran fails, the job will try to call
  ``retry_policy.jobError(failure, data_cache)`` to get a boolean as to
  whether the job should be retried.

- When the commit fails, the job will try to call
  ``retry_policy.commitError(failure, data_cache)`` to get a boolean as to
  whether the job should be retried.

Why does this need to be a policy? Can't it be a simpler arrangement?

The heart of the problem is that different jobs need different error
resolutions.
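Before turning to which resolutions suit which jobs, here is a minimal sketch
of what such a policy can look like, built around the three methods just
described. It is illustrative only, not one of the shipped policies: the
class name is invented, it assumes the factory is instantiated with the job,
it assumes ``failure`` behaves like a Twisted ``Failure`` (so
``failure.check`` is available), and it is not persistent and omits the
bookkeeping (such as the ``data`` mapping) that the shipped policies
maintain. ::

    import ZODB.POSException
    import zope.interface

    import zc.async.interfaces


    class RetryTransactionErrorsThrice(object):
        """Sketch: retry interruptions and commit-time transaction errors up
        to three times each; never retry errors raised by the job's own code.
        """
        zope.interface.implements(zc.async.interfaces.IRetryPolicy)

        def __init__(self, job):
            self.job = job
            self.interruptions = 0
            self.transaction_errors = 0

        def interrupted(self):
            # The system went down while the job was in progress: retry a few
            # times, then give up.
            self.interruptions += 1
            return self.interruptions <= 3

        def jobError(self, failure, data_cache):
            # An error raised by the job's own code: let callbacks handle it.
            return False

        def commitError(self, failure, data_cache):
            # Retry conflicts and other transaction errors a few times; give
            # up on anything else.
            if failure.check(ZODB.POSException.TransactionError):
                self.transaction_errors += 1
                return self.transaction_errors <= 3
            return False

A policy like this could then be attached to an individual job in the same
way as the earlier sketch.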
In some cases, jobs may not be fully transactional. For instance, the job may
be communicating with an external system, such as a credit card system. The
retry policy here should typically be "never": perhaps a callback should be
in charge of determining what to do next.

If a job is fully transactional, it can be retried. But even then the desired
behavior may differ.

- In typical cases, some errors should simply cause a failure, while other
  errors, such as database conflict errors, should cause a limited number of
  retries.

- In some jobs, conflict errors should be retried forever, because the job
  must be run to completion or else the system should fall over. Callbacks
  that try to handle errors themselves may take this approach, for instance.

zc.async currently ships with three retry policies.

1.  The default, appropriate for most fully transactional jobs, is
    zc.async.job.RetryCommonFourTimes. This retries ZEO disconnects forever;
    and interrupts and transaction errors, such as conflicts, a set number of
    times.

2.  The other available (pre-written) option for transactional jobs is
    zc.async.job.RetryCommonForever. Callbacks will get this policy by
    default. This retries ZEO disconnects, transaction errors such as
    conflict errors, interrupts, and *anything* that happens during the job's
    commit, forever.

3.  The last retry policy is zc.async.job.NeverRetry. This is appropriate for
    non-transactional jobs. You'll still typically need to handle errors in
    your callbacks.

If you look at these, you will see that it is trivial to write your own, if
desired.

Scenarios
---------

We'll examine polling error scenarios and job error scenarios.

- Polling errors

  * The system is polling and gets a ConflictError.

  * The system is polling and gets a ClientDisconnected error.

- Job errors

  * A worker process is working on a job with the default retry policy. The
    process dies gracefully and restarts.

  * Like the previous scenario, a worker process is working on a job with the
    default retry policy. The process crashes hard (does not die gracefully)
    and restarts.

  * Like the previous scenario, a worker process is working on a job with the
    default retry policy. The process crashes hard (does not die gracefully)
    and a sibling notices and takes over.

  * A worker process is working on a job with the default retry policy and
    gets an error during the job or the commit.

-------------------------
Scenarios: Polling Errors
-------------------------

ConflictError
-------------

A common place for a conflict error is with two dispatchers trying to claim
the same job from the queue. This example will mimic that situation.

Imagine we have a full set up with a dispatcher, agent, and queue. [#setUp]_

We'll actually replace the agent's chooser with one that behaves badly: it
blocks, waiting for our lock.

    >>> import threading
    >>> lock1 = threading.Lock()
    >>> lock2 = threading.Lock()
    >>> lock1.acquire()
    True
    >>> lock2.acquire()
    True
    >>> def acquireLockAndChooseFirst(agent):
    ...     res = agent.queue.claim()
    ...     if res is not None:
    ...         lock2.release()
    ...         lock1.acquire()
    ...     return res
    ...
    >>> import zc.async.instanceuuid
    >>> import zc.async.interfaces
    >>> import zc.async.testing
    >>> import zc.async.dispatcher
    >>> import pprint

    >>> dispatcher = zc.async.dispatcher.get()
    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
    {'': {'main': {'active jobs': [],
                   'error': None,
                   'len': 0,
                   'new jobs': [],
                   'size': 3}}}

    >>> import transaction
    >>> _ = transaction.begin()
    >>> queues = root[zc.async.interfaces.KEY]
    >>> queue = queues['']
    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
    >>> agent = da['main']
    >>> agent.chooser = acquireLockAndChooseFirst
    >>> def returnSomething():
    ...     return 42
    ...
    >>> job = queue.put(returnSomething)
    >>> transaction.commit()

Now, when the agent tries to get our job, we'll start and commit another
transaction that removes it from the queue. This will generate a conflict
error for the poll's thread and transaction, because it cannot also remove
the same job.

    >>> lock2.acquire()
    True
    >>> _ = transaction.begin()
    >>> job is queue.pull()
    True
    >>> transaction.commit()
    >>> lock1.release()

However, the ConflictError is handled, and polling continues.

    >>> _ = transaction.begin()
    >>> import zc.async.agent
    >>> agent.chooser = zc.async.agent.chooseFirst
    >>> transaction.commit()
    >>> import zc.async.dispatcher
    >>> dispatcher = zc.async.dispatcher.get()
    >>> import zc.async.testing
    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
    {'': {'main': {'active jobs': [],
                   'error': None,
                   'len': 0,
                   'new jobs': [],
                   'size': 3}}}

And if we put the job back, it will be performed.

    >>> job is queue.put(job)
    True
    >>> transaction.commit()
    >>> zc.async.testing.wait_for_result(job)
    42

Client Disconnected
-------------------

The story is very similar if the ZEO connection goes away for a while. We'll
mimic a ZEO ClientDisconnected error by monkeypatching
transaction.TransactionManager.commit.

    >>> lock1.locked()
    True
    >>> lock2.locked()
    True

    >>> agent.chooser = acquireLockAndChooseFirst
    >>> job = queue.put(returnSomething)
    >>> transaction.commit()

    >>> lock2.acquire()
    True
    >>> import ZEO.Exceptions
    >>> def commit(self):
    ...     raise ZEO.Exceptions.ClientDisconnected()
    ...
    >>> import transaction
    >>> old_commit = transaction.TransactionManager.commit
    >>> transaction.TransactionManager.commit = commit
    >>> import time
    >>> sleep_requests = []
    >>> def sleep(i):
    ...     sleep_requests.append(i)
    ...
    >>> old_sleep = time.sleep
    >>> time.sleep = sleep
    >>> agent.chooser = zc.async.agent.chooseFirst
    >>> transaction.commit()
    >>> lock1.release()
    >>> info = zc.async.testing.get_poll(dispatcher)['']['main']
    >>> len(info['active jobs'] + info['new jobs'])
    1
    >>> transaction.TransactionManager.commit = old_commit
    >>> zc.async.testing.wait_for_result(job)
    42
    >>> bool(sleep_requests)
    True

Here's another variant that mimics being unable to read the storage during a
poll, and then recuperating.

    >>> error_raised = False
    >>> def raiseDisconnectedThenChooseFirst(agent):
    ...     global error_raised
    ...     if not error_raised:
    ...         error_raised = True
    ...         raise ZEO.Exceptions.ClientDisconnected()
    ...     return agent.queue.claim()
    >>> agent.chooser = raiseDisconnectedThenChooseFirst
    >>> def returnSomething():
    ...     return 42
    ...
    >>> job = queue.put(returnSomething)
    >>> transaction.commit()
    >>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
    {'': {'main': {'active jobs': [],
                   'error': <...ClientDisconnected...>,
                   'len': 0,
                   'new jobs': [],
                   'size': 3}}}
    >>> zc.async.testing.wait_for_result(job)
    42

-----------------------------
Scenarios: Job-Related Errors
-----------------------------

Graceful Shutdown During Job
----------------------------

First let's consider how a failed job with a callback or two is handled when
the dispatcher dies.

Here we start a job.

    >>> import zope.component
    >>> import transaction
    >>> import zc.async.interfaces
    >>> import zc.async.testing
    >>> import zc.async.dispatcher

    >>> queue = root[zc.async.interfaces.KEY]['']
    >>> lock = threading.Lock()
    >>> lock.acquire()
    True
    >>> fail_flag = True
    >>> def wait_for_me():
    ...     global fail_flag
    ...     if fail_flag:
    ...         fail_flag = False
    ...         lock.acquire()
    ...         lock.release() # so we can use the same lock again later
    ...         raise SystemExit() # this will cause the worker thread to exit
    ...     else:
    ...         return 42
    ...
    >>> def handle_result(result):
    ...     return 'I got result %r' % (result,)
    ...
    >>> job = queue.put(wait_for_me)
    >>> callback_job = job.addCallback(handle_result)
    >>> transaction.commit()
    >>> dispatcher = zc.async.dispatcher.get()
    >>> poll = zc.async.testing.get_poll(dispatcher)
    >>> zc.async.testing.wait_for_start(job)

In this scenario, ``wait_for_me`` is a job that, the first time it is run,
will "unexpectedly" be lost while the dispatcher stops working.
``handle_result`` will simply show us that callbacks will be called
successfully.

The job has started. Now, the dispatcher suddenly dies without the thread
performing ``wait_for_me`` getting a chance to finish. For our first example,
let's give the dispatcher a graceful exit. The dispatcher gets a chance to
clean up its dispatcher agents, and a job to call ``job.handleInterrupt()``
goes into the queue.

    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
    >>> zc.async.testing.wait_for_deactivation(dispatcher)
    >>> _ = transaction.begin()
    >>> job.status == zc.async.interfaces.ACTIVE
    True
    >>> len(queue)
    1
    >>> interrupt_job = queue[0]
    >>> interrupt_job # doctest: +ELLIPSIS
    <zc.async.job.Job ... ``...handleInterrupt()``>
    >>> queue[0].callable # doctest: +ELLIPSIS
    <bound method Job.handleInterrupt of <zc.async.job.Job ...>>

Now when the process starts back up again, ``handleInterrupt`` checks with
the default retry policy as to what should be done. It requests that the job
be retried. It's put back in the queue, and it is called again normally.

    >>> old_dispatcher = dispatcher
    >>> zc.async.dispatcher.clear()
    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
    ...     poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
    >>> dispatcher = zc.async.dispatcher.get()
    >>> zc.async.testing.wait_for_result(interrupt_job)

Now we need to wait for the job.

    >>> zc.async.testing.wait_for_result(job)
    42
    >>> callback_job.status == zc.async.interfaces.COMPLETED
    True
    >>> callback_job.result
    'I got result 42'

The job now has a retry policy with some values that, while not part of the
interface, are still worth showing here.

    >>> policy = job.getRetryPolicy()
    >>> policy.data.get('interruptions')
    1

This shows that the policy registered one interruption. [#cleanup1]_

Hard Crash During Job
---------------------

Our next catastrophe only changes one aspect of the previous one: the
dispatcher does not stop gracefully, and does not have a chance to clean up
its active jobs. It is a "hard" crash.

To show this, we will start a job, simulate the dispatcher dying "hard," and
restart it so it can clean up.

So, first we start a long-running job in the dispatcher.
    >>> lock.acquire()
    True
    >>> fail_flag = True
    >>> job = queue.put(wait_for_me)
    >>> callback_job = job.addCallback(handle_result)
    >>> transaction.commit()
    >>> dispatcher = zc.async.dispatcher.get()
    >>> poll = zc.async.testing.get_poll(dispatcher)
    >>> zc.async.testing.wait_for_start(job)

Now we'll "crash" the dispatcher.

    >>> dispatcher.activated = False # this will make polling stop, without
    ...                              # cleanup
    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
    >>> dispatcher.thread.join(3)

Hard crashes can be detected because the dispatchers write datetimes to the
database every few polls. A given dispatcher instance does this for each
queue on a ``DispatcherAgents`` object available in
``queue.dispatchers[UUID]``, where ``UUID`` is the uuid of that dispatcher.

The ``DispatcherAgents`` object has four pertinent attributes:
``ping_interval``, ``ping_death_interval``, ``last_ping.value``, and
``dead``. About every ``ping_interval`` (a ``datetime.timedelta``), the
dispatcher is supposed to write a ``datetime`` to ``last_ping.value``. If the
``last_ping.value`` plus the ``ping_death_interval`` (also a ``timedelta``)
is older than now, the dispatcher is considered to be ``dead``, and old jobs
should be cleaned up.

The ``ping_interval`` defaults to 30 seconds, and the ``ping_death_interval``
defaults to 60 seconds. Generally, the ``ping_death_interval`` should be at
least two or three poll intervals
(``zc.async.dispatcher.get().poll_interval``) greater than the
``ping_interval``.

The ping hasn't timed out yet, so the dispatcher isn't considered dead yet.

    >>> _ = transaction.begin()
    >>> import zc.async.instanceuuid
    >>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
    >>> da.ping_death_interval
    datetime.timedelta(0, 60)
    >>> da.ping_interval
    datetime.timedelta(0, 30)
    >>> bool(da.activated)
    True
    >>> da.dead
    False

Therefore, the job is still sitting around in the dispatcher's pile in the
database (the ``main`` key is for the ``main`` agent installed in this
dispatcher in the set up for these examples).

    >>> job in da['main']
    True
    >>> job.status == zc.async.interfaces.ACTIVE
    True

Let's start our dispatcher up again.

    >>> old_dispatcher = dispatcher
    >>> zc.async.dispatcher.clear()
    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
    ...     poll_interval=0.1)(zc.async.interfaces.DatabaseOpened(db))
    >>> dispatcher = zc.async.dispatcher.get()

Initially, it's going to be a bit confused, because it sees that the
DispatcherAgents object is ``activated``, and not ``dead``. It can't tell if
there's another process using its same UUID, or if it is looking at the
result of a hard crash.

    >>> zc.async.testing.wait_for_result(job, seconds=1)
    Traceback (most recent call last):
    ...
    AssertionError: job never completed
    >>> zc.async.testing.get_poll(dispatcher, seconds=1)
    {'': None}

    >>> for r in reversed(event_logs.records):
    ...     if r.levelname == 'ERROR':
    ...         break
    ... else:
    ...     assert False, 'did not find log'
    ...
    >>> print r.getMessage() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    UUID ... already activated in queue (oid 4): another process?
    (To stop poll attempts in this process, set
    ``zc.async.dispatcher.get().activated = False``. To stop polls
    permanently, don't start a zc.async.dispatcher!)

To speed up the realization of our dispatcher that the previous activation is
``dead``, we'll set the ping_death_interval to just one second.
    >>> _ = transaction.begin()
    >>> da.dead
    False
    >>> job in da['main']
    True
    >>> len(queue)
    0
    >>> import datetime
    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
    >>> transaction.commit()
    >>> zc.async.testing.wait_for_death(da)
    >>> bool(da.activated)
    True

After the next poll, the dispatcher will have cleaned up its old tasks in the
same way we saw in the previous example. The job's ``handleInterrupt`` method
will be called, and the job will be put back in the queue to be retried. The
DispatcherAgents object is no longer dead, because it is tied to the new
instance of the dispatcher.

    >>> poll = zc.async.testing.get_poll(dispatcher)
    >>> t = transaction.begin()
    >>> da.ping_death_interval = datetime.timedelta(seconds=60)
    >>> transaction.commit()
    >>> from zc.async.testing import time_sleep
    >>> def wait_for_pending(job):
    ...     for i in range(600):
    ...         t = transaction.begin()
    ...         if job.status == zc.async.interfaces.PENDING:
    ...             break
    ...         time_sleep(0.01)
    ...     else:
    ...         assert False, 'job never pending: ' + str(job.status)
    ...
    >>> wait_for_pending(job)
    >>> job in da['main']
    False
    >>> bool(da.activated)
    True
    >>> da.dead
    False
    >>> queue[0] is job
    True

Now we need to wait for the job.

    >>> zc.async.testing.wait_for_result(job)
    42
    >>> callback_job.status == zc.async.interfaces.COMPLETED
    True
    >>> callback_job.result
    'I got result 42'
    >>> policy = job.getRetryPolicy()
    >>> policy.data.get('interruptions')
    1

The dispatcher cleaned up its own "hard" crash. [#cleanup1]_

.. _hard-crash-with-sibling-recovery:

Hard Crash During Job with Sibling Recovery
-------------------------------------------

Our next catastrophe is the same as the one before, except, after one
dispatcher's hard crash, another dispatcher is around to clean up the dead
jobs.

To show this, we will start a job, start a second dispatcher, simulate the
first dispatcher dying "hard," and watch the second dispatcher clean up after
the first one.

So, first we start a long-running job in the dispatcher as before.

    >>> lock.acquire()
    True
    >>> fail_flag = True
    >>> job = queue.put(wait_for_me)
    >>> callback_job = job.addCallback(handle_result)
    >>> transaction.commit()
    >>> dispatcher = zc.async.dispatcher.get()
    >>> poll = zc.async.testing.get_poll(dispatcher)
    >>> zc.async.testing.wait_for_start(job, seconds=30) # XXX not sure why so long

Now we'll start up an alternate dispatcher.

    >>> import uuid
    >>> alt_uuid = uuid.uuid1()
    >>> zc.async.subscribers.ThreadedDispatcherInstaller(
    ...     poll_interval=0.5, uuid=alt_uuid)(
    ...     zc.async.interfaces.DatabaseOpened(db))
    >>> alt_dispatcher = zc.async.dispatcher.get(alt_uuid)

Now we'll "crash" the dispatcher.

    >>> dispatcher.activated = False # this will make polling stop, without
    ...                              # cleanup
    >>> dispatcher.reactor.callFromThread(dispatcher.reactor.crash)
    >>> dispatcher.thread.join(3)
    >>> dispatcher.thread.isAlive()
    False

As discussed in the previous example, the ping hasn't timed out yet, so the
alternate dispatcher can't know that the first one is dead. Therefore, the
job is still sitting around in the old dispatcher's pile in the database.
    >>> _ = transaction.begin()
    >>> bool(da.activated)
    True
    >>> da.dead
    False
    >>> job.status == zc.async.interfaces.ACTIVE
    True

    >>> alt_poll_1 = zc.async.testing.get_poll(alt_dispatcher)
    >>> _ = transaction.begin()
    >>> job in da['main']
    True
    >>> bool(da.activated)
    True
    >>> da.dead
    False

    >>> alt_poll_2 = zc.async.testing.get_poll(alt_dispatcher)
    >>> _ = transaction.begin()
    >>> job in da['main']
    True
    >>> bool(da.activated)
    True
    >>> da.dead
    False

Above, the ping_death_interval was returned to the default of 60 seconds. To
speed up the realization of our second dispatcher that the first one is dead,
we'll set the ping_death_interval back down to just one second.

    >>> bool(da.activated)
    True
    >>> da.ping_death_interval
    datetime.timedelta(0, 60)
    >>> import datetime
    >>> da.ping_death_interval = datetime.timedelta(seconds=1)
    >>> transaction.commit()
    >>> zc.async.testing.wait_for_death(da)

After the second dispatcher gets a poll--a chance to notice--it will have
cleaned up the first dispatcher's old tasks in the same way we saw in the
previous example. The job's ``handleInterrupt`` method will be called, which
in this case will put it back in the queue to be claimed and performed.

    >>> alt_poll_3 = zc.async.testing.get_poll(alt_dispatcher)
    >>> _ = transaction.begin()
    >>> job in da['main']
    False
    >>> bool(da.activated)
    False
    >>> da.dead
    True
    >>> wait_for_pending(job)
    >>> queue[0] is job
    True

Now we need to wait for the job.

    >>> zc.async.testing.wait_for_result(job)
    42
    >>> callback_job.status == zc.async.interfaces.COMPLETED
    True
    >>> callback_job.result
    'I got result 42'

The sibling, then, was able to clean up the mess left by the "hard" crash of
the first dispatcher. [#cleanup2]_

Other Job-Related Errors
------------------------

Other problems--errors when performing or committing jobs--are handled within
jobs, getting the decisions from retry policies as described above. These are
demonstrated in the job.txt document.

.. rubric:: Footnotes

.. [#setUp]

    >>> import ZODB.FileStorage
    >>> storage = ZODB.FileStorage.FileStorage(
    ...     'main.fs', create=True)
    >>> from ZODB.DB import DB
    >>> db = DB(storage)
    >>> conn = db.open()
    >>> root = conn.root()

    >>> import zc.async.configure
    >>> zc.async.configure.base()

    >>> import zc.async.subscribers
    >>> import zope.component
    >>> zope.component.provideHandler(zc.async.subscribers.queue_installer)
    >>> zope.component.provideHandler(
    ...     zc.async.subscribers.ThreadedDispatcherInstaller(
    ...         poll_interval=0.1))
    >>> zope.component.provideHandler(zc.async.subscribers.agent_installer)

    >>> import zope.event
    >>> import zc.async.interfaces
    >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))

    >>> import transaction
    >>> _ = transaction.begin()

.. [#cleanup1]

    >>> lock.release()
    >>> zc.async.testing.tear_down_dispatcher(old_dispatcher)

.. [#cleanup2]

    >>> lock.release()
    >>> zc.async.testing.tear_down_dispatcher(dispatcher)
    >>> zc.async.testing.tear_down_dispatcher(alt_dispatcher)
    >>> time.sleep = old_sleep