Actors API

For an overview of pulsar Actor check out the design documentation.

High Level Functions

spawn

pulsar.async.actor.spawn(**kwargs)[source]

Spawn a new Actor and return an Future.

Parameter kwargs

These optional parameters are:

  • aid the actor id
  • name the actor name
  • actor hooks such as start, stopping and periodic_task
Returns:an Future.

A typical usage:

>>> def do_something(actor):
        ...
>>> a = spawn(start=do_something, ...)
>>> a.aid
'ba42b02b'
>>> a.called
True
>>> p = a.result()
>>> p.address
('127.0.0.1', 46691)

send

pulsar.async.actor.send(target, action, *args, **params)[source]

Send a message to target

The message is to perform a given action. The actor sending the message is obtained via the get_actor() function.

Parameters:
Returns:

an Future if the action acknowledge the caller or None.

Typical example:

>>> r = send(p,'ping')
>>> r.result()
'pong'

get_actor

pulsar.async.actor.get_actor()[source]

Returns the Actor controlling the current thread. Returns None if no actor is available.

arbiter

pulsar.async.concurrency.arbiter(**params)[source]

Obtain the arbiter.

It returns the arbiter instance only if we are on the arbiter context domain, otherwise it returns nothing.

Actor

At the core of the library we have the Actor class which defines the primitive of pulsar concurrent framework. Actor’s instances communicate with each other via messages in a share-nothing architecture.

class pulsar.async.actor.Actor(impl)[source]

The base class for parallel execution in pulsar.

In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

The current implementation allows for actors to perform specific tasks such as listening to a socket, acting as http server, consuming a task queue and so forth.

To spawn a new actor:

>>> from pulsar import spawn
>>> a = spawn()
>>> a.is_alive()
True

Here a is actually a reference to the remote actor, it is an ActorProxy.

ATTRIBUTES

name

The name of this Actor.

aid

Unique ID for this Actor.

impl

The Concurrency implementation for this Actor.

_loop

An event loop which listen for input/output events on sockets or socket-like objects. It is the driver of the Actor. If the _loop stops, the Actor stops running and goes out of scope.

mailbox

Used to send and receive actor messages.

address

The socket address for this Actor.mailbox.

proxy

Instance of a ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).

state

The actor numeric state.

extra

A dictionary which can be populated with extra parameters useful for other actors. This dictionary is included in the dictionary returned by the info() method. Check the info command for how to obtain information about an actor.

info_state

Current state description string. One of initial, running, stopping, closed and terminated.

next_periodic_task

The asyncio.Handle for the next actor periodic task.

stream

A stream handler to write information messages without using the logger.

monitors

Dictionary of monitors or None

managed_actors

Dictionary of managed actors or None

terminated_actors

Dictionary of terminated actors or None

registered

Dictionary of registered actors or None

start(exit=True)[source]

Called after forking to start the actor’s life.

This is where logging is configured, the mailbox is registered and the _loop is initialised and started. Calling this method more than once does nothing.

send(target, action, *args, **kwargs)[source]

Send a message to target to perform action with given positional args and key-valued kwargs. Returns a coroutine or a Future.

spawn(**params)[source]

Spawn a new actor

stop(exc=None, exit_code=None)[source]

Gracefully stop the Actor.

Implemented by the Concurrency.stop() method of the impl attribute.

actorparams()[source]

Returns a dictionary of parameters for spawning actors.

The disctionary is passed to the spawn method when creating new actors. Fire the on_params actor hook.

is_running()[source]

True if actor is running, that is when the state is equal to ACTOR_STATES.RUN and the loop is running.

started()[source]

True if actor has started.

It does not necessarily mean it is running. Its state is greater or equal ACTOR_STATES.RUN.

after_run()[source]

True when the actor is sopping or has already stopped

closed()[source]

True if actor has exited in an clean fashion.

Its state is ACTOR_STATES.CLOSE.

stopped()[source]

True if actor has exited.

Its state is greater or equal to ACTOR_STATES.CLOSE.

is_arbiter()[source]

True if self is the arbiter

is_monitor()[source]

True if self is a monitor

is_process()[source]

boolean indicating if this is an actor on a child process.

get_actor(aid, check_monitor=True)[source]

Given an actor unique id return the actor proxy.

info()[source]

Return a nested dictionary of information related to the actor status and performance. The dictionary contains the following entries:

  • actor a dictionary containing information regarding the type of actor and its status.
  • events a dictionary of information about the event loop running the actor.
  • extra the extra attribute (you can use it to add stuff).
  • system system info.

This method is invoked when you run the info command from another actor.

Actor Internals

ActorProxy

class pulsar.async.proxy.ActorProxy(impl)[source]

A proxy for a remote Actor.

This is a lightweight class which delegates function calls to the underlying remote object.

It is picklable and therefore can be send from actor to actor using actor message passing.

For example, lets say we have a proxy a, to send a message to it:

from pulsar import send

send(a, 'echo', 'hello there!')

will send the command echo to actor a with parameter "hello there!".

aid

Unique ID for the remote Actor

address

the socket address of the underlying Actor.mailbox.

ActorProxyMonitor

class pulsar.async.proxy.ActorProxyMonitor(impl)[source]

A specialised ActorProxy class.

It contains additional information about the remote underlying Actor. Instances of this class serialise into ActorProxy.

