.. vim: ft=rst

=====
Usage
=====

This chapter describes how to use Traffic. While event-driven I/O has advantages,
it also adds some difficulty. This chapter is structured to illustrate how to
perform *specific tasks in isolation*. The :doc:`tutorials` chapter contains some
complete examples that may be more illuminating.

Traffic positions itself as a dependency target for frameworks. This means that
*direct* use of :py:mod:`traffic.libkqueue` will *always* require some boilerplate.
Traffic itself uses :py:mod:`traffic.libloop` in its tests and documentation in
order to simplify the work.

.. note:: Reading :doc:`project` prior to this chapter is recommended.

---------------------
KQueue Implementation
---------------------

Currently, this is the only implementation.

Creating Traffic Instances
==========================

Transits are objects that manage the resources and functionality that perform
transfers. Apart from the initial file descriptor allocation, this functionality
can only be used once the Transit has been acquired by a
:py:class:`traffic.libkqueue.Traffic` instance using
:py:meth:`traffic.libkqueue.Traffic.acquire`.

Traffic instances serve as a bucket for Transits and, internally, as the object
holding the :manpage:`kqueue(2)` file descriptor. The entire set can be
terminated or disregarded at any time. Multiple Traffic instances can exist in a
process so that distinct parts of a process can manage their I/O resources in
isolation.

Traffic objects require no parameters::

    import traffic.libkqueue
    T = traffic.libkqueue.Traffic()

Establishing Connections
========================

Internet Protocol Connections
-----------------------------

Data connections are managed with the :py:class:`traffic.libkqueue.Octets` Transit.
The type is named after the subject that is transferred: "octets".

Connections can be created by the class methods provided on the
:py:class:`traffic.libkqueue.Octets` type.
These methods allocate the necessary resources to establish the connection. The
method names define the means used to connect. For example::

    (input, output) = traffic.libkqueue.Octets.connect_stream_ip4(('127.0.0.1', 8080))

The above example constructs a *pair*, a :py:class:`tuple`, of Octets instances
that can be used to communicate with the server listening at the target IPv4
address and port. Likewise, :py:meth:`traffic.libkqueue.Octets.connect_stream_ip6`
will connect using IPv6.

.. note::
   The class methods establishing bidirectional channels have to return a pair of
   Transits, as an individual Transit only transfers in one direction. This is by
   design in order to allow arbitrary grouping of channels and to reduce the
   number of possible event identifiers.

Interprocess Connections
------------------------

Connections used for interprocess communication can be created using the
:py:meth:`traffic.libkqueue.Octets.connect_stream_socketpair` and the
:py:meth:`traffic.libkqueue.Octets.connect_pipe` class methods.

The `connect_pipe` method returns a pair of :py:class:`traffic.libkqueue.Octets`,
``(read, write)``, where the latter writes into the former. This is primarily
useful for worker processes that only need to receive work until terminated by
the closure of the pipe. See :manpage:`pipe(2)` for more information::

    child, parent = traffic.libkqueue.Octets.connect_pipe()

The `connect_stream_socketpair` method returns *two pairs* of Octets,
``((read, write), (read, write))``. :manpage:`socketpair(2)` can be used when the
processes need two-way communication rather than one-way dictation::

    child, parent = traffic.libkqueue.Octets.connect_stream_socketpair()
    child_input, child_output = child
    parent_input, parent_output = parent

Subsequently, the process can :manpage:`fork(2)` and communicate with the child
process. See `Forking`_ for more information about how Traffic works in
multiprocess situations.

.. note::
   `connect_pipe` can be run twice as an alternative to
   `connect_stream_socketpair`, but on UNIX systems this ends up using more file
   descriptors than :manpage:`socketpair(2)`.

Listening for Connections
=========================

A listening socket is managed using the :py:class:`traffic.libkqueue.Sockets`
type. The type is named after the subject that it transfers. In this case,
"sockets": the identifiers used to refer to particular connections, which are
integer file descriptors on most platforms.

When a transfer is performed by a :py:class:`traffic.libkqueue.Sockets` instance,
the mutable buffer acquired by the instance is filled with the accepted integers.
Normally, it is best to use an :py:class:`array.array` instance as the resource
to be acquired by Sockets. This provides easy access to the information written
into the mutable buffer. The :py:meth:`traffic.libkqueue.Sockets.rallocate`
method provides simple access to array allocation suitable for use as a Sockets'
resource::

    import array
    conbuffer = traffic.libkqueue.Sockets.rallocate(64)
    assert conbuffer.__class__ is array.array

Sockets instances can be created by the class methods on the
:py:class:`traffic.libkqueue.Sockets` type:

* :py:meth:`traffic.libkqueue.Sockets.accept_stream_ip4`
* :py:meth:`traffic.libkqueue.Sockets.accept_stream_ip6`
* :py:meth:`traffic.libkqueue.Sockets.accept_stream_local`
* :py:meth:`traffic.libkqueue.Sockets.accept_stream_socket`

For example, bind and accept sockets from port `8080` on localhost interfaces::

    S4 = traffic.libkqueue.Sockets.accept_stream_ip4(('127.0.0.1', 8080), backlog = 32)
    S6 = traffic.libkqueue.Sockets.accept_stream_ip6(('::1', 8080), backlog = 32)

.. note::
   These class methods only initialize the listening socket. No
   :manpage:`accept(2)` calls are performed at this time. The method names
   indicate the role of the new Sockets instance, not what is actually performed
   upon invocation.
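
The idea of a transfer that fills an :py:class:`array.array` with accepted file
descriptors can be pictured with the standard library alone. The following is a
minimal sketch under stated assumptions, not Traffic's implementation: the helper
name ``accept_into`` is hypothetical, the listening socket is assumed to be
non-blocking, and :py:func:`select.select` stands in for the kernel event.

```python
import array
import select
import socket

def accept_into(listener, buffer):
    """Fill `buffer` with accepted file descriptors; return the count written.

    Hypothetical helper for illustration; assumes `listener` is non-blocking.
    """
    count = 0
    while count < len(buffer):
        try:
            connection, _address = listener.accept()
        except BlockingIOError:
            # No more pending connections.
            break
        # Detach so the descriptor outlives the socket object.
        buffer[count] = connection.detach()
        count += 1
    return count

# Usage: listen on an ephemeral loopback port and connect one local client.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(32)
client = socket.create_connection(listener.getsockname())
listener.setblocking(False)

# Wait for the readiness event, then transfer into the mutable buffer.
buffer = array.array('i', [-1] * 4)
readable, _, _ = select.select([listener], [], [], 5.0)
accepted = accept_into(listener, buffer) if readable else 0

# Release everything that was allocated for the demonstration.
for fd in buffer[:accepted]:
    socket.socket(fileno=fd).close()
client.close()
listener.close()
```

Unused slots keep their initial value of ``-1``, so the return value of the
helper is what tells the caller how much of the buffer was filled.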
Sockets instances can also be created from arbitrary socket file descriptors, but
the method expects a socket that has already been initialized and bound. For
example::

    import os
    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('127.0.0.1', 8080))
    # Duplicate the descriptor so that it survives closing the socket object.
    fd = os.dup(s.fileno())
    s.close()
    # accept_stream_socket() will perform the listen(2) call.
    S = traffic.libkqueue.Sockets.accept_stream_socket(fd, backlog = 32)

This creation interface is provided for cases where some custom socket
initialization needs to be performed by a separate process. Consider projects
like circus and supervisord that may pass in file descriptors.

Transferring
============

Internally, a transfer is possible when two conditions hold true: the Transit has
a resource for transmission, and the kernel has indicated that a transfer is
possible by emitting an event. The actual transfer is performed by a Traffic
instance, which manages the subscription to kernel events. The Transit must be
acquired by a Traffic instance in order to receive kernel events::

    from traffic import libkqueue

    traffic = libkqueue.Traffic()
    input, output = libkqueue.Octets.connect_stream_ip4(('127.0.0.1', 8080))
    input.acquire(libkqueue.Octets.rallocate(24))
    output.acquire(b'')
    traffic.acquire(input)
    traffic.acquire(output)

As shown in the above example, Transits can be given a resource using the
:py:meth:`traffic.libkqueue.Transit.acquire` method. Once a resource has been
acquired, it is used until exhaustion, that is, until the resource has been
completely transferred.

Merely acquiring resources, however, will not perform the transfers. The Traffic
instance *must* perform a cycle in order to collect the kernel events and perform
any possible transfers. Transfers *only* occur during Traffic cycles.
Cycles can be performed by using the Traffic instance as a context manager::

    with traffic:
        pass

While inside a Traffic cycle, the events that occurred can be accessed::

    while not traffic.terminated:
        # Entering the context manager performs one cycle.
        with traffic:
            for transit in traffic.transfer():
                window = transit.transfer()
                if window is not None:
                    transferred = transit.resource[window]
                    print(bytes(transferred))

The above example should illuminate the structure of a Traffic cycle. The Traffic
Transit transfers the Transit objects that have produced events in the cycle.
Outside of a cycle, the :py:meth:`traffic.libkqueue.Traffic.transfer` method will
produce an empty iterator.

Accepting Connections
---------------------

`Listening for Connections`_ describes how to create a listening socket using
:py:class:`traffic.libkqueue.Sockets`. However, there is more work to be done in
order to actually accept incoming connections.

Like :py:class:`traffic.libkqueue.Octets`, a resource must be acquired in order to
facilitate a transfer. In the case of :py:class:`traffic.libkqueue.Sockets`,
transfers are actually a sequence of :manpage:`accept(2)` invocations writing into
a Python buffer. Using :py:meth:`traffic.libkqueue.Sockets.rallocate`, an
:py:class:`array.array` instance can be created for subsequent acquisition::

    from traffic import libkqueue

    lsocket = libkqueue.Sockets.accept_stream_ip4(('127.0.0.1', 8080))
    buf = lsocket.rallocate(128) # NOTE: rallocate is a classmethod
    lsocket.acquire(buf)

The `lsocket` instance, being a Transit, needs to be acquired by a Traffic
instance in order to process incoming connections::

    traffic = libkqueue.Traffic()
    traffic.acquire(lsocket)

Terminating Connections
=======================

Termination occurs when either the Transit or the kernel notes that the Transit
should be terminated. The Transit can be terminated by the process with the
:py:meth:`traffic.libkqueue.Transit.terminate` method.
When executed, the Transit's process resources and kernel resources will be
released at the end of the cycle that first identifies the termination event.

In the case of terminations caused by failures, the
:py:meth:`traffic.libkqueue.Transit.status` method returns an object describing
why the Transit was terminated. Essentially, this allows a user to identify
whether the Transit was terminated by an internal request or by an external
force.

Octets & Socket Shutdown
------------------------

Upon termination of an Octets instance whose file descriptor is a socket,
:manpage:`shutdown(2)` will be called against the socket with the corresponding
direction of the Transit being terminated. Shutdown is performed at the beginning
of the Traffic cycle, but the file descriptor itself will still be valid, as with
non-socket Octets. However, after the end of a terminating cycle, the validity of
the file descriptor is not guaranteed.

.. note::
   In the case of sockets, it is possible that the file descriptor remains valid
   after a terminating cycle. However, this situation only occurs when the
   Transit managing the opposing direction is not terminated. This facet of
   termination should not be depended upon.

Controlling the Flow
====================

With explicit acquisition of resources, flow control is performed by depriving a
Transit of a Transfer Resource. If a transfer is desired, designate a resource to
support the transfer with the :py:meth:`traffic.libkqueue.Transit.acquire`
method::

    T = traffic.libkqueue.Traffic()
    r, w = traffic.libkqueue.Octets.connect_pipe()
    r.acquire(r.rallocate(128))
    w.acquire(b'data')
    # The Traffic instance must acquire the Transits as well.
    T.acquire(r)
    T.acquire(w)

If a pause in transferring is desired, deprive the Transit of a resource.
However, deprivation cannot be directly performed on a Transit that already has a
resource. A Transit's resource must be exhausted before a true halt to
transferring can occur.

.. note::
   The desired concurrency of a Traffic cycle inhibits the implementation of a
   pause feature without the use of synchronization mechanisms and additional
   record keeping.

Traffic Loops
=============

The `Transferring`_ section reveals the Traffic loop. It is a windowed event
loop; events can only be accessed during the cycle. Traffic is designed to take
in events in batches while the GIL is released. This allows data to be written
into Python objects in a truly concurrent manner.

Forking
=======

Traffic instances are designed to tolerate certain failures. In cases where it is
evident that the :manpage:`kqueue(2)` file descriptor has been closed, Traffic
will end the cycle and create a new :manpage:`kqueue(2)` instance. When this
occurs, Traffic will mark each Transit as needing a subscription. Subsequently,
the state will be restored.

However, such use in child processes is not recommended. Rather, in the child
process, the inherited Traffic instance should be disregarded using
:py:meth:`traffic.libkqueue.Traffic.disregard`. Once disregarded, a new Traffic
instance can be built and the inherited Transits can be acquired as needed.

Communicating With a Child Process
----------------------------------

With a Traffic instance's eventual destruction in the child on :manpage:`fork(2)`,
using a Traffic based communication channel between the child and the parent
takes some work.
The socket pair can be created right away::

    import os
    from traffic import libkqueue

    traffic = libkqueue.Traffic()
    pep, cep = libkqueue.Octets.connect_stream_socketpair()

Remembering that these Transits haven't been attached to a Traffic instance, the
child endpoints, ``cep``, can be passed to a function that constructs a new
Traffic instance for the child to use::

    def in_parent(pid, child_channel):
        for transit in child_channel:
            traffic.acquire(transit)

    def in_child(ppid, parent_channel):
        global traffic
        # Disregard the inherited Traffic instance and build a new one.
        traffic.disregard()
        traffic = libkqueue.Traffic()
        for transit in parent_channel:
            traffic.acquire(transit)

    def do_fork():
        ppid = os.getpid()
        pid = os.fork()
        if pid == 0:
            # child process, disregard parent transits.
            for transit in pep:
                transit.disregard()
            in_child(ppid, cep)
        else:
            # parent process, disregard child transits.
            for transit in cep:
                transit.disregard()
            in_parent(pid, pep)

The use of :py:meth:`traffic.libkqueue.Transit.disregard` is very important here.
In this particular case, if :py:meth:`traffic.libkqueue.Transit.terminate` were
used, the child's socket would not be functional, as terminate performs a
:manpage:`shutdown(2)` operation on the socket, permanently ending that
connection. :manpage:`close(2)`, in contrast, only releases the parent's
reference to the child's socket.

Inheriting Sockets in Child Processes
-------------------------------------

There are cases where inheriting sockets is desirable, notably in pre-fork server
models where the parent initializes the listening sockets and the child processes
service the incoming connections. In these cases, file descriptor duplication
should be used in combination with :py:meth:`traffic.libkqueue.Transit.disregard`.
The :py:attr:`traffic.libkqueue.Point._fileno` attribute holds the file
descriptor identifier.

Sending File Descriptors across Processes
=========================================

There is a well hidden feature in UNIX systems that allows programmers to
transfer file descriptors across a :manpage:`socketpair(2)` connection.
This feature is exposed by the :py:class:`traffic.libkqueue.Descriptors` type.
Similar to :py:class:`traffic.libkqueue.Sockets` instances, an array of integers
is the subject of the channel. The word "Descriptors" was chosen to denote that
arbitrary file descriptors may be transferred.

.. note:: Operating System limitations may apply.

Similar to connecting Octets, Descriptors are created using class methods::

    parent, child = traffic.libkqueue.Descriptors.connect_socketpair()

If the socket file descriptors already exist, the
:py:meth:`traffic.libkqueue.Descriptors.connect_socket` method can be used::

    parent, child = traffic.libkqueue.Descriptors.connect_socket(parentfd, childfd)

Sockets being full-duplex, the `parent` and `child` objects are tuples of
:py:class:`traffic.libkqueue.Descriptors` instances where the first item is the
reader and the second, the writer.

Once the Descriptors instances have been created, they can be treated similarly
to :py:class:`traffic.libkqueue.Sockets`::

    T = traffic.libkqueue.Traffic()
    send_buf = parent[1].rallocate(1) # an array.array()
    receive_buf = child[0].rallocate(1) # an array.array()
    # Place the integer file descriptor to send into the buffer.
    send_buf[0] = os.open(...)
    parent[1].acquire(send_buf)
    child[0].acquire(receive_buf)
    T.acquire(parent[1])
    T.acquire(child[0])

Afterwards, the file descriptors placed into the `send_buf` will be transferred
to the `child`'s Transit and into its resource, the `receive_buf`.
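
The underlying operating system feature can be demonstrated without Traffic. The
following is a minimal standard library sketch, not Traffic's implementation. It
assumes Python 3.9 or later on a UNIX system, where :py:func:`socket.send_fds`
and :py:func:`socket.recv_fds` are available, and it sends the read end of a pipe
across a socket pair within a single process for brevity.

```python
import os
import socket

# A connected pair of UNIX domain sockets, as created by socketpair(2).
parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

# The descriptor to send: the read end of a pipe holding some data.
read_fd, write_fd = os.pipe()
os.write(write_fd, b'hello')

# One byte of ordinary data is sent to carry the descriptor along with it.
socket.send_fds(parent_sock, [b'x'], [read_fd])

# The receiver gets a new descriptor number referring to the same pipe.
data, received_fds, flags, address = socket.recv_fds(child_sock, 1, 1)
received_fd = received_fds[0]
payload = os.read(received_fd, 5)

# Release every descriptor used in the demonstration.
for fd in (read_fd, write_fd, received_fd):
    os.close(fd)
parent_sock.close()
child_sock.close()
```

In a real deployment the two sockets would live in different processes, for
example on either side of a :manpage:`fork(2)`, and the received integer is what
would end up in the receiving side's buffer.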