
Source Code for Module ClusterShell.Worker.Exec

#
# Copyright (C) 2014-2015 CEA/DAM
# Copyright (C) 2014-2015 Aurelien Degremont <aurelien.degremont@cea.fr>
# Copyright (C) 2014-2015 Stephane Thiell <sthiell@stanford.edu>
#
# This file is part of ClusterShell.
#
# ClusterShell is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# ClusterShell is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with ClusterShell; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
ClusterShell base worker for process-based workers.

This module provides the worker class used to spawn local commands, possibly
using a nodeset to behave like a distant worker. Like other workers, it can
run commands or copy files, locally.

This is the base class for most other distant workers.
"""
 31   
 32  import os 
 33  from string import Template 
 34   
 35  from ClusterShell.NodeSet import NodeSet 
 36  from ClusterShell.Worker.EngineClient import EngineClient 
 37  from ClusterShell.Worker.Worker import WorkerError, DistantWorker 
 38   
 39   

def _replace_cmd(pattern, node, rank):
    """
    Replace keywords in `pattern' with values from `node' and `rank'.

    %h, %host, %hosts map `node'
    %n, %rank map `rank'
    """
    variables = {
        'h': node,
        'host': node,
        'hosts': node,
        'n': rank or 0,
        'rank': rank or 0,
        # 'u': None,
    }
    class Replacer(Template):
        delimiter = '%'
    try:
        cmd = Replacer(pattern).substitute(variables)
    except (KeyError, ValueError), error:
        msg = "%s is not a valid pattern, use '%%%%' to escape '%%'" % error
        raise WorkerError(msg)
    return cmd
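
# A hedged illustration (not part of the original module) of how
# _replace_cmd() expands its placeholders, given the implementation above:
#
#   >>> _replace_cmd("echo %host rank=%n", "node1", 5)
#   'echo node1 rank=5'
#   >>> _replace_cmd("date +%%s", "node1", 0)   # '%%' yields a literal '%'
#   'date +%s'
#   >>> _replace_cmd("echo %u", "node1", 0)     # unknown keyword
#   WorkerError: 'u' is not a valid pattern, use '%%' to escape '%'
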

class ExecClient(EngineClient):
    """
    Run a simple local command.

    Useful as a superclass for other more specific workers.
    """

    def __init__(self, node, command, worker, stderr, timeout, autoclose=False,
                 rank=None):
        """
        Create an EngineClient-type instance to locally run `command'.

        :param node: will be used as key.
        """
        EngineClient.__init__(self, worker, node, stderr, timeout, autoclose)
        self.rank = rank
        self.command = command
        self.popen = None
        # Declare writer stream to allow early buffering
        self.streams.set_writer(worker.SNAME_STDIN, None, retain=True)

    def _build_cmd(self):
        """
        Build the shell command line to start the command.

        Return a tuple containing the command and its arguments, as a string
        or a list of strings, and a dict of additional environment variables.
        None may be returned if no environment change is required.
        """
        return (_replace_cmd(self.command, self.key, self.rank), None)

    def _start(self):
        """Prepare command and start client."""

        # Build command
        cmd, cmd_env = self._build_cmd()

        # If the command line is a string, we need to interpret it as a shell
        # command
        shell = type(cmd) is str

        task = self.worker.task
        if task.info("debug", False):
            name = str(self.__class__).upper().split('.')[-1]
            if shell:
                task.info("print_debug")(task, "%s: %s" % (name, cmd))
            else:
                task.info("print_debug")(task, "%s: %s" % (name, ' '.join(cmd)))

        self.popen = self._exec_nonblock(cmd, env=cmd_env, shell=shell)
        self._on_nodeset_start(self.key)
        return self

    def _close(self, abort, timeout):
        """Close client. See EngineClient._close()."""
        if abort:
            # it's safer to call poll() first for processes that completed
            # some time ago
            prc = self.popen.poll()
            # if prc is None, process is still running
            if prc is None:
                try:  # try to kill it
                    self.popen.kill()
                except OSError:
                    pass
        prc = self.popen.wait()

        self.streams.clear()

        if prc >= 0:
            self._on_nodeset_rc(self.key, prc)
        elif timeout:
            assert abort, "abort flag not set on timeout"
            self.worker._on_node_timeout(self.key)
        elif not abort:
            # if the process was signaled, return 128 + signum (bash-like)
            self._on_nodeset_rc(self.key, 128 + -prc)

        self.worker._check_fini()
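
    # Hedged note (not in the original source): Popen.returncode is negative
    # when the child is killed by a signal, which is why _close() maps it to
    # 128 + signum, bash-style. A minimal standalone sketch:
    #
    #   import subprocess
    #   proc = subprocess.Popen(["sleep", "60"])
    #   proc.kill()          # sends SIGKILL on POSIX
    #   prc = proc.wait()    # -> -9
    #   rc = 128 + -prc      # -> 137, same as $? after a SIGKILL in bash
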

    def _on_nodeset_start(self, nodes):
        """local wrapper over _on_start that can also handle a nodeset"""
        if isinstance(nodes, NodeSet):
            for node in nodes:
                self.worker._on_start(node)
        else:
            self.worker._on_start(nodes)

    def _on_nodeset_rc(self, nodes, rc):
        """local wrapper over _on_node_rc that can also handle a nodeset"""
        if isinstance(nodes, NodeSet):
            for node in nodes:
                self.worker._on_node_rc(node, rc)
        else:
            self.worker._on_node_rc(nodes, rc)

    def _on_nodeset_msgline(self, nodes, msg, sname):
        """local wrapper over _on_node_msgline that can also handle a nodeset"""
        if isinstance(nodes, NodeSet):
            for node in nodes:
                self.worker._on_node_msgline(node, msg, sname)
        else:
            self.worker._on_node_msgline(nodes, msg, sname)

    def _flush_read(self, sname):
        """Called at close time to flush the stream read buffer."""
        stream = self.streams[sname]
        if stream.readable() and stream.rbuf:
            # We still have some read data available in the buffer, but no
            # EOL. Generate a final message before closing.
            self._on_nodeset_msgline(self.key, stream.rbuf, sname)

    def _handle_read(self, sname):
        """
        Handle a read notification. Called by the engine as the result of an
        event indicating that a read is available.
        """
        # Local variables optimization
        worker = self.worker
        task = worker.task
        key = self.key
        node_msgline = self._on_nodeset_msgline
        debug = task.info("debug", False)
        if debug:
            print_debug = task.info("print_debug")
        for msg in self._readlines(sname):
            if debug:
                print_debug(task, "%s: %s" % (key, msg))
            node_msgline(key, msg, sname)  # handle full msg line

class CopyClient(ExecClient):
    """
    Run a local `cp' between a source and a destination.

    The destination may be a directory.
    """

    def __init__(self, node, source, dest, worker, stderr, timeout, autoclose,
                 preserve, reverse, rank=None):
        """Create an EngineClient-type instance to locally run 'cp'."""
        ExecClient.__init__(self, node, None, worker, stderr, timeout,
                            autoclose, rank)
        self.source = source
        self.dest = dest

        # Preserve modification times and modes?
        self.preserve = preserve

        # Reverse copy?
        self.reverse = reverse

        # Directory?
        # FIXME: file sanity checks could be moved to Copy._start() as we
        # should now be able to handle errors when starting (#215).
        if self.reverse:
            self.isdir = os.path.isdir(self.dest)
            if not self.isdir:
                raise ValueError("reverse copy dest must be a directory")
        else:
            self.isdir = os.path.isdir(self.source)

    def _build_cmd(self):
        """
        Build the shell command line to start the copy command.
        Return an array of command and arguments.
        """
        source = _replace_cmd(self.source, self.key, self.rank)
        dest = _replace_cmd(self.dest, self.key, self.rank)

        cmd_l = ["cp"]

        if self.isdir:
            cmd_l.append("-r")

        if self.preserve:
            cmd_l.append("-p")

        if self.reverse:
            cmd_l.append(dest)
            cmd_l.append(source)
        else:
            cmd_l.append(source)
            cmd_l.append(dest)

        return (cmd_l, None)
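
    # Hedged illustration (not in the original source): command lines produced
    # by _build_cmd() above for a hypothetical node "node1", with
    # source="/etc/conf.d" (a directory) and dest="/tmp/%h":
    #
    #   forward copy, preserve=True:
    #       ["cp", "-r", "-p", "/etc/conf.d", "/tmp/node1"]
    #   reverse copy simply swaps the two paths (and since dest must be an
    #   existing directory, "-r" is always set).
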

class ExecWorker(DistantWorker):
    """
    ClusterShell simple execution worker class.

    It runs commands locally. If a node list is provided, one command will be
    launched for each node, and specific keywords will be replaced based on
    node name and rank.

    Local shell usage example:

        >>> worker = ExecWorker(nodeset, handler=MyEventHandler(),
        ...                     timeout=30, command="/bin/uptime")
        >>> task.schedule(worker)   # schedule worker for execution
        >>> task.run()              # run

    Local copy usage example:

        >>> worker = ExecWorker(nodeset, handler=MyEventHandler(),
        ...                     source="/etc/my.cnf",
        ...                     dest="/etc/my.cnf.bak")
        >>> task.schedule(worker)   # schedule worker for execution
        >>> task.run()              # run

    The connect_timeout option is ignored by this worker.
    """

    SHELL_CLASS = ExecClient
    COPY_CLASS = CopyClient

    def __init__(self, nodes, handler, timeout=None, **kwargs):
        """Create an ExecWorker and its engine client instances."""
        DistantWorker.__init__(self, handler)
        self._close_count = 0
        self._has_timeout = False
        self._clients = []

        self.nodes = NodeSet(nodes)
        self.command = kwargs.get('command')
        self.source = kwargs.get('source')
        self.dest = kwargs.get('dest')

        self._create_clients(timeout=timeout, **kwargs)

    #
    # Spawn and manage EngineClient classes
    #

    def _create_clients(self, **kwargs):
        """
        Create several shell and copy engine client instances based on worker
        properties.

        Additional arguments in `kwargs' will be used for client creation.
        There will be one client per node in self.nodes, unless the special
        %hosts placeholder is found in the command, in which case a single
        client is created for the whole nodeset.
        """
        # do not iterate if the special %hosts placeholder is found in command
        if self.command and ('%hosts' in self.command or
                             '%{hosts}' in self.command):
            self._add_client(self.nodes, rank=None, **kwargs)
        else:
            for rank, node in enumerate(self.nodes):
                self._add_client(node, rank=rank, **kwargs)
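
    # Hedged illustration (not in the original source) of the %hosts special
    # case handled above; node names and the pdsh command are hypothetical:
    #
    #   ExecWorker("node[1-3]", handler=None, command="ping -c1 %h")
    #       -> 3 clients, one per node; %h expands to each node name
    #   ExecWorker("node[1-3]", handler=None, command="pdsh -w %hosts uname")
    #       -> 1 client; %hosts expands to the folded nodeset "node[1-3]"
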

    def _add_client(self, nodes, **kwargs):
        """Create one shell or copy client."""
        autoclose = kwargs.get('autoclose', False)
        stderr = kwargs.get('stderr', False)
        rank = kwargs.get('rank')
        timeout = kwargs.get('timeout')

        if self.command is not None:
            cls = self.__class__.SHELL_CLASS
            self._clients.append(cls(nodes, self.command, self, stderr,
                                     timeout, autoclose, rank))
        elif self.source:
            cls = self.__class__.COPY_CLASS
            self._clients.append(cls(nodes, self.source, self.dest, self,
                                     stderr, timeout, autoclose,
                                     kwargs.get('preserve', False),
                                     kwargs.get('reverse', False), rank))
        else:
            raise ValueError("missing command or source parameter in "
                             "worker constructor")

    def _engine_clients(self):
        """
        Used by the upper layer to get the list of underlying created engine
        clients.
        """
        return self._clients

    def write(self, buf, sname=None):
        """Write to worker clients."""
        sname = sname or self.SNAME_STDIN
        for client in self._clients:
            if sname in client.streams:
                client._write(sname, buf)

    def set_write_eof(self, sname=None):
        """
        Tell the worker to close its writer file descriptors once flushed. Do
        not perform writes after this call.
        """
        for client in self._clients:
            client._set_write_eof(sname or self.SNAME_STDIN)
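
    # Hedged usage sketch (not in the original source): feeding stdin to the
    # spawned commands. The retained writer stream declared in
    # ExecClient.__init__ allows buffering writes even before the task runs.
    #
    #   worker = ExecWorker("node[1-2]", handler=None, command="cat")
    #   task.schedule(worker)
    #   worker.write("hello\n")   # buffered until the clients start
    #   worker.set_write_eof()    # close stdin once flushed so 'cat' exits
    #   task.run()
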

    def abort(self):
        """Abort processing any action by this worker."""
        for client in self._clients:
            client.abort()

    #
    # Events
    #

    def _on_node_timeout(self, node):
        DistantWorker._on_node_timeout(self, node)
        self._has_timeout = True

    def _check_fini(self):
        """
        Must be called by each client when closing.

        If they are all closed, trigger the required events.
        """
        self._close_count += 1
        assert self._close_count <= len(self._clients)
        if self._close_count == len(self._clients) and self.eh:
            if self._has_timeout:
                self.eh.ev_timeout(self)
            self.eh.ev_close(self)


WORKER_CLASS = ExecWorker
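
# Hedged end-to-end sketch (not part of the original module): driving this
# worker through the ClusterShell task API; node names are hypothetical.
#
#   from ClusterShell.Task import task_self
#   from ClusterShell.Worker.Exec import ExecWorker
#
#   task = task_self()
#   worker = ExecWorker("node[1-3]", handler=None, timeout=10,
#                       command="echo %h")
#   task.schedule(worker)
#   task.run()
#   for buf, nodes in task.iter_buffers():
#       print nodes, buf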