Package ClusterShell :: Module Task
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Task

   1  # 
   2  # Copyright (C) 2007-2016 CEA/DAM 
   3  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
   4  # 
   5  # This file is part of ClusterShell. 
   6  # 
   7  # ClusterShell is free software; you can redistribute it and/or 
   8  # modify it under the terms of the GNU Lesser General Public 
   9  # License as published by the Free Software Foundation; either 
  10  # version 2.1 of the License, or (at your option) any later version. 
  11  # 
  12  # ClusterShell is distributed in the hope that it will be useful, 
  13  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
  14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  15  # Lesser General Public License for more details. 
  16  # 
  17  # You should have received a copy of the GNU Lesser General Public 
  18  # License along with ClusterShell; if not, write to the Free Software 
  19  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
  20   
  21  """ 
  22  ClusterShell Task module. 
  23   
  24  Simple example of use: 
  25   
  26  >>> from ClusterShell.Task import task_self, NodeSet 
  27  >>>   
  28  >>> # get task associated with calling thread 
  29  ... task = task_self() 
  30  >>>  
  31  >>> # add a command to execute on distant nodes 
  32  ... task.shell("/bin/uname -r", nodes="tiger[1-30,35]") 
  33  <ClusterShell.Worker.Ssh.WorkerSsh object at 0x7f41da71b890> 
  34  >>>  
  35  >>> # run task in calling thread 
  36  ... task.run() 
  37  >>>  
  38  >>> # get results 
  39  ... for output, nodelist in task.iter_buffers(): 
  40  ...     print '%s: %s' % (NodeSet.fromlist(nodelist), output) 
  41  ...  
  42   
  43  """ 
  44   
  45  from itertools import imap 
  46  import logging 
  47  from operator import itemgetter 
  48  import os 
  49  import socket 
  50  import sys 
  51  import threading 
  52  from time import sleep 
  53  import traceback 
  54   
  55  from ClusterShell.Defaults import config_paths, DEFAULTS 
  56  from ClusterShell.Defaults import _local_workerclass, _distant_workerclass 
  57  from ClusterShell.Engine.Engine import EngineAbortException 
  58  from ClusterShell.Engine.Engine import EngineTimeoutException 
  59  from ClusterShell.Engine.Engine import EngineAlreadyRunningError 
  60  from ClusterShell.Engine.Engine import EngineTimer 
  61  from ClusterShell.Engine.Factory import PreferredEngine 
  62  from ClusterShell.Worker.EngineClient import EnginePort 
  63  from ClusterShell.Worker.Popen import WorkerPopen 
  64  from ClusterShell.Worker.Tree import WorkerTree 
  65  from ClusterShell.Worker.Worker import FANOUT_UNLIMITED 
  66   
  67  from ClusterShell.Event import EventHandler 
  68  from ClusterShell.MsgTree import MsgTree 
  69  from ClusterShell.NodeSet import NodeSet 
  70   
  71  from ClusterShell.Topology import TopologyParser, TopologyError 
  72  from ClusterShell.Propagation import PropagationTreeRouter, PropagationChannel 
class TaskException(Exception):
    """Base task exception.

    Root of the exception hierarchy defined in this module.
    """
77
class TaskError(TaskException):
    """Base task error exception.

    Parent class of the more specific task errors defined in this module
    (TimeoutError, AlreadyRunningError, TaskMsgTreeError).
    """
80
class TimeoutError(TaskError):
    """Raised when the task timed out."""
    # NOTE(review): on Python 3 this name shadows the builtin TimeoutError
    # within this module; it is part of the public API and kept as is.
83
class AlreadyRunningError(TaskError):
    """Raised when trying to resume an already running task.

    Raised by Task._run() when the run lock is already held and by
    Task._resume() when the engine reports it is already running.
    """
86
class TaskMsgTreeError(TaskError):
    """Raised when trying to access disabled MsgTree."""
89
def _getshorthostname():
    """Return the local host name truncated at its first dot."""
    full_name = socket.gethostname()
    # only the leading label is needed, so stop splitting after one dot
    return full_name.split('.', 1)[0]
94
class Task(object):
    """
    The Task class defines an essential ClusterShell object which aims to
    execute commands in parallel and easily get their results.

    More precisely, a Task object manages a coordinated (ie. with respect of
    its current parameters) collection of independent parallel Worker objects.
    See ClusterShell.Worker.Worker for further details on ClusterShell Workers.

    Always bound to a specific thread, a Task object acts like a "thread
    singleton". So most of the time, and even more for single-threaded
    applications, you can get the current task object with the following
    top-level Task module function:

        >>> task = task_self()

    However, if you want to create a task in a new thread, use:

        >>> task = Task()

    To create or get the instance of the task associated with the thread
    object thr (threading.Thread):

        >>> task = Task(thread=thr)

    To submit a command to execute locally within task, use:

        >>> task.shell("/bin/hostname")

    To submit a command to execute to some distant nodes in parallel, use:

        >>> task.shell("/bin/hostname", nodes="tiger[1-20]")

    The previous examples submit commands to execute but do not allow result
    interaction during their execution. For your program to interact during
    command execution, it has to define event handlers that will listen for
    local or remote events. These handlers are based on the EventHandler
    class, defined in ClusterShell.Event. The following example shows how to
    submit a command on a cluster with a registered event handler:

        >>> task.shell("uname -r", nodes="node[1-9]", handler=MyEventHandler())

    Run task in its associated thread (will block only if the calling thread is
    the task associated thread):

        >>> task.resume()

    or:

        >>> task.run()

    You can also pass arguments to task.run() to schedule a command exactly
    like in task.shell(), and run it:

        >>> task.run("hostname", nodes="tiger[1-20]", handler=MyEventHandler())

    A common need is to set a maximum delay for command execution, especially
    when the command time is not known. Doing this with ClusterShell Task is
    very straightforward. To limit the execution time on each node, use the
    timeout parameter of shell() or run() methods to set a delay in seconds,
    like:

        >>> task.run("check_network.sh", nodes="tiger[1-20]", timeout=30)

    You can then either use Task's iter_keys_timeout() method after execution
    to see on what nodes the command has timed out, or listen for ev_timeout()
    events in your event handler.

    To get command result, you can either use Task's iter_buffers() method for
    standard output, iter_errors() for standard error after command execution
    (common output contents are automatically gathered), or you can listen for
    ev_read() and ev_error() events in your event handler and get live command
    output.

    To get command return codes, you can either use Task's iter_retcodes(),
    node_retcode() and max_retcode() methods after command execution, or
    listen for ev_hup() events in your event handler.
    """

    # topology.conf file path list
    TOPOLOGY_CONFIGS = config_paths('topology.conf')

    # registry of live tasks, keyed by their bound thread object
    _tasks = {}
    # last task id handed out; incremented in __init__ under _task_lock
    _taskid_max = 0
    # lock taken around _taskid_max updates and task removal from _tasks
    _task_lock = threading.Lock()
