Package ClusterShell :: Package Worker :: Module Worker
[hide private]
[frames] | [no frames]

Source Code for Module ClusterShell.Worker.Worker

  1  # 
  2  # Copyright (C) 2007-2016 CEA/DAM 
  3  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
  4  # 
  5  # This file is part of ClusterShell. 
  6  # 
  7  # ClusterShell is free software; you can redistribute it and/or 
  8  # modify it under the terms of the GNU Lesser General Public 
  9  # License as published by the Free Software Foundation; either 
 10  # version 2.1 of the License, or (at your option) any later version. 
 11  # 
 12  # ClusterShell is distributed in the hope that it will be useful, 
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 15  # Lesser General Public License for more details. 
 16  # 
 17  # You should have received a copy of the GNU Lesser General Public 
 18  # License along with ClusterShell; if not, write to the Free Software 
 19  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 20   
 21  """ 
 22  ClusterShell worker interface. 
 23   
 24  A worker is a generic object which provides "grouped" work in a specific task. 
 25  """ 
 26   
 27  import inspect 
 28  import warnings 
 29   
 30  from ClusterShell.Worker.EngineClient import EngineClient 
 31  from ClusterShell.NodeSet import NodeSet 
 32  from ClusterShell.Engine.Engine import FANOUT_UNLIMITED, FANOUT_DEFAULT 
 33   
 34   
class WorkerException(Exception):
    """Generic worker exception.

    Base class of all exceptions raised by ClusterShell workers.
    """
class WorkerError(WorkerException):
    """Generic worker error.

    Raised on worker misuse, such as scheduling a worker twice or
    calling a task-bound method on a worker not yet bound to a task.
    """

