Documentation for pulsar 0.8.1.

Design

Pulsar implements two layers of components on top of Python's asyncio module:

  • The actor layer provides parallel execution in processes and threads and uses the asyncio module as its building block.
  • The second layer, built on top of the first one, is based on the higher level Application class.

Async Objects

Introduced in pulsar 0.8, an asynchronous object is any instance which exposes the _loop attribute. This attribute is the event loop where the instance performs its asynchronous operations, whatever they may be.

For example, the following class defines valid async objects:

from pulsar import get_event_loop, new_event_loop


class SimpleAsyncObject:

    def __init__(self, loop=None):
        self._loop = loop or get_event_loop() or new_event_loop()

Properties:

Note

An async object can also run its asynchronous methods in a synchronous fashion. To do that, pass a brand new event loop during initialisation. Check synchronous components for further details.
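The note above can be illustrated with a minimal sketch that uses only the standard asyncio module rather than pulsar itself. The run_sync helper and the double method are hypothetical names introduced for illustration, and the async/await syntax assumes a modern Python version:

```python
import asyncio


class SimpleAsyncObject:
    """Stdlib-only stand-in for an async object: it exposes ``_loop``."""

    def __init__(self, loop=None):
        # Assumption: fall back to a private loop when none is given.
        self._loop = loop or asyncio.new_event_loop()

    async def double(self, value):
        # Yield control to the loop once, then return a result.
        await asyncio.sleep(0)
        return 2 * value


def run_sync(obj, coro):
    """Drive a coroutine to completion on the object's own loop."""
    return obj._loop.run_until_complete(coro)


# A brand new loop, owned by this object only, makes the call blocking.
obj = SimpleAsyncObject(loop=asyncio.new_event_loop())
result = run_sync(obj, obj.double(21))
obj._loop.close()
```

Because the loop is not shared with anything else, the caller simply blocks until the asynchronous method has finished.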

Actors

An Actor is the atom of pulsar’s concurrent computation. Actors do not share state between them; communication is achieved via asynchronous inter-process message passing, implemented using the standard Python socket library. A pulsar actor can be process based as well as thread based and can perform one or many activities.

The theory

The actor model is the cornerstone of the Erlang programming language. Python has very few implementations and all of them seem quite limited in scope.

The Actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

—Wikipedia

Actor’s properties

  • Each actor has its own process (not intended as an OS process) and actors do not share state between them.
  • Actors can change their own states.
  • Actors can create other actors and when they do that they receive back the new actor address.
  • Actors exchange messages in an asynchronous fashion.
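These properties can be pictured with a toy actor built on threads and queues from the standard library. This is only a sketch of the model, not pulsar's implementation (which uses sockets for message passing); MiniActor and spawn_actor are hypothetical names:

```python
import queue
import threading


class MiniActor(threading.Thread):
    """Toy actor: private state, a mailbox, and message-driven behaviour."""

    def __init__(self):
        super().__init__(daemon=True)
        self.mailbox = queue.Queue()   # plays the role of the actor address
        self._count = 0                # private state, touched only by run()

    def run(self):
        while True:
            message, reply_to = self.mailbox.get()
            if message == 'stop':
                break
            if message == 'increment':
                self._count += 1       # the actor changes its own state
            elif message == 'get':
                reply_to.put(self._count)


def spawn_actor():
    """Create an actor and hand back its address (the mailbox)."""
    actor = MiniActor()
    actor.start()
    return actor.mailbox


address = spawn_actor()
for _ in range(3):
    address.put(('increment', None))   # fire-and-forget messages
reply = queue.Queue()
address.put(('get', reply))
count = reply.get(timeout=5)
address.put(('stop', None))
```

The caller never touches the counter directly; it only sends messages to the address it received when the actor was created.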

Why would one want to use an actor-based system?

  • No shared memory and therefore locking is not required.
  • Race conditions greatly reduced.
  • It greatly simplifies the control flow of a program: each actor has its own process (flow of control).
  • Easy to distribute, across cores, across program boundaries, across machines.
  • It simplifies error handling code.
  • It makes it easier to build fault-tolerant systems.

The Arbiter

When using pulsar's actor layer, you need to use pulsar in server state. That is to say, there will be a centralised Arbiter controlling the main event loop in the main thread of the master process. The arbiter is a specialised Actor which controls the life of all Actor and Monitor instances.

To access the Arbiter, from the main process, one can use the arbiter() high level function:

>>> import pulsar
>>> arbiter = pulsar.arbiter()
>>> arbiter.is_running()
False

Implementation

An actor can be process based (default) or thread based and controls at least one running event loop. To obtain the actor controlling the current thread:

actor = pulsar.get_actor()

When a new process-based actor is created, a new process is started and the actor takes control of the main thread of that new process. On the other hand, thread-based actors always exist in the master process (the same process as the arbiter) and control threads other than the main thread.

An Actor can control more than one thread if it needs to, via the thread_pool as explained in the CPU bound paragraph. The actor event loop is installed in all threads controlled by the actor so that when the get_event_loop() function is invoked on these threads it returns the event loop of the controlling actor.

Note

Regardless of the type of concurrency, an actor always controls at least one thread, the actor io thread. In the case of process-based actors this thread is the main thread of the actor process.

An actor is an async object and therefore it has a _loop attribute, which can be used to register handlers on file descriptors. The Actor._loop is created just after forking (or after the actor’s thread starts for thread-based actors).

IO-bound

The most common usage for an Actor is to handle Input/Output events on file descriptors. An Actor._loop tells the operating system (through epoll or select) that it should be notified when a new connection is made, and then it goes to sleep. Serving the new request should occur as fast as possible so that other connections can be served simultaneously.
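The notification pattern described above can be sketched with a plain asyncio loop and a socket pair. This does not use pulsar; wait_for_data is a hypothetical helper, and the example assumes a Unix-like platform where the default loop supports add_reader:

```python
import asyncio
import socket


def wait_for_data():
    """Register a read handler on a file descriptor and wait for one event."""
    loop = asyncio.new_event_loop()
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    received = []

    def on_readable():
        # Called by the loop when the OS reports the descriptor as readable.
        received.append(reader.recv(1024))
        loop.remove_reader(reader.fileno())
        loop.stop()

    loop.add_reader(reader.fileno(), on_readable)
    # Simulate a peer sending data shortly after the loop starts.
    loop.call_later(0.01, writer.send, b'ping')
    loop.run_forever()   # sleeps until the OS signals an event

    reader.close()
    writer.close()
    loop.close()
    return received


data = wait_for_data()
```

The handler does as little as possible before returning, which is what keeps the loop free to serve other descriptors.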

CPU-bound

Another way for an actor to function is to use its executor to perform CPU intensive operations, such as calculations, data manipulation or whatever you need them to do. CPU-bound actors have the following properties:

  • Their Actor._loop listens for requests on file descriptors as usual and it is running (and installed) in the actor io thread as usual.
  • The threads in the executor() install an additional event loop which listens for events on a message queue. Pulsar refers to this specialised event loop as the request loop and it is an instance of QueueEventLoop.

Note

A CPU-bound actor controls more than one thread: the IO thread, which runs the actor main event loop for listening to events on file descriptors, and one or more threads for performing CPU-intensive calculations. These CPU threads have two event loops installed: the event loop running on the IO thread and the request loop.

The Actor.thread_pool needs to be initialised via the Actor.create_thread_pool method before it can be used.
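The division of labour between the IO thread and the worker threads can be approximated with the standard library alone. The sketch below uses asyncio's run_in_executor with a ThreadPoolExecutor; it is not pulsar's QueueEventLoop or thread_pool, and cpu_heavy and handle_request are hypothetical names:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def cpu_heavy(n):
    """Stand-in for a CPU-intensive job, executed in a worker thread."""
    return sum(i * i for i in range(n))


async def handle_request(loop, pool, n):
    # Offload the computation so the IO loop stays responsive.
    return await loop.run_in_executor(pool, cpu_heavy, n)


def main():
    loop = asyncio.new_event_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        try:
            return loop.run_until_complete(handle_request(loop, pool, 10))
        finally:
            loop.close()


total = main()
```

The event loop thread only schedules the job and receives its result; the computation itself never runs on the IO thread.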

Periodic task

Each Actor, including the Arbiter and Monitor, performs one crucial periodic task at given intervals. The next call of the task is stored in the Actor.next_periodic_task attribute.

Periodic tasks are implemented by the Concurrency.periodic_task method.
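One common way to build such a task on an event loop is a callback that reschedules itself after every run. The sketch below shows that pattern with plain asyncio; PeriodicRunner and its attributes are hypothetical and are not claimed to mirror Concurrency.periodic_task:

```python
import asyncio


class PeriodicRunner:
    """Run ``task`` every ``interval`` seconds on ``loop``."""

    def __init__(self, loop, interval, task):
        self._loop = loop
        self.interval = interval
        self.task = task
        self.next_call = None      # handle of the next scheduled run

    def start(self):
        self.next_call = self._loop.call_later(self.interval, self._run)

    def _run(self):
        self.task()
        self.start()               # reschedule after each run

    def stop(self):
        if self.next_call is not None:
            self.next_call.cancel()
            self.next_call = None


loop = asyncio.new_event_loop()
ticks = []
runner = PeriodicRunner(loop, 0.01, lambda: ticks.append(len(ticks)))
runner.start()
loop.call_later(0.2, loop.stop)    # let the task fire a few times
loop.run_forever()
runner.stop()
loop.close()
```

Keeping the handle of the next call makes it possible to cancel the task cleanly when the owner stops.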

Spawning

Spawning a new actor is achieved via the spawn() function:

from pulsar import spawn

class PeriodicTask:

    def __call__(self, actor):
        actor.event_loop.call_repeatedly(2, self.task)

    def task(self):
        # do something useful here
        ...

ap = spawn(start=PeriodicTask())

The value returned by spawn() is an ActorProxyFuture instance, a specialised Future which carries the spawned actor id (aid) and is called back once the remote actor has started. The result of the callback is an ActorProxy, a lightweight proxy for the remote actor.

When spawning from an actor other than the arbiter, the workflow of the spawn() function is as follows:

  • send() a message to the arbiter to spawn a new actor.
  • The arbiter spawns the actor and waits for the actor’s handshake. Once the handshake is done, it sends the response (the ActorProxy of the spawned actor) to the original actor.

Handshake

The actor handshake is the mechanism with which an Actor registers its mailbox address with its manager. The actor manager is either a Monitor or the Arbiter depending on which spawned the actor.

The handshake occurs when the monitor receives, for the first time, the actor notify message.

For the curious, the handshake is responsible for setting the ActorProxyMonitor.mailbox attribute.

If the handshake fails, the spawned actor will eventually stop.

Hooks

An Actor exposes three one-time events, which can be used to customise its behaviour, and two many-times events, fired when actor information is accessed and when the actor spawns other actors. Hooks are passed as key-valued parameters to the spawn() function.

start

Fired just after the actor has received the handshake from its monitor. This hook can be used to set up the application and register event handlers. For example, the socket server application creates the server and registers its file descriptor with the Actor._loop.

This snippet spawns a new actor which starts an Echo server:

from functools import partial

from pulsar import spawn, TcpServer

def create_echo_server(address, actor, _):
    '''Starts an echo server on a newly spawned actor'''
    server = TcpServer(actor.event_loop, address[0], address[1],
                       EchoServerProtocol)
    yield server.start_serving()
    actor.servers['echo'] = server
    actor.extra['echo-address'] = server.address

# The address is a (host, port) tuple, as indexed in create_echo_server.
proxy = spawn(start=partial(create_echo_server, ('localhost', 9898)))

The EchoServerProtocol is introduced in the echo server and client tutorial.

stopping

Fired when the Actor starts stopping.

stop

Fired just before the Actor is garbage collected.

Important

The start, stopping and stop hooks are functions accepting one parameter only, the actor which invokes them. They are one-time events for actors.

on_info

Fired every time the actor status information is accessed via the info command:

def extra_info(actor, info=None):
    info['message'] = 'Hello'

proxy = spawn(on_info=extra_info)

The hook must accept the actor as first parameter and the key-valued parameter info (a dictionary).

on_params

Fired every time an actor is about to spawn another actor. It can be used to add additional key-valued parameters passed to the spawn() function.

Commands

An Actor communicates with another remote Actor by sending an action to perform. This action takes the form of a command name and optional positional and key-valued parameters. It is possible to add new commands via the command decorator as explained in the API documentation.
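The idea of named commands with positional and key-valued parameters can be sketched as a small registry. The decorator below is a hypothetical stand-in written for illustration only; it does not reproduce the signature of pulsar's own command decorator, which is described in the API documentation:

```python
COMMANDS = {}


def command(name=None):
    """Register a function as a named command (hypothetical decorator)."""
    def decorator(func):
        COMMANDS[name or func.__name__] = func
        return func
    return decorator


@command()
def ping(actor):
    return 'pong'


@command()
def echo(actor, message):
    return message


def dispatch(actor, name, *args, **kwargs):
    """Look up a command by name and run it on the receiving actor."""
    try:
        handler = COMMANDS[name]
    except KeyError:
        raise ValueError('unknown command: %s' % name) from None
    return handler(actor, *args, **kwargs)
```

In this sketch the receiving side only needs the command name and its parameters, which is what makes the action easy to serialise and send to another actor.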

ping

Ping the remote actor abcd and receive an asynchronous pong:

send('abcd', 'ping')

echo

Receive an asynchronous echo from the remote actor abcd:

send('abcd', 'echo', 'Hello!')

info

Request information about a remote actor abcd:

send('abcd', 'info')

The asynchronous result will be called back with the dictionary returned by the Actor.info() method.

notify

This message is sent periodically by actors to notify their manager. If an actor fails to notify its manager on a regular basis, the manager will shut it down. The first notify message is sent to the manager as soon as the actor is up and running so that the handshake can occur.
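The bookkeeping a manager needs for the handshake and for detecting silent actors can be sketched as follows. All class, method and attribute names here are hypothetical and only loosely modelled on the description above; the timeout policy is an assumption, not pulsar's actual behaviour:

```python
import time


class ManagedActorProxy:
    """Manager-side record of a spawned actor (hypothetical names)."""

    def __init__(self, aid):
        self.aid = aid
        self.mailbox = None        # unknown until the handshake
        self.last_notified = None


class Manager:
    def __init__(self):
        self.managed = {}

    def spawn(self, aid):
        self.managed[aid] = ManagedActorProxy(aid)
        return self.managed[aid]

    def on_notify(self, aid, mailbox_address, now=None):
        """Handle a notify message; the first one completes the handshake."""
        proxy = self.managed[aid]
        handshake = proxy.mailbox is None
        if handshake:
            proxy.mailbox = mailbox_address
        proxy.last_notified = time.monotonic() if now is None else now
        return handshake

    def expired(self, timeout, now=None):
        """Return ids of actors that have not notified within ``timeout``."""
        now = time.monotonic() if now is None else now
        return [aid for aid, p in self.managed.items()
                if p.last_notified is None or now - p.last_notified > timeout]
```

A manager could call expired() from its periodic task and stop the actors it returns; the optional now argument exists only to make the sketch easy to test.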

run

Run a function on a remote actor. The function must accept actor as its initial parameter:

def dosomething(actor, *args, **kwargs):
    ...

send('monitor', 'run', dosomething, *args, **kwargs)

stop

Tell the remote actor abc to shut down gracefully:

send('abc', 'stop')

Monitors

Exceptions

There are two categories of exceptions in Python: those that derive from the Exception class and those that derive from BaseException. Exceptions deriving from Exception will generally be caught and handled appropriately; for example, they will be passed through by a Future, and they will be logged and ignored when they occur in a callback.

However, exceptions deriving only from BaseException are never caught, and will usually cause the program to terminate with a traceback. (Examples of this category include KeyboardInterrupt and SystemExit; it is usually unwise to treat these the same as most other exceptions.)
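The difference between the two categories can be seen in plain Python, independently of pulsar. In this sketch, run_callback is a hypothetical helper that logs and ignores ordinary errors, in the spirit of the callback handling described above:

```python
def run_callback(callback, log):
    """Run a callback, recording ordinary errors but nothing else."""
    try:
        callback()
    except Exception as exc:
        # Exceptions deriving from Exception are recorded and swallowed.
        log.append(type(exc).__name__)


def fails():
    raise ValueError('bad input')


def interrupts():
    raise KeyboardInterrupt()


log = []
run_callback(fails, log)           # logged and ignored

try:
    run_callback(interrupts, log)  # not caught by "except Exception"
    propagated = False
except KeyboardInterrupt:
    propagated = True
```

KeyboardInterrupt derives from BaseException only, so it passes straight through the handler and reaches the caller.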

Application Framework

To aid the development of applications running on top of pulsar's concurrent framework, the library ships with the Application class.


