
Source Code for Module ClusterShell.Worker.Tree

#
# Copyright (C) 2011-2016 CEA/DAM
# Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu>
#
# This file is part of ClusterShell.
#
# ClusterShell is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# ClusterShell is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with ClusterShell; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
# This file is part of the ClusterShell library.

"""
ClusterShell v2 tree propagation worker
"""

import base64
import logging
import os
from os.path import basename, dirname, isfile, normpath
import tarfile
import tempfile

from ClusterShell.Event import EventHandler
from ClusterShell.NodeSet import NodeSet
from ClusterShell.Worker.Worker import DistantWorker, WorkerError
from ClusterShell.Worker.Exec import ExecWorker

from ClusterShell.Propagation import PropagationTreeRouter
class MetaWorkerEventHandler(EventHandler):
    """Handle events for the meta worker WorkerTree"""

    def __init__(self, metaworker):
        self.metaworker = metaworker
        self.logger = logging.getLogger(__name__)

    def ev_start(self, worker):
        """
        Called to indicate that a worker has just started.
        """
        self.logger.debug("MetaWorkerEventHandler: ev_start")
        self.metaworker._start_count += 1
        self.metaworker._check_ini()

    def ev_read(self, worker):
        """
        Called to indicate that a worker has data to read.
        """
        self.metaworker._on_node_msgline(worker.current_node,
                                         worker.current_msg,
                                         'stdout')

    def ev_error(self, worker):
        """
        Called to indicate that a worker has error to read (on stderr).
        """
        self.metaworker._on_node_msgline(worker.current_node,
                                         worker.current_errmsg,
                                         'stderr')

    def ev_written(self, worker, node, sname, size):
        """
        Called to indicate that writing has been done.
        """
        metaworker = self.metaworker
        metaworker.current_node = node
        metaworker.current_sname = sname
        if metaworker.eh:
            metaworker.eh.ev_written(metaworker, node, sname, size)

    def ev_hup(self, worker):
        """
        Called to indicate that a worker's connection has been closed.
        """
        self.metaworker._on_node_rc(worker.current_node, worker.current_rc)

    def ev_timeout(self, worker):
        """
        Called to indicate that a worker has timed out (worker timeout only).
        """
        # WARNING!!! this is not possible as the metaworker is changing task's
        # shared timeout set!
        #for node in worker.iter_keys_timeout():
        #    self.metaworker._on_node_timeout(node)
        # we use NodeSet to copy set
        self.logger.debug("MetaWorkerEventHandler: ev_timeout")
        for node in NodeSet._fromlist1(worker.iter_keys_timeout()):
            self.metaworker._on_node_timeout(node)

    def ev_close(self, worker):
        """
        Called to indicate that a worker has just finished (it may already
        have failed on timeout).
        """
        self.logger.debug("MetaWorkerEventHandler: ev_close")
        self.metaworker._check_fini()
        #self._completed += 1
        #if self._completed >= self.grpcount:
        #    metaworker = self.metaworker
        #    metaworker.eh.ev_close(metaworker)

class WorkerTree(DistantWorker):
    """
    ClusterShell tree worker Class.

    """
    # copy and rcopy tar command formats
    # the choice of single or double quotes is essential
    UNTAR_CMD_FMT = "tar -xf - -C '%s'"
    TAR_CMD_FMT = "tar -cf - -C '%s' " \
                  "--transform \"s,^\\([^/]*\\)[/]*,\\1.$(hostname -s)/,\" " \
                  "'%s' | base64 -w 65536"

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Initialize Tree worker instance.

        :param nodes: Targeted nodeset.
        :param handler: Worker EventHandler.
        :param timeout: Timeout value for worker.
        :param command: Command to execute.
        :param topology: Force specific TopologyTree.
        :param newroot: Root node of TopologyTree.
        """
        DistantWorker.__init__(self, handler)

        self.logger = logging.getLogger(__name__)
        self.workers = []
        self.nodes = NodeSet(nodes)
        self.timeout = timeout
        self.command = kwargs.get('command')
        self.source = kwargs.get('source')
        self.dest = kwargs.get('dest')
        autoclose = kwargs.get('autoclose', False)
        self.stderr = kwargs.get('stderr', False)
        self.logger.debug("stderr=%s", self.stderr)
        self.remote = kwargs.get('remote', True)
        self.preserve = kwargs.get('preserve', None)
        self.reverse = kwargs.get('reverse', False)
        self._rcopy_bufs = {}
        self._rcopy_tars = {}
        self._close_count = 0
        self._start_count = 0
        self._child_count = 0
        self._target_count = 0
        self._has_timeout = False

        if self.command is None and self.source is None:
            raise ValueError("missing command or source parameter in "
                             "WorkerTree constructor")

        # rcopy is enforcing separated stderr to handle tar error messages
        # because stdout is used for data transfer
        if self.source and self.reverse:
            self.stderr = True

        # build gateway invocation command
        invoke_gw_args = []
        for envname in ('PYTHONPATH',
                        'CLUSTERSHELL_GW_LOG_DIR',
                        'CLUSTERSHELL_GW_LOG_LEVEL',
                        'CLUSTERSHELL_GW_B64_LINE_LENGTH'):
            envval = os.getenv(envname)
            if envval:
                invoke_gw_args.append("%s=%s" % (envname, envval))
        invoke_gw_args.append("python -m ClusterShell/Gateway -Bu")
        self.invoke_gateway = ' '.join(invoke_gw_args)

        self.topology = kwargs.get('topology')
        if self.topology is not None:
            self.newroot = kwargs.get('newroot') or \
                           str(self.topology.root.nodeset)
            self.router = PropagationTreeRouter(self.newroot, self.topology)
        else:
            self.router = None

        self.upchannel = None

        self.metahandler = MetaWorkerEventHandler(self)

        # gateway -> active targets selection
        self.gwtargets = {}

    def _set_task(self, task):
        """
        Bind worker to task. Called by task.schedule().
        WorkerTree metaworker: override to schedule sub-workers.
        """
        ##if fanout is None:
        ##    fanout = self.router.fanout
        ##self.task.set_info('fanout', fanout)

        DistantWorker._set_task(self, task)
        # Now bound to task - initialize router
        self.topology = self.topology or task.topology
        self.router = self.router or task._default_router()
        self._launch(self.nodes)
        self._check_ini()

    def _launch(self, nodes):
        self.logger.debug("WorkerTree._launch on %s (fanout=%d)", nodes,
                          self.task.info("fanout"))

        # Prepare copy params if source is defined
        destdir = None
        if self.source:
            if self.reverse:
                self.logger.debug("rcopy source=%s, dest=%s", self.source,
                                  self.dest)
                # dest is a directory
                destdir = self.dest
            else:
                self.logger.debug("copy source=%s, dest=%s", self.source,
                                  self.dest)
                # Special processing to determine best arcname and destdir for
                # tar. The only case that we don't support is when source is a
                # file and dest is a dir without a finishing / (in that case we
                # cannot determine remotely whether it is a file or a
                # directory).
                if isfile(self.source):
                    # dest is not normalized here
                    arcname = basename(self.dest) or \
                              basename(normpath(self.source))
                    destdir = dirname(self.dest)
                else:
                    arcname = basename(normpath(self.source))
                    destdir = os.path.normpath(self.dest)
                self.logger.debug("copy arcname=%s destdir=%s", arcname,
                                  destdir)

        # And launch stuffs
        next_hops = self._distribute(self.task.info("fanout"), nodes.copy())
        self.logger.debug("next_hops=%s"
                          % [(str(n), str(v)) for n, v in next_hops.items()])
        for gw, targets in next_hops.iteritems():
            if gw == targets:
                self.logger.debug('task.shell cmd=%s source=%s nodes=%s '
                                  'timeout=%s remote=%s', self.command,
                                  self.source, nodes, self.timeout, self.remote)
                self._child_count += 1
                self._target_count += len(targets)
                if self.remote:
                    if self.source:
                        # Note: specific case where targets are not in topology
                        # as self.source is never used on remote gateways
                        # so we try a direct copy/rcopy:
                        self.logger.debug('_launch copy r=%s source=%s dest=%s',
                                          self.reverse, self.source, self.dest)
                        worker = self.task.copy(self.source, self.dest, targets,
                                                handler=self.metahandler,
                                                stderr=self.stderr,
                                                timeout=self.timeout,
                                                preserve=self.preserve,
                                                reverse=self.reverse,
                                                tree=False)
                    else:
                        worker = self.task.shell(self.command,
                                                 nodes=targets,
                                                 timeout=self.timeout,
                                                 handler=self.metahandler,
                                                 stderr=self.stderr,
                                                 tree=False)
                else:
                    assert self.source is None
                    worker = ExecWorker(nodes=targets,
                                        command=self.command,
                                        handler=self.metahandler,
                                        timeout=self.timeout,
                                        stderr=self.stderr)
                    self.task.schedule(worker)

                self.workers.append(worker)
                self.logger.debug("added child worker %s count=%d", worker,
                                  len(self.workers))
            else:
                self.logger.debug("trying gateway %s to reach %s", gw, targets)
                if self.source:
                    self._copy_remote(self.source, destdir, targets, gw,
                                      self.timeout, self.reverse)
                else:
                    self._execute_remote(self.command, targets, gw,
                                         self.timeout)

        # Copy mode: send tar data after above workers have been initialized
        if self.source and not self.reverse:
            try:
                # create temporary tar file with all source files
                tmptar = tempfile.TemporaryFile()
                tar = tarfile.open(fileobj=tmptar, mode='w:')
                tar.add(self.source, arcname=arcname)
                tar.close()
                tmptar.flush()
                # read generated tar file
                tmptar.seek(0)
                rbuf = tmptar.read(32768)
                # send tar data to remote targets only
                while len(rbuf) > 0:
                    self._write_remote(rbuf)
                    rbuf = tmptar.read(32768)
            except OSError, exc:
                raise WorkerError(exc)

    def _distribute(self, fanout, dst_nodeset):
        """distribute target nodes between next hop gateways"""
        distribution = {}
        self.router.fanout = fanout

        for gw, dstset in self.router.dispatch(dst_nodeset):
            if gw in distribution:
                distribution[gw].add(dstset)
            else:
                distribution[gw] = dstset
        return distribution

    def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):
        """run a remote copy in tree mode (using gateway)"""
        self.logger.debug("_copy_remote gateway=%s source=%s dest=%s "
                          "reverse=%s", gateway, source, dest, reverse)

        self._target_count += len(targets)

        self.gwtargets[gateway] = targets.copy()

        # tar commands are built here and launched on targets
        if reverse:
            # these weird replace calls aim to escape single quotes ' within ''
            srcdir = dirname(source).replace("'", '\'\"\'\"\'')
            srcbase = basename(normpath(self.source)).replace("'", '\'\"\'\"\'')
            cmd = self.TAR_CMD_FMT % (srcdir, srcbase)
        else:
            cmd = self.UNTAR_CMD_FMT % dest.replace("'", '\'\"\'\"\'')

        self.logger.debug('_copy_remote: tar cmd: %s', cmd)

        pchan = self.task._pchannel(gateway, self)
        pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
                    stderr=self.stderr, gw_invoke_cmd=self.invoke_gateway,
                    remote=self.remote)

    def _execute_remote(self, cmd, targets, gateway, timeout):
        """run command against a remote node via a gateway"""
        self.logger.debug("_execute_remote gateway=%s cmd=%s targets=%s",
                          gateway, cmd, targets)

        self._target_count += len(targets)

        self.gwtargets[gateway] = targets.copy()

        pchan = self.task._pchannel(gateway, self)
        pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
                    stderr=self.stderr, gw_invoke_cmd=self.invoke_gateway,
                    remote=self.remote)

    def _engine_clients(self):
        """
        Access underlying engine clients.
        """
        return []

    def _on_remote_node_msgline(self, node, msg, sname, gateway):
        """remote msg received"""
        if not self.source or not self.reverse or sname != 'stdout':
            DistantWorker._on_node_msgline(self, node, msg, sname)
            return

        # rcopy only: we expect base64 encoded tar content on stdout
        encoded = self._rcopy_bufs.setdefault(node, '') + msg
        if node not in self._rcopy_tars:
            self._rcopy_tars[node] = tempfile.TemporaryFile()

        # partial base64 decoding requires a multiple of 4 characters
        encoded_sz = (len(encoded) // 4) * 4
        # write decoded binary msg to node temporary tarfile
        self._rcopy_tars[node].write(base64.b64decode(encoded[0:encoded_sz]))
        # keep trailing encoded chars for next time
        self._rcopy_bufs[node] = encoded[encoded_sz:]

    def _on_remote_node_rc(self, node, rc, gateway):
        """remote rc received"""
        DistantWorker._on_node_rc(self, node, rc)
        self.logger.debug("_on_remote_node_rc %s %s via gw %s", node,
                          self._close_count, gateway)

        # finalize rcopy: extract tar data
        if self.source and self.reverse:
            for node, buf in self._rcopy_bufs.iteritems():
                tarfileobj = self._rcopy_tars[node]
                if len(buf) > 0:
                    self.logger.debug("flushing node %s buf %d bytes", node,
                                      len(buf))
                    tarfileobj.write(buf)
                tarfileobj.flush()
                tarfileobj.seek(0)
                try:
                    tmptar = tarfile.open(fileobj=tarfileobj)
                    try:
                        self.logger.debug("%s extracting %d members in dest %s",
                                          node, len(tmptar.getmembers()),
                                          self.dest)
                        tmptar.extractall(path=self.dest)
                    except IOError, ex:
                        self._on_remote_node_msgline(node, ex, 'stderr',
                                                     gateway)
                # note: try-except-finally not supported before python 2.5
                finally:
                    tmptar.close()
            self._rcopy_bufs = {}
            self._rcopy_tars = {}

        self.gwtargets[gateway].remove(node)
        self._close_count += 1
        self._check_fini(gateway)

    def _on_remote_node_timeout(self, node, gateway):
        """remote node timeout received"""
        DistantWorker._on_node_timeout(self, node)
        self.logger.debug("_on_remote_node_timeout %s via gw %s", node, gateway)
        self._close_count += 1
        self._has_timeout = True
        self.gwtargets[gateway].remove(node)
        self._check_fini(gateway)

    def _on_node_rc(self, node, rc):
        DistantWorker._on_node_rc(self, node, rc)
        self.logger.debug("_on_node_rc %s %s (%s)", node, rc, self._close_count)
        self._close_count += 1

    def _on_node_timeout(self, node):
        DistantWorker._on_node_timeout(self, node)
        self._close_count += 1
        self._has_timeout = True

    def _check_ini(self):
        self.logger.debug("WorkerTree: _check_ini (%d, %d)", self._start_count,
                          self._child_count)
        if self.eh and self._start_count >= self._child_count:
            self.eh.ev_start(self)

    def _check_fini(self, gateway=None):
        self.logger.debug("check_fini %s %s", self._close_count,
                          self._target_count)
        if self._close_count >= self._target_count:
            handler = self.eh
            if handler:
                if self._has_timeout:
                    handler.ev_timeout(self)
                handler.ev_close(self)

        # check completion of targets per gateway
        if gateway:
            targets = self.gwtargets[gateway]
            if not targets:
                # no more active targets for this gateway
                self.logger.debug("WorkerTree._check_fini %s call pchannel_"
                                  "release for gw %s", self, gateway)
                self.task._pchannel_release(gateway, self)
                del self.gwtargets[gateway]

    def _write_remote(self, buf):
        """Write buf to remote clients only."""
        for gateway, targets in self.gwtargets.items():
            assert len(targets) > 0
            self.task._pchannel(gateway, self).write(nodes=targets, buf=buf,
                                                     worker=self)

    def write(self, buf):
        """Write to worker clients."""
        osexc = None
        # Differentiate directly handled writes from remote ones
        for worker in self.workers:
            try:
                worker.write(buf)
            except OSError, exc:
                osexc = exc

        self._write_remote(buf)

        if osexc:
            raise osexc

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do not
        perform writes after this call.
        """
        # Differentiate directly handled EOFs from remote ones
        for worker in self.workers:
            worker.set_write_eof()
        for gateway, targets in self.gwtargets.items():
            assert len(targets) > 0
            self.task._pchannel(gateway, self).set_write_eof(nodes=targets,
                                                             worker=self)

    def abort(self):
        """Abort processing any action by this worker."""
        # Not yet supported by WorkerTree
        raise NotImplementedError("see github issue #229")
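
The tree worker above is normally not instantiated by hand: the Task layer creates it when a propagation topology is configured, so that commands and file copies are relayed through gateway nodes. The following is a minimal usage sketch, not part of the module source above. It assumes a topology file at /etc/clustershell/topology.conf describing an admin node, gateways and compute nodes (all node names and the file path are illustrative), and it relies on the documented Task API (task_self, load_topology, shell, run, iter_buffers); verify these calls and the topology.conf layout against your installed ClusterShell version.

    from ClusterShell.Task import task_self

    task = task_self()
    # Load the propagation tree, e.g. a topology.conf containing:
    #   [Main]
    #   admin: gateway[1-2]
    #   gateway[1-2]: node[1-99]
    task.load_topology("/etc/clustershell/topology.conf")

    # With a topology set on the task, shell() is dispatched through the
    # tree propagation worker and reaches the targets via the gateways.
    task.shell("uname -r", nodes="node[1-99]", timeout=30)
    task.run()

    # Print output, aggregated by identical buffers
    for buf, nodes in task.iter_buffers():
        print("%s: %s" % (nodes, buf))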