1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """
22 ClusterShell worker interface.
23
24 A worker is a generic object which provides "grouped" work in a specific task.
25 """
26
27 import inspect
28 import warnings
29
30 from ClusterShell.Worker.EngineClient import EngineClient
31 from ClusterShell.NodeSet import NodeSet
32 from ClusterShell.Engine.Engine import FANOUT_UNLIMITED, FANOUT_DEFAULT
33
34
36 """Generic worker exception."""
37
39 """Generic worker error."""
40
41
42
43 WorkerBadArgumentError = ValueError
44
46 """
47 Worker is an essential base class for the ClusterShell library. The goal
48 of a worker object is to execute a common work on a single or several
49 targets (abstract notion) in parallel. Concret targets and also the notion
50 of local or distant targets are managed by Worker's subclasses (for
51 example, see the DistantWorker base class).
52
53 A configured Worker object is associated to a specific ClusterShell Task,
54 which can be seen as a single-threaded Worker supervisor. Indeed, the work
55 to be done is executed in parallel depending on other Workers and Task's
56 current paramaters, like current fanout value.
57
58 ClusterShell is designed to write event-driven applications, and the Worker
59 class is key here as Worker objects are passed as parameter of most event
60 handlers (see the ClusterShell.Event.EventHandler class).
61
62 The following public object variables are defined on some events, so you
63 may find them useful in event handlers:
64 - worker.current_node [ev_pickup,ev_read,ev_error,ev_hup]
65 node/key concerned by event
66 - worker.current_msg [ev_read]
67 message just read (from stdout)
68 - worker.current_errmsg [ev_error]
69 error message just read (from stderr)
70 - worker.current_rc [ev_hup]
71 return code just received
72
73 Example of use:
74
75 >>> from ClusterShell.Event import EventHandler
76 >>> class MyOutputHandler(EventHandler):
77 ... def ev_read(self, worker):
78 ... node = worker.current_node
79 ... line = worker.current_msg
80 ... print "%s: %s" % (node, line)
81 ...
82 """
83
84
85
86 SNAME_STDIN = 'stdin'
87 SNAME_STDOUT = 'stdout'
88 SNAME_STDERR = 'stderr'
89
91 """Initializer. Should be called from derived classes."""
92
93 self.eh = handler
94
95
96
97
98
99
100 self._fanout = FANOUT_DEFAULT
101
102
103 self.task = None
104 self.started = False
105 self.metaworker = None
106 self.metarefcnt = 0
107
108
109 self.current_node = None
110 self.current_msg = None
111 self.current_errmsg = None
112 self.current_rc = 0
113 self.current_sname = None
114
116 """Bind worker to task. Called by task.schedule()."""
117 if self.task is not None:
118
119 raise WorkerError("worker has already been scheduled")
120 self.task = task
121
123 """Helper method to check that worker is bound to a task."""
124 if not self.task:
125 raise WorkerError("worker is not task bound")
126
128 """Return a list of underlying engine clients."""
129 raise NotImplementedError("Derived classes must implement.")
130
131
132
134 """Called on command start."""
135 self.current_node = key
136
137 if not self.started:
138 self.started = True
139 if self.eh:
140 self.eh.ev_start(self)
141
142 if self.eh:
143 self.eh.ev_pickup(self)
144
146 """Command return code received."""
147 self.current_node = key
148 self.current_rc = rc
149
150 self.task._rc_set(self, key, rc)
151
152 if self.eh:
153 self.eh.ev_hup(self)
154
156 """Notification of bytes written."""
157
158 self.current_node = key
159 self.current_sname = sname
160
161
162
163 if self.eh and len(inspect.getargspec(self.eh.ev_written)[0]) == 5:
164 self.eh.ev_written(self, key, sname, bytes_count)
165
166
167
169 """
170 Get last read message from event handler.
171 [DEPRECATED] use current_msg
172 """
173 raise NotImplementedError("Derived classes must implement.")
174
176 """
177 Get last error message from event handler.
178 [DEPRECATED] use current_errmsg
179 """
180 raise NotImplementedError("Derived classes must implement.")
181
186
187 - def read(self, node=None, sname='stdout'):
188 """Read worker stream buffer.
189
190 Return stream read buffer of current worker.
191
192 Arguments:
193 node -- node name; can also be set to None for simple worker
194 having worker.key defined (default is None)
195 sname -- stream name (default is 'stdout')
196 """
197 self._task_bound_check()
198 return self.task._msg_by_source(self, node, sname)
199
200
201
203 """Abort processing any action by this worker."""
204 raise NotImplementedError("Derived classes must implement.")
205
210
215
217 """Base class DistantWorker.
218
219 DistantWorker provides a useful set of setters/getters to use with
220 distant workers like ssh or pdsh.
221 """
222
223
224
226 """Message received from node, update last* stuffs."""
227
228 task = self.task
229 handler = self.eh
230 assert type(node) is not NodeSet
231
232 self.current_sname = sname
233
234 task._msg_add(self, node, sname, msg)
235
236 self.current_node = node
237 if sname == self.SNAME_STDERR:
238 self.current_errmsg = msg
239 if handler is not None:
240 handler.ev_error(self)
241 else:
242 self.current_msg = msg
243 if handler is not None:
244 handler.ev_read(self)
245
247 """Command return code received."""
248 Worker._on_rc(self, node, rc)
249
251 """Update on node timeout."""
252
253 self.current_node = node
254
255 self.task._timeout_add(self, node)
256
258 """
259 Get last node, useful to get the node in an EventHandler
260 callback like ev_read().
261 [DEPRECATED] use current_node
262 """
263 warnings.warn("use current_node instead", DeprecationWarning)
264 return self.current_node
265
267 """
268 Get last (node, buffer), useful in an EventHandler.ev_read()
269 [DEPRECATED] use (current_node, current_msg)
270 """
271 warnings.warn("use current_node and current_msg instead",
272 DeprecationWarning)
273 return self.current_node, self.current_msg
274
276 """
277 Get last (node, error_buffer), useful in an EventHandler.ev_error()
278 [DEPRECATED] use (current_node, current_errmsg)
279 """
280 warnings.warn("use current_node and current_errmsg instead",
281 DeprecationWarning)
282 return self.current_node, self.current_errmsg
283
285 """
286 Get last (node, rc), useful in an EventHandler.ev_hup()
287 [DEPRECATED] use (current_node, current_rc)
288 """
289 warnings.warn("use current_node and current_rc instead",
290 DeprecationWarning)
291 return self.current_node, self.current_rc
292
294 """Get specific node buffer."""
295 return self.read(node, self.SNAME_STDOUT)
296
298 """Get specific node error buffer."""
299 return self.read(node, self.SNAME_STDERR)
300
301 node_error_buffer = node_error
302
304 """
305 Get specific node return code.
306
307 :raises KeyError: command on node has not yet finished (no return code
308 available), or this node is not known by this worker
309 """
310 self._task_bound_check()
311 try:
312 rc = self.task._rc_by_source(self, node)
313 except KeyError:
314 raise KeyError(node)
315 return rc
316
317 node_rc = node_retcode
318
329
340
348
356
358 """
359 Returns an iterator over return codes and associated NodeSet.
360 If the optional parameter match_keys is defined, only keys
361 found in match_keys are returned.
362 """
363 self._task_bound_check()
364 for rc, keys in self.task._rc_iter_by_worker(self, match_keys):
365 yield rc, NodeSet.fromlist(keys)
366
373
380
387
389 """StreamWorker's default EngineClient.
390
391 StreamClient is the EngineClient subclass used by default by
392 StreamWorker. It handles some generic methods to pass data to the
393 StreamWorker.
394 """
395
397 """Called on EngineClient start."""
398 assert not self.worker.started
399 self.worker._on_start(self.key)
400 return self
401
402 - def _read(self, sname, size=65536):
405
406 - def _close(self, abort, timeout):
407 """Close client. See EngineClient._close()."""
408 EngineClient._close(self, abort, timeout)
409 if timeout:
410 assert abort, "abort flag not set on timeout"
411 self.worker._on_timeout(self.key)
412
413 self.worker._on_rc(self.key, None)
414
415 if self.worker.eh:
416 self.worker.eh.ev_close(self.worker)
417
419 """Engine is telling us there is data available for reading."""
420
421 task = self.worker.task
422 msgline = self.worker._on_msgline
423
424 debug = task.info("debug", False)
425 if debug:
426 print_debug = task.info("print_debug")
427 for msg in self._readlines(sname):
428 print_debug(task, "LINE %s" % msg)
429 msgline(self.key, msg, sname)
430 else:
431 for msg in self._readlines(sname):
432 msgline(self.key, msg, sname)
433
435 """Called at close time to flush stream read buffer."""
436 stream = self.streams[sname]
437 if stream.readable() and stream.rbuf:
438
439
440 self.worker._on_msgline(self.key, stream.rbuf, sname)
441
442 - def write(self, buf, sname=None):
443 """Write to writable stream(s)."""
444 if sname is not None:
445 self._write(sname, buf)
446 return
447
448 for writer in self.streams.writers():
449 self._write(writer.name, buf)
450
452 """Set EOF flag to writable stream(s)."""
453 if sname is not None:
454 self._set_write_eof(sname)
455 return
456
457 for writer in self.streams.writers():
458 self._set_write_eof(writer.name)
459
461 """StreamWorker base class [v1.7+]
462
463 The StreamWorker class implements a base (but concrete) Worker that
464 can read and write to multiple streams. Unlike most other Workers,
465 it does not execute any external commands by itself. Rather, it
466 should be pre-bound to "streams", ie. file(s) or file descriptor(s),
467 using the two following methods:
468 >>> worker.set_reader('stream1', fd1)
469 >>> worker.set_writer('stream2', fd2)
470
471 Like other Workers, the StreamWorker instance should be associated
472 with a Task using task.schedule(worker). When the task engine is
473 ready to process the StreamWorker, all of its streams are being
474 processed together. For that reason, it is not possible to add new
475 readers or writers to a running StreamWorker (ie. task is running
476 and worker is already scheduled).
477
478 Configured readers will generate ev_read() events when data is
479 available for reading. So, the following additional public worker
480 variable is available and defines the stream name for the event:
481 >>> worker.current_sname [ev_read,ev_error]
482
483 Please note that ev_error() is called instead of ev_read() when the
484 stream name is 'stderr'. Indeed, all other stream names use
485 ev_read().
486
487 Configured writers will allow the use of the method write(), eg.
488 worker.write(data, 'stream2'), to write to the stream.
489 """
490
491 - def __init__(self, handler, key=None, stderr=False, timeout=-1,
492 autoclose=False, client_class=StreamClient):
493 Worker.__init__(self, handler)
494 if key is None:
495 key = self
496 self.clients = [client_class(self, key, stderr, timeout, autoclose)]
497
498 - def set_reader(self, sname, sfile, retain=True, closefd=True):
499 """Add a readable stream to StreamWorker.
500
501 Arguments:
502 sname -- the name of the stream (string)
503 sfile -- the stream file or file descriptor
504 retain -- whether the stream retains engine client
505 (default is True)
506 closefd -- whether to close fd when the stream is closed
507 (default is True)
508 """
509 if not self.clients[0].registered:
510 self.clients[0].streams.set_reader(sname, sfile, retain, closefd)
511 else:
512 raise WorkerError("cannot add new stream at runtime")
513
514 - def set_writer(self, sname, sfile, retain=True, closefd=True):
515 """Set a writable stream to StreamWorker.
516
517 Arguments:
518 sname -- the name of the stream (string)
519 sfile -- the stream file or file descriptor
520 retain -- whether the stream retains engine client
521 (default is True)
522 closefd -- whether to close fd when the stream is closed
523 (default is True)
524 """
525 if not self.clients[0].registered:
526 self.clients[0].streams.set_writer(sname, sfile, retain, closefd)
527 else:
528 raise WorkerError("cannot add new stream at runtime")
529
531 """Return a list of underlying engine clients."""
532 return self.clients
533
535 """Source key for this worker is free for use.
536
537 Use this method to set the custom source key for this worker.
538 """
539 self.clients[0].key = key
540
542 """Add a message."""
543
544 self.task._msg_add(self, key, sname, msg)
545
546
547 self.current_sname = sname
548
549
550 if sname == 'stderr':
551
552 self.current_errmsg = msg
553 if self.eh:
554 self.eh.ev_error(self)
555 else:
556
557 self.current_msg = msg
558 if self.eh:
559 self.eh.ev_read(self)
560
562 """Update on timeout."""
563 self.task._timeout_add(self, key)
564
565
566 if self.eh:
567 self.eh.ev_timeout(self)
568
570 """Abort processing any action by this worker."""
571 self.clients[0].abort()
572
573 - def read(self, node=None, sname='stdout'):
574 """Read worker stream buffer.
575
576 Return stream read buffer of current worker.
577
578 Arguments:
579 node -- node name; can also be set to None for simple worker
580 having worker.key defined (default is None)
581 sname -- stream name (default is 'stdout')
582 """
583 return Worker.read(self, node or self.clients[0].key, sname)
584
585 - def write(self, buf, sname=None):
586 """Write to worker.
587
588 If sname is specified, write to the associated stream,
589 otherwise write to all writable streams.
590 """
591 self.clients[0].write(buf, sname)
592
594 """
595 Tell worker to close its writer file descriptor once flushed.
596
597 Do not perform writes after this call. Like write(), sname can
598 be optionally specified to target a specific writable stream,
599 otherwise all writable streams are marked as EOF.
600 """
601 self.clients[0].set_write_eof(sname)
602
604 """WorkerSimple base class [DEPRECATED]
605
606 Implements a simple Worker to manage common process
607 stdin/stdout/stderr streams.
608
609 [DEPRECATED] use StreamWorker.
610 """
611
612 - def __init__(self, file_reader, file_writer, file_error, key, handler,
613 stderr=False, timeout=-1, autoclose=False, closefd=True,
614 client_class=StreamClient):
615 """Initialize WorkerSimple worker."""
616 StreamWorker.__init__(self, handler, key, stderr, timeout, autoclose,
617 client_class=client_class)
618 if file_reader:
619 self.set_reader('stdout', file_reader, closefd=closefd)
620 if file_error:
621 self.set_reader('stderr', file_error, closefd=closefd)
622 if file_writer:
623 self.set_writer('stdin', file_writer, closefd=closefd)
624
625 self._filerefs = (file_reader, file_writer, file_error)
626
628 """Return the standard error reader file descriptor (integer)."""
629 return self.clients[0].streams['stderr'].fd
630
632 """Return the reader file descriptor (integer)."""
633 return self.clients[0].streams['stdout'].fd
634
636 """Return the writer file descriptor as an integer."""
637 return self.clients[0].streams['stdin'].fd
638
640 """
641 Get last read message.
642
643 [DEPRECATED] use current_msg
644 """
645 warnings.warn("use current_msg instead", DeprecationWarning)
646 return self.current_msg
647
649 """
650 Get last error message.
651
652 [DEPRECATED] use current_errmsg
653 """
654 warnings.warn("use current_errmsg instead", DeprecationWarning)
655 return self.current_errmsg
656
658 """Read worker error buffer."""
659 return self.read(sname='stderr')
660
662 """Called on command start."""
663 if not self.started:
664 self.started = True
665 if self.eh:
666 self.eh.ev_start(self)
667
668 if self.eh:
669 self.eh.ev_pickup(self)
670
672 """Command return code received."""
673 self.current_rc = rc
674
675 self.task._rc_set(self, key, rc)
676
677 if self.eh:
678 self.eh.ev_hup(self)
679
681 """Notification of bytes written."""
682
683 self.current_sname = sname
684
685
686
687 if self.eh and len(inspect.getargspec(self.eh.ev_written)[0]) == 5:
688 self.eh.ev_written(self, key, sname, bytes_count)
689