1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """
22 ClusterShell Task module.
23
24 Simple example of use:
25
26 >>> from ClusterShell.Task import task_self, NodeSet
27 >>>
28 >>> # get task associated with calling thread
29 ... task = task_self()
30 >>>
31 >>> # add a command to execute on distant nodes
32 ... task.shell("/bin/uname -r", nodes="tiger[1-30,35]")
33 <ClusterShell.Worker.Ssh.WorkerSsh object at 0x7f41da71b890>
34 >>>
35 >>> # run task in calling thread
36 ... task.run()
37 >>>
38 >>> # get results
39 ... for output, nodelist in task.iter_buffers():
40 ... print '%s: %s' % (NodeSet.fromlist(nodelist), output)
41 ...
42
43 """
44
45 from itertools import imap
46 import logging
47 from operator import itemgetter
48 import os
49 import socket
50 import sys
51 import threading
52 from time import sleep
53 import traceback
54
55 from ClusterShell.Defaults import config_paths, DEFAULTS
56 from ClusterShell.Defaults import _local_workerclass, _distant_workerclass
57 from ClusterShell.Engine.Engine import EngineAbortException
58 from ClusterShell.Engine.Engine import EngineTimeoutException
59 from ClusterShell.Engine.Engine import EngineAlreadyRunningError
60 from ClusterShell.Engine.Engine import EngineTimer
61 from ClusterShell.Engine.Factory import PreferredEngine
62 from ClusterShell.Worker.EngineClient import EnginePort
63 from ClusterShell.Worker.Popen import WorkerPopen
64 from ClusterShell.Worker.Tree import WorkerTree
65 from ClusterShell.Worker.Worker import FANOUT_UNLIMITED
66
67 from ClusterShell.Event import EventHandler
68 from ClusterShell.MsgTree import MsgTree
69 from ClusterShell.NodeSet import NodeSet
70
71 from ClusterShell.Topology import TopologyParser, TopologyError
72 from ClusterShell.Propagation import PropagationTreeRouter, PropagationChannel
76 """Base task exception."""
77
79 """Base task error exception."""
80
82 """Raised when the task timed out."""
83
85 """Raised when trying to resume an already running task."""
86
88 """Raised when trying to access disabled MsgTree."""
89
92 """Get short hostname (host name cut at the first dot)"""
93 return socket.gethostname().split('.')[0]
94
95
96 -class Task(object):
97 """
98 The Task class defines an essential ClusterShell object which aims to
99 execute commands in parallel and easily get their results.
100
101 More precisely, a Task object manages a coordinated (ie. with respect of
102 its current parameters) collection of independent parallel Worker objects.
103 See ClusterShell.Worker.Worker for further details on ClusterShell Workers.
104
105 Always bound to a specific thread, a Task object acts like a "thread
106 singleton". So most of the time, and even more for single-threaded
107 applications, you can get the current task object with the following
108 top-level Task module function:
109
110 >>> task = task_self()
111
112 However, if you want to create a task in a new thread, use:
113
114 >>> task = Task()
115
116 To create or get the instance of the task associated with the thread
117 object thr (threading.Thread):
118
119 >>> task = Task(thread=thr)
120
121 To submit a command to execute locally within task, use:
122
123 >>> task.shell("/bin/hostname")
124
125 To submit a command to execute to some distant nodes in parallel, use:
126
127 >>> task.shell("/bin/hostname", nodes="tiger[1-20]")
128
129 The previous examples submit commands to execute but do not allow result
130 interaction during their execution. For your program to interact during
131 command execution, it has to define event handlers that will listen for
132 local or remote events. These handlers are based on the EventHandler
133 class, defined in ClusterShell.Event. The following example shows how to
134 submit a command on a cluster with a registered event handler:
135
136 >>> task.shell("uname -r", nodes="node[1-9]", handler=MyEventHandler())
137
138 Run task in its associated thread (will block only if the calling thread is
139 the task associated thread):
140
141 >>> task.resume()
142 or:
143
144 >>> task.run()
145
146 You can also pass arguments to task.run() to schedule a command exactly
147 like in task.shell(), and run it:
148
149 >>> task.run("hostname", nodes="tiger[1-20]", handler=MyEventHandler())
150
151 A common need is to set a maximum delay for command execution, especially
152 when the command time is not known. Doing this with ClusterShell Task is
    very straightforward. To limit the execution time on each node, use the
154 timeout parameter of shell() or run() methods to set a delay in seconds,
155 like:
156
157 >>> task.run("check_network.sh", nodes="tiger[1-20]", timeout=30)
158
159 You can then either use Task's iter_keys_timeout() method after execution
160 to see on what nodes the command has timed out, or listen for ev_timeout()
161 events in your event handler.
162
163 To get command result, you can either use Task's iter_buffers() method for
164 standard output, iter_errors() for standard error after command execution
165 (common output contents are automatically gathered), or you can listen for
166 ev_read() and ev_error() events in your event handler and get live command
167 output.
168
169 To get command return codes, you can either use Task's iter_retcodes(),
170 node_retcode() and max_retcode() methods after command execution, or
171 listen for ev_hup() events in your event handler.
172 """
173
174
175 TOPOLOGY_CONFIGS = config_paths('topology.conf')
176
177 _tasks = {}
178 _taskid_max = 0
179 _task_lock = threading.Lock()
180
182 """Special task control port event handler.
183 When a message is received on the port, call appropriate
184 task method."""
186 """Message received: call appropriate task method."""
187
188 func, (args, kwargs) = msg[0], msg[1:]
189
190 func(port.task, *args, **kwargs)
191
193 """Class encapsulating a function that checks if the calling
194 task is running or is the current task, and allowing it to be
195 used as a decorator making the wrapped task method thread-safe."""
197 def taskfunc(*args, **kwargs):
198
199 task, fargs = args[0], args[1:]
200
201 if task._is_task_self():
202 return f(task, *fargs, **kwargs)
203 elif task._dispatch_port:
204
205
206 task._dispatch_port.msg_send((f, fargs, kwargs))
207 else:
208 task.info("print_debug")(task, "%s: dropped call: %s" % \
209 (task, str(fargs)))
210
211
212
213 taskfunc.__name__ = f.__name__
214 taskfunc.__doc__ = f.__doc__
215 taskfunc.__dict__ = f.__dict__
216 taskfunc.__module__ = f.__module__
217 return taskfunc
218
220 """Special class to manage task suspend condition."""
221 - def __init__(self, lock=threading.RLock(), initial=0):
222 self._cond = threading.Condition(lock)
223 self.suspend_count = initial
224
226 """Increase suspend count."""
227 self._cond.acquire()
228 self.suspend_count += 1
229 self._cond.release()
230
232 """Decrease suspend count."""
233 self._cond.acquire()
234 self.suspend_count -= 1
235 self._cond.release()
236
238 """Wait for condition if needed."""
239 self._cond.acquire()
240 try:
241 if self.suspend_count > 0:
242 if release_lock:
243 release_lock.release()
244 self._cond.wait()
245 finally:
246 self._cond.release()
247
249 """Signal all threads waiting for condition."""
250 self._cond.acquire()
251 try:
252 self.suspend_count = min(self.suspend_count, 0)
253 self._cond.notifyAll()
254 finally:
255 self._cond.release()
256
257
    def __new__(cls, thread=None, defaults=None):
        """Return the Task instance bound to `thread`, creating one if needed.

        For a task bound to a specific thread, this class acts like a
        "thread singleton": a new object is instantiated only when no
        task is already registered for that thread in cls._tasks. With
        thread=None, a plain new instance is returned (it registers
        itself with a thread later, in __init__).
        """
        if thread:
            # NOTE(review): cls._tasks is read and updated here without
            # holding Task._task_lock -- presumably per-thread creation is
            # serialized by the callers; confirm before relying on
            # concurrent __new__ calls for the same thread.
            if thread not in cls._tasks:
                cls._tasks[thread] = object.__new__(cls)
            return cls._tasks[thread]

        return object.__new__(cls)