class _SyncMsgHandler(EventHandler):
    """Event handler attached to the task control (dispatch) port.

    Each message received on the port is a (function, args, kwargs)
    triple describing a task method call to perform.
    """

    def ev_msg(self, port, msg):
        """Message received: invoke the carried function on the port's task."""
        # first item is the callable, the remaining two are its arguments
        method = msg[0]
        positional, named = msg[1:]
        method(port.task, *positional, **named)
191
class tasksyncmethod(object):
    """Decorator factory making a task method callable from any thread.

    The wrapped method runs directly when invoked from the task's own
    thread; otherwise the call is forwarded as a message through the
    task dispatch port, or dropped (with a debug message) when the task
    has no dispatch port.
    """

    def __call__(self, f):
        def taskfunc(*args, **kwargs):
            # first positional argument is the task instance
            task = args[0]
            fargs = args[1:]
            if task._is_task_self():
                # already in the task's thread: plain synchronous call
                return f(task, *fargs, **kwargs)
            if task._dispatch_port:
                # foreign thread: hand the call over to the task's thread
                task._dispatch_port.msg_send((f, fargs, kwargs))
                return None
            # no dispatch port available: report the dropped call
            debug_print = task.info("print_debug")
            debug_print(task, "%s: dropped call: %s" % (task, str(fargs)))
            return None

        # copy the decorated function meta-data for pydoc
        # Note: should be later replaced by @wraps (functools)
        # as of Python 2.5
        for attr in ('__name__', '__doc__', '__dict__', '__module__'):
            setattr(taskfunc, attr, getattr(f, attr))
        return taskfunc
218
class _SuspendCondition(object):
    """Special class to manage task suspend condition.

    Couples a threading.Condition with a suspend counter: a thread
    calling wait_check() blocks while the counter is positive, until
    notify_all() is called.
    """

    def __init__(self, lock=None, initial=0):
        """Create the condition.

        lock -- underlying lock of the condition; a fresh RLock is
            created per instance when omitted (a default evaluated in
            the signature would be shared by every instance)
        initial -- initial suspend count
        """
        if lock is None:
            lock = threading.RLock()
        self._cond = threading.Condition(lock)
        self.suspend_count = initial

    def atomic_inc(self):
        """Increase suspend count."""
        self._cond.acquire()
        self.suspend_count += 1
        self._cond.release()

    def atomic_dec(self):
        """Decrease suspend count."""
        self._cond.acquire()
        self.suspend_count -= 1
        self._cond.release()

    def wait_check(self, release_lock=None):
        """Wait for condition if needed.

        Blocks only when the suspend count is positive. If release_lock
        is given, it is released just before waiting.
        """
        self._cond.acquire()
        try:
            if self.suspend_count > 0:
                if release_lock:
                    release_lock.release()
                self._cond.wait()
        finally:
            self._cond.release()

    def notify_all(self):
        """Signal all threads waiting for condition."""
        self._cond.acquire()
        try:
            # clamp the counter so that woken threads are not suspended
            # again by a leftover positive count
            self.suspend_count = min(self.suspend_count, 0)
            self._cond.notifyAll()
        finally:
            self._cond.release()
256 257
def __new__(cls, thread=None, defaults=None):
    """
    Allocate a task, reusing the instance already registered for
    `thread' when there is one ("thread singleton" behavior). Without
    a thread argument, a brand new object is always allocated.
    `defaults' is accepted to mirror __init__() and is not used here.
    """
    if not thread:
        return object.__new__(cls)

    registry = cls._tasks
    if thread not in registry:
        registry[thread] = object.__new__(cls)
    return registry[thread]
270
def __init__(self, thread=None, defaults=None):
    """Initialize a Task, creating a new non-daemonic thread if
    needed.

    thread -- existing thread object to bind the task to; when not
        set, a new thread running Task._thread_start is created and
        started
    defaults -- object providing _task_default and _task_info
        dictionaries; the module-level DEFAULTS is used when None

    Initialization is skipped when the instance already has an engine,
    which happens when __new__() returned an existing task.
    """
    if not getattr(self, "_engine", None):
        # first time called
        self._default_lock = threading.Lock()
        if defaults is None:
            defaults = DEFAULTS
        # work on private copies so per-task changes stay local
        self._default = defaults._task_default.copy()
        self._default.update(
            {"local_worker": _local_workerclass(defaults),
             "distant_worker": _distant_workerclass(defaults)})
        self._info = defaults._task_info.copy()

        # use factory class PreferredEngine that gives the proper
        # engine instance
        self._engine = PreferredEngine(self.default("engine"), self._info)
        self.timeout = None

        # task synchronization objects
        self._run_lock = threading.Lock()       # primitive lock
        self._suspend_lock = threading.RLock()  # reentrant lock
        # both join and suspend conditions share the same underlying lock
        self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1)
        self._join_cond = threading.Condition(self._suspend_lock)
        self._suspended = False
        self._quit = False
        self._terminated = False

        # Default router
        self.topology = None
        self.router = None
        self.gateways = {}

        # dict of MsgTree by sname
        self._msgtrees = {}
        # dict of sources to return codes
        self._d_source_rc = {}
        # dict of return codes to sources
        self._d_rc_sources = {}
        # keep max rc
        self._max_rc = None
        # keep timeout'd sources
        self._timeout_sources = set()
        # allow no-op call to getters before resume()
        self._reset()

        # special engine port for task method dispatching
        self._dispatch_port = EnginePort(self,
                                         handler=Task._SyncMsgHandler(),
                                         autoclose=True)
        self._engine.add(self._dispatch_port)

        # set taskid used as Thread name
        Task._task_lock.acquire()
        Task._taskid_max += 1
        self._taskid = Task._taskid_max
        Task._task_lock.release()

        # create new thread if needed
        self._thread_foreign = bool(thread)
        if self._thread_foreign:
            self.thread = thread
        else:
            self.thread = thread = \
                threading.Thread(None,
                                 Task._thread_start,
                                 "Task-%d" % self._taskid,
                                 args=(self,))
            # register before starting so the new thread finds its task
            Task._tasks[thread] = self
            thread.start()
342
def _is_task_self(self):
    """Tell whether the calling thread is the thread this task is bound
    to. Private library helper: unlike task_self(), it never creates a
    task instance."""
    bound_thread = self.thread
    return bound_thread == threading.currentThread()
