Zope 3 Testing Tips and Tricks
==============================
-------
Summary
-------
- Make sure you are using zope.app.testing version 3.4.2 or newer, or else
ftests may intermittently raise spurious errors having to do with a missing
``_result`` attribute on a request's response.
- The Zope 3 tests use DemoStorage, which does not use MVCC. This can lead
to your tests having occasional ConflictErrors that will not occur in
production. In common cases for ftests, you won't notice these because of
Zope's usual retry policy. Unit or integration tests may show these
problems.
- Set up the basic configuration in zcml or Python (see examples below), but
you need to make sure that ftests do not use dispatchers started in the
application. Start up ftest (or integration test) dispatchers separately,
using ``zc.async.ftesting.setUp``, and then tear down after the tests are
done with ``zc.async.ftesting.tearDown``. The ``tearDown`` feature tries
to shut down all threads, and tries to let you know what job was running in
threads that couldn't be cleanly stopped.
The ftest dispatcher polls every tenth of a second, so you shouldn't need to
wait long for your job to get started in your tests.
- General zc.async testing tools such as ``zc.async.dispatcher.get``,
``zc.async.testing.get_poll`` and ``zc.async.testing.wait_for_result`` can
still be useful for in-depth zc.async tests.
- If you don't want to dig into guts in your functional tests to use the tools
described in the previous point, consider making a view to check on job
status using a data structure like JSON, and looking at that in your tests.
Alternatively, investigate the tools in monitordb.py--although the tools
were created for zc.monitor, they can still be used effectively in Python.
- The ``setUp`` code by default sends critical log messages to __stdout__ so it
can help diagnose why a callback might never complete.
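The JSON status view suggested in the summary might be sketched roughly like this. It is only an illustration, not part of zc.async: ``job_status_json`` and ``FakeJob`` are hypothetical names, and the ``status`` and ``result`` attributes on the fake job are assumptions standing in for whatever your real job objects expose.

```python
import json


class FakeJob(object):
    """Hypothetical stand-in for a job; real job objects may differ."""

    def __init__(self, status, result=None):
        self.status = status
        self.result = result


def job_status_json(jobs):
    """Serialize a mapping of name -> job into a JSON string.

    Only the (assumed) ``status`` and ``result`` attributes are reported.
    """
    data = {}
    for name, job in jobs.items():
        data[name] = {'status': job.status, 'result': job.result}
    # sort_keys keeps the output stable, which is convenient in doctests.
    return json.dumps(data, sort_keys=True)


example = job_status_json(
    {'resize': FakeJob('completed', 42), 'mail': FakeJob('pending')})
```

A view returning a string like ``example`` lets a functional test parse the response body and assert on it without reaching into zc.async internals.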
----------
Discussion
----------
Normally, in a Zope 3 configuration that uses zc.async, you configure it
when you start your application. For instance, you might include a zc.async
zcml file like basic_dispatcher_policy.zcml that performs the necessary set up.
However, the Zope 3 ftesting layer database dance doesn't play well with
zc.async unless you take a bit of extra care.
This is because zc.async will be started with the ftests' underlying database,
and then the test will be run with a DemoStorage wrapper. The zc.async
dispatcher will run, then, but it will never see the changes that you make in
the wrapper DemoStorage that your test manipulates. This can be mystifying
and frustrating.
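The effect can be pictured with a toy model in plain Python. This is an analogy only, using ``collections.ChainMap`` as a stand-in for the wrapper; it is not how DemoStorage is implemented, and the names ``base``, ``wrapper``, ``dispatcher_view`` and ``test_view`` are made up for the illustration.

```python
from collections import ChainMap

# The "underlying database" that the dispatcher was started against.
base = {'queue': []}

# The test works through a wrapper: reads fall through to ``base``,
# but writes land only in the wrapper's own first mapping.
wrapper = ChainMap({}, base)
wrapper['queue'] = ['job-1']

dispatcher_view = base['queue']   # what the dispatcher would see
test_view = wrapper['queue']      # what the test sees
```

Here ``test_view`` contains the new job while ``dispatcher_view`` is still empty, which is the same kind of mismatch described above.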
Because of this, when you write a Zope 3 app that wants to use both layered
ftests and zc.async, you have to set things up in a mildly inconvenient way.
When you start your application normally, use configuration (zcml or grok or
whatever) to register subscribers like the ones in subscribers.py: adding
queues, starting dispatchers, and adding agents.
But don't have this configuration registered for your ftests. Instead, bypass
that part of your site's configuration in your ftesting layer, and use the
``zc.async.ftesting.setUp`` function to set zc.async up in tests when you need
it, in a footnote of your test or in a similar spot.
You'll still want the basic adapters registered, as found in zc.async's
configure.zcml or configure.py files; and maybe the
zc.async.queue.getDefaultQueue adapter too. In ftesting.zcml, that means
including the zc.async package and registering
zc.async.queue.getDefaultQueue as an adapter.
Or in Python, you might want to do something like this:
>>> import zc.async.configure
>>> zc.async.configure.base() # or, more likely, ``minimal`` for Zope 3
>>> import zope.component
>>> import zc.async.queue
>>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
Don't forget to call ``tearDown`` (see below) at the end of your test!
Here's a usage example.
As mentioned above, ``setUp`` does expect the necessary basic adapters to
already be installed.
Zope 3 ftests generally have a ``getRootFolder`` hanging around to give you the
root object in the Zope application (but not in the ZODB). Therefore, ``setUp``
tries to be helpful, for better and worse, and mucks around in the caller's
globals to find it. If you want it to leave your namespace alone, pass it a
database connection.
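A lookup of this kind can be sketched in plain Python. This is an assumption about how such a helper could be written in general, not a description of what ``setUp`` actually does internally; ``find_root`` and its parameters are hypothetical.

```python
import sys


def find_root(namespace=None, name='getRootFolder'):
    """Return the object produced by calling ``namespace[name]``.

    If no namespace is given, fall back to the caller's globals.
    """
    if namespace is None:
        # sys._getframe(1) is the frame of whoever called find_root.
        namespace = sys._getframe(1).f_globals
    try:
        factory = namespace[name]
    except KeyError:
        raise LookupError('no %r found in the namespace' % (name,))
    return factory()
```

Passing the namespace (or, in the real function, a connection) explicitly avoids the frame inspection entirely, which is usually easier to reason about.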
So, here's some set up. We create a database and make our stub
``getRootFolder`` function in the globals.
>>> import transaction
>>> import BTrees
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'zc_async.fs', create=True)
>>> from ZODB.DB import DB
>>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> PseudoZopeRoot = root['Application'] = BTrees.family32.OO.BTree()
>>> transaction.commit()
>>> def _getRootObject():
... return PseudoZopeRoot
...
>>> globals()['getRootFolder'] = _getRootObject
Notice we are using a real FileStorage, and not a DemoStorage, as is usually
used in ftests. The fact that DemoStorage does not have MVCC can sometimes lead
standard ftests to raise spurious ReadConflictErrors that will not actually
occur in production. The ConflictErrors will generally be retried, so your
tests should usually pass, even though you might see some "complaints".
Now we can call ``setUp`` as if we were in a functional test.
>>> import zc.async.ftesting
>>> zc.async.ftesting.setUp()
Now the dispatcher is activated and the polls are running. The function sets up
a dispatcher that polls much more frequently than usual--every 0.1 seconds
rather than every 5, so that tests might run faster--but otherwise uses typical
zc.async default values.
It's worth noting a few tricks that are particularly useful for tests here.
We'll also use a couple of them to verify that ``setUp`` did its work.
``zc.async.dispatcher.get()`` returns the currently installed dispatcher. This
can let you check if it is activated and polling and use its simple statistical
methods, if you want.
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
For now, we'll just see that the dispatcher is activated.
>>> bool(dispatcher.activated)
True
See dispatcher.txt for details on the information you can get from the
dispatcher object.
zc.async.testing has a number of helpful functions for testing. ``get_poll`` is
the most pertinent here: given a dispatcher, it will give you the next poll.
This is a good way to make sure that a job you just put in has had a chance to
be claimed by a dispatcher. It's also a reasonable way to verify that the
dispatcher has started. ``setUp`` already gets the first two polls, so
it's definitely all started.
>>> import zc.async.testing
>>> import pprint
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
'error': None,
'len': 0,
'new jobs': [],
'size': 3}}}
Other useful testing functions are ``zc.async.testing.wait_for_result``, which
waits for the result on a given job and returns it; and
``zc.async.testing.wait_for_annotation``, which waits for a given annotation
on a given job. These are demonstrated in various doctests in this package,
but should also be reasonably simple and self-explanatory.
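If you want a similar helper for your own conditions, the general poll-until-ready idea can be sketched with the standard library alone. This is not the zc.async implementation; ``wait_for`` and its parameters are hypothetical, and the default interval simply mirrors the tenth-of-a-second polling mentioned earlier.

```python
import time


def wait_for(condition, timeout=5.0, interval=0.1):
    """Poll ``condition()`` until it returns a true value.

    Returns that value, or raises AssertionError after ``timeout`` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = condition()
        if value:
            return value
        if time.monotonic() >= deadline:
            raise AssertionError(
                'condition not met in %s seconds' % (timeout,))
        time.sleep(interval)
```

Raising after a timeout, rather than looping forever, keeps a stuck job from hanging the whole test run.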
Callbacks will retry some errors forever, by default. The logic is that
callbacks are often the "cleanup" and must be run. This can lead to confusion
in debugging tests, though, because the retry warnings are sent to the log,
and the log is not usually monitored in functional tests.
``setUp`` tries to help with this by adding logging of ``CRITICAL`` log
messages in the "zc.async" logger to stdout.
>>> import logging
>>> logging.getLogger('zc.async.event').critical('Foo!')
Foo!
>>> logging.getLogger('zc.async.event').error('Bar!')
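The same kind of filtering can be reproduced with the standard ``logging`` module. The sketch below is an assumption about one way to do it, not the code ``setUp`` uses; ``log_critical_to`` and the ``example.async`` logger name are hypothetical, and an in-memory stream stands in for stdout so the result is easy to inspect.

```python
import io
import logging


def log_critical_to(stream, logger_name='example.async'):
    """Attach a handler that writes only CRITICAL records to ``stream``.

    Returns the handler so that it can be removed again at tear-down.
    """
    handler = logging.StreamHandler(stream)
    handler.setLevel(logging.CRITICAL)
    handler.setFormatter(logging.Formatter('%(message)s'))
    logging.getLogger(logger_name).addHandler(handler)
    return handler


stream = io.StringIO()
handler = log_critical_to(stream)
child = logging.getLogger('example.async.event')
child.critical('Foo!')   # reaches the handler through the parent logger
child.error('Bar!')      # below CRITICAL, so the handler skips it
logging.getLogger('example.async').removeHandler(handler)
captured = stream.getvalue()
```

In a real test you would pass ``sys.stdout`` instead of the ``StringIO`` object, so that critical messages show up in the test output.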
Once you have finished your tests, make sure to shut down your dispatcher, or
the testing framework will complain about an unstopped daemon thread.
``zc.async.ftesting.tearDown`` will do the trick.
>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated
False
You can then start another async-enabled functional test up again later in the
same layer, of course.
>>> zc.async.ftesting.setUp()
>>> dispatcher = zc.async.dispatcher.get()
>>> bool(dispatcher.activated)
True
>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated
False
``zc.async.ftesting.tearDown`` attempts to join all threads in the dispatchers'
queues, but will raise an error if a job or dispatcher fails to shut down.
If a thread is still performing a job, the error tells you which job it is.
>>> import zc.async.interfaces
>>> zc.async.ftesting.setUp()
>>> _ = transaction.begin()
>>> queue = root[zc.async.interfaces.KEY]['']
>>> def bad_job():
... zc.async.testing.time_sleep(4)
>>> job = queue.put(bad_job)
>>> transaction.commit()
>>> zc.async.testing.wait_for_start(job)
>>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
TearDownDispatcherError:
Job in pool 'main' failed to stop:
>>> zc.async.testing.wait_for_result(job)
If the dispatcher isn't shutting down for some reason, the UUID is given.
>>> zc.async.ftesting.tearDown()
>>> zc.async.ftesting.setUp()
>>> dispatcher = zc.async.dispatcher.get()
>>> def noop(*args, **kw):
... pass
>>> original_stop = dispatcher.reactor.stop
>>> dispatcher.reactor.stop = noop
>>> zc.async.ftesting.tearDown() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
TearDownDispatcherError:
Dispatcher (..., ...) failed to stop.
Let's restore the original reactor.stop method and call tearDown again, which
will work this time.
>>> dispatcher.reactor.stop = original_stop
>>> zc.async.ftesting.tearDown()
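The join-then-complain behaviour shown above can be sketched for ordinary threads as follows. This is a generic illustration under stated assumptions, not the zc.async code: ``join_all`` and ``TearDownError`` are hypothetical names, and the error reports thread names rather than jobs or UUIDs.

```python
import threading


class TearDownError(Exception):
    """Hypothetical error raised when worker threads outlive tear-down."""


def join_all(threads, timeout=1.0):
    """Join every thread, then complain about any that are still alive."""
    for thread in threads:
        thread.join(timeout)
    stuck = [thread.name for thread in threads if thread.is_alive()]
    if stuck:
        raise TearDownError('failed to stop: %s' % ', '.join(stuck))
```

Naming the threads that did not stop is what makes the error useful: it points you at the work that was still in progress.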
Also worth noting, as mentioned in the summary, is that you should use
zope.app.testing version 3.4.2 or newer to avoid spurious, intermittent
errors in ftests that use zc.async.
In your test or your test's tearDown, if you used a FileStorage, as we did
here, you'll need to clean up as well. We normally do this in our tests'
tearDowns, but we do it here, now, to make the point.
>>> db.close()
>>> storage.close()
>>> storage.cleanup()
>>> del storage # we just do this to not confuse our own tearDown code.
>>> del globals()['getRootFolder'] # clean up globals; probably unnecessary
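If your tests are ``unittest`` test cases rather than doctests, one way to make this kind of file cleanup automatic is to keep the storage file in a temporary directory and register its removal as a cleanup. The sketch below uses only the standard library and writes a placeholder file instead of opening a real FileStorage; ``StorageFileTest`` and ``last_directory`` are hypothetical names.

```python
import os
import shutil
import tempfile
import unittest


class StorageFileTest(unittest.TestCase):
    """Hypothetical test case that owns a private storage directory."""

    def setUp(self):
        self.directory = tempfile.mkdtemp()
        # Registered cleanups run even if the test itself fails.
        self.addCleanup(shutil.rmtree, self.directory, ignore_errors=True)
        self.path = os.path.join(self.directory, 'zc_async.fs')
        # Remember the directory so callers can check it was removed.
        type(self).last_directory = self.directory

    def test_can_write_storage_file(self):
        # A placeholder stands in for the real storage file here.
        with open(self.path, 'wb') as f:
            f.write(b'placeholder')
        self.assertTrue(os.path.exists(self.path))
```

With a real storage you would still close the database and the storage first, as shown above, and let the cleanup remove the files afterwards.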