270
    def __init__(self, thread=None, defaults=None):
        """Initialize a Task, creating a new non-daemonic thread if
        needed.

        :param thread: existing thread to bind this task to; when None,
            a dedicated thread running Task._thread_start is spawned.
        :param defaults: Defaults-like object providing _task_default and
            _task_info dictionaries; falls back to module-level DEFAULTS.
        """
        # __new__ may have returned an already-initialized thread
        # singleton; the presence of _engine marks a completed init and
        # prevents re-initialization (which would wipe live state).
        if not getattr(self, "_engine", None):
            # first time called
            self._default_lock = threading.Lock()
            if defaults is None:
                defaults = DEFAULTS
            self._default = defaults._task_default.copy()
            # resolve worker classes once, from the defaults object
            self._default.update(
                {"local_worker": _local_workerclass(defaults),
                 "distant_worker": _distant_workerclass(defaults)})
            self._info = defaults._task_info.copy()

            # create new event engine instance, selected according to the
            # "engine" default key
            self._engine = PreferredEngine(self.default("engine"), self._info)
            self.timeout = None

            # run lock is held while the engine runloop is active
            self._run_lock = threading.Lock()
            # suspend lock is shared by the suspend and join conditions
            self._suspend_lock = threading.RLock()
            # both join and suspend conditions share the same underlying lock
            self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1)
            self._join_cond = threading.Condition(self._suspend_lock)
            self._suspended = False
            self._quit = False
            self._terminated = False

            # tree topology (lazy-loaded, see _default_tree_is_enabled)
            self.topology = None
            self.router = None
            self.gateways = {}

            # result gathering structures (also cleared by _reset)
            self._msgtrees = {}
            # dict of (worker, node) -> return code
            self._d_source_rc = {}
            # dict of return code -> set of (worker, node)
            self._d_rc_sources = {}
            self._max_rc = None
            self._timeout_sources = set()

            # init task as necessary
            self._reset()

            # special port for dispatching method calls from other threads
            # (see tasksyncmethod); autoclose so it does not retain the loop
            self._dispatch_port = EnginePort(self,
                                             handler=Task._SyncMsgHandler(),
                                             autoclose=True)
            self._engine.add(self._dispatch_port)

            # allocate a unique task id under the class-wide lock
            Task._task_lock.acquire()
            Task._taskid_max += 1
            self._taskid = Task._taskid_max
            Task._task_lock.release()

            # bind to the caller-supplied thread, or spawn our own
            self._thread_foreign = bool(thread)
            if self._thread_foreign:
                self.thread = thread
            else:
                self.thread = thread = \
                    threading.Thread(None,
                                     Task._thread_start,
                                     "Task-%d" % self._taskid,
                                     args=(self,))
                Task._tasks[thread] = self
                thread.start()
