Zope 3 Testing Tips and Tricks


  • Make sure you are using zope.app.testing version 3.4.2 or newer, or else ftests may intermittently raise spurious errors having to do with a missing _result attribute on a request’s response.

  • The Zope 3 tests use DemoStorage, which does not use MVCC. This can lead to your tests having occasional ConflictErrors that will not occur in production. In common cases for ftests, you won’t notice these because of Zope’s usual retry policy. Unit or integration tests may show these problems.

  • Set up the basic configuration in zcml or Python (see examples below), but make sure that ftests do not use dispatchers started in the application. Start up ftest (or integration test) dispatchers separately, using zc.async.ftesting.setUp, and tear them down with zc.async.ftesting.tearDown after the tests are done. The tearDown function tries to shut down all threads, and tries to let you know what job was running in any thread that couldn’t be cleanly stopped.

    The ftest dispatcher polls every tenth of a second, so you shouldn’t need to wait long for your job to get started in your tests.

  • General zc.async testing tools such as zc.async.dispatcher.get, zc.async.testing.get_poll and zc.async.testing.wait_for_result can still be useful for in-depth zc.async tests.

  • If you don’t want to dig into the guts of zc.async in your functional tests by using the tools described in the previous point, consider making a view that reports job status in a data format like JSON, and looking at that in your tests. Alternatively, investigate the tools in monitordb.py: although they were created for zc.monitor, they can still be used effectively from Python.

  • The setUp code by default sends critical log messages to __stdout__ so it can help diagnose why a callback might never complete.
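
The job-status view suggested in the list above could be as small as a function that turns a job’s state into JSON. The following is a minimal sketch, not part of zc.async: job_status_json is a hypothetical helper, StubJob is a stand-in used only for the demonstration, and the sketch assumes that the job object exposes status and result attributes.

```python
import json


class StubJob:
    """Stand-in for a job; assumed to expose ``status`` and ``result``."""

    def __init__(self, status, result=None):
        self.status = status
        self.result = result


def job_status_json(job):
    """Hypothetical helper: summarize a job as a JSON string.

    A view in your application could return this string, and an ftest
    could request the view until ``status`` has the value it expects.
    """
    return json.dumps(
        {"status": str(job.status), "result": repr(job.result)},
        sort_keys=True)


# Round-trip the summary the way a test reading the view might.
summary = json.loads(job_status_json(StubJob("completed-status", 42)))
```

Reporting the result with repr keeps the view usable even when the job returns something that JSON cannot serialize directly.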


Normally, in a Zope 3 configuration that uses zc.async, you configure it when you start your application. For instance, you might include a zc.async zcml file like basic_dispatcher_policy.zcml that performs the necessary setup.

However, the Zope 3 ftesting layer database dance doesn’t play well with zc.async unless you take a bit of extra care.

This is because zc.async will be started with the ftests’ underlying database, and then the test will be run with a DemoStorage wrapper. The zc.async dispatcher will run, then, but it will never see the changes that you make in the wrapper DemoStorage that your test manipulates. This can be mystifying and frustrating.
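
The effect can be pictured with a loose analogy that uses only the standard library and no ZODB at all. Here a plain dict stands in for the underlying database that the dispatcher was started with, and a collections.ChainMap stands in for the wrapper that the test manipulates. The names are illustrative assumptions, and real storages behave in far more detail than this.

```python
from collections import ChainMap

base = {"queue": "empty"}         # what the dispatcher was started with
wrapper = ChainMap({}, base)      # what the test reads and writes

wrapper["queue"] = "one job"      # the write lands in the overlay only

test_view = wrapper["queue"]      # the test sees its own change
dispatcher_view = base["queue"]   # the base never sees it
```

The test observes its new job while anything still looking at the base sees no work to do, which is the mystifying situation described above.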

Because of this, when you write a Zope 3 app that wants to use both layered ftests and zc.async, you have to set things up in a mildly inconvenient way.

When you start your application normally, use configuration (zcml or grok or whatever) to register subscribers like the ones in subscribers.py: adding queues, starting dispatchers, and adding agents.

But don’t have this configuration registered for your ftests. Instead, bypass that part of your site’s configuration in your ftesting layer, and use the zc.async.ftesting.setUp function to set zc.async up in tests when you need it, in a footnote of your test or in a similar spot.

You’ll still want the basic adapters registered, as found in zc.async’s configure.zcml or configure.py files; and maybe the zc.async.queue.getDefaultQueue adapter too. This can be registered in ftesting.zcml with this snippet:

<include package="zc.async" />
<adapter factory="zc.async.queue.getDefaultQueue" />

Or in Python, you might want to do something like this:

>>> import zc.async.configure
>>> zc.async.configure.base() # or, more likely, ``minimal`` for Zope 3
>>> import zope.component
>>> import zc.async.queue
>>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)

Don’t forget to call tearDown (see below) at the end of your test!

Here’s a usage example.

As mentioned above, setUp does expect the necessary basic adapters to already be installed.

Zope 3 ftests generally have a getRootFolder hanging around to give you the root object in the Zope application (but not in the ZODB). Therefore, setUp tries to be helpful, for better and worse, and mucks around in the locals to find it. If you want it to leave your locals alone, pass it a database connection.

So, here’s some set up. We create a database and make our stub getRootFolder function in the globals.

>>> import transaction
>>> import BTrees
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
...     'zc_async.fs', create=True)
>>> from ZODB.DB import DB
>>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> PseudoZopeRoot = root['Application'] = BTrees.family32.OO.BTree()
>>> transaction.commit()
>>> def _getRootObject():
...     return PseudoZopeRoot
>>> globals()['getRootFolder'] = _getRootObject

Notice we are using a real FileStorage, and not a DemoStorage, as is usually used in ftests. The fact that DemoStorage does not have MVCC can sometimes lead standard ftests to raise spurious ReadConflictErrors that will not actually occur in production. The ConflictErrors will generally be retried, so your tests should usually pass, even though you might see some “complaints”.

Now we can call setUp as if we were in a functional test.

>>> import zc.async.ftesting
>>> zc.async.ftesting.setUp()

Now the dispatcher is activated and the polls are running. The function sets up a dispatcher that polls much more frequently than usual (every 0.1 seconds rather than every 5, so that tests might run faster) but otherwise uses typical zc.async default values.

It’s worth noting a few tricks that are particularly useful for tests here. We’ll also use a couple of them to verify that setUp did its work.

zc.async.dispatcher.get() returns the currently installed dispatcher. This can let you check if it is activated and polling and use its simple statistical methods, if you want.

>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()

For now, we’ll just see that the dispatcher is activated.

>>> bool(dispatcher.activated)
True

See dispatcher.txt for details on the information you can get from the dispatcher object.

zc.async.testing has a number of helpful functions for testing. get_poll is the most pertinent here: given a dispatcher, it will give you the next poll. This is a good way to make sure that a job you just put in has had a chance to be claimed by a dispatcher. It’s also a reasonable way to verify that the dispatcher has started. setUp already gets the first two polls, so it’s definitely all started.

>>> import zc.async.testing
>>> import pprint
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}

Other useful testing functions are zc.async.testing.wait_for_result, which waits for the result of a given job and returns it; and zc.async.testing.wait_for_annotation, which waits for a given annotation on a given job. These are demonstrated in various doctests in this package, but should also be reasonably simple and self-explanatory.
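
If you need to wait for a condition that these functions do not cover, a generic polling helper in the same spirit is easy to write. The following is a minimal sketch using only the standard library: wait_until is a hypothetical name, the timeout and interval defaults are arbitrary assumptions, and nothing here is taken from zc.async’s own implementation.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.1):
    """Hypothetical helper: poll ``predicate`` until it returns true.

    Returns True if the predicate became true within ``timeout``
    seconds, and False otherwise.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One last check, in case the condition became true during the
    # final sleep.
    return bool(predicate())


calls = []


def ready_on_third_call():
    """Demonstration predicate that succeeds on its third call."""
    calls.append(None)
    return len(calls) >= 3


became_ready = wait_until(ready_on_third_call, timeout=2.0, interval=0.01)
never_ready = wait_until(lambda: False, timeout=0.05, interval=0.01)
```

Returning a boolean rather than raising leaves it to the test to decide how to report a timeout.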

Callbacks will retry some errors forever, by default. The logic is that callbacks are often the “cleanup” and must be run. This can lead to confusion in debugging tests, though, because the retry warnings are sent to the log, and the log is not usually monitored in functional tests.

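setUp tries to help with this by echoing CRITICAL log messages from the “zc.async” logger, including child loggers such as “zc.async.event”, to stdout. Messages below CRITICAL, such as the error logged here, are not echoed.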

>>> import logging
>>> logging.getLogger('zc.async.event').critical('Foo!')
>>> logging.getLogger('zc.async.event').error('Bar!')
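
If you want similar behavior in a test that does not use setUp, the standard logging module is enough. The sketch below shows one way to arrange it; that setUp works exactly this way is an assumption, and the in-memory stream stands in for stdout only so that the result can be inspected.

```python
import io
import logging

stream = io.StringIO()          # stands in for stdout in this sketch
handler = logging.StreamHandler(stream)
handler.setLevel(logging.CRITICAL)
handler.setFormatter(logging.Formatter("%(message)s"))

parent = logging.getLogger("zc.async")
parent.addHandler(handler)
try:
    event_log = logging.getLogger("zc.async.event")
    event_log.critical("Foo!")  # propagates to the parent's handler
    event_log.error("Bar!")     # below CRITICAL, so the handler drops it
finally:
    # Always detach the handler so later tests are not affected.
    parent.removeHandler(handler)

captured = stream.getvalue()
```

Attaching the handler to the parent logger is what lets it catch messages from every logger below it in the hierarchy.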

Once you have finished your tests, make sure to shut down your dispatcher, or the testing framework will complain about an unstopped daemon thread. zc.async.ftesting.tearDown will do the trick.

>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated

You can then start another async-enabled functional test up again later in the same layer, of course.

>>> zc.async.ftesting.setUp()
>>> dispatcher = zc.async.dispatcher.get()
>>> bool(dispatcher.activated)
True
>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated

ftesting.tearDown attempts to join all threads in the dispatchers’ queues, but will raise an error if a job or dispatcher fails to shut down.

If the thread is performing a job, the error informs you what job is being performed.

>>> zc.async.ftesting.setUp()
>>> _ = transaction.begin()
>>> import zc.async.interfaces
>>> queue = root[zc.async.interfaces.KEY]['']
>>> def bad_job():
...     zc.async.testing.time_sleep(4)
>>> job = queue.put(bad_job)
>>> transaction.commit()
>>> zc.async.testing.wait_for_start(job)
>>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Job in pool 'main' failed to stop:
  <zc.async.job.Job (oid ..., db ...) ``zc.async.doctest_test.bad_job()``>
>>> zc.async.testing.wait_for_result(job)
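
The general technique of joining threads with a timeout and reporting the ones that are still running can be sketched with the standard library alone. This is an illustration of the idea, not zc.async’s code: stop_workers is a hypothetical helper, and the two threads are stand-ins for a well-behaved worker and one that, like bad_job above, does not stop in time.

```python
import threading


def stop_workers(workers, stop_event, timeout=0.2):
    """Hypothetical helper: signal workers to stop, then join them.

    Returns the names of the threads that are still alive after being
    given ``timeout`` seconds each, so the caller can report them.
    """
    stop_event.set()
    stuck = []
    for worker in workers:
        worker.join(timeout)
        if worker.is_alive():
            stuck.append(worker.name)
    return stuck


stop = threading.Event()
release = threading.Event()

# A well-behaved worker exits as soon as it is asked to stop.
polite = threading.Thread(target=stop.wait, name="polite", daemon=True)
# A badly behaved worker ignores the stop request.
stubborn = threading.Thread(
    target=release.wait, name="stubborn", daemon=True)
for thread in (polite, stubborn):
    thread.start()

stuck = stop_workers([polite, stubborn], stop)

release.set()    # let the stubborn worker finish so nothing leaks
stubborn.join()
```

Here stuck names only the thread that ignored the request, which is the kind of information the tearDown message gives you about a job.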

If the dispatcher isn’t shutting down for some reason, the UUID is given.

>>> zc.async.ftesting.tearDown()
>>> zc.async.ftesting.setUp()
>>> dispatcher = zc.async.dispatcher.get()
>>> def noop(*kw):
...     pass
>>> original_stop = dispatcher.reactor.stop
>>> dispatcher.reactor.stop = noop
>>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Dispatcher (..., ...) failed to stop.

Let’s restore the original reactor.stop method and call tearDown again, which will work this time.

>>> dispatcher.reactor.stop = original_stop
>>> zc.async.ftesting.tearDown()

Also worth noting, as mentioned in the summary, is that you should use zope.app.testing version 3.4.2 or higher to avoid getting spurious, intermittent bug reports from ftests that use zc.async.

In your test or your test’s tearDown, if you used a FileStorage, as we did here, you’ll need to clean up as well. We normally do this in our tests’ tearDowns, but we do it here, now, to make the point.

>>> db.close()
>>> storage.close()
>>> storage.cleanup()
>>> del storage # we just do this to not confuse our own tearDown code.
>>> del globals()['getRootFolder'] # clean up globals; probably unnecessary