347
def default_excepthook(self, exc_type, exc_value, tb):
    """Default excepthook for a new Task. When an exception is
    raised and uncaught on Task thread, excepthook is called, which
    is default_excepthook by default. Once excepthook is overridden,
    you can still call default_excepthook if needed.

    Writes a header line naming the task thread, followed by the
    formatted traceback, to sys.stderr.

    exc_type, exc_value, tb -- exception triple as returned by
        sys.exc_info()
    """
    # sys.stderr.write() emits the same text as the former
    # "print >> sys.stderr" statement but is valid on Python 2 and 3
    sys.stderr.write('Exception in thread %s:\n' % self.thread)
    traceback.print_exception(exc_type, exc_value, tb, file=sys.stderr)

# hook used until the excepthook property is assigned
_excepthook = default_excepthook
def _getexcepthook(self):
    """Getter of the excepthook property."""
    current_hook = self._excepthook
    return current_hook

def _setexcepthook(self, hook):
    """Setter of the excepthook property."""
    self._excepthook = hook
    if not self._thread_foreign:
        return
    # The thread has not been created by us: also install the hook as
    # sys.excepthook, which might handle the uncaught exception.
    sys.excepthook = self._excepthook

# When an exception is raised and uncaught on Task's thread,
# excepthook is called. You may want to override this three
# arguments method (very similar to what you can do with
# sys.excepthook).
excepthook = property(_getexcepthook, _setexcepthook)
def _thread_start(self):
    """Task-managed thread entry point.

    Waits on the suspend condition and resumes the task repeatedly
    until the quit flag is set, then terminates the task.
    """
    while True:
        if self._quit:
            break
        self._suspend_cond.wait_check()
        if self._quit:
            # may be set by abort() while we were waiting
            break
        try:
            self._resume()
        except:
            # thread top-level boundary: report anything uncaught
            # through the task excepthook and leave the loop
            self.excepthook(*sys.exc_info())
            self._quit = True

    self._terminate(kill=True)
387
def _run(self, timeout):
    """Run task (always called from its self thread).

    timeout -- value passed through to the engine run() method

    Raises AlreadyRunningError when the run lock is already held.
    The run lock is held for the duration of the engine run.
    """
    # check if task is already running
    if self._run_lock.locked():
        raise AlreadyRunningError("task is already running")
    # Acquire before entering the try block: if acquire() itself raises,
    # the finally clause must not release a lock we do not hold.
    # use with statement later
    self._run_lock.acquire()
    try:
        self._engine.run(timeout)
    finally:
        self._run_lock.release()
399
def _default_tree_is_enabled(self):
    """Return whether default tree is enabled.

    As a side effect, when no topology is set yet, the first existing
    file of TOPOLOGY_CONFIGS (scanned in reverse order) is loaded.
    """
    if self.topology is None:
        for candidate in self.TOPOLOGY_CONFIGS[::-1]:
            if not os.path.exists(candidate):
                continue
            self.load_topology(candidate)
            break

    if self.topology is None:
        return False
    return self.default("auto_tree")
408
def load_topology(self, topology_file):
    """Load propagation topology from provided file.

    The file is parsed with TopologyParser and the tree is built using
    the local short hostname; the result is stored in task.topology.

    On failure, task.topology is left untouched (the attribute is only
    assigned once the tree has been built) and the exception raised by
    the parser propagates to the caller.
    """
    parser = TopologyParser(topology_file)
    self.topology = parser.tree(_getshorthostname())
419
def _default_router(self):
    """Return the task router, creating it from the current topology
    on first use."""
    if self.router is not None:
        return self.router

    root_nodes = str(self.topology.root.nodeset)
    self.router = PropagationTreeRouter(root_nodes, self.topology)
    return self.router
425
def default(self, default_key, def_val=None):
    """
    Look up default_key in the per-task "default" dictionary and
    return its value, or def_val when the key is absent. The lookup
    is done while holding the task default lock.
    See set_default() for a list of reserved task default_keys.
    """
    guard = self._default_lock
    guard.acquire()
    try:
        value = self._default.get(default_key, def_val)
    finally:
        guard.release()
    return value
436
def set_default(self, default_key, value):
    """
    Set task value for specified key in the dictionary "default".
    Users may store their own task-specific key, value pairs
    using this method and retrieve them with default().

    Task default_keys are:
      - "stderr": Boolean value indicating whether to enable
        stdout/stderr separation when using task.shell(), if not
        specified explicitly (default: False).
      - "stdout_msgtree": Whether to instantiate standard output
        MsgTree for automatic internal gathering of result messages
        coming from Workers (default: True).
      - "stderr_msgtree": Same for stderr (default: True).
      - "engine": Used to specify an underlying Engine explicitly
        (default: "auto").
      - "port_qlimit": Size of port messages queue (default: 32).
      - "worker": Worker-based class used when spawning workers through
        shell()/run().

    Threading considerations
    ========================
      Unlike set_info(), when called from the task's thread or
      not, set_default() immediately updates the underlying
      dictionary in a thread-safe manner. This method doesn't
      wake up the engine when called.
    """
    guard = self._default_lock
    guard.acquire()
    try:
        self._default[default_key] = value
    finally:
        guard.release()
469
def info(self, info_key, def_val=None):
    """
    Return the per-task information stored under info_key, or def_val
    when the key is absent. See set_info() for a list of reserved task
    info_keys.
    """
    task_info = self._info
    return task_info.get(info_key, def_val)
@tasksyncmethod()
def set_info(self, info_key, value):
    """
    Set task value for a specific key information. Key, value
    pairs can be passed to the engine and/or workers.
    Users may store their own task-specific info key, value pairs
    using this method and retrieve them with info().

    The following example changes the fanout value to 128:
        >>> task.set_info('fanout', 128)

    The following example enables debug messages:
        >>> task.set_info('debug', True)

    Task info_keys are:
      - "debug": Boolean value indicating whether to enable library
        debugging messages (default: False).
      - "print_debug": Debug messages processing function. This
        function takes 2 arguments: the task instance and the
        message string (default: an internal function doing standard
        print).
      - "fanout": Max number of registered clients in Engine at a
        time (default: 64).
      - "grooming_delay": Message maximum end-to-end delay requirement
        used for traffic grooming, in seconds as float (default: 0.5).
      - "connect_timeout": Time in seconds to wait for connecting to
        remote host before aborting (default: 10).
      - "command_timeout": Time in seconds to wait for a command to
        complete before aborting (default: 0, which means
        unlimited).

    Threading considerations
    ========================
      Unlike set_default(), the underlying info dictionary is only
      modified from the task's thread. So calling set_info() from
      another thread leads to queueing the request for late apply
      (at run time) using the task dispatch port. When received,
      the request wakes up the engine when the task is running and
      the info dictionary is then updated.
    """
    # runs in the task's thread only (see the tasksyncmethod decorator)
    self._info[info_key] = value