342
344 """Private method used by the library to check if the task is
345 task_self(), but do not create any task_self() instance."""
346 return self.thread == threading.currentThread()
347
        """Default excepthook for a new Task. When an exception is
        raised and uncaught on a Task thread, excepthook is called, which
        is default_excepthook by default. Once excepthook is overridden,
352 you can still call default_excepthook if needed."""
353 print >> sys.stderr, 'Exception in thread %s:' % self.thread
354 traceback.print_exception(exc_type, exc_value, tb, file=sys.stderr)
355
356 _excepthook = default_excepthook
357
359 return self._excepthook
360
362 self._excepthook = hook
363
364
365 if self._thread_foreign:
366 sys.excepthook = self._excepthook
367
368
369
370
371
372 excepthook = property(_getexcepthook, _setexcepthook)
373
375 """Task-managed thread entry point"""
376 while not self._quit:
377 self._suspend_cond.wait_check()
378 if self._quit:
379 break
380 try:
381 self._resume()
382 except:
383 self.excepthook(*sys.exc_info())
384 self._quit = True
385
386 self._terminate(kill=True)
387
388 - def _run(self, timeout):
389 """Run task (always called from its self thread)."""
390
391 if self._run_lock.locked():
392 raise AlreadyRunningError("task is already running")
393
394 try:
395 self._run_lock.acquire()
396 self._engine.run(timeout)
397 finally:
398 self._run_lock.release()
399
401 """Return whether default tree is enabled (load topology_file btw)"""
402 if self.topology is None:
403 for topology_file in self.TOPOLOGY_CONFIGS[::-1]:
404 if os.path.exists(topology_file):
405 self.load_topology(topology_file)
406 break
407 return (self.topology is not None) and self.default("auto_tree")
408
410 """Load propagation topology from provided file.
411
412 On success, task.topology is set to a corresponding TopologyTree
413 instance.
414
415 On failure, task.topology is left untouched and a TopologyError
416 exception is raised.
417 """
418 self.topology = TopologyParser(topology_file).tree(_getshorthostname())
419
421 if self.router is None:
422 self.router = PropagationTreeRouter(str(self.topology.root.nodeset),
423 self.topology)
424 return self.router
425
426 - def default(self, default_key, def_val=None):
427 """
428 Return per-task value for key from the "default" dictionary.
429 See set_default() for a list of reserved task default_keys.
430 """
431 self._default_lock.acquire()
432 try:
433 return self._default.get(default_key, def_val)
434 finally:
435 self._default_lock.release()
436
438 """
439 Set task value for specified key in the dictionary "default".
440 Users may store their own task-specific key, value pairs
441 using this method and retrieve them with default().
442
443 Task default_keys are:
444 - "stderr": Boolean value indicating whether to enable
445 stdout/stderr separation when using task.shell(), if not
446 specified explicitly (default: False).
447 - "stdout_msgtree": Whether to instantiate standard output
448 MsgTree for automatic internal gathering of result messages
449 coming from Workers (default: True).
450 - "stderr_msgtree": Same for stderr (default: True).
451 - "engine": Used to specify an underlying Engine explicitly
452 (default: "auto").
453 - "port_qlimit": Size of port messages queue (default: 32).
454 - "worker": Worker-based class used when spawning workers through
455 shell()/run().
456
457 Threading considerations
458 ========================
459 Unlike set_info(), when called from the task's thread or
460 not, set_default() immediately updates the underlying
461 dictionary in a thread-safe manner. This method doesn't
462 wake up the engine when called.
463 """
464 self._default_lock.acquire()
465 try:
466 self._default[default_key] = value
467 finally:
468 self._default_lock.release()
469
470 - def info(self, info_key, def_val=None):
471 """
472 Return per-task information. See set_info() for a list of
473 reserved task info_keys.
474 """
475 return self._info.get(info_key, def_val)
476
477 @tasksyncmethod()
479 """
480 Set task value for a specific key information. Key, value
481 pairs can be passed to the engine and/or workers.
482 Users may store their own task-specific info key, value pairs
483 using this method and retrieve them with info().
484
485 The following example changes the fanout value to 128:
486 >>> task.set_info('fanout', 128)
487
488 The following example enables debug messages:
489 >>> task.set_info('debug', True)
490
491 Task info_keys are:
492 - "debug": Boolean value indicating whether to enable library
493 debugging messages (default: False).
494 - "print_debug": Debug messages processing function. This
495 function takes 2 arguments: the task instance and the
496 message string (default: an internal function doing standard
497 print).
498 - "fanout": Max number of registered clients in Engine at a
499 time (default: 64).
500 - "grooming_delay": Message maximum end-to-end delay requirement
501 used for traffic grooming, in seconds as float (default: 0.5).
502 - "connect_timeout": Time in seconds to wait for connecting to
503 remote host before aborting (default: 10).
504 - "command_timeout": Time in seconds to wait for a command to
505 complete before aborting (default: 0, which means
506 unlimited).
507
508 Threading considerations
509 ========================
510 Unlike set_default(), the underlying info dictionary is only
511 modified from the task's thread. So calling set_info() from
512 another thread leads to queueing the request for late apply
513 (at run time) using the task dispatch port. When received,
514 the request wakes up the engine when the task is running and
515 the info dictionary is then updated.
516 """
517 self._info[info_key] = value
518
519 - def shell(self, command, **kwargs):
520 """
521 Schedule a shell command for local or distant parallel execution. This
522 essential method creates a local or remote Worker (depending on the
523 presence of the nodes parameter) and immediately schedules it for
524 execution in task's runloop. So, if the task is already running
525 (ie. called from an event handler), the command is started immediately,
526 assuming current execution contraintes are met (eg. fanout value). If
527 the task is not running, the command is not started but scheduled for
528 late execution. See resume() to start task runloop.
529
530 The following optional parameters are passed to the underlying local
531 or remote Worker constructor:
532 - handler: EventHandler instance to notify (on event) -- default is
533 no handler (None)
534 - timeout: command timeout delay expressed in second using a floating
535 point value -- default is unlimited (None)
536 - autoclose: if set to True, the underlying Worker is automatically
537 aborted as soon as all other non-autoclosing task objects (workers,
538 ports, timers) have finished -- default is False
539 - stderr: separate stdout/stderr if set to True -- default is False.
540
541 Local usage::
542 task.shell(command [, key=key] [, handler=handler]
543 [, timeout=secs] [, autoclose=enable_autoclose]
544 [, stderr=enable_stderr])
545
546 Distant usage::
547 task.shell(command, nodes=nodeset [, handler=handler]
548 [, timeout=secs], [, autoclose=enable_autoclose]
549 [, tree=None|False|True] [, remote=False|True]
550 [, stderr=enable_stderr])
551
552 Example:
553
554 >>> task = task_self()
555 >>> task.shell("/bin/date", nodes="node[1-2345]")
556 >>> task.resume()
557 """
558
559 handler = kwargs.get("handler", None)
560 timeo = kwargs.get("timeout", None)
561 autoclose = kwargs.get("autoclose", False)
562 stderr = kwargs.get("stderr", self.default("stderr"))
563 remote = kwargs.get("remote", True)
564
565 if kwargs.get("nodes", None):
566 assert kwargs.get("key", None) is None, \
567 "'key' argument not supported for distant command"
568
569 tree = kwargs.get("tree")
570
571
572 if tree != False and self._default_tree_is_enabled():
573
574 if tree and self.topology is None:
575 raise TaskError("tree mode required for distant shell "
576 "command with unknown topology!")
577
578 wrkcls = WorkerTree
579 elif not remote:
580
581 wrkcls = self.default('local_worker')
582 else:
583
584 wrkcls = self.default('distant_worker')
585
586 worker = wrkcls(NodeSet(kwargs["nodes"]), command=command,
587 handler=handler, stderr=stderr,
588 timeout=timeo, autoclose=autoclose, remote=remote)
589 else:
590
591 worker = WorkerPopen(command, key=kwargs.get("key", None),
592 handler=handler, stderr=stderr,
593 timeout=timeo, autoclose=autoclose)
594
595
596 self.schedule(worker)
597
598 return worker
599
600 - def copy(self, source, dest, nodes, **kwargs):
601 """
602 Copy local file to distant nodes.
603 """
604 assert nodes != None, "local copy not supported"
605
606 handler = kwargs.get("handler", None)
607 stderr = kwargs.get("stderr", self.default("stderr"))
608 timeo = kwargs.get("timeout", None)
609 preserve = kwargs.get("preserve", None)
610 reverse = kwargs.get("reverse", False)
611
612 tree = kwargs.get("tree")
613
614
615 if tree != False and self._default_tree_is_enabled():
616
617 if tree and self.topology is None:
618 raise TaskError("tree mode required for distant shell "
619 "command with unknown topology!")
620
621
622 wrkcls = WorkerTree
623 else:
624
625 wrkcls = self.default('distant_worker')
626
627 worker = wrkcls(nodes, source=source, dest=dest, handler=handler,
628 stderr=stderr, timeout=timeo, preserve=preserve,
629 reverse=reverse)
630
631 self.schedule(worker)
632 return worker
633
634 - def rcopy(self, source, dest, nodes, **kwargs):
635 """
636 Copy distant file or directory to local node.
637 """
638 kwargs['reverse'] = True
639 return self.copy(source, dest, nodes, **kwargs)
640
641 @tasksyncmethod()
643 """Add an EnginePort instance to Engine (private method)."""
644 self._engine.add(port)
645
646 @tasksyncmethod()
648 """Close and remove a port from task previously created with port()."""
649 self._engine.remove(port)
650
651 - def port(self, handler=None, autoclose=False):
652 """
653 Create a new task port. A task port is an abstraction object to
654 deliver messages reliably between tasks.
655
656 Basic rules:
657 - A task can send messages to another task port (thread safe).
658 - A task can receive messages from an acquired port either by
659 setting up a notification mechanism or using a polling
660 mechanism that may block the task waiting for a message
661 sent on the port.
662 - A port can be acquired by one task only.
663
664 If handler is set to a valid EventHandler object, the port is
665 a send-once port, ie. a message sent to this port generates an
666 ev_msg event notification issued the port's task. If handler
667 is not set, the task can only receive messages on the port by
668 calling port.msg_recv().
669 """
670 port = EnginePort(self, handler, autoclose)
671 self._add_port(port)
672 return port
673
674 - def timer(self, fire, handler, interval=-1.0, autoclose=False):
675 """
676 Create a timer bound to this task that fires at a preset time
677 in the future by invoking the ev_timer() method of `handler'
678 (provided EventHandler object). Timers can fire either only
679 once or repeatedly at fixed time intervals. Repeating timers
680 can also have their next firing time manually adjusted.
681
682 The mandatory parameter `fire' sets the firing delay in seconds.
683
684 The optional parameter `interval' sets the firing interval of
685 the timer. If not specified, the timer fires once and then is
686 automatically invalidated.
687
688 Time values are expressed in second using floating point
689 values. Precision is implementation (and system) dependent.
690
691 The optional parameter `autoclose', if set to True, creates
692 an "autoclosing" timer: it will be automatically invalidated
693 as soon as all other non-autoclosing task's objects (workers,
694 ports, timers) have finished. Default value is False, which
695 means the timer will retain task's runloop until it is
696 invalidated.
697
698 Return a new EngineTimer instance.
699
700 See ClusterShell.Engine.Engine.EngineTimer for more details.
701 """
702 assert fire >= 0.0, \
703 "timer's relative fire time must be a positive floating number"
704
705 timer = EngineTimer(fire, interval, autoclose, handler)
706
707
708 self._add_timer(timer)
709
710 return timer
711
712 @tasksyncmethod()
714 """Add a timer to task engine (thread-safe)."""
715 self._engine.add_timer(timer)
716
717 @tasksyncmethod()
719 """
720 Schedule a worker for execution, ie. add worker in task running
721 loop. Worker will start processing immediately if the task is
722 running (eg. called from an event handler) or as soon as the
723 task is started otherwise. Only useful for manually instantiated
724 workers, for example:
725
726 >>> task = task_self()
727 >>> worker = WorkerSsh("node[2-3]", None, 10, command="/bin/ls")
728 >>> task.schedule(worker)
729 >>> task.resume()
730 """
731 assert self in Task._tasks.values(), \
732 "deleted task instance, call task_self() again!"
733
734
735 worker._set_task(self)
736
737
738 for client in worker._engine_clients():
739 self._engine.add(client)
740
742 """Resume task - called from another thread."""
743 self._suspend_cond.notify_all()
744
764
    def resume(self, timeout=None):
        """
        Resume task. If task is task_self(), workers are executed in the
        calling thread so this method will block until all (non-autoclosing)
        workers have finished. This is always the case for a single-threaded
        application (eg. which doesn't create other Task() instance than
        task_self()). Otherwise, the current thread doesn't block. In that
        case, you may then want to call task_wait() to wait for completion.

        Warning: the timeout parameter can be used to set a hard limit of
        task execution time (in seconds). In that case, a TimeoutError
        exception is raised if this delay is reached. Its value is 0 by
        default, which means no task time limit (TimeoutError is never
        raised). In order to set a maximum delay for individual command
        execution, you should use Task.shell()'s timeout parameter instead.
        """
        # hard task execution time limit, consumed by the engine run
        self.timeout = timeout

        # decrement the suspend count; the task thread resumes running
        # once this count drops to zero (see _SuspendCondition)
        self._suspend_cond.atomic_dec()

        if self._is_task_self():
            # calling thread IS the task thread: run the engine inline
            self._resume()
        else:
            # wake up the task's own thread, which waits on the suspend
            # condition in _thread_start
            self._resume_thread()
