Usage

This chapter describes how to use Traffic. While event driven I/O has some advantages, it also has some added difficulty. This chapter is structured to illustrate how to perform specific tasks in isolation. The Tutorials chapter contains some complete examples that may be more illuminating.

Traffic positions itself as a dependency target for frameworks. This means that direct use of traffic.libkqueue will always require some boilerplate. Traffic itself uses traffic.libloop in its tests and documentation in order to simplify the work.

Note

Reading Project prior to this chapter is recommended.

KQueue Implementation

Currently, this is the only implementation.

Creating Traffic Instances

Transits are objects that manage the resources and functionality that perform transfers. For this functionality to be used (aside from the initial file descriptor allocation), a Transit must be acquired by a traffic.libkqueue.Traffic instance using traffic.libkqueue.Traffic.acquire().

Traffic instances serve as a bucket for Transits, and, internally, the object holding the kqueue(2) file descriptor. The entire set can be terminated or disregarded at any time. Multiple Traffic instances can exist in a process so that distinct parts of a process can manage their I/O resources in isolation.

Traffic objects require no parameters:

import traffic.libkqueue
T = traffic.libkqueue.Traffic()

Establishing Connections

Internet Protocol Connections

Data connections are managed with the traffic.libkqueue.Octets Transit. The type is named after the subject that is transferred: “octets”.

Connections can be created by the class methods provided on the traffic.libkqueue.Octets type. These methods allocate the necessary resources to establish the connection. The method names define the means used to connect. For example:

(input, output) = traffic.libkqueue.Octets.connect_stream_ip4(('127.0.0.1', 8080))

The above example constructs a pair (a tuple) of Octets instances that can be used to communicate with the server listening at the target IPv4 address and port. Likewise, traffic.libkqueue.Octets.connect_stream_ip6() will connect using IPv6.

Note

The class methods establishing bidirectional channels have to return a pair of Transits, as an individual Transit only transfers in one direction. This is by design, in order to allow arbitrary grouping of channels and to reduce the number of possible event identifiers.

Interprocess Connections

Connections used for interprocess communication can be created using the traffic.libkqueue.Octets.connect_stream_socketpair() and the traffic.libkqueue.Octets.connect_pipe() class methods.

The connect_pipe method returns a pair of traffic.libkqueue.Octets, (read, write), where the latter writes into the former. This is primarily useful for worker processes that only need to receive work until terminated by the closure of the pipe. See pipe(2) for more information:

child, parent = traffic.libkqueue.Octets.connect_pipe()
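
The receive-until-closed pattern can be sketched with the standard library alone. The following is only an illustration of the underlying pipe(2) behavior; it uses os.pipe directly rather than Traffic's Octets, and the helper name drain_until_closed is hypothetical:

```python
import os

def drain_until_closed(read_fd, chunk_size=4096):
    """Collect everything written to a pipe until the write end is closed."""
    chunks = []
    while True:
        data = os.read(read_fd, chunk_size)
        if not data:
            # An empty read means every write end has been closed (EOF).
            break
        chunks.append(data)
    os.close(read_fd)
    return b"".join(chunks)

read_fd, write_fd = os.pipe()
os.write(write_fd, b"job-1\n")
os.write(write_fd, b"job-2\n")
os.close(write_fd)  # closing the writer is what terminates the reader

received = drain_until_closed(read_fd)
```

In a real worker, the read loop would run in the child process and the parent would hold the write end.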

The connect_stream_socketpair method returns two pairs of Octets, ((read, write), (read, write)). socketpair(2) can be used when the processes need communication as opposed to dictation:

child, parent = traffic.libkqueue.Octets.connect_stream_socketpair()
child_input, child_output = child
parent_input, parent_output = parent

Subsequently, the process can fork(2) and perform communication with the child process. See Forking for more information about how Traffic works in multiprocess situations.

Note

connect_pipe can be run twice as an alternative to connect_stream_socketpair, but on UNIX systems, this ends up using more file descriptors than socketpair(2).
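
The difference in descriptor usage can be seen at the operating system level. This sketch uses os.pipe and socket.socketpair from the standard library rather than Traffic, and all variable names are illustrative:

```python
import os
import socket

# Two pipes: one per direction, two descriptors each.
parent_read, child_write = os.pipe()
child_read, parent_write = os.pipe()
pipe_fds = [parent_read, child_write, child_read, parent_write]

# One socket pair: each endpoint is a single full-duplex descriptor.
parent_sock, child_sock = socket.socketpair()
socketpair_fds = [parent_sock.fileno(), child_sock.fileno()]

# Both arrangements carry data in both directions.
os.write(child_write, b"up")
os.write(parent_write, b"down")
via_pipes = (os.read(parent_read, 16), os.read(child_read, 16))

child_sock.sendall(b"up")
parent_sock.sendall(b"down")
via_sockets = (parent_sock.recv(16), child_sock.recv(16))

for fd in pipe_fds:
    os.close(fd)
parent_sock.close()
child_sock.close()
```

Two pipes consume four descriptors for the same bidirectional channel that a socket pair provides with two.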

Listening for Connections

A listening socket is managed using the traffic.libkqueue.Sockets type. The type is named after the subject that it transfers: in this case, “sockets”, the identifiers used to refer to particular connections, which are integer file descriptors on most platforms.

When a transfer is performed by a traffic.libkqueue.Sockets instance, the mutable buffer acquired by the instance is filled with the accepted integers. Normally, it is best to use an array.array instance as the resource to be acquired by Sockets. This provides easy access to the information written into the mutable buffer. The traffic.libkqueue.Sockets.rallocate() method provides simple access to array allocation suitable for use as a Sockets’ resource:

conbuffer = traffic.libkqueue.Sockets.rallocate(64)
import array
assert conbuffer.__class__ is array.array

Sockets instances can be created by the class methods on the traffic.libkqueue.Sockets type.

For example, bind and accept sockets from port 8080 on localhost interfaces:

S4 = traffic.libkqueue.Sockets.accept_stream_ip4(('127.0.0.1', 8080), backlog = 32)
S6 = traffic.libkqueue.Sockets.accept_stream_ip6(('::1', 8080), backlog = 32)

Note

These class methods only initialize the listening socket. No accept(2) calls are performed at this time. The method names indicate the role of the new Sockets instance, not what is actually performed upon invocation.

Sockets can also be created from arbitrary socket file descriptors, but a fully initialized listening socket is expected. For example:

import os
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 8080))
fd = os.dup(s.fileno())
s.close()
# accept_stream_socket() will perform the listen(2) call.
S = traffic.libkqueue.Sockets.accept_stream_socket(fd, backlog = 32)

This creation interface is provided for cases where some custom socket initialization needs to be performed by a separate process; consider projects like circus and supervisord, which may pass in file descriptors.

Transferring

Internally, a transfer is possible when two conditions hold true: the transit has a resource for transmission, and the kernel has indicated that a transfer is possible by emitting an event. The actual transfer is performed by a Traffic instance which manages the subscription to kernel events. The Transit must be acquired by a Traffic instance in order to receive kernel events:

# Import the module directly so that the name "traffic" can refer to the instance.
from traffic import libkqueue

traffic = libkqueue.Traffic()
input, output = libkqueue.Octets.connect_stream_ip4(('127.0.0.1', 8080))
input.acquire(libkqueue.Octets.rallocate(24))
output.acquire(b'')
traffic.acquire(input)
traffic.acquire(output)

As shown in the above example, Transits can be given a resource using the traffic.libkqueue.Transit.acquire() method. Once a resource has been acquired, it is used until exhaustion: the point at which the resource has been completely transferred.

Merely acquiring resources, however, will not perform the transfers. The Traffic instance must perform a cycle in order to collect the kernel events and perform any possible transfers. Transfers only occur during Traffic cycles. Cycles can be performed by using the traffic instance as a context manager:

with traffic:
        pass

While inside a traffic cycle, the events that occurred can be accessed:

while not traffic.terminated:
        # Context manager interfaces
        with traffic:
                for transit in traffic.transfer():
                        slice = transit.transfer()
                        if slice is not None:
                                transferred = transit.resource[slice]
                                print(bytes(transferred))

The above example should illuminate the structure of a Traffic cycle. The Traffic instance, itself acting as a Transit, transfers the Transit objects that have produced events in the cycle. Outside of a cycle, the traffic.libkqueue.Traffic.transfer() method will produce an empty iterator.
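
The two conditions for a transfer (a resource is present, and the kernel has reported an event) can be modeled with the standard library. This is a minimal sketch and not Traffic's implementation: it uses the selectors module instead of kqueue(2) directly, and the Reader class and cycle function are hypothetical stand-ins for a Transit and a Traffic cycle:

```python
import selectors
import socket

class Reader:
    """Hypothetical stand-in for a receiving Transit (not Traffic's class)."""

    def __init__(self, sock):
        self.sock = sock
        self.resource = None  # mutable buffer to fill; None until acquired
        self.offset = 0       # number of bytes of the resource already used
        self.ready = False    # True once the kernel has reported an event

    def acquire(self, resource):
        self.resource = resource
        self.offset = 0

    def transfer(self):
        """Fill the resource if both conditions hold; return the new slice."""
        if self.resource is None or not self.ready:
            return None
        if self.offset >= len(self.resource):
            return None  # resource exhausted; a new one must be acquired
        self.ready = False
        view = memoryview(self.resource)[self.offset:]
        count = self.sock.recv_into(view)
        filled = slice(self.offset, self.offset + count)
        self.offset += count
        return filled

def cycle(selector, timeout=1.0):
    """Collect kernel events once and yield the readers they refer to."""
    for key, _events in selector.select(timeout):
        key.data.ready = True
        yield key.data

left, right = socket.socketpair()
right.setblocking(False)
reader = Reader(right)
selector = selectors.DefaultSelector()
selector.register(right, selectors.EVENT_READ, data=reader)

left.sendall(b"hello")

# Cycle 1: an event arrives, but no resource has been acquired.
first = [r.transfer() for r in cycle(selector)]

# Cycle 2: with a resource acquired, the pending bytes are transferred.
reader.acquire(bytearray(24))
second = [r.transfer() for r in cycle(selector)]
received = bytes(reader.resource[second[0]])

selector.close()
left.close()
right.close()
```

As in the Traffic example, the transfer yields a slice identifying the portion of the resource that was just filled.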

Accepting Connections

Listening for Connections describes how to create a listening socket using traffic.libkqueue.Sockets. However, there is more work to be done in order to actually accept incoming connections.

Like traffic.libkqueue.Octets, a resource must be acquired in order to facilitate a transfer. In the case of traffic.libkqueue.Sockets, transfers are actually a sequence of accept(2) invocations writing into a Python buffer.

Using traffic.libkqueue.Sockets.rallocate(), an array.array instance can be created for subsequent acquisition:

lsocket = traffic.libkqueue.Sockets.accept_stream_ip4(('127.0.0.1', 8080))
buf = lsocket.rallocate(128) # NOTE: rallocate is a classmethod
lsocket.acquire(buf)

The lsocket instance, being a Transit, needs to be acquired by a Traffic instance in order to process incoming connections:

traffic = traffic.libkqueue.Traffic()
traffic.acquire(lsocket)
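
What such a transfer amounts to can be sketched with the standard library: a sequence of non-blocking accept(2) calls whose resulting descriptors are written into an array. This is an illustration only, not Traffic's implementation; the accept_into helper is hypothetical and select.select stands in for the kernel event:

```python
import array
import os
import select
import socket

def accept_into(listener, buffer):
    """Accept pending connections, storing their descriptors in buffer.

    Returns the slice of buffer that was filled.
    """
    count = 0
    while count < len(buffer):
        try:
            connection, _address = listener.accept()
        except BlockingIOError:
            # Nothing left in the backlog.
            break
        # detach() hands ownership of the descriptor to the caller.
        buffer[count] = connection.detach()
        count += 1
    return slice(0, count)

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the kernel pick a free port
listener.listen(8)
listener.setblocking(False)

clients = [socket.create_connection(listener.getsockname()) for _ in range(2)]

buffer = array.array("i", [-1] * 4)
accepted_fds = []
for _ in range(10):  # a bounded number of "cycles"
    if len(accepted_fds) == len(clients):
        break
    # Wait for the kernel to report pending connections.
    select.select([listener], [], [], 1.0)
    filled = accept_into(listener, buffer)
    accepted_fds.extend(buffer[filled])

for fd in accepted_fds:
    os.close(fd)
for client in clients:
    client.close()
listener.close()
```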

Terminating Connections

Termination occurs when either the Transit or the kernel notes that the Transit should be terminated. The Transit can be terminated by the process with the traffic.libkqueue.Transit.terminate() method. When executed, the Transit’s process resources and kernel resources will be released at the end of the cycle that first identifies the event.

In the case of terminations caused by failures, the traffic.libkqueue.Transit.status() method returns an object describing why the Transit was terminated. Essentially, this allows a user to identify whether the Transit was terminated by an internal request or by an external force.

Octets & Socket Shutdown

Upon termination of an Octets instance whose file descriptor is a socket, shutdown(2) will be called against the socket with the corresponding direction of the Transit being terminated. Shutdown is performed at the beginning of the Traffic cycle, but the file descriptor itself will still be valid, like non-socket Octets. However, after the end of a terminating cycle, the validity of the file descriptor will not be guaranteed.

Note

In the case of sockets, it is possible that the file descriptor remains valid after a terminating cycle. However, this situation only occurs when the Transit managing the opposing direction is not terminated. This facet of termination should not be depended upon.
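
The per-direction behavior of shutdown(2) can be observed with a plain socket pair. This sketch uses only the standard library and does not involve Traffic; it shows that shutting down one direction leaves the opposing direction, and the descriptor itself, usable:

```python
import socket

local, peer = socket.socketpair()

# Terminate only the outgoing direction of `local`.
local.shutdown(socket.SHUT_WR)

# The peer observes end-of-stream on its incoming direction...
peer_saw_eof = peer.recv(16) == b""

# ...while the opposite direction still carries data.
peer.sendall(b"still open")
reply = local.recv(16)

# The descriptor remains valid until it is closed.
descriptor_valid = local.fileno() >= 0

local.close()
peer.close()
```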

Controlling the Flow

With explicit acquisition of resources, flow control is performed by depriving a Transit of a Transfer Resource. If a transfer is desired, naturally, designate a resource to support the transfer with the traffic.libkqueue.Transit.acquire() method:

T = traffic.libkqueue.Traffic()
r, w = traffic.libkqueue.Octets.connect_pipe()
r.acquire(r.rallocate(128))
w.acquire(b'data')

# Traffic must acquire Transit resources as well.
T.acquire(r)
T.acquire(w)

If a pause of transferring is desired, deprive the Transit of a resource. However, deprivation cannot be directly performed on a Transit that already has a resource. A Transit’s resource must be exhausted before a true halt to transferring can occur.

Note

The desired concurrency of a Traffic cycle inhibits the implementation of a pause feature without the use of synchronization mechanisms and additional record keeping.
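
The effect of depriving a receiver of a resource can be observed at the socket level. In this standard-library sketch (not Traffic code), a receiver that never reads causes the kernel buffers to fill, which in turn stalls the sender; once the receiver is given a buffer and drains the queue, the sender can proceed:

```python
import socket

sender, receiver = socket.socketpair()
sender.setblocking(False)
receiver.setblocking(False)

chunk = b"x" * 65536
queued = 0
stalled = False
while not stalled:
    try:
        queued += sender.send(chunk)
    except BlockingIOError:
        # Kernel buffers are full: the sender is now paused as well.
        stalled = True

# Giving the receiver a buffer again drains the kernel queue...
resource = bytearray(65536)
drained = 0
while True:
    try:
        count = receiver.recv_into(resource)
    except BlockingIOError:
        break
    drained += count

# ...after which the sender is able to make progress again.
resumed = sender.send(b"more")

sender.close()
receiver.close()
```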

Traffic Loops

The Transferring section reveals the Traffic loop. It is a windowed event loop: events can only be accessed during the cycle. Traffic is designed to take in events in batches while the GIL is released. This allows data to be written into Python objects in a truly concurrent manner.

Forking

Traffic instances are designed to tolerate certain failures. In cases where it is evident that the kqueue(2) file descriptor has been closed, Traffic will end the cycle and create a new kqueue(2) instance. When this occurs, Traffic will mark each Transit as needing a subscription. Subsequently, the state will be restored.

However, such use in child processes is not recommended. Rather, in the child process, the inherited Traffic instance should be disregarded using traffic.libkqueue.Traffic.disregard(). Once disregarded, a new Traffic instance can be built and the inherited Transits can be acquired as needed.

Communicating With a Child Process

With a Traffic instance’s eventual destruction in the child on fork(2), using a Traffic based communication channel between the child and the parent takes some work.

The socket pair can be created right away:

import os
# Import the module directly so that the name "traffic" can refer to the instance.
from traffic import libkqueue

traffic = libkqueue.Traffic()
pep, cep = libkqueue.Octets.connect_stream_socketpair()

Because these Transits have not yet been attached to a Traffic instance, the child endpoints, cep, can be passed to a function that constructs a new Traffic instance for the child to use:

def in_parent(pid, child_channel):
        for transit in child_channel:
                traffic.acquire(transit)

def in_child(ppid, parent_channel):
        global traffic
        traffic.close()
        traffic = libkqueue.Traffic()
        for transit in parent_channel:
                traffic.acquire(transit)

def do_fork():
        ppid = os.getpid()
        pid = os.fork()
        if pid == 0:
                # child process, disregard parent transits.
                for transit in pep:
                        transit.disregard()
                in_child(ppid, cep)
        else:
                # parent process, disregard child transits.
                for transit in cep:
                        transit.disregard()
                in_parent(pid, pep)

The use of traffic.libkqueue.Transit.disregard() is very important here. If traffic.libkqueue.Transit.terminate() were used instead, the child’s socket would not be functional, as terminate performs a shutdown(2) operation on the socket, permanently ending that connection. In contrast, close(2) only releases the parent’s reference to the child’s endpoints.
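
The distinction between close(2) and shutdown(2) on a shared socket can be demonstrated without Traffic. In this standard-library sketch, socket.dup() stands in for the descriptor inheritance that fork(2) produces (an assumption made so the example runs in a single process), and the function names are hypothetical:

```python
import socket

def survives(release):
    """Report whether a connection still works after `release` is applied
    to one of two references to the same endpoint."""
    parent_end, child_end = socket.socketpair()
    # dup() stands in for fork(2): both objects refer to one open socket.
    inherited_copy = child_end.dup()
    release(inherited_copy)
    try:
        child_end.sendall(b"ping")
        parent_end.settimeout(1.0)
        return parent_end.recv(16) == b"ping"
    except OSError:
        return False
    finally:
        parent_end.close()
        child_end.close()

def close_only(sock):
    sock.close()  # drops one reference; the connection is untouched

def shutdown_then_close(sock):
    sock.shutdown(socket.SHUT_RDWR)  # ends the connection for every holder
    sock.close()

after_close = survives(close_only)
after_shutdown = survives(shutdown_then_close)
```

Closing one reference leaves the connection intact, whereas a shutdown through any reference ends it for all holders.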

Inheriting Sockets in Child Processes

There are cases where inheriting sockets is desirable, notably in pre-fork server models where the parent initializes the listening sockets and the child processes service the incoming connections. In these cases, file descriptor duplication should be used in combination with traffic.libkqueue.Transit.disregard().

The traffic.libkqueue.Point._fileno attribute holds the file descriptor identifier.

Sending File Descriptors across Processes

There is a well-hidden feature in UNIX systems that allows programmers to transfer file descriptors across a socketpair(2) connection. This feature is exposed by the traffic.libkqueue.Descriptors type. Similar to traffic.libkqueue.Sockets instances, an array of integers is the subject of the channel. The word “Descriptors” was chosen to denote that arbitrary file descriptors may be transferred.

Note

Operating System limitations may apply.

Similar to connecting Octets, Descriptors are created using class methods:

parent, child = traffic.libkqueue.Descriptors.connect_socketpair()

If the socket file descriptor already exists, the traffic.libkqueue.Descriptors.connect_socket() method can be used:

parent, child = traffic.libkqueue.Descriptors.connect_socket(parentfd, childfd)

Sockets being full-duplex, the parent and child objects are tuples of traffic.libkqueue.Descriptors instances where the first item is the reader and the second, the writer.

Once the Descriptors instances have been created, they can be treated similarly to traffic.libkqueue.Sockets:

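T = traffic.libkqueue.Traffic()

send_buf = parent[1].rallocate(1) # an array.array()
receive_buf = child[0].rallocate(1) # an array.array()

# Place the file descriptor to be sent into the send buffer.
f = open(...)
send_buf[0] = f.fileno()

parent[1].acquire(send_buf)
child[0].acquire(receive_buf)

# acquire() is called on a Traffic instance, not on the class.
T.acquire(parent[1])
T.acquire(child[0])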

Afterwards, the file descriptors placed into the send_buf will be transferred to the child’s Transit and into its resource, the receive_buf.
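
The underlying operating system feature is SCM_RIGHTS ancillary data sent with sendmsg(2). It can be sketched with the standard library on UNIX platforms. This is not how Descriptors is implemented, as far as this chapter states; the helper names send_descriptors and receive_descriptors are hypothetical, and both ends live in one process for brevity:

```python
import array
import os
import socket

def send_descriptors(sock, fds):
    """Send an array of file descriptors as SCM_RIGHTS ancillary data."""
    # At least one byte of ordinary data must accompany the descriptors.
    return sock.sendmsg(
        [b"\0"], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]
    )

def receive_descriptors(sock, maxfds):
    """Receive up to maxfds file descriptors into a new array of ints."""
    fds = array.array("i")
    _msg, ancdata, _flags, _addr = sock.recvmsg(
        1, socket.CMSG_SPACE(maxfds * fds.itemsize)
    )
    for level, kind, data in ancdata:
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            # Ignore any truncated integer at the end of the data.
            usable = len(data) - (len(data) % fds.itemsize)
            fds.frombytes(data[:usable])
    return fds

parent, child = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
pipe_read, pipe_write = os.pipe()

send_buf = array.array("i", [pipe_read])
send_descriptors(parent, send_buf)
receive_buf = receive_descriptors(child, 1)

# The received descriptor is a new number referring to the same pipe.
os.write(pipe_write, b"via passed descriptor")
payload = os.read(receive_buf[0], 64)

for fd in (pipe_read, pipe_write, receive_buf[0]):
    os.close(fd)
parent.close()
child.close()
```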