# DEPRECATED: WorkerBadArgumentError exception is deprecated as of 1.4,
# use ValueError instead.
WorkerBadArgumentError = ValueError
class Worker(object):
    """
    Worker is an essential base class for the ClusterShell library. The goal
    of a worker object is to execute a common work on a single or several
    targets (abstract notion) in parallel. Concrete targets and also the
    notion of local or distant targets are managed by Worker's subclasses (for
    example, see the DistantWorker base class).

    A configured Worker object is associated to a specific ClusterShell Task,
    which can be seen as a single-threaded Worker supervisor. Indeed, the work
    to be done is executed in parallel depending on other Workers and Task's
    current parameters, like current fanout value.

    ClusterShell is designed to write event-driven applications, and the
    Worker class is key here as Worker objects are passed as parameter of most
    event handlers (see the ClusterShell.Event.EventHandler class).

    The following public object variables are defined on some events, so you
    may find them useful in event handlers:
      - worker.current_node [ev_pickup,ev_read,ev_error,ev_hup]
        node/key concerned by event
      - worker.current_msg [ev_read]
        message just read (from stdout)
      - worker.current_errmsg [ev_error]
        error message just read (from stderr)
      - worker.current_rc [ev_hup]
        return code just received

    Example of use:
        >>> from ClusterShell.Event import EventHandler
        >>> class MyOutputHandler(EventHandler):
        ...     def ev_read(self, worker):
        ...         node = worker.current_node
        ...         line = worker.current_msg
        ...         print "%s: %s" % (node, line)
        ...
    """

    # The following common stream names are recognized by the Task class.
    # They can be changed per Worker, thus avoiding any Task buffering.
    SNAME_STDIN = 'stdin'
    SNAME_STDOUT = 'stdout'
    SNAME_STDERR = 'stderr'

    def __init__(self, handler):
        """Initializer. Should be called from derived classes.

        Arguments:
            handler -- EventHandler instance that will receive this
                       worker's events, or None for no event handling
        """
        # Associated EventHandler object
        self.eh = handler #: associated :class:`.EventHandler`

        # Per Worker fanout value (positive integer).
        # Default is FANOUT_DEFAULT to use the fanout set at the Task level.
        # Change to FANOUT_UNLIMITED to always schedule this worker.
        # NOTE: the fanout value must be set before the Worker starts and
        # cannot currently be changed afterwards.
        self._fanout = FANOUT_DEFAULT

        # Parent task (once bound)
        self.task = None #: worker's task when scheduled or None
        self.started = False #: set to True when worker has started
        self.metaworker = None
        self.metarefcnt = 0

        # current_x public variables (updated at each event accordingly)
        self.current_node = None #: set to node in event handler
        self.current_msg = None #: set to stdout message in event handler
        self.current_errmsg = None #: set to stderr message in event handler
        self.current_rc = 0 #: set to return code in event handler
        self.current_sname = None #: set to stream name in event handler

    def _set_task(self, task):
        """Bind worker to task. Called by task.schedule()."""
        if self.task is not None:
            # one-shot-only schedule supported for now
            raise WorkerError("worker has already been scheduled")
        self.task = task

    def _task_bound_check(self):
        """Helper method to check that worker is bound to a task."""
        if not self.task:
            raise WorkerError("worker is not task bound")

    def _engine_clients(self):
        """Return a list of underlying engine clients."""
        raise NotImplementedError("Derived classes must implement.")

    # Event generators

    def _on_start(self, key):
        """Called on command start.

        Fires ev_start once per worker lifetime (first key to start),
        then ev_pickup for every key.
        """
        self.current_node = key

        if not self.started:
            self.started = True
            if self.eh:
                self.eh.ev_start(self)

        if self.eh:
            self.eh.ev_pickup(self)

    def _on_rc(self, key, rc):
        """Command return code received.

        Updates the task's rc bookkeeping before firing ev_hup, so the
        handler can already query retcodes through the task.
        """
        self.current_node = key
        self.current_rc = rc

        self.task._rc_set(self, key, rc)

        if self.eh:
            self.eh.ev_hup(self)

    def _on_written(self, key, bytes_count, sname):
        """Notification of bytes written."""
        # set node and stream name (compat only)
        self.current_node = key
        self.current_sname = sname

        # generate event - for ev_written, also check for new signature (1.7)
        # NOTE: add DeprecationWarning in 1.8 for old ev_written signature
        # NOTE(review): inspect.getargspec was removed in Python 3.11 —
        # this line assumes a Python 2 runtime; confirm before porting.
        if self.eh and len(inspect.getargspec(self.eh.ev_written)[0]) == 5:
            self.eh.ev_written(self, key, sname, bytes_count)

    # Base getters

    def last_read(self):
        """
        Get last read message from event handler.

        [DEPRECATED] use current_msg
        """
        raise NotImplementedError("Derived classes must implement.")

    def last_error(self):
        """
        Get last error message from event handler.

        [DEPRECATED] use current_errmsg
        """
        raise NotImplementedError("Derived classes must implement.")

    def did_timeout(self):
        """Return whether this worker has aborted due to timeout."""
        self._task_bound_check()
        return self.task._num_timeout_by_worker(self) > 0

    def read(self, node=None, sname='stdout'):
        """Read worker stream buffer.

        Return stream read buffer of current worker.

        Arguments:
            node -- node name; can also be set to None for simple worker
                    having worker.key defined (default is None)
            sname -- stream name (default is 'stdout')
        """
        self._task_bound_check()
        return self.task._msg_by_source(self, node, sname)

    # Base actions

    def abort(self):
        """Abort processing any action by this worker."""
        raise NotImplementedError("Derived classes must implement.")

    def flush_buffers(self):
        """Flush any messages associated to this worker."""
        self._task_bound_check()
        self.task._flush_buffers_by_worker(self)

    def flush_errors(self):
        """Flush any error messages associated to this worker."""
        self._task_bound_check()
        self.task._flush_errors_by_worker(self)