518
def shell(self, command, **kwargs):
    """
    Schedule a shell command for local or distant parallel execution. This
    essential method creates a local or remote Worker (depending on the
    presence of the nodes parameter) and immediately schedules it for
    execution in task's runloop. So, if the task is already running
    (ie. called from an event handler), the command is started immediately,
    assuming current execution constraints are met (eg. fanout value). If
    the task is not running, the command is not started but scheduled for
    late execution. See resume() to start task runloop.

    The following optional parameters are passed to the underlying local
    or remote Worker constructor:
      - handler: EventHandler instance to notify (on event) -- default is
        no handler (None)
      - timeout: command timeout delay expressed in second using a floating
        point value -- default is unlimited (None)
      - autoclose: if set to True, the underlying Worker is automatically
        aborted as soon as all other non-autoclosing task objects (workers,
        ports, timers) have finished -- default is False
      - stderr: separate stdout/stderr if set to True -- default is False.

    Local usage::
        task.shell(command [, key=key] [, handler=handler]
              [, timeout=secs] [, autoclose=enable_autoclose]
              [, stderr=enable_stderr])

    Distant usage::
        task.shell(command, nodes=nodeset [, handler=handler]
              [, timeout=secs], [, autoclose=enable_autoclose]
              [, tree=None|False|True] [, remote=False|True]
              [, stderr=enable_stderr])

    Example:

    >>> task = task_self()
    >>> task.shell("/bin/date", nodes="node[1-2345]")
    >>> task.resume()

    Return the scheduled worker.
    """
    # options shared by local and distant workers
    handler = kwargs.get("handler", None)
    timeo = kwargs.get("timeout", None)
    autoclose = kwargs.get("autoclose", False)
    stderr = kwargs.get("stderr", self.default("stderr"))
    remote = kwargs.get("remote", True)

    if not kwargs.get("nodes", None):
        # create old fashioned local worker
        worker = WorkerPopen(command, key=kwargs.get("key", None),
                             handler=handler, stderr=stderr,
                             timeout=timeo, autoclose=autoclose)
    else:
        assert kwargs.get("key", None) is None, \
            "'key' argument not supported for distant command"

        # tree == None means auto
        tree = kwargs.get("tree")
        use_tree = tree != False and self._default_tree_is_enabled()

        if use_tree:
            # fail if tree is forced without any topology
            # NOTE(review): looks unreachable, since
            # _default_tree_is_enabled() is only true when a topology
            # is set -- confirm intended behavior
            if tree and self.topology is None:
                raise TaskError("tree mode required for distant shell "
                                "command with unknown topology!")
            wrkcls = WorkerTree
        elif remote:
            wrkcls = self.default('distant_worker')
        else:
            wrkcls = self.default('local_worker')

        worker = wrkcls(NodeSet(kwargs["nodes"]), command=command,
                        handler=handler, stderr=stderr,
                        timeout=timeo, autoclose=autoclose, remote=remote)

    # schedule worker for execution in this task
    self.schedule(worker)

    return worker
599
def copy(self, source, dest, nodes, **kwargs):
    """
    Copy local file to distant nodes.

    Optional keyword arguments: handler, stderr, timeout, preserve,
    reverse and tree (None means auto). The created worker is scheduled
    in this task and returned.
    """
    assert nodes != None, "local copy not supported"

    # tree == None means auto
    tree = kwargs.get("tree")
    options = {
        'handler': kwargs.get("handler", None),
        'stderr': kwargs.get("stderr", self.default("stderr")),
        'timeout': kwargs.get("timeout", None),
        'preserve': kwargs.get("preserve", None),
        'reverse': kwargs.get("reverse", False),
    }

    if tree != False and self._default_tree_is_enabled():
        # fail if tree is forced without any topology
        if tree and self.topology is None:
            raise TaskError("tree mode required for distant shell "
                            "command with unknown topology!")
        worker_class = WorkerTree
    else:
        # plain copy through the configured distant worker class
        worker_class = self.default('distant_worker')

    worker = worker_class(nodes, source=source, dest=dest, **options)
    self.schedule(worker)
    return worker
633
def rcopy(self, source, dest, nodes, **kwargs):
    """
    Copy distant file or directory to local node.

    Same as copy() with the reverse option forced to True.
    """
    options = dict(kwargs)
    options['reverse'] = True
    return self.copy(source, dest, nodes, **options)
@tasksyncmethod()
def _add_port(self, port):
    """Add an EnginePort instance to Engine (private method).

    Decorated with tasksyncmethod: when called from a thread other than
    the task's, the request is forwarded through the dispatch port.
    """
    self._engine.add(port)
@tasksyncmethod()
def remove_port(self, port):
    """Close and remove a port from task previously created with port().

    Decorated with tasksyncmethod: when called from a thread other than
    the task's, the request is forwarded through the dispatch port.
    """
    self._engine.remove(port)
650
def port(self, handler=None, autoclose=False):
    """
    Create a new task port. A task port is an abstraction object to
    deliver messages reliably between tasks.

    Basic rules:
      - A task can send messages to another task port (thread safe).
      - A task can receive messages from an acquired port either by
        setting up a notification mechanism or using a polling
        mechanism that may block the task waiting for a message
        sent on the port.
      - A port can be acquired by one task only.

    If handler is set to a valid EventHandler object, the port is
    a send-once port, ie. a message sent to this port generates an
    ev_msg event notification issued the port's task. If handler
    is not set, the task can only receive messages on the port by
    calling port.msg_recv().

    Return the new EnginePort instance.
    """
    new_port = EnginePort(self, handler, autoclose)
    # registration may be deferred through the dispatch port when
    # called from another thread
    self._add_port(new_port)
    return new_port
673
def timer(self, fire, handler, interval=-1.0, autoclose=False):
    """
    Create a timer bound to this task that fires at a preset time
    in the future by invoking the ev_timer() method of `handler'
    (provided EventHandler object). Timers can fire either only
    once or repeatedly at fixed time intervals. Repeating timers
    can also have their next firing time manually adjusted.

    The mandatory parameter `fire' sets the firing delay in seconds.

    The optional parameter `interval' sets the firing interval of
    the timer. If not specified, the timer fires once and then is
    automatically invalidated.

    Time values are expressed in second using floating point
    values. Precision is implementation (and system) dependent.

    The optional parameter `autoclose', if set to True, creates
    an "autoclosing" timer: it will be automatically invalidated
    as soon as all other non-autoclosing task's objects (workers,
    ports, timers) have finished. Default value is False, which
    means the timer will retain task's runloop until it is
    invalidated.

    Return a new EngineTimer instance.

    See ClusterShell.Engine.Engine.EngineTimer for more details.
    """
    assert fire >= 0.0, \
        "timer's relative fire time must be a positive floating number"

    new_timer = EngineTimer(fire, interval, autoclose, handler)
    # Registration may be sent through msg port (async call) when
    # called from another task...
    self._add_timer(new_timer)
    # ...but the new timer is always returned synchronously.
    return new_timer
