util.zmq_cluster

Classes and functions for interacting with a cluster of compute nodes using ZeroMQ sockets.

Note

In this module, a type is said to be socket-like if it is either a zmq.socket or a FutureSocket.

class glimpse.util.zmq_cluster.BasicSink(context, result_receiver, command_receiver=None, receiver_timeout=None)

Bases: object

Collect results from worker nodes on the cluster.

CMD_KILL = 'CLUSTER_SINK_KILL'

Send this to the command socket to shut down the sink.

Receive(num_results=None, timeout=None)

Get an iterator over the set of result objects.

Results will be available on the iterator as they arrive at the sink. This method raises a ReceiverTimeoutException if no result arrives in the given time period.

Parameters:
  • num_results (int) – Return after a fixed number of results have arrived.
  • timeout (int) – Time to wait for each result (in milliseconds).
Return type:

iterator
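
The receive-with-timeout behaviour described above can be sketched without ZeroMQ. This is a minimal illustration, not the module's implementation: a `queue.Queue` stands in for the result socket, and `receive` and `ReceiverTimeout` are hypothetical local names that only mirror the documented `Receive` and `ReceiverTimeoutException`.

```python
import queue


class ReceiverTimeout(Exception):
    """Hypothetical stand-in for ReceiverTimeoutException."""


def receive(channel, num_results=None, timeout=None):
    """Yield results from `channel` as they arrive.

    channel: a queue.Queue standing in for the result socket (assumption).
    num_results: stop after this many results; None means never stop.
    timeout: per-result wait in milliseconds; None means wait forever.
    """
    seconds = None if timeout is None else timeout / 1000.0
    count = 0
    while num_results is None or count < num_results:
        try:
            result = channel.get(timeout=seconds)
        except queue.Empty:
            raise ReceiverTimeout("no result within %s ms" % timeout)
        yield result
        count += 1


channel = queue.Queue()
for value in (1, 2, 3):
    channel.put(value)

# Only the first two results are consumed; the third stays queued.
first_two = list(receive(channel, num_results=2, timeout=100))

# An empty channel triggers the timeout on the first wait.
try:
    next(receive(queue.Queue(), num_results=1, timeout=10))
    timed_out = False
except ReceiverTimeout:
    timed_out = True
```

Because the timeout applies to each result rather than to the whole call, a slow but steady stream of results never raises.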

static SendKillCommand(context, command_sender)

Send the quit command to the sink.

Parameters:
  • command_sender (socket-like) – Channel on which to send the command.
Setup()

Initialize the sink.

Shutdown()

Shut down the sink.

command_receiver = None

(socket-like, optional) Constructor for the command socket.

result_receiver = None

(socket-like, required) Constructor for the reader socket.

class glimpse.util.zmq_cluster.BasicVentilator(context, request_sender, worker_connect_delay=None)

Bases: object

Push tasks to worker nodes on the cluster.

Send(requests)

Send a set of task requests to the worker nodes.

Parameters:
  • requests (iterable) – Task requests to send.
Returns: The number of sent tasks.
Return type: int
Setup()

Initialize the ventilator.

Shutdown()

Shut down the ventilator.

request_sender = None

(socket-like, required) Constructor for the writer socket.

class glimpse.util.zmq_cluster.BasicWorker(context, request_receiver, result_sender, command_receiver=None, receiver_timeout=None)

Bases: object

A cluster worker.

CMD_KILL = 'CLUSTER_WORKER_KILL'

Send this to the command socket to shut down the worker.

HandleCommand(command)

Event handler for an incoming command.

HandleRequest(request)

Event handler for an incoming task request.

Run()

Handle incoming requests, and watch for quit commands.
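
A loop of this shape (serve requests while watching a separate command channel) might look like the sketch below. It is an assumption-laden illustration, not the BasicWorker code: `queue.Queue` objects stand in for the three sockets, and `run_worker` and `poll_seconds` are hypothetical names. Only the kill message value is taken from the CMD_KILL attribute documented above.

```python
import queue
import threading

CMD_KILL = 'CLUSTER_WORKER_KILL'  # value documented for BasicWorker.CMD_KILL


def run_worker(requests, results, commands, handle_request, poll_seconds=0.01):
    """Process requests until a kill command arrives (hypothetical helper)."""
    while True:
        # Check the command channel first so a kill is noticed promptly.
        try:
            if commands.get_nowait() == CMD_KILL:
                return
        except queue.Empty:
            pass
        # Wait briefly for a request, then loop back to re-check commands.
        try:
            request = requests.get(timeout=poll_seconds)
        except queue.Empty:
            continue
        results.put(handle_request(request))


requests, results, commands = queue.Queue(), queue.Queue(), queue.Queue()
for n in (1, 2, 3):
    requests.put(n)

worker = threading.Thread(
    target=run_worker,
    args=(requests, results, commands, lambda n: n * n),
)
worker.start()

# Collect the three results, then ask the worker to stop.
squares = [results.get(timeout=5) for _ in range(3)]
commands.put(CMD_KILL)
worker.join(timeout=5)
worker_stopped = not worker.is_alive()
```

The short request timeout keeps the worker responsive to commands even when no requests are arriving.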

static SendKillCommand(context, command_sender, command=None)

Send a kill command to all workers on a given channel.

Parameters:
  • command_sender (socket-like) – The channel on which to send the command.
  • command – Message to send to workers. Defaults to CMD_KILL.
Setup()

Initialize the worker.

class glimpse.util.zmq_cluster.ClusterRequest(payload=None, metadata=None)

Bases: object

A cluster request, corresponding to the input value of a callback.

metadata = None

Optional information associated with the request. This information is copied to the result object.

payload = None

Input values for the task.

class glimpse.util.zmq_cluster.ClusterResult(status=None, payload=None, request_metadata=None, metadata=None, exception=None)

Bases: object

A cluster result, corresponding to the output value of a callback when applied to one input element.

STATUS_FAIL = 'FAIL'

Indicates that an error occurred while processing the request.

STATUS_SUCCESS = 'OK'

Indicates that the request was processed successfully.

exception = None

The exception that occurred during processing, if any.

metadata = None

Optional information associated with the result.

payload = None

Output corresponding to task’s input elements. This will either be a list in the case of a map operation, or a scalar in the case of a reduce operation.

request_metadata = None

Optional information that was associated with the request.

status = None

Whether the input elements were processed successfully.
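
How a request might be turned into a result is sketched below. `Request`, `Result`, and `process` are hypothetical local stand-ins whose fields mirror the ClusterRequest and ClusterResult attributes documented above. The real classes are not imported, and the module's actual processing logic may differ.

```python
class Request(object):
    """Stand-in mirroring the documented ClusterRequest fields."""

    def __init__(self, payload=None, metadata=None):
        self.payload = payload
        self.metadata = metadata


class Result(object):
    """Stand-in mirroring the documented ClusterResult fields."""

    STATUS_SUCCESS = 'OK'
    STATUS_FAIL = 'FAIL'

    def __init__(self, status=None, payload=None, request_metadata=None,
                 metadata=None, exception=None):
        self.status = status
        self.payload = payload
        self.request_metadata = request_metadata
        self.metadata = metadata
        self.exception = exception


def process(callback, request):
    """Apply `callback` to the request payload and wrap the outcome."""
    try:
        output = callback(request.payload)
    except Exception as error:
        # On failure, keep the exception and still carry the request metadata.
        return Result(status=Result.STATUS_FAIL,
                      request_metadata=request.metadata,
                      exception=error)
    return Result(status=Result.STATUS_SUCCESS,
                  payload=output,
                  request_metadata=request.metadata)


ok = process(lambda xs: [x * 2 for x in xs], Request([1, 2, 3], "batch-0"))
bad = process(lambda xs: 1 / 0, Request([1], "batch-1"))
```

Copying the request metadata into the result lets the receiver match each result to the request that produced it, even when results arrive out of order.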

glimpse.util.zmq_cluster.Connect

alias of FutureSocket

class glimpse.util.zmq_cluster.FutureSocket(url=None, type=None, bind=False, options=None)

Bases: object

Describes the options needed to connect/bind a ZMQ socket to an end-point in the future.

MakeSocket(context, url=None, type=None, bind=None, options=None, pre_delay=None, post_delay=None)

Create the socket.

Arguments take precedence over their corresponding object attributes.
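
The precedence rule can be illustrated with a small stand-in class. `DeferredEndpoint` and its `resolve` method are hypothetical names for this sketch. No ZeroMQ socket is created, and the real FutureSocket may resolve its settings differently.

```python
class DeferredEndpoint(object):
    """Stand-in that stores socket settings for later use."""

    def __init__(self, url=None, type=None, bind=False, options=None):
        self.url = url
        self.type = type
        self.bind = bind
        self.options = options

    def resolve(self, url=None, type=None, bind=None, options=None):
        """Return the effective settings; explicit arguments win."""
        return {
            'url': self.url if url is None else url,
            'type': self.type if type is None else type,
            # bind defaults to None (not False) so that an explicit
            # False can still override a stored True.
            'bind': self.bind if bind is None else bind,
            'options': self.options if options is None else options,
        }


endpoint = DeferredEndpoint(url="tcp://127.0.0.1:9000", type="PULL", bind=True)

stored = endpoint.resolve()
overridden = endpoint.resolve(url="tcp://127.0.0.1:9001", bind=False)
```

This may explain why `bind` defaults to False in the constructor signature but to None in `MakeSocket`: None can mean "use the stored value" while False remains a usable override.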

bind = False

(bool, optional) Whether this socket connects or binds.

options = None

(dict, optional) A dictionary of socket options (see zmq.setsockopt()).

post_delay = None

(int, optional) Amount of time (in seconds) to wait after connecting/binding the socket.

pre_delay = None

(int, optional) Amount of time (in seconds) to wait before connecting/binding the socket.

type = None

(optional) The type of socket to create.

url = None

(str, required) The URL passed to the connect() and bind() methods of the created socket.

glimpse.util.zmq_cluster.InitSocket(context, connect_or_socket, type=None, **kwargs)

Initialize a socket.

Parameters:
  • context (zmq.Context) – The ZeroMQ context object with which to associate this socket.
  • connect_or_socket (FutureSocket or zmq.socket) – The socket to initialize. If this is a FutureSocket, the corresponding ZeroMQ socket is constructed. If this is an existing ZeroMQ socket, its type is checked against the type argument (if present).
  • type – Type of the new socket.
  • kwargs – Arguments passed to the FutureSocket constructor.
Returns:

The ZeroMQ socket.

glimpse.util.zmq_cluster.LaunchForwarderDevice(context, frontend_connect, backend_connect)

Launch a ZeroMQ forwarder device.

Parameters:
  • context (zmq.Context) – The ZeroMQ context for the channels.
  • frontend_connect (FutureSocket) – The frontend channel.
  • backend_connect (FutureSocket) – The backend channel.

See also

zmq.device()

glimpse.util.zmq_cluster.LaunchStreamerDevice(context, frontend_connect, backend_connect)

Launch a ZeroMQ streamer device.

Parameters:
  • context (zmq.Context) – The ZeroMQ context for the channels.
  • frontend_connect (FutureSocket) – The frontend channel.
  • backend_connect (FutureSocket) – The backend channel.

See also

zmq.device()

glimpse.util.zmq_cluster.MakeSocket(context, url, type, bind=False, options=None)

This is a shortcut for constructing a FutureSocket and calling its MakeSocket method.

exception glimpse.util.zmq_cluster.ReceiverTimeoutException

Bases: exceptions.Exception

Indicates that a ZMQ recv() command timed out.

class glimpse.util.zmq_cluster.Sink(context, result_receiver, command_receiver=None, receiver_timeout=None)

Bases: glimpse.util.zmq_cluster.BasicSink

A sink for task results on the cluster.

Receive(num_results=None, timeout=None, metadata=False)

Get an iterator over the set of results sent to this sink.

Parameters:
  • num_results (int) – Return after a fixed number of results have arrived.
  • timeout (int) – Time to wait for each result (in milliseconds).
  • metadata (bool) – Whether to return (request and result) metadata with each result.
glimpse.util.zmq_cluster.SocketTypeToString(type)

Get a textual representation of a socket type ID.

class glimpse.util.zmq_cluster.Ventilator(context, request_sender, worker_connect_delay=None)

Bases: glimpse.util.zmq_cluster.BasicVentilator

A ventilator for task requests on the cluster.

Send(requests, metadata=None)

Send requests to worker nodes.

Parameters:
  • requests (iterable) – Callback arguments.
  • metadata (iterable) – Metadata objects corresponding to the requests.
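
One plausible reading of the two parameters is that each metadata object is attached to the request at the same position. The sketch below assumes that pairing; `pair_requests` is a hypothetical helper and the dictionaries stand in for request objects, so it should not be taken as the Ventilator's actual behaviour.

```python
from itertools import repeat


def pair_requests(requests, metadata=None):
    """Attach one metadata object to each request, in order (assumption)."""
    if metadata is None:
        # No metadata supplied: every request gets None.
        metadata = repeat(None)
    return [
        {'payload': payload, 'metadata': meta}
        for payload, meta in zip(requests, metadata)
    ]


tagged = pair_requests([[1, 2], [3, 4]], metadata=["first", "second"])
untagged = pair_requests([[5, 6]])
sent_count = len(tagged)
```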
exception glimpse.util.zmq_cluster.WorkerException

Bases: exceptions.Exception

Indicates that a worker node reported an exception while processing a request.

worker_exception = None

The exception object thrown in the worker process.