791
792 - def run(self, command=None, **kwargs):
793 """
794 With arguments, it will schedule a command exactly like a Task.shell()
795 would have done it and run it.
796 This is the easiest way to simply run a command.
797
798 >>> task.run("hostname", nodes="foo")
799
800 Without argument, it starts all outstanding actions.
801 It behaves like Task.resume().
802
803 >>> task.shell("hostname", nodes="foo")
804 >>> task.shell("hostname", nodes="bar")
805 >>> task.run()
806
807 When used with a command, you can set a maximum delay of individual
808 command execution with the help of the timeout parameter (see
809 Task.shell's parameters). You can then listen for ev_timeout() events
810 in your Worker event handlers, or use num_timeout() or
811 iter_keys_timeout() afterwards.
812 But, when used as an alias to Task.resume(), the timeout parameter
813 sets an hard limit of task execution time. In that case, a TimeoutError
814 exception is raised if this delay is reached.
815 """
816 worker = None
817 timeout = None
818
819
820
821
822
823 if type(command) in (int, float):
824 timeout = command
825 command = None
826
827 elif 'timeout' in kwargs and command is None:
828 timeout = kwargs.pop('timeout')
829
830
831 elif command is not None:
832 worker = self.shell(command, **kwargs)
833
834 self.resume(timeout)
835
836 return worker
837
838 @tasksyncmethod()
840 """Suspend request received."""
841 assert task_self() == self
842
843 self._suspend_lock.acquire()
844 self._suspended = True
845 self._suspend_lock.release()
846
847
848 self._suspend_cond.wait_check(self._run_lock)
849
850
851 self._suspend_lock.acquire()
852 self._suspended = False
853 self._suspend_lock.release()
854
856 """
857 Suspend task execution. This method may be called from another
858 task (thread-safe). The function returns False if the task
859 cannot be suspended (eg. it's not running), or returns True if
860 the task has been successfully suspended.
861 To resume a suspended task, use task.resume().
862 """
863
864 self._suspend_cond.atomic_inc()
865
866
867 self._suspend_wait()
868
869
870 self._run_lock.acquire()
871
872
873 result = True
874 self._suspend_lock.acquire()
875 if not self._suspended:
876
877 result = False
878 self._run_lock.release()
879 self._suspend_lock.release()
880 return result
881
    @tasksyncmethod()
    def _abort(self, kill=False):
        """Abort request received (always runs on the task's own thread,
        as enforced by the tasksyncmethod dispatch)."""
        assert task_self() == self

        # mark the task as quitting and tear down the engine runloop;
        # kill propagates to the engine abort (see _terminate)
        self._quit = True
        self._engine.abort(kill)