@tasksyncmethod()
def _add_timer(self, timer):
    """Add a timer to task engine (thread-safe).

    Decorated with tasksyncmethod: when called from a thread other than
    the task's, the request is forwarded through the dispatch port.
    """
    self._engine.add_timer(timer)
@tasksyncmethod()
def schedule(self, worker):
    """
    Schedule a worker for execution, ie. add worker in task running
    loop. Worker will start processing immediately if the task is
    running (eg. called from an event handler) or as soon as the
    task is started otherwise. Only useful for manually instantiated
    workers, for example:

    >>> task = task_self()
    >>> worker = WorkerSsh("node[2-3]", None, 10, command="/bin/ls")
    >>> task.schedule(worker)
    >>> task.resume()
    """
    assert self in Task._tasks.values(), \
        "deleted task instance, call task_self() again!"

    # the worker now belongs to this task
    worker._set_task(self)

    # register every engine client of the worker
    for engine_client in worker._engine_clients():
        self._engine.add(engine_client)
740
def _resume_thread(self):
    """Resume task - called from another thread.

    Wakes up every thread waiting on the task suspend condition.
    """
    self._suspend_cond.notify_all()
744
def _resume(self):
    """Resume task - called from self thread.

    Resets task results and runs the engine with the task timeout.
    Engine exceptions are translated: EngineTimeoutException into
    TimeoutError, EngineAlreadyRunningError into AlreadyRunningError;
    an EngineAbortException triggers task termination. In every case
    the suspend count is increased and joiners are notified on exit.
    """
    assert self.thread == threading.currentThread()
    try:
        try:
            self._reset()
            self._run(self.timeout)
        except EngineTimeoutException:
            raise TimeoutError()
        except EngineAbortException:
            # fetch the exception instance being handled
            abort_exc = sys.exc_info()[1]
            self._terminate(abort_exc.kill)
        except EngineAlreadyRunningError:
            raise AlreadyRunningError("task engine is already running")
    finally:
        # task becomes joinable
        joinable = self._join_cond
        joinable.acquire()
        self._suspend_cond.atomic_inc()
        joinable.notifyAll()
        joinable.release()
764
def resume(self, timeout=None):
    """
    Resume task. If task is task_self(), workers are executed in the
    calling thread so this method will block until all (non-autoclosing)
    workers have finished. This is always the case for a single-threaded
    application (eg. which doesn't create other Task() instance than
    task_self()). Otherwise, the current thread doesn't block. In that
    case, you may then want to call task_wait() to wait for completion.

    Warning: the timeout parameter can be used to set an hard limit of
    task execution time (in seconds). In that case, a TimeoutError
    exception is raised if this delay is reached. It is None by default
    and is passed as is to the engine. In order to set a maximum delay
    for individual command execution, you should use Task.shell()'s
    timeout parameter instead.
    """
    # If you change options here, check Task.run() compatibility.
    self.timeout = timeout

    self._suspend_cond.atomic_dec()

    if not self._is_task_self():
        # foreign thread: just wake up the task's own thread
        self._resume_thread()
        return
    self._resume()
791
def run(self, command=None, **kwargs):
    """
    With arguments, it will schedule a command exactly like a Task.shell()
    would have done it and run it.
    This is the easiest way to simply run a command.

    >>> task.run("hostname", nodes="foo")

    Without argument, it starts all outstanding actions.
    It behaves like Task.resume().

    >>> task.shell("hostname", nodes="foo")
    >>> task.shell("hostname", nodes="bar")
    >>> task.run()

    When used with a command, you can set a maximum delay of individual
    command execution with the help of the timeout parameter (see
    Task.shell's parameters). You can then listen for ev_timeout() events
    in your Worker event handlers, or use num_timeout() or
    iter_keys_timeout() afterwards.
    But, when used as an alias to Task.resume(), the timeout parameter
    sets an hard limit of task execution time. In that case, a TimeoutError
    exception is raised if this delay is reached.

    Return the scheduled worker when a command is given, None otherwise.
    """
    # Both resume() and shell() support a 'timeout' parameter, so the
    # call shape decides which one is meant.

    # task.run(10) behaves like task.resume(10)
    if type(command) in (int, float):
        self.resume(command)
        return None

    # task.run() or task.run(timeout=10) behaves like task.resume(...)
    if command is None:
        self.resume(kwargs.pop('timeout', None))
        return None

    # anything else is a classical task.shell("mycommand", [timeout=..., ...])
    worker = self.shell(command, **kwargs)
    self.resume(None)
    return worker
@tasksyncmethod()
def _suspend_wait(self):
    """Suspend request received.

    Executed in the task's own thread (see the assertion below): flags
    the task as suspended, blocks on the suspend condition, then
    clears the flag once woken up.
    """
    assert task_self() == self
    # atomically set suspend state
    self._suspend_lock.acquire()
    self._suspended = True
    self._suspend_lock.release()

    # wait for special suspend condition, while releasing l_run
    # (the run lock is released by wait_check() right before waiting,
    # which lets suspend() acquire it)
    self._suspend_cond.wait_check(self._run_lock)

    # waking up, atomically unset suspend state
    self._suspend_lock.acquire()
    self._suspended = False
    self._suspend_lock.release()
854
def suspend(self):
    """
    Suspend task execution. This method may be called from another
    task (thread-safe). The function returns False if the task
    cannot be suspended (eg. it's not running), or returns True if
    the task has been successfully suspended.
    To resume a suspended task, use task.resume().
    """
    # first of all, increase suspend count
    self._suspend_cond.atomic_inc()

    # call synchronized suspend method
    # (forwarded to the task's thread by the tasksyncmethod decorator)
    self._suspend_wait()

    # wait for stopped task
    self._run_lock.acquire() # run_lock ownership transfer

    # get result: are we really suspended or just stopped?
    result = True
    self._suspend_lock.acquire()
    if not self._suspended:
        # not acknowledging suspend state, task is stopped
        result = False
        # the run lock is only given back here; when the task is really
        # suspended it stays held after this method returns
        self._run_lock.release()
    self._suspend_lock.release()
    return result
