Documentation for pulsar 0.4.6.

API

Pulsar is built on top of a set of primitive classes which handle the different aspects of the asynchronous concurrent framework. These primitive classes are:

  • Deferred – the primitive for handling asynchronous execution.
  • Actor – the primitive for handling parallel execution.
  • IOLoop – the primitive for handling asynchronous events.
  • Socket – the primitive for sockets.

A second layer of classes, forming the concurrent framework, is built directly on top of pulsar primitives. These framework classes are:

High level functions

spawn

pulsar.spawn(cfg=None, **kwargs)

Spawn a new Actor and return an ActorProxyDeferred. This method can be used from any Actor. If called outside the Arbiter domain, the method sends a request to the Arbiter to spawn a new actor. Once the arbiter has created the actor, it returns the proxy to the original caller.

Parameter kwargs

These optional parameters are:
Return type: an ActorProxyDeferred.

A typical usage:

>>> a = spawn()
>>> a.aid
'ba42b02b'
>>> a.called
True
>>> p = a.result
>>> p.address
('127.0.0.1', 46691)

send

pulsar.send(target, action, *args, **params)

Send a message to target to perform a given action.

Parameters:
  • target – the Actor id or an ActorProxy or name of the target actor which will receive the message.
  • action – the name of the remote command to perform in the target Actor.
  • args – positional arguments to pass to the remote command action.
  • params – dictionary of parameters to pass to remote command action.
Return type:

an ActorMessage which is a Deferred and therefore can be used to attach callbacks.

Typical example:

>>> a = spawn()
>>> r = a.add_callback(lambda p: send(p,'ping'))
>>> r.result
'pong'

get_actor

pulsar.get_actor()

Return the Actor in the current thread/process

Actors

At the core of the library we have the Actor class, which defines the primitive of the pulsar concurrent framework. Actor instances communicate with each other via messages in a share-nothing architecture.

Actor

class pulsar.Actor(impl)

The base class for concurrent programming in pulsar. In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

Pulsar actors are slightly different from the general theory. They cannot create other actors, unless they are of a special kind.

The current implementation allows for actors to perform specific tasks such as listening to a socket, acting as http server, consuming a task queue and so forth.
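The message-driven behaviour described above can be illustrated without pulsar. The following is a minimal sketch using only the standard library. ToyActor, its ask method and the ping, echo and stop actions are hypothetical names chosen for illustration and are not part of pulsar's API. Each toy actor owns a private mailbox and handles one message at a time, sharing no state with its callers.

```python
import queue
import threading


class ToyActor:
    """A toy actor: private state, a mailbox, one message at a time."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._handled = 0  # private state, touched only by the actor thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def ask(self, action, *args):
        """Post a message and return a queue on which the reply arrives."""
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((action, args, reply))
        return reply

    def _run(self):
        while True:
            action, args, reply = self._mailbox.get()
            if action == "stop":
                reply.put(self._handled)  # report how many messages were served
                return
            self._handled += 1
            if action == "ping":
                reply.put("pong")
            elif action == "echo":
                reply.put(args[0])
            else:
                reply.put(KeyError(action))  # unknown action: reply with an error


actor = ToyActor()
print(actor.ask("ping").get(timeout=5))           # pong
print(actor.ask("echo", "hello").get(timeout=5))  # hello
print(actor.ask("stop").get(timeout=5))           # 2
```

The caller never touches the actor's state directly; everything goes through the mailbox, which is the property the share-nothing design relies on.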

To spawn a new actor:

>>> from pulsar import Actor, spawn
>>> a = spawn(Actor)
>>> a.is_alive()
True

Here a is actually a reference to the remote actor.

ATTRIBUTES

name

The name of this Actor.

aid

Unique ID for this Actor.

commands_set

Set of command names available to this Actor.

ioqueue

An optional distributed queue. If present, it signals that the Actor is a CPU-bound worker receiving task requests on the ioqueue.

Default None.

cpubound

Indicates if the Actor is a CPU-bound worker or an I/O-bound one. CPU-bound actors have a separate event loop for handling I/O events.

See also: ioqueue

requestloop

The IOLoop to listen for requests.

ioloop

An instance of IOLoop which listens for input/output events on a socket. This is different from the requestloop only for CPU-bound actors.

request_processed

The total number of requests served by the actor

concurrent_requests

The current number of concurrent requests the actor is serving. Depending on the actor type, this number can be very high or max 1 (CPU bound actors).

proxy

Instance of an ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).

linked_actors

Dictionary of ActorProxy linked with this Actor.

params

Contains parameters which are passed to actors spawned by this actor.

METHODS

info_state

Current state description. One of initial, running, stopping, closed and terminated.

start()

Called after forking to start the actor’s life. This is where logging is configured, the Actor.mailbox is registered and the Actor.ioloop is initialised and started.

command(action)

Fetch the pulsar command for action.

send(target, action, *args, **params)

Send a message to target to perform action with given parameters params. It returns an ActorMessage.

put(request)

Put a request into the ioqueue if available.

running()

True if actor is running.

active()

True if actor is active by being both running and having the ioloop running.

started()

True if actor has started. It does not necessarily mean it is running.

closed()

True if actor has exited in a clean fashion.

stopped()

True if actor has exited.

is_arbiter()

Return True if self is the Arbiter.

is_monitor()

Return True if self is a Monitor.

is_process()

Boolean indicating if this is an actor on a child process.

can_poll()

Check if the actor can poll requests. This is used by CPU-bound actors only.

handle_fd_event(*args, **kwargs)

This function should be used when registering events on file descriptors registered with the requestloop.

on_start()

The actor callback run once, after forking, just before the actor starts its event loop. Every attribute is available, therefore this is a chance to perform custom initialisation before the actor starts running.

on_event(fd, event)

Handle an event on a file descriptor fd. This is what defines the life of an actor.

on_stop()

The actor callback run once just before the actor stops running.

on_exit()

The actor callback run once when the actor has stopped running, just before it is garbage collected.

on_info(data)

An actor callback executed when obtaining information about the actor. It can be used to add additional data to the data dictionary. Information about the actor is obtained via the Actor.info() method which is also exposed as a remote function.

Parameters: data – dictionary of data with information about the actor.
Return type: a dictionary of picklable data.

stop(force=False, exit_code=None)

Stop the actor by stopping its Actor.requestloop and closing its Actor.mailbox. Once everything is closed properly this actor will go out of scope.

exit(result=None)

Exit from the Actor domain.

get_actor(aid)

Given an actor unique id return the actor proxy.

info()

Return a dictionary of information related to the actor status and performance.

Add the proxy to the linked_actors dictionary. If proxy is not an ActorProxy instance, raise an exception.

PoolMixin

class pulsar.PoolMixin

Not an actor per se, this is a mixin for Actor which manages a pool (group) of actors. Given an actor_class it makes sure there are always cfg.workers alive. It is used by both the Arbiter and the Monitor classes.
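The idea of keeping a fixed number of workers alive can be sketched as follows. This is only an illustration under stated assumptions: ToyWorker, ToyPool, manage and spawn_missing are hypothetical names, and a worker's death is simulated by flipping a flag rather than by a real process exiting.

```python
import itertools


class ToyWorker:
    """Stand-in for a managed actor; `alive` flips to False when it dies."""

    def __init__(self, aid):
        self.aid = aid
        self.alive = True


class ToyPool:
    """Keep `workers` live workers around, in the spirit of a pool mixin."""

    def __init__(self, workers):
        self.workers = workers
        self.managed = {}  # worker id -> worker
        self._ids = itertools.count(1)

    def manage(self):
        """Drop dead workers and return the number still alive."""
        self.managed = {aid: w for aid, w in self.managed.items() if w.alive}
        return len(self.managed)

    def spawn_missing(self):
        """Spawn workers until the pool is back to its target size."""
        while len(self.managed) < self.workers:
            worker = ToyWorker("w%d" % next(self._ids))
            self.managed[worker.aid] = worker


pool = ToyPool(workers=3)
pool.spawn_missing()
pool.managed["w2"].alive = False   # simulate a crashed worker
print(pool.manage())               # 2
pool.spawn_missing()
print(sorted(pool.managed))        # ['w1', 'w3', 'w4']
```

Running manage followed by spawn_missing on every loop iteration is enough to converge back to the target size after any number of failures.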

managed_actors

Dictionary with keys given by actor ids and values by ActorProxyMonitor instances. These are the actors managed by the pool.

spawning_actors

A dictionary of ActorProxyMonitor which are in the process of being spawned.

actor_class

The class derived from Actor which the monitor manages during its lifetime.

Default: Actor

spawn(actor_class=None, linked_actors=None, montitor=None, **params)

Spawn a new Actor and return its ActorProxyMonitor.

actorparams()

Return a dictionary of parameters to be passed to the spawn method when creating new actors.

manage_actors(terminate=False, stop=False, manage=True)

Remove actors which are not alive from PoolMixin.managed_actors and return the number of actors still alive.

Parameters:
  • terminate – if True, force termination of alive actors.
  • stop – if True, stop all alive actors.
  • manage – if True, check whether alive actors are still responsive.

manage_actor(actor)

If an actor has failed to notify the arbiter for longer than the timeout, stop the arbiter.

spawn_actors()

Spawn new actors if needed. If the PoolMixin is spawning do nothing.

stop_actors()

Maintain the number of workers by spawning or killing as required.

close_actors(*args, **kwargs)

Close all managed Actor.

Monitor

class pulsar.Monitor(impl)

A monitor is a very special Actor and PoolMixin which shares the same IOLoop with the Arbiter and therefore lives in the main process domain. The Arbiter manages monitors which in turn manage a set of Actor performing similar tasks.

In other words, you may have a monitor managing actors for serving HTTP requests on a given port, another monitor managing actors consuming tasks from a task queue and so forth. You can think of Monitor as managers of pools of Actor.

Monitors are created by invoking the Arbiter.add_monitor() function and not by directly invoking the constructor. Therefore adding a new monitor to the arbiter follows the pattern:

import pulsar

m = pulsar.arbiter().add_monitor(pulsar.Monitor, 'mymonitor')

You can also create a monitor with a distributed queue as IO mechanism:

from multiprocessing import Queue
import pulsar

m = pulsar.arbiter().add_monitor(pulsar.Monitor,
                                 'mymonitor',
                                 ioqueue=Queue())

Monitors with distributed queues manage CPU-bound actors.

monitor_task()

Monitor specific task called by the Monitor.periodic_task(). By default it does nothing. Override if you need to.

periodic_task()

Overrides the Actor.on_task() actor callback to perform the monitor IOLoop tasks, which are:

  • To maintain a responsive set of actors ready to perform their duty.
  • To perform its own tasks.

The implementation goes as following:

Users shouldn’t need to override this method, but use Monitor.monitor_task() instead.

actorparams()

Spawn a new actor and add its ActorProxyMonitor to the PoolMixin.managed_actors dictionary.

Arbiter

class pulsar.Arbiter(impl)

The Arbiter is the most important Actor and PoolMixin in the pulsar concurrent framework. It is used as a singleton in the main process and it manages one or more Monitor. It runs the main IOLoop of your concurrent application. It is the equivalent of the gunicorn arbiter, the twisted reactor and the tornado eventloop.

Users access the arbiter (in the arbiter process domain) by the high level api:

import pulsar

arbiter = pulsar.arbiter()

add_monitor(monitor_class, monitor_name, **params)

Add a new Monitor to the Arbiter.

Parameters:
  • monitor_class – a pulsar.Monitor class.
  • monitor_name – a unique name for the monitor.
  • params – dictionary of key-valued parameters for the monitor.
Return type:

an instance of a pulsar.Monitor.

get_all_monitors()

A dictionary of all Monitor in the arbiter

close_monitors(*args, **kwargs)

Close all Monitor at once.

on_stop(*args, **kwargs)

Stop the pools, the message queue and the remaining actors.

ActorProxy

class pulsar.ActorProxy(impl)

This is an important component in the pulsar concurrent framework. An instance of this class acts as a proxy for a remote underlying Actor. This is a lightweight class which delegates function calls to the underlying remote object.

It is picklable and therefore can be sent from actor to actor using pulsar messaging. It exposes all the underlying commands which have been implemented.

For example, let's say we have a proxy a. To send a message to it:

from pulsar import send

send(a, 'echo', 'hello there!')

will send the command echo to actor a with parameter "hello there!".

aid

Unique ID for the remote Actor

address

the socket address of the underlying Actor.mailbox.

mailbox

Actor mailbox

receive_from(sender, command, *args, **kwargs)

Send an ActorMessage to the underlying actor (the receiver). This is the low level function call for communicating between actors.

Parameters:
  • sender – Actor sending the message.
  • command – the command to perform in the actor underlying this proxy.
  • args – positional arguments of command.
  • kwargs – key-valued arguments of command.
Return type:

an asynchronous ActorMessage.

stop(sender=None)

Stop the remote Actor

ActorProxyDeferred

class pulsar.ActorProxyDeferred(aid, msg=None)

A Deferred for an ActorProxy. This instance will obtain an ActorProxy as its result once the remote Actor is fully functional.

aid

The remote Actor id.

ActorProxyMonitor

class pulsar.ActorProxyMonitor(impl)

A specialised ActorProxy class which contains additional information about the remote underlying pulsar.Actor. Unlike the pulsar.ActorProxy class, instances of this class are not picklable and therefore remain in the Arbiter process domain, which is the process where they have been created.

impl

The Concurrency instance for the remote Actor.

info

Dictionary of information regarding the remote Actor

is_alive()

True if underlying actor is alive

terminate()

Terminate life of underlying actor.

join(timeout=None)

Wait until the underlying actor terminates. If timeout is provided, it raises an exception if the timeout is reached.

start()

Start the remote actor.

ActorMessage

class pulsar.ActorMessage(command, sender=None, receiver=None, args=None, kwargs=None)

A message which travels from Actor to Actor to perform a specific command. ActorMessage instances are not initialised directly using the constructor; instead they are created by the ActorProxy.send() method.

sender

id of the actor sending the message.

receiver

id of the actor receiving the message.

command

command to be performed

args

Positional arguments in the message body

kwargs

Optional arguments in the message body

Mailbox

class pulsar.Mailbox(actor, socket, onthread=False, connection_class=None, response_class=None, timeout=None, **kwargs)

Mailbox for an Actor. If the actor is a CPU-bound worker, the Mailbox creates its own IOLoop which runs on a separate thread of execution.

connection_class

alias of MailboxConnection

Concurrency

class pulsar.Concurrency

The Actor implementation, responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor.

Parameters:
  • concurrency – string indicating the concurrency implementation. Valid choices are monitor, process and thread.
  • actor_class – Actor or one of its subclasses.
  • timeout – timeout in seconds for the actor.
  • kwargs – additional key-valued arguments to be passed to the actor constructor.

Remote Commands

Actors communicate with each other via the Mailbox which each actor has in its process domain. When an actor communicates with another remote actor, it does so by sending an action to it with positional and/or key-valued arguments. For example:

send(target, 'ping')

will send the ping action to target from the actor in the current context of execution. The above is equivalent to:

get_actor().send(target, 'ping')

Each action is implemented via the command() decorator implemented in the pulsar.async.commands module. A list of standard commands is available in the design documentation.

pulsar command

class pulsar.command(ack=True, authenticated=False, internal=False, commands_set=None)

Decorator for pulsar command functions.

Parameters:
  • ack – True if the command acknowledges the sender with a response. Usually set to True (which is also the default value).
  • authenticated – If True the action can only be invoked by remote actors which have authenticated with the actor for which the action has been requested.
  • internal – Internal commands are for internal use only, not for external clients. They accept the calling actor proxy as third positional argument.
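How a decorator can turn plain functions into named remote actions is sketched below. This is not pulsar's command implementation: toy_command, the COMMANDS dictionary and dispatch are hypothetical names, and only the ack flag from the parameter list above is modelled (a command with ack=False produces no reply).

```python
class toy_command:
    """Register the decorated function in `registry` under its own name."""

    def __init__(self, registry, ack=True):
        self.registry = registry
        self.ack = ack

    def __call__(self, func):
        func.ack = self.ack
        self.registry[func.__name__] = func
        return func


COMMANDS = {}


@toy_command(COMMANDS)
def ping(actor):
    return "pong"


@toy_command(COMMANDS, ack=False)
def notify(actor, info):
    actor["last_info"] = info


def dispatch(actor, action, *args):
    """Run the named command; reply only if the command acknowledges."""
    func = COMMANDS[action]
    result = func(actor, *args)
    return result if func.ack else None


state = {}  # stand-in for the receiving actor
print(dispatch(state, "ping"))          # pong
print(dispatch(state, "notify", "hi"))  # None
print(sorted(COMMANDS))                 # ['notify', 'ping']
```

Registering by function name keeps the wire format simple: a message only needs to carry the action string and its arguments.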

Asynchronous Tools

This section describes the asynchronous utilities used throughout the library, which form the building blocks of the event-driven concurrent framework. While Actor represents the concurrent side of pulsar, the Deferred adds the asynchronous flavour to it by using callback functions similar to twisted.

Make Async

pulsar.make_async(val=None, description=None, max_errors=None)

Convert val into a Deferred asynchronous instance so that callbacks can be attached to it.

Parameters:
  • val – can be a generator or any other value. If a generator, a DeferredGenerator instance will be returned.
  • max_errors – the maximum number of errors tolerated if val is a generator. Default None.
Returns:

a Deferred instance.

This function is useful when someone needs to treat a value as a deferred:

v = ...
make_async(v).add_callback(...)

Safe Async

pulsar.safe_async(f, args=None, kwargs=None, description=None, max_errors=None)

Execute function f safely and always return an asynchronous result.

Parameters:
  • f – function to execute
  • args – tuple of positional arguments for f.
  • kwargs – dictionary of key-word parameters for f.
  • description – Optional description for the Deferred returned.
  • max_errors – the maximum number of errors tolerated if a DeferredGenerator is returned.
Returns:

a Deferred instance.
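The "always return an asynchronous result" contract can be sketched with the standard library. In this illustration concurrent.futures.Future stands in for a Deferred, and toy_safe_async is a hypothetical name; the description and max_errors parameters are not modelled.

```python
from concurrent.futures import Future


def toy_safe_async(f, args=None, kwargs=None):
    """Call f and always hand back a Future, even when f raises."""
    future = Future()
    try:
        result = f(*(args or ()), **(kwargs or {}))
    except Exception as exc:
        future.set_exception(exc)  # the error travels inside the result
    else:
        future.set_result(result)
    return future


ok = toy_safe_async(divmod, (7, 2))
bad = toy_safe_async(divmod, (7, 0))
print(ok.result())                     # (3, 1)
print(type(bad.exception()).__name__)  # ZeroDivisionError
```

The caller never needs a try/except around the call itself: success and failure are both delivered through the same asynchronous object.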

Maybe Async

pulsar.maybe_async(val, description=None, max_errors=None)

Convert val into an asynchronous instance only if val is a generator or a function. If val is a Deferred it checks if it has been called and all callbacks have been consumed. In this case it returns the Deferred.result attribute.

Deferred

class pulsar.Deferred(description=None)

The main class of the pulsar asynchronous tools. It is a callback which will be put off until later. The implementation is very similar to the twisted.defer.Deferred object.

called

True if the deferred was called. In this case the asynchronous result is ready and available in the result.

running

True if the deferred is running callbacks.

paused

Integer indicating the number of times this Deferred has been paused because the result of a callback was another Deferred.

result

This is available once the Deferred has been called back. Note, this can be anything, including another Deferred. Trying to access this attribute when called is False will result in an AttributeError exception.

add_callback(callback, errback=None)

Add a callback as a callable function. The function takes at most one argument, the result passed to the callback() method. If the errback callable is provided it will be called when an exception occurs.

add_errback(errback)

Same as add_callback() but only for errors.

addBoth(callback)

Equivalent to self.add_callback(callback, callback).

callback(result=None)

Run registered callbacks with the given result. This can only be run once. Later calls to this will raise AlreadyCalledError. If further callbacks are added after this point, add_callback() will run the callbacks immediately.

Returns: the result input parameter

result_or_self()

It returns the result only if available and all callbacks have been consumed, otherwise it returns this Deferred. Users should use this method to obtain the result, rather than accessing directly the result attribute.
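The callback mechanics described in this section can be sketched in a few lines. ToyDeferred below is a hypothetical, heavily simplified stand-in, not pulsar's Deferred: it has no errbacks and no pausing, and it assumes, as twisted does, that each callback's return value becomes the new result.

```python
class AlreadyCalledError(Exception):
    """Raised when callback() is invoked a second time."""


class ToyDeferred:
    def __init__(self):
        self.called = False
        self._callbacks = []
        # `result` is deliberately not set yet: reading it before
        # callback() raises AttributeError.

    def add_callback(self, callback):
        self._callbacks.append(callback)
        if self.called:
            self._run()  # late callbacks run immediately
        return self

    def callback(self, result=None):
        if self.called:
            raise AlreadyCalledError()
        self.called = True
        self.result = result
        self._run()
        return result

    def _run(self):
        # Each callback receives the current result and may replace it.
        while self._callbacks:
            self.result = self._callbacks.pop(0)(self.result)


d = ToyDeferred()
d.add_callback(lambda r: r + 1).add_callback(lambda r: r * 10)
d.callback(4)
print(d.called, d.result)   # True 50
```

Because callbacks added after the deferred has fired run immediately, client code does not need to know whether the result was already available when it attached its handler.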

Multi Deferred

class pulsar.MultiDeferred(data=None, type=None, fireOnOneErrback=False, handle_value=None)

A Deferred for managing a stream of independent objects which may be Deferred.

lock

If True items can no longer be added to this MultiDeferred.

type

The type of multideferred. Either a list or a dict.

lock()

Lock the MultiDeferred so that no new items can be added. If it was already locked, a runtime exception is raised.

update(stream)

Update the MultiDeferred with new data. It works for both list and dict types.

append(value)

Append only works for a list type multideferred
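The lock-then-wait behaviour of a list-type multi-deferred can be sketched with the standard library. In this illustration concurrent.futures.Future stands in for a Deferred and ToyMultiFuture is a hypothetical name. The sketch is single-threaded and does not handle failed items.

```python
from concurrent.futures import Future


class ToyMultiFuture(Future):
    """Resolve to a list once locked and every pending item has a result."""

    def __init__(self):
        super().__init__()
        self._items = []
        self._pending = 0
        self._locked = False

    def append(self, value):
        if self._locked:
            raise RuntimeError("cannot add items after lock()")
        index = len(self._items)
        self._items.append(value)
        if isinstance(value, Future):
            self._pending += 1
            # Replace the placeholder with the real value when it arrives.
            value.add_done_callback(lambda f, i=index: self._resolved(i, f))

    def lock(self):
        if self._locked:
            raise RuntimeError("already locked")
        self._locked = True
        self._finish_if_ready()

    def _resolved(self, index, future):
        self._items[index] = future.result()
        self._pending -= 1
        self._finish_if_ready()

    def _finish_if_ready(self):
        if self._locked and self._pending == 0:
            self.set_result(list(self._items))


a, b = Future(), Future()
multi = ToyMultiFuture()
multi.append(a)
multi.append("plain")
multi.append(b)
multi.lock()
print(multi.done())     # False
a.set_result(1)
b.set_result(2)
print(multi.result())   # [1, 'plain', 2]
```

Locking is what makes completion well defined: until the container is locked, more pending items could still be added, so it cannot fire even if every current item is ready.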

Deferred Generator

class pulsar.DeferredGenerator(gen, max_errors=None, description=None)

A Deferred for a generator over, possibly, deferred objects. The callback will occur once the generator has stopped (when it raises StopIteration), or a preset maximum number of errors has occurred.

Parameters:
  • gen – a generator.
  • max_errors – The maximum number of exceptions allowed before stopping the generator and raising exceptions. By default the generator will continue regardless of errors, accumulating them into the final result.

Failure

class pulsar.Failure(err=None, msg=None)

Aggregate failures during Deferred callbacks.

traces

List of (errorType, errvalue, traceback) which occurred during the execution of a Deferred.

append(trace)

Add new failure to self.

Decorators

class pulsar.async(max_errors=None, description=None)

A decorator class which transforms a function into an asynchronous callable.

Parameters:
  • max_errors – The maximum number of errors permitted if the asynchronous value is a DeferredGenerator.
  • description – optional description.

Typical usage:

@async()
def myfunction(...):
    ...

pulsar.multi_async(func)

Decorator for a function func which returns an iterable over, possibly asynchronous, values. This decorator creates an instance of a MultiDeferred which is called back once all asynchronous values have been called.

pulsar.raise_failure(f)

Decorator for raising failures

Sockets

Handling asynchronous sockets is an important task in pulsar. The core component for asynchronous I/O on sockets is the AsyncIOStream class, which has been adapted from the tornado web server.

Base Socket

class pulsar.BaseSocket

Base class for IStream using a socket as I/O mechanism.

sock

The underlying socket

async

True if this is an asynchronous socket.

address

same as getsockname().

getsockname()

The socket name if open otherwise None.

Socket

class pulsar.Socket(address=None, backlog=2048, fd=None, bound=False, is_server=None)

Wrapper class for a python socket. It provides higher level tools for creating sockets and for reusing sockets already created.

write(data)

Same as the socket send method but it closes the connection if no data was sent. In this case it also raises a socket error.

accept()

Wrap the socket accept method.

set_options(sock, address, bound)

Options for a server socket

close(log=None)

Shutdown and close the socket.

AsyncIOStream

class pulsar.AsyncIOStream(socket=None, max_buffer_size=None, read_chunk_size=None, timeout=None)

Framework class to write and read from a non-blocking socket. It is used everywhere in pulsar for handling asynchronous write() and read() operations with callbacks which can be used to act when data has just been sent or has just been received.

It was originally forked from tornado IOStream and subsequently adapted to pulsar concurrent framework.

socket

A Socket which might be connected or unconnected.

timeout

A timeout in seconds which is used when waiting for data to be available for reading. If timeout is a positive number, every time the AsyncIOStream performs a read() operation a timeout is also created on the ioloop.

reading

Returns true if we are currently reading from the stream.

writing

Returns true if we are currently writing to the stream.

closed

Boolean indicating if the sock is closed.

connect(address)

Connects the socket to a remote address without blocking. May only be called if the socket passed to the constructor was not available or it was not previously connected. The address parameter is in the same format as for socket.connect, i.e. a (host, port) tuple or a string for unix sockets. If callback is specified, it will be called when the connection is completed. Note that it is safe to call IOStream.write while the connection is pending, in which case the data will be written as soon as the connection is ready. Calling IOStream read methods before the socket is connected works on some platforms but is non-portable.

read(length=None)

Starts reading data from the sock. It returns a Deferred which will be called back once data is available. If this function is called while this AsyncIOStream is already reading, a RuntimeError occurs.

Return type: a pulsar.Deferred instance.

One common pattern of usage:

def parse(data):
    ...

io = AsyncIOStream(socket=sock)
io.read().add_callback(parse)

recv(length=None)

Starts reading data from the sock. It returns a Deferred which will be called back once data is available. If this function is called while this AsyncIOStream is already reading, a RuntimeError occurs.

Return type: a pulsar.Deferred instance.

One common pattern of usage:

def parse(data):
    ...

io = AsyncIOStream(socket=sock)
io.read().add_callback(parse)

write(data)

Write the given data to this stream. If there was previously buffered write data and an old write callback, that callback is simply overwritten with this new callback.

Return type: a Deferred instance or the number of bytes written.

sendall(data)

Write the given data to this stream. If there was previously buffered write data and an old write callback, that callback is simply overwritten with this new callback.

Return type: a Deferred instance or the number of bytes written.

close()

Close the sock and call the callback if it was set up using the set_close_callback() method.

set_close_callback(callback)

Call the given callback when the stream is closed.

Socket with Protocol

class pulsar.ProtocolSocket

A BaseSocket with a protocol for encoding and decoding messages. This is the base class for AsyncSocketServer, AsyncConnection and ClientSocketHandler.

on_closed

A Deferred which receives a callback once the close() method is invoked.

protocol_factory

A callable for building the protocol to encode and decode messages. This attribute can be specified as class attribute or via the constructor. A protocol must be an instance of a class exposing the encode and decode methods.
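A protocol object of the kind described above could look like the following. This is a sketch under stated assumptions: the length-prefixed framing, the LengthPrefixedProtocol name and the exact return value of decode are illustrative choices, since the text only requires that encode and decode methods exist.

```python
import struct


class LengthPrefixedProtocol:
    """Frame each message as a 4-byte big-endian length plus payload."""

    def encode(self, message):
        payload = message.encode("utf-8")
        return struct.pack("!I", len(payload)) + payload

    def decode(self, buffer):
        """Return (message, remaining), or (None, buffer) if incomplete."""
        if len(buffer) < 4:
            return None, buffer
        (size,) = struct.unpack("!I", buffer[:4])
        if len(buffer) < 4 + size:
            return None, buffer
        return buffer[4:4 + size].decode("utf-8"), buffer[4 + size:]


protocol_factory = LengthPrefixedProtocol  # a class is itself a callable
protocol = protocol_factory()
data = protocol.encode("ping") + protocol.encode("pong")
first, rest = protocol.decode(data)
second, rest = protocol.decode(rest)
print(first, second, rest)        # ping pong b''
print(protocol.decode(data[:3]))  # (None, b'\x00\x00\x00')
```

Returning the unconsumed bytes from decode lets the caller keep a single read buffer and call decode again whenever more data arrives.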

closed

True if the socket is closed.

on_close(failure=None)

Callback just before closing the socket

close(msg=None)

Close this socket and log the failure if there was one.

AsyncSocketServer

class pulsar.AsyncSocketServer(actor, socket, onthread=False, connection_class=None, response_class=None, timeout=None, **kwargs)

A ProtocolSocket for asynchronous servers which listen for requests on a socket.

actor

The Actor running this AsyncSocketServer.

ioloop

The IOLoop used by this AsyncSocketServer for asynchronously sending and receiving data.

connections

The set of all open AsyncConnection

onthread

If True the server has its own IOLoop running on a separate thread of execution. Otherwise it shares the actor.requestloop

timeout

The timeout when reading data in an asynchronous way.

connection_class

A subclass of AsyncConnection. A new instance of this class is constructed each time a new connection has been established by the accept() method.

response_class

A subclass of AsyncResponse for handling responses to clients once data has been received and processed.

accept()

Accept a new connection from a remote client

AsyncConnection

class pulsar.AsyncConnection(sock, address, server, **kwargs)

An asynchronous client connection for an AsyncSocketServer. The connection maintains the client socket open for as long as it is required. A connection can handle several request/responses until it is closed.

server

The AsyncSocketServer which created this AsyncConnection.

response_class

Class or callable for building an AsyncResponse object. It is initialised by the AsyncSocketServer.response_class but it can be changed at runtime when upgrading connections to new protocols. An example is the websocket protocol.

request_data()

This function is called when data to parse is available on the ClientSocket.buffer. It should return parsed data or None if more data in the buffer is required.

write(data)

Write data to socket.

AsyncResponse

class pulsar.AsyncResponse(connection, parsed_data)

An asynchronous server response is created once an AsyncConnection has parsed data available from a read operation. Instances of this class are iterable and produce chunks of data to send back to the remote client.

The __iter__ is the only method which needs to be implemented by derived classes. If an empty byte is yielded, the asynchronous engine will resume the iteration after one loop in the actor event loop.
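The empty-byte convention can be illustrated with a small stand-alone sketch. ToyResponse and drive are hypothetical names; drive plays the role of the event loop in the simplest possible way, by counting idle turns instead of actually serving other events in between.

```python
class ToyResponse:
    """Iterable response: yields b'' while its body is not ready yet."""

    def __init__(self, parsed_data, ticks_until_ready=2):
        self.parsed_data = parsed_data
        self.ticks_until_ready = ticks_until_ready

    def __iter__(self):
        for _ in range(self.ticks_until_ready):
            yield b""  # nothing to send yet: ask the loop to come back later
        yield b"echo: " + self.parsed_data


def drive(response):
    """Stand-in for the event loop: collect real chunks, count idle turns."""
    sent, idle_turns = [], 0
    for chunk in response:
        if chunk:
            sent.append(chunk)
        else:
            idle_turns += 1  # a real loop would serve other events here
    return b"".join(sent), idle_turns


print(drive(ToyResponse(b"hello")))   # (b'echo: hello', 2)
```

Yielding an empty chunk is a cooperative way of saying "not ready": the response gives up control without blocking the thread that runs the loop.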

connection

The AsyncConnection for this response

server

The AsyncSocketServer for this response

parsed_data

Parsed data from remote client

Utilities

pulsar.create_socket(address, logger=None, backlog=2048, bound=False, retry=5, retry_lag=2, retry_step=2, interval_max=30)

Create a new server Socket for the given address. If the address is a tuple, a TCP socket is created. If it is a string, a Unix socket is created. Otherwise a TypeError is raised.

Parameters:
  • address – Socket address.
  • logger – Optional python logger instance.
  • backlog – The maximum number of pending connections or None. If None this is a client socket.
  • bound – If False the socket will bind to address, otherwise it is assumed to be already bound.
  • retry – Number of retries before aborting.
  • retry_lag – Number of seconds between connection attempts.
Return type:

Instance of Socket
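The address-type dispatch described above can be sketched with the standard socket module. toy_create_socket is a hypothetical helper, not pulsar's function: it returns a plain socket.socket rather than a pulsar Socket and leaves out the retry logic and logging.

```python
import socket


def toy_create_socket(address, backlog=2048):
    """Return a bound, listening socket: TCP for tuples, Unix for strings."""
    if isinstance(address, tuple):
        family = socket.AF_INET
    elif isinstance(address, str):
        family = socket.AF_UNIX  # not available on every platform
    else:
        raise TypeError("address must be a tuple or a string")
    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        sock.bind(address)
        sock.listen(backlog)
    except OSError:
        sock.close()  # do not leak the descriptor on failure
        raise
    return sock


server = toy_create_socket(("127.0.0.1", 0))  # port 0: let the OS choose
host, port = server.getsockname()
print(host)   # 127.0.0.1
server.close()
```

Binding to port 0 is a convenient way to obtain a free port in tests; the actual port is read back with getsockname().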

pulsar.socket_pair(backlog=2048, logger=None, blocking=0)

Create a 127.0.0.1 (client,server) socket pair on any available port. The first socket is connected to the second, the server socket, which is bound to 127.0.0.1 at any available port.

Parameters:
  • backlog – number of connections to listen for.
  • logger – optional python logger.
Return type:

tuple with two instances of Socket
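A connected loopback pair of this kind can be built with the standard socket module alone. The sketch below uses a hypothetical toy_socket_pair helper and plain socket.socket objects rather than pulsar Socket instances; the server side still has to call accept() to obtain its end of the connection.

```python
import socket


def toy_socket_pair(backlog=5):
    """Return (client, server): server listens on 127.0.0.1, client connects."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 asks the OS for any free port
    server.listen(backlog)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(server.getsockname())
    return client, server


client, server = toy_socket_pair()
connection, peer = server.accept()
client.sendall(b"ping")
data = connection.recv(4)            # bytes sent by the client
print(peer == client.getsockname())  # True
for s in (connection, client, server):
    s.close()
```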

Eventloop

IOLoop

class pulsar.IOLoop(io=None, logger=None, poll_timeout=None)

A level-triggered I/O event loop adapted from tornado.

Parameters: io – The I/O implementation. If not supplied, the best possible implementation available will be used. On POSIX systems this is epoll, or else select. It can be any other custom implementation as long as it has an epoll-like interface. Pulsar ships with an additional I/O implementation based on a distributed queue, IOQueue.

ATTRIBUTES

_impl

The IO implementation

cpubound

If True this is a CPU-bound event loop, otherwise it is an I/O event loop. CPU-bound loops can block the loop for a considerable amount of time.

num_loops

Total number of loops

poll_timeout

The timeout in seconds when polling with epoll or select.

Default: 0.5

tid

The thread id where the eventloop is running

tasks

A list of callables to be executed at each iteration of the event loop. Tasks can be added and deleted via the add_task() and remove_task() methods. Extra care must be taken when adding tasks to I/O event loops. These tasks should be fast to perform and not block.

METHODS

add_handler(fd, handler, events)

Registers the given handler to receive the given events for the file descriptor fd.

Parameters:
  • fd – A file descriptor or an object with the fileno method.
  • handler – A callable which will be called when events occur on the file descriptor fd.
Return type:

True if the handler was successfully added.

update_handler(fd, events)

Changes the events we listen for on fd.

remove_handler(fd)

Stop listening for events on fd.

stop()

Stop the loop after the current event loop iteration is complete. If the event loop is not currently running, the next call to start() will return immediately.

To use asynchronous methods from otherwise-synchronous code (such as unit tests), you can start and stop the event loop like this:

ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()

start() will return after async_method has run its callback, whether that callback was invoked before or after ioloop.start.

running()

Returns true if this IOLoop is currently running.

add_timeout(deadline, callback)

Add a timeout callback. A timeout callback is called by the IOLoop at the time deadline. It returns a handle that may be passed to remove_timeout to cancel.

remove_timeout(timeout)

Cancels a pending timeout. The argument is a handle as returned by the add_timeout() method.

add_callback(callback, wake=True)

Calls the given callback on the next I/O loop iteration.

It is safe to call this method from any thread at any time. Note that this is the only method in IOLoop that makes this guarantee; all other interaction with the IOLoop must be done from that IOLoop’s thread. add_callback() may be used to transfer control from other threads to the IOLoop’s thread.
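The thread-safety guarantee above can be illustrated with a toy loop. ToyLoop is a hypothetical sketch and not pulsar's IOLoop: it polls a thread-safe queue instead of file descriptors, and its add_timeout returns no handle. What it does show is the division of labour: add_callback may be called from anywhere, while everything else belongs to the loop's own thread.

```python
import heapq
import itertools
import queue
import threading
import time


class ToyLoop:
    def __init__(self):
        self._callbacks = queue.Queue()  # thread-safe hand-off point
        self._timeouts = []              # heap of (deadline, seq, callback)
        self._seq = itertools.count()    # tie-breaker for equal deadlines
        self._running = False

    def add_callback(self, callback):
        """Safe from any thread: the callback runs on the loop's thread."""
        self._callbacks.put(callback)

    def add_timeout(self, deadline, callback):
        """Loop thread only: run callback once time.time() >= deadline."""
        heapq.heappush(self._timeouts, (deadline, next(self._seq), callback))

    def stop(self):
        self._running = False

    def start(self, poll_timeout=0.05):
        self._running = True
        while self._running:
            try:
                # The blocking get doubles as poll step and wake-up.
                self._callbacks.get(timeout=poll_timeout)()
            except queue.Empty:
                pass
            while self._timeouts and self._timeouts[0][0] <= time.time():
                heapq.heappop(self._timeouts)[2]()


loop = ToyLoop()
seen = []
worker = threading.Thread(
    target=loop.add_callback, args=(lambda: seen.append("from thread"),))
worker.start()
worker.join()
loop.add_timeout(time.time() + 0.1, loop.stop)
loop.start()
print(seen)   # ['from thread']
```

Funnelling all cross-thread work through a single queue means the rest of the loop's state never needs a lock.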

add_periodic(callback, period)

Add a PeriodicCallback to the event loop.

wake()

Wake up the event loop.

IOQueue

class pulsar.IOQueue(queue, actor=None)

An epoll-like class for I/O based on queues rather than sockets. The interface is the same as the Python epoll implementation.

queue

The underlying distributed queue used for I/O.

register(fd, events=None)

Register a file descriptor with the IOQueue object.

modify(fd, events=None)

Modify a registered file descriptor.

unregister(fd)

Remove a registered file descriptor from the IOQueue object.

poll(timeout=0.5)

Wait for events. The timeout is given in seconds (float).
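
To make the idea of an epoll-like interface over a queue concrete, here is a minimal sketch. It is an illustration only: ToyIOQueue is a hypothetical class, and the choice to return (fd, item) pairs from poll(), with the queued item standing in for the event, is an assumption modelled on the (fd, event) pairs of select.epoll.poll() rather than something documented here.

```python
import queue


class ToyIOQueue:
    """Hypothetical poller exposing register/modify/unregister/poll over a queue."""

    def __init__(self, q):
        self.queue = q
        self._fds = {}

    def register(self, fd, events=None):
        self._fds[fd] = events

    def modify(self, fd, events=None):
        if fd not in self._fds:
            raise KeyError(fd)
        self._fds[fd] = events

    def unregister(self, fd):
        del self._fds[fd]

    def poll(self, timeout=0.5):
        # Wait up to `timeout` seconds for one item; no item means no events.
        try:
            item = self.queue.get(timeout=timeout)
        except queue.Empty:
            return []
        # Assumption: report the item to every registered descriptor.
        return [(fd, item) for fd in self._fds]


tasks = queue.Queue()
poller = ToyIOQueue(tasks)
poller.register("tasks")
tasks.put("job-1")
first = poller.poll(timeout=0.1)
second = poller.poll(timeout=0.01)   # queue is empty again
```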

PeriodicCallback

class pulsar.PeriodicCallback(callback, callback_time, ioloop)

Schedules the given callback to be called periodically.

The callback is called every callback_time seconds.

Application

class pulsar.Application(callable=None, description=None, name=None, epilog=None, argv=None, script=None, version=None, parse_console=True, commands_set=None, cfg=None, **kwargs)

An application interface for configuring and loading the various necessities for any given server or distributed application running on the pulsar concurrent framework. Applications can take any form, and the library ships with several batteries-included examples in the pulsar.apps framework module.

These are the most important facts about a pulsar Application:

  • Instances must be picklable. If non-picklable data needs to be added to an Application instance, it must be stored in the Application.local dictionary.
  • When a new Application is initialized, a new ApplicationMonitor instance is added to the Arbiter, ready to perform its duties.
Parameters:
  • callable – Initialise the Application.callable attribute.
  • description – A string describing the application. It will be displayed on the command line.
  • epilog – Epilog string you will see when interacting with the command line.
  • name – Application name. If not provided, the class name in lower case is used.
  • commands_set – Initialise the commands_set attribute.
  • params – a dictionary of configuration parameters which overrides the defaults and the cfg attribute. They will be overridden by a config file or command line arguments.
app

A string indicating the application namespace for configuration parameters.

Default: None.

callable

A callable serving your application. The callable must be picklable, therefore it is either a function or a picklable object. If not provided, the application must implement the handler() method.

Default: None.
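
The picklability requirement can be checked up front with the standard pickle module. The helper below is a sketch and not part of pulsar; is_picklable is a hypothetical name. It shows that a callable importable by name pickles, while an anonymous function does not.

```python
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


named_ok = is_picklable(len)              # can be looked up by name
lambda_ok = is_picklable(lambda x: x)     # anonymous, no importable name
```

Functions are pickled by reference (module plus name), which is why a lambda, having no importable name, fails the check.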

cfg

Dictionary of default configuration parameters.

Default: {}.

cfg_apps

Optional tuple containing names of configuration namespaces to be included in the application config dictionary.

Default: None

script

Full path of the script which starts the application, or None. If supplied, it is used to set up the Python path.

commands_set

Optional set of remote actions available on Actors created by this Application.

Default: None.

name

Application name. It is unique and identifies the application.

handler()

Returns the callable application handler which is stored in Worker.app_handler, used by Worker to carry out its task. By default it returns the Application.callable.

request_instance(worker, fd, event)

Build a request instance from a file descriptor fd and an event. The returned request instance is passed to the handle_request() method.

handle_request(worker, request)

This is the main function which needs to be implemented by actual applications. It is called by the worker to handle a request.

Parameters:
  • worker – the Worker handling the request.
  • request – an application specific request object.
Return type:

It can be a generator, a Deferred instance or the actual response.

get_ioqueue()

Returns an I/O distributed queue for the application if one is needed. If a queue is returned, the application Worker will have a IOLoop instance based on the queue (via IOQueue).

By default it returns None.

on_config_init(cfg, params)

Callback when configuration is initialised but not yet loaded. This is a chance to add extra config parameters or remove unwanted ones. It returns a new Config instance or None.

on_config()

Callback when configuration is loaded. This is a chance to do an application specific check before the concurrent machinery is put into place. If it returns False the application will abort.

load_config(argv, version, parse_console, params)

Load the application configuration from a file and/or from the command line. Called during application initialization.

Parameters:
  • argv – list of command line parameters to parse.
  • version – The version of this application.
  • parse_console – True if the console parameters need parsing.
  • params – dictionary of parameters passed during construction.

Parameters are overridden in the following order, with later sources taking precedence over earlier ones:

  • default parameters.
  • the params passed in the initialization.
  • the parameters in the optional configuration file
  • the parameters passed in the command line.
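
The override order listed above amounts to a layered dictionary merge. The sketch below is illustrative only: resolve_config is a hypothetical helper, and the setting names and values are made up for the example.

```python
def resolve_config(defaults, params, file_values, cli_values):
    """Merge four layers; later layers override earlier ones."""
    resolved = {}
    for layer in (defaults, params, file_values, cli_values):
        resolved.update(layer)
    return resolved


config = resolve_config(
    defaults={"workers": 1, "timeout": 30, "bind": "127.0.0.1:8060"},
    params={"workers": 2, "timeout": 60},
    file_values={"workers": 4},
    cli_values={"timeout": 10},
)
```

Here workers ends up as 4 (from the config file), timeout as 10 (from the command line), and bind keeps its default because no later layer sets it.
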
monitor_handler()

Returns an application handler for the ApplicationMonitor. By default it returns None.

worker_start(worker)

Called by the Worker pulsar.Actor.on_start() callback method.

worker_stop(worker)

Called by the Worker pulsar.Actor.on_stop() callback method.

worker_exit(worker)

Called by the Worker pulsar.Actor.on_exit() callback method.

actorparams(monitor, params)

A chance to override the dictionary of parameters params before a new Worker is spawned.

monitor_start(monitor)

Callback by ApplicationMonitor when starting. The application is now in the arbiter but has not yet started.

monitor_stop(monitor)

Callback by ApplicationMonitor when stopping.

monitor_exit(monitor)

Callback by ApplicationMonitor at exit.

monitor_task(monitor)

Callback by ApplicationMonitor at each event loop iteration.

start()

Start the application if it wasn’t already started.

Application Worker

class pulsar.Worker(impl)

An Actor for serving a pulsar Application.

app

Instance of the Application served by the worker.

cfg

Configuration dictionary.

app_handler

The application handler obtained from Application.handler().

Application Monitor

class pulsar.ApplicationMonitor(impl)

A Monitor for managing a pulsar Application.

app_handler

The monitor application handler obtained from Application.monitor_handler().

actor_class

alias of Worker

Utilities

Config

class pulsar.Config(description=None, epilog=None, version=None, app=None, include=None, exclude=None, settings=None)

Dictionary containing Setting parameters for fine tuning pulsar servers. It provides easy access to Setting.value attribute by exposing the Setting.name as attribute.

settings

Dictionary of all Setting instances available. The keys are given by the Setting.name attribute.
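
Exposing each setting's value as an attribute named after the setting can be sketched with __getattr__. The classes below are hypothetical stand-ins, not pulsar's Config and Setting; they only illustrate the access pattern.

```python
class ToySetting:
    """Hypothetical setting holding a name and a value."""

    def __init__(self, name, value):
        self.name = name
        self.value = value


class ToyConfig:
    """Hypothetical container keyed by setting name."""

    def __init__(self, settings):
        self.settings = {s.name: s for s in settings}

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        settings = self.__dict__.get("settings", {})
        try:
            return settings[name].value
        except KeyError:
            raise AttributeError(name) from None


cfg = ToyConfig([ToySetting("workers", 4), ToySetting("timeout", 30)])
workers = cfg.workers                    # same as cfg.settings["workers"].value
has_missing = hasattr(cfg, "missing")    # unknown names raise AttributeError
```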

parser()

Create the argparser for this configuration by adding all settings via the Setting.add_argument().

Return type:an instance of argparse.ArgumentParser.
on_start()

Invoked by a pulsar.Application just before starting.

Setting

class pulsar.Setting(name=None, flags=None, action=None, type=None, default=None, nargs=None, desc=None, validator=None)

A configuration parameter for pulsar. Parameters can be specified on the command line or in a config file.

virtual

If set to True, the setting won’t be loaded and can only be used as a base class for other settings.

app

Setting for a specific Application.

choices

Restrict the argument to the choices provided.

default

Default value

flags

List of option strings, e.g. [-f, --foo].

nargs

For positional arguments. Same usage as argparse.

section

Setting section, used for creating documentation.

add_argument(parser)

Add itself to the argparser.
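
The pattern of a setting adding itself to an argparse parser can be sketched as follows. ToyArgSetting and build_parser are hypothetical names and the two settings are invented for the example; only the argparse calls are real standard-library API.

```python
import argparse


class ToyArgSetting:
    """Hypothetical setting that knows how to register itself with argparse."""

    def __init__(self, name, flags, type=None, default=None, desc=None):
        self.name = name
        self.flags = flags
        self.type = type
        self.default = default
        self.desc = desc

    def add_argument(self, parser):
        parser.add_argument(*self.flags, dest=self.name, type=self.type,
                            default=self.default, help=self.desc)


def build_parser(settings):
    """Create a parser and let every setting add its own argument."""
    parser = argparse.ArgumentParser(description="toy configuration")
    for setting in settings:
        setting.add_argument(parser)
    return parser


parser = build_parser([
    ToyArgSetting("workers", ["-w", "--workers"], type=int, default=1,
                  desc="number of workers"),
    ToyArgSetting("bind", ["-b", "--bind"], default="127.0.0.1:8060",
                  desc="address to bind"),
])
args = parser.parse_args(["--workers", "4"])
```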

Exceptions

class pulsar.PulsarException

Base class of all pulsar exceptions.

class pulsar.AlreadyCalledError

Raised when a Deferred instance receives more than one Deferred.callback().
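
The one-shot rule behind this exception can be shown with a toy class. ToyDeferred and ToyAlreadyCalledError are hypothetical and far simpler than pulsar's Deferred; the sketch only demonstrates rejecting a second callback() call.

```python
class ToyAlreadyCalledError(Exception):
    """Hypothetical analogue of AlreadyCalledError."""


class ToyDeferred:
    """Hypothetical one-shot result holder."""

    def __init__(self):
        self.called = False
        self.result = None

    def callback(self, result):
        if self.called:
            raise ToyAlreadyCalledError("callback() may only be called once")
        self.called = True
        self.result = result


d = ToyDeferred()
d.callback("pong")
try:
    d.callback("again")
    second_call_failed = False
except ToyAlreadyCalledError:
    second_call_failed = True
```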

Internals

System info

pulsar.utils.system.system_info(pid)

Returns a dictionary of system information for the process with id pid. It uses the psutil module for the purpose. If psutil is not available it returns an empty dictionary.
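
The optional-dependency behaviour described here (use psutil when present, otherwise return an empty dictionary) can be sketched as below. toy_system_info is a hypothetical function, and the two keys it returns are chosen for illustration; they are not claimed to match what pulsar reports.

```python
import os

try:
    import psutil
except ImportError:      # psutil is an optional dependency
    psutil = None


def toy_system_info(pid):
    """Return a few process facts, or {} when psutil is missing."""
    if psutil is None:
        return {}
    proc = psutil.Process(pid)
    return {
        "memory_rss": proc.memory_info().rss,   # resident memory in bytes
        "num_threads": proc.num_threads(),
    }


info = toy_system_info(os.getpid())
```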

checkarity

pulsar.utils.tools.checkarity(func, args, kwargs, discount=0)

Check if the arguments respect a given function's arity and return an error message if the check did not pass; otherwise return None.

Parameters:
  • func – the function.
  • args – function arguments.
  • kwargs – function key-valued parameters.
  • discount – optional integer which discounts the number of positional arguments to check. Default 0.
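
A similar check can be written with the standard inspect module. This is a sketch of the idea, not pulsar's checkarity: toy_checkarity is a hypothetical name, it ignores the discount parameter, and its error messages are whatever inspect produces.

```python
import inspect


def toy_checkarity(func, args, kwargs):
    """Return an error message if the call would not bind, else None."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return None


def ping(actor, timeout=None):
    return "pong"


ok = toy_checkarity(ping, ("a",), {"timeout": 1})          # binds cleanly
too_many = toy_checkarity(ping, ("a", 1, 2), {})           # extra positional
unknown_kw = toy_checkarity(ping, ("a",), {"retries": 3})  # unexpected keyword
```

Checking the binding before dispatch lets a caller report a bad remote command without actually invoking the function.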