889
    def abort(self, kill=False):
        """
        Abort a task. Aborting a task removes (and stops when needed)
        all workers. If optional parameter kill is True, the task
        object is unbound from the current thread, so calling
        task_self() creates a new Task object.
        """
        # non-blocking acquire: failure means the runloop currently
        # holds the run lock, ie. the task is running
        if not self._run_lock.acquire(0):
            # task is running: dispatch the abort request through the
            # task port so it executes on the task's own thread
            self._abort(kill)

            # busy-wait (1 ms steps) until the runloop terminates and
            # releases the run lock
            while not self._run_lock.acquire(0):
                sleep(0.001)

        # at this point we hold the run lock and the task is stopped
        self._quit = True
        self._run_lock.release()
        if self._is_task_self():
            self._terminate(kill)
        else:
            # wake up the task thread so it can exit its wait loop and
            # terminate itself (see _thread_start)
            self._suspend_cond.notify_all()
912
914 """
915 Abort completion subroutine.
916 """
917 assert self._quit == True
918 self._terminated = True
919
920 if kill:
921
922 self._dispatch_port = None
923
924 self._engine.clear(clear_ports=kill)
925 if kill:
926 self._engine.release()
927 self._engine = None
928
929
930 self._reset()
931
932
933
934
935 self._join_cond.acquire()
936 self._join_cond.notifyAll()
937 self._join_cond.release()
938
939
940 if kill:
941 Task._task_lock.acquire()
942 try:
943 del Task._tasks[threading.currentThread()]
944 finally:
945 Task._task_lock.release()
946
948 """
949 Suspend execution of the calling thread until the target task
950 terminates, unless the target task has already terminated.
951 """
952 self._join_cond.acquire()
953 try:
954 if self._suspend_cond.suspend_count > 0 and not self._suspended:
955
956 return
957 if self._terminated:
958
959 return
960 self._join_cond.wait()
961 finally:
962 self._join_cond.release()
963
965 """
966 Return True if the task is running.
967 """
968 return self._engine and self._engine.running
969
971 """
972 Reset buffers and retcodes management variables.
973 """
974
975 self._msgtrees = {}
976
977 self._d_source_rc = {}
978 self._d_rc_sources = {}
979 self._max_rc = None
980 self._timeout_sources.clear()
981
982 - def _msgtree(self, sname, strict=True):
983 """Helper method to return msgtree instance by sname if allowed."""
984 if self.default("%s_msgtree" % sname):
985 if sname not in self._msgtrees:
986 self._msgtrees[sname] = MsgTree()
987 return self._msgtrees[sname]
988 elif strict:
989 raise TaskMsgTreeError("%s_msgtree not set" % sname)
990
    def _msg_add(self, worker, node, sname, msg):
        """
        Process a new message into Task's MsgTree that is coming from:
            - a worker instance of this task
            - a node
            - a stream name sname (string identifier)
        """
        assert worker.task == self, "better to add messages from my workers"
        # strict=False: returns None when gathering is disabled for this
        # stream instead of raising TaskMsgTreeError
        msgtree = self._msgtree(sname, strict=False)
        # the message is silently dropped when the msgtree is disabled;
        # event handlers still see it live through ev_read/ev_error
        if msgtree is not None:
            msgtree.add((worker, node), msg)
