1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 ClusterShell v2 tree propagation worker
24 """
25
26 import base64
27 import logging
28 import os
29 from os.path import basename, dirname, isfile, normpath
30 import tarfile
31 import tempfile
32
33 from ClusterShell.Event import EventHandler
34 from ClusterShell.NodeSet import NodeSet
35 from ClusterShell.Worker.Worker import DistantWorker, WorkerError
36 from ClusterShell.Worker.Exec import ExecWorker
37
38 from ClusterShell.Propagation import PropagationTreeRouter
39
40
108
109
110
111
112
113
115 """
116 ClusterShell tree worker Class.
117
118 """
119
120
121 UNTAR_CMD_FMT = "tar -xf - -C '%s'"
122 TAR_CMD_FMT = "tar -cf - -C '%s' " \
123 "--transform \"s,^\\([^/]*\\)[/]*,\\1.$(hostname -s)/,\" " \
124 "'%s' | base64 -w 65536"
125
def __init__(self, nodes, handler, timeout, **kwargs):
    """
    Initialize Tree worker instance.

    Either ``command`` or ``source`` must be given as keyword argument.

    :param nodes: Targeted nodeset.
    :param handler: Worker EventHandler.
    :param timeout: Timeout value for worker.
    :param command: Command to execute.
    :param source: Source path, for copy mode.
    :param dest: Destination path, for copy mode.
    :param stderr: stderr flag handed to the child workers and gateway
        channels (default: False); forced to True for reverse copy.
    :param remote: Remote flag handed to the child workers and gateway
        channels (default: True).
    :param preserve: Preserve flag handed to copy workers (default: None).
    :param reverse: Whether the copy is a reverse copy (default: False).
    :param topology: Force specific TopologyTree.
    :param newroot: Root node of TopologyTree.
    :raises ValueError: if both ``command`` and ``source`` are missing.
    """
    DistantWorker.__init__(self, handler)

    self.logger = logging.getLogger(__name__)
    self.workers = []
    self.nodes = NodeSet(nodes)
    self.timeout = timeout
    self.command = kwargs.get('command')
    self.source = kwargs.get('source')
    self.dest = kwargs.get('dest')
    # NOTE: an 'autoclose' keyword argument may be passed by callers, but
    # it is not used by this worker.
    self.stderr = kwargs.get('stderr', False)
    self.logger.debug("stderr=%s", self.stderr)
    self.remote = kwargs.get('remote', True)
    self.preserve = kwargs.get('preserve', None)
    self.reverse = kwargs.get('reverse', False)
    # reverse copy state: per-node pending base64 text and per-node
    # temporary tar file objects
    self._rcopy_bufs = {}
    self._rcopy_tars = {}
    # counters compared against each other to detect start/completion
    self._close_count = 0
    self._start_count = 0
    self._child_count = 0
    self._target_count = 0
    self._has_timeout = False

    if self.command is None and self.source is None:
        raise ValueError("missing command or source parameter in "
                         "WorkerTree constructor")

    # In reverse copy mode, stdout carries the base64-encoded tar stream,
    # so stderr is always kept separate from it.
    if self.source and self.reverse:
        self.stderr = True

    # Build the gateway invocation command line, forwarding a selected
    # set of environment variables when they are set locally.
    # NOTE(review): values are interpolated unquoted into the command
    # line; a value containing whitespace or shell metacharacters would
    # alter the command -- confirm this is acceptable.
    invoke_gw_args = []
    for envname in ('PYTHONPATH',
                    'CLUSTERSHELL_GW_LOG_DIR',
                    'CLUSTERSHELL_GW_LOG_LEVEL',
                    'CLUSTERSHELL_GW_B64_LINE_LENGTH'):
        envval = os.getenv(envname)
        if envval:
            invoke_gw_args.append("%s=%s" % (envname, envval))
    invoke_gw_args.append("python -m ClusterShell/Gateway -Bu")
    self.invoke_gateway = ' '.join(invoke_gw_args)

    # An explicit topology gets its own router right away; otherwise the
    # router is left unset here.
    self.topology = kwargs.get('topology')
    if self.topology is not None:
        self.newroot = kwargs.get('newroot') or \
                       str(self.topology.root.nodeset)
        self.router = PropagationTreeRouter(self.newroot, self.topology)
    else:
        self.router = None

    self.upchannel = None

    self.metahandler = MetaWorkerEventHandler(self)

    # gateway -> targets still pending behind that gateway
    self.gwtargets = {}
195
197 """
198 Bind worker to task. Called by task.schedule().
199 WorkerTree metaworker: override to schedule sub-workers.
200 """
201
202
203
204
205 DistantWorker._set_task(self, task)
206
207 self.topology = self.topology or task.topology
208 self.router = self.router or task._default_router()
209 self._launch(self.nodes)
210 self._check_ini()
211
213 self.logger.debug("WorkerTree._launch on %s (fanout=%d)", nodes,
214 self.task.info("fanout"))
215
216
217 destdir = None
218 if self.source:
219 if self.reverse:
220 self.logger.debug("rcopy source=%s, dest=%s", self.source,
221 self.dest)
222
223 destdir = self.dest
224 else:
225 self.logger.debug("copy source=%s, dest=%s", self.source,
226 self.dest)
227
228
229
230
231
232 if isfile(self.source):
233
234 arcname = basename(self.dest) or \
235 basename(normpath(self.source))
236 destdir = dirname(self.dest)
237 else:
238 arcname = basename(normpath(self.source))
239 destdir = os.path.normpath(self.dest)
240 self.logger.debug("copy arcname=%s destdir=%s", arcname,
241 destdir)
242
243
244 next_hops = self._distribute(self.task.info("fanout"), nodes.copy())
245 self.logger.debug("next_hops=%s"
246 % [(str(n), str(v)) for n, v in next_hops.items()])
247 for gw, targets in next_hops.iteritems():
248 if gw == targets:
249 self.logger.debug('task.shell cmd=%s source=%s nodes=%s '
250 'timeout=%s remote=%s', self.command,
251 self.source, nodes, self.timeout, self.remote)
252 self._child_count += 1
253 self._target_count += len(targets)
254 if self.remote:
255 if self.source:
256
257
258
259 self.logger.debug('_launch copy r=%s source=%s dest=%s',
260 self.reverse, self.source, self.dest)
261 worker = self.task.copy(self.source, self.dest, targets,
262 handler=self.metahandler,
263 stderr=self.stderr,
264 timeout=self.timeout,
265 preserve=self.preserve,
266 reverse=self.reverse,
267 tree=False)
268 else:
269 worker = self.task.shell(self.command,
270 nodes=targets,
271 timeout=self.timeout,
272 handler=self.metahandler,
273 stderr=self.stderr,
274 tree=False)
275 else:
276 assert self.source is None
277 worker = ExecWorker(nodes=targets,
278 command=self.command,
279 handler=self.metahandler,
280 timeout=self.timeout,
281 stderr=self.stderr)
282 self.task.schedule(worker)
283
284 self.workers.append(worker)
285 self.logger.debug("added child worker %s count=%d", worker,
286 len(self.workers))
287 else:
288 self.logger.debug("trying gateway %s to reach %s", gw, targets)
289 if self.source:
290 self._copy_remote(self.source, destdir, targets, gw,
291 self.timeout, self.reverse)
292 else:
293 self._execute_remote(self.command, targets, gw,
294 self.timeout)
295
296
297 if self.source and not self.reverse:
298 try:
299
300 tmptar = tempfile.TemporaryFile()
301 tar = tarfile.open(fileobj=tmptar, mode='w:')
302 tar.add(self.source, arcname=arcname)
303 tar.close()
304 tmptar.flush()
305
306 tmptar.seek(0)
307 rbuf = tmptar.read(32768)
308
309 while len(rbuf) > 0:
310 self._write_remote(rbuf)
311 rbuf = tmptar.read(32768)
312 except OSError, exc:
313 raise WorkerError(exc)
314
316 """distribute target nodes between next hop gateways"""
317 distribution = {}
318 self.router.fanout = fanout
319
320 for gw, dstset in self.router.dispatch(dst_nodeset):
321 if gw in distribution:
322 distribution[gw].add(dstset)
323 else:
324 distribution[gw] = dstset
325 return distribution
326
def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):
    """
    Run a remote copy in tree mode (using gateway).

    Builds the tar command to run on the targets and submits it through
    the propagation channel of the given gateway.

    :param source: Source path; used to build the tar creation command
        when ``reverse`` is true.
    :param dest: Destination directory; used to build the tar extraction
        command when ``reverse`` is false.
    :param targets: Nodes to reach through ``gateway``.
    :param gateway: Gateway used to reach ``targets``.
    :param timeout: Timeout value handed to the channel.
    :param reverse: Whether this is a reverse copy.
    """
    self.logger.debug("_copy_remote gateway=%s source=%s dest=%s "
                      "reverse=%s", gateway, source, dest, reverse)

    self._target_count += len(targets)

    # remember which targets are pending behind this gateway
    self.gwtargets[gateway] = targets.copy()

    def _squote_escape(text):
        # Escape single quotes so that text can be embedded within a
        # single-quoted shell word: ' becomes '"'"'
        return text.replace("'", '\'\"\'\"\'')

    # tar commands are built here and run on the targets
    if reverse:
        # Drop trailing slashes before taking the directory part, so that
        # it matches the basename computed from the normalized path
        # ("/etc/" must give "/" + "etc", not "/etc" + "etc"). A path made
        # only of slashes is kept as is.
        stripped = source.rstrip('/') or source
        # A source without directory part is relative to the current
        # directory; tar cannot change to an empty directory name.
        srcdir = _squote_escape(dirname(stripped) or '.')
        srcbase = _squote_escape(basename(normpath(source)))
        cmd = self.TAR_CMD_FMT % (srcdir, srcbase)
    else:
        cmd = self.UNTAR_CMD_FMT % _squote_escape(dest)

    self.logger.debug('_copy_remote: tar cmd: %s', cmd)

    pchan = self.task._pchannel(gateway, self)
    pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
                stderr=self.stderr, gw_invoke_cmd=self.invoke_gateway,
                remote=self.remote)
351
352
354 """run command against a remote node via a gateway"""
355 self.logger.debug("_execute_remote gateway=%s cmd=%s targets=%s",
356 gateway, cmd, targets)
357
358 self._target_count += len(targets)
359
360 self.gwtargets[gateway] = targets.copy()
361
362 pchan = self.task._pchannel(gateway, self)
363 pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
364 stderr=self.stderr, gw_invoke_cmd=self.invoke_gateway,
365 remote=self.remote)
366
368 """
369 Access underlying engine clients.
370 """
371 return []
372
374 """remote msg received"""
375 if not self.source or not self.reverse or sname != 'stdout':
376 DistantWorker._on_node_msgline(self, node, msg, sname)
377 return
378
379
380 encoded = self._rcopy_bufs.setdefault(node, '') + msg
381 if node not in self._rcopy_tars:
382 self._rcopy_tars[node] = tempfile.TemporaryFile()
383
384
385 encoded_sz = (len(encoded) // 4) * 4
386
387 self._rcopy_tars[node].write(base64.b64decode(encoded[0:encoded_sz]))
388
389 self._rcopy_bufs[node] = encoded[encoded_sz:]
390
392 """remote rc received"""
393 DistantWorker._on_node_rc(self, node, rc)
394 self.logger.debug("_on_remote_node_rc %s %s via gw %s", node,
395 self._close_count, gateway)
396
397
398 if self.source and self.reverse:
399 for node, buf in self._rcopy_bufs.iteritems():
400 tarfileobj = self._rcopy_tars[node]
401 if len(buf) > 0:
402 self.logger.debug("flushing node %s buf %d bytes", node,
403 len(buf))
404 tarfileobj.write(buf)
405 tarfileobj.flush()
406 tarfileobj.seek(0)
407 try:
408 tmptar = tarfile.open(fileobj=tarfileobj)
409 try:
410 self.logger.debug("%s extracting %d members in dest %s",
411 node, len(tmptar.getmembers()),
412 self.dest)
413 tmptar.extractall(path=self.dest)
414 except IOError, ex:
415 self._on_remote_node_msgline(node, ex, 'stderr',
416 gateway)
417
418 finally:
419 tmptar.close()
420 self._rcopy_bufs = {}
421 self._rcopy_tars = {}
422
423 self.gwtargets[gateway].remove(node)
424 self._close_count += 1
425 self._check_fini(gateway)
426
428 """remote node timeout received"""
429 DistantWorker._on_node_timeout(self, node)
430 self.logger.debug("_on_remote_node_timeout %s via gw %s", node, gateway)
431 self._close_count += 1
432 self._has_timeout = True
433 self.gwtargets[gateway].remove(node)
434 self._check_fini(gateway)
435
437 DistantWorker._on_node_rc(self, node, rc)
438 self.logger.debug("_on_node_rc %s %s (%s)", node, rc, self._close_count)
439 self._close_count += 1
440
445
447 self.logger.debug("WorkerTree: _check_ini (%d, %d)", self._start_count,
448 self._child_count)
449 if self.eh and self._start_count >= self._child_count:
450 self.eh.ev_start(self)
451
453 self.logger.debug("check_fini %s %s", self._close_count,
454 self._target_count)
455 if self._close_count >= self._target_count:
456 handler = self.eh
457 if handler:
458 if self._has_timeout:
459 handler.ev_timeout(self)
460 handler.ev_close(self)
461
462
463 if gateway:
464 targets = self.gwtargets[gateway]
465 if not targets:
466
467 self.logger.debug("WorkerTree._check_fini %s call pchannel_"
468 "release for gw %s", self, gateway)
469 self.task._pchannel_release(gateway, self)
470 del self.gwtargets[gateway]
471
473 """Write buf to remote clients only."""
474 for gateway, targets in self.gwtargets.items():
475 assert len(targets) > 0
476 self.task._pchannel(gateway, self).write(nodes=targets, buf=buf,
477 worker=self)
478
480 """Write to worker clients."""
481 osexc = None
482
483 for worker in self.workers:
484 try:
485 worker.write(buf)
486 except OSError, exc:
487 osexc = exc
488
489 self._write_remote(buf)
490
491 if osexc:
492 raise osexc
493
495 """
496 Tell worker to close its writer file descriptor once flushed. Do not
497 perform writes after this call.
498 """
499
500 for worker in self.workers:
501 worker.set_write_eof()
502 for gateway, targets in self.gwtargets.items():
503 assert len(targets) > 0
504 self.task._pchannel(gateway, self).set_write_eof(nodes=targets,
505 worker=self)
506
508 """Abort processing any action by this worker."""
509
510 raise NotImplementedError("see github issue #229")
511