1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Execute cluster commands in parallel
24
25 clush is a utility program to run commands on a cluster which benefits
26 from the ClusterShell library and its Ssh worker. It features an
27 integrated output results gathering system (dshbak-like), can get node
28 groups by running predefined external commands and can redirect lines
29 read on its standard input to the remote commands.
30
31 When no command is specified, clush runs interactively.
32
33 """
34
35 import errno
36 import logging
37 import os
38 from os.path import abspath, dirname, exists, isdir, join
39 import resource
40 import sys
41 import signal
42 import time
43 import threading
44 import random
45
46 from ClusterShell.Defaults import DEFAULTS, _load_workerclass
47 from ClusterShell.CLI.Config import ClushConfig, ClushConfigError
48 from ClusterShell.CLI.Display import Display
49 from ClusterShell.CLI.Display import VERB_QUIET, VERB_STD, VERB_VERB, VERB_DEBUG
50 from ClusterShell.CLI.OptionParser import OptionParser
51 from ClusterShell.CLI.Error import GENERIC_ERRORS, handle_generic_error
52 from ClusterShell.CLI.Utils import NodeSet, bufnodeset_cmp, human_bi_bytes_unit
53
54 from ClusterShell.Event import EventHandler
55 from ClusterShell.MsgTree import MsgTree
56 from ClusterShell.NodeSet import RESOLVER_NOGROUP, std_group_resolver
57 from ClusterShell.NodeSet import NodeSetParseError
58 from ClusterShell.Task import Task, task_self
59
60
62 """Exception used by the signal handler"""
63
77
79 """Base class for clush output handlers."""
80
84
86 """Init timer for live command-completed progressmeter."""
87 thandler = RunTimer(task, ntotal)
88 self._runtimer = task.timer(1.33, thandler, interval=1./3.,
89 autoclose=True)
90
92 """Hide runtimer counter"""
93 if self._runtimer:
94 self._runtimer.eh.erase_line()
95
97 """Force redisplay of counter"""
98 if self._runtimer:
99 self._runtimer.eh.set_dirty()
100
102 """Finalize display of runtimer counter"""
103 if self._runtimer:
104 self._runtimer.eh.finalize(worker.task.default("USER_interactive"))
105 self._runtimer.invalidate()
106 self._runtimer = None
107
109 """
110 If needed, notify main thread to update its prompt by sending
111 a SIGUSR1 signal. We use task-specific user-defined variable
112 to record current states (prefixed by USER_).
113 """
114 worker.task.set_default("USER_running", False)
115 if worker.task.default("USER_handle_SIGUSR1"):
116 os.kill(os.getpid(), signal.SIGUSR1)
117
119 """Worker is starting."""
120 if self._runtimer:
121 self._runtimer.eh.start_time = time.time()
122
124 """Bytes written on worker"""
125 if self._runtimer:
126 self._runtimer.eh.bytes_written += size
127
129 """Direct output event handler class."""
130
134
136 node = worker.current_node or worker.key
137 self._display.print_line(node, worker.current_msg)
138
140 node = worker.current_node or worker.key
141 self._display.print_line_error(node, worker.current_errmsg)
142
144 node = worker.current_node or worker.key
145 rc = worker.current_rc
146 if rc > 0:
147 verb = VERB_QUIET
148 if self._display.maxrc:
149 verb = VERB_STD
150 self._display.vprint_err(verb, \
151 "clush: %s: exited with exit code %d" % (node, rc))
152
156
159
161 """Direct output event handler class with progress support."""
162
163
164
165
166
168 self._runtimer_clean()
169
170 node = worker.current_node or worker.key
171 self._display.print_line(node, worker.current_msg)
172
177
181
183 """Copy output event handler."""
184 - def __init__(self, display, reverse=False):
187
189 """A copy worker has finished."""
190 for rc, nodes in worker.iter_retcodes():
191 if rc == 0:
192 if self.reverse:
193 self._display.vprint(VERB_VERB, "%s:`%s' -> `%s'" % \
194 (nodes, worker.source, worker.dest))
195 else:
196 self._display.vprint(VERB_VERB, "`%s' -> %s:`%s'" % \
197 (worker.source, nodes, worker.dest))
198 break
199
200 copies = worker.task.default("USER_copies") - 1
201 worker.task.set_default("USER_copies", copies)
202 if copies == 0:
203 self._runtimer_finalize(worker)
204 self.update_prompt(worker)
205
207 """Gathered output event handler class (clush -b)."""
208
212
214 if self._display.verbosity == VERB_VERB:
215 node = worker.current_node or worker.key
216 self._display.print_line(node, worker.current_msg)
217
223
250
268
270 """Sorted by node output event handler class (clush -L)."""
271
291
293 """Live line-gathered output event handler class (-bL)."""
294
296 assert nodes is not None, "cannot gather local command"
297 GatherOutputHandler.__init__(self, display)
298 self._nodes = NodeSet(nodes)
299 self._nodecnt = dict.fromkeys(self._nodes, 0)
300 self._mtreeq = []
301 self._offload = 0
302
304
305 node = worker.current_node
306 self._nodecnt[node] += 1
307 cnt = self._nodecnt[node]
308 if len(self._mtreeq) < cnt:
309 self._mtreeq.append(MsgTree())
310 self._mtreeq[cnt - self._offload - 1].add(node, worker.current_msg)
311 self._live_line(worker)
312
314 if self._mtreeq and worker.current_node not in self._mtreeq[0]:
315
316
317 self._nodes.remove(worker.current_node)
318 self._live_line(worker)
319
331
346
348 """Running progress timer event handler"""
350 EventHandler.__init__(self)
351 self.task = task
352 self.total = total
353 self.cnt_last = -1
354 self.tslen = len(str(self.total))
355 self.wholelen = 0
356 self.started = False
357
358 self.start_time = 0
359 self.bytes_written = 0
360
363
366
368 if self.wholelen:
369 sys.stderr.write(' ' * self.wholelen + '\r')
370 self.wholelen = 0
371
373 """Update runtime progress info"""
374 wrbwinfo = ''
375 if self.bytes_written > 0:
376 bandwidth = self.bytes_written/(time.time() - self.start_time)
377 wrbwinfo = " write: %s/s" % human_bi_bytes_unit(bandwidth)
378
379 gws = self.task.gateways.keys()
380 if gws:
381
382 act_targets = NodeSet()
383 for gw, (chan, metaworkers) in self.task.gateways.iteritems():
384 act_targets.updaten(mw.gwtargets[gw] for mw in metaworkers)
385 cnt = len(act_targets) + len(self.task._engine.clients()) - len(gws)
386 gwinfo = ' gw %d' % len(gws)
387 else:
388 cnt = len(self.task._engine.clients())
389 gwinfo = ''
390 if self.bytes_written > 0 or cnt != self.cnt_last:
391 self.cnt_last = cnt
392
393 towrite = 'clush: %*d/%*d%s%s\r' % (self.tslen, self.total - cnt,
394 self.tslen, self.total, gwinfo,
395 wrbwinfo)
396 self.wholelen = len(towrite)
397 sys.stderr.write(towrite)
398 self.started = True
399
401 """finalize display of runtimer"""
402 if not self.started:
403 return
404 self.erase_line()
405
406 fmt = 'clush: %*d/%*d'
407 if force_cr:
408 fmt += '\n'
409 else:
410 fmt += '\r'
411 sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
412
413
415 """Signal handler used for main thread notification"""
416 if signum == signal.SIGUSR1:
417 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
418 raise UpdatePromptException()
419
421 """Turn the history file path"""
422 return join(os.environ["HOME"], ".clush_history")
423
425 """
426 Configure readline to automatically load and save a history file
427 named .clush_history
428 """
429 import readline
430 readline.parse_and_bind("tab: complete")
431 readline.set_completer_delims("")
432 try:
433 readline.read_history_file(get_history_file())
434 except IOError:
435 pass
436
437 -def ttyloop(task, nodeset, timeout, display, remote):
438 """Manage the interactive prompt to run command"""
439 readline_avail = False
440 interactive = task.default("USER_interactive")
441 if interactive:
442 try:
443 import readline
444 readline_setup()
445 readline_avail = True
446 except ImportError:
447 pass
448 display.vprint(VERB_STD, \
449 "Enter 'quit' to leave this interactive mode")
450
451 rc = 0
452 ns = NodeSet(nodeset)
453 ns_info = True
454 cmd = ""
455 while task.default("USER_running") or \
456 (interactive and cmd.lower() != 'quit'):
457 try:
458
459 if task.default("USER_handle_SIGUSR1"):
460 signal.signal(signal.SIGUSR1, signal_handler)
461
462 if task.default("USER_interactive") and \
463 not task.default("USER_running"):
464 if ns_info:
465 display.vprint(VERB_QUIET, \
466 "Working with nodes: %s" % ns)
467 ns_info = False
468 prompt = "clush> "
469 else:
470 prompt = ""
471 try:
472 cmd = raw_input(prompt)
473 assert cmd is not None, "Result of raw_input() is None!"
474 finally:
475 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
476 except EOFError:
477 print
478 return
479 except UpdatePromptException:
480 if task.default("USER_interactive"):
481 continue
482 return
483 except KeyboardInterrupt, kbe:
484
485
486 if display.gather:
487
488 task.suspend()
489
490
491
492
493 if not task.default("USER_running"):
494
495 raise kbe
496
497
498
499 print_warn = False
500
501
502 nodesetify = lambda v: (v[0], NodeSet._fromlist1(v[1]))
503 for buf, nodeset in sorted(map(nodesetify, task.iter_buffers()),
504 cmp=bufnodeset_cmp):
505 if not print_warn:
506 print_warn = True
507 display.vprint_err(VERB_STD, \
508 "Warning: Caught keyboard interrupt!")
509 display.print_gather(nodeset, buf)
510
511
512 verbexit = VERB_QUIET
513 if display.maxrc:
514 verbexit = VERB_STD
515 ns_ok = NodeSet()
516 for rc, nodelist in task.iter_retcodes():
517 ns_ok.add(NodeSet._fromlist1(nodelist))
518 if rc != 0:
519
520 nsdisp = ns = NodeSet._fromlist1(nodelist)
521 if display.verbosity >= VERB_QUIET and len(ns) > 1:
522 nsdisp = "%s (%d)" % (ns, len(ns))
523 msgrc = "clush: %s: exited with exit code %d" % (nsdisp,
524 rc)
525 display.vprint_err(verbexit, msgrc)
526
527
528 kbe.uncompleted_nodes = ns - ns_ok
529
530
531 if task.num_timeout() > 0:
532 display.vprint_err(verbexit, \
533 "clush: %s: command timeout" % \
534 NodeSet._fromlist1(task.iter_keys_timeout()))
535 raise kbe
536
537 if task.default("USER_running"):
538 ns_reg, ns_unreg = NodeSet(), NodeSet()
539 for client in task._engine.clients():
540 if client.registered:
541 ns_reg.add(client.key)
542 else:
543 ns_unreg.add(client.key)
544 if ns_unreg:
545 pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
546 else:
547 pending = ""
548 display.vprint_err(VERB_QUIET,
549 "clush: interrupt (^C to abort task)")
550 gws = task.gateways.keys()
551 if not gws:
552 display.vprint_err(VERB_QUIET,
553 "clush: in progress(%d): %s%s"
554 % (len(ns_reg), ns_reg, pending))
555 else:
556 display.vprint_err(VERB_QUIET,
557 "clush: in progress(%d): %s%s\n"
558 "clush: [tree] open gateways(%d): %s"
559 % (len(ns_reg), ns_reg, pending,
560 len(gws), NodeSet._fromlist1(gws)))
561 for gw, (chan, metaworkers) in task.gateways.iteritems():
562 act_targets = NodeSet.fromlist(mw.gwtargets[gw]
563 for mw in metaworkers)
564 if act_targets:
565 display.vprint_err(VERB_QUIET,
566 "clush: [tree] in progress(%d) on %s: %s"
567 % (len(act_targets), gw, act_targets))
568 else:
569 cmdl = cmd.lower()
570 try:
571 ns_info = True
572 if cmdl.startswith('+'):
573 ns.update(cmdl[1:])
574 elif cmdl.startswith('-'):
575 ns.difference_update(cmdl[1:])
576 elif cmdl.startswith('@'):
577 ns = NodeSet(cmdl[1:])
578 elif cmdl == '=':
579 display.gather = not display.gather
580 if display.gather:
581 display.vprint(VERB_STD, \
582 "Switching to gathered output format")
583 else:
584 display.vprint(VERB_STD, \
585 "Switching to standard output format")
586 task.set_default("stdout_msgtree", \
587 display.gather or display.line_mode)
588 ns_info = False
589 continue
590 elif not cmdl.startswith('?'):
591 ns_info = False
592 except NodeSetParseError:
593 display.vprint_err(VERB_QUIET, \
594 "clush: nodeset parse error (ignoring)")
595
596 if ns_info:
597 continue
598
599 if cmdl.startswith('!') and len(cmd.strip()) > 0:
600 run_command(task, cmd[1:], None, timeout, display, remote)
601 elif cmdl != "quit":
602 if not cmd:
603 continue
604 if readline_avail:
605 readline.write_history_file(get_history_file())
606 run_command(task, cmd, ns, timeout, display, remote)
607 return rc
608
610 """Standard input reader thread entry point."""
611 try:
612
613
614
615
616 bufsize = 64 * 1024
617
618
619 buf = sys.stdin.read(bufsize)
620 while buf:
621
622 stdin_port.msg(buf)
623 buf = sys.stdin.read(bufsize)
624 except IOError, ex:
625 display.vprint(VERB_VERB, "stdin: %s" % ex)
626
627 stdin_port.msg(None)
628
630 """Create a stdin->port->worker binding: connect specified worker
631 to stdin with the help of a reader thread and a ClusterShell Port
632 object."""
633 assert not sys.stdin.isatty()
634
635
636
637 port = worker.task.port(handler=StdInputHandler(worker), autoclose=True)
638
639
640
641 stdin_thread = threading.Thread(None, _stdin_thread_start, args=(port, display))
642
643
644
645 stdin_thread.setDaemon(True)
646 stdin_thread.start()
647
def run_command(task, cmd, ns, timeout, display, remote):
    """
    Create and run the specified command line, displaying
    results in a dshbak way when gathering is used.

    Arguments:
      task -- task used to schedule and run the command
      cmd -- command line to execute
      ns -- target nodeset, or None for a command without target nodes
            (the worker key is then set to 'LOCAL')
      timeout -- command timeout passed to task.shell()
      display -- Display object; its gather, line_mode, progress and
                 verbosity attributes select the output handler
      remote -- remote flag passed to task.shell()

    The "USER_running" task default is set to True before the task is
    resumed. When the "USER_stdin_worker" task default is set, the worker
    is also bound to standard input with bind_stdin().
    """
    task.set_default("USER_running", True)

    want_progress = display.progress and display.verbosity > VERB_QUIET

    if ns is None:
        # No target nodeset: there is no node count to feed gathering or
        # the progress meter with (len(None) would raise TypeError), so
        # always use the plain direct output handler.
        handler = DirectOutputHandler(display)
    elif display.gather or display.line_mode:
        if display.gather and display.line_mode:
            handler = LiveGatherOutputHandler(display, ns)
        elif display.line_mode:
            handler = SortedOutputHandler(display)
        else:
            handler = GatherOutputHandler(display)

        if display.verbosity in (VERB_STD, VERB_VERB) or want_progress:
            handler.runtimer_init(task, len(ns))
    elif want_progress:
        handler = DirectProgressOutputHandler(display)
        handler.runtimer_init(task, len(ns))
    else:
        # no gathering and no progress meter: direct output
        handler = DirectOutputHandler(display)

    worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout,
                        remote=remote)
    if ns is None:
        worker.set_key('LOCAL')
    if task.default("USER_stdin_worker"):
        bind_stdin(worker, display)

    task.resume()
681
def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
    """Schedule the copy of each source to dest on nodes ns and run the task.

    The "USER_copies" task default is set to the number of sources and
    "USER_running" to True. If a source path does not exist, an error is
    printed and clush exits with status 1.
    """
    task.set_default("USER_running", True)
    nsources = len(sources)
    task.set_default("USER_copies", nsources)

    handler = CopyOutputHandler(display)
    verbosity = display.verbosity
    if verbosity == VERB_STD or verbosity == VERB_VERB:
        # one progress unit per (node, source) pair
        handler.runtimer_init(task, len(ns) * nsources)

    for path in sources:
        # each source is checked right before its copy is scheduled
        if not exists(path):
            errmsg = 'ERROR: file "%s" not found' % path
            display.vprint_err(VERB_QUIET, errmsg)
            clush_exit(1, task)
        task.copy(path, dest, ns, handler=handler, timeout=timeout,
                  preserve=preserve_flag)

    task.resume()
700
def run_rcopy(task, sources, dest, ns, timeout, preserve_flag, display):
    """Schedule the reverse copy of each source from nodes ns to dest and
    run the task.

    dest must be an existing directory, otherwise an error is printed and
    clush exits with status 1. The "USER_copies" task default is set to
    the number of sources and "USER_running" to True.
    """
    task.set_default("USER_running", True)
    nsources = len(sources)
    task.set_default("USER_copies", nsources)

    # Destination sanity checks, in order: existence, then directory type
    dest_checks = (
        (exists, 'ERROR: directory "%s" not found'),
        (isdir, 'ERROR: destination "%s" is not a directory'),
    )
    for check, errfmt in dest_checks:
        if not check(dest):
            display.vprint_err(VERB_QUIET, errfmt % dest)
            clush_exit(1, task)

    handler = CopyOutputHandler(display, True)
    if display.verbosity in (VERB_STD, VERB_VERB):
        # one progress unit per (node, source) pair
        handler.runtimer_init(task, len(ns) * nsources)

    for path in sources:
        task.rcopy(path, dest, ns, handler=handler, timeout=timeout,
                   stderr=True, preserve=preserve_flag)

    task.resume()
723
725 """Make open file descriptors soft limit the max."""
726 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
727 if hard < fd_max:
728 msgfmt = 'Warning: fd_max set to %d but max open files hard limit is %d'
729 display.vprint_err(VERB_VERB, msgfmt % (fd_max, hard))
730 rlim_max = min(hard, fd_max)
731 if soft != rlim_max:
732 msgfmt = 'Changing max open files soft limit from %d to %d'
733 display.vprint(VERB_DEBUG, msgfmt % (soft, rlim_max))
734 try:
735 resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_max, hard))
736 except (ValueError, resource.error), exc:
737
738 msgfmt = 'Warning: Failed to set max open files limit to %d (%s)'
739 display.vprint_err(VERB_VERB, msgfmt % (rlim_max, exc))
740
742 """Exit script, flushing stdio buffers and stopping ClusterShell task."""
743 if task:
744
745 task.abort()
746 task.join()
747 sys.exit(status)
748 else:
749
750 for stream in [sys.stdout, sys.stderr]:
751 try:
752 stream.flush()
753 except IOError:
754 pass
755
756 os._exit(status)
757
759 """Exceptions hook for clush: this method centralizes exception
760 handling from main thread and from (possible) separate task thread.
761 This hook has to be previously installed on startup by overriding
762 sys.excepthook and task.excepthook."""
763 try:
764 raise exp
765 except ClushConfigError, econf:
766 print >> sys.stderr, "ERROR: %s" % econf
767 clush_exit(1)
768 except KeyboardInterrupt, kbe:
769 uncomp_nodes = getattr(kbe, 'uncompleted_nodes', None)
770 if uncomp_nodes:
771 print >> sys.stderr, \
772 "Keyboard interrupt (%s did not complete)." % uncomp_nodes
773 else:
774 print >> sys.stderr, "Keyboard interrupt."
775 clush_exit(128 + signal.SIGINT)
776 except OSError, exp:
777 print >> sys.stderr, "ERROR: %s" % exp
778 if exp.errno == errno.EMFILE:
779 print >> sys.stderr, "ERROR: current `nofile' limits: " \
780 "soft=%d hard=%d" % resource.getrlimit(resource.RLIMIT_NOFILE)
781 clush_exit(1)
782 except GENERIC_ERRORS, exc:
783 clush_exit(handle_generic_error(exc))
784
785
786 task_self().default_excepthook(extype, exp, traceback)
787
789 """clush script entry point"""
790 sys.excepthook = clush_excepthook
791
792
793
794
795 usage = "%prog [options] command"
796
797 parser = OptionParser(usage)
798
799 parser.add_option("--nostdin", action="store_true", dest="nostdin",
800 help="don't watch for possible input from stdin")
801
802 parser.install_config_options('clush.conf(5)')
803 parser.install_nodes_options()
804 parser.install_display_options(verbose_options=True)
805 parser.install_filecopy_options()
806 parser.install_connector_options()
807
808 (options, args) = parser.parse_args()
809
810
811
812
813 config = ClushConfig(options)
814
815
816 if config.color == "auto":
817 color = sys.stdout.isatty() and (options.gatherall or \
818 sys.stderr.isatty())
819 else:
820 color = config.color == "always"
821
822 try:
823
824 display = Display(options, config, color)
825 except ValueError, exc:
826 parser.error("option mismatch (%s)" % exc)
827
828 if options.groupsource:
829
830 std_group_resolver().default_source_name = options.groupsource
831
832
833
834 wnodelist = []
835 xnodelist = []
836 if options.nodes:
837 wnodelist = [NodeSet(nodes) for nodes in options.nodes]
838
839 if options.exclude:
840 xnodelist = [NodeSet(nodes) for nodes in options.exclude]
841
842 for (opt, nodelist) in (('w', wnodelist), ('x', xnodelist)):
843 for nodes in nodelist:
844 if len(nodes) == 1 and exists(str(nodes)):
845 display.vprint_err(VERB_STD, "Warning: using '-%s %s' and "
846 "local path '%s' exists, was it expanded "
847 "by the shell?" % (opt, nodes, nodes))
848
849
850 for opt_hostfile in options.hostfile:
851 try:
852 fnodeset = NodeSet()
853 hostfile = open(opt_hostfile)
854 for line in hostfile.read().splitlines():
855 fnodeset.updaten(nodes for nodes in line.split())
856 hostfile.close()
857 display.vprint_err(VERB_DEBUG,
858 "Using nodeset %s from hostfile %s"
859 % (fnodeset, opt_hostfile))
860 wnodelist.append(fnodeset)
861 except IOError, exc:
862
863 errno, strerror = exc.args
864 raise OSError(errno, strerror, exc.filename)
865
866
867 nodeset_base = NodeSet.fromlist(wnodelist)
868
869 nodeset_exclude = NodeSet.fromlist(xnodelist)
870
871
872 DEFAULTS.engine = options.engine
873
874
875 task = task_self()
876 task.set_info("debug", config.verbosity >= VERB_DEBUG)
877 if config.verbosity == VERB_DEBUG:
878 std_group_resolver().set_verbosity(1)
879 if options.nodes_all:
880 all_nodeset = NodeSet.fromall()
881 display.vprint(VERB_DEBUG, "Adding nodes from option -a: %s" % \
882 all_nodeset)
883 nodeset_base.add(all_nodeset)
884
885 if options.group:
886 grp_nodeset = NodeSet.fromlist(options.group,
887 resolver=RESOLVER_NOGROUP)
888 for grp in grp_nodeset:
889 addingrp = NodeSet("@" + grp)
890 display.vprint(VERB_DEBUG, \
891 "Adding nodes from option -g %s: %s" % (grp, addingrp))
892 nodeset_base.update(addingrp)
893
894 if options.exgroup:
895 grp_nodeset = NodeSet.fromlist(options.exgroup,
896 resolver=RESOLVER_NOGROUP)
897 for grp in grp_nodeset:
898 removingrp = NodeSet("@" + grp)
899 display.vprint(VERB_DEBUG, \
900 "Excluding nodes from option -X %s: %s" % (grp, removingrp))
901 nodeset_exclude.update(removingrp)
902
903
904 nodeset_base.difference_update(nodeset_exclude)
905 if len(nodeset_base) < 1:
906 parser.error('No node to run on.')
907
908 if options.pick and options.pick < len(nodeset_base):
909
910
911 keep = random.sample(nodeset_base, options.pick)
912 nodeset_base.intersection_update(','.join(keep))
913 if config.verbosity >= VERB_VERB:
914 msg = "Picked random nodes: %s" % nodeset_base
915 print Display.COLOR_RESULT_FMT % msg
916
917
918 set_fdlimit(config.fd_max, display)
919
920
921
922
923
924 interactive = not len(args) and \
925 not (options.copy or options.rcopy)
926
927 stdin_isafgtty = sys.stdin.isatty() and \
928 os.tcgetpgrp(sys.stdin.fileno()) == os.getpgrp()
929
930 if interactive and not stdin_isafgtty:
931
932
933 interactive = False
934
935 ssh_options = config.ssh_options or ''
936 ssh_options += ' -T'
937 config._set_main("ssh_options", ssh_options)
938 if options.nostdin and interactive:
939 parser.error("illegal option `--nostdin' in that case")
940
941
942 user_interaction = hasattr(sys.modules[__name__], '_f_user_interaction')
943 if not options.nostdin:
944
945 stdout_isafgtty = sys.stdout.isatty() and \
946 os.tcgetpgrp(sys.stdout.fileno()) == os.getpgrp()
947 user_interaction |= stdin_isafgtty and stdout_isafgtty
948 display.vprint(VERB_DEBUG, "User interaction: %s" % user_interaction)
949 if user_interaction:
950
951
952
953
954 task = Task()
955
956
957
958 task.set_default("USER_handle_SIGUSR1", user_interaction)
959
960 task.excepthook = sys.excepthook
961 task.set_default("USER_stdin_worker", not (sys.stdin.isatty() or \
962 options.nostdin or \
963 user_interaction))
964 display.vprint(VERB_DEBUG, "Create STDIN worker: %s" % \
965 task.default("USER_stdin_worker"))
966
967 if config.verbosity >= VERB_DEBUG:
968 task.set_info("debug", True)
969 logging.basicConfig(level=logging.DEBUG)
970 logging.debug("clush: STARTING DEBUG")
971 else:
972 logging.basicConfig(level=logging.CRITICAL)
973
974 task.set_info("fanout", config.fanout)
975
976 if options.worker:
977 try:
978 if options.remote == 'no':
979 task.set_default('local_worker',
980 _load_workerclass(options.worker))
981 else:
982 task.set_default('distant_worker',
983 _load_workerclass(options.worker))
984 except (ImportError, AttributeError):
985 msg = "ERROR: Could not load worker '%s'" % options.worker
986 display.vprint_err(VERB_QUIET, msg)
987 clush_exit(1, task)
988
989 if options.topofile or task._default_tree_is_enabled():
990 if options.topofile:
991 task.load_topology(options.topofile)
992 if config.verbosity >= VERB_VERB:
993 roots = len(task.topology.root.nodeset)
994 gws = task.topology.inner_node_count() - roots
995 msg = "enabling tree topology (%d gateways)" % gws
996 print >> sys.stderr, "clush: %s" % msg
997
998 if options.grooming_delay:
999 if config.verbosity >= VERB_VERB:
1000 msg = Display.COLOR_RESULT_FMT % ("Grooming delay: %f" %
1001 options.grooming_delay)
1002 print >> sys.stderr, msg
1003 task.set_info("grooming_delay", options.grooming_delay)
1004 elif options.rcopy:
1005
1006 task.set_info("grooming_delay", 0)
1007
1008 if config.ssh_user:
1009 task.set_info("ssh_user", config.ssh_user)
1010 if config.ssh_path:
1011 task.set_info("ssh_path", config.ssh_path)
1012 if config.ssh_options:
1013 task.set_info("ssh_options", config.ssh_options)
1014 if config.scp_path:
1015 task.set_info("scp_path", config.scp_path)
1016 if config.scp_options:
1017 task.set_info("scp_options", config.scp_options)
1018 if config.rsh_path:
1019 task.set_info("rsh_path", config.rsh_path)
1020 if config.rcp_path:
1021 task.set_info("rcp_path", config.rcp_path)
1022 if config.rsh_options:
1023 task.set_info("rsh_options", config.rsh_options)
1024
1025
1026 task.set_info("connect_timeout", config.connect_timeout)
1027 task.set_info("command_timeout", config.command_timeout)
1028
1029
1030 task.set_default("stderr", not options.gatherall)
1031
1032
1033 task.set_default("stdout_msgtree", display.gather or display.line_mode)
1034
1035
1036 task.set_default("stderr_msgtree", False)
1037
1038
1039 if config.command_timeout > 0:
1040 timeout = config.command_timeout
1041 else:
1042 timeout = -1
1043
1044
1045 task.set_default("USER_interactive", interactive)
1046 task.set_default("USER_running", False)
1047
1048 if (options.copy or options.rcopy) and not args:
1049 parser.error("--[r]copy option requires at least one argument")
1050 if options.copy:
1051 if not options.dest_path:
1052
1053 options.dest_path = join(dirname(abspath(args[0])), '')
1054 op = "copy sources=%s dest=%s" % (args, options.dest_path)
1055 elif options.rcopy:
1056 if not options.dest_path:
1057 options.dest_path = dirname(abspath(args[0]))
1058 op = "rcopy sources=%s dest=%s" % (args, options.dest_path)
1059 else:
1060 op = "command=\"%s\"" % ' '.join(args)
1061
1062
1063
1064 display.vprint(VERB_DEBUG, "clush: nodeset=%s fanout=%d [timeout " \
1065 "conn=%.1f cmd=%.1f] %s" % (nodeset_base, config.fanout,
1066 config.connect_timeout,
1067 config.command_timeout,
1068 op))
1069 if not task.default("USER_interactive"):
1070 if display.verbosity >= VERB_DEBUG and task.topology:
1071 print Display.COLOR_RESULT_FMT % '-' * 15
1072 print Display.COLOR_RESULT_FMT % task.topology,
1073 print Display.COLOR_RESULT_FMT % '-' * 15
1074 if options.copy:
1075 run_copy(task, args, options.dest_path, nodeset_base, timeout,
1076 options.preserve_flag, display)
1077 elif options.rcopy:
1078 run_rcopy(task, args, options.dest_path, nodeset_base, timeout,
1079 options.preserve_flag, display)
1080 else:
1081 run_command(task, ' '.join(args), nodeset_base, timeout, display,
1082 options.remote != 'no')
1083
1084 if user_interaction:
1085 ttyloop(task, nodeset_base, timeout, display, options.remote != 'no')
1086 elif task.default("USER_interactive"):
1087 display.vprint_err(VERB_QUIET, \
1088 "ERROR: interactive mode requires a tty")
1089 clush_exit(1, task)
1090
1091 rc = 0
1092 if options.maxrc:
1093
1094 rc = task.max_retcode()
1095 if task.num_timeout() > 0:
1096 rc = 255
1097 clush_exit(rc, task)
1098
# Run clush only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
1101