1006
1007 - def _rc_set(self, worker, node, rc):
1008 """
1009 Add a worker return code (rc) that is coming from a node of a
1010 worker instance.
1011 """
1012 source = (worker, node)
1013
1014
1015 self._d_source_rc[source] = rc
1016
1017
1018 self._d_rc_sources.setdefault(rc, set()).add(source)
1019
1020
1021 if self._max_rc is None or rc > self._max_rc:
1022 self._max_rc = rc
1023
1025 """
1026 Add a timeout indicator that is coming from a node of a worker
1027 instance.
1028 """
1029
1030 self._timeout_sources.add((worker, node))
1031
1033 """Get a message by its worker instance, node and stream name."""
1034 msg = self._msgtree(sname).get((worker, node))
1035 if msg is None:
1036 return None
1037 return str(msg)
1038
1040 """Call identified tree matcher (items, walk) method with options."""
1041 if isinstance(match_keys, basestring):
1042 raise TypeError("Sequence of keys/nodes expected for 'match_keys'.")
1043
1044 if worker and match_keys is None:
1045 match = lambda k: k[0] is worker
1046 elif worker and match_keys is not None:
1047 match = lambda k: k[0] is worker and k[1] in match_keys
1048 elif match_keys:
1049 match = lambda k: k[1] in match_keys
1050 else:
1051 match = None
1052
1053 return tree_match_func(match, itemgetter(1))
1054
1056 """Get a return code by worker instance and node."""
1057 return self._d_source_rc[(worker, node)]
1058
1060 """
1061 Return an iterator over return codes for the given key.
1062 """
1063 for (w, k), rc in self._d_source_rc.iteritems():
1064 if k == key:
1065 yield rc
1066
1068 """
1069 Return an iterator over return codes and keys list for a
1070 specific worker and optional matching keys.
1071 """
1072 if match_keys:
1073
1074 for rc, src in self._d_rc_sources.iteritems():
1075 keys = [t[1] for t in src if t[0] is worker and \
1076 t[1] in match_keys]
1077 if len(keys) > 0:
1078 yield rc, keys
1079 else:
1080 for rc, src in self._d_rc_sources.iteritems():
1081 keys = [t[1] for t in src if t[0] is worker]
1082 if len(keys) > 0:
1083 yield rc, keys
1084
1086 """
1087 Return an iterator over key, rc for a specific worker.
1088 """
1089 for rc, src in self._d_rc_sources.iteritems():
1090 for w, k in src:
1091 if w is worker:
1092 yield k, rc
1093
1095 """
1096 Return the number of timed out "keys" for a specific worker.
1097 """
1098 cnt = 0
1099 for (w, k) in self._timeout_sources:
1100 if w is worker:
1101 cnt += 1
1102 return cnt
1103
1105 """
1106 Iterate over timed out keys (ie. nodes) for a specific worker.
1107 """
1108 for (w, k) in self._timeout_sources:
1109 if w is worker:
1110 yield k
1111
1113 """
1114 Remove any messages from specified worker.
1115 """
1116 msgtree = self._msgtree('stdout', strict=False)
1117 if msgtree is not None:
1118 msgtree.remove(lambda k: k[0] == worker)
1119
1121 """
1122 Remove any error messages from specified worker.
1123 """
1124 errtree = self._msgtree('stderr', strict=False)
1125 if errtree is not None:
1126 errtree.remove(lambda k: k[0] == worker)
1127
1129 """
1130 Get buffer for a specific key. When the key is associated
1131 to multiple workers, the resulting buffer will contain
1132 all workers content that may overlap. This method returns an
1133 empty buffer if key is not found in any workers.
1134 """
1135 msgtree = self._msgtree('stdout')
1136 select_key = lambda k: k[1] == key
1137 return "".join(imap(str, msgtree.messages(select_key)))
1138
1139 node_buffer = key_buffer
1140
1142 """
1143 Get error buffer for a specific key. When the key is associated
1144 to multiple workers, the resulting buffer will contain all
1145 workers content that may overlap. This method returns an empty
1146 error buffer if key is not found in any workers.
1147 """
1148 errtree = self._msgtree('stderr')
1149 select_key = lambda k: k[1] == key
1150 return "".join(imap(str, errtree.messages(select_key)))
1151
1152 node_error = key_error
1153
1155 """
1156 Return return code for a specific key. When the key is
1157 associated to multiple workers, return the max return
1158 code from these workers. Raises a KeyError if key is not found
1159 in any finished workers.
1160 """
1161 codes = list(self._rc_iter_by_key(key))
1162 if not codes:
1163 raise KeyError(key)
1164 return max(codes)
1165
1166 node_retcode = key_retcode
1167
1169 """
1170 Get max return code encountered during last run
1171 or None in the following cases:
1172 - all commands timed out,
1173 - no command was executed.
1174
1175 How retcodes work
1176 =================
1177 If the process exits normally, the return code is its exit
1178 status. If the process is terminated by a signal, the return
1179 code is 128 + signal number.
1180 """
1181 return self._max_rc
1182
        """Helper method to iterate over recorded buffers by sname."""
        try:
            msgtree = self._msgtrees[sname]
            return self._call_tree_matcher(msgtree.walk, match_keys)
        except KeyError:
            # No tree recorded for this stream name: either msgtree
            # recording is disabled for it (error case), or no message
            # has been received yet (empty iterator case).
            if not self.default("%s_msgtree" % sname):
                raise TaskMsgTreeError("%s_msgtree not set" % sname)
            return iter([])
