6. Pyro Event Server

Introduction

In many situations it is desirable to decouple servers and clients. In abstract terms this means that information producers neither know nor care which parties are interested in the information, and information consumers neither know nor care about the source or sources of the information. All they know is that they produce or consume information on a certain subject.

This is where the Event Server fits in nicely. It is a third party that controls the flow of information about certain subjects ("events"). A publisher uses the Event Server to publish a message on a specific subject. A subscriber uses the Event Server to subscribe itself to specific subjects, or to a pattern that matches certain subjects. As soon as new information on a subject is produced (an "event" occurs), all subscribers for this subject receive the information. Nobody knows (or cares) about anybody else.

It is important to remember that all events processed by the ES are transient, which means they are not stored. If there is no listener, all events disappear into the void. The store-and-forward programming model is part of a messaging service, which is not what the ES is meant to be. It is also important to know that all subscription data is transient. Once the ES is stopped, all subscriptions are lost. The clients that are subscribed are not notified of this! If no care is taken, they keep on waiting forever for events to occur, because the ES doesn't know about them anymore!

Usually your subscribers will receive the events in the order they are published. However, this is not guaranteed. If you rely on the exact order of receiving events, you must add some logic to check this (possibly by examining the events' timestamps). The chance of events not arriving in the order they were published is very, very small on a high-performance LAN. Only under very high server load, heavy network traffic, or on a high-latency (WAN?) connection is it likely to occur.
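
The timestamp check suggested above could look roughly like this. It is only a sketch and not part of Pyro: FakeEvent is a hypothetical stand-in for the event objects the ES delivers (only the msg, subject and time attributes are assumed), and OrderChecker is an invented helper name.

```python
import collections

# Hypothetical stand-in for the event object delivered by the ES; only the
# attributes msg, subject and time are assumed here.
FakeEvent = collections.namedtuple("FakeEvent", "msg subject time")

class OrderChecker(object):
    """Remembers the newest timestamp seen per subject (illustrative helper)."""
    def __init__(self):
        self.latest = {}

    def in_order(self, event):
        # Subjects are case insensitive, so normalise before comparing.
        key = event.subject.lower()
        last = self.latest.get(key)
        if last is not None and event.time < last:
            return False          # older than an event that was already handled
        self.latest[key] = event.time
        return True

checker = OrderChecker()
events = [
    FakeEvent(("SUN", 22.44), "StockQuotes", 100.0),
    FakeEvent(("SUN", 22.50), "StockQuotes", 102.0),
    FakeEvent(("SUN", 22.47), "STOCKQUOTES", 101.0),   # arrives late
]
results = [checker.in_order(e) for e in events]
```

What to do with a late event (drop it, reorder it, log it) depends on your application; the sketch only detects it.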

Another thing to pay attention to is that the ES does not guarantee delivery of events. As mentioned above, the ES does not have a store-and-forward mechanism, but even if everything is up and running, the ES does not enhance Pyro's way of transporting messages. This means that it's still possible (perhaps due to a network error) that an event gets lost. For reliable, guaranteed, asynchronous message delivery you'll have to look somewhere else, sorry ;-)

The ES is a multithreaded server and will not work if your Python installation doesn't have thread support. Publications are dispatched to the subscribers in different threads, so they don't block each other. Please note that events may arrive at your listener in multithreaded fashion! Pyro itself starts another thread in your listener to handle the new event, possibly while the previous one is still being handled. The event method may be called concurrently from several threads. If your code can't handle this, you have to use some form of thread locking in your client (see Semaphore in the threading module, or Pyro.util.getLockObject).
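
A minimal sketch of such locking, using a plain threading.Lock from the standard library rather than a Semaphore or Pyro.util.getLockObject. CountingListener and deliver are hypothetical names. In a real subscriber the listener would be a Pyro object and the ES would make the calls; here a few local threads simulate concurrent delivery.

```python
import threading

class CountingListener(object):
    """Hypothetical listener; in a real subscriber this would be a Pyro object."""
    def __init__(self):
        self.lock = threading.Lock()
        self.count = 0

    def event(self, event):
        # Only one thread at a time may update the shared counter.
        with self.lock:
            self.count += 1

listener = CountingListener()

def deliver(n):
    # Stand-in for the ES calling the listener n times.
    for i in range(n):
        listener.event(("tick", i))

# Simulate events being delivered from several threads at once.
threads = [threading.Thread(target=deliver, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Keep the locked section as short as possible, so that one slow event handler does not hold up all the others.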

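To summarize:
- Publishers and subscribers are decoupled; the ES sits between them and neither side knows about the other.
- Events are transient: they are not stored, and without a listener they are lost.
- Subscriptions are transient: they are lost when the ES stops, and subscribers are not notified of this.
- Events usually arrive in the order they were published, but this is not guaranteed.
- Delivery of events is not guaranteed.
- The ES is multithreaded, and your listener's event method may be called concurrently from several threads.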

Starting the Event Server

Start the ES using the pyro-es command from the bin directory (use pyro-es.cmd on Windows). You can specify the following arguments:

pyro-es [-h] [-n hostname] [-p port] [-N] [-i identification]
-h
Print help.
-n hostname
Change the hostname/ip address the server binds on. Useful with multiple network adapters.
-p port
Change the port number the server uses. (Omit to use Pyro defaults, 0 to let the operating system choose a random port).
-N
Do not use the Name Server.
-i identification
Specify the authentication passphrase that will be required to connect to this server. If it contains spaces, use quotes around the string. The same identification is also used to connect to other Pyro servers such as the Name Server (this is of course required when the Name Server has been started with the -i option).

There is also: pyro-essvc   (Windows-only Event Server 'NT-service' control scripts)
- Arguments: [options] install|update|remove|start [...]|stop|restart [...]
- On Windows NT (2000/XP) systems, it's possible to register and start the Event Server as an NT service. You'll have to use the essvc.cmd script to register it as a service. Make sure you have Pyro properly installed in your Python's site-packages, or register the service using an account with the correct PYTHONPATH setting, so that Pyro can be located. The ES service logs to C:\Pyro_ES_svc.log where C: is your system drive.
You can configure command line arguments for this service in the Registry. The key is: HKLM\System\CurrentControlSet\Services\PyroES, and the value under that key is: PyroServiceArguments (REG_SZ; you will be asked for it, and it will be created for you, when doing an essvc.cmd install from a command prompt).
Running the ES as a Windows NT service is not well supported.
You can also use python -m to start it:
python -m Pyro.EventService.Server

As with the Name Server, if you want to start the Event Server from within your own program, you can of course start it by executing the start script mentioned above. You could also use the EventServiceStarter class from the Pyro.EventService.Server module to start it directly (this is what the script also does). Be sure to start it in a separate process or thread because it will run in its own endless loop. Have a look at the "AllInOne" example to see how you can start the Event Server using the EventServiceStarter class.
You will probably have to wait until the ES has been fully started. To do so, call the waitUntilStarted() method on the starter object. It returns true if the ES has been started, false if it is not yet ready. You can provide a timeout argument (in seconds).

To start the ES you will first have to start the Name Server (unless you use the -N option), because the ES needs it to register itself. After starting the ES you will see something like this:

*** Pyro Event Server ***
Pyro Server Initialized. Using Pyro V3.2
URI= PYRO://192.168.1.40:7766/c0a8012804bc0c96774244d7d79d5db3
Event Server started.

Configuration options

There are two config options specifically for the ES: PYRO_ES_QUEUESIZE and PYRO_ES_BLOCKQUEUE. Read about them in the Installation and Configuration chapter. By default, the ES will allocate moderately sized queues for subscribers, and publishers will block if such a queue becomes full (so no events get lost). You might want to change this behavior. Every subscriber has its own queue, so if the queue of a slow subscriber fills up, other subscribers are still serviced nicely. If you set PYRO_ES_BLOCKQUEUE to 0, new messages for full queues are lost instead. This may be a way to allow slow subscribers to catch up, because new messages are put in the queue again as soon as there is room. Note that only messages to the slow or frozen subscribers are lost; subscribers that are running normally still receive these messages.
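
The difference between the two queue behaviors can be modelled with a bounded queue from the standard library. This is only an illustration of the idea, not the actual ES implementation: the deliver function, the queue size of 2 and the block flag are assumptions made for the sketch.

```python
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2 module name

def deliver(q, message, block):
    """Put message in a subscriber queue; return False if it was dropped."""
    try:
        if block:
            q.put(message)         # waits until there is room (publisher blocks)
        else:
            q.put_nowait(message)  # raises queue.Full when the queue is full
        return True
    except queue.Full:
        return False

slow_subscriber = queue.Queue(maxsize=2)   # hypothetical small queue size
outcomes = [deliver(slow_subscriber, n, block=False) for n in range(4)]
# The first two messages fit; the next two are lost.

slow_subscriber.get()                      # the subscriber catches up a bit
outcomes.append(deliver(slow_subscriber, 99, block=False))
```

With block=True the fourth call would simply wait until the subscriber has taken something out of its queue, which is the behavior the text describes as the default.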

Using the Event Server (publish)

The ES is just a regular Pyro object, with a few helper classes. Its name (to look it up in the Name Server) is available in Pyro.constants.EVENTSERVER_NAME. All subjects are case insensitive, so if you publish something on the "stockquotes" channel it is the same as if you published it on the "STOCKQuotes" channel.

To publish an event on a certain topic, you need to have a Pyro proxy object for the ES, and then call the publish method: publish(subjects, message), where subjects is a subject name or a sequence of one or more subject names (strings), and message is the actual message. The message can be any Python object (as long as it can be pickled):

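import Pyro.core
import Pyro.constants

Pyro.core.initClient()
# Look up the Event Server through the Name Server and get a proxy for it.
es = Pyro.core.getProxyForURI("PYRONAME://" + Pyro.constants.EVENTSERVER_NAME)
# Publish a (symbol, quote) tuple on the "StockQuotes" subject.
es.publish("StockQuotes", ("SUN", 22.44))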

If you think this is too much work, or if you want to abstract from the Pyro details, you can use the Publisher base class that is provided in Pyro.EventService.Clients. Subclass your event publishers from this class. Its __init__ takes care of locating the ES, and you can just call the publish(subjects, message) method of the base class. No ES proxy code needed:

import Pyro.EventService.Clients

class StockPublisher(Pyro.EventService.Clients.Publisher):
    def __init__(self):
        Pyro.EventService.Clients.Publisher.__init__(self)
    def publishQuote(self, symbol, quote):
        self.publish("StockQuotes", ( symbol, quote) )

sp = StockPublisher()
sp.publishQuote("SUN", 22.44)

Authentication passphrase

The __init__ of both the Publisher and the Subscriber takes an optional ident argument. Use this to specify the authentication passphrase that will be used to connect to the ES (and also to connect to the Name Server).

Not using the name server

The __init__ of both the Publisher and the Subscriber takes an optional esURI argument. Set it to the URI of the Event Server (string format) if you don't have a Name Server running. Look at the "stockquotes" example to see how this can be done. Note that the Event Server usually prints its URI when started.

Using the Event Server (subscribe)

As pointed out above, the ES is just a regular Pyro object, with a few helper classes. Its name (to look it up in the Name Server) is available in Pyro.constants.EVENTSERVER_NAME. All subjects are case insensitive, so if you publish something on the "stockquotes" channel it is the same as if you published it on the "STOCKQuotes" channel.

Event subscribers are a little more involved than event publishers. This is because they are full-blown Pyro server objects that receive calls from the ES when an event is published on one of the topics you've subscribed to! Therefore, your clients (subscribers) need to call the Pyro daemon's handleRequests or requestLoop (just like a Pyro server). They also have to call Pyro.core.initServer() because they also act as a Pyro server. Furthermore, they usually have to run as a multithreaded server, because the ES may call them as soon as a new event arrives, even if you are not done processing the previous event. Single-threaded servers will build up a backlog of undelivered events if this happens. You still get all events (with the original timestamp, so you could skip events that have "expired" to catch up). You can change this behavior with the PYRO_ES_QUEUESIZE and PYRO_ES_BLOCKQUEUE config items mentioned earlier.
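
Skipping "expired" events to catch up might be sketched like this. MAX_AGE and is_expired are hypothetical names, and the comparison assumes that the clocks of the ES machine and the subscriber machine are reasonably synchronised, because the event timestamp is taken on the server.

```python
import time

MAX_AGE = 5.0   # seconds; hypothetical threshold chosen for this sketch

def is_expired(event_time, now=None, max_age=MAX_AGE):
    """Return True if an event timestamp is older than max_age seconds."""
    if now is None:
        now = time.time()
    return (now - event_time) > max_age

now = 1000.0
backlog = [990.0, 994.0, 996.5, 999.0]     # timestamps of backlogged events
fresh = [t for t in backlog if not is_expired(t, now=now)]
```

In an event method you would call is_expired with the timestamp of the incoming event and return early when it yields True.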

Subscribing to receive information

The Event Server has a few important methods that you'll be using to subscribe:
subscribe(subjects, subscriber)
Subscribe to events. subjects is a subject name or a sequence of one or more subject names (strings), and subscriber is a proxy for your subscriber object.
subscribeMatch(subjectPatterns, subscriber)
Subscribe to events based on patterns. subjectPatterns is a subject pattern or a sequence of one or more subject patterns (strings), and subscriber is a proxy for your subscriber object.
unsubscribe(subjects, subscriber)
Unsubscribe from subjects. subjects is a subject or subject pattern or a sequence thereof, and subscriber is a proxy for your subscriber object.

But first, create a subscriber object, which must be a Pyro object (or use delegation). The subscriber object should have an event(self, event) method. This method is called by the ES if a new event arrives on a channel you subscribed to. event is a Pyro.EventService.Event object, which has the following attributes:

msg
The actual message that was published. Can be any Python object.
subject
The subject (string) on which the message was published (the topic name).
time
The event's timestamp (from the server, synchronised for all subscribers). A float, taken from time.time().

To subscribe, call the subscribe method of the ES with the desired subject(s) and a proxy for your subscriber object. If you want to subscribe to multiple subjects based on pattern matching, call the subscribeMatch method instead with the desired subject pattern(s) and a proxy for your subscriber object. The patterns are standard regular expressions; see the standard re module for more information. The pattern '^STOCKQUOTE\\.S.*$' matches STOCKQUOTE.SUN and STOCKQUOTE.SAP, but not STOCKQUOTE.IBM, NYSE.STOCKQUOTE.SUN, and so on. Once more: the subjects are case insensitive. The patterns are matched case-insensitively too.
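
You can try out a subject pattern locally with the re module before subscribing with it. The sketch below uses re.IGNORECASE to mimic the case-insensitive matching described above. How the ES applies the patterns internally is not shown here, so treat this as an approximation.

```python
import re

# Same pattern as in the text; re.IGNORECASE mimics case-insensitive matching.
pattern = re.compile('^STOCKQUOTE\\.S.*$', re.IGNORECASE)

subjects = ["STOCKQUOTE.SUN", "stockquote.sap", "STOCKQUOTE.IBM", "NYSE.STOCKQUOTE.SUN"]
matches = [s for s in subjects if pattern.match(s)]
```

Only the first two subjects end up in matches. The last one is rejected because of the ^ anchor at the start of the pattern.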

To unsubscribe, call the unsubscribe method with the subject(s) or pattern(s) you want to unsubscribe from, and a proxy for the subscriber object that has been previously subscribed. This will remove the subscriber from the subscription list and also from the pattern match list if the subject occurs as a pattern there. The ES (actually, Pyro) is smart enough to see if multiple (different) proxy objects point to the same subscriber object and will act correctly.

Using the Subscriber base class from the Event Server

As you can see, it can be a bit complex to get your subscribers up and running. An easier way to do this is to use the Subscriber base class provided in Pyro.EventService.Clients. Subclass your event listeners (subscribers) from this class. Its __init__ takes care of locating the ES, and you can just call the subscribe(subjects), subscribeMatch(subjectPatterns) and unsubscribe(subjects) methods on the object itself. No ES proxy code needed. This base class also starts a Pyro daemon, and by calling listen() your code starts listening for incoming events. When you want to abort the event loop, you have to call self.abort() from within the event handler method.

The multithreading of the event method can be controlled using the setThreading(threading) method. If you set threading=0, threading is switched off (it is on by default unless otherwise configured). Your events will then arrive purely sequentially, each one after the previous event has been processed. Call this method before entering the requestLoop or handleRequests or listen.

A minimalistic event listener that prints the stockquote events published by the example code above:

from Pyro.EventService.Clients import Subscriber

class StockSubscriber(Subscriber):
    def __init__(self):
        Subscriber.__init__(self)
        self.subscribe("StockQuotes")
    def event(self, event):
        print "Got a stockquote: %s=%f" % (event.msg)

sub = StockSubscriber()
sub.listen()

Threads, Subscribers and Queues

As pointed out above, events are delivered to your subscribers in a multithreaded way. Your subscriber may still be processing an event when the next one arrives. Use the setThreading(threading) method of the Subscriber base class to control the threading. If you set threading=0, threading is switched off (it is on by default). But a better way to process events sequentially is to use Python's Queue module: you create a Queue in your subscriber process that is filled with arriving events, and you have a single event consumer (worker) that takes events out of the queue one by one:
Pyro Event Server (multithreaded) -> Subscriber(s) (multithreaded) -> Queue.Queue -> Consumer/Worker (single-threaded)
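
The queue-based setup could be sketched as follows. QueueingListener, consume and the None sentinel are hypothetical choices made for this illustration. In a real program the listener would subclass Subscriber and the ES would call its event method; here a few local threads stand in for the ES.

```python
import threading
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2 module name

class QueueingListener(object):
    """Hypothetical listener; a real one would subclass Subscriber."""
    def __init__(self):
        self.inbox = queue.Queue()

    def event(self, event):
        # Called from many threads; putting into the queue is thread-safe.
        self.inbox.put(event)

def consume(inbox, handled):
    # Single worker: events are processed strictly one at a time.
    while True:
        item = inbox.get()
        if item is None:       # sentinel used by this sketch to stop the worker
            break
        handled.append(item)

listener = QueueingListener()
handled = []
worker = threading.Thread(target=consume, args=(listener.inbox, handled))
worker.start()

# Simulate concurrent event delivery from ten threads.
producers = [threading.Thread(target=listener.event, args=(n,)) for n in range(10)]
for p in producers:
    p.start()
for p in producers:
    p.join()

listener.inbox.put(None)       # tell the worker to finish
worker.join()
```

Because the event method only puts the event in the queue, it returns quickly, and all real processing happens in one place where no locking is needed.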

Examples

To see how you use the ES, have a look at the "stockquotes" and "countingcars" examples. Also have a look at the client skeleton code that comes with the ES. To exercise the ES to the max, have a look at the fully threaded "stresstest" example. To see how to start and use the ES from within your own program, have a look at the "AllInOne" example.