@tasksyncmethod()
def _abort(self, kill=False):
    """Abort request received.

    Executed in the task's own thread (see the assertion below): sets
    the quit flag and asks the engine to abort, passing `kill' along.
    """
    assert task_self() == self
    # raise an EngineAbortException when task is running
    self._quit = True
    self._engine.abort(kill)
889
def abort(self, kill=False):
    """
    Abort a task. Aborting a task removes (and stops when needed)
    all workers. If optional parameter kill is True, the task
    object is unbound from the current thread, so calling
    task_self() creates a new Task object.
    """
    # non-blocking attempt: failure means the task is currently running
    if not self._run_lock.acquire(0):
        # self._run_lock is locked, try to call synchronized method
        self._abort(kill)
        # but there is no guarantee that it has really been called, as the
        # task could have aborted during the same time, so we use polling
        while not self._run_lock.acquire(0):
            sleep(0.001)
    # in any case, once _run_lock has been acquired, confirm abort
    self._quit = True
    self._run_lock.release()
    if self._is_task_self():
        self._terminate(kill)
    else:
        # abort on stopped/suspended task
        # (wakes up the task thread, which then sees the quit flag)
        self._suspend_cond.notify_all()
912
def _terminate(self, kill):
    """
    Abort completion subroutine.

    Mark the task as terminated, clear its engine and result objects and
    wake up every thread blocked on the join condition. When kill is
    true, additionally invalidate the dispatch port, release and drop
    the engine, and remove the current thread's entry from Task._tasks.
    The quit flag must already be set when this is called.
    """
    assert self._quit == True
    self._terminated = True

    if kill:
        # invalidate dispatch port
        self._dispatch_port = None
    # clear engine
    self._engine.clear(clear_ports=kill)
    if kill:
        self._engine.release()
        self._engine = None

    # clear result objects
    self._reset()

    # unlock any remaining threads that are waiting for our
    # termination (late join()s)
    # must be called after _terminated is set to True
    self._join_cond.acquire()
    self._join_cond.notifyAll()
    self._join_cond.release()

    # destroy task if needed
    if kill:
        Task._task_lock.acquire()
        try:
            # the entry removed is the one keyed by the *calling* thread
            del Task._tasks[threading.currentThread()]
        finally:
            Task._task_lock.release()
946
def join(self):
    """
    Suspend execution of the calling thread until the target task
    terminates, unless the target task has already terminated.

    Returns immediately, without waiting, when the suspend count is
    positive while the task is not flagged as suspended, or when the
    task is already flagged as terminated. Otherwise blocks on the join
    condition until it is notified.
    """
    self._join_cond.acquire()
    try:
        if self._suspend_cond.suspend_count > 0 and not self._suspended:
            # ignore stopped task
            return
        if self._terminated:
            # ignore join() on dead task
            return
        # single wait, no timeout: returns on the next notification of
        # the join condition
        self._join_cond.wait()
    finally:
        # always give the condition's lock back, including on the early
        # returns above
        self._join_cond.release()
963
def running(self):
    """
    Return True if the task is running, False otherwise.

    A task without a bound engine (self._engine is None, which is what a
    kill abort leaves behind) is reported as not running.
    """
    engine = self._engine
    # Coerce to a real boolean: the bare `and` expression used to hand back
    # the falsy operand itself (typically None) when no engine was bound.
    return bool(engine and engine.running)
969
def _reset(self):
    """
    Bring the result bookkeeping back to its initial, empty state.

    Forgets all recorded return codes (both lookup directions and the
    maximum seen so far), drops every recorded MsgTree and empties the
    set of timed out sources.
    """
    # return codes: max value, rc => sources, source => rc
    self._max_rc = None
    self._d_rc_sources = {}
    self._d_source_rc = {}
    # message trees, indexed by stream name
    self._msgtrees = {}
    # the timeout set object is kept and emptied in place
    self._timeout_sources.clear()
981
def _msgtree(self, sname, strict=True):
    """
    Helper returning the MsgTree bound to stream name sname, if allowed.

    When the "<sname>_msgtree" task default is enabled, the tree is
    created on first access and returned. When it is disabled, raise
    TaskMsgTreeError if strict is true, otherwise return None.
    """
    enabled = self.default("%s_msgtree" % sname)
    if not enabled:
        if strict:
            raise TaskMsgTreeError("%s_msgtree not set" % sname)
        return None
    trees = self._msgtrees
    if sname not in trees:
        # first access for this stream name
        trees[sname] = MsgTree()
    return trees[sname]
990
def _msg_add(self, worker, node, sname, msg):
    """
    Record a message into the Task's MsgTree for stream name sname.

    The message is identified by its origin:
        - worker: a worker instance belonging to this task
        - node: the node the message comes from
        - sname: the stream name (string identifier)
    """
    assert worker.task == self, "better to add messages from my workers"
    # Non-strict lookup: getting None back means the task is configured
    # NOT to record messages for this stream. The request is then simply
    # dropped, and any tree recorded earlier is left untouched, which
    # allows recording to be disabled temporarily.
    tree = self._msgtree(sname, strict=False)
    if tree is None:
        return
    tree.add((worker, node), msg)
1006
def _rc_set(self, worker, node, rc):
    """
    Record the return code rc reported for a node of a worker instance.

    The (worker, node) pair is indexed both ways (source => rc and
    rc => set of sources) and the task's max return code is raised
    when rc exceeds it.
    """
    origin = (worker, node)

    # source => rc
    self._d_source_rc[origin] = rc

    # rc => sources (set created on first occurrence of this rc)
    sources = self._d_rc_sources.setdefault(rc, set())
    sources.add(origin)

    # keep track of the highest rc seen
    highest = self._max_rc
    if highest is None or rc > highest:
        self._max_rc = rc
1023
def _timeout_add(self, worker, node):
    """
    Flag a node of a worker instance as having timed out.
    """
    # timed out sources are kept as (worker, node) pairs
    origin = (worker, node)
    self._timeout_sources.add(origin)
1031
def _msg_by_source(self, worker, node, sname):
    """
    Return the message recorded for (worker, node) on stream sname.

    The message is returned as a string, or None when nothing is
    recorded for that source.
    """
    found = self._msgtree(sname).get((worker, node))
    if found is not None:
        return str(found)
    return None
