util.zmq_cluster
Classes and functions for interacting with a cluster of compute nodes using
ZeroMQ sockets.
Note
In this module, a type is said to be socket-like if it is either a
zmq.socket or a FutureSocket.
-
class glimpse.util.zmq_cluster.BasicSink(context, result_receiver, command_receiver=None, receiver_timeout=None)
Bases: object
Collect results from worker nodes on the cluster.
-
CMD_KILL = 'CLUSTER_SINK_KILL'
Send this to the command socket to shut down the sink.
-
Receive(num_results=None, timeout=None)
Get an iterator over the set of result objects.
Results will be available on the iterator as they arrive at the sink. This
method raises a ReceiverTimeoutException if no result arrives in the given
time period.
| Parameters: |
- num_results (int) – Return after a fixed number of results have arrived.
- timeout (int) – Time to wait for each result (in milliseconds).
|
| Return type: | iterator
|
-
static SendKillCommand(context, command_sender)
Send the quit command to the sink.
| Parameters: | command_sender (socket-like) – Channel on which to send the command. |
-
Setup()
Initialize the sink.
-
Shutdown()
Shutdown the sink.
-
command_receiver = None
(socket-like, optional) Constructor for the command socket.
-
result_receiver = None
(socket-like, required) Constructor for the reader socket.
-
class glimpse.util.zmq_cluster.BasicVentilator(context, request_sender, worker_connect_delay=None)
Bases: object
Push tasks to worker nodes on the cluster.
-
Send(requests)
Send a set of task requests to the worker nodes.
| Parameters: | requests (iterable) – Task requests to send. |
| Returns: | The number of sent tasks. |
| Return type: | int |
-
Setup()
Initialize the ventilator.
-
Shutdown()
Shutdown the ventilator.
-
request_sender = None
(socket-like, required) Constructor for the writer socket.
-
class glimpse.util.zmq_cluster.BasicWorker(context, request_receiver, result_sender, command_receiver=None, receiver_timeout=None)
Bases: object
A cluster worker.
-
CMD_KILL = 'CLUSTER_WORKER_KILL'
Send this to the command socket to shut down the worker.
-
HandleCommand(command)
Event handler for an incoming command.
-
HandleRequest(request)
Event handler for an incoming task request.
-
Run()
Handle incoming requests, and watch for quit commands.
-
static SendKillCommand(context, command_sender, command=None)
Send a kill command to all workers on a given channel.
| Parameters: |
- command_sender (socket-like) – The channel on which to send the command.
- command – Message to send to workers. Defaults to CMD_KILL.
|
-
Setup()
Initialize the worker.
-
class glimpse.util.zmq_cluster.ClusterRequest(payload=None, metadata=None)
Bases: object
A cluster request, corresponding to the input value of a callback.
-
metadata = None
Optional information associated with the request. This information is
copied to the result object.
-
payload = None
Input values for the task.
-
class glimpse.util.zmq_cluster.ClusterResult(status=None, payload=None, request_metadata=None, metadata=None, exception=None)
Bases: object
A cluster result, corresponding to the output value of a callback when
applied to one input element.
-
STATUS_FAIL = 'FAIL'
Indicates that error occurred while processing request.
-
STATUS_SUCCESS = 'OK'
Indicates that the request was processed successfully.
-
exception = None
The exception that occurrred during processing, if any.
-
metadata = None
Optional information associated with result.
-
payload = None
Output corresponding to task’s input elements. This will either be a
list in the case of a map operation, or a scalar in the case of
a reduce operation.
-
request_metadata = None
Optional information that was associated with request.
-
status = None
Whether the input elements were processed successfully.
-
glimpse.util.zmq_cluster.Connect
alias of FutureSocket
-
class glimpse.util.zmq_cluster.FutureSocket(url=None, type=None, bind=False, options=None)
Bases: object
Describes the options needed to connect/bind a ZMQ socket to an end-point
in the future.
-
MakeSocket(context, url=None, type=None, bind=None, options=None, pre_delay=None, post_delay=None)
Create the socket.
Arguments take precendence over their corresponding object attributes.
-
bind = False
(bool, optional) Whether this socket connects or binds.
-
options = None
(dict, optional) A dictionary of socket options (see
zmq.setsockopt()).
-
post_delay = None
(int, optional) Amount of time (in seconds) to wait after connecting/binding
the socket.
-
pre_delay = None
(int, optional) Amount of time (in seconds) to wait before
connecting/binding the socket.
-
type = None
(optional) The type of socket to create.
-
url = None
(str, required) The URL passed to the connect() and bind()
methods of the created socket.
-
glimpse.util.zmq_cluster.InitSocket(context, connect_or_socket, type=None, **kwargs)
Initialize a socket.
| Parameters: |
- context (zmq.Context) – The ZeroMQ context object with which to associate
this socket.
- connect_or_socket (FutureSocket or zmq.socket) – The socket to initialize. If this is a
FutureSocket, the corresponding ZeroMQ socket is constructed. If
this is an existing ZeroMQ socket, its type is checked against the type
argument (if present).
- type – Type of the new socket.
- kwargs – Arguments passed to the FutureSocket constructor.
|
| Returns: | The ZeroMQ socket.
|
-
glimpse.util.zmq_cluster.LaunchForwarderDevice(context, frontend_connect, backend_connect)
Launch a ZeroMQ forwarder device.
| Parameters: |
- context (zmq.Context) – The ZeroMQ context for the channels.
- frontend_connect (FutureSocket) – The frontend channel.
- backend_connect (FutureSocket) – The backend channel.
|
-
glimpse.util.zmq_cluster.LaunchStreamerDevice(context, frontend_connect, backend_connect)
Launch a ZeroMQ streamer device.
| Parameters: |
- context (zmq.Context) – The ZeroMQ context for the channels.
- frontend_connect (FutureSocket) – The frontend channel.
- backend_connect (FutureSocket) – The backend channel.
|
-
glimpse.util.zmq_cluster.MakeSocket(context, url, type, bind=False, options=None)
This is a shortcut for constructing a FutureSocket and calling its
MakeSocket method.
-
exception glimpse.util.zmq_cluster.ReceiverTimeoutException
Bases: exceptions.Exception
Indicates that a ZMQ recv() command timed out.
-
class glimpse.util.zmq_cluster.Sink(context, result_receiver, command_receiver=None, receiver_timeout=None)
Bases: glimpse.util.zmq_cluster.BasicSink
A sink for task results on the cluster.
-
Receive(num_results=None, timeout=None, metadata=False)
Get an iterator over the set of results sent to this sink.
| Parameters: |
- num_results (int) – Return after a fixed number of results have arrived.
- timeout (int) – Time to wait for each result (in milliseconds).
- metadata (bool) – Wheter to return (request and result) metadata with
each result.
|
-
glimpse.util.zmq_cluster.SocketTypeToString(type)
Get a textual representation of a socket type ID.
-
class glimpse.util.zmq_cluster.Ventilator(context, request_sender, worker_connect_delay=None)
Bases: glimpse.util.zmq_cluster.BasicVentilator
A ventilator for task requests on the cluster.
-
Send(requests, metadata=None)
Send requests to worker nodes.
| Parameters: |
- requests (iterable) – Callback arguments.
- metadata (iterable) – Metadata objects corresponding to the requests.
|
-
exception glimpse.util.zmq_cluster.WorkerException
Bases: exceptions.Exception
Indicates that a worker node reported an exception while processing a
request.
-
worker_exception = None
The exception object thrown in the worker process.