class DistantWorker(Worker):
    """Base class DistantWorker.

    DistantWorker provides a useful set of setters/getters to use with
    distant workers like ssh or pdsh.
    """

    # Event generators

    def _on_node_msgline(self, node, msg, sname):
        """Message received from node, update last* stuffs."""
        # Micro-optimization: bind attributes to locals, as this method
        # may be called for every line read from every node.
        task = self.task
        handler = self.eh
        assert type(node) is not NodeSet # for testing
        # set stream name
        self.current_sname = sname
        # update task msgtree
        task._msg_add(self, node, sname, msg)
        # generate event: stderr lines fire ev_error, anything else ev_read
        self.current_node = node
        if sname == self.SNAME_STDERR:
            self.current_errmsg = msg
            if handler is not None:
                handler.ev_error(self)
        else:
            self.current_msg = msg
            if handler is not None:
                handler.ev_read(self)

    def _on_node_rc(self, node, rc):
        """Command return code received."""
        Worker._on_rc(self, node, rc)

    def _on_node_timeout(self, node):
        """Update on node timeout."""
        # Update current_node to allow node resolution after ev_timeout.
        self.current_node = node

        self.task._timeout_add(self, node)

    def last_node(self):
        """
        Get last node, useful to get the node in an EventHandler
        callback like ev_read().

        [DEPRECATED] use current_node
        """
        warnings.warn("use current_node instead", DeprecationWarning)
        return self.current_node

    def last_read(self):
        """
        Get last (node, buffer), useful in an EventHandler.ev_read()

        [DEPRECATED] use (current_node, current_msg)
        """
        warnings.warn("use current_node and current_msg instead",
                      DeprecationWarning)
        return self.current_node, self.current_msg

    def last_error(self):
        """
        Get last (node, error_buffer), useful in an EventHandler.ev_error()

        [DEPRECATED] use (current_node, current_errmsg)
        """
        warnings.warn("use current_node and current_errmsg instead",
                      DeprecationWarning)
        return self.current_node, self.current_errmsg

    def last_retcode(self):
        """
        Get last (node, rc), useful in an EventHandler.ev_hup()

        [DEPRECATED] use (current_node, current_rc)
        """
        warnings.warn("use current_node and current_rc instead",
                      DeprecationWarning)
        return self.current_node, self.current_rc

    def node_buffer(self, node):
        """Get specific node buffer."""
        return self.read(node, self.SNAME_STDOUT)

    def node_error(self, node):
        """Get specific node error buffer."""
        return self.read(node, self.SNAME_STDERR)

    node_error_buffer = node_error

    def node_retcode(self, node):
        """
        Get specific node return code.

        :raises KeyError: command on node has not yet finished (no return code
            available), or this node is not known by this worker
        """
        self._task_bound_check()
        try:
            rc = self.task._rc_by_source(self, node)
        except KeyError:
            # re-raise with the node as key, hiding internal source details
            raise KeyError(node)
        return rc

    node_rc = node_retcode

    def iter_buffers(self, match_keys=None):
        """
        Returns an iterator over available buffers and associated
        NodeSet. If the optional parameter match_keys is defined, only
        keys found in match_keys are returned.
        """
        self._task_bound_check()
        for msg, keys in self.task._call_tree_matcher(
                self.task._msgtree(self.SNAME_STDOUT).walk, match_keys, self):
            yield msg, NodeSet.fromlist(keys)

    def iter_errors(self, match_keys=None):
        """
        Returns an iterator over available error buffers and associated
        NodeSet. If the optional parameter match_keys is defined, only
        keys found in match_keys are returned.
        """
        self._task_bound_check()
        for msg, keys in self.task._call_tree_matcher(
                self.task._msgtree(self.SNAME_STDERR).walk, match_keys, self):
            yield msg, NodeSet.fromlist(keys)

    def iter_node_buffers(self, match_keys=None):
        """
        Returns an iterator over each node and associated buffer.
        """
        self._task_bound_check()
        return self.task._call_tree_matcher(
            self.task._msgtree(self.SNAME_STDOUT).items, match_keys, self)

    def iter_node_errors(self, match_keys=None):
        """
        Returns an iterator over each node and associated error buffer.
        """
        self._task_bound_check()
        return self.task._call_tree_matcher(
            self.task._msgtree(self.SNAME_STDERR).items, match_keys, self)

    def iter_retcodes(self, match_keys=None):
        """
        Returns an iterator over return codes and associated NodeSet.
        If the optional parameter match_keys is defined, only keys
        found in match_keys are returned.
        """
        self._task_bound_check()
        for rc, keys in self.task._rc_iter_by_worker(self, match_keys):
            yield rc, NodeSet.fromlist(keys)

    def iter_node_retcodes(self):
        """
        Returns an iterator over each node and associated return code.
        """
        self._task_bound_check()
        return self.task._krc_iter_by_worker(self)

    def num_timeout(self):
        """
        Return the number of timed out "keys" (ie. nodes) for this worker.
        """
        self._task_bound_check()
        return self.task._num_timeout_by_worker(self)

    def iter_keys_timeout(self):
        """
        Iterate over timed out keys (ie. nodes) for a specific worker.
        """
        self._task_bound_check()
        return self.task._iter_keys_timeout_by_worker(self)