1192
1194 """
1195 Iterate over buffers, returns a tuple (buffer, keys). For remote
1196 workers (Ssh), keys are list of nodes. In that case, you should use
1197 NodeSet.fromlist(keys) to get a NodeSet instance (which is more
1198 convenient and efficient):
1199
1200 Optional parameter match_keys add filtering on these keys.
1201
1202 Usage example:
1203
1204 >>> for buffer, nodelist in task.iter_buffers():
1205 ... print NodeSet.fromlist(nodelist)
1206 ... print buffer
1207 """
1208 return self._iter_msgtree('stdout', match_keys)
1209
1211 """
1212 Iterate over error buffers, returns a tuple (buffer, keys).
1213
1214 See iter_buffers().
1215 """
1216 return self._iter_msgtree('stderr', match_keys)
1217
1219 """
1220 Iterate over return codes, returns a tuple (rc, keys).
1221
1222 Optional parameter match_keys add filtering on these keys.
1223
1224 How retcodes work
1225 =================
1226 If the process exits normally, the return code is its exit
1227 status. If the process is terminated by a signal, the return
1228 code is 128 + signal number.
1229 """
1230 if match_keys:
1231
1232 for rc, src in self._d_rc_sources.iteritems():
1233 keys = [t[1] for t in src if t[1] in match_keys]
1234 yield rc, keys
1235 else:
1236 for rc, src in self._d_rc_sources.iteritems():
1237 yield rc, [t[1] for t in src]
1238
1240 """
1241 Return the number of timed out "keys" (ie. nodes).
1242 """
1243 return len(self._timeout_sources)
1244
1246 """
1247 Iterate over timed out keys (ie. nodes).
1248 """
1249 for (w, k) in self._timeout_sources:
1250 yield k
1251
1253 """
1254 Flush all task messages (from all task workers).
1255 """
1256 msgtree = self._msgtree('stdout', strict=False)
1257 if msgtree is not None:
1258 msgtree.clear()
1259
1261 """
1262 Flush all task error messages (from all task workers).
1263 """
1264 errtree = self._msgtree('stderr', strict=False)
1265 if errtree is not None:
1266 errtree.clear()
1267
1268 @classmethod
1269 - def wait(cls, from_thread):
1270 """
1271 Class method that blocks calling thread until all tasks have
1272 finished (from a ClusterShell point of view, for instance,
1273 their task.resume() return). It doesn't necessarly mean that
1274 associated threads have finished.
1275 """
1276 Task._task_lock.acquire()
1277 try:
1278 tasks = Task._tasks.copy()
1279 finally:
1280 Task._task_lock.release()
1281 for thread, task in tasks.iteritems():
1282 if thread != from_thread:
1283 task.join()
1284
        """Get propagation channel for gateway (create one if needed).

        Use self.gateways dictionary that allows lookup like:
            gateway => (worker channel, set of metaworkers)
        """
        # create a gateway channel if none exists yet for this gateway
        if gateway not in self.gateways:
            chan = PropagationChannel(self, gateway)
            logger = logging.getLogger(__name__)
            logger.info("pchannel: creating new channel %s", chan)
            # invoke gateway
            timeout = None  # NOTE(review): no timeout handling for gateway channels yet
            wrkcls = self.default('distant_worker')
            chanworker = wrkcls(gateway, command=metaworker.invoke_gateway,
                                handler=chan, stderr=True, timeout=timeout)
            # use the special value FANOUT_UNLIMITED so that registration
            # of the gateway channel is always allowed, regardless of the
            # engine's fanout setting
            chanworker._fanout = FANOUT_UNLIMITED
            # redirect the channel worker's default stream names to the
            # channel's writer/reader/error streams
            chanworker.SNAME_STDIN = chan.SNAME_WRITER
            chanworker.SNAME_STDOUT = chan.SNAME_READER
            chanworker.SNAME_STDERR = chan.SNAME_ERROR
            self.schedule(chanworker)
            # record the new channel along with its first metaworker user
            self.gateways[gateway] = (chanworker, set([metaworker]))
        else:
            # channel already exists: register metaworker as another user
            chanworker, metaworkers = self.gateways[gateway]
            metaworkers.add(metaworker)
        return chanworker.eh
