Quickstart with virtualenv

Prerequisites

Installation

To start, install virtualenv and create a virtual environment for our experiments.

$ easy_install virtualenv
$ virtualenv quickstart

Install zc.async in the virtual environment.

$ cd quickstart/
$ ./bin/easy_install zc.async

Dependencies

This installed several packages.

  • the ZODB, an object database from the Zope project;
  • Twisted, a framework for networked applications;
  • the component architecture from the Zope project;
  • and a few smaller packages.

All of these, and zc.async, are distributed under liberal open-source licenses such as the LGPL and ZPL.

ZEO Server

zc.async relies on a distributed ZODB technology called ZEO (“Zope Enterprise Objects”) to distribute work. ZEO has a central database server to which client processes connect.

Let’s start the ZEO Server:

$ ./bin/runzeo -a 9999 -f test.fs &

That starts a database server, accessible on port 9999 of your local machine, saving the data in the test.fs file.

Starting zc.async

A Client

Now let’s start a Python with a client connection to the database server.

Start up bin/python (not your system python, but the one in virtualenv’s quickstart/bin):

$ ./bin/python

This will be our single client process.

You might have many, each connecting to the main database server, and each able to perform and/or request zc.async jobs.

Database Connection

Connect to the database.

>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)

Start zc.async

Basics

Now we do some basic configuration. This first bit installs some default adapters. You may never need to worry about them.

>>> import zc.async.configure
>>> zc.async.configure.base()

Policy

This second part is policy, and if you ever put zc.async in production, you’ll want to understand what’s going on here.

>>> zc.async.configure.start(db, poll_interval=1)

Now the system has a dispatcher polling for jobs every second. As we’ll see below, a dispatcher polls for jobs in one or more queues, to assign them to worker threads.
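The dispatch loop can be pictured with a stdlib-only sketch. This is a toy analogue for illustration, not zc.async's implementation (`run_dispatcher` is a hypothetical name): it polls an in-memory queue at an interval and hands each job it finds to a worker thread.

```python
import queue
import threading
import time

def run_dispatcher(job_queue, results, polls=10, poll_interval=0.01):
    # Toy analogue of a dispatcher: poll the queue a fixed number of
    # times, handing each job found to a new worker thread.
    workers = []
    for _ in range(polls):
        try:
            job = job_queue.get_nowait()
        except queue.Empty:
            time.sleep(poll_interval)
            continue
        t = threading.Thread(target=lambda j=job: results.append(j()))
        t.start()
        workers.append(t)
    for t in workers:
        t.join()  # wait for the worker threads before returning

jobs = queue.Queue()
outcomes = []
jobs.put(lambda: 2 + 2)  # a "job" here is just a callable
run_dispatcher(jobs, outcomes)
```

The real dispatcher differs in the ways that matter for production (persistent queues, transactions, agents), but the shape of the loop is the same: poll, claim, hand off to a thread.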

Using zc.async

The Queue

The start function also installed a queue. To get zc.async to do work, you put a job in a queue, and commit the transaction.

First, let’s get the queue that we have installed. We need to open a connection to the database. Then we get the queue.

>>> conn = db.open()
>>> import zc.async.interfaces
>>> q = zc.async.interfaces.IQueue(conn)

A Job

Let’s put a job in our queue. This silly example will return the current time.

>>> import time
>>> j = q.put(time.time)

It’s not done yet.

>>> j.result
>>> j.status
u'pending-status'

A Transaction

We have to commit the transaction for the dispatcher to see the job.

>>> import transaction
>>> transaction.commit()

A Result

Now wait a second and then try this. “transaction.begin” will sync up our database with database changes made elsewhere.

>>> _ = transaction.begin()
>>> j.result
1216179006.856108
>>> j.status
u'completed-status'

Another Job (And Closures)

You can also make closures. The Job class accepts arguments similarly to the Python 2.5 functools.partial(): Job(func, *args, **keywords). This instantiates a new callable (a Job instance) with partial application of the given arguments and keywords. You can then pass the job instance to the put() method.
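The partial-application idea is exactly what functools.partial does, shown here as a stdlib-only sketch (Job adds persistence, status, and result tracking on top of this):

```python
import functools

# functools.partial captures a callable plus positional and keyword
# arguments, just as Job(func, *args, **keywords) does conceptually.
deferred = functools.partial(round, 3.14159, 2)

# Nothing runs until the deferred callable is invoked.
result = deferred()  # equivalent to round(3.14159, 2)
```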

Generating RSA keys is actually a reasonable real-world use case for something like this.

>>> import subprocess
>>> j = q.put(zc.async.job.Job(
...     subprocess.call,
...     ['openssl', 'genrsa', '-out',
...      'key.pem', '1024']))
>>> transaction.commit()

We need to begin the transaction to see the result...

>>> j.result
>>> _ = transaction.begin()
>>> j.result
0

...which in this case is simply 0, indicating a successful UNIX process.

We can open the file to show the result.

>>> subprocess.call(['cat', 'key.pem'])

This will show you the key, which should look something like this:

-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQCYAZW+HjDGJhRHnUlZZWqhrGOxU2K/RhssmcMs0JLnWI2cWmZ+
...
CEcz6ZbO8zm4AEGI/dqLicZh3bhunhflAovW6WxbNKLENQ==
-----END RSA PRIVATE KEY-----
0

Running Your Own Code

A Monte Carlo Simulation

We’ve now seen some simple examples from the standard library. But how do you get your own work done?

Let’s say we want to implement an old chestnut of a problem: use a Monte Carlo simulation (that is, “throwing darts and analyzing the results”) to calculate pi. This will use zc.async much like the map-reduce approach of Hadoop: we will distribute the work of running the simulation to multiple machines so the result can be computed faster.
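The dart-throwing itself needs nothing beyond the standard library. Here is the core computation on its own (`estimate_pi` is a name chosen for this sketch): points are drawn uniformly from the unit square, and the fraction landing within distance 1 of the origin approximates pi/4.

```python
import math
import random

def estimate_pi(samples=100000, seed=42):
    # The fraction of random points in the unit square that fall
    # inside the quarter circle of radius 1 approximates pi / 4.
    rng = random.Random(seed)  # seeded for reproducibility
    inside = 0
    for _ in range(samples):
        if math.hypot(rng.random(), rng.random()) < 1:
            inside += 1
    return 4.0 * inside / samples

approximation = estimate_pi()
```

The accuracy improves (slowly) with the number of samples, which is why parceling the sampling out to multiple workers is attractive.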

Picklable Callables and Arguments

You want a job to have a reference to your own callable, so the job will get the work you define performed.

This reference, of the job to your callable, will need to be persisted in the database.

Because zc.async uses the ZODB for its persistence mechanism, the ZODB’s persistence rules are in effect.

Luckily, these are fairly simple.

For now, we’ll stay as simple as it gets: if you use module global functions and immutables, and share software across instances, you’ll be fine.

ZODB allows a lot more, if you’re willing to follow a few more rules, but that one rule will get us moving for this quick-start.
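A quick stdlib check makes the rule concrete: a module-global function pickles by reference (module name plus function name), while a lambda, which has no importable name, does not. This sketch only illustrates the pickle rule; zc.async and the ZODB handle the actual persistence.

```python
import pickle

def my_task():
    # A module-global function: picklable by reference, so a persisted
    # job can find it again in any process that imports this module.
    return "did the work"

blob = pickle.dumps(my_task)
restored = pickle.loads(blob)

# A lambda has no importable name, so pickling it fails.
try:
    pickle.dumps(lambda: None)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False
```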

Process UUIDs

Exit the Python interpreter (control-D). Look around in the directory (ls): you should see a few database files, the key.pem you created, and a new file: uuid.txt. It should look something like this:

$ cat uuid.txt
afd1e0d0-52e1-11dd-879b-0017f2c49bdd
------------------------------------------------------------------------
The value above (and this file) is created and used by the zc.async
package. It is intended to uniquely identify this software instance when
it is used to start a zc.async dispatcher.  This allows multiple
dispatchers, each in its own software instance, to connect to a single
database to do work.

In order to decide where to look for this file (or to create it, if
necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
file name.

If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
daemonize your process, you can set this in a zdaemon environment section
of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides
this functionality. Other similar tools probably do as well.

If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
``os.path.join(os.getcwd(), 'uuid.txt')`` as the file name.

To get a new identifier for this software instance, delete this file,
restart Python, and import zc.async.instanceuuid.  This file will be
recreated with a new value.

That text is intended to be self-explanatory, so hopefully it made sense to you. We’ll handle these UUIDs more explicitly in a moment.
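The lookup order described in the file can be sketched in a couple of lines (`uuid_file_path` is a hypothetical helper name; the real logic lives in zc.async.instanceuuid):

```python
import os

def uuid_file_path(environ=os.environ):
    # Prefer an explicit ZC_ASYNC_UUID path; otherwise fall back to
    # uuid.txt in the current working directory.
    return environ.get('ZC_ASYNC_UUID',
                       os.path.join(os.getcwd(), 'uuid.txt'))
```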

Make a File

Make a new Python file. Let’s call it pi.py. Save this file in lib/python2.5/site-packages/.

Use the following for the file content.

import random
import math

import ZEO.ClientStorage
import ZODB
import twisted.internet.reactor

import zc.async.configure

def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size

def process_samples(*sample_jobs):
    count = 0 
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size

if __name__ == '__main__':
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.run()

Our First Monte Carlo Experiment

We’ll need the ZEO server running. If you’ve gone through this file from the start, it should still be running. If not, use this:

$ ./bin/runzeo -a 9999 -f test.fs &

Now, for our first experiment with the Monte Carlo simulation, we’ll start a single worker process.

Enter this in the terminal:

$ ./bin/python lib/python2.5/site-packages/pi.py &

That will start our worker process. Now let’s start an interpreter.

$ ./bin/python

Now get a database and a connection, as we’ve seen before. We’ll also set up the base zc.async configuration.

>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> import zc.async.configure
>>> zc.async.configure.base()

We don’t have any adapters installed, so zc.async.interfaces.IQueue(conn) won’t work. This will though, and still looks pretty good:

>>> import zc.async.queue
>>> q = zc.async.queue.getDefaultQueue(conn)

Now we can start some jobs in parallel, using the zc.async.job.parallel() helper.

>>> import pi
>>> import zc.async.job
>>> j = q.put(zc.async.job.parallel(
...     pi.generate_sample, pi.generate_sample, pi.generate_sample,
...     postprocess=pi.process_samples))
>>> import transaction
>>> transaction.commit()

Wait a few seconds. If the result is empty (None), begin the transaction again and check the result again. Eventually, these next two lines should give you a result: an approximation of pi.

>>> _ = transaction.begin()
>>> j.result

For one run, I got 3.1386666666666665. Cool.

Closures

We’ve already seen Jobs used as closures in the openssl example. You can also use them to pass a different size argument to generate_sample.

Let’s try it.

>>> j = q.put(zc.async.job.parallel(
...     zc.async.job.Job(pi.generate_sample, 1000000),
...     zc.async.job.Job(pi.generate_sample, size=1000000),
...     postprocess=pi.process_samples))
>>> transaction.commit()

Wait a bit, again. Retry these two lines until you get a result. It should be well under a minute on most machines.

>>> _ = transaction.begin()
>>> j.result

My run got 3.1434359999999999. Cool.

Configuration

Unfortunately, the parallel runs we’ve done so far would actually make the calculation go slower than if we did it in a single function call! That’s because we ran it in a single Python worker process, and the overhead of threads just makes the jobs a bit less efficient. Threads help for some uses of zc.async, but not this one.

Let’s assume you are running these experiments on a machine with two processor cores. We should then start two worker processes, one for each core. We’ll need to make sure each worker process has its own UUID.

Moreover, we need to configure each process to only take one generate_sample job at a time. Let’s adjust our code to do that.

Agents

A worker process regularly polls the database for new jobs. The software component that polls is called a dispatcher. Dispatchers look in the database to ask their personal agent (or agents) to determine what they should get their threads to do. Think of the agent as a “talent agent” or a “booking agent” for the process.

By default, using the zc.async.configure helpers, each dispatcher is given a single agent that will choose the first job in the queue, and that wants to run no more than three jobs at a time.

For our Monte Carlo job, we’ll give each process an additional agent. The new agent will only accept up to one instance of a generate_sample job at a time.

We will also reconfigure the existing agent to accept up to three of anything except generate_sample.
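The effect of the two choosers can be previewed with an in-memory sketch (`claim_first` is a hypothetical stand-in for agent.queue.claim, which hands the agent the first pending job matching a predicate):

```python
def claim_first(pending, predicate):
    # Return and remove the first pending job the predicate accepts,
    # or None if nothing matches.
    for index, job in enumerate(pending):
        if predicate(job):
            return pending.pop(index)
    return None

def generate_sample():
    pass

def other_work():
    pass

# Two pending "jobs"; the generate_sample agent skips everything else.
pending = [other_work, generate_sample]
claimed = claim_first(
    pending, lambda j: j.__name__ == 'generate_sample')
```

Splitting job types between agents this way keeps a long-running generate_sample from monopolizing the threads meant for other work.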

We’ll do that in our file. Here’s the revised version. The only changes are the imports, the three new functions, and setting up install_agent in the if __name__ == '__main__': block.

import random
import math

import ZEO.ClientStorage
import ZODB
import transaction
import twisted.internet.reactor

import zc.async.configure
import zc.async.queue
import zc.async.instanceuuid
import zc.async.agent

def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size

def process_samples(*sample_jobs):
    count = 0 
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size

def choose_generate_sample(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ == 'generate_sample')

def choose_another(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ != 'generate_sample')

def install_agent(db):
    conn = db.open()
    try:
        q = zc.async.queue.getDefaultQueue(conn)
        try:
            dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
        except KeyError:
            twisted.internet.reactor.callLater(0.05, install_agent, db)
        else:
            if 'generate_sample' not in dispatcher:
                agent = dispatcher['main']
                agent.chooser = choose_another
                dispatcher['generate_sample'] = zc.async.agent.Agent(
                    choose_generate_sample, 1)
                transaction.commit()
    finally:
        transaction.abort()
        conn.close()

if __name__ == '__main__':
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.callWhenRunning(install_agent, db)
    twisted.internet.reactor.run()

Demonstration

Now let’s start up our workers and see how they work. We’re going to have two workers now, and they will each need separate UUIDs. A really simple approach is to give each worker process its own working directory. (We could also use the ZC_ASYNC_UUID environment variable, described in the Process UUIDs section above.)

$ mkdir worker1
$ mv uuid.txt worker1
$ cd worker1
$ ../bin/python ../lib/python2.5/site-packages/pi.py &
$ cd ..
$ mkdir worker2
$ cd worker2
$ ../bin/python ../lib/python2.5/site-packages/pi.py &

Now we’ll start the Python process in which we will test our code. We’ll move to the main directory, but as long as we don’t start another worker, it doesn’t really matter.

$ cd ..
$ ./bin/python

And now, our test.

>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> import zc.async.configure
>>> zc.async.configure.base()
>>> import pi
>>> import zc.async.job
>>> import zc.async.queue
>>> q = zc.async.queue.getDefaultQueue(conn)
>>> j = q.put(zc.async.job.parallel(
...     zc.async.job.Job(pi.generate_sample, 5000000),
...     zc.async.job.Job(pi.generate_sample, size=5000000),
...     postprocess=pi.process_samples))
>>> import transaction
>>> transaction.commit()

Wait a few seconds and then try these lines.

>>> _ = transaction.begin()
>>> j.result

If the result is empty (None), repeat those two lines again (that is, begin the transaction again and check the result again). Eventually, these lines should give you a result: an approximation of pi.

Just to prove to ourselves that we saved some time, let’s do a comparison test: the same number of samples, but not in parallel.

>>> j2 = q.put(zc.async.job.parallel(
...     zc.async.job.Job(pi.generate_sample, 10000000),
...     postprocess=pi.process_samples))
>>> transaction.commit()
>>> _ = transaction.begin()
>>> j2.result

Once both jobs are complete, compare their run-time.

>>> j.active_end - j.active_start
>>> j2.active_end - j2.active_start

On my machine, even in this simple, short example, running in parallel with a job per processor/core took 7.8 seconds, while running all in one process took 13.4 seconds.

Monitoring

Soon, you may want to be able to monitor or introspect what’s going on in your zc.async work. The package provides several tools to do that. We’ll take a look at a few here.

We will be turning on a monitor port. This port should be protected behind a firewall, like your ZEO ports.

If you like the functionality that we describe here but would prefer to expose it in a different manner, note that most of the Python functions in monitor.py and monitordb.py power the zc.async commands in the monitor port, and can be used without the monitor itself.

To enable the monitoring port, we need to install some extra dependencies for zc.async: “[monitor]”. Exit the Python interpreter and make sure you are in the top “quickstart” directory, and then enter this command:

$ ./bin/easy_install zc.async[monitor]

If you take a glance at the output, you’ll see we’ve only added a few dependencies: simplejson, zc.ngi, and zc.monitor.

Now let’s turn on the port and the zc.async commands.

At the top of the pi.py file, add some imports:

import os
import zope.component
import zc.monitor
import zc.monitor.interfaces
import zc.async.monitor
import zc.async.monitordb

Then down in the if __name__ == '__main__': block, add these lines at the top.

monitor_port = os.environ.get('MONITOR_PORT')
if monitor_port:
    for f in (zc.monitor.interactive, zc.monitor.quit, zc.monitor.help,
              zc.async.monitor.async, zc.async.monitordb.asyncdb):
        zope.component.provideUtility(
            f, zc.monitor.interfaces.IMonitorPlugin, f.__name__)
    zc.monitor.start(int(monitor_port))

The file should look like this now.

import os
import random
import math

import ZEO.ClientStorage
import ZODB
import transaction
import twisted.internet.reactor
import zc.monitor
import zc.monitor.interfaces
import zope.component

import zc.async.configure
import zc.async.queue
import zc.async.instanceuuid
import zc.async.agent
import zc.async.monitor
import zc.async.monitordb

def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size

def process_samples(*sample_jobs):
    count = 0 
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size

def choose_generate_sample(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ == 'generate_sample')

def choose_another(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ != 'generate_sample')

def install_agent(db):
    conn = db.open()
    try:
        q = zc.async.queue.getDefaultQueue(conn)
        try:
            dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
        except KeyError:
            twisted.internet.reactor.callLater(0.05, install_agent, db)
        else:
            if 'generate_sample' not in dispatcher:
                agent = dispatcher['main']
                agent.chooser = choose_another
                dispatcher['generate_sample'] = zc.async.agent.Agent(
                    choose_generate_sample, 1)
                transaction.commit()
    finally:
        transaction.abort()
        conn.close()

if __name__ == '__main__':
    monitor_port = os.environ.get('MONITOR_PORT')
    if monitor_port:
        for f in (zc.monitor.interactive, zc.monitor.quit, zc.monitor.help,
                  zc.async.monitor.async, zc.async.monitordb.asyncdb):
            zope.component.provideUtility(
                f, zc.monitor.interfaces.IMonitorPlugin, f.__name__)
        zc.monitor.start(int(monitor_port))
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.callWhenRunning(install_agent, db)
    twisted.internet.reactor.run()

Now stop and restart your two worker instances, this time providing two different ports in the environment for each worker. Here’s one way to do it. First we’ll shut down the previous instances. If you used the lines above to start them before, type fg and RETURN, and then CTRL-C to stop worker 2; and then do the same thing (fg and RETURN, and then CTRL-C) to stop worker 1.

$ cd worker1
$ MONITOR_PORT=9991 ../bin/python ../lib/python2.5/site-packages/pi.py &
$ cd ../worker2
$ MONITOR_PORT=9992 ../bin/python ../lib/python2.5/site-packages/pi.py &

Now you can open connections to these two ports, 9991 and 9992, and query the worker (using the async command, primarily) and the state of zc.async in the database itself (the asyncdb command).

The easiest way to experiment with this is using telnet. Try this.

$ telnet 127.0.0.1 9991

You should see something like this:

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

Now you can experiment with commands. Try this (followed by a RETURN):

help

You should see something like this:

Supported commands:
  async -- Monitor zc.async activity in this process.
  asyncdb -- Monitor and introspect zc.async activity in the database.
  help -- Get help about server commands
  interactive -- Turn on monitor's interactive mode
  quit -- Quit the monitor
Connection closed by foreign host.
$

Hm, it dumped us straight back to the shell! zc.monitor behaves that way to be friendly to automated monitoring processes using the port. We can use the interactive command to make things a bit more pleasant for ourselves.

Reuse the telnet command shown above, or maybe connect to 9992 (telnet 127.0.0.1 9992) to see that you can. This time, type the interactive command first. You should see this reply:

Interactive mode on.  Use "quit" To exit.

Now experiment! Try some of these commands to see what you get.

help async

async help

async help status

async status

async help poll

async poll

async UUID

async utcnow

help asyncdb

asyncdb help

asyncdb help UUIDs

asyncdb UUIDs

asyncdb lastcompleted

asyncdb count completed

asyncdb count completed uuid:THIS

That last command shows how many jobs this worker has completed for as long as the database has records, which by default means between seven and eight days. The help for asyncdb count, which is available from asyncdb help count, is not short, but tells you the many options available.

When you are done, use the quit command to exit the telnet connection to the monitor port.

Another important tool is logging. The zc.async.event logger receives important events; pay special attention to critical events! The zc.async.trace logger lets you follow along with every detail, if so desired.

These monitoring and introspection tools, combined with logging, provide powerful tools to keep track of your zc.async work.

Other Configuration

We’re at the end of this quickstart. To close, here’s a quick survey of some other configuration opportunities available that we haven’t seen here.

  • Callbacks are a very important per-job configuration. You can add them to a job, to be run unconditionally, conditionally if the result is an instance of a twisted.python.failure.Failure, or conditionally if the result is not a Failure. See addCallback(), addCallbacks(), and callbacks.

Note

Unlike Twisted callbacks, all callbacks for the same job get the same result; if you would like to chain results, the callbacks themselves are Jobs, so attach a callback to your callback.

  • You can request that a job begin_after a given timezone-aware datetime. If not given, this defaults to now, for the purposes of calculating the effective begin_by datetime, described below.
  • You can specify that a job should begin_by a given duration (datetime.timedelta) after the job’s begin_after value. When the queue gets ready to offer the job for an agent to choose, if the effective begin_by value has passed, the queue will instead offer a call to the job’s fail() method.

Note

There is no built-in way to stop a running job, short of stopping the process. This can be approximated by use in your job of getLiveAnnotation() to poll for stop requests; or the brave can write some C to use PyThreadState_SetAsyncExc.

  • You can set up quotas for certain arbitrary quota names that you define. This is a limit: no more than the given quota can run at once, total, across all workers. This can let you decrease the chance of conflict errors for long-running jobs that write to the same data structures. See quotas, IQuotas, and quota_names.
  • Retry policies determine how jobs should be retried if they raise an uncaught exception while running, if their commit raises an error, or if they are interrupted. They can be configured per job, or as defaults for callback and non-callback jobs. See IRetryPolicy, retry_policy_factory, getRetryPolicy(); the default retry policies RetryCommonFourTimes, RetryCommonForever and NeverRetry; and the retry_policy_factory argument to addCallback(), addCallbacks(), and put().
  • The failure_log_level determines at what level Failure results for a given job should be logged. This is usually logging.ERROR, and logging.CRITICAL for callbacks. This can be set directly on the job, or applied via the failure_log_level argument to addCallback(), addCallbacks(), and put().
  • Custom job subclasses can take advantage of setUp() and tearDown() hooks to set up and tear down state. An example is the Zope 3-specific Job.
  • A custom queue might support broadcasting jobs across dispatchers, or targeting specific dispatchers. These features may be developed for zc.async itself at a later date.
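The begin_after/begin_by interplay described above can be sketched with plain datetimes (`is_expired` is a hypothetical helper; the real queue performs this check internally):

```python
import datetime

def is_expired(now, begin_after, begin_by):
    # A job's effective deadline is begin_after + begin_by; once now is
    # past that, the queue would offer a call to job.fail() instead.
    if begin_by is None:
        return False  # no deadline configured
    return now > begin_after + begin_by

start = datetime.datetime(2008, 7, 16, 12, 0)
late = start + datetime.timedelta(hours=2)
expired = is_expired(late, start, datetime.timedelta(hours=1))
```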

There are many other topics to discuss (testing, debugging, and Zope 3 integration, for instance), but this is a quick start, so we’ll end here.