
Source Code for Module ClusterShell.Worker.Pdsh

#
# Copyright (C) 2007-2016 CEA/DAM
#
# This file is part of ClusterShell.
#
# ClusterShell is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# ClusterShell is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with ClusterShell; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""
WorkerPdsh

ClusterShell worker for executing commands with LLNL pdsh.
"""

import errno
import os
import shlex

from ClusterShell.NodeSet import NodeSet
from ClusterShell.Worker.EngineClient import EngineClientError
from ClusterShell.Worker.EngineClient import EngineClientNotSupportedError
from ClusterShell.Worker.Worker import WorkerError
from ClusterShell.Worker.Exec import ExecWorker, ExecClient, CopyClient


class PdshClient(ExecClient):
    """EngineClient which runs 'pdsh'"""

    MODE = 'pdsh'

    def __init__(self, node, command, worker, stderr, timeout, autoclose=False,
                 rank=None):
        ExecClient.__init__(self, node, command, worker, stderr, timeout,
                            autoclose, rank)
        self._closed_nodes = NodeSet()

    def _build_cmd(self):
        """
        Build the shell command line to start the command.
        Return an array of command and arguments.
        """
        task = self.worker.task
        pdsh_env = {}

        # Build pdsh command
        path = task.info("pdsh_path") or "pdsh"
        cmd_l = [os.path.expanduser(pathc) for pathc in shlex.split(path)]
        cmd_l.append("-b")

        fanout = task.info("fanout", 0)
        if fanout > 0:
            cmd_l.append("-f %d" % fanout)

        # The pdsh flag '-t' does not really work well. It is better to use
        # the PDSH_SSH_ARGS_APPEND variable to pass the ssh ConnectTimeout
        # flag.
        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            pdsh_env['PDSH_SSH_ARGS_APPEND'] = "-o ConnectTimeout=%d" % \
                connect_timeout

        command_timeout = task.info("command_timeout", 0)
        if command_timeout > 0:
            cmd_l.append("-u %d" % command_timeout)

        cmd_l.append("-w %s" % self.key)
        cmd_l.append("%s" % self.command)

        return (cmd_l, pdsh_env)

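    # Illustrative note (not part of the original source): with hypothetical
    # task settings fanout=128 and connect_timeout=10, and with self.key set
    # to "node[1-4]" and self.command to "uname -r", _build_cmd() would
    # return roughly:
    #   (['pdsh', '-b', '-f 128', '-w node[1-4]', 'uname -r'],
    #    {'PDSH_SSH_ARGS_APPEND': '-o ConnectTimeout=10'})
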
    def _close(self, abort, timeout):
        """Close client. See EngineClient._close()."""
        if abort:
            # it's safer to call poll() first for processes that completed
            # long ago
            prc = self.popen.poll()
            # if prc is None, the process is still running
            if prc is None:
                try:  # try to kill it
                    self.popen.kill()
                except OSError:
                    pass
        prc = self.popen.wait()

        if prc > 0:
            raise WorkerError("Cannot run pdsh (error %d)" % prc)

        self.streams.clear()

        if timeout:
            assert abort, "abort flag not set on timeout"
            for node in (self.key - self._closed_nodes):
                self.worker._on_node_timeout(node)
        else:
            for node in (self.key - self._closed_nodes):
                self.worker._on_node_rc(node, 0)

        self.worker._check_fini()

    def _parse_line(self, line, sname):
        """
        Parse Pdsh line syntax.
        """
        if line.startswith("pdsh@") or \
           line.startswith("pdcp@") or \
           line.startswith("sending "):
            try:
                # pdsh@cors113: cors115: ssh exited with exit code 1
                #       0          1      2    3      4    5    6  7
                # corsUNKN: ssh: corsUNKN: Name or service not known
                #     0      1      2       3   4    5      6    7
                # pdsh@fortoy0: fortoy101: command timeout
                #       0           1         2       3
                # sending SIGTERM to ssh fortoy112 pid 32014
                #    0       1     2   3      4     5    6
                # pdcp@cors113: corsUNKN: ssh exited with exit code 255
                #       0           1     2    3      4    5    6   7
                # pdcp@cors113: cors115: fatal: /var/cache/shine/...
                #       0          1       2      3...
                words = line.split()
                # Set return code for nodename of worker
                if self.MODE == 'pdsh':
                    if len(words) == 4 and words[2] == "command" and \
                       words[3] == "timeout":
                        pass
                    elif len(words) == 8 and words[3] == "exited" and \
                         words[7].isdigit():
                        self._closed_nodes.add(words[1][:-1])
                        self.worker._on_node_rc(words[1][:-1], int(words[7]))
                elif self.MODE == 'pdcp':
                    self._closed_nodes.add(words[1][:-1])
                    self.worker._on_node_rc(words[1][:-1], errno.ENOENT)

            except Exception as exc:
                raise EngineClientError("Pdsh parser error: %s" % exc)
        else:
            # split pdsh reply "nodename: msg"
            nodename, msg = line.split(': ', 1)
            self.worker._on_node_msgline(nodename, msg, sname)

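    # Illustrative note (not part of the original source): for the sample line
    # "pdsh@cors113: cors115: ssh exited with exit code 1", _parse_line()
    # strips the trailing ':' from words[1] and records the remote return
    # code with self.worker._on_node_rc('cors115', 1).
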
    def _flush_read(self, sname):
        """Called at close time to flush stream read buffer."""
        pass

    def _handle_read(self, sname):
        """Engine is telling us a read is available."""
        debug = self.worker.task.info("debug", False)
        if debug:
            print_debug = self.worker.task.info("print_debug")

        suffix = ""
        if sname == 'stderr':
            suffix = "@STDERR"

        for msg in self._readlines(sname):
            if debug:
                print_debug(self.worker.task, "PDSH%s: %s" % (suffix, msg))
            self._parse_line(msg, sname)


class PdcpClient(CopyClient, PdshClient):
    """EngineClient used when pdsh is run to copy files, using pdcp."""

    MODE = 'pdcp'

    def _build_cmd(self):
        """
        Build the shell command line to start the pdcp (or rpdcp) command.
        Return an array of command and arguments.
        """
        cmd_l = []

        # Build pdcp command
        if self.reverse:
            path = self.worker.task.info("rpdcp_path") or "rpdcp"
        else:
            path = self.worker.task.info("pdcp_path") or "pdcp"
        cmd_l = [os.path.expanduser(pathc) for pathc in shlex.split(path)]
        cmd_l.append("-b")

        fanout = self.worker.task.info("fanout", 0)
        if fanout > 0:
            cmd_l.append("-f %d" % fanout)

        connect_timeout = self.worker.task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-t %d" % connect_timeout)

        cmd_l.append("-w %s" % self.key)

        if self.isdir:
            cmd_l.append("-r")

        if self.preserve:
            cmd_l.append("-p")

        cmd_l.append(self.source)
        cmd_l.append(self.dest)

        return (cmd_l, None)


class WorkerPdsh(ExecWorker):
    """
    ClusterShell pdsh-based worker Class.

    Remote Shell (pdsh) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, command="/bin/hostname")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Remote Copy (pdcp) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, source="/etc/my.conf",
       ...                     dest="/etc/my.conf")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Known limitations:
      - write() is not supported by WorkerPdsh
      - return codes == 0 are not guaranteed when a timeout is used (rc > 0
        are fine)
    """

    SHELL_CLASS = PdshClient
    COPY_CLASS = PdcpClient

    #
    # Spawn and control
    #

    def _create_clients(self, **kwargs):
        self._add_client(self.nodes, **kwargs)

    def write(self, buf):
        """
        Write data to process. Not supported with Pdsh worker.
        """
        raise EngineClientNotSupportedError("writing is not supported by pdsh "
                                            "worker")

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do not
        perform writes after this call.

        Not supported by PDSH Worker.
        """
        raise EngineClientNotSupportedError("writing is not supported by pdsh "
                                            "worker")


WORKER_CLASS = WorkerPdsh
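
A minimal end-to-end usage sketch (not part of the module source above), assuming pdsh is installed on the local host; the node set and command below are placeholders. It follows the WorkerPdsh docstring example and reads results back through the standard Task API:

    from ClusterShell.NodeSet import NodeSet
    from ClusterShell.Task import task_self
    from ClusterShell.Worker.Pdsh import WorkerPdsh

    task = task_self()
    nodes = NodeSet("node[1-4]")            # placeholder node set

    # run a remote command through pdsh
    worker = WorkerPdsh(nodes, handler=None, timeout=30,
                        command="/bin/hostname")
    task.schedule(worker)                   # schedule worker for execution
    task.resume()                           # run

    # gather output, grouped by nodes with identical buffers
    for buf, buf_nodes in task.iter_buffers():
        print("%s: %s" % (NodeSet.fromlist(buf_nodes), buf))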