
Source Code for Module ClusterShell.CLI.Clush

#!/usr/bin/env python
#
# Copyright (C) 2007-2016 CEA/DAM
# Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu>
#
# This file is part of ClusterShell.
#
# ClusterShell is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# ClusterShell is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with ClusterShell; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""
Execute cluster commands in parallel

clush is a utility program to run commands on a cluster which benefits
from the ClusterShell library and its Ssh worker. It features an
integrated output results gathering system (dshbak-like), can get node
groups by running predefined external commands and can redirect lines
read on its standard input to the remote commands.

When no command is specified, clush runs interactively.

"""
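
# Typical invocations (illustrative examples only; the option letters are the
# ones referenced in the handler docstrings and option handling below, see
# clush(1) for the authoritative list):
#
#   clush -w node[11-14] uname -r       # run a command on an explicit nodeset
#   clush -b -w node[11-14] uptime      # -b: gather identical outputs
#   clush -L -w node[11-14] uptime      # -L: output sorted by node
#   clush -w node[11-14] --copy file    # copy a local file to the nodes
#   clush -w node[11-14]                # no command given: interactive mode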

import errno
import logging
import os
from os.path import abspath, dirname, exists, isdir, join
import resource
import sys
import signal
import time
import threading
import random

from ClusterShell.Defaults import DEFAULTS, _load_workerclass
from ClusterShell.CLI.Config import ClushConfig, ClushConfigError
from ClusterShell.CLI.Display import Display
from ClusterShell.CLI.Display import VERB_QUIET, VERB_STD, VERB_VERB, VERB_DEBUG
from ClusterShell.CLI.OptionParser import OptionParser
from ClusterShell.CLI.Error import GENERIC_ERRORS, handle_generic_error
from ClusterShell.CLI.Utils import NodeSet, bufnodeset_cmp, human_bi_bytes_unit

from ClusterShell.Event import EventHandler
from ClusterShell.MsgTree import MsgTree
from ClusterShell.NodeSet import RESOLVER_NOGROUP, std_group_resolver
from ClusterShell.NodeSet import NodeSetParseError
from ClusterShell.Task import Task, task_self

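# NOTE: this module is written for Python 2, as reflected by the print
# statements, the `except ExcType, var` syntax, dict.iteritems() and
# raw_input() used below.
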
class UpdatePromptException(Exception):
    """Exception used by the signal handler"""

class StdInputHandler(EventHandler):
    """Standard input event handler class."""
    def __init__(self, worker):
        EventHandler.__init__(self)
        self.master_worker = worker

    def ev_msg(self, port, msg):
        """invoked when a message is received from port object"""
        if not msg:
            self.master_worker.set_write_eof()
            return
        # Forward messages to master worker
        self.master_worker.write(msg)

class OutputHandler(EventHandler):
    """Base class for clush output handlers."""

    def __init__(self):
        EventHandler.__init__(self)
        self._runtimer = None

    def runtimer_init(self, task, ntotal=0):
        """Init timer for live command-completed progressmeter."""
        thandler = RunTimer(task, ntotal)
        self._runtimer = task.timer(1.33, thandler, interval=1./3.,
                                    autoclose=True)

    def _runtimer_clean(self):
        """Hide runtimer counter"""
        if self._runtimer:
            self._runtimer.eh.erase_line()

    def _runtimer_set_dirty(self):
        """Force redisplay of counter"""
        if self._runtimer:
            self._runtimer.eh.set_dirty()

    def _runtimer_finalize(self, worker):
        """Finalize display of runtimer counter"""
        if self._runtimer:
            self._runtimer.eh.finalize(worker.task.default("USER_interactive"))
            self._runtimer.invalidate()
            self._runtimer = None

    def update_prompt(self, worker):
        """
        If needed, notify the main thread to update its prompt by sending
        a SIGUSR1 signal. We use task-specific user-defined variables
        (prefixed by USER_) to record current states.
        """
        worker.task.set_default("USER_running", False)
        if worker.task.default("USER_handle_SIGUSR1"):
            os.kill(os.getpid(), signal.SIGUSR1)

    def ev_start(self, worker):
        """Worker is starting."""
        if self._runtimer:
            self._runtimer.eh.start_time = time.time()

    def ev_written(self, worker, node, sname, size):
        """Bytes written on worker"""
        if self._runtimer:
            self._runtimer.eh.bytes_written += size

class DirectOutputHandler(OutputHandler):
    """Direct output event handler class."""

    def __init__(self, display):
        OutputHandler.__init__(self)
        self._display = display

    def ev_read(self, worker):
        node = worker.current_node or worker.key
        self._display.print_line(node, worker.current_msg)

    def ev_error(self, worker):
        node = worker.current_node or worker.key
        self._display.print_line_error(node, worker.current_errmsg)

    def ev_hup(self, worker):
        node = worker.current_node or worker.key
        rc = worker.current_rc
        if rc > 0:
            verb = VERB_QUIET
            if self._display.maxrc:
                verb = VERB_STD
            self._display.vprint_err(verb, \
                "clush: %s: exited with exit code %d" % (node, rc))

    def ev_timeout(self, worker):
        self._display.vprint_err(VERB_QUIET, "clush: %s: command timeout" % \
            NodeSet._fromlist1(worker.iter_keys_timeout()))

    def ev_close(self, worker):
        self.update_prompt(worker)

class DirectProgressOutputHandler(DirectOutputHandler):
    """Direct output event handler class with progress support."""

    # NOTE: This class is very similar to DirectOutputHandler and may look
    # like overkill at first, but merging the two would slightly hurt the
    # ev_read performance of DirectOutputHandler.

    def ev_read(self, worker):
        self._runtimer_clean()
        # it is ~10% faster to avoid calling super here
        node = worker.current_node or worker.key
        self._display.print_line(node, worker.current_msg)

    def ev_error(self, worker):
        self._runtimer_clean()
        node = worker.current_node or worker.key
        self._display.print_line_error(node, worker.current_errmsg)

    def ev_close(self, worker):
        self._runtimer_clean()
        DirectOutputHandler.ev_close(self, worker)

class CopyOutputHandler(DirectProgressOutputHandler):
    """Copy output event handler."""
    def __init__(self, display, reverse=False):
        DirectOutputHandler.__init__(self, display)
        self.reverse = reverse

    def ev_close(self, worker):
        """A copy worker has finished."""
        for rc, nodes in worker.iter_retcodes():
            if rc == 0:
                if self.reverse:
                    self._display.vprint(VERB_VERB, "%s:`%s' -> `%s'" % \
                        (nodes, worker.source, worker.dest))
                else:
                    self._display.vprint(VERB_VERB, "`%s' -> %s:`%s'" % \
                        (worker.source, nodes, worker.dest))
                break
        # multiple copy workers may be running (handled by this task's thread)
        copies = worker.task.default("USER_copies") - 1
        worker.task.set_default("USER_copies", copies)
        if copies == 0:
            self._runtimer_finalize(worker)
            self.update_prompt(worker)

class GatherOutputHandler(OutputHandler):
    """Gathered output event handler class (clush -b)."""

    def __init__(self, display):
        OutputHandler.__init__(self)
        self._display = display

    def ev_read(self, worker):
        if self._display.verbosity == VERB_VERB:
            node = worker.current_node or worker.key
            self._display.print_line(node, worker.current_msg)

    def ev_error(self, worker):
        self._runtimer_clean()
        self._display.print_line_error(worker.current_node,
                                       worker.current_errmsg)
        self._runtimer_set_dirty()

    def ev_close(self, worker):
        # Worker is closing -- it's time to gather results...
        self._runtimer_finalize(worker)
        # Display command output, try to order buffers by rc
        nodesetify = lambda v: (v[0], NodeSet._fromlist1(v[1]))
        cleaned = False
        for _rc, nodelist in sorted(worker.iter_retcodes()):
            ns_remain = NodeSet._fromlist1(nodelist)
            # Then order by node/nodeset (see bufnodeset_cmp)
            for buf, nodeset in sorted(map(nodesetify,
                                           worker.iter_buffers(nodelist)),
                                       cmp=bufnodeset_cmp):
                if not cleaned:
                    # clean runtimer line before printing first result
                    self._runtimer_clean()
                    cleaned = True
                self._display.print_gather(nodeset, buf)
                ns_remain.difference_update(nodeset)
            if ns_remain:
                self._display.print_gather_finalize(ns_remain)
        self._display.flush()

        self._close_common(worker)

        # Notify main thread to update its prompt
        self.update_prompt(worker)

    def _close_common(self, worker):
        verbexit = VERB_QUIET
        if self._display.maxrc:
            verbexit = VERB_STD
        # Display return code if not ok ( != 0)
        for rc, nodelist in worker.iter_retcodes():
            if rc != 0:
                nsdisp = ns = NodeSet._fromlist1(nodelist)
                if self._display.verbosity > VERB_QUIET and len(ns) > 1:
                    nsdisp = "%s (%d)" % (ns, len(ns))
                msgrc = "clush: %s: exited with exit code %d" % (nsdisp, rc)
                self._display.vprint_err(verbexit, msgrc)

        # Display nodes that didn't answer within command timeout delay
        if worker.num_timeout() > 0:
            self._display.vprint_err(verbexit, "clush: %s: command timeout" % \
                NodeSet._fromlist1(worker.iter_keys_timeout()))

class SortedOutputHandler(GatherOutputHandler):
    """Sorted by node output event handler class (clush -L)."""

    def ev_close(self, worker):
        # Overrides GatherOutputHandler.ev_close()
        self._runtimer_finalize(worker)

        # Display command output, try to order buffers by rc
        for _rc, nodelist in sorted(worker.iter_retcodes()):
            for node in nodelist:
                # NOTE: msg should be a MsgTreeElem as Display will iterate
                # over it to display multiple lines. As worker.node_buffer()
                # returns either a string or None if there is no output, it
                # cannot be used here. We use worker.iter_node_buffers() with
                # a single node as match_keys instead.
                for node, msg in worker.iter_node_buffers(match_keys=(node,)):
                    self._display.print_gather(node, msg)

        self._close_common(worker)

        # Notify main thread to update its prompt
        self.update_prompt(worker)

class LiveGatherOutputHandler(GatherOutputHandler):
    """Live line-gathered output event handler class (-bL)."""

    def __init__(self, display, nodes):
        assert nodes is not None, "cannot gather local command"
        GatherOutputHandler.__init__(self, display)
        self._nodes = NodeSet(nodes)
        self._nodecnt = dict.fromkeys(self._nodes, 0)
        self._mtreeq = []
        self._offload = 0

    def ev_read(self, worker):
        # Read new line from node
        node = worker.current_node
        self._nodecnt[node] += 1
        cnt = self._nodecnt[node]
        if len(self._mtreeq) < cnt:
            self._mtreeq.append(MsgTree())
        self._mtreeq[cnt - self._offload - 1].add(node, worker.current_msg)
        self._live_line(worker)

    def ev_hup(self, worker):
        if self._mtreeq and worker.current_node not in self._mtreeq[0]:
            # forget a node that doesn't answer to continue live line
            # gathering anyway
            self._nodes.remove(worker.current_node)
            self._live_line(worker)

    def _live_line(self, worker):
        # if all nodes have replied, display gathered line
        while self._mtreeq and len(self._mtreeq[0]) == len(self._nodes):
            mtree = self._mtreeq.pop(0)
            self._offload += 1
            self._runtimer_clean()
            nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
            for buf, nodeset in sorted(map(nodesetify, mtree.walk()),
                                       cmp=bufnodeset_cmp):
                self._display.print_gather(nodeset, buf)
            self._runtimer_set_dirty()

    def ev_close(self, worker):
        # Worker is closing -- it's time to gather results...
        self._runtimer_finalize(worker)

        for mtree in self._mtreeq:
            nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
            for buf, nodeset in sorted(map(nodesetify, mtree.walk()),
                                       cmp=bufnodeset_cmp):
                self._display.print_gather(nodeset, buf)

        self._close_common(worker)

        # Notify main thread to update its prompt
        self.update_prompt(worker)

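# Overview of how the output handlers above are selected (see run_command(),
# run_copy() and run_rcopy() below); the flags shown are the ones mentioned
# in the handler docstrings:
#
#   plain output                      -> DirectOutputHandler
#   progress display enabled          -> DirectProgressOutputHandler
#   gathered output (clush -b)        -> GatherOutputHandler
#   sorted-by-node output (clush -L)  -> SortedOutputHandler
#   live line-gathered output (-bL)   -> LiveGatherOutputHandler
#   file copy (--copy/--rcopy)        -> CopyOutputHandler
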
class RunTimer(EventHandler):
    """Running progress timer event handler"""
    def __init__(self, task, total):
        EventHandler.__init__(self)
        self.task = task
        self.total = total
        self.cnt_last = -1
        self.tslen = len(str(self.total))
        self.wholelen = 0
        self.started = False
        # updated by worker handler for progress
        self.start_time = 0
        self.bytes_written = 0

    def ev_timer(self, timer):
        self.update()

    def set_dirty(self):
        self.cnt_last = -1

    def erase_line(self):
        if self.wholelen:
            sys.stderr.write(' ' * self.wholelen + '\r')
            self.wholelen = 0

    def update(self):
        """Update runtime progress info"""
        wrbwinfo = ''
        if self.bytes_written > 0:
            bandwidth = self.bytes_written/(time.time() - self.start_time)
            wrbwinfo = " write: %s/s" % human_bi_bytes_unit(bandwidth)

        gws = self.task.gateways.keys()
        if gws:
            # tree mode
            act_targets = NodeSet()
            for gw, (chan, metaworkers) in self.task.gateways.iteritems():
                act_targets.updaten(mw.gwtargets[gw] for mw in metaworkers)
            cnt = len(act_targets) + len(self.task._engine.clients()) - len(gws)
            gwinfo = ' gw %d' % len(gws)
        else:
            cnt = len(self.task._engine.clients())
            gwinfo = ''
        if self.bytes_written > 0 or cnt != self.cnt_last:
            self.cnt_last = cnt
            # display completed/total clients
            towrite = 'clush: %*d/%*d%s%s\r' % (self.tslen, self.total - cnt,
                                                self.tslen, self.total, gwinfo,
                                                wrbwinfo)
            self.wholelen = len(towrite)
            sys.stderr.write(towrite)
            self.started = True

    def finalize(self, force_cr):
        """finalize display of runtimer"""
        if not self.started:
            return
        self.erase_line()
        # display completed/total clients
        fmt = 'clush: %*d/%*d'
        if force_cr:
            fmt += '\n'
        else:
            fmt += '\r'
        sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))

def signal_handler(signum, frame):
    """Signal handler used for main thread notification"""
    if signum == signal.SIGUSR1:
        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        raise UpdatePromptException()

def get_history_file():
    """Return the history file path"""
    return join(os.environ["HOME"], ".clush_history")

def readline_setup():
    """
    Configure readline to automatically load and save a history file
    named .clush_history
    """
    import readline
    readline.parse_and_bind("tab: complete")
    readline.set_completer_delims("")
    try:
        readline.read_history_file(get_history_file())
    except IOError:
        pass

def ttyloop(task, nodeset, timeout, display, remote):
    """Manage the interactive prompt to run commands"""
    readline_avail = False
    interactive = task.default("USER_interactive")
    if interactive:
        try:
            import readline
            readline_setup()
            readline_avail = True
        except ImportError:
            pass
        display.vprint(VERB_STD, \
            "Enter 'quit' to leave this interactive mode")

    rc = 0
    ns = NodeSet(nodeset)
    ns_info = True
    cmd = ""
    while task.default("USER_running") or \
            (interactive and cmd.lower() != 'quit'):
        try:
            # Set SIGUSR1 handler if needed
            if task.default("USER_handle_SIGUSR1"):
                signal.signal(signal.SIGUSR1, signal_handler)

            if task.default("USER_interactive") and \
                    not task.default("USER_running"):
                if ns_info:
                    display.vprint(VERB_QUIET, \
                        "Working with nodes: %s" % ns)
                    ns_info = False
                prompt = "clush> "
            else:
                prompt = ""
            try:
                cmd = raw_input(prompt)
                assert cmd is not None, "Result of raw_input() is None!"
            finally:
                signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        except EOFError:
            print
            return
        except UpdatePromptException:
            if task.default("USER_interactive"):
                continue
            return
        except KeyboardInterrupt, kbe:
            # Caught SIGINT here (main thread) but the signal will also reach
            # subprocesses (that will most likely kill them)
            if display.gather:
                # Suspend task, so we can safely access its data from here
                task.suspend()

                # If USER_running is not set, the task had time to finish,
                # that could mean all subprocesses have been killed and all
                # handlers have been processed.
                if not task.default("USER_running"):
                    # let's clush_excepthook handle the rest
                    raise kbe

                # If USER_running is set, the task didn't have time to finish
                # its work, so we must print something for the user...
                print_warn = False

                # Display command output, but cannot order buffers by rc
                nodesetify = lambda v: (v[0], NodeSet._fromlist1(v[1]))
                for buf, nodeset in sorted(map(nodesetify, task.iter_buffers()),
                                           cmp=bufnodeset_cmp):
                    if not print_warn:
                        print_warn = True
                        display.vprint_err(VERB_STD, \
                            "Warning: Caught keyboard interrupt!")
                    display.print_gather(nodeset, buf)

                # Return code handling
                verbexit = VERB_QUIET
                if display.maxrc:
                    verbexit = VERB_STD
                ns_ok = NodeSet()
                for rc, nodelist in task.iter_retcodes():
                    ns_ok.add(NodeSet._fromlist1(nodelist))
                    if rc != 0:
                        # Display return code if not ok ( != 0)
                        nsdisp = ns = NodeSet._fromlist1(nodelist)
                        if display.verbosity >= VERB_QUIET and len(ns) > 1:
                            nsdisp = "%s (%d)" % (ns, len(ns))
                        msgrc = "clush: %s: exited with exit code %d" % (nsdisp,
                                                                         rc)
                        display.vprint_err(verbexit, msgrc)

                # Add uncompleted nodeset to exception object
                kbe.uncompleted_nodes = ns - ns_ok

                # Display nodes that didn't answer within command timeout delay
                if task.num_timeout() > 0:
                    display.vprint_err(verbexit, \
                        "clush: %s: command timeout" % \
                        NodeSet._fromlist1(task.iter_keys_timeout()))
            raise kbe

        if task.default("USER_running"):
            ns_reg, ns_unreg = NodeSet(), NodeSet()
            for client in task._engine.clients():
                if client.registered:
                    ns_reg.add(client.key)
                else:
                    ns_unreg.add(client.key)
            if ns_unreg:
                pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
            else:
                pending = ""
            display.vprint_err(VERB_QUIET,
                               "clush: interrupt (^C to abort task)")
            gws = task.gateways.keys()
            if not gws:
                display.vprint_err(VERB_QUIET,
                                   "clush: in progress(%d): %s%s"
                                   % (len(ns_reg), ns_reg, pending))
            else:
                display.vprint_err(VERB_QUIET,
                                   "clush: in progress(%d): %s%s\n"
                                   "clush: [tree] open gateways(%d): %s"
                                   % (len(ns_reg), ns_reg, pending,
                                      len(gws), NodeSet._fromlist1(gws)))
            for gw, (chan, metaworkers) in task.gateways.iteritems():
                act_targets = NodeSet.fromlist(mw.gwtargets[gw]
                                               for mw in metaworkers)
                if act_targets:
                    display.vprint_err(VERB_QUIET,
                                       "clush: [tree] in progress(%d) on %s: %s"
                                       % (len(act_targets), gw, act_targets))
        else:
            cmdl = cmd.lower()
            try:
                ns_info = True
                if cmdl.startswith('+'):
                    ns.update(cmdl[1:])
                elif cmdl.startswith('-'):
                    ns.difference_update(cmdl[1:])
                elif cmdl.startswith('@'):
                    ns = NodeSet(cmdl[1:])
                elif cmdl == '=':
                    display.gather = not display.gather
                    if display.gather:
                        display.vprint(VERB_STD, \
                            "Switching to gathered output format")
                    else:
                        display.vprint(VERB_STD, \
                            "Switching to standard output format")
                    task.set_default("stdout_msgtree", \
                                     display.gather or display.line_mode)
                    ns_info = False
                    continue
                elif not cmdl.startswith('?'): # if ?, just print ns_info
                    ns_info = False
            except NodeSetParseError:
                display.vprint_err(VERB_QUIET, \
                    "clush: nodeset parse error (ignoring)")

            if ns_info:
                continue

            if cmdl.startswith('!') and len(cmd.strip()) > 0:
                run_command(task, cmd[1:], None, timeout, display, remote)
            elif cmdl != "quit":
                if not cmd:
                    continue
                if readline_avail:
                    readline.write_history_file(get_history_file())
                run_command(task, cmd, ns, timeout, display, remote)
    return rc

def _stdin_thread_start(stdin_port, display):
    """Standard input reader thread entry point."""
    try:
        # Note: read length should be as large as possible for performance
        # yet not so large that it introduces artificial latency.
        # 64k seems to be perfect with an openssh backend (they issue 64k
        # reads); could consider making it an option for e.g. gsissh.
        bufsize = 64 * 1024
        # thread loop: blocking read stdin + send messages to specified
        # port object
        buf = sys.stdin.read(bufsize)
        while buf:
            # send message to specified port object (with ack)
            stdin_port.msg(buf)
            buf = sys.stdin.read(bufsize)
    except IOError, ex:
        display.vprint(VERB_VERB, "stdin: %s" % ex)
    # send a None message to indicate EOF
    stdin_port.msg(None)

def bind_stdin(worker, display):
    """Create a stdin->port->worker binding: connect specified worker
    to stdin with the help of a reader thread and a ClusterShell Port
    object."""
    assert not sys.stdin.isatty()
    # Create a ClusterShell Port object bound to worker's task. This object
    # is able to receive messages in a thread-safe manner and then will safely
    # trigger ev_msg() on a specified event handler.
    port = worker.task.port(handler=StdInputHandler(worker), autoclose=True)
    # Launch a dedicated thread to read stdin in blocking mode. Indeed stdin
    # can be a file, so we cannot use a WorkerSimple here as polling on file
    # may result in different behaviors depending on selected engine.
    stdin_thread = threading.Thread(None, _stdin_thread_start, args=(port, display))
    # setDaemon because we're sometimes left with data that has been read and
    # ssh connection already closed.
    # Syntax for compat with Python < 2.6
    stdin_thread.setDaemon(True)
    stdin_thread.start()

def run_command(task, cmd, ns, timeout, display, remote):
    """
    Create and run the specified command line, displaying
    results in a dshbak-like way when gathering is used.
    """
    task.set_default("USER_running", True)

    if (display.gather or display.line_mode) and ns is not None:
        if display.gather and display.line_mode:
            handler = LiveGatherOutputHandler(display, ns)
        elif not display.gather and display.line_mode:
            handler = SortedOutputHandler(display)
        else:
            handler = GatherOutputHandler(display)

        if display.verbosity in (VERB_STD, VERB_VERB) or \
            (display.progress and display.verbosity > VERB_QUIET):
            handler.runtimer_init(task, len(ns))
    elif display.progress and display.verbosity > VERB_QUIET:
        handler = DirectProgressOutputHandler(display)
        handler.runtimer_init(task, len(ns))
    else:
        # this is the simplest and fastest output handler
        handler = DirectOutputHandler(display)

    worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout,
                        remote=remote)
    if ns is None:
        worker.set_key('LOCAL')
    if task.default("USER_stdin_worker"):
        bind_stdin(worker, display)

    task.resume()

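# Illustrative sketch, not part of clush itself: the minimal ClusterShell
# pattern that run_command() builds upon, using only calls already present
# in this module (task_self(), Task.shell(), Task.resume(), Task.iter_buffers()):
#
#     from ClusterShell.Task import task_self
#     task = task_self()
#     task.shell("uname -r", nodes="node[11-14]")
#     task.resume()
#     for buf, nodes in task.iter_buffers():
#         print nodes, buf
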
def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
    """run copy command"""
    task.set_default("USER_running", True)
    task.set_default("USER_copies", len(sources))

    copyhandler = CopyOutputHandler(display)
    if display.verbosity in (VERB_STD, VERB_VERB):
        copyhandler.runtimer_init(task, len(ns) * len(sources))

    # Sources check
    for source in sources:
        if not exists(source):
            display.vprint_err(VERB_QUIET,
                               'ERROR: file "%s" not found' % source)
            clush_exit(1, task)
        task.copy(source, dest, ns, handler=copyhandler, timeout=timeout,
                  preserve=preserve_flag)
    task.resume()

def run_rcopy(task, sources, dest, ns, timeout, preserve_flag, display):
    """run reverse copy command"""
    task.set_default("USER_running", True)
    task.set_default("USER_copies", len(sources))

    # Sanity checks
    if not exists(dest):
        display.vprint_err(VERB_QUIET,
                           'ERROR: directory "%s" not found' % dest)
        clush_exit(1, task)
    if not isdir(dest):
        display.vprint_err(VERB_QUIET,
                           'ERROR: destination "%s" is not a directory' % dest)
        clush_exit(1, task)

    copyhandler = CopyOutputHandler(display, True)
    if display.verbosity == VERB_STD or display.verbosity == VERB_VERB:
        copyhandler.runtimer_init(task, len(ns) * len(sources))
    for source in sources:
        task.rcopy(source, dest, ns, handler=copyhandler, timeout=timeout,
                   stderr=True, preserve=preserve_flag)
    task.resume()

def set_fdlimit(fd_max, display):
    """Set the open files soft limit to fd_max (capped by the hard limit)."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard < fd_max:
        msgfmt = 'Warning: fd_max set to %d but max open files hard limit is %d'
        display.vprint_err(VERB_VERB, msgfmt % (fd_max, hard))
    rlim_max = min(hard, fd_max)
    if soft != rlim_max:
        msgfmt = 'Changing max open files soft limit from %d to %d'
        display.vprint(VERB_DEBUG, msgfmt % (soft, rlim_max))
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_max, hard))
        except (ValueError, resource.error), exc:
            # Most probably the requested limit exceeds the system imposed limit
            msgfmt = 'Warning: Failed to set max open files limit to %d (%s)'
            display.vprint_err(VERB_VERB, msgfmt % (rlim_max, exc))

def clush_exit(status, task=None):
    """Exit script, flushing stdio buffers and stopping ClusterShell task."""
    if task:
        # Clean, usual termination
        task.abort()
        task.join()
        sys.exit(status)
    else:
        # Best effort cleanup if no task is set
        for stream in [sys.stdout, sys.stderr]:
            try:
                stream.flush()
            except IOError:
                pass
        # Use os._exit to avoid threads cleanup
        os._exit(status)

def clush_excepthook(extype, exp, traceback):
    """Exception hook for clush: this function centralizes exception
    handling from the main thread and from the (possible) separate task
    thread. This hook must be installed at startup by overriding
    sys.excepthook and task.excepthook."""
    try:
        raise exp
    except ClushConfigError, econf:
        print >> sys.stderr, "ERROR: %s" % econf
        clush_exit(1)
    except KeyboardInterrupt, kbe:
        uncomp_nodes = getattr(kbe, 'uncompleted_nodes', None)
        if uncomp_nodes:
            print >> sys.stderr, \
                "Keyboard interrupt (%s did not complete)." % uncomp_nodes
        else:
            print >> sys.stderr, "Keyboard interrupt."
        clush_exit(128 + signal.SIGINT)
    except OSError, exp:
        print >> sys.stderr, "ERROR: %s" % exp
        if exp.errno == errno.EMFILE:
            print >> sys.stderr, "ERROR: current `nofile' limits: " \
                "soft=%d hard=%d" % resource.getrlimit(resource.RLIMIT_NOFILE)
        clush_exit(1)
    except GENERIC_ERRORS, exc:
        clush_exit(handle_generic_error(exc))

    # Error not handled
    task_self().default_excepthook(extype, exp, traceback)

def main():
    """clush script entry point"""
    sys.excepthook = clush_excepthook

    #
    # Argument management
    #
    usage = "%prog [options] command"

    parser = OptionParser(usage)

    parser.add_option("--nostdin", action="store_true", dest="nostdin",
                      help="don't watch for possible input from stdin")

    parser.install_config_options('clush.conf(5)')
    parser.install_nodes_options()
    parser.install_display_options(verbose_options=True)
    parser.install_filecopy_options()
    parser.install_connector_options()

    (options, args) = parser.parse_args()

    #
    # Load config file and apply overrides
    #
    config = ClushConfig(options)

    # Should we use ANSI colors for nodes?
    if config.color == "auto":
        color = sys.stdout.isatty() and (options.gatherall or \
                                         sys.stderr.isatty())
    else:
        color = config.color == "always"

    try:
        # Create and configure display object.
        display = Display(options, config, color)
    except ValueError, exc:
        parser.error("option mismatch (%s)" % exc)

    if options.groupsource:
        # Be sure -a/-g with -s source work as expected.
        std_group_resolver().default_source_name = options.groupsource

    # Compute the nodeset and warn for possible use of shell pathname
    # expansion (#225)
    wnodelist = []
    xnodelist = []
    if options.nodes:
        wnodelist = [NodeSet(nodes) for nodes in options.nodes]

    if options.exclude:
        xnodelist = [NodeSet(nodes) for nodes in options.exclude]

    for (opt, nodelist) in (('w', wnodelist), ('x', xnodelist)):
        for nodes in nodelist:
            if len(nodes) == 1 and exists(str(nodes)):
                display.vprint_err(VERB_STD, "Warning: using '-%s %s' and "
                                   "local path '%s' exists, was it expanded "
                                   "by the shell?" % (opt, nodes, nodes))

    # --hostfile support (#235)
    for opt_hostfile in options.hostfile:
        try:
            fnodeset = NodeSet()
            hostfile = open(opt_hostfile)
            for line in hostfile.read().splitlines():
                fnodeset.updaten(nodes for nodes in line.split())
            hostfile.close()
            display.vprint_err(VERB_DEBUG,
                               "Using nodeset %s from hostfile %s"
                               % (fnodeset, opt_hostfile))
            wnodelist.append(fnodeset)
        except IOError, exc:
            # re-raise as OSError to be properly handled
            errno, strerror = exc.args
            raise OSError(errno, strerror, exc.filename)

    # Instantiate target nodeset from command line and hostfile
    nodeset_base = NodeSet.fromlist(wnodelist)
    # Instantiate filter nodeset (command line only)
    nodeset_exclude = NodeSet.fromlist(xnodelist)

    # Specified engine prevails over default engine
    DEFAULTS.engine = options.engine

    # Do we have node groups?
    task = task_self()
    task.set_info("debug", config.verbosity >= VERB_DEBUG)
    if config.verbosity == VERB_DEBUG:
        std_group_resolver().set_verbosity(1)
    if options.nodes_all:
        all_nodeset = NodeSet.fromall()
        display.vprint(VERB_DEBUG, "Adding nodes from option -a: %s" % \
                       all_nodeset)
        nodeset_base.add(all_nodeset)

    if options.group:
        grp_nodeset = NodeSet.fromlist(options.group,
                                       resolver=RESOLVER_NOGROUP)
        for grp in grp_nodeset:
            addingrp = NodeSet("@" + grp)
            display.vprint(VERB_DEBUG, \
                "Adding nodes from option -g %s: %s" % (grp, addingrp))
            nodeset_base.update(addingrp)

    if options.exgroup:
        grp_nodeset = NodeSet.fromlist(options.exgroup,
                                       resolver=RESOLVER_NOGROUP)
        for grp in grp_nodeset:
            removingrp = NodeSet("@" + grp)
            display.vprint(VERB_DEBUG, \
                "Excluding nodes from option -X %s: %s" % (grp, removingrp))
            nodeset_exclude.update(removingrp)

    # Do we have an exclude list? (-x ...)
    nodeset_base.difference_update(nodeset_exclude)
    if len(nodeset_base) < 1:
        parser.error('No node to run on.')

    if options.pick and options.pick < len(nodeset_base):
        # convert to string for sample as nsiter() is slower for big
        # nodesets; and we assume options.pick will remain small-ish
        keep = random.sample(nodeset_base, options.pick)
        nodeset_base.intersection_update(','.join(keep))
        if config.verbosity >= VERB_VERB:
            msg = "Picked random nodes: %s" % nodeset_base
            print Display.COLOR_RESULT_FMT % msg

    # Set open files limit.
    set_fdlimit(config.fd_max, display)

    #
    # Task management
    #
    # check for clush interactive mode
    interactive = not len(args) and \
                  not (options.copy or options.rcopy)
    # check for foreground ttys presence (input)
    stdin_isafgtty = sys.stdin.isatty() and \
                     os.tcgetpgrp(sys.stdin.fileno()) == os.getpgrp()
    # check for special condition (empty command and stdin not a tty)
    if interactive and not stdin_isafgtty:
        # looks like interactive but stdin is not a tty:
        # switch to non-interactive + disable ssh pseudo-tty
        interactive = False
        # SSH: disable pseudo-tty allocation (-T)
        ssh_options = config.ssh_options or ''
        ssh_options += ' -T'
        config._set_main("ssh_options", ssh_options)
    if options.nostdin and interactive:
        parser.error("illegal option `--nostdin' in that case")

    # Force user_interaction if Clush._f_user_interaction for test purposes
    user_interaction = hasattr(sys.modules[__name__], '_f_user_interaction')
    if not options.nostdin:
        # Try user interaction: check for foreground ttys presence (output)
        stdout_isafgtty = sys.stdout.isatty() and \
                          os.tcgetpgrp(sys.stdout.fileno()) == os.getpgrp()
        user_interaction |= stdin_isafgtty and stdout_isafgtty
    display.vprint(VERB_DEBUG, "User interaction: %s" % user_interaction)
    if user_interaction:
        # Standard input is a terminal and we want to perform some user
        # interactions in the main thread (using blocking calls), so
        # we run cluster commands in a new ClusterShell Task (a new
        # thread is created).
        task = Task()
    # else: perform everything in the main thread

    # Handle special signal only when user_interaction is set
    task.set_default("USER_handle_SIGUSR1", user_interaction)

    task.excepthook = sys.excepthook
    task.set_default("USER_stdin_worker", not (sys.stdin.isatty() or \
                                               options.nostdin or \
                                               user_interaction))
    display.vprint(VERB_DEBUG, "Create STDIN worker: %s" % \
                   task.default("USER_stdin_worker"))

    if config.verbosity >= VERB_DEBUG:
        task.set_info("debug", True)
        logging.basicConfig(level=logging.DEBUG)
        logging.debug("clush: STARTING DEBUG")
    else:
        logging.basicConfig(level=logging.CRITICAL)

    task.set_info("fanout", config.fanout)

    if options.worker:
        try:
            if options.remote == 'no':
                task.set_default('local_worker',
                                 _load_workerclass(options.worker))
            else:
                task.set_default('distant_worker',
                                 _load_workerclass(options.worker))
        except (ImportError, AttributeError):
            msg = "ERROR: Could not load worker '%s'" % options.worker
            display.vprint_err(VERB_QUIET, msg)
            clush_exit(1, task)

    if options.topofile or task._default_tree_is_enabled():
        if options.topofile:
            task.load_topology(options.topofile)
        if config.verbosity >= VERB_VERB:
            roots = len(task.topology.root.nodeset)
            gws = task.topology.inner_node_count() - roots
            msg = "enabling tree topology (%d gateways)" % gws
            print >> sys.stderr, "clush: %s" % msg

    if options.grooming_delay:
        if config.verbosity >= VERB_VERB:
            msg = Display.COLOR_RESULT_FMT % ("Grooming delay: %f" %
                                              options.grooming_delay)
            print >> sys.stderr, msg
        task.set_info("grooming_delay", options.grooming_delay)
    elif options.rcopy:
        # By default, --rcopy should inhibit grooming
        task.set_info("grooming_delay", 0)

    if config.ssh_user:
        task.set_info("ssh_user", config.ssh_user)
    if config.ssh_path:
        task.set_info("ssh_path", config.ssh_path)
    if config.ssh_options:
        task.set_info("ssh_options", config.ssh_options)
    if config.scp_path:
        task.set_info("scp_path", config.scp_path)
    if config.scp_options:
        task.set_info("scp_options", config.scp_options)
    if config.rsh_path:
        task.set_info("rsh_path", config.rsh_path)
    if config.rcp_path:
        task.set_info("rcp_path", config.rcp_path)
    if config.rsh_options:
        task.set_info("rsh_options", config.rsh_options)

    # Set detailed timeout values
    task.set_info("connect_timeout", config.connect_timeout)
    task.set_info("command_timeout", config.command_timeout)

    # Enable stdout/stderr separation
    task.set_default("stderr", not options.gatherall)

    # Disable MsgTree buffering if not gathering outputs
    task.set_default("stdout_msgtree", display.gather or display.line_mode)

    # Always disable stderr MsgTree buffering
    task.set_default("stderr_msgtree", False)

    # Set timeout at worker level when command_timeout is defined.
    if config.command_timeout > 0:
        timeout = config.command_timeout
    else:
        timeout = -1

    # Configure task custom status
    task.set_default("USER_interactive", interactive)
    task.set_default("USER_running", False)

    if (options.copy or options.rcopy) and not args:
        parser.error("--[r]copy option requires at least one argument")
    if options.copy:
        if not options.dest_path:
            # append '/' to clearly indicate a directory for tree mode
            options.dest_path = join(dirname(abspath(args[0])), '')
        op = "copy sources=%s dest=%s" % (args, options.dest_path)
    elif options.rcopy:
        if not options.dest_path:
            options.dest_path = dirname(abspath(args[0]))
        op = "rcopy sources=%s dest=%s" % (args, options.dest_path)
    else:
        op = "command=\"%s\"" % ' '.join(args)

    # print debug values (fanout value is taken from the config object and
    # not from the task itself, as set_info() is an asynchronous call)
    display.vprint(VERB_DEBUG, "clush: nodeset=%s fanout=%d [timeout " \
                   "conn=%.1f cmd=%.1f] %s" % (nodeset_base, config.fanout,
                                               config.connect_timeout,
                                               config.command_timeout,
                                               op))
    if not task.default("USER_interactive"):
        if display.verbosity >= VERB_DEBUG and task.topology:
            print Display.COLOR_RESULT_FMT % '-' * 15
            print Display.COLOR_RESULT_FMT % task.topology,
            print Display.COLOR_RESULT_FMT % '-' * 15
        if options.copy:
            run_copy(task, args, options.dest_path, nodeset_base, timeout,
                     options.preserve_flag, display)
        elif options.rcopy:
            run_rcopy(task, args, options.dest_path, nodeset_base, timeout,
                      options.preserve_flag, display)
        else:
            run_command(task, ' '.join(args), nodeset_base, timeout, display,
                        options.remote != 'no')

    if user_interaction:
        ttyloop(task, nodeset_base, timeout, display, options.remote != 'no')
    elif task.default("USER_interactive"):
        display.vprint_err(VERB_QUIET, \
            "ERROR: interactive mode requires a tty")
        clush_exit(1, task)

    rc = 0
    if options.maxrc:
        # return the commands' retcode instead of clush's own return code
        rc = task.max_retcode()
    if task.num_timeout() > 0:
        rc = 255
    clush_exit(rc, task)

if __name__ == '__main__':
    main()