The Network Gateway Interface provides:
Network programs are typically difficult to test because they require setting up network connections, clients, and servers.
The Network Gateway Interface (NGI) seeks to improve this situation by separating application code from network code [1]. NGI provides a layered architecture with pluggable networking implementations. This allows application and network code to be tested independently and provides greater separation of concerns. A testing implementation supports testing application code without making network calls.
NGI defines 2 groups of interfaces: application and implementation. Application interfaces are implemented by people writing applications and application-level libraries calling implementation interfaces.
NGI is primarily an asynchronous event-driven networking library. Applications provide handlers that respond to network events. The application interfaces define these handlers:
The implementation APIs provide (or mimic) low-level networking APIs and include:
We’ll look at these interfaces in more detail in the following sections.
The core application interface in NGI is IConnectionHandler. It’s an event-based API that’s used to exchange data with a peer on the other side of a connection. Let’s look at a simple echo server that accepts input and sends it back after converting it to upper case:
class Echo:
def handle_input(self, connection, data):
connection.write(data.upper())
def handle_close(self, connection, reason):
print 'closed', reason
def handle_exception(self, connection, exception):
print 'oops', exception
There are only 3 methods in the interface, 2 of which are optional. Each of the 3 methods takes a connection object, implementing IConnection. Typically, connection handlers will call the write, writelines [2], or close methods from the handler’s handle_input method.
The handler’s handle_close and handle_exception methods are optional. The handle_exception method is only called if an iterator created from an iterable passed to writelines raises an exception. If a call to handle_exception fails, or if handle_exception isn’t implemented, an implementation will close the connection and call handle_close (if it is implemented).
The handle_close method is called when a connection is closed other than through the connection handler calling the connection’s close method. For many applications, this is uninteresting, which is why the method is optional. Clients that maintain long-running connections, may try to create new connections when notified that a connection has closed.
Testing a connection handler is easy. Just call its methods passing suitable arguments. The zc.ngi.testing module provides a connection implementation designed to make testing convenient. For example, to test our Echo connection handler, we can use code like the following:
>>> import zc.ngi.testing
>>> connection = zc.ngi.testing.Connection()
>>> handler = Echo()
>>> handler.handle_input(connection, 'hello out there')
-> 'HELLO OUT THERE'
Any data written to the test connection, using its write or writelines methods, is written to standard output preceded by “-> “:
>>> handler.handle_close(connection, 'done')
closed done
Implementing servers is only slightly more involved that implementing connection handlers. A server is just a callable that takes a connection and gives it a handler by calling set_hsndler. For example, we can use a simple function to implement a server for the Echo handler:
def echo_server(connection):
connection.set_handler(Echo())
Finally, we have to listen for connections on an address by calling an implementation’s listener method. NGI comes with 2 implementation modules [3]. The zc.ngi.testing module provides an implementation for testing applications.
The zc.ngi.async module provides a collection of implementations based on the asyncore module from the Python standard library. These implementations differ based on the way they handle threads. Perhaps the simplest of these is the zc.ngi.async.main implementation:
import zc.ngi.async
address = 'localhost', 8000
listener = zc.ngi.async.main.listener(address, echo_server)
zc.ngi.async.main.loop()
In this example, we listen for connections to our echo server on port 8000. The listener method returns a listener object. We’ll say more about these objects in a little bit. We then call the zc.ngi.async.main.loop method, which blocks until either:
I encourage you to try the above example. Write a script that contains the Echo and echo_server implementations and that calls zc.ngi.async.main.listener and zc.ngi.async.main.main as shown above. Run the script in a shell/terminal window and, in a separate window, telnet to your server and type some text.
It’s often simplest to implement a server using a connection handler class that takes a connection in it’s constructor:
class EchoServer:
def __init__(self, connection):
connection.set_handler(self)
def handle_input(self, connection, data):
connection.write(data.upper())
def handle_close(self, connection, reason):
print 'closed', reason
def handle_exception(self, connection, exception):
print 'oops', exception
Remember a server is just a callable that takes a connection and sets its handler.
The testing implementation provides a listener function:
>>> listener = zc.ngi.testing.listener('addr', EchoServer)
This is primarily useful when you want to connect client and server handlers, as we’ll discuss later. The address passed to the testing listener function can be any hashable object.
Creating a testing listener causes it to be registered in a mapping so it can be connected to later. For this reason, it’s important to close any listeners created in tests.
Listener objects, returned by an implementation’s listener method, provide methods for controlling listeners. The connections method returns an iterable of open connections to a server:
>>> list(listener.connections())
[]
We can stop listening by calling a listener’s close method:
>>> listener.close()
NGI tries to accommodate threaded applications without imposing thread-safety requirements.
Implementation (IImplementation) methods connect, listener, udp and udp_listener are thread safe. They may be called at any time by any thread.
Connection (IConnection) methods write, writelines, and close are thread safe. They may be called at any time by any thread.
The connection set_handler method must only be called in a connect handler’s connected method or a connection handler’s handle_input method.
Listener (IListener) methods connections and close are thread safe. They may be called at any time by any thread.
Application handler methods need not be thread safe. NGI implementations will never call them from more than one thread at a time.
Handlers block implementations When an implementatuon calls a handler, it is blocked from handling other network events until the handler returns.
The zc.ngi.async module provides a number of threading models. The zc.ngi.async module works by running one or more “loops”. These loops wait for networking events and call application handlers.
In this model, the application is responsible for calling zc.ngi.async.main.loop, typically from an application’s main thread. This is most appropriate for simple single-threaded applications that do nothing but respond to application events.
The loop call blocks until an exception is raised by a handler or until there are no more handlers registered with the implementation.
In this model, the zc.ngi.async module maintains its own loop thread. This is the default implementation, provided by the module itself. It is appropriate when implementing libraries that perform networking to perform their function. The advantage of this approach is that it is less intrusive to applications. The loop thread is managed automatically.
Note that the thread used by zc.ngi.async is “daemonic”, meaning that if the main program thread exits, the zc.ngi.async thread won’t keep the program running. If a program registers handlers with the zc.ngi.async implementation and then exists, the program will exit without the handlers being called. If the application doesn’t have other work to do, it should use zc.ngi.async.main or take other steps to keep the application running.
An advantage of the application-managed loop options is that exceptions raised by handlers are propagated to the application. When an implementation manages a loop thread, it logs exceptions.
With a single loop, all networking activity is done in one thread. If a handler takes a long time to perform some function, it prevents other networking activity from proceeding. For this reason, when a single loop is used, it’s important that handlers perform their work quickly, without blocking for any significant length of time.
If a loop is only servicing a single handler, or a small number of handlers, it’s not a problem if a handler takes along time to respond to a network event.
If you need to do a lot of work in response to network events, consider using multiple loops, or using thread pools (or multiprocessing pools) connected to your handlers with queues.
If you’re going to be dealing with lots of network connections, it’s probably better to use a single loop (or few loops) and use non-blocking handlers. Many non-blocking handlers can be efficiently managed at once. Compared to handlers, threads are relatively heavy weight, with large memory requirements and relatively long start-up times.
We saw earlier that we implemented connection handlers by implementing the IConnectionHandler in a class that provided, at a minimum, a handle_input method. This is pretty straightforward. The handle_input method simply reacts to input data. Unfortunately, for many applications, this can make application logic harder to express. Sometimes, a more imperative style leads to simpler application logic.
Let’s look at an example. We’ll implement a simple word-count server connection handler that implements something akin to the Unix wc command. It takes a line of input containing a text length followed by length bytes of data. After receiving the length bytes of data, it sends back a line of data containing line and word counts:
class WC:
input = ''
count = None
def handle_input(self, connection, data):
self.input += data
if self.count is None:
if '\n' not in self.input:
return
count, self.input = self.input.split('\n', 1)
self.count = int(count)
if len(self.input) < self.count:
return
data = self.input[:self.count]
self.input = self.input[self.count:]
self.count = None
connection.write(
'%d %d\n' % (len(data.split('\n')), len(data.split())))
Here, we omitted the optional handle_close and handle_exception methods. The implementation is a bit complicated. We have to use instance variables to keep track of state between calls. Note that we can’t count on data coming in a line at a time or make any assumptions about the amount of data we’ll receive in a handle_input call. The logic is further complicated by the fact that we have two modes of collecting input. In the first mode, we’re collecting a length. In the second mode, we’re collecting input for analysis.
Connection handlers can often be simplified by writing them as generators, using the zc.ngi.generator.handler decorator:
import zc.ngi.generator
@zc.ngi.generator.handler
def wc(connection):
input = ''
while 1:
while '\n' not in input:
input += (yield)
count, input = input.split('\n', 1)
count = int(count)
while len(input) < count:
input += (yield)
data = input[:count]
connection.write(
'%d %d\n' % (len(data.split('\n')), len(data.split())))
input = input[count:]
The generator takes a connection object and gets data via yield expressions. The yield expressions can raise exceptions. In particular, a GeneratorExit exception is raised when the connection is closed by the connection peer. The yield statement will also (re)raise any exceptions raised when calling an iterator passed to writelines.
A generator-based handler is instantiated by calling it with a connection object:
>>> handler = wc(connection)
>>> handler.handle_input(connection, '15')
>>> handler.handle_input(connection, '\nhello out\nthere')
-> '2 3\n'
>>> handler.handle_close(connection, 'done')
There are a number of things to note about generator-based handlers:
Implementing clients is a little bit more involved than implementing servers because, in addition to handling connections, you have to initiate the connections in the first place. This involves implementing client connect handlers. You request a connection by calling an implementation’s connect function, passing an address and a connect handler. The handler’s connected method is called if the connection succeeds and the handler’s failed_connect method is called if it fails.
Let’s implement a word-count client. It will take a string and use a word-count server to get its line and word counts:
class WCClient:
def __init__(self, data):
self.data = data
def connected(self, connection):
connection.set_handler(LineReader())
connection.write(self.data)
def failed_connect(self, reason):
print 'failed', reason
class LineReader:
input = ''
def handle_input(self, connection, data):
self.input += data
if '\n' in self.input:
print 'LineReader got', self.input
connection.close()
We test client connect handlers the same way we test connection handlers and servers, by calling their methods:
>>> wcc = WCClient('Hello out\nthere')
>>> wcc.failed_connect('test')
failed test
>>> connection = zc.ngi.testing.Connection()
>>> wcc.connected(connection)
-> 'Hello out\nthere'
In this example, the connect handler set the connection handler to an instance of LineReader and wrote the data to be analyzed to the connection. We now want to send some test result data to the reader. If we call the connection’s write method, the data we pass will just be printed, as the data the connect handler passed to the connection write method was. We want to play the role of the server. To do that, we need to get the test connection’s peer and call its write method:
>>> connection.peer.write('text from server\n')
LineReader got text from server
<BLANKLINE>
-> CLOSE
Testing connections are always created in pairs. Each connection in the pair is the other’s peer:
>>> connection.peer.peer is connection
True
When a connection is created directly, it’s peer has a simple printing handler, which is why, when we write to the connection, the text we write is written out with a marker. When we create a connection by connecting to a listener, the connections’s peer’s handler is the server used to create the listener.
A connect handler can be its own connection handler:
class WCClient:
def __init__(self, data):
self.data = data
def connected(self, connection):
connection.set_handler(self)
connection.write("%s\n%s" % (len(self.data), self.data))
def failed_connect(self, reason):
print 'failed', reason
input = ''
def handle_input(self, connection, data):
self.input += data
if '\n' in self.input:
print 'WCClient got', self.input
connection.close()
and, of course, a generator can be used in the connected method:
class WCClientG:
def __init__(self, data):
self.data = data
@zc.ngi.generator.handler
def connected(self, connection):
connection.write("%s\n%s" % (len(self.data), self.data))
input = ''
while '\n' not in input:
input += (yield)
print 'Got', input
def failed_connect(self, reason):
print 'failed', reason
A generator can also be used as a client connect handler. The failed_connect method provided by a generator handler simply raises an exception. For this reason, generator handlers are generally only appropriate in ad hoc situations, like simple client scripts, typically using zc.ngi.async.main, where exceptions are propagated to the zc.ngi.async.main.loop call.
Implementations provide a connect method that takes an address and connect handler.
Let’s put everything together and connect our server and client implementations. First, we’ll do this with the testing implementation:
>>> listener = zc.ngi.testing.listener(address, wc)
>>> zc.ngi.testing.connect(address, WCClient('hi\nout there'))
WCClient got 2 3
<BLANKLINE>
The testing listener method not only creates a listener, but also makes in available for connecting with the connect method.
We’ll see the same behavior with the zc.ngi.async implementation:
>>> listener = zc.ngi.async.listener(address, wc)
>>> zc.ngi.async.connect(address, WCClient('hi out\nthere'))
WCClient got 2 3
<BLANKLINE>
We’ll often refer to the connect method as a “connector”. Applications that maintain long-running connections will often need to reconnect when connections are lost or retry connections when they fail. In situations like this, we’ll often pass a connector to the application so that it can reconnect or retry a connection when needed.
When testing application connection logic, you’ll typically create your own connector object. This is especially important if applications reconnect when a connection is lost or fails. Let’s look at an example. Here’s a client application that does nothing but try to stay connected:
class Stay:
def __init__(self, address, connector):
self.address = address
self.connector = connector
self.connector(self.address, self)
def connected(self, connection):
connection.set_handler(self)
def failed_connect(self, reason):
print 'failed connect', reason
self.connector(self.address, self)
def handle_input(self, connection, data):
print 'got', repr(data)
def handle_close(self, connection, reason):
print 'closed', reason
self.connector(self.address, self)
To try this out, we’ll create a trivial connector that just notes the attempt:
def connector(addr, handler):
print 'connect request', addr, handler.__class__.__name__
global connect_handler
connect_handler = handler
Now, if we create a Stay instance, it will call the connector passed to it:
>>> handler = Stay(('', 8000), connector)
connect request ('', 8000) Stay
>>> connect_handler is handler
True
If the connection fails, the Stay handler will try it again:
>>> handler.failed_connect('test')
failed connect test
connect request ('', 8000) Stay
>>> connect_handler is handler
True
If it succeeds and then is closed, the Stay connection handler will reconnect:
>>> connection = zc.ngi.testing.Connection()
>>> handler.connected(connection)
>>> connection.handler is handler
True
>>> connect_handler = None
>>> handler.handle_close(connection, 'test')
closed test
connect request ('', 8000) Stay
>>> connect_handler is handler
True
The zc.ngi.testing module provides a test connector. If a listener is registered, then connections to it will succeed, otherwise it will fail. It will raise an exception if it’s called in response to a failed_connect call to prevent infinite loops:
>>> _ = Stay(('', 8000), zc.ngi.testing.connect)
failed connect no such server
For address, ('', 8000), a connect handler called connect from a
failed_connect call.
Often, connection handlers have 2 functions:
Examples of low-level protocols include line-oriented protocols where messages are line terminated, and sized-message protocols, where messages are preceded by message sizes. The word-count example above used a sized-message protocol. A common pattern in NGI is to separate low-level protocol handling into a separate component using a connection adapter. When we get a connection, we wrap it with an adapter to perform the low-level processing. Here’s an adapter that deals with the handling of sized messages for the word-count example:
class Sized:
def __init__(self, connection):
self.input = ''
self.handler = self.count = None
self.connection = connection
self.close = connection.close
self.write = connection.write
self.writelines = connection.writelines
def set_handler(self, handler):
self.handler = handler
if hasattr(handler, 'handle_close'):
self.handle_close = handler.handle_close
if hasattr(handler, 'handle_exception'):
self.handle_exception = handler.handle_exception
self.connection.set_handler(self)
def handle_input(self, connection, data):
self.input += data
if self.count is None:
if '\n' not in self.input:
return
count, self.input = self.input.split('\n', 1)
self.count = int(count)
if len(self.input) < self.count:
return
data = self.input[:self.count]
self.input = self.input[self.count:]
self.handler.handle_input(self, data)
With this adapter, we can now write a much simpler version of the word-count server:
class WCAdapted:
def __init__(self, connection):
Sized(connection).set_handler(self)
def handle_input(self, connection, data):
connection.write(
'%d %d\n' % (len(data.split('\n')),
len(data.split())))
We can also use adapters with generator-based handlers by passing an adapter factory to zc.ngi.generator.handler using the connection_adapter keyword argument. Here’s the generator version of the word count server using an adapter:
@zc.ngi.generator.handler(connection_adapter=Sized)
def wcadapted(connection):
while 1:
data = (yield)
connection.write(
'%d %d\n' % (len(data.split('\n')),
len(data.split())))
By separating the low-level protocol handling from the application logic, we can reuse the low-level protocol in other applications, and we can use other low-level protocol with our word-count application.
The zc.ngi.adapters module provides 2 connection adapters:
The Lines and Sized adapter classes provide a handler class method that provide slightly nicer ways of defining generator-based handlers:
import zc.ngi.adapters
@zc.ngi.adapters.Lines.handler
def example(connection):
print (yield)
Here we’ve defined a defined a generator-based adapter that uses the Lines adapter.
You may need to make a few networking requests in a script. You typically want to make the requests, block until they’re done, and then go on about your business. The zc.ngi.async implementations provide a wait method that can be used in this situation. The wait` method blocks until there are no outstanding requests, or until an optional timeout has passed.
For example, suppose a word-count server is running on an address. We can use the following script to get the word counts for a set of strings:
result = []
def get_word_count(s):
@zc.ngi.adapters.Lines.handler
def getwc(connection):
connection.write("%s\n" % len(s))
connection.write(s)
result.append((yield))
zc.ngi.async.main.connect(address, getwc)
for s in 'Hello\nworld\n', 'hi\n':
get_word_count(s)
zc.ngi.async.main.wait(10)
print sorted(result)
If the wait call times out, a zc.ngi.interfaces.Timeout exception will be raised.
Most scripts will use an Inline implementation, like zc.ngi.async.main because errors raised by handlers are propagated to the callers.
A possible advantage of the non-inline implementations (zc.ngi.async and instances of zc.ngi.async.Implementation) is that, because the network requests are handled in a separate thread, an application can do other work while requests are being handled and before calling wait.
The NGI also supports UDP networking. Applications can send UDP messages by calling an implementation’s udp method:
>>> zc.ngi.testing.udp(('', 8000), 'hello udp')
If there isn’t a UDP listener registered, then nothing will happen.
You can also listen for UDP requests by registering a callable with an implementation’s udp_listener:
>>> def handle(addr, s):
... print 'got udp', s, 'from address', addr
>>> listener = zc.ngi.testing.udp_listener(('', 8000), handle)
>>> zc.ngi.testing.udp(('', 8000), 'hello udp')
got udp hello udp from address <test>
>>> listener.close()
>>> zc.ngi.testing.udp(('', 8000), 'hello udp')
[1] | The Twisted networking framework also provides this separation. Twisted doesn’t leverage this separation to provide a clean testing environment as NGI does, although it’s likely that it will in the future. A twisted implementation for NGI is planned. |
[2] | The writelines method takes an iterable object. |
[3] | A number of implementations based on Twisted are planned, including a basic Twisted implementation and an implementation using twisted.conch that will support communication over ssh channels. |