class StreamClient(EngineClient):
    """StreamWorker's default EngineClient.

    This EngineClient subclass forwards engine-level notifications
    (start, read, flush, close) to its associated StreamWorker, and
    provides generic write()/set_write_eof() helpers that operate on
    one or all of the client's writable streams.
    """

    def _start(self):
        """Called on EngineClient start."""
        assert not self.worker.started
        self.worker._on_start(self.key)
        return self

    def _read(self, sname, size=65536):
        """Read data from process."""
        return EngineClient._read(self, sname, size)

    def _close(self, abort, timeout):
        """Close client. See EngineClient._close()."""
        EngineClient._close(self, abort, timeout)
        worker = self.worker
        if timeout:
            assert abort, "abort flag not set on timeout"
            worker._on_timeout(self.key)
        # no return code is available for a stream client
        worker._on_rc(self.key, None)

        if worker.eh:
            worker.eh.ev_close(worker)

    def _handle_read(self, sname):
        """Engine is telling us there is data available for reading."""
        # bind to locals once: this runs for every readable notification
        worker = self.worker
        on_msgline = worker._on_msgline
        task = worker.task

        if task.info("debug", False):
            print_debug = task.info("print_debug")
            for line in self._readlines(sname):
                print_debug(task, "LINE %s" % line)
                on_msgline(self.key, line, sname)
        else:
            for line in self._readlines(sname):
                on_msgline(self.key, line, sname)

    def _flush_read(self, sname):
        """Called at close time to flush stream read buffer."""
        rstream = self.streams[sname]
        if not rstream.readable() or not rstream.rbuf:
            return
        # leftover data with no EOL: emit it as one final message
        self.worker._on_msgline(self.key, rstream.rbuf, sname)

    def write(self, buf, sname=None):
        """Write to writable stream(s).

        If sname is None, the buffer is "broadcast" to every writable
        stream of this client.
        """
        if sname is None:
            for wstream in self.streams.writers():
                self._write(wstream.name, buf)
        else:
            self._write(sname, buf)

    def set_write_eof(self, sname=None):
        """Set EOF flag to writable stream(s).

        If sname is None, the EOF flag is set on every writable stream
        of this client.
        """
        if sname is None:
            for wstream in self.streams.writers():
                self._set_write_eof(wstream.name)
        else:
            self._set_write_eof(sname)