1038
def _call_tree_matcher(self, tree_match_func, match_keys=None, worker=None):
    """
    Invoke a tree matcher method (items, walk) with the proper filter.

    tree_match_func is called with a predicate over (worker, key) tree
    keys (or None when nothing has to be filtered) and a mapper that
    extracts the key part. A plain string passed as match_keys is
    rejected with TypeError.
    """
    if isinstance(match_keys, basestring): # change to str for Python 3
        raise TypeError("Sequence of keys/nodes expected for 'match_keys'.")

    def from_worker(source):
        """Accept any key of the selected worker."""
        return source[0] is worker

    def from_worker_and_listed(source):
        """Accept keys of the selected worker found in match_keys."""
        return source[0] is worker and source[1] in match_keys

    def listed(source):
        """Accept keys found in match_keys, whatever the worker."""
        return source[1] in match_keys

    # select filter by worker and optionally by matching keys
    if worker:
        if match_keys is None:
            match = from_worker
        else:
            match = from_worker_and_listed
    elif match_keys:
        match = listed
    else:
        match = None
    # call tree matcher function (items or walk)
    return tree_match_func(match, itemgetter(1))
1054
def _rc_by_source(self, worker, node):
    """
    Return the return code recorded for the given worker instance and
    node. A KeyError is raised when none is recorded for that pair.
    """
    origin = (worker, node)
    return self._d_source_rc[origin]
1058
def _rc_iter_by_key(self, key):
    """
    Generate every return code recorded for the given key, whatever
    the worker it was reported by.
    """
    for (_worker, source_key), retcode in self._d_source_rc.iteritems():
        if source_key != key:
            continue
        yield retcode
1066
def _rc_iter_by_worker(self, worker, match_keys=None):
    """
    Generate (rc, keys) tuples for a specific worker, keys being the
    list of that worker's keys which got return code rc.

    When match_keys is set (and not empty), only keys found in it are
    reported. Return codes left without any key are not reported.
    """
    # Use the items iterator for the underlying dict.
    for retcode, sources in self._d_rc_sources.iteritems():
        keys = [key for (wrk, key) in sources if wrk is worker]
        if match_keys:
            keys = [key for key in keys if key in match_keys]
        if keys:
            yield retcode, keys
1084
def _krc_iter_by_worker(self, worker):
    """
    Generate (key, rc) tuples for every key of a specific worker that
    has a recorded return code.
    """
    for retcode, sources in self._d_rc_sources.iteritems():
        for (wrk, key) in sources:
            if wrk is not worker:
                continue
            yield key, retcode
1093
def _num_timeout_by_worker(self, worker):
    """
    Count the timed out "keys" that belong to a specific worker.
    """
    own = [key for (wrk, key) in self._timeout_sources if wrk is worker]
    return len(own)
1103
def _iter_keys_timeout_by_worker(self, worker):
    """
    Generate the timed out keys (ie. nodes) of a specific worker.
    """
    for source in self._timeout_sources:
        wrk, key = source
        if wrk is not worker:
            continue
        yield key
1111
def _flush_buffers_by_worker(self, worker):
    """
    Drop every recorded 'stdout' message that comes from the specified
    worker. Nothing is done when 'stdout' recording is not enabled.
    """
    tree = self._msgtree('stdout', strict=False)
    if tree is None:
        return

    def from_worker(source):
        """Select tree keys whose worker part equals worker."""
        return source[0] == worker

    tree.remove(from_worker)
1119
def _flush_errors_by_worker(self, worker):
    """
    Drop every recorded 'stderr' message that comes from the specified
    worker. Nothing is done when 'stderr' recording is not enabled.
    """
    tree = self._msgtree('stderr', strict=False)
    if tree is None:
        return

    def from_worker(source):
        """Select tree keys whose worker part equals worker."""
        return source[0] == worker

    tree.remove(from_worker)
1127
def key_buffer(self, key):
    """
    Return the 'stdout' buffer recorded for a specific key.

    A key may be associated to several workers: the content of all of
    them (which may overlap) is then concatenated. An empty buffer is
    returned when the key is not found in any worker.
    """
    tree = self._msgtree('stdout')

    def for_key(source):
        """Select tree keys whose key part equals key."""
        return source[1] == key

    chunks = imap(str, tree.messages(for_key))
    return "".join(chunks)

node_buffer = key_buffer
def key_error(self, key):
    """
    Return the 'stderr' buffer recorded for a specific key.

    A key may be associated to several workers: the content of all of
    them (which may overlap) is then concatenated. An empty error
    buffer is returned when the key is not found in any worker.
    """
    tree = self._msgtree('stderr')

    def for_key(source):
        """Select tree keys whose key part equals key."""
        return source[1] == key

    chunks = imap(str, tree.messages(for_key))
    return "".join(chunks)

node_error = key_error
def key_retcode(self, key):
    """
    Return the return code recorded for a specific key.

    A key may be associated to several workers: the max return code
    among them is then returned. Raises a KeyError if key is not found
    in any finished workers.
    """
    found = list(self._rc_iter_by_key(key))
    if found:
        return max(found)
    raise KeyError(key)

node_retcode = key_retcode
def max_retcode(self):
    """
    Return the highest return code seen during the last run.

    None is returned instead when:
        - all commands timed out,
        - no command was executed.

    How retcodes work
    =================
    A process that exits normally reports its exit status as return
    code. A process terminated by a signal reports 128 + signal
    number.
    """
    highest = self._max_rc
    return highest
1182
def _iter_msgtree(self, sname, match_keys=None):
    """
    Helper iterating over the buffers recorded for stream name sname.

    When no tree exists for sname, an empty iterator is returned if the
    "<sname>_msgtree" task default is enabled, and TaskMsgTreeError is
    raised otherwise.
    """
    try:
        tree = self._msgtrees[sname]
        return self._call_tree_matcher(tree.walk, match_keys)
    except KeyError:
        enabled = self.default("%s_msgtree" % sname)
        if enabled:
            # recording is on, nothing has been recorded yet
            return iter([])
        raise TaskMsgTreeError("%s_msgtree not set" % sname)
1192
def iter_buffers(self, match_keys=None):
    """
    Iterate over 'stdout' buffers as (buffer, keys) tuples.

    With remote workers (Ssh), keys are lists of nodes; it is then more
    convenient and efficient to turn them into a NodeSet instance with
    NodeSet.fromlist(keys).

    The optional match_keys parameter restricts the iteration to
    these keys.

    Usage example:

    >>> for buffer, nodelist in task.iter_buffers():
    ...     print NodeSet.fromlist(nodelist)
    ...     print buffer
    """
    stream = 'stdout'
    return self._iter_msgtree(stream, match_keys)
1209
def iter_errors(self, match_keys=None):
    """
    Iterate over 'stderr' buffers as (buffer, keys) tuples.

    Works like iter_buffers(), including the optional match_keys
    filtering.
    """
    stream = 'stderr'
    return self._iter_msgtree(stream, match_keys)
