Package ClusterShell :: Module Task :: Class Task

Class Task


The Task class defines an essential ClusterShell object which aims to
execute commands in parallel and easily get their results.

More precisely, a Task object manages a coordinated (i.e. with respect to
its current parameters) collection of independent parallel Worker objects.
See ClusterShell.Worker.Worker for further details on ClusterShell Workers.

Always bound to a specific thread, a Task object acts like a "thread
singleton". Most of the time, and especially in single-threaded
applications, you can get the current task object with the following
top-level Task module function:

    >>> task = task_self()

However, if you want to create a task in a new thread, use:

    >>> task = Task()

To create or get the instance of the task associated with the thread
object thr (threading.Thread):

    >>> task = Task(thread=thr)

To submit a command to execute locally within task, use:

    >>> task.shell("/bin/hostname")

To submit a command to execute on some distant nodes in parallel, use:

    >>> task.shell("/bin/hostname", nodes="tiger[1-20]")

The previous examples submit commands for execution but do not allow
interaction with results while commands run. For your program to interact
during command execution, it must define event handlers that will listen for
local or remote events. These handlers are based on the EventHandler
class, defined in ClusterShell.Event. The following example shows how to
submit a command on a cluster with a registered event handler:

    >>> task.shell("uname -r", nodes="node[1-9]", handler=MyEventHandler())

Run task in its associated thread (this blocks only if the calling thread
is the task's associated thread):

    >>> task.resume()
or:

    >>> task.run()

You can also pass arguments to task.run() to schedule a command exactly
like in task.shell(), and run it:

    >>> task.run("hostname", nodes="tiger[1-20]", handler=MyEventHandler())

A common need is to set a maximum delay for command execution, especially
when the command time is not known. Doing this with ClusterShell Task is
very straightforward. To limit the execution time on each node, use the
timeout parameter of shell() or run() methods to set a delay in seconds,
like:

    >>> task.run("check_network.sh", nodes="tiger[1-20]", timeout=30)

You can then either use Task's iter_keys_timeout() method after execution
to see on what nodes the command has timed out, or listen for ev_timeout()
events in your event handler.
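
For example, here is a sketch of a post-run timeout report (NodeSet is only
used to pretty-print the timed out node list):

    from ClusterShell.NodeSet import NodeSet

    task.run("check_network.sh", nodes="tiger[1-20]", timeout=30)
    if task.num_timeout() > 0:
        # iter_keys_timeout() yields the keys (nodes) that timed out
        print("timeout on %s" % NodeSet.fromlist(task.iter_keys_timeout()))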

To get command results, you can either use Task's iter_buffers() method for
standard output, iter_errors() for standard error after command execution
(common output contents are automatically gathered), or you can listen for
ev_read() and ev_error() events in your event handler and get live command
output.

To get command return codes, you can either use Task's iter_retcodes(),
node_retcode() and max_retcode() methods after command execution, or
listen for ev_hup() events in your event handler.
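
Putting it together, a typical post-run result loop might look like the
following sketch:

    from ClusterShell.NodeSet import NodeSet

    task.run("uname -r", nodes="tiger[1-20]")
    # identical outputs are gathered into common buffers
    for buf, nodes in task.iter_buffers():
        print("%s: %s" % (NodeSet.fromlist(nodes), buf))
    # return codes, grouped by nodes sharing the same rc
    for rc, nodes in task.iter_retcodes():
        print("rc=%d on %s" % (rc, NodeSet.fromlist(nodes)))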

Nested Classes

  _SyncMsgHandler
      Special task control port event handler.
  tasksyncmethod
      Class encapsulating a function that checks whether the calling task is running or is the current task, allowing it to be used as a decorator that makes the wrapped task method thread-safe.
  _SuspendCondition
      Special class to manage task suspend condition.
Instance Methods

__init__(self, thread=None, defaults=None)
    Initialize a Task, creating a new non-daemonic thread if needed.

_is_task_self(self)
    Private method used by the library to check whether the task is task_self(), without creating any task_self() instance.

default_excepthook(self, exc_type, exc_value, tb)
    Default excepthook for a newly created Task.

_excepthook(self, exc_type, exc_value, tb)
    Default excepthook for a newly created Task.

_getexcepthook(self)

_setexcepthook(self, hook)

_thread_start(self)
    Task-managed thread entry point.

_run(self, timeout)
    Run task (always called from its own thread).

_default_tree_is_enabled(self)
    Return whether the default tree is enabled (loading topology_file as a side effect).

load_topology(self, topology_file)
    Load propagation topology from provided file.

_default_router(self)

default(self, default_key, def_val=None)
    Return per-task value for key from the "default" dictionary.

set_default(self, default_key, value)
    Set task value for specified key in the dictionary "default".

info(self, info_key, def_val=None)
    Return per-task information.

set_info(*args, **kwargs)
    Set task value for a specific information key.

shell(self, command, **kwargs)
    Schedule a shell command for local or distant parallel execution.

copy(self, source, dest, nodes, **kwargs)
    Copy local file to distant nodes.

rcopy(self, source, dest, nodes, **kwargs)
    Copy distant file or directory to local node.

_add_port(*args, **kwargs)
    Add an EnginePort instance to Engine (private method).

remove_port(*args, **kwargs)
    Close and remove a port from task previously created with port().

port(self, handler=None, autoclose=False)
    Create a new task port.

timer(self, fire, handler, interval=-1.0, autoclose=False)
    Create a timer bound to this task that fires at a preset time in the future by invoking the ev_timer() method of `handler' (provided EventHandler object).

_add_timer(*args, **kwargs)
    Add a timer to task engine (thread-safe).

schedule(*args, **kwargs)
    Schedule a worker for execution, i.e. add worker to the task running loop.

_resume_thread(self)
    Resume task (called from another thread).

_resume(self)
    Resume task (called from its own thread).

resume(self, timeout=None)
    Resume task.

run(self, command=None, **kwargs)
    With arguments, schedule a command exactly like Task.shell() would, then run it.

_suspend_wait(*args, **kwargs)
    Suspend request received.

suspend(self)
    Suspend task execution.

_abort(*args, **kwargs)
    Abort request received.

abort(self, kill=False)
    Abort a task.

_terminate(self, kill)
    Abort completion subroutine.

join(self)
    Suspend execution of the calling thread until the target task terminates, unless the target task has already terminated.

running(self)
    Return True if the task is running.

_reset(self)
    Reset buffers and retcodes management variables.

_msgtree(self, sname, strict=True)
    Helper method to return msgtree instance by sname if allowed.

_msg_add(self, worker, node, sname, msg)
    Process a new message into Task's MsgTree, coming from a worker instance, a node and a stream name.

_rc_set(self, worker, node, rc)
    Add a worker return code (rc) coming from a node of a worker instance.

_timeout_add(self, worker, node)
    Add a timeout indicator coming from a node of a worker instance.

_msg_by_source(self, worker, node, sname)
    Get a message by its worker instance, node and stream name.

_call_tree_matcher(self, tree_match_func, match_keys=None, worker=None)
    Call identified tree matcher (items, walk) method with options.

_rc_by_source(self, worker, node)
    Get a return code by worker instance and node.

_rc_iter_by_key(self, key)
    Return an iterator over return codes for the given key.

_rc_iter_by_worker(self, worker, match_keys=None)
    Return an iterator over return codes and keys list for a specific worker and optional matching keys.

_krc_iter_by_worker(self, worker)
    Return an iterator over key, rc for a specific worker.

_num_timeout_by_worker(self, worker)
    Return the number of timed out "keys" for a specific worker.

_iter_keys_timeout_by_worker(self, worker)
    Iterate over timed out keys (i.e. nodes) for a specific worker.

_flush_buffers_by_worker(self, worker)
    Remove any messages from specified worker.

_flush_errors_by_worker(self, worker)
    Remove any error messages from specified worker.

key_buffer(self, key)
    Get buffer for a specific key.

node_buffer(self, key)
    Get buffer for a specific key.

key_error(self, key)
    Get error buffer for a specific key.

node_error(self, key)
    Get error buffer for a specific key.

key_retcode(self, key)
    Return return code for a specific key.

node_retcode(self, key)
    Return return code for a specific key.

max_retcode(self)
    Get max return code encountered during last run, or None if all commands timed out or no command was executed.

_iter_msgtree(self, sname, match_keys=None)
    Helper method to iterate over recorded buffers by sname.

iter_buffers(self, match_keys=None)
    Iterate over buffers, returning tuples (buffer, keys).

iter_errors(self, match_keys=None)
    Iterate over error buffers, returning tuples (buffer, keys).

iter_retcodes(self, match_keys=None)
    Iterate over return codes, returning tuples (rc, keys).

num_timeout(self)
    Return the number of timed out "keys" (i.e. nodes).

iter_keys_timeout(self)
    Iterate over timed out keys (i.e. nodes).

flush_buffers(self)
    Flush all task messages (from all task workers).

flush_errors(self)
    Flush all task error messages (from all task workers).

_pchannel(self, gateway, metaworker)
    Get propagation channel for gateway (create one if needed).

_pchannel_release(self, gateway, metaworker)
    Release propagation channel associated to gateway.

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Class Methods

wait(cls, from_thread)
    Class method that blocks the calling thread until all tasks have finished (from a ClusterShell point of view, i.e. their task.resume() calls have returned).
Static Methods

__new__(cls, thread=None, defaults=None)
    For a task bound to a specific thread, this class acts like a "thread singleton": new objects are only instantiated if needed.
Class Variables

  TOPOLOGY_CONFIGS = ['/etc/clustershell/topology.conf', '/home/...
  _tasks = {}
  _taskid_max = 0
  _task_lock = threading.Lock()
Properties

  excepthook

Inherited from object: __class__

Method Details

__new__(cls, thread=None, defaults=None)
Static Method

For a task bound to a specific thread, this class acts like a "thread singleton": a new-style class is used and new objects are only instantiated if needed.

Returns: a new object with type S, a subtype of T
Overrides: object.__new__

__init__(self, thread=None, defaults=None)
(Constructor)

Initialize a Task, creating a new non-daemonic thread if needed.

Overrides: object.__init__

default_excepthook(self, exc_type, exc_value, tb)

Default excepthook for a newly created Task. When an exception is raised and not caught in a Task thread, the task's excepthook is called; by default this is default_excepthook. Once excepthook is overridden, you can still call default_excepthook if needed.

_excepthook(self, exc_type, exc_value, tb)

Default excepthook for a newly created Task. When an exception is raised and not caught in a Task thread, the task's excepthook is called; by default this is default_excepthook. Once excepthook is overridden, you can still call default_excepthook if needed.

load_topology(self, topology_file)

Load propagation topology from provided file.

On success, task.topology is set to a corresponding TopologyTree instance.

On failure, task.topology is left untouched and a TopologyError exception is raised.
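
For example, a sketch of loading a topology file with error handling (the
path shown is illustrative):

    from ClusterShell.Topology import TopologyError

    try:
        task.load_topology("/etc/clustershell/topology.conf")
    except TopologyError as exc:
        print("invalid topology: %s" % exc)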

default(self, default_key, def_val=None)

Return per-task value for key from the "default" dictionary. See set_default() for a list of reserved task default_keys.

set_default(self, default_key, value)

Set task value for specified key in the dictionary "default". Users may store their own task-specific key, value pairs using this method and retrieve them with default().

Task default_keys are:

  • "stderr": Boolean value indicating whether to enable stdout/stderr separation when using task.shell(), if not specified explicitly (default: False).
  • "stdout_msgtree": Whether to instantiate standard output MsgTree for automatic internal gathering of result messages coming from Workers (default: True).
  • "stderr_msgtree": Same for stderr (default: True).
  • "engine": Used to specify an underlying Engine explicitly (default: "auto").
  • "port_qlimit": Size of port messages queue (default: 32).
  • "worker": Worker-based class used when spawning workers through shell()/run().

Threading considerations

Unlike set_info(), set_default() immediately updates the underlying dictionary in a thread-safe manner, whether or not it is called from the task's thread. This method doesn't wake up the engine when called.
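
For instance, a sketch enabling stdout/stderr separation by default for this
task's subsequent shell()/run() calls:

    from ClusterShell.NodeSet import NodeSet

    task.set_default("stderr", True)   # separate stderr from stdout by default
    task.run("ls /nonexistent", nodes="node[1-2]")
    for buf, nodes in task.iter_errors():   # stderr gathered separately
        print("%s: %s" % (NodeSet.fromlist(nodes), buf))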

info(self, info_key, def_val=None)

Return per-task information. See set_info() for a list of reserved task info_keys.

set_info(*args, **kwargs)

Set task value for a specific key information. Key, value pairs can be passed to the engine and/or workers. Users may store their own task-specific info key, value pairs using this method and retrieve them with info().

The following example changes the fanout value to 128:

>>> task.set_info('fanout', 128)

The following example enables debug messages:

>>> task.set_info('debug', True)

Task info_keys are:

  • "debug": Boolean value indicating whether to enable library debugging messages (default: False).
  • "print_debug": Debug messages processing function. This function takes 2 arguments: the task instance and the message string (default: an internal function doing standard print).
  • "fanout": Max number of registered clients in Engine at a time (default: 64).
  • "grooming_delay": Message maximum end-to-end delay requirement used for traffic grooming, in seconds as float (default: 0.5).
  • "connect_timeout": Time in seconds to wait for connecting to remote host before aborting (default: 10).
  • "command_timeout": Time in seconds to wait for a command to complete before aborting (default: 0, which means unlimited).

Threading considerations

Unlike set_default(), the underlying info dictionary is only modified from the task's thread. Calling set_info() from another thread therefore queues the request for later application (at run time) using the task dispatch port. When received, the request wakes up the engine if the task is running, and the info dictionary is then updated.

Decorators:
  • @tasksyncmethod()

shell(self, command, **kwargs)

Schedule a shell command for local or distant parallel execution. This essential method creates a local or remote Worker (depending on the presence of the nodes parameter) and immediately schedules it for execution in the task's runloop. So, if the task is already running (i.e. called from an event handler), the command is started immediately, provided that current execution constraints are met (e.g. fanout value). If the task is not running, the command is not started but scheduled for later execution. See resume() to start the task runloop.

The following optional parameters are passed to the underlying local or remote Worker constructor:

  • handler: EventHandler instance to notify (on event) -- default is no handler (None)
  • timeout: command timeout delay expressed in second using a floating point value -- default is unlimited (None)
  • autoclose: if set to True, the underlying Worker is automatically aborted as soon as all other non-autoclosing task objects (workers, ports, timers) have finished -- default is False
  • stderr: separate stdout/stderr if set to True -- default is False.

Local usage:

   task.shell(command [, key=key] [, handler=handler]
         [, timeout=secs] [, autoclose=enable_autoclose]
         [, stderr=enable_stderr])

Distant usage:

   task.shell(command, nodes=nodeset [, handler=handler]
          [, timeout=secs] [, autoclose=enable_autoclose]
         [, tree=None|False|True] [, remote=False|True]
         [, stderr=enable_stderr])

Example:

>>> task = task_self()
>>> task.shell("/bin/date", nodes="node[1-2345]")
>>> task.resume()

_add_port(*args, **kwargs)

Add an EnginePort instance to Engine (private method).

Decorators:
  • @tasksyncmethod()

remove_port(*args, **kwargs)

Close and remove a port from task previously created with port().

Decorators:
  • @tasksyncmethod()

port(self, handler=None, autoclose=False)

Create a new task port. A task port is an abstraction object to deliver messages reliably between tasks.

Basic rules:

  • A task can send messages to another task port (thread safe).
  • A task can receive messages from an acquired port either by setting up a notification mechanism or using a polling mechanism that may block the task waiting for a message sent on the port.
  • A port can be acquired by one task only.

If handler is set to a valid EventHandler object, the port is a send-once port, i.e. a message sent to this port generates an ev_msg event notification issued by the port's task. If handler is not set, the task can only receive messages on the port by calling port.msg_recv().
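
As a sketch, a port with a handler lets another thread safely inject messages
into a running task (the ev_msg(self, port, msg) signature is assumed from
the classic EventHandler API):

    from ClusterShell.Event import EventHandler

    class PortMsgHandler(EventHandler):
        def ev_msg(self, port, msg):
            # called in the port task's thread for each received message
            print("port received: %s" % msg)

    port = task.port(handler=PortMsgHandler())
    # ...later, from any other thread (thread-safe):
    port.msg_send("hello")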

timer(self, fire, handler, interval=-1.0, autoclose=False)

Create a timer bound to this task that fires at a preset time in the future by invoking the ev_timer() method of `handler' (provided EventHandler object). Timers can fire either only once or repeatedly at fixed time intervals. Repeating timers can also have their next firing time manually adjusted.

The mandatory parameter `fire' sets the firing delay in seconds.

The optional parameter `interval' sets the firing interval of the timer. If not specified, the timer fires once and then is automatically invalidated.

Time values are expressed in seconds using floating point values. Precision is implementation (and system) dependent.

The optional parameter `autoclose', if set to True, creates an "autoclosing" timer: it will be automatically invalidated as soon as all other non-autoclosing task objects (workers, ports, timers) have finished. Default value is False, which means the timer will keep the task's runloop running until it is invalidated.

Return a new EngineTimer instance.

See ClusterShell.Engine.Engine.EngineTimer for more details.
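
A small sketch of a repeating timer (assuming the ev_timer(self, timer)
event signature):

    from ClusterShell.Event import EventHandler

    class TickHandler(EventHandler):
        def ev_timer(self, timer):
            print("tick")
            # a repeating timer stays armed until invalidated:
            # timer.invalidate()

    # first fire after 1 second, then every 5 seconds
    timer = task.timer(1.0, handler=TickHandler(), interval=5.0)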

_add_timer(*args, **kwargs)

Add a timer to task engine (thread-safe).

Decorators:
  • @tasksyncmethod()

schedule(*args, **kwargs)

Schedule a worker for execution, i.e. add the worker to the task running loop. The worker will start processing immediately if the task is running (e.g. when called from an event handler) or as soon as the task is started otherwise. Only useful for manually instantiated workers, for example:

>>> task = task_self()
>>> worker = WorkerSsh("node[2-3]", None, 10, command="/bin/ls")
>>> task.schedule(worker)
>>> task.resume()
Decorators:
  • @tasksyncmethod()

resume(self, timeout=None)

Resume task. If the task is task_self(), workers are executed in the calling thread, so this method will block until all (non-autoclosing) workers have finished. This is always the case for a single-threaded application (e.g. one that doesn't create any Task() instance other than task_self()). Otherwise, the current thread doesn't block; in that case, you may then want to call task_wait() to wait for completion.

Warning: the timeout parameter can be used to set a hard limit on task execution time (in seconds). In that case, a TimeoutError exception is raised if this delay is reached. Its value is 0 by default, which means no task time limit (TimeoutError is never raised). To set a maximum delay for individual command execution, use Task.shell()'s timeout parameter instead.
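
For example, a sketch catching a whole-run hard limit (command and values
are illustrative):

    from ClusterShell.Task import TimeoutError

    task.shell("backup.sh", nodes="tiger[1-20]")
    try:
        task.resume(timeout=3600)   # hard limit for the whole run, in seconds
    except TimeoutError:
        task.abort()                # stop any remaining workers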

run(self, command=None, **kwargs)

With arguments, it schedules a command exactly like Task.shell() would, then runs it. This is the easiest way to simply run a command.

>>> task.run("hostname", nodes="foo")

Without argument, it starts all outstanding actions. It behaves like Task.resume().

>>> task.shell("hostname", nodes="foo")
>>> task.shell("hostname", nodes="bar")
>>> task.run()

When used with a command, you can set a maximum delay for individual command execution with the timeout parameter (see Task.shell's parameters). You can then listen for ev_timeout() events in your Worker event handlers, or use num_timeout() or iter_keys_timeout() afterwards. But when used as an alias to Task.resume(), the timeout parameter sets a hard limit on task execution time; in that case, a TimeoutError exception is raised if this delay is reached.
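
The two timeout semantics, side by side (sketch; command and node names are
illustrative):

    # per-command timeout: each node's command is limited to 10 seconds;
    # no exception is raised, timed out keys are reported afterwards
    task.run("slow.sh", nodes="foo", timeout=10)
    print(task.num_timeout())

    # resume-like usage (no command): timeout is a whole-task hard limit
    # and TimeoutError is raised if it is reached
    task.shell("slow.sh", nodes="foo")
    task.run(timeout=10)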

_suspend_wait(*args, **kwargs)

Suspend request received.

Decorators:
  • @tasksyncmethod()

suspend(self)

Suspend task execution. This method may be called from another task (thread-safe). It returns False if the task cannot be suspended (e.g. it's not running), or True if the task has been successfully suspended. To resume a suspended task, use task.resume().

_abort(*args, **kwargs)

Abort request received.

Decorators:
  • @tasksyncmethod()

abort(self, kill=False)

Abort a task. Aborting a task removes (and stops when needed) all workers. If optional parameter kill is True, the task object is unbound from the current thread, so calling task_self() creates a new Task object.

_msg_add(self, worker, node, sname, msg)

Process a new message into Task's MsgTree that is coming from:

  • a worker instance of this task
  • a node
  • a stream name sname (string identifier)

_iter_keys_timeout_by_worker(self, worker)

Iterate over timed out keys (ie. nodes) for a specific worker.

key_buffer(self, key)

Get buffer for a specific key. When the key is associated with multiple workers, the resulting buffer will contain content from all these workers, which may overlap. This method returns an empty buffer if the key is not found in any workers.

node_buffer(self, key)

Get buffer for a specific key. When the key is associated with multiple workers, the resulting buffer will contain content from all these workers, which may overlap. This method returns an empty buffer if the key is not found in any workers.

key_error(self, key)

Get error buffer for a specific key. When the key is associated with multiple workers, the resulting buffer will contain content from all these workers, which may overlap. This method returns an empty error buffer if the key is not found in any workers.

node_error(self, key)

Get error buffer for a specific key. When the key is associated with multiple workers, the resulting buffer will contain content from all these workers, which may overlap. This method returns an empty error buffer if the key is not found in any workers.

key_retcode(self, key)

Return return code for a specific key. When the key is associated with multiple workers, return the max return code from these workers. Raises a KeyError if the key is not found in any finished workers.

node_retcode(self, key)

Return return code for a specific key. When the key is associated with multiple workers, return the max return code from these workers. Raises a KeyError if the key is not found in any finished workers.
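
Illustrative usage of the per-key getters after a run (node names are
examples):

    task.run("uname -r", nodes="node[1-2]")
    print(task.node_buffer("node1"))    # gathered stdout for node1 (may be empty)
    print(task.node_retcode("node1"))   # max rc for node1; KeyError if unknown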

max_retcode(self)

Get max return code encountered during last run, or None in the following cases:

  • all commands timed out,
  • no command was executed.

How retcodes work

If the process exits normally, the return code is its exit status. If the process is terminated by a signal, the return code is 128 + signal number.
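
As a worked example of the 128 + signal rule (sketch; node name illustrative):

    # a remote process killed by SIGKILL (signal 9) yields rc = 128 + 9 = 137
    task.run("kill -9 $$", nodes="node1")
    print(task.max_retcode())   # 137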

iter_buffers(self, match_keys=None)

Iterate over buffers, returning tuples (buffer, keys). For remote workers (Ssh), keys are lists of nodes. In that case, you should use NodeSet.fromlist(keys) to get a NodeSet instance (which is more convenient and efficient):

Optional parameter match_keys adds filtering on these keys.

Usage example:

>>> for buffer, nodelist in task.iter_buffers():
...     print(NodeSet.fromlist(nodelist))
...     print(buffer)

iter_errors(self, match_keys=None)

Iterate over error buffers, returns a tuple (buffer, keys).

See iter_buffers().

iter_retcodes(self, match_keys=None)

Iterate over return codes, returns a tuple (rc, keys).

Optional parameter match_keys adds filtering on these keys.

How retcodes work

If the process exits normally, the return code is its exit status. If the process is terminated by a signal, the return code is 128 + signal number.

num_timeout(self)

Return the number of timed out "keys" (i.e. nodes).

iter_keys_timeout(self)

Iterate over timed out keys (i.e. nodes).

wait(cls, from_thread)
Class Method

Class method that blocks the calling thread until all tasks have finished (from a ClusterShell point of view, i.e. their task.resume() calls have returned). It doesn't necessarily mean that the associated threads have finished.
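
For example (sketch), a main thread can spawn a task in its own thread and
later block until every task has finished:

    import threading
    from ClusterShell.Task import Task

    task = Task()                       # new task bound to a new thread
    task.shell("hostname", nodes="foo")
    task.resume()                       # returns immediately (not task_self())
    Task.wait(threading.current_thread())   # block until all tasks finish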

_pchannel(self, gateway, metaworker)

Get propagation channel for gateway (create one if needed).

Use the self.gateways dictionary, which allows lookups like:
    gateway => (worker channel, set of metaworkers)

_pchannel_release(self, gateway, metaworker)

Release propagation channel associated to gateway.

Look up by gateway, decref the associated metaworker set, and release the channel worker if needed.


Class Variable Details

TOPOLOGY_CONFIGS

Value:
['/etc/clustershell/topology.conf',
 '/home/sthiell/.local/etc/clustershell/topology.conf',
 '/home/sthiell/.config/clustershell/topology.conf']

Property Details

excepthook

Get Method:
_getexcepthook(self)
Set Method:
_setexcepthook(self, hook)