Reference

This document describes the specifics of the modules, classes, and methods provided by Traffic. It refrains from discussing how the parts might be used in a wider scope; the Usage document should be read for such information.

traffic

🚦 About

Traffic provides kqueue(2) based event driven I/O for Python. However, in addition to listening for the I/O events, Traffic performs the corresponding I/O operation prior to propagating the notification. In essence, Traffic transforms can-read and can-write events into did-read and did-write.
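The difference between a can-read event and a did-read event can be sketched with the standard library alone. The following is a minimal illustration, not Traffic's implementation: it uses the selectors module in place of kqueue(2), and the did_read_events helper is a hypothetical name introduced for this example.

```python
import selectors
import socket


def did_read_events(selector, timeout=1.0):
    """Wait for readiness, perform the reads, and return (socket, data) pairs."""
    completed = []
    for key, _mask in selector.select(timeout):
        sock = key.fileobj
        # The read is performed here, before any notification is propagated.
        data = sock.recv(4096)
        completed.append((sock, data))
    return completed


# Demonstration over a connected pair of local sockets.
left, right = socket.socketpair()
selector = selectors.DefaultSelector()
selector.register(right, selectors.EVENT_READ)

left.sendall(b"ping")
events = did_read_events(selector)
received = [data for _sock, data in events]
print(received)

selector.close()
left.close()
right.close()
```

The caller of did_read_events never sees a bare readiness notification; it only sees data that has already been read.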

Warning

Traffic is a work in progress. The APIs may change without warning.

Traffic is designed to maximize efficiency by performing I/O operations in bulk. By grouping the collection of I/O events and the execution of I/O operations that work directly with pre-allocated buffer objects, Traffic is capable of performing a fair amount of work while the GIL is released. This batching of operations also reduces the number of times that the GIL is released and acquired within a process.

Note

Traffic aims to be the I/O foundation for frameworks. Direct use of Traffic will require a fair amount of boilerplate.

Transferring Data with Traffic

Using libkqueue and libloop:

import traffic.libkqueue
import traffic.libloop

Making a connection is easy:

s1, s2 = traffic.libkqueue.Octets.connect_stream_socketpair()

Traffic represents each channel as a distinct, half-duplex object called a “Transit”. The example above invokes socketpair(2), so the results, s1 and s2, are each a tuple of two traffic.libkqueue.Octets instances. The receive part is the first item and the send part is the second: the I, then the O, of I/O.

In order for these objects to perform actual transfers, they must be attached to a traffic.libkqueue.Traffic instance, which manages the kqueue subscriptions:

# Named "junction" so that the traffic package itself is not shadowed.
junction = traffic.libkqueue.Traffic()
s1read, s1write = s1
s2read, s2write = s2

Attachment is performed with the traffic.libkqueue.Traffic.acquire() method. Once acquired, the Transit is owned by the Traffic object, and will not be released until termination:

junction.acquire(s1read)
junction.acquire(s1write)
junction.acquire(s2read)
junction.acquire(s2write)

Now that the Traffic instance has acquired the Transits, transfers are possible once resources are available: each Transit needs some data to send or some space to receive into. As with Traffic, resources are supplied with an acquire method, traffic.libkqueue.Octets.acquire():

mutable_buffer = traffic.libkqueue.Octets.rallocate(128) # memoryview(bytearray()) object
s1read.acquire(mutable_buffer)
s1write.acquire(b'Hello, that side of the world!')

Likewise, the other end needs to acquire resources:

s2read.acquire(traffic.libkqueue.Octets.rallocate(128))
s2write.acquire(b'Nobody is home! Go Away!')

All of the Transits now have the resources necessary for performing a transfer. For illustrative purposes, events will be placed into a Queue by a loop running in a thread:

import queue
import threading

q = queue.Queue(6)
deliver = q.put

thread = threading.Thread(target = traffic.libloop.loop, args = (deliver, junction))
thread.start()

The loop is now performing transfers and delivering sequences of traffic.libloop.Activity instances using q.put(). Reading the sequences with q.get(), the transfers can be processed while traffic.libloop.loop() performs the next set of transfers for subsequent processing:

io = q.get()
while io is not None:
    for x in io:
        if x.demand is not None:
            # It's a write Transit that has sent all its data.
            x.transit.terminate()
        else:
            print(x)
    if junction.volume == 0:
        # All Transits have terminated.
        junction.terminate()
    io = q.get()
print("Complete!")

The output of the above should look something like:

Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF1>, termination=None, transferred=bytearray(b'Hello, that side of the world!'), demand=None)
Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF2>, termination=None, transferred=bytearray(b'Nobody is home! Go Away!'), demand=None)
Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF3>, termination=<Status>, transferred=None, demand=None)
Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF4>, termination=<Status>, transferred=None, demand=None)
Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF5>, termination=<Status>, transferred=None, demand=None)
Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF6>, termination=<Status>, transferred=None, demand=None)

Naturally, the code in this example doesn’t really benefit from having the loop performed in a separate thread. However, the purpose of the example is to illuminate the anticipated form of use. In cases of numerous connections, the above model is extremely efficient as transfers are actually being performed concurrently with processing. The concurrency itself does not contrast much with traditional multi-threaded Python processes. Most Python interfaces to underlying C functions release the GIL and allow true concurrency with regards to the processing performed by the C function. The contrast is in the form of GIL contention and payload delivery: Traffic loops can perform multiple I/O operations without the GIL being held and deliver the data into an actual Python object.

The Usage chapter of the documentation covers the details of what is summarized here. There are important facets not directly addressed in this introduction, notably the use of buffer exhaustion for flow control.

traffic.lib

Surface interfaces for the implementation modules:

traffic.lib.implementation = {'kqueue': functools.partial(<function import_module at 0x103a96d40>, '.libkqueue', package='traffic')}

Dictionary whose values refer to callables importing the Traffic implementation described by the key.
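The registry pattern shown above, a dictionary of deferred imports, can be reproduced with the standard library. This is only a sketch of the pattern: the implementation name below is a local stand-in, and the stdlib json package is used in place of the traffic package so that the example runs anywhere.

```python
import functools
import importlib

# Hypothetical registry mirroring the shape of traffic.lib.implementation;
# the stdlib json package stands in for the traffic package.
implementation = {
    "decoder": functools.partial(
        importlib.import_module, ".decoder", package="json"
    ),
}

# Nothing is imported until the stored callable is invoked.
module = implementation["decoder"]()
print(module.__name__)
```

Storing callables rather than modules means that only the implementation actually selected is ever imported.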

traffic.libkqueue

A kqueue(2) based Traffic implementation.

class traffic.libkqueue.Descriptors

Bases: traffic.libkqueue.Point

Descriptors is a traffic.libkqueue.Point based traffic.libkqueue.Transit transferring file descriptors over a local socket. Descriptors can be used to transfer file descriptors to a child process from a parent process or vice versa.

acquire(mutable_buffer)
Parameters:mutable_buffer (buffer) – The area of memory to write the socket identifiers to.
Returns:The descriptors instance acquiring the resource. (Method Chaining)
Return type:traffic.libkqueue.Descriptors

Acquire a resource for storing accepted file descriptors. Usually, traffic.libkqueue.Descriptors.rallocate() is used to allocate these resources.

connect_socket()

connect_sockets(parent, child)

Parameters:
  • parent (int) – The socket file descriptor that will be used by the parent process.
  • child (int) – The socket file descriptor that will be used by the child process.
Returns:A new descriptors instance.
Return type:traffic.libkqueue.Descriptors

Create the necessary Descriptors instances using the given file descriptors for communicating file descriptors across processes.

connect_socketpair()
Returns:A pair of Descriptors pairs.
Return type:traffic.libkqueue.Descriptors

Create the necessary Descriptors instances for communicating file descriptors across processes. The sockets used by the new Transits are created with socketpair(2).

identity()
Returns:File descriptor.
Return type:int

Return the file descriptor used by the Descriptors instance.

rallocate(size)
Parameters:size (int) – Number of descriptors that can be accepted.
Returns:A new mutable resource.
Return type:array.array

Create a mutable resource capable of being written into by a descriptors instance.
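To make the shape of such a resource concrete, here is a rough sketch of an array.array sized for a number of descriptors, together with the kind of slice that transfer() is documented to return. The rallocate_sketch function, the 'i' typecode, and the -1 filler are all assumptions made for illustration; the reference does not state how the real resource is initialized.

```python
import array


def rallocate_sketch(size):
    """Hypothetical stand-in: a mutable array with room for `size` descriptors."""
    # Typecode 'i' (C int) and the -1 filler are assumptions for illustration.
    return array.array("i", [-1] * size)


resource = rallocate_sketch(4)

# Pretend a transfer wrote two descriptors into the first two slots.
resource[0:2] = array.array("i", [7, 8])
written = slice(0, 2)

print(list(resource[written]))
```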

resource

The object whose buffer was acquired, traffic.libkqueue.Descriptors.acquire(), as the Transit’s transfer resource.

None if there is no resource.

transfer()
Returns:A slice specifying the index range that the transferred file descriptors were written to or read from.
Return type:slice

class traffic.libkqueue.Octets

Bases: traffic.libkqueue.Point

Octets is a Transit that transfers ‘octets’, binary data. Primarily, data transfers are facilitated by buffer-capable objects such as bytearray or bytes instances.

acquire(buffer)
Parameters:buffer (buffer) – The buffer to write data from or read data into.
Returns:The Octets instance acquiring the resource. (Method Chaining)
Return type:traffic.libkqueue.Octets

Set the buffer to facilitate transfers with. If the Octets instance receives, the buffer must be mutable. Usually, the type of mutable buffer used is the built-in bytearray.

connect_datagram_ip4(host, interface = None)
Parameters:
  • host ((str, int)) – The (ip4_address, port) pair to send datagrams to.
  • interface ((str, int)) – The (local_interface, port) pair to bind to.
Returns:(input, output): A pair of Octets.
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair–(input, output)–of Octets instances reading from and writing to a new SOCK_DATAGRAM/IPv4 connection.

connect_datagram_ip6(host, interface = None)
Parameters:
  • host ((str, int)) – The (ip6_address, port) pair to send datagrams to.
  • interface ((str, int)) – The (local_interface, port) pair to bind to.
Returns:(input, output): A pair of Octets.
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair–(input, output)–of Octets instances reading from and writing to a new SOCK_DATAGRAM/IPv6 connection.

connect_file(path)
Parameters:path (bytes) – The path to the file.
Returns:A new input Octets instance.
Return type:traffic.libkqueue.Octets

Warning

Broken

Create an input Octets reading from the given file at the given path.

connect_file_append(path)
Parameters:path (bytes) – The path to the file.
Returns:A new output Octets instance.
Return type:traffic.libkqueue.Octets

Warning

Broken

Create an output Octets appending to the file at the given path.

connect_file_overwrite(path)
Parameters:path (bytes) – The path to the file.
Returns:A new output Octets instance.
Return type:traffic.libkqueue.Octets

Warning

Broken

Create an output Octets overwriting the file at the given path.

connect_input_point(fileno)
Parameters:fileno (int) – The file descriptor referring to the kernel resource.
Returns:octets: A single Octets instance that performs reads.
Return type:traffic.libkqueue.Octets

Create and return an Octets instance for reading from the given file descriptor. If the file descriptor is a socket, use traffic.libkqueue.Octets.connect_stream_socket() instead. The given file descriptor’s options are modified to perform non-blocking I/O and disable SIGPIPE on supporting platforms.

connect_output_point(fileno)
Parameters:fileno (int) – The file descriptor referring to the kernel resource.
Returns:octets: A single Octets instance that performs writes.
Return type:traffic.libkqueue.Octets

Create and return an Octets instance for writing to the given file descriptor. If the file descriptor is a socket, use traffic.libkqueue.Octets.connect_stream_socket() instead. The given file descriptor’s options are modified to perform non-blocking I/O and disable SIGPIPE on supporting platforms.

connect_pipe()
Returns:(input, output)
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Return a pair of Octets connected to each other: data written to the output is read from the input. The file descriptors are created using pipe(2).
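The underlying pipe(2) behaviour can be seen directly with os.pipe(), which returns the read end first and the write end second. This sketch uses only the standard library and makes no claim about how Traffic wraps the descriptors.

```python
import os

# os.pipe() returns (read end, write end).
read_fd, write_fd = os.pipe()
try:
    os.write(write_fd, b"through the pipe")
    data = os.read(read_fd, 64)
finally:
    os.close(read_fd)
    os.close(write_fd)

print(data)
```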

connect_stderr()
Returns:A new Octets instance.
Return type:traffic.libkqueue.Octets

Create an output Octets to standard error.

connect_stdie()
Returns:(input, output)
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Return a pair of Octets using standard input and standard error.

connect_stdio()
Returns:(input, output)
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Return a pair of Octets from standard input and standard output.

connect_stream_ip4(host, interface = None)
Parameters:
  • host ((str, int)) – The (ip4_address, port) pair to connect to.
  • interface ((str, int)) – The (local_interface, port) pair to bind to.
Returns:(input, output): A pair of Octets.
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair of Octets reading from and writing to a new SOCK_STREAM/IPv4 connection established using socket(2) and connect(2).

connect_stream_ip6(host, interface = None)
Parameters:
  • host ((str, int)) – The (ip6_address, port) pair to connect to.
  • interface ((str, int)) – The (local_interface, port) pair to bind to.
Returns:(input, output)
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair of Octets reading from and writing to a new SOCK_STREAM/IPv6 connection established using connect(2).

connect_stream_local(path)
Parameters:path (bytes) – The file system path to the local socket.
Returns:(input, output): A pair of Octets.
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair–(input, output)–of Octets instances reading from and writing to a new SOCK_STREAM/AF_LOCAL connection.

connect_stream_socket(fileno)
Parameters:fileno (int) – The file descriptor associated with the socket to be used.
Returns:(input, output): A pair of Octets.
Return type:(traffic.libkqueue.Octets, traffic.libkqueue.Octets)

Create and return a pair–(input, output)–of Octets instances reading from and writing to a SOCK_STREAM socket identified by the first argument.

The given socket’s options are modified to perform non-blocking I/O and disable SIGPIPE on supporting platforms.
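As a rough idea of what such option changes look like from Python, the sketch below switches a socket to non-blocking mode and, where the platform's socket module exposes SO_NOSIGPIPE, sets it. The prepare_socket helper is hypothetical, and whether Traffic uses this particular option to suppress SIGPIPE is an assumption; the reference only says that SIGPIPE is disabled on supporting platforms.

```python
import socket


def prepare_socket(sock):
    """Hypothetical helper: non-blocking mode, plus SO_NOSIGPIPE if available."""
    sock.setblocking(False)
    # SO_NOSIGPIPE is not defined on every platform, so look it up defensively.
    nosigpipe = getattr(socket, "SO_NOSIGPIPE", None)
    if nosigpipe is not None:
        sock.setsockopt(socket.SOL_SOCKET, nosigpipe, 1)
    return sock


a, b = socket.socketpair()
prepare_socket(a)
blocking = a.getblocking()
print(blocking)

a.close()
b.close()
```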

connect_stream_socketpair()
Returns:((input, output), (input, output)).
Return type:((traffic.libkqueue.Octets, traffic.libkqueue.Octets), (traffic.libkqueue.Octets, traffic.libkqueue.Octets))

Construct and return two pairs of Octets connected to each other. The sockets are created using socketpair(2).

identity()
Returns:File descriptor or interface-port pair.
Return type:[int or (str, int)]

Get the represented endpoint’s identity (getpeername or getsockname).

rallocate(size)
Parameters:size (int) – Number of bytes that can be written in the resource.
Returns:A new mutable resource.
Return type:memoryview (of a bytearray)

Create a mutable resource capable of being written into by an Octets instance.

resource

The object whose buffer was acquired, traffic.libkqueue.Octets.acquire(), as the Transit’s transfer resource.

None if there is no resource.

transfer()
Returns:A slice specifying the portion of the resource that was transferred.
Return type:slice

For input Octets, the slice is often directly applied to the resource in order to send the newly received data to a higher level: t.resource[t.transfer()].

For output Octets, the slice is often used to identify the number of bytes transferred: s=t.transfer(); size=s.stop-s.start.
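Both idioms can be tried without Traffic by simulating the slice that transfer() would return. In this sketch the resource is a plain memoryview over a bytearray, and the transferred slice is made up for illustration.

```python
# A receive resource of the kind rallocate() is documented to return:
# a memoryview over a bytearray.
resource = memoryview(bytearray(16))

# Hypothetical transfer result: pretend 5 bytes arrived at the start.
resource[0:5] = b"hello"
transferred = slice(0, 5)

# Input idiom: apply the slice to the resource to get the new data.
received = bytes(resource[transferred])

# Output idiom: derive the number of bytes from the slice bounds.
size = transferred.stop - transferred.start

print(received, size)
```

Slicing a memoryview does not copy; the bytes() call is only there to take an independent copy of the received data.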

class traffic.libkqueue.Point

Bases: traffic.libkqueue.Transit

Points are an abstract Transit whose transfers are facilitated by a ‘kernel point’, a file descriptor on UNIX systems.

status()
Returns:A new Status instance.
Return type:Status

class traffic.libkqueue.Sockets

Bases: traffic.libkqueue.Point

Sockets is a Point based Transit transferring accepted socket file descriptors from a listening socket.

accept_stream_ip4(interface, backlog = 64)
Parameters:
  • interface (([str or int], int)) – The IPv4 interface and port pair to bind to.
  • backlog (int) – The listen backlog to configure.
Returns:A new Sockets instance.
Return type:traffic.libkqueue.Sockets

Create a Sockets instance that accepts sockets for IPv4 (PF_INET) stream connections.

accept_stream_ip6(interface, backlog = 64)
Parameters:
  • interface (([str or int], int)) – The IPv6 interface and port pair to bind to.
  • backlog (int) – The listen backlog to configure.
Returns:A new Sockets instance.
Return type:traffic.libkqueue.Sockets

Create a Sockets instance that accepts sockets for IPv6 (PF_INET6) stream connections.

accept_stream_local(interface, backlog = 64)
Parameters:
  • interface (bytes) – The file system path to bind to. For example, b'/tmp/server'.
  • backlog (int) – The listen backlog to configure.
Returns:A new Sockets instance.
Return type:traffic.libkqueue.Sockets

Create a Sockets instance that accepts sockets for local (PF_UNIX) stream connections.

Note

The file system path must be in bytes. It is necessary to properly encode the parameter into the file system’s encoding.
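In Python, os.fsencode() performs this conversion using the file system encoding and error handler, and os.fsdecode() reverses it. The path below is the one from the parameter description.

```python
import os

# Encode a str path into bytes using the file system encoding.
path = os.fsencode("/tmp/server")
print(path)

# The inverse operation recovers the original string.
roundtrip = os.fsdecode(path)
```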

accept_stream_socket(fileno, backlog = 64)
Parameters:
  • fileno (int) – The socket file descriptor to accept connections on.
  • backlog (int) – The listen backlog to configure.
Returns:A new Sockets instance.
Return type:traffic.libkqueue.Sockets

Create a Sockets instance for accepting new sockets from the given file descriptor. listen(2) will be called unless a zero-backlog is given. The file descriptor will have its options modified.

acquire(mutable_buffer)
Parameters:mutable_buffer (buffer) – The area of memory to write the socket identifiers to.
Returns:The Sockets instance acquiring the resource. (Method Chaining)
Return type:traffic.libkqueue.Sockets

Acquire a resource for storing accepted file descriptors. Usually, traffic.libkqueue.Sockets.rallocate() is used to allocate these resources.

identity()
Returns:Interface-port pair that the socket is bound to.
Return type:(str, int)

Builds a tuple identifying the interface that is accepting connections.

rallocate(size)
Parameters:size (int) – Number of sockets that can be accepted.
Returns:A new mutable resource.
Return type:array.array

Create a mutable resource capable of being written into by a Sockets instance.

resource

The object whose buffer was acquired, traffic.libkqueue.Sockets.acquire(), as the Transit’s transfer resource.

None if there is no resource.

transfer()
Returns:A slice specifying the index range that the accepted file descriptors were written to.
Return type:slice

class traffic.libkqueue.Status

Bases: builtins.object

Status(polarity = 0, errno = -1, fileno = -1, call = 'none')

Status snapshot of a Transit. Status objects provide a snapshot of a Transit’s status and other metadata. In cases of termination, extracting the status object will acquire information that may be lost once termination is complete.

Status objects also provide high-level access to structured failure messages such that subsequent communication to the user may be performed in a way that suits the application.

call

The system library call performed that produced the errno.

errname

The macro name of the errno. Equivalent to errno.errorcode[status.errno].

errno

The errno as an int recorded by the Transit object at the time of status snapshot. This field is specific to libkqueue Transits.

fileno

The fileno as an int recorded by the Transit object at the time of status snapshot. This field is specific to libkqueue Transits.

polarity

-1 if the Transit sends, 1 if the Transit receives, and 0 if it does both or neither. Traffic instances are currently the only Transits that have zero polarity.

strerror

A string describing the errno using the strerror(3) function. This may be equivalent to the strposix attribute.

strposix

A string describing the errno using the POSIX descriptions built into Traffic.
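The standard library equivalents mentioned for errname and strerror can be tried without a Status object. This sketch uses ECONNRESET as an arbitrary example code; the message text returned by os.strerror() depends on the platform.

```python
import errno
import os

code = errno.ECONNRESET

# The macro name, as the errname attribute is described.
name = errno.errorcode[code]

# The platform's description of the error, via strerror(3).
message = os.strerror(code)

print(name, "-", message)
```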

class traffic.libkqueue.Traffic

Bases: traffic.libkqueue.Point

Traffic is a Point based Transit that manages the connection to the kernel. Transits created should be attached to a Traffic object in order to perform transfers with those Transits. Traffic represents the kqueue() instance.

acquire(transit)
Parameters:transit (traffic.libkqueue.Transit) – The Transit instance to place In Traffic.

Acquires the Transit in order to enable the Transit’s events to be collected in the traffic.libkqueue.Traffic instance’s cycles.

disregard()
Returns:The Traffic instance being disregarded. (Method Chaining)
Return type:traffic.libkqueue.Traffic

Disregards all Transits in this Traffic instance; see traffic.libkqueue.Transit.disregard().

force()
Returns:The Traffic instance being forced. (Method Chaining)
Return type:traffic.libkqueue.Traffic

Causes the next traffic cycle not to wait for events. If a cycle has been started and is currently waiting for events, causes it to stop waiting.

resource

A list of all Transits attached to this Traffic instance, save the Traffic instance.

transfer()
Returns:An iterator producing the In Traffic Transits, traffic.libkqueue.Transit, that have events.
Return type:collections.Iterable

transfer_count

The number of Transits that produced events this cycle. -1 if not in a cycle.

volume

The number of transits being managed by the Traffic instance.

class traffic.libkqueue.Transit

Bases: builtins.object

Transits are objects that can perform transfers until termination. In order for events to be processed, the Transit must be acquired by a traffic.libkqueue.Traffic instance.

acquire(resource)
Parameters:resource (object) – The resource used to facilitate transfers.
Returns:The Transit instance acquiring the resource. (Method Chaining)
Return type:traffic.libkqueue.Transit

Acquire a resource for facilitating transfers. The resource type depends on the Transit subclass, but it is normally an object supporting the buffer interface. The particular Transit type should document the kind of object it expects.

disregard()
Returns:The transit being disregarded. (Method Chaining)
Return type:traffic.libkqueue.Transit

Deallocate references to resources held by the Transit. The semantics are distinct from traffic.libkqueue.Transit.terminate() as traffic.libkqueue.Transit.disregard() doesn’t attempt to end the represented channel. For some Transits, this is identical to terminate.

exhausted

Whether the Transit has exhausted its resource, that is, whether it currently lacks a resource capable of performing transfers.

polarity

True if the transit receives, False if it sends.

status()
Returns:An object describing the status of the Transit.
Return type:Status

See traffic.libkqueue.Point.status() for specifics.

terminate()
Returns:The transit being terminated. (Method Chaining)
Return type:traffic.libkqueue.Transit

Terminate the Transit permanently, causing events to subside. Eventually, resources being held by the Transit will be released.

terminated

Whether the Transit has been terminated and is therefore incapable of future transfers.

traffic

The traffic.libkqueue.Traffic instance that the Transit has been acquired by. None if the Transit has not been acquired by a Traffic instance.

traffic.libloop

Tools for working with traffic.abstract.Traffic cycles.

class traffic.libloop.Activity

Bases: builtins.tuple

Used to hold a snapshot of events and data that were produced by a Transit and their associated information or callback. This is useful for extracting transfer data from a Transit that can be used outside of the Traffic Cycle.

Attributes:

transit
The traffic.abstract.Transit instance.
termination
Whether or not a terminate event occurred: None if not terminated. Otherwise, the result of the traffic.abstract.Transit.failure() method.
transferred
An integer designating the amount transferred iff the transit sends. The transfer slice of the resource iff the transit receives.
demand
The traffic.abstract.Transit.acquire() method of the Transit if an exhaust event occurred, otherwise, None.
demand

Alias for field number 3

termination

Alias for field number 1

transferred

Alias for field number 2

transit

Alias for field number 0
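The field aliases above follow the usual named-tuple layout, which can be mimicked with collections.namedtuple. ActivitySketch is a stand-in defined here for illustration, not the library's class, and the field values are placeholders.

```python
from collections import namedtuple

# Stand-in with the same field order as the aliases listed above.
ActivitySketch = namedtuple(
    "ActivitySketch", ["transit", "termination", "transferred", "demand"]
)

event = ActivitySketch(
    transit="t1", termination=None, transferred=b"data", demand=None
)

# Fields are reachable by name, by index, or by unpacking.
transit, termination, transferred, demand = event
print(event[2] == event.transferred)
```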

traffic.libloop.activity(transit)

Given a traffic.abstract.Transit, get a traffic.libloop.Activity snapshot of the Transit.

traffic.libloop.cycle(traffic)
Parameters:traffic (traffic.abstract.Traffic) – The Traffic to cycle.
Returns:List of Activity instances.
Return type:[Activity]

Cycle the traffic instance and return a traffic.libloop.snapshot() list of activity.

Warning

If insufficient memory is available for building the snapshot, transfer-loss will occur.

traffic.libloop.loop(deliver, traffic)
Parameters:
  • deliver (callable) – The callable that is given the sequence of Activity instances.
  • traffic (traffic.abstract.Traffic) – The Traffic instance that is being cycled until termination.
Returns:None

Function that runs a Traffic loop and delivers sequences of Activity instances into the given deliver callable so long as the Traffic instance is not terminated:

import queue
from traffic import libkqueue, libloop

Q = queue.Queue(8)
T = libkqueue.Traffic()
libloop.loop(Q.put, T)

Running the call to loop in a thread is an appropriate way to collect events. By doing this, events and wire transfers can be processed in parallel.
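The threading arrangement can be sketched with the standard library alone. Here fake_loop is a hypothetical stand-in for a Traffic loop that delivers pre-made batches; the None sentinel it delivers at the end is an assumption of this sketch, chosen so that the consumer knows when to stop.

```python
import queue
import threading


def fake_loop(deliver, batches):
    """Hypothetical stand-in for a Traffic loop: deliver batches, then None."""
    for batch in batches:
        deliver(batch)
    deliver(None)  # Assumed sentinel: tells the consumer the loop has ended.


q = queue.Queue(8)
batches = [["event-1", "event-2"], ["event-3"]]

# The producer runs in its own thread and delivers with q.put.
thread = threading.Thread(target=fake_loop, args=(q.put, batches))
thread.start()

# The consumer processes each batch while the producer prepares the next.
processed = []
batch = q.get()
while batch is not None:
    processed.extend(batch)
    batch = q.get()
thread.join()

print(processed)
```

The bounded queue also applies back-pressure: if the consumer falls behind, the producer blocks in put until space is available.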

traffic.libloop.snapshot(traffic, _list=<class 'list'>, _map=<class 'map'>)
Parameters:traffic (traffic.abstract.Traffic) – The Traffic instance to extract Activity from.
Returns:List of Activity instances.
Return type:[Activity]

Given a traffic.abstract.Traffic instance, get a snapshot of all traffic.libloop.activity().

traffic.project

project information

traffic.project.contact = 'mailto:x@jwp.io'

Contact Point for the Responsible Party

traffic.project.identity = 'http://jwp.io/project/py-traffic'

IRI based project identity.

traffic.project.meaculpa = 'James William Pye'

Responsible Party

traffic.project.name = 'traffic'

Project name.

traffic.project.version = '0.9.0'

The version string.

traffic.project.version_info = (0, 9, 0)

Version tuple: (major, minor, bugfix)