1318
        """Release propagation channel associated to gateway.

        Lookup by gateway, decref associated metaworker set and release
        channel worker if needed.
        """
        logger = logging.getLogger(__name__)
        logger.debug("pchannel_release %s %s", gateway, metaworker)

        if gateway not in self.gateways:
            logger.error("pchannel_release: no pchannel found for gateway %s",
                         gateway)
        else:
            # drop metaworker from the channel's user set
            chanworker, metaworkers = self.gateways[gateway]
            metaworkers.remove(metaworker)
            if len(metaworkers) == 0:
                # last user gone: tear down the channel worker
                logger.info("pchannel_release: destroying channel %s",
                            chanworker.eh)
                chanworker.abort()
                # delete gateway reference
                del self.gateways[gateway]
1341
1344 """
1345 Return the current Task object, corresponding to the caller's thread of
1346 control (a Task object is always bound to a specific thread). This function
1347 provided as a convenience is available in the top-level ClusterShell.Task
1348 package namespace.
1349 """
1350 return Task(thread=threading.currentThread(), defaults=defaults)
1351
1353 """
1354 Suspend execution of the calling thread until all tasks terminate, unless
1355 all tasks have already terminated. This function is provided as a
1356 convenience and is available in the top-level ClusterShell.Task package
1357 namespace.
1358 """
1359 Task.wait(threading.currentThread())
1360
1362 """
1363 Destroy the Task instance bound to the current thread. A next call to
1364 task_self() will create a new Task object. Not to be called from a signal
1365 handler. This function provided as a convenience is available in the
1366 top-level ClusterShell.Task package namespace.
1367 """
1368 task_self().abort(kill=True)
1369
1371 """
1372 Cleanup routine to destroy all created tasks. This function provided as a
1373 convenience is available in the top-level ClusterShell.Task package
1374 namespace. This is mainly used for testing purposes and should be avoided
1375 otherwise. task_cleanup() may be called from any threads but not from a
1376 signal handler.
1377 """
1378
1379 while True:
1380 Task._task_lock.acquire()
1381 try:
1382 tasks = Task._tasks.copy()
1383 if len(tasks) == 0:
1384 break
1385 finally:
1386 Task._task_lock.release()
1387
1388
1389
1390
1391 for task in tasks.itervalues():
1392 task.abort(kill=True)
1393
1394
1395
1396 sleep(0.001)
1397