1217
def iter_retcodes(self, match_keys=None):
    """
    Iterate over return codes, returns a tuple (rc, keys).

    Optional parameter match_keys add filtering on these keys. When
    filtering, a return code that none of the requested keys produced
    is skipped rather than reported with an empty keys list (this is
    consistent with the per-worker return code iterator). An empty or
    None match_keys means no filtering.

    How retcodes work
    =================
    If the process exits normally, the return code is its exit
    status. If the process is terminated by a signal, the return
    code is 128 + signal number.
    """
    # Use the items iterator for the underlying dict.
    for rc, src in self._d_rc_sources.iteritems():
        if match_keys:
            keys = [t[1] for t in src if t[1] in match_keys]
            if not keys:
                # no requested key got this rc: reporting it would make
                # callers believe the rc occurred among their keys
                continue
        else:
            keys = [t[1] for t in src]
        yield rc, keys
1238
def num_timeout(self):
    """
    Count the timed out "keys" (ie. nodes) of the task.
    """
    timed_out = self._timeout_sources
    return len(timed_out)
1244
def iter_keys_timeout(self):
    """
    Generate the timed out keys (ie. nodes), whatever their worker.
    """
    for source in self._timeout_sources:
        _worker, key = source
        yield key
1251
def flush_buffers(self):
    """
    Drop all recorded 'stdout' messages of the task (from all of its
    workers). Nothing is done when 'stdout' recording is not enabled.
    """
    tree = self._msgtree('stdout', strict=False)
    if tree is None:
        return
    tree.clear()
1259
def flush_errors(self):
    """
    Drop all recorded 'stderr' messages of the task (from all of its
    workers). Nothing is done when 'stderr' recording is not enabled.
    """
    tree = self._msgtree('stderr', strict=False)
    if tree is None:
        return
    tree.clear()
@classmethod
def wait(cls, from_thread):
    """
    Class method blocking the calling thread until all tasks have
    finished, from a ClusterShell point of view (for instance, their
    task.resume() has returned). It doesn't necessarily mean that the
    associated threads have finished.

    The task bound to from_thread is not waited for.
    """
    # work on a snapshot of the registry taken under its lock
    Task._task_lock.acquire()
    try:
        snapshot = Task._tasks.copy()
    finally:
        Task._task_lock.release()
    for owner, task in snapshot.iteritems():
        if owner != from_thread:
            task.join()
1284
def _pchannel(self, gateway, metaworker):
    """Return the propagation channel of gateway (created on first use).

    The self.gateways dictionary provides the lookup:
        gateway => (worker channel, set of metaworkers)
    """
    if gateway in self.gateways:
        # TODO: assert chanworker is running (need Worker.running())
        chanworker, users = self.gateways[gateway]
        users.add(metaworker)
        return chanworker.eh

    # no channel yet for this gateway: create one
    channel = PropagationChannel(self, gateway)
    logging.getLogger(__name__).info("pchannel: creating new channel %s",
                                     channel)
    # invoke gateway
    gw_timeout = None # FIXME: handle timeout for gateway channels
    worker_class = self.default('distant_worker')
    chanworker = worker_class(gateway, command=metaworker.invoke_gateway,
                              handler=channel, stderr=True,
                              timeout=gw_timeout)
    # gateway is special! define worker._fanout to not rely on the
    # engine's fanout, and use the special value FANOUT_UNLIMITED to
    # always allow registration of gateways
    chanworker._fanout = FANOUT_UNLIMITED
    # change default stream names to avoid internal task buffering
    # and conform with channel stream names
    chanworker.SNAME_STDIN = channel.SNAME_WRITER
    chanworker.SNAME_STDOUT = channel.SNAME_READER
    chanworker.SNAME_STDERR = channel.SNAME_ERROR
    self.schedule(chanworker)
    # update gateways dict
    self.gateways[gateway] = (chanworker, set([metaworker]))
    return chanworker.eh
1318
def _pchannel_release(self, gateway, metaworker):
    """Release the propagation channel associated to gateway.

    The metaworker is removed from the set of users of the gateway
    channel; once that set is empty, the channel worker is aborted and
    the gateway entry is deleted.
    """
    logger = logging.getLogger(__name__)
    logger.debug("pchannel_release %s %s", gateway, metaworker)

    if gateway not in self.gateways:
        logger.error("pchannel_release: no pchannel found for gateway %s",
                     gateway)
        return

    # TODO: delay gateway closing when other gateways are running
    chanworker, users = self.gateways[gateway]
    users.remove(metaworker)
    if users:
        # channel still in use by other metaworkers
        return
    logger.info("pchannel_release: destroying channel %s", chanworker.eh)
    chanworker.abort()
    # delete gateway reference
    del self.gateways[gateway]
1341
def task_self(defaults=None):
    """
    Return the Task object of the caller's thread of control (a Task
    object is always bound to a specific thread). This convenience
    function is available in the top-level ClusterShell.Task package
    namespace.
    """
    caller = threading.currentThread()
    return Task(thread=caller, defaults=defaults)
1351
def task_wait():
    """
    Block the calling thread until all tasks terminate, unless all
    tasks have already terminated. This convenience function is
    available in the top-level ClusterShell.Task package namespace.
    """
    caller = threading.currentThread()
    Task.wait(caller)
1360
def task_terminate():
    """
    Destroy the Task instance bound to the current thread; the next
    call to task_self() will create a new Task object. Not to be called
    from a signal handler. This convenience function is available in
    the top-level ClusterShell.Task package namespace.
    """
    current = task_self()
    current.abort(kill=True)
1369
def task_cleanup():
    """
    Cleanup routine destroying all created tasks. This convenience
    function is available in the top-level ClusterShell.Task package
    namespace. It is mainly used for testing purposes and should be
    avoided otherwise. task_cleanup() may be called from any threads
    but not from a signal handler.
    """
    # be sure to return to a clean state (no task at all)
    while True:
        # snapshot the task registry under its lock
        Task._task_lock.acquire()
        try:
            remaining = Task._tasks.copy()
        finally:
            Task._task_lock.release()
        if not remaining:
            break
        # send abort to all known tasks (it's needed to retry as we may have
        # missed the engine notification window (it was just exiting, which is
        # quite a common case if we didn't task_join() previously), or we may
        # have lost some task's dispatcher port messages.
        for task in remaining.itervalues():
            task.abort(kill=True)
        # also, for other task than self, task.abort() is async and performed
        # through an EngineAbortException, so tell the Python scheduler to give
        # up control to raise this exception (handled by task._terminate())...
        sleep(0.001)
1397