Documentation for pulsar 0.7.2. For development docs, go here.
This part of the pulsar API is about classes responsible for implementing the Protocol/Transport paradigm as well as Server and Client base classes. Transport and Protocol are designed to comply with pep-3156 specification
Base class for transports.
Transports talk to two things: the other side of the connection on one hand, and a protocol on the other. It’s a bridge between the specific underlying transfer mechanism and the protocol. Its job can be described as allowing the protocol to just send and receive bytes, taking care of all of the magic that needs to happen to those bytes to be eventually sent across the wire.
A Transport for sockets.
When a new SocketTransport is created, it adds a read handler to the Transport.event_loop and notifies the Transport.protocol that the connection is available via the BaseProtocol.connection_made() method.
alias of OSError
The transport is about to close. In this state the transport is not listening for read events but it may still be writing, unless it is closed.
The transport is closed. No read/write operation available.
Closes the transport.
Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the BaseProtocol.connection_lost method will (eventually) called with None as its argument.
ABC for base protocol class.
The only case when BaseProtocol should be implemented directly is write-only transport like write pipe
Called when a connection is made.
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
ABC representing a protocol for a stream.
The user should implement this interface. They can inherit from this class but don’t need to. The implementations here do nothing (they don’t raise exceptions).
When the user wants to requests a transport, they pass a protocol factory to a utility function (e.g., EventLoop.create_connection()).
When the connection is made successfully, connection_made() is called with a suitable transport object. Then data_received() will be called 0 or more times with data (bytes) received from the transport; finally, connection_lost() will be called exactly once with either an exception object or None as an argument.
State machine of calls:
start -> CM [-> DR*] [-> ER?] -> CL -> end
Called when some data is received.
The argument is a bytes object.
Called when the other end calls write_eof() or equivalent.
The default implementation does nothing.
TODO: By default close the transport. But we don’t have the transport as an instance variable (connection_made() may not set it).
Connection session number.
The address of this connection.
The python logger for this connection.
The ProtocolConsumer currently handling incoming data.
Number of separate ProtocolConsumer processed.
For connections which are keept alive over several requests.
Number of seconds to keep alive this connection when an idle.
A value of 0 means no timeout.
If the current_consumer is not None an exception occurs.
Sets the transport, fire the connection_made event and adds a timeout for idle connections.
Implements the Protocol.data_received() method.
Implements the BaseProtocol.connection_lost() method.
It performs these actions in the following order:
Upgrade the consumer_factory() callable.
This method can be used when the protocol specification changes during a response (an example is a WebSocket request/response, or HTTP tunneling). For the upgrade to be successful, the post_request event of the protocol consumer should not have been fired already.
the new consumer if build_consumer is True.
The consumer of data for a server or client Connection.
It is responsible for receiving incoming data from an end point via the Connection.data_received() method, decoding (parsing) and, possibly, writing back to the client or server via the transport attribute.
In addition, it has two many times events:
A useful example on how to use the data_received event is the wsgi proxy server.
True if consumer has finished consuming data.
This is when the finish event has been fired.
Called by a Connection when it starts using this consumer.
By default it does nothing.
Called when some data is received.
This method must be implemented by subclasses for both server and client consumers.
The argument is a bytes object.
Starts a new request.
For clients this method should be implemented and it is critical method where errors caused by stale socket connections can arise. This method should not be called directly. Use start() instead. Typically one writes some data from the request into the transport. Something like this:
Starts processing the request for this protocol consumer.
For server side consumer, this method simply fires the pre_request event with request as data.
Call this method when done with this ProtocolConsumer.
fires the post_request event and removes self from the connection.
|Parameters:||result – the positional parameter passed to the post_request event handler.|
|Returns:||whatever is returned by the :meth:`EventHandler.fire_event` method (usually ``self is the input result is None, otherwise the input result)|
Producers are factory of connections with end-points. They are used by both servers and clients classes.
An Abstract EventHandler class for all producers of connections.
A callable producing connections.
The signature of the connection factory must be:
connection_factory(session, consumer_factory, producer, **params)
By default it is set to the Connection class.
alias of Connection
Number of seconds to keep alive an idle connection.
Passed as key-valued parameter to to the connection_factory().
Maximum number of connections allowed.
A value of 0 (default) means no limit.
Check if connection can be reused.
By default it returns True.
A Producer of connections with remote servers or clients.
Total number of connections created.
Number of concurrent active connections.
Called when a new connection is created.
Once a new connection is created, all the many times events of the producer are added to the connection.
the result of the connection_factory() call.
A base class for Servers listening on a socket.
In addition it has four many times event:
Stop serving and close the listening socket.
The protocol factory for a server.
Build a protocol consumer.
Uses the consumer_factory() to build the consumer and add events from the many-times events of this producer.
|Returns:||a protocol consumer.|
The socket receiving connections.
Server address, where clients send requests to.
This section introduces classes implementing the transport/protocol paradigm for clients with several connections to a remote Server. Client is the main class here, and Client.request is the single most important method a subclass must implement.
A client for several remote servers of the same type.
Most initialisation parameters have sensible defaults and don’t need to be passed for most use-cases. Additionally, they can also be set as class attributes to override defaults.
Reconnecting gap in seconds.
An optional version for this client.
Dictionary of ConnectionPool.
If initialized at class level it will remain as a class attribute, otherwise it will be an instance attribute.
Can reconnect on socket error.
Total number of concurrent connections.
Total number of available connections.
Setup the client.
Invoked at the end of initialisation with the additional parameters passed. By default it does nothing.
The event loop can be set during initialisation. If force_sync is True a specialised event loop is created.
Override the Producer.build_consumer() method.
Add a post_request handler to release the connection back to the connection pool.
Abstract method for creating a Request.
def request(self, ...): ... request = ... return self.response(request)
Sends a request to the remote server.
Once a request object has been constructed, the request() method can invoke this method to build the ProtocolConsumer and start the response. There should not be any reason to override this method. This method is run on this client event loop (obtained via the get_event_loop() method) thread.
Returns a suitable Connection for request.
First checks if an available open connection can be used. Alternatively it creates a new connection by invoking the ConnectionPool.get_or_create_connection() method on the appropiate connection pool.
If a new connection is created, the connection won’t be yet connected with end-point.
Update params with attributes from this Client.
an updated copy of params.
Close all connections in each connection_pools.
|Parameters:||async – if True flush the write buffer before closing (same as SocketTransport.close method).|
|Returns:||a Deferred called back once all connections are closed.|
Close all connections.
Send times requests asynchronously and evaluate the time taken to obtain all responses. In the standard implementation this method will open times Connection with the remote server. Usage:
client = Client(...) multi = client.timeit(100, ...) response = yield multi multi.total_time
|Returns:||a MultiDeferred which results in the list of results for the individual requests. Its MultiDeferred.total_time attribute indicates the number of seconds taken (once the deferred has been called back).|
A Producer of of active connections for client protocols.
It maintains a live set of Connection.
Address to connect to
Number of available connection in the pool.
Available connections are not currently in-use and therefore they can be selected when the get_or_create_connection() method is invoked.
Releases the connection back to the pool.
This function remove the connection from the set of concurrent connections and add it to the set of available connections.
Get or create a new connection for client.
If a connection is given and either
then it is chosen ahead of others in the pool.
A Client request.
A request object is hashable an it is used to select the appropriate ConnectionPool for the client request.
The socket address of the remote server
When True a protocol consumer release the connection back to the ConnectionPool once done with the request.
A transport layer security context or True.
Used to create SSL/TLS connections.