class StreamWorker(Worker):
    """StreamWorker base class [v1.7+]

    The StreamWorker class implements a base (but concrete) Worker that
    can read and write to multiple streams. Unlike most other Workers,
    it does not execute any external commands by itself. Rather, it
    should be pre-bound to "streams", ie. file(s) or file descriptor(s),
    using the two following methods:
        >>> worker.set_reader('stream1', fd1)
        >>> worker.set_writer('stream2', fd2)

    Like other Workers, the StreamWorker instance should be associated
    with a Task using task.schedule(worker). When the task engine is
    ready to process the StreamWorker, all of its streams are being
    processed together. For that reason, it is not possible to add new
    readers or writers to a running StreamWorker (ie. task is running
    and worker is already scheduled).

    Configured readers will generate ev_read() events when data is
    available for reading. So, the following additional public worker
    variable is available and defines the stream name for the event:
        >>> worker.current_sname [ev_read,ev_error]

    Please note that ev_error() is called instead of ev_read() when the
    stream name is 'stderr'. Indeed, all other stream names use
    ev_read().

    Configured writers will allow the use of the method write(), eg.
    worker.write(data, 'stream2'), to write to the stream.
    """

    def __init__(self, handler, key=None, stderr=False, timeout=-1,
                 autoclose=False, client_class=StreamClient):
        """Initialize StreamWorker.

        Arguments:
            handler -- EventHandler instance or None
            key -- custom source key; any hashable object is allowed,
                   including falsy ones like 0 (default None means the
                   worker object itself is used as key)
            stderr -- stderr stream handling flag passed to the client
            timeout -- client timeout value (default -1)
            autoclose -- client autoclose flag (default False)
            client_class -- EngineClient class to use (default
                            StreamClient)
        """
        Worker.__init__(self, handler)
        if key is None: # allow key=0
            key = self
        self.clients = [client_class(self, key, stderr, timeout, autoclose)]

    def set_reader(self, sname, sfile, retain=True, closefd=True):
        """Add a readable stream to StreamWorker.

        Arguments:
            sname -- the name of the stream (string)
            sfile -- the stream file or file descriptor
            retain -- whether the stream retains engine client
                      (default is True)
            closefd -- whether to close fd when the stream is closed
                       (default is True)

        Raises WorkerError if the worker is already running.
        """
        if not self.clients[0].registered:
            self.clients[0].streams.set_reader(sname, sfile, retain, closefd)
        else:
            raise WorkerError("cannot add new stream at runtime")

    def set_writer(self, sname, sfile, retain=True, closefd=True):
        """Set a writable stream to StreamWorker.

        Arguments:
            sname -- the name of the stream (string)
            sfile -- the stream file or file descriptor
            retain -- whether the stream retains engine client
                      (default is True)
            closefd -- whether to close fd when the stream is closed
                       (default is True)

        Raises WorkerError if the worker is already running.
        """
        if not self.clients[0].registered:
            self.clients[0].streams.set_writer(sname, sfile, retain, closefd)
        else:
            raise WorkerError("cannot add new stream at runtime")

    def _engine_clients(self):
        """Return a list of underlying engine clients."""
        return self.clients

    def set_key(self, key):
        """Source key for this worker is free for use.

        Use this method to set the custom source key for this worker.
        """
        self.clients[0].key = key

    def _on_msgline(self, key, msg, sname):
        """Add a message."""
        # update task msgtree
        self.task._msg_add(self, key, sname, msg)

        # set stream name
        self.current_sname = sname

        # generate event: 'stderr' fires ev_error, anything else ev_read
        if sname == 'stderr':
            # add last msg to local buffer
            self.current_errmsg = msg
            if self.eh:
                self.eh.ev_error(self)
        else:
            # add last msg to local buffer
            self.current_msg = msg
            if self.eh:
                self.eh.ev_read(self)

    def _on_timeout(self, key):
        """Update on timeout."""
        self.task._timeout_add(self, key)

        # trigger timeout event
        if self.eh:
            self.eh.ev_timeout(self)

    def abort(self):
        """Abort processing any action by this worker."""
        self.clients[0].abort()

    def read(self, node=None, sname='stdout'):
        """Read worker stream buffer.

        Return stream read buffer of current worker.

        Arguments:
            node -- node name; can also be set to None for simple worker
                    having worker.key defined (default is None)
            sname -- stream name (default is 'stdout')
        """
        # BUGFIX: use an explicit "is None" test instead of "node or key":
        # falsy keys (eg. key=0, explicitly supported by __init__) must not
        # be silently replaced by the client key when passed as node.
        if node is None:
            node = self.clients[0].key
        return Worker.read(self, node, sname)

    def write(self, buf, sname=None):
        """Write to worker.

        If sname is specified, write to the associated stream,
        otherwise write to all writable streams.
        """
        self.clients[0].write(buf, sname)

    def set_write_eof(self, sname=None):
        """
        Tell worker to close its writer file descriptor once flushed.

        Do not perform writes after this call. Like write(), sname can
        be optionally specified to target a specific writable stream,
        otherwise all writable streams are marked as EOF.
        """
        self.clients[0].set_write_eof(sname)
