Documentation for pulsar 0.4.6.
Pulsar is built on top of a set of primitive classes which handle the different aspects of the asynchronous concurrent framework. These primitive classes are:
A second layer of classes, forming the concurrent framework, is built directly on top of pulsar primitives. These framework classes are:
Spawn a new Actor and return an ActorProxyDeferred. This method can be used from any Actor. If not in the Arbiter domain, the method sends a request to the Arbiter to spawn a new actor; once the arbiter creates the actor, it returns the proxy to the original caller.
Parameter kwargs
| Return type: | an ActorProxyDeferred. |
|---|
A typical usage:
>>> a = spawn()
>>> a.aid
'ba42b02b'
>>> a.called
True
>>> p = a.result
>>> p.address
('127.0.0.1', 46691)
Send a message to target to perform a given action.
| Parameters: |
|
|---|---|
| Return type: | an ActorMessage which is a Deferred and therefore can be used to attach callbacks. |
Typical example:
>>> a = spawn()
>>> r = a.add_callback(lambda p: send(p,'ping'))
>>> r.result
'pong'
At the core of the library is the Actor class, which defines the primitive of the pulsar concurrent framework. Actor instances communicate with each other via messages in a share-nothing architecture.
The base class for concurrent programming in pulsar. In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
Pulsar actors are slightly different from the general theory. They cannot create other actors unless they are of a special kind.
The current implementation allows actors to perform specific tasks such as listening to a socket, acting as an HTTP server, consuming a task queue and so forth.
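The share-nothing idea described above can be illustrated with a minimal sketch built only on the standard library. It is not how pulsar implements actors: `ToyActor` and its methods are hypothetical names, and a thread with a queue stands in for a process with a mailbox.

```python
import queue
import threading


class ToyActor(threading.Thread):
    """Hypothetical stand-in for an actor: private state plus a mailbox."""

    def __init__(self):
        super().__init__(daemon=True)
        self.mailbox = queue.Queue()   # the only way to reach the actor
        self._served = 0               # private state, never shared directly

    def run(self):
        while True:
            action, reply = self.mailbox.get()
            if action == 'stop':
                reply.put('stopped')
                break
            self._served += 1
            # Decide locally how to answer each message.
            reply.put('pong' if action == 'ping' else 'unknown')

    def send(self, action):
        """Post a message and block until the actor answers."""
        reply = queue.Queue(maxsize=1)
        self.mailbox.put((action, reply))
        return reply.get(timeout=5)


actor = ToyActor()
actor.start()
answer = actor.send('ping')      # 'pong'
actor.send('stop')
actor.join(timeout=5)
```

The caller never touches the actor's state; it only posts messages and reads replies, which is the property the actor model relies on.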
To spawn a new actor:
>>> from pulsar import Actor, spawn
>>> a = spawn(Actor)
>>> a.is_alive()
True
Here a is actually a reference to the remote actor.
ATTRIBUTES
Set of command names available to this Actor.
An optional distributed queue. If present, it signals that the Actor is a CPU-bound worker receiving task requests on the ioqueue.
Default None.
Indicates if the Actor is a CPU-bound worker or an I/O-bound one. CPU-bound actors have a separate event loop for handling I/O events.
See also: ioqueue
An instance of IOLoop which listens for input/output events on a socket. This is different from the requestloop only for CPU-bound actors.
The total number of requests served by the actor
The current number of concurrent requests the actor is serving. Depending on the actor type, this number can be very high or at most 1 (CPU-bound actors).
Instance of an ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).
Dictionary of ActorProxy linked with this Actor.
Contains parameters which are passed to actors spawned by this actor.
METHODS
Current state description. One of initial, running, stopping, closed and terminated.
Called after forking to start the actor’s life. This is where logging is configured, the Actor.mailbox is registered and the Actor.ioloop is initialised and started.
Fetch the pulsar command for action.
Send a message to target to perform action with given parameters params. It returns an ActorMessage.
True if actor is running.
True if actor has started. It does not necessarily mean it is running.
True if actor has exited in a clean fashion.
True if actor has exited.
Boolean indicating if this is an actor on a child process.
Check if the actor can poll requests. This is used by CPU-bound actors only.
This function should be used when registering events on file descriptors registered with the requestloop.
The actor callback run once just before the actor starts its event loop (after forking). Every attribute is available, so this is a chance to perform custom initialisation before the actor starts running.
Handle an event on a file descriptor fd. This is what defines the life of an actor.
The actor callback run once just before the actor stops running.
The actor callback run once when the actor has stopped running, just before it vanishes in the garbage collector.
An actor callback executed when obtaining information about the actor. It can be used to add additional data to the data dictionary. Information about the actor is obtained via the Actor.info() method which is also exposed as a remote function.
| Parameters: | data – dictionary of data with information about the actor. |
|---|---|
| Return type: | a dictionary of picklable data. |
Stop the actor by stopping its Actor.requestloop and closing its Actor.mailbox. Once everything is closed properly this actor will go out of scope.
Given an actor unique id return the actor proxy.
Return a dictionary of information related to the actor status and performance.
Add the proxy to the linked_actors dictionary. If proxy is not an ActorProxy instance, raise an exception.
Not an actor per se, this is a mixin for Actor which manages a pool (group) of actors. Given an actor_class it makes sure there are always cfg.workers alive. It is used by both the Arbiter and the Monitor classes.
Dictionary with keys given by actor ids and values by ActorProxyMonitor instances. These are the actors managed by the pool.
A dictionary of ActorProxyMonitor instances which are in the process of being spawned.
The class derived from Actor which the monitor manages during its lifetime.
Default: Actor
Spawn a new Actor and return its ActorProxyMonitor.
Return a dictionary of parameters to be passed to the spawn method when creating new actors.
Remove actors which are not alive from PoolMixin.managed_actors and return the number of actors still alive.
| Parameters: |
|
|---|
If an actor has failed to notify the arbiter for longer than the timeout, stop the arbiter.
Maintain the number of workers by spawning or killing as required.
A monitor is a very special Actor and PoolMixin which shares the same IOLoop with the Arbiter and therefore lives in the main process domain. The Arbiter manages monitors which in turn manage a set of Actor performing similar tasks.
In other words, you may have a monitor managing actors for serving HTTP requests on a given port, another monitor managing actors consuming tasks from a task queue and so forth. You can think of Monitor as managers of pools of Actor.
Monitors are created by invoking the Arbiter.add_monitor() function and not by directly invoking the constructor. Adding a new monitor to the arbiter therefore follows the pattern:
import pulsar
m = pulsar.arbiter().add_monitor(pulsar.Monitor,'mymonitor')
You can also create a monitor with a distributed queue as IO mechanism:
from multiprocessing import Queue
import pulsar
m = pulsar.arbiter().add_monitor(pulsar.Monitor,
                                 'mymonitor',
                                 ioqueue=Queue())
Monitors with distributed queues manage CPU-bound actors.
Monitor specific task called by the Monitor.periodic_task(). By default it does nothing. Override if you need to.
Overrides the Actor.on_task() actor callback to perform the monitor IOLoop tasks, which are:
The implementation goes as follows:
Users shouldn’t need to override this method, but use Monitor.monitor_task() instead.
Spawn a new actor and add its ActorProxyMonitor to the PoolMixin.managed_actors dictionary.
The Arbiter is the most important Actor and PoolMixin in the pulsar concurrent framework. It is used as a singleton in the main process and it manages one or more Monitor instances. It runs the main IOLoop of your concurrent application. It is the equivalent of the gunicorn arbiter, the twisted reactor and the tornado eventloop.
Users access the arbiter (in the arbiter process domain) by the high level api:
import pulsar
arbiter = pulsar.arbiter()
Add a new Monitor to the Arbiter.
| Parameters: |
|
|---|---|
| Return type: | an instance of a pulsar.Monitor. |
Stop the pools, the message queue and the remaining actors.
This is an important component in the pulsar concurrent framework. An instance of this class acts as a proxy for a remote underlying Actor. This is a lightweight class which delegates function calls to the underlying remote object.
It is picklable and can therefore be sent from actor to actor using pulsar messaging. It exposes all the underlying commands which have been implemented.
For example, let's say we have a proxy a. To send a message to it:
from pulsar import send
send(a, 'echo', 'hello there!')
will send the command echo to actor a with parameter "hello there!".
The socket address of the underlying Actor.mailbox.
Actor mailbox
Send an ActorMessage to the underlying actor (the receiver). This is the low level function call for communicating between actors.
| Parameters: | |
|---|---|
| Return type: | an asynchronous ActorMessage. |
A Deferred for an ActorProxy. This instance will obtain an ActorProxy as its result once the remote Actor is fully functional.
A specialised ActorProxy class which contains additional information about the remote underlying pulsar.Actor. Unlike the pulsar.ActorProxy class, instances of this class are not picklable and therefore remain in the Arbiter process domain, which is the process where they have been created.
The Concurrency instance for the remote Actor.
True if underlying actor is alive
Terminate life of underlying actor.
Wait until the underlying actor terminates. If timeout is provided, it raises an exception if the timeout is reached.
Start the remote actor.
A message which travels from Actor to Actor to perform a specific command. ActorMessage instances are not initialised directly using the constructor; instead they are created by the ActorProxy.send() method.
id of the actor sending the message.
id of the actor receiving the message.
command to be performed
Positional arguments in the message body
Optional arguments in the message body
Mailbox for an Actor. If the actor is a CPU-bound worker, the Mailbox creates its own IOLoop which runs on a separate thread of execution.
alias of MailboxConnection
Actor implementation is responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor.
| Parameters: |
|
|---|
Actors communicate with each other via the Mailbox which each actor has in its process domain. When an actor communicates with another remote actor, it does so by sending an action to it with positional and/or key-valued arguments. For example:
send(target, 'ping')
will send the ping action to target from the actor in the current context of execution. The above is equivalent to:
get_actor().send(target, 'ping')
Each action is implemented via the command() decorator implemented in the pulsar.async.commands module. A list of standard commands is available in the design documentation.
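A decorator-based command registry can be sketched as follows. This is only an illustration of the pattern under stated assumptions: `toy_command`, `COMMANDS` and `dispatch` are hypothetical names, and the real signature of pulsar's command() decorator is not reproduced here.

```python
# Hypothetical registry: maps action names to the functions implementing them.
COMMANDS = {}


def toy_command(name=None):
    """Register the decorated function under ``name`` (or its own name)."""
    def decorator(func):
        COMMANDS[name or func.__name__] = func
        return func
    return decorator


@toy_command()
def ping(actor):
    return 'pong'


@toy_command('echo')
def echo_message(actor, message):
    return message


def dispatch(actor, action, *args, **kwargs):
    """Look up ``action`` and run it with the message arguments."""
    try:
        func = COMMANDS[action]
    except KeyError:
        raise ValueError('unknown action %r' % action)
    return func(actor, *args, **kwargs)
```

With such a registry, a receiving actor only needs the action name and the message arguments to find and run the right function, for example `dispatch(None, 'echo', 'hello there!')`.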
Decorator for pulsar command functions.
| Parameters: |
|
|---|
This section describes the asynchronous utilities used throughout the library, which form the building blocks of the event-driven concurrent framework. While Actor represents the concurrent side of pulsar, the Deferred adds the asynchronous flavour to it by using callback functions similar to twisted.
Convert val into a Deferred asynchronous instance so that callbacks can be attached to it.
| Parameters: |
|
|---|---|
| Returns: | a Deferred instance. |
This function is useful when someone needs to treat a value as a deferred:
v = ...
make_async(v).add_callback(...)
Execute function f safely and always return an asynchronous result.
| Parameters: |
|
|---|---|
| Returns: | a Deferred instance. |
Convert val into an asynchronous instance only if val is a generator or a function. If val is a Deferred it checks if it has been called and all callbacks have been consumed. In this case it returns the Deferred.result attribute.
The main class of the pulsar asynchronous tools. It is a callback which will be put off until later. The implementation is very similar to the twisted.defer.Deferred object.
True if the deferred was called. In this case the asynchronous result is ready and available in the result.
True if the deferred is running callbacks.
Integer indicating the number of times this Deferred has been paused because the result of a callback was another Deferred.
This is available once the Deferred has been called back. Note, this can be anything, including another Deferred. Trying to access this attribute when called is False will result in an AttributeError exception.
Add a callback as a callable function. The function takes at most one argument, the result passed to the callback() method. If the errback callable is provided it will be called when an exception occurs.
Same as add_callback() but only for errors.
Equivalent to self.add_callback(callback, callback).
Run registered callbacks with the given result. This can only be run once. Later calls to this will raise AlreadyCalledError. If further callbacks are added after this point, add_callback() will run the callbacks immediately.
| Returns: | the result input parameter |
|---|
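The callback behaviour described above can be sketched with a heavily simplified class. `ToyDeferred` is a hypothetical name and not pulsar's Deferred: it has no errbacks and no pausing, and it only shows the single-call rule, the immediate execution of late callbacks, and the missing result attribute before the call.

```python
class AlreadyCalledError(Exception):
    """Raised when a toy deferred is called back more than once."""


class ToyDeferred:
    """Hypothetical, much simplified deferred: no errbacks, no pausing."""

    def __init__(self):
        self.called = False
        self._callbacks = []

    def add_callback(self, callback):
        self._callbacks.append(callback)
        if self.called:
            self._run()            # late callbacks run immediately
        return self

    def callback(self, result):
        if self.called:
            raise AlreadyCalledError('callback() can only run once')
        self.called = True
        self.result = result       # attribute only exists from now on
        self._run()
        return result              # the input parameter, as documented

    def _run(self):
        # Each callback receives the current result and replaces it.
        while self._callbacks:
            self.result = self._callbacks.pop(0)(self.result)


d = ToyDeferred()
d.add_callback(lambda value: value + 1)
d.callback(1)                              # d.result is now 2
d.add_callback(lambda value: value * 10)   # runs at once, d.result is now 20
```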
A Deferred for managing a stream of independent objects which may be Deferred.
If True items can no longer be added to this MultiDeferred.
The type of multideferred. Either a list or a dict.
Lock the MultiDeferred so that no new items can be added. If it was already locked, a runtime exception is raised.
Update the MultiDeferred with new data. It works for both list and dict types.
Append only works for a list type multideferred
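The lock-then-collect behaviour can be sketched with the standard library, using `concurrent.futures.Future` as a stand-in for a deferred. `ToyMultiDeferred` is a hypothetical, list-only illustration and makes no claim about how MultiDeferred is implemented.

```python
from concurrent.futures import Future


class ToyMultiDeferred:
    """Hypothetical list-only collector; a Future plays the deferred role."""

    def __init__(self):
        self.locked = False
        self.done = Future()      # resolved with the list of plain values
        self._items = []
        self._pending = 0

    def append(self, item):
        if self.locked:
            raise RuntimeError('cannot add items once locked')
        index = len(self._items)
        self._items.append(item)
        if isinstance(item, Future):
            self._pending += 1
            item.add_done_callback(lambda f: self._resolved(index, f))

    def lock(self):
        if self.locked:
            raise RuntimeError('already locked')
        self.locked = True
        self._finish()

    def _resolved(self, index, future):
        # Replace the asynchronous placeholder with its actual value.
        self._items[index] = future.result()
        self._pending -= 1
        self._finish()

    def _finish(self):
        if self.locked and not self._pending:
            self.done.set_result(list(self._items))


pending = Future()
multi = ToyMultiDeferred()
multi.append(1)
multi.append(pending)
multi.lock()                   # nothing fires yet: one item is unresolved
pending.set_result(2)          # last item resolved, the collector fires
values = multi.done.result(timeout=1)
```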
A Deferred for a generator over, possibly, deferred objects. The callback will occur once the generator has stopped (when it raises StopIteration), or a preset maximum number of errors has occurred.
| Parameters: |
|
|---|
A decorator class which transforms a function into an asynchronous callable.
| Parameters: |
|
|---|
Typical usage:
@async()
def myfunction(...):
    ...
Decorator for a function func which returns an iterable over, possibly asynchronous, values. This decorator creates an instance of a MultiDeferred which is called once all asynchronous values have been called.
Decorator for raising failures
Handling asynchronous sockets is an important task in pulsar. The core component for asynchronous I/O on sockets is the AsyncIOStream class, which has been adapted from the tornado web server.
Wrapper class for a python socket. It provides higher level tools for creating and reusing sockets already created.
Same as the socket send method, but it closes the connection if no data was sent. In this case it also raises a socket error.
Wrap the socket accept method.
Options for a server socket
Shutdown and close the socket.
Framework class to write and read from a non-blocking socket. It is used everywhere in pulsar for handling asynchronous write() and read() operations with callbacks which can be used to act when data has just been sent or has just been received.
It was originally forked from tornado IOStream and subsequently adapted to pulsar concurrent framework.
A timeout in seconds which is used when waiting for data to be available for reading. If timeout is a positive number, every time the AsyncIOStream performs a read() operation a timeout is also created on the ioloop.
Returns true if we are currently reading from the stream.
Returns true if we are currently writing to the stream.
Boolean indicating if the sock is closed.
Connects the socket to a remote address without blocking. May only be called if the socket passed to the constructor was not available or it was not previously connected. The address parameter is in the same format as for socket.connect, i.e. a (host, port) tuple or a string for unix sockets. If callback is specified, it will be called when the connection is completed. Note that it is safe to call IOStream.write while the connection is pending, in which case the data will be written as soon as the connection is ready. Calling IOStream read methods before the socket is connected works on some platforms but is non-portable.
Starts reading data from the sock. It returns a Deferred which will be called back once data is available. If this function is called while this AsyncIOStream is already reading, a RuntimeError occurs.
| Return type: | a pulsar.Deferred instance. |
|---|
One common pattern of usage:
def parse(data):
    ...
io = AsyncIOStream(socket=sock)
io.read().add_callback(parse)
Write the given data to this stream. If there was previously buffered write data and an old write callback, that callback is simply overwritten with this new callback.
| Return type: | a Deferred instance or the number of bytes written. |
|---|
Close the sock and call the callback if it was set up using the set_close_callback() method.
Call the given callback when the stream is closed.
A BaseSocket with a protocol for encoding and decoding messages. This is the base class for AsyncSocketServer, AsyncConnection and ClientSocketHandler.
A callable for building the protocol to encode and decode messages. This attribute can be specified as class attribute or via the constructor. A protocol must be an instance of a class exposing the encode and decode methods.
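As an illustration of an object exposing encode and decode, here is a minimal length-prefixed framing sketch. The class name, the frame layout and the exact method signatures (in particular the `(message, remaining)` return value of `decode`) are assumptions made for this example; the source only states that the two methods must exist.

```python
import struct


class LengthPrefixProtocol:
    """Hypothetical protocol: 4-byte big-endian length, then the payload."""

    header = struct.Struct('!I')

    def encode(self, message):
        payload = message.encode('utf-8')
        return self.header.pack(len(payload)) + payload

    def decode(self, buffer):
        """Return ``(message, remaining)``; message is None if incomplete."""
        if len(buffer) < self.header.size:
            return None, buffer
        (length,) = self.header.unpack_from(buffer)
        end = self.header.size + length
        if len(buffer) < end:
            return None, buffer
        return buffer[self.header.size:end].decode('utf-8'), buffer[end:]


protocol = LengthPrefixProtocol()
frame = protocol.encode('ping')
partial = protocol.decode(frame[:3])              # not enough bytes yet
message, rest = protocol.decode(frame + b'\x00')  # one frame plus leftovers
```

Returning the unconsumed bytes lets the caller keep them in its buffer until the next read completes the following frame.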
True if the socket is closed.
Callback just before closing the socket
Close this socket and log the failure if there was one.
A ProtocolSocket for asynchronous servers which listen for requests on a socket.
The Actor running this AsyncSocketServer.
The IOLoop used by this AsyncSocketServer for asynchronously sending and receiving data.
The set of all open AsyncConnection
If True the server has its own IOLoop running on a separate thread of execution. Otherwise it shares the actor.requestloop
The timeout when reading data in an asynchronous way.
A subclass of AsyncConnection. A new instance of this class is constructed each time a new connection has been established by the accept() method.
A subclass of AsyncResponse for handling responses to clients once data has been received and processed.
Accept a new connection from a remote client
An asynchronous client connection for an AsyncSocketServer. The connection keeps the client socket open for as long as it is required. A connection can handle several request/response cycles until it is closed.
The class AsyncSocketServer which created this AsyncConnection.
Class or callable for building an AsyncResponse object. It is initialised by the AsyncSocketServer.response_class but it can be changed at runtime when upgrading connections to new protocols. An example is the websocket protocol.
This function is called when data to parse is available on the ClientSocket.buffer. It should return parsed data or None if more data in the buffer is required.
Write data to socket.
An asynchronous server response is created once an AsyncConnection has available parsed data from a read operation. Instances of this class are iterable and produce chunks of data to send back to the remote client.
The __iter__ is the only method which needs to be implemented by derived classes. If an empty byte is yielded, the asynchronous engine will resume the iteration after one loop in the actor event loop.
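The iteration contract can be sketched with a toy engine. `ToyResponse` and `run` are hypothetical and simplify the real behaviour: this engine takes one chunk per response per pass regardless of its content, and an empty chunk is simply not sent, so other responses make progress while the first one is waiting.

```python
from collections import deque


class ToyResponse:
    """Hypothetical response: iterating it yields chunks for the client."""

    def __init__(self, parsed_data):
        self.parsed_data = parsed_data

    def __iter__(self):
        yield b''                      # not ready yet, ask to be resumed later
        yield b'echo: ' + self.parsed_data


def run(responses):
    """Toy engine: one chunk per response per pass, in round-robin order."""
    sent = []
    pending = deque((name, iter(resp)) for name, resp in responses)
    while pending:
        name, chunks = pending.popleft()
        try:
            chunk = next(chunks)
        except StopIteration:
            continue                       # response exhausted, drop it
        if chunk:
            sent.append((name, chunk))     # only non-empty chunks are sent
        pending.append((name, chunks))     # resume on a later pass
    return sent


sent = run([('a', ToyResponse(b'1')), ('b', ToyResponse(b'2'))])
```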
The AsyncConnection for this response
The AsyncSocketServer for this response
Parsed data from remote client
Create a new server Socket for the given address. If the address is a tuple, a TCP socket is created. If it is a string, a Unix socket is created. Otherwise a TypeError is raised.
| Parameters: |
|
|---|---|
| Return type: | Instance of Socket |
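The address dispatch described above can be sketched with the standard `socket` module. `toy_create_socket` and its `backlog` parameter are hypothetical; the sketch returns a plain `socket.socket` rather than pulsar's Socket wrapper.

```python
import socket


def toy_create_socket(address, backlog=5):
    """Bind and listen on ``address``; name and ``backlog`` are assumptions."""
    if isinstance(address, tuple):
        family = socket.AF_INET
    elif isinstance(address, str):
        family = socket.AF_UNIX      # only available on Unix platforms
    else:
        raise TypeError('address must be a tuple or a string')
    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        if family == socket.AF_INET:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(backlog)
    except OSError:
        sock.close()                 # do not leak the descriptor on failure
        raise
    return sock


server = toy_create_socket(('127.0.0.1', 0))   # port 0: any free port
host, port = server.getsockname()
server.close()
```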
Create a 127.0.0.1 (client,server) socket pair on any available port. The first socket is connected to the second, the server socket, which is bound to 127.0.0.1 at any available port.
| Parameters: |
|
|---|---|
| Return type: | tuple with two instances of Socket |
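A connected loopback pair of this kind can be built with the standard library alone. The sketch below uses the hypothetical name `toy_socket_pair` and returns plain `socket.socket` objects, with the second one being the listening server socket.

```python
import socket


def toy_socket_pair(backlog=1):
    """Return a connected ``(client, server)`` pair of loopback sockets."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.bind(('127.0.0.1', 0))       # port 0: any available port
        listener.listen(backlog)
        client = socket.create_connection(listener.getsockname())
    except OSError:
        listener.close()
        raise
    return client, listener


client, server = toy_socket_pair()
connection, peer = server.accept()     # the server side of the connection
client.sendall(b'ping')
received = connection.recv(4)
for sock in (client, connection, server):
    sock.close()
```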
A level-triggered I/O event loop adapted from tornado.
| Parameters: | io – The I/O implementation. If not supplied, the best possible implementation available will be used. On posix systems this is epoll, or else select. It can be any other custom implementation as long as it has an epoll-like interface. Pulsar ships with an additional I/O implementation based on a distributed queue, IOQueue. |
|---|
ATTRIBUTES
The IO implementation
If True this is a CPU-bound event loop, otherwise it is an I/O event loop. CPU-bound loops can block the loop for a considerable amount of time.
Total number of loops
The timeout in seconds when polling with epoll or select.
Default: 0.5
The thread id where the eventloop is running
A list of callables to be executed at each iteration of the event loop. Tasks can be added and deleted via add_task() and remove_task(). Extra care must be taken when adding tasks to I/O event loops. These tasks should be fast to perform and must not block.
METHODS
Registers the given handler to receive the given events for the file descriptor fd.
| Parameters: |
|
|---|---|
| Return type: | True if the handler was successfully added. |
Changes the events we listen for fd.
Stop listening for events on fd.
Stop the loop after the current event loop iteration is complete. If the event loop is not currently running, the next call to start() will return immediately.
To use asynchronous methods from otherwise-synchronous code (such as unit tests), you can start and stop the event loop like this:
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
start() will return after async_method has run its callback, whether that callback was invoked before or after ioloop.start.
Returns true if this IOLoop is currently running.
Add a timeout callback. A timeout callback is called at the time deadline from the IOLoop. It returns a handle that may be passed to remove_timeout to cancel.
Cancels a pending timeout. The argument is a handle as returned by the add_timeout() method.
Calls the given callback on the next I/O loop iteration.
It is safe to call this method from any thread at any time. Note that this is the only method in IOLoop that makes this guarantee; all other interaction with the IOLoop must be done from that IOLoop’s thread. add_callback() may be used to transfer control from other threads to the IOLoop’s thread.
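The hand-over from another thread to the loop's thread can be sketched with a thread-safe queue. `ToyLoop` is a hypothetical, minimal loop that only runs callbacks; it does no I/O polling and is not the IOLoop implementation.

```python
import queue
import threading


class ToyLoop:
    """Hypothetical loop: callbacks only ever run on the loop's own thread."""

    def __init__(self):
        self._callbacks = queue.Queue()   # thread-safe hand-over point
        self._running = False

    def add_callback(self, callback):
        self._callbacks.put(callback)     # safe from any thread

    def stop(self):
        self._running = False             # must be called on the loop thread

    def start(self):
        self._running = True
        while self._running:
            try:
                callback = self._callbacks.get(timeout=0.5)
            except queue.Empty:
                continue
            callback()


loop = ToyLoop()
seen = []


def worker():
    # Runs on another thread: hand work over instead of touching the loop.
    loop.add_callback(lambda: seen.append(threading.current_thread().name))
    loop.add_callback(loop.stop)


thread = threading.Thread(target=worker, name='worker')
thread.start()
loop.start()          # returns once the stop callback has run
thread.join()
```

The recorded thread name is that of the thread which called `loop.start()`, not `'worker'`, which shows that control was transferred to the loop's thread.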
Add a PeriodicCallback to the event loop.
Wake up the eventloop.
Epoll-like class for I/O based on queues rather than sockets. The interface is the same as the python epoll implementation.
The underlying distributed queue used for I/O.
Register a file descriptor with the ioqueue object.
Modify a registered file descriptor.
Remove a registered file descriptor from the ioqueue object.
Wait for events. timeout in seconds (float)
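An epoll-like facade over a queue can be sketched as below. Everything here is an assumption made for illustration: `ToyIOQueue`, the shape of the returned events, and the use of `queue.Queue` in place of a distributed queue. It only shows how register, modify, unregister and poll can be given meaning when the event source is a queue.

```python
import queue


class ToyIOQueue:
    """Hypothetical poller exposing register/modify/unregister/poll."""

    def __init__(self, tasks):
        self._tasks = tasks
        self._fds = {}

    def register(self, fd, events=None):
        self._fds[fd] = events

    def modify(self, fd, events):
        self._fds[fd] = events

    def unregister(self, fd):
        self._fds.pop(fd, None)

    def poll(self, timeout=0.5):
        """Wait up to ``timeout`` seconds and return ``[(fd, item)]``."""
        try:
            item = self._tasks.get(timeout=timeout)
        except queue.Empty:
            return []
        # Report the item to every registered descriptor, epoll style.
        return [(fd, item) for fd in self._fds]


tasks = queue.Queue()
poller = ToyIOQueue(tasks)
poller.register('request')
tasks.put({'job': 1})
events = poller.poll(timeout=0.1)    # one event carrying the queued item
idle = poller.poll(timeout=0.01)     # queue is empty, no events
```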
An application interface for configuring and loading the various necessities for any given server or distributed application running on the pulsar concurrent framework. Applications can be of any sort or form, and the library ships with several batteries-included examples in the pulsar.apps framework module.
These are the most important facts about a pulsar Application
| Parameters: |
|
|---|
A string indicating the application namespace for configuration parameters.
Default: None.
A callable serving your application. The callable must be picklable, therefore it is either a function or a picklable object. If not provided, the application must implement the handler() method.
Default None
dictionary of default configuration parameters.
Default: {}.
Optional tuple containing names of configuration namespaces to be included in the application config dictionary.
Default: None
Full path of the script which starts the application, or None. If supplied, it is used to set up the python path.
Optional set of remote actions available on Actor created by this Application.
Default: None.
Application name. It is unique and defines the application.
Returns the callable application handler which is stored in Worker.app_handler, used by Worker to carry out its task. By default it returns the Application.callable.
Build a request class from a file descriptor fd and an event. The returned request instance is passed to the handle_request() method.
This is the main function which needs to be implemented by actual applications. It is called by the worker to handle a request.
| Parameters: |
|
|---|---|
| Return type: | It can be a generator, a Deferred instance or the actual response. |
Returns an I/O distributed queue for the application if one is needed. If a queue is returned, the application Worker will have an IOLoop instance based on the queue (via IOQueue).
By default it returns None.
Callback when configuration is initialised but not yet loaded. This is a chance to add extra config parameters or remove unwanted ones. It returns a new Config instance or None.
Callback when configuration is loaded. This is a chance to do an application specific check before the concurrent machinery is put into place. If it returns False the application will abort.
Load the application configuration from a file and/or from the command line. Called during application initialization.
| Parameters: |
|
|---|
The parameters overriding order is the following:
- default parameters.
- the params passed in the initialization.
- the parameters in the optional configuration file
- the parameters passed in the command line.
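Assuming the list above runs from lowest to highest precedence, the layering can be sketched as a sequence of dictionary updates. The function name `toy_load_config` and all parameter names and values are hypothetical.

```python
# Hypothetical layers, listed from lowest to highest precedence.
defaults = {'workers': 1, 'timeout': 30, 'bind': '127.0.0.1:8060'}
init_params = {'workers': 2}
config_file = {'workers': 4, 'timeout': 60}
command_line = {'workers': 8}


def toy_load_config(*layers):
    """Merge layers in order; later layers override earlier ones."""
    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged


cfg = toy_load_config(defaults, init_params, config_file, command_line)
```

Here `workers` ends up with the command line value, `timeout` with the configuration file value, and `bind` keeps its default because no later layer sets it.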
Returns an application handler for the ApplicationMonitor. By default it returns None.
Called by the Worker pulsar.Actor.on_start() callback method.
Called by the Worker pulsar.Actor.on_stop() callback method.
Called by the Worker pulsar.Actor.on_exit() callback method.
A chance to override the dictionary of parameters params before a new Worker is spawned.
Callback by ApplicationMonitor when starting. The application is now in the arbiter but has not yet started.
Callback by ApplicationMonitor when stopping
Callback by ApplicationMonitor at exit
Callback by ApplicationMonitor at each event loop
Start the application if it wasn’t already started.
An Actor for serving a pulsar Application.
Instance of the Application to be performed by the worker
Configuration dictionary
The application handler obtained from Application.handler().
A Monitor for managing a pulsar Application.
The monitor application handler obtained from Application.monitor_handler().
Dictionary containing Setting parameters for fine tuning pulsar servers. It provides easy access to Setting.value attribute by exposing the Setting.name as attribute.
Dictionary of all Settings instances available. The keys are given by the Setting.name attribute.
Create the argparser for this configuration by adding all settings via the Setting.add_argument().
| Return type: | an instance of argparse.ArgumentParser. |
|---|
Invoked by a pulsar.Application just before starting.
A configuration parameter for pulsar. Parameters can be specified on the command line or on a config file.
If set to True the settings won’t be loaded and it can be only used as base class for other settings.
Setting for a specific Application.
Restrict the argument to the choices provided.
Default value
List of options strings, e.g. [-f, --foo].
For positional arguments. Same usage as argparse.
Setting section, used for creating documentation.
Add itself to the argparser.
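How a setting might add itself to an `argparse` parser can be sketched as follows. `ToySetting` is a hypothetical class that loosely mirrors the attributes listed above (name, flags, default, choices, description); it is not pulsar's Setting, and the setting names used are examples only.

```python
import argparse


class ToySetting:
    """Hypothetical setting mirroring the attributes listed above."""

    def __init__(self, name, flags, default=None, choices=None, desc=''):
        self.name = name
        self.flags = flags
        self.default = default
        self.choices = choices
        self.desc = desc
        self.value = default

    def add_argument(self, parser):
        """Add itself to an ``argparse`` parser."""
        kwargs = {'dest': self.name, 'default': self.default,
                  'help': self.desc}
        if self.choices:
            kwargs['choices'] = self.choices
        if self.default is not None:
            kwargs['type'] = type(self.default)   # convert like the default
        parser.add_argument(*self.flags, **kwargs)


settings = [
    ToySetting('workers', ['-w', '--workers'], default=1),
    ToySetting('loglevel', ['--log-level'], default='info',
               choices=['debug', 'info', 'warning']),
]
parser = argparse.ArgumentParser()
for setting in settings:
    setting.add_argument(parser)
args = parser.parse_args(['-w', '4'])
```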
Base class of all pulsar exceptions.
Raised when a Deferred instance receives more than one Deferred.callback().
Check whether arguments respect a given function arity. Return an error message if the check did not pass, otherwise return None.
| Parameters: |
|
|---|
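An arity check of this kind can be sketched with `inspect.signature`. The name `toy_arity_check`, its parameters and the message format are assumptions for this example and do not reflect the signature of the function documented above.

```python
import inspect


def toy_arity_check(func, args=(), kwargs=None):
    """Return an error message if the call would not bind, otherwise None."""
    try:
        inspect.signature(func).bind(*args, **(kwargs or {}))
    except TypeError as exc:
        return '%s: %s' % (func.__name__, exc)
    return None


def add(a, b=0):
    return a + b


ok = toy_arity_check(add, (1,), {'b': 2})     # binds fine
too_many = toy_arity_check(add, (1, 2, 3))    # one positional too many
missing = toy_arity_check(add)                # required argument missing
```

Binding the arguments without calling the function keeps the check free of side effects.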