Protocols/Transports API

This part of the pulsar API is about classes responsible for implementing the Protocol/Transport paradigm. They are based on asyncio.Protocol and asyncio.DatagramProtocol classes.



class pulsar.async.protocols.PulsarProtocol(loop, session=1, producer=None, logger=None, **kw)[source]

A mixin class for both Protocol and DatagramProtocol.

A PulsarProtocol is an EventHandler which has two one time events:

  • connection_made
  • connection_lost

Abort by aborting the transport


The address of the transport.


Close by closing the transport

Return the connection_lost event which can be used to wait for complete transport closure.


True if the transport is closed.


Fires the connection_lost event.


Sets the transport, fire the connection_made event and adds a timeout for idle connections.


The socket was closed from the remote end


The producer of this Protocol.


Connection session number.

Passed during initialisation by the producer. Usually an integer representing the number of separate connections the producer has processed at the time it created this Protocol.


The socket of transport.


The transport for this protocol.

Available once the connection_made() is called.


class pulsar.async.protocols.Protocol(loop, session=1, producer=None, logger=None, **kw)[source]

An asyncio.Protocol with events


Write data into the wire.

Returns an empty tuple or a Future if this protocol has paused writing.


class pulsar.async.protocols.Connection(consumer_factory=None, timeout=None, **kw)[source]

A FlowControl to handle multiple TCP requests/responses.

It is a class which acts as bridge between a transport and a ProtocolConsumer. It routes data arriving from the transport to the current_consumer().


A factory of ProtocolConsumer.


number of separate requests processed.


The ProtocolConsumer currently handling incoming data.

This instance will receive data when this connection get data from the transport via the data_received() method.

If no consumer is available, build a new one and return it.


Delegates handling of data to the current_consumer().

Once done set a timeout for idle connections when a timeout is a positive number (of seconds).


Upgrade the _consumer_factory() callable.

This method can be used when the protocol specification changes during a response (an example is a WebSocket request/response, or HTTP tunneling).

This method adds a post_request callback to the current_consumer() to build a new consumer with the new _consumer_factory().

Parameters:consumer_factory – the new consumer factory (a callable accepting no parameters)

Protocol Consumer

class pulsar.async.protocols.ProtocolConsumer(loop=None, one_time_events=None, many_times_events=None)[source]

The consumer of data for a server or client Connection.

It is responsible for receiving incoming data from an end point via the Connection.data_received() method, decoding (parsing) and, possibly, writing back to the client or server via the transport attribute.


For server consumers, data_received() is the only method to implement. For client consumers, start_request() should also be implemented.

A ProtocolConsumer is a subclass of EventHandler and it has two default one time events:

  • pre_request fired when the request is received (for servers) or just before is sent (for clients). This occurs just before the start_request() method.
  • post_request fired when the request is done. The on_finished attribute is a shortcut for the post_request OneTime event and therefore can be used to wait for the request to have received a full response (clients).

In addition, it has two many times events:

  • data_received fired when new data is received from the transport but not yet processed (before the data_received() method is invoked)
  • data_processed fired just after data has been consumed (after the data_received() method)


A useful example on how to use the data_received event is the wsgi proxy server.


Abort the request.

This method can be called during the pre-request stage


The Connection of this consumer.


Called by the connection when the transport is closed.

By default it calls the finished() method. It can be overwritten to handle the potential exception exc.


Called by a Connection when it starts using this consumer.

By default it does nothing.


Called when some data is received.

This method must be implemented by subclasses for both server and client consumers.

The argument is a bytes object.

finished(*arg, **kw)[source]

Fire the post_request event if it wasn’t already fired.


Event fired once a full response to a request is received. It is the post_request one time event.


The Producer of this consumer.


The request.

Used for clients only and available only after the start() method is invoked.


Starts processing the request for this protocol consumer.

There is no need to override this method, implement start_request() instead. If either connection or transport are missing, a RuntimeError occurs.

For server side consumer, this method simply fires the pre_request event.


Starts a new request.

Invoked by the start() method to kick start the request with remote server. For server ProtocolConsumer this method is not invoked at all.

For clients this method should be implemented and it is critical method where errors caused by stale socket connections can arise. This method should not be called directly. Use start() instead. Typically one writes some data from the request into the transport. Something like this:


The Transport of this consumer


Delegate writing to the underlying Connection

Return an empty tuple or a Future


Producers are factory of Protocol with end-points. They are used by both servers and clients classes.


class pulsar.async.protocols.Producer(loop=None, protocol_factory=None, name=None, max_requests=None, logger=None)[source]

An Abstract EventHandler class for all producers of socket (client and servers)


Build a consumer for a protocol.

This method can be used by protocols which handle several requests, for example the Connection class.

Parameters:consumer_factory – consumer factory to use.

Create a new protocol via the protocol_factory()

This method increase the count of sessions and build the protocol passing self as the producer.


Total number of requests processed.


Total number of protocols created by the Producer.

TCP Server

class pulsar.async.protocols.TcpServer(protocol_factory, loop, address=None, name=None, sockets=None, max_requests=None, keep_alive=None, logger=None)[source]

A Producer of server Connection for TCP servers.


A Server managed by this Tcp wrapper.

Available once the start_serving() method has returned.


Socket address of this server.

It is obtained from the first socket getsockname method.


Stop serving the Server.sockets.


Override Producer.create_protocol().

start_serving(backlog=100, sslcontext=None)[source]

Start serving.

  • backlog – Number of maximum connections
  • sslcontext – optional SSLContext object.

a Future called back when the server is serving the socket.


Classes for the (user) datagram protocol. UDP uses a simple transmission model with a minimum of protocol mechanism.

Datagram Protocol

class pulsar.async.protocols.DatagramProtocol(loop, session=1, producer=None, logger=None, **kw)[source]

An asyncio.DatagramProtocol with events`

Datagram Server

class pulsar.async.protocols.DatagramServer(protocol_factory, loop=None, address=None, name=None, sockets=None, max_requests=None, logger=None)[source]

An Producer for serving UDP sockets.


A list of DatagramTransport.

Available once the create_endpoint() method has returned.


Stop serving the Server.sockets and close all concurrent connections.


create the server endpoint.

Returns:a Future called back when the server is serving the socket.

Protocol Mixins


class pulsar.async.mixins.FlowControl(low_limit=None, high_limit=None, **kw)[source]

A protocol mixin for flow control logic.

This implements the protocol methods pause_writing(), resume_writing().


Called by the transport when the buffer goes over the high-water mark

Successive calls to this method will fails unless resume_writing() is called first.


Resume writing.

Successive calls to this method will fails unless pause_writing() is called first.


class pulsar.async.mixins.Timeout[source]

Adds a timeout for idle connections to protocols


This section introduces classes implementing the transport/protocol paradigm for clients with several connections to a remote TcpServer.

Abstract Client

class pulsar.async.clients.AbstractClient(loop=None, protocol_factory=None, name=None, max_requests=None, logger=None)[source]

A Producer for a client connections.


Abstract method for creating a connection.

create_connection(address, protocol_factory=None, **kw)[source]

Helper method for creating a connection to an address.

Abstract UDP Client

class pulsar.async.clients.AbstractUdpClient(loop=None, protocol_factory=None, name=None, max_requests=None, logger=None)[source]

A Producer for a client udp connections.

create_datagram_endpoint(protocol_factory=None, **kw)[source]

Helper method for creating a connection to an address.


Abstract method for creating the endpoint


class pulsar.async.clients.Pool(creator, pool_size=10, loop=None, timeout=None, **kw)[source]

An asynchronous pool of open connections.

Open connections are either in_use or available to be used. Available connection are placed in an asyncio.Queue.

This class is not thread safe.


Number of available connections in the pool.


Close all connections

Return a Future called once all connections have closed


True when this pool is closed


Get a connection from the pool.

The connection is either a new one or retrieved from the available connections in the pool.

Returns:a Future resulting in the connection.

The number of connections in use.

These connections are not available until they are released back to the pool.


The maximum number of open connections allowed.

If more connections are requested, the request is queued and a connection returned as soon as one becomes available.

Pool Connection

class pulsar.async.clients.PoolConnection(pool, connection)[source]

A wrapper for a Connection in a connection Pool.


The Pool which created this PoolConnection


The underlying socket connection.


Close this pool connection by releasing the underlying connection back to the pool.


Remove the underlying connection from the connection pool.