class WorkerSimple(StreamWorker):
    """WorkerSimple base class [DEPRECATED]

    A simple Worker managing the common process streams stdin, stdout
    and stderr through pre-bound file objects.

    [DEPRECATED] use StreamWorker.
    """

    def __init__(self, file_reader, file_writer, file_error, key, handler,
                 stderr=False, timeout=-1, autoclose=False, closefd=True,
                 client_class=StreamClient):
        """Initialize WorkerSimple worker."""
        StreamWorker.__init__(self, handler, key, stderr, timeout, autoclose,
                              client_class=client_class)
        # bind provided files to the conventional stream names
        for rname, rfile in (('stdout', file_reader), ('stderr', file_error)):
            if rfile:
                self.set_reader(rname, rfile, closefd=closefd)
        if file_writer:
            self.set_writer('stdin', file_writer, closefd=closefd)
        # keep reference of provided file objects during worker lifetime
        self._filerefs = (file_reader, file_writer, file_error)

    def error_fileno(self):
        """Return the standard error reader file descriptor (integer)."""
        streams = self.clients[0].streams
        return streams['stderr'].fd

    def reader_fileno(self):
        """Return the reader file descriptor (integer)."""
        streams = self.clients[0].streams
        return streams['stdout'].fd

    def writer_fileno(self):
        """Return the writer file descriptor as an integer."""
        streams = self.clients[0].streams
        return streams['stdin'].fd

    def last_read(self):
        """
        Get last read message.

        [DEPRECATED] use current_msg
        """
        warnings.warn("use current_msg instead", DeprecationWarning)
        return self.current_msg

    def last_error(self):
        """
        Get last error message.

        [DEPRECATED] use current_errmsg
        """
        warnings.warn("use current_errmsg instead", DeprecationWarning)
        return self.current_errmsg

    def error(self):
        """Read worker error buffer."""
        return self.read(sname='stderr')

    def _on_start(self, key):
        """Called on command start (compat: does not set current_node)."""
        handler = self.eh
        if not self.started:
            self.started = True
            if handler:
                handler.ev_start(self)
        if handler:
            handler.ev_pickup(self)

    def _on_rc(self, key, rc):
        """Command return code received (compat: does not set current_node)."""
        self.current_rc = rc
        self.task._rc_set(self, key, rc)
        if self.eh:
            self.eh.ev_hup(self)

    def _on_written(self, key, bytes_count, sname):
        """Notification of bytes written."""
        # set stream name (compat only)
        self.current_sname = sname

        handler = self.eh
        # generate event - for ev_written, also check for new signature (1.7)
        # NOTE: add DeprecationWarning in 1.8 for old ev_written signature
        if handler and len(inspect.getargspec(handler.ev_written)[0]) == 5:
            handler.ev_written(self, key, sname, bytes_count)