To start, install virtualenv and create a virtual environment for our experiments.
$ easy_install virtualenv
$ virtualenv quickstart
Install zc.async in the virtual environment.
$ cd quickstart/
$ ./bin/easy_install zc.async
This installed several packages.
All of these, and zc.async itself, are distributed under open-source licenses such as the LGPL and ZPL.
zc.async relies on a distributed ZODB technology called ZEO (“Zope Enterprise Objects”) to distribute work. ZEO has a central database server to which client processes connect.
Let’s start the ZEO Server:
$ ./bin/runzeo -a 9999 -f test.fs &
That starts a database server, accessible on port 9999 of your local machine, saving the data in the test.fs file.
Now let’s start a Python with a client connection to the database server.
Start up bin/python (not your system python, but the one in virtualenv’s quickstart/bin):
$ ./bin/python
This will be our single client process.
You might have many, each connecting to the main database server, and each able to perform and/or request zc.async jobs.
Connect to the database.
>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)
Now we do some basic configuration. This first bit installs some default adapters. You might not ever have to worry too much about them.
>>> import zc.async.configure
>>> zc.async.configure.base()
This second part is policy, and if you ever put zc.async in production, you’ll want to understand what’s going on here.
>>> zc.async.configure.start(db, poll_interval=1)
Now the system has a dispatcher polling for jobs every second. As we’ll see below, a dispatcher polls for jobs in one or more queues, to assign them to worker threads.
The start function also installed a queue. To get zc.async to do work, you put a job in a queue, and commit the transaction.
First, let’s get the queue that we have installed. We need to open a connection to the database. Then we get the queue.
>>> conn = db.open()
>>> import zc.async.interfaces
>>> q = zc.async.interfaces.IQueue(conn)
Let’s put a job in our queue. This silly example will return the current time.
>>> import time
>>> j = q.put(time.time)
It’s not done yet.
>>> j.result
>>> j.status
u'pending-status'
We have to commit the transaction for the dispatcher to see the job.
>>> import transaction
>>> transaction.commit()
Now wait a second and then try this. Calling transaction.begin() will sync our connection up with database changes committed elsewhere.
>>> _ = transaction.begin()
>>> j.result
1216179006.856108
>>> j.status
u'completed-status'
You can also make closures. The Job class accepts arguments similarly to the Python 2.5 functools.partial(): Job(func, *args, **keywords). This instantiates a new callable (a Job instance) with partial application of the given arguments and keywords. You can then pass the job instance to the put() method.
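Since Job mirrors functools.partial, the deferred-call behavior is easy to try with the standard library alone. A minimal sketch:

```python
import functools

# functools.partial captures the callable plus any positional and
# keyword arguments, just as zc.async.job.Job does.
delayed = functools.partial(int, '2a', base=16)

# Nothing runs until the partial (or, analogously, the Job) is called.
print(delayed())  # 0x2a == 42
```

A Job behaves the same way, except that the dispatcher, not you, eventually calls it.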
Generating RSA keys is actually a reasonable real-world use case for something like this.
>>> import subprocess
>>> j = q.put(zc.async.job.Job(
...     subprocess.call,
...     ['openssl', 'genrsa', '-out',
...      'key.pem', '1024']))
>>> transaction.commit()
We need to begin the transaction to see the result...

>>> j.result
>>> _ = transaction.begin()
>>> j.result
0

...which in this case is simply 0, indicating a successful UNIX process.
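If you want to act on that convention in your own code, the return code can be checked directly. This sketch uses sys.executable instead of openssl so it runs anywhere Python does:

```python
import subprocess
import sys

# A return code of 0 conventionally means the UNIX process succeeded;
# any nonzero value signals failure.
rc = subprocess.call([sys.executable, '-c', 'pass'])
assert rc == 0

# A process that exits with a nonzero status reports that status back.
rc = subprocess.call([sys.executable, '-c', 'import sys; sys.exit(3)'])
assert rc == 3
```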
We can open the file to show the result.
>>> subprocess.call(['cat', 'key.pem'])
This will show you the key, which should look something like this:
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQCYAZW+HjDGJhRHnUlZZWqhrGOxU2K/RhssmcMs0JLnWI2cWmZ+
...
CEcz6ZbO8zm4AEGI/dqLicZh3bhunhflAovW6WxbNKLENQ==
-----END RSA PRIVATE KEY-----
0
We’ve now seen some simple examples from the standard library. But how do you get your own work done?
Let’s say we want to implement an old chestnut of a problem: use a Monte Carlo simulation (that is, “throwing darts and analyzing the results”) to calculate pi. This will use zc.async much like the map-reduce approach of Hadoop: we will want to distribute the work of running the simulation to multiple machines so the result can be done faster.
You want a job to hold a reference to your own callable, so that the job performs the work you define. That reference, from the job to your callable, will need to be persisted in the database.
Because zc.async uses the ZODB for its persistence mechanism, the ZODB’s persistence rules are in effect.
Luckily, these are fairly simple.
For now, we’ll stay as simple as it gets: if you use module global functions and immutables, and share software across instances, you’ll be fine.
ZODB allows a lot more, if you’re willing to follow a few more rules, but that one rule will get us moving for this quick-start.
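The rule comes from pickling: the ZODB stores the job's reference to your callable as a pickle, and the standard pickle module follows roughly the same "importable by module and name" restriction. A sketch of the distinction:

```python
import math
import pickle

# A module-global function pickles by reference (module plus name) and
# round-trips to the very same object.
restored = pickle.loads(pickle.dumps(math.hypot))
assert restored is math.hypot

# A lambda has no importable name, so pickling it fails -- the same
# reason ad-hoc callables are unsuitable as persisted job targets.
try:
    pickle.dumps(lambda x: x)
except Exception:
    print('lambda is not picklable')
```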
Exit the Python interpreter (control-D). Look around in the directory (ls): you should see a few database files, the key.pem you created, and a new file: uuid.txt. It should look something like this:
$ cat uuid.txt
afd1e0d0-52e1-11dd-879b-0017f2c49bdd
------------------------------------------------------------------------
The value above (and this file) is created and used by the zc.async
package.  It is intended to uniquely identify this software instance when
it is used to start a zc.async dispatcher.  This allows multiple
dispatchers, each in its own software instance, to connect to a single
database to do work.

In order to decide where to look for this file (or to create it, if
necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
file name.

If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
daemonize your process, you can set this in a zdaemon environment section
of your zdaemon.conf.  Supervisor (http://supervisord.org/) also provides
this functionality.  Other similar tools probably do as well.

If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
``os.path.join(os.getcwd(), 'uuid.txt')`` as the file name.

To get a new identifier for this software instance, delete this file,
restart Python, and import zc.async.instanceuuid.  This file will be
recreated with a new value.
That text is intended to be self-explanatory, so hopefully it made sense to you. We’ll handle these UUIDs more explicitly in a moment.
Make a new Python file. Let’s call it pi.py. Save this file in lib/python2.5/site-packages/.
Use the following for the file content.
import random
import math

import ZEO.ClientStorage
import ZODB
import twisted.internet.reactor

import zc.async.configure


def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size


def process_samples(*sample_jobs):
    count = 0
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size


if __name__ == '__main__':
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.run()
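Before involving zc.async at all, you can sanity-check the math in a single process. This sketch duplicates the two functions from pi.py and fakes the completed jobs with a tiny stand-in object that carries a `result` attribute:

```python
import math
import random


def generate_sample(size=100000):
    # Count darts that land inside the unit quarter-circle.
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size


def process_samples(*sample_jobs):
    # Combine the (count, size) pairs from each job into one estimate.
    count = 0
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size


class FakeJob(object):
    # Stand-in for a completed zc.async job: it just carries a result.
    def __init__(self, result):
        self.result = result


jobs = [FakeJob(generate_sample()) for _ in range(3)]
estimate = process_samples(*jobs)
print(estimate)  # roughly 3.14
```

The combined 300,000 samples keep the estimate within a few hundredths of pi on virtually every run.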
We’ll need the ZEO server running. If you’ve gone through this file from the start, it should still be running. If not, use this:
$ ./bin/runzeo -a 9999 -f test.fs &
Now, for our first experiment with the Monte Carlo simulation, we’ll start a single worker process.
Enter this in the terminal:
$ ./bin/python lib/python2.5/site-packages/pi.py &
That will start our worker process. Now let’s start an interpreter.
$ ./bin/python
Now get a database and a connection, as we’ve seen before. We’ll also set up the base zc.async configuration.
>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> import zc.async.configure
>>> zc.async.configure.base()
We don’t have any adapters installed, so zc.async.interfaces.IQueue(conn) won’t work. This will though, and still looks pretty good:
>>> import zc.async.queue
>>> q = zc.async.queue.getDefaultQueue(conn)
Now we can start some jobs with the parallel() helper.
>>> import pi
>>> import zc.async.job
>>> j = q.put(zc.async.job.parallel(
... pi.generate_sample, pi.generate_sample, pi.generate_sample,
... postprocess=pi.process_samples))
>>> import transaction
>>> transaction.commit()
Wait a few seconds. If the result is empty (None), begin the transaction again and check the result again. Eventually, these next two lines should give you a result: an approximation of pi.
>>> _ = transaction.begin()
>>> j.result
For one run, I got 3.1386666666666665. Cool.
We’ve already seen Jobs used as closures in the openssl example. You can also use them to pass a different size argument to generate_sample.
Let’s try it.
>>> j = q.put(zc.async.job.parallel(
... zc.async.job.Job(pi.generate_sample, 1000000),
... zc.async.job.Job(pi.generate_sample, size=1000000),
... postprocess=pi.process_samples))
>>> transaction.commit()
Wait a bit, again. Retry these two lines until you get a result. It should be well under a minute on most machines.
>>> _ = transaction.begin()
>>> j.result
My run got 3.1434359999999999. Cool.
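Rather than retyping the begin-and-check pair by hand, a small polling helper can wait for the result. The helper below is plain Python; the check function you pass in would typically call transaction.begin() and then return j.result:

```python
import time


def wait_for(check, timeout=30.0, interval=0.5):
    """Call check() until it returns a non-None value or timeout expires.

    With zc.async, check would typically run transaction.begin() and
    then return j.result.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        value = check()
        if value is not None:
            return value
        time.sleep(interval)
    raise RuntimeError('timed out waiting for a result')
```

For example, define `def check(): transaction.begin(); return j.result` and call `wait_for(check)`.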
Unfortunately, the parallel runs we’ve done so far would actually make the calculation go slower than if we did it in a single function call! That’s because we ran it in a single Python worker process, and the overhead of threads just makes the jobs a bit less efficient. Threads help for some uses of zc.async, but not this one.
Let’s assume you are running these experiments on a machine with two processor cores. In that case we should start two worker processes, one for each core. We’ll need to make sure each worker process has its own UUID.
Moreover, we need to configure each process to only take one generate_sample job at a time. Let’s adjust our code to do that.
A worker process regularly polls the database for new jobs. The software component that polls is called a dispatcher. Each dispatcher looks in the database and asks its personal agent (or agents) what its worker threads should do next. Think of the agent as a “talent agent” or a “booking agent” for the process.
By default, using the zc.async.configure helpers, each dispatcher is given a single agent that will choose the first job in the queue, and that wants to run no more than three jobs at a time.
For our Monte Carlo job, we’ll give each process an additional agent. The new agent will accept at most one generate_sample job at a time.
We will also reconfigure the existing agent to accept up to three of anything except generate_sample.
We’ll do that in our file. Here’s the revised version. The only changes are the imports, the three new functions, and setting up install_agent in the if __name__ == '__main__': block.
import random
import math

import ZEO.ClientStorage
import ZODB
import transaction
import twisted.internet.reactor

import zc.async.configure
import zc.async.queue
import zc.async.instanceuuid
import zc.async.agent


def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size


def process_samples(*sample_jobs):
    count = 0
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size


def choose_generate_sample(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ == 'generate_sample')


def choose_another(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ != 'generate_sample')


def install_agent(db):
    conn = db.open()
    try:
        q = zc.async.queue.getDefaultQueue(conn)
        try:
            dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
        except KeyError:
            twisted.internet.reactor.callLater(0.05, install_agent, db)
        else:
            if 'generate_sample' not in dispatcher:
                agent = dispatcher['main']
                agent.chooser = choose_another
                dispatcher['generate_sample'] = zc.async.agent.Agent(
                    choose_generate_sample, 1)
                transaction.commit()
    finally:
        transaction.abort()
        conn.close()


if __name__ == '__main__':
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.callWhenRunning(install_agent, db)
    twisted.internet.reactor.run()
Now let’s start up our workers, and see how they work. We’re going to have two workers now, and they will each need separate UUIDs. A really simple approach is to make two separate working directories for the two worker processes. (We also could use the environment variable, ZC_ASYNC_UUID, described in the uuid.txt file above.)
$ mkdir worker1
$ mv uuid.txt worker1
$ cd worker1
$ ../bin/python ../lib/python2.5/site-packages/pi.py &
$ cd ..
$ mkdir worker2
$ cd worker2
$ ../bin/python ../lib/python2.5/site-packages/pi.py &
Now we’ll start the Python process in which we will test our code. We’ll move to the main directory, but as long as we don’t start another worker, it doesn’t really matter.
$ cd ..
$ ./bin/python
And now, our test.
>>> import ZEO.ClientStorage
>>> import ZODB
>>> storage = ZEO.ClientStorage.ClientStorage(
...     ('127.0.0.1', 9999))
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> import zc.async.configure
>>> zc.async.configure.base()
>>> import pi
>>> import zc.async.job
>>> import zc.async.queue
>>> q = zc.async.queue.getDefaultQueue(conn)
>>> j = q.put(zc.async.job.parallel(
...     zc.async.job.Job(pi.generate_sample, 5000000),
...     zc.async.job.Job(pi.generate_sample, size=5000000),
...     postprocess=pi.process_samples))
>>> import transaction
>>> transaction.commit()
Wait a few seconds and then try these lines.
>>> _ = transaction.begin()
>>> j.result
If the result is empty (None), repeat those two lines again (that is, begin the transaction again and check the result again). Eventually, these lines should give you a result: an approximation of pi.
Just to prove to ourselves that we saved some time, let’s do a comparison test: the same number of samples, but not in parallel.
>>> j2 = q.put(zc.async.job.parallel(
...     zc.async.job.Job(pi.generate_sample, 10000000),
...     postprocess=pi.process_samples))
>>> transaction.commit()
>>> _ = transaction.begin()
>>> j2.result
Once both jobs are complete, compare their run-time.
>>> j.active_end - j.active_start
>>> j2.active_end - j2.active_start
On my machine, even in this simple, short example, running in parallel with a job per processor/core took 7.8 seconds, while running all in one process took 13.4 seconds.
Soon, you may want to be able to monitor or introspect what’s going on in your zc.async work. The package provides several tools to do that. We’ll take a look at a few here.
We will be turning on a monitor port. This port should be protected behind a firewall, like your ZEO ports.
If you like the functionality that we describe here but would prefer to expose it in a different manner, note that most of the Python functions in monitor.py and monitordb.py power the zc.async commands in the monitor port, and can be used without the monitor itself.
To enable the monitoring port, we need to install zc.async’s extra “[monitor]” dependencies. Exit the Python interpreter and make sure you are in the top “quickstart” directory, and then enter this command:
$ ./bin/easy_install zc.async[monitor]
If you take a glance at the output, you’ll see we’ve only added a few dependencies: simplejson, zc.ngi, and zc.monitor.
Now let’s turn on the port and the zc.async commands.
At the top of the pi.py file, add some imports:
import os
import zope.component
import zc.monitor
import zc.monitor.interfaces
import zc.async.monitor
import zc.async.monitordb
Then down in the if __name__ == '__main__': block, add these lines at the top.
monitor_port = os.environ.get('MONITOR_PORT')
if monitor_port:
    for f in (zc.monitor.interactive, zc.monitor.quit, zc.monitor.help,
              zc.async.monitor.async, zc.async.monitordb.asyncdb):
        zope.component.provideUtility(
            f, zc.monitor.interfaces.IMonitorPlugin, f.__name__)
    zc.monitor.start(int(monitor_port))
The file should look like this now.
import os
import random
import math

import ZEO.ClientStorage
import ZODB
import transaction
import twisted.internet.reactor
import zc.monitor
import zc.monitor.interfaces
import zope.component

import zc.async.configure
import zc.async.queue
import zc.async.instanceuuid
import zc.async.agent
import zc.async.monitor
import zc.async.monitordb


def generate_sample(size=100000):
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size


def process_samples(*sample_jobs):
    count = 0
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size


def choose_generate_sample(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ == 'generate_sample')


def choose_another(agent):
    return agent.queue.claim(
        lambda j: j.callable.__name__ != 'generate_sample')


def install_agent(db):
    conn = db.open()
    try:
        q = zc.async.queue.getDefaultQueue(conn)
        try:
            dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
        except KeyError:
            twisted.internet.reactor.callLater(0.05, install_agent, db)
        else:
            if 'generate_sample' not in dispatcher:
                agent = dispatcher['main']
                agent.chooser = choose_another
                dispatcher['generate_sample'] = zc.async.agent.Agent(
                    choose_generate_sample, 1)
                transaction.commit()
    finally:
        transaction.abort()
        conn.close()


if __name__ == '__main__':
    monitor_port = os.environ.get('MONITOR_PORT')
    if monitor_port:
        for f in (zc.monitor.interactive, zc.monitor.quit, zc.monitor.help,
                  zc.async.monitor.async, zc.async.monitordb.asyncdb):
            zope.component.provideUtility(
                f, zc.monitor.interfaces.IMonitorPlugin, f.__name__)
        zc.monitor.start(int(monitor_port))
    storage = ZEO.ClientStorage.ClientStorage(
        ('127.0.0.1', 9999))
    db = ZODB.DB(storage)
    zc.async.configure.base()
    zc.async.configure.start(
        db, poll_interval=0.1, twisted=True)
    twisted.internet.reactor.callWhenRunning(install_agent, db)
    twisted.internet.reactor.run()
Now stop and restart your two worker instances, this time providing two different ports in the environment for each worker. Here’s one way to do it. First we’ll shut down the previous instances. If you used the lines above to start them before, type fg and RETURN, and then CTRL-C to stop worker 2; and then do the same thing (fg and RETURN, and then CTRL-C) to stop worker 1.
$ cd worker1
$ MONITOR_PORT=9991 ../bin/python ../lib/python2.5/site-packages/pi.py &
$ cd ../worker2
$ MONITOR_PORT=9992 ../bin/python ../lib/python2.5/site-packages/pi.py &
Now you can open connections to these two ports, 9991 and 9992, and query the worker (using the async command, primarily) and the state of zc.async in the database itself (the asyncdb command).
The easiest way to experiment with this is using telnet. Try this.
$ telnet 127.0.0.1 9991
You should see something like this:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Now you can experiment with commands. Try this (followed by a RETURN):
help
You should see something like this:
Supported commands:
  async -- Monitor zc.async activity in this process.
  asyncdb -- Monitor and introspect zc.async activity in the database.
  help -- Get help about server commands
  interactive -- Turn on monitor's interactive mode
  quit -- Quit the monitor
Connection closed by foreign host.
$
Hm, it dumped us straight back to the shell! zc.monitor behaves that way to be friendly to automated monitoring processes using the port. We can use the interactive command to make things a bit more pleasant for ourselves.
Reuse the telnet command shown above, or maybe connect to 9992 (telnet 127.0.0.1 9992) to see that you can. This time, type the interactive command first. You should see this reply:
Interactive mode on. Use "quit" To exit.
Now experiment! Try some of these commands to see what you get.
help
async
async help
async help status
async status
async help poll
async poll
async UUID
async utcnow
help asyncdb
asyncdb help
asyncdb help UUIDs
asyncdb UUIDs
asyncdb lastcompleted
asyncdb count completed
asyncdb count completed uuid:THIS
That last command shows how many jobs this worker has completed for as long as the database has records, which by default means between seven and eight days. The help for asyncdb count, which is available from asyncdb help count, is not short, but tells you the many options available.
When you are done, use the quit command to exit the telnet connection to the monitor port.
Another important tool is logging. The zc.async.event logger gets important events; pay special attention to critical events! The zc.async.trace logger lets you follow along with every detail, if you so desire.
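These are ordinary Python loggers, so routing each one to a file takes a single handler. A minimal sketch (the file names here are arbitrary choices, not zc.async conventions):

```python
import logging

# Route important zc.async events to their own file; critical problems
# will show up here.
event_logger = logging.getLogger('zc.async.event')
handler = logging.FileHandler('zc.async-event.log')
handler.setFormatter(
    logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
event_logger.addHandler(handler)
event_logger.setLevel(logging.INFO)

# The trace log is very verbose; enable it only when you need the
# detail.
# logging.getLogger('zc.async.trace').addHandler(
#     logging.FileHandler('zc.async-trace.log'))
```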
These monitoring and introspection tools, combined with logging, provide powerful tools to keep track of your zc.async work.
We’re at the end of this quickstart. To close, here’s a quick survey of some other configuration opportunities available that we haven’t seen here.
Note
Unlike Twisted callbacks, all callbacks for the same job get the same result; if you would like to chain results, the callbacks themselves are Jobs, so attach a callback to your callback.
Note
There is no built-in way to stop a running job, short of stopping the process. You can approximate one by having your job use getLiveAnnotation() to poll for stop requests; the brave can write some C to use PyThreadState_SetAsyncExc.
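The cooperative approach in the note can be sketched with a plain flag standing in for the live annotation; the job checks the flag between chunks of work. (The annotation name in the comment below is purely illustrative.)

```python
import threading


def cancellable_work(chunks, should_stop):
    """Process chunks, polling should_stop() between each one.

    In a real zc.async job, should_stop would consult a live annotation
    (e.g. something like getLiveAnnotation('stop-requested', False);
    the name is illustrative) instead of reading a threading.Event.
    """
    done = []
    for chunk in chunks:
        if should_stop():
            break  # exit early; partial work done so far is kept
        done.append(chunk * 2)
    return done


stop = threading.Event()
stop.set()  # request a stop before any work happens
print(cancellable_work([1, 2, 3], stop.is_set))  # []
```

The trade-off is granularity: the job can only notice the request between chunks, so pick a chunk size small enough for acceptable responsiveness.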
There are many other topics to discuss (testing, debugging, and Zope 3 integration, for instance), but this is a quick start, so we’ll end here.