The ActorProxyMonitor is special since it lives in the Arbiter domain and it is used by the Arbiter (or a Monitor) to monitor the state of the spawned actor.

impl

The Concurrency instance for the remote Actor. This dictionary is constantly updated by the remote actor by sending the info message.

info

Dictionary of information regarding the remote Actor

mailbox

This is the connection with the remote actor. It is available once the actor handshake between the actor and the monitor has completed. The mailbox is a server-side MailboxProtocol instance and it is used by the send() function to send messages to the remote actor.

notified

Last time this ActorProxyMonitor was notified by the remote actor.

proxy

The ActorProxy for this monitor.

is_alive()[source]

True if underlying actor is alive.

terminate()[source]

Terminate life of underlying actor.

join(timeout=None)[source]

Wait until the underlying actor terminates.

If timeout is provided, it raises an exception if the timeout is reached.

start()[source]

Start the remote actor.

Messages

Actor communicate with each other via the send() function which uses the via Actor.mailbox attribute of the actor in the current context. When an actor communicate with another remote actor it does so by sending an action to it with positional and/or key-valued arguments. For example:

send(target, 'ping')

will send the ping action to target from the actor in the current context of execution. The above is equivalent to:

get_actor().send(target, 'ping')

Each action is implemented via the command() decorator implemented in the pulsar.async.commands module. A list of standard commands is available in the design documentation.

pulsar command

class pulsar.async.commands.command(ack=True)[source]

Decorator for pulsar command functions.

Parameters:ackTrue if the command acknowledge the sender with a response. Usually is set to True (which is also the default value).

Concurrency

The Concurrency class implements the behaviour of an Actor and therefore allows for decoupling between the Actor abstraction and its implementation (bridge pattern).

Base Concurrency

class pulsar.async.concurrency.Concurrency[source]

Actor Concurrency.

Responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor. This is an abstract class, derived classes must implement the start method.

Parameters:
  • concurrency – string indicating the concurrency implementation. Valid choices are monitor, process, thread.
  • actor_classActor or one of its subclasses.
  • timeout – timeout in seconds for the actor.
  • kwargs – additional key-valued arguments to be passed to the actor constructor.
actor_class

alias of Actor

create_mailbox(actor, loop)[source]

Create the mailbox for actor.

hand_shake(actor)[source]

Perform the hand shake for actor

The hand shake occurs when the actor is in starting state. It performs the following actions:

  • set the actor as the actor of the current thread
  • bind two additional callbacks to the start event
  • fire the start event

If the hand shake is successful, the actor will eventually results in a running state.

periodic_task(actor, **kw)[source]

Implement the actor period task.

This is an internal method called periodically by the Actor._loop to ping the actor monitor. If successful return a Future called back with the acknowledgement from the monitor.

run_actor(actor)[source]

Start running the actor.

selector()[source]

Return a selector instance.

By default it return nothing so that the best handler for the system is chosen.

setup_event_loop(actor)[source]

Set up the event loop for actor.

spawn(actor, aid=None, **params)[source]

Spawn a new actor from actor.

stop(actor, exc=None, exit_code=None)[source]

Gracefully stop the actor.

Monitor Concurrency

class pulsar.async.concurrency.MonitorConcurrency[source]

Concurrency class for a Monitor.

Monitors live in the main thread of the master process and therefore do not require to be spawned.

periodic_task(monitor, **kw)[source]

Override the Concurrency.periodic_task() to implement the Monitor periodic task.

Arbiter Concurrency

class pulsar.async.concurrency.ArbiterConcurrency[source]

Concurrency implementation for the arbiter

add_monitor(actor, monitor_name, **params)[source]

Add a new monitor.

Parameters:
  • monitor_class – a Monitor class.
  • monitor_name – a unique name for the monitor.
  • kwargs – dictionary of key-valued parameters for the monitor.
Returns:

the Monitor added.

before_start(actor)[source]

Daemonise the system if required.

create_mailbox(actor, loop)[source]

Override Concurrency.create_mailbox() to create the mailbox server.

get_actor(actor, aid, check_monitor=True)[source]

Given an actor unique id return the actor proxy.

periodic_task(actor, **kw)[source]

Override the Concurrency.periodic_task() to implement the Arbiter periodic task.

Constants

Constants used throughout pulsar.

pulsar.async.consts.ACTOR_STATES = {'INACTIVE': 1, 'RUN': 3, 'TERMINATE': 6, 'CLOSE': 5, 'STOPPING': 4, 'INITIAL': 0, 'DESCRIPTION': {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running', 4: 'stopping', 5: 'closed', 6: 'terminated'}, 'STARTING': 2}

Actor state constants are access via:

from pulsar import ACTOR_STATES

They are:

  • ACTOR_STATES.INITIAL = 0 when an actor is just created, before the pulsar.Actor.start method is called.
  • ACTOR_STATES.STARTING = 2 when pulsar.Actor.start method is called.
  • ACTOR_STATES.RUN = 3 when pulsar.Actor._loop is up and running.
  • ACTOR_STATES.STOPPING = 4 when pulsar.Actor.stop has been called for the first time and the actor is running.
pulsar.async.consts.ACTOR_ACTION_TIMEOUT = 5

Important constant used by pulsar.Monitor to kill actors which don’t respond to the stop command.

pulsar.async.consts.MONITOR_TASK_PERIOD = 1

Interval for pulsar.Monitor and pulsar.Arbiter periodic task.