1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """
22 EngineClient
23
24 ClusterShell engine's client interface.
25
26 An engine client is similar to a process, you can start/stop it, read data from
27 it and write data to it. Multiple data channels are supported (eg. stdin, stdout
28 and stderr, or even more...)
29 """
30
31 import errno
32 import logging
33 import os
34 import Queue
35 import thread
36
37 from ClusterShell.Worker.fastsubprocess import Popen, PIPE, STDOUT, \
38 set_nonblock_flag
39
40 from ClusterShell.Engine.Engine import EngineBaseTimer, E_READ, E_WRITE
41
42
43 LOGGER = logging.getLogger(__name__)
44
45
47 """Generic EngineClient exception."""
48
50 """EOF from client."""
51
53 """Base EngineClient error exception."""
54
56 """Operation not supported by EngineClient."""
57
58
60 """EngineClient I/O stream object.
61
62 Internal object used by EngineClient to manage its Engine-registered I/O
63 streams. Each EngineClientStream is bound to a file object (file
64 descriptor). It can be either an input, an output or a bidirectional
65 stream (not used for now)."""
66
67 - def __init__(self, name, sfile=None, evmask=0):
68 """Initialize an EngineClientStream object.
69
70 @param name: Name of stream.
71 @param sfile: File object or file descriptor.
72 @param evmask: Config I/O event bitmask.
73 """
74 self.name = name
75 self.fd = None
76 self.rbuf = ""
77 self.wbuf = ""
78 self.eof = False
79 self.evmask = evmask
80 self.events = 0
81 self.new_events = 0
82 self.retain = False
83 self.closefd = False
84 self.set_file(sfile)
85
86 - def set_file(self, sfile, evmask=0, retain=True, closefd=True):
87 """
88 Set the stream file and event mask for this object.
89 sfile should be a file object or a file descriptor.
90 Event mask can be either E_READ, E_WRITE or both.
91 Currently does NOT retain file object.
92 """
93 try:
94
95 self.fd = sfile.fileno()
96 except AttributeError:
97 self.fd = sfile
98
99 self.evmask = evmask
100
101 self.retain = retain
102
103 self.closefd = closefd
104
106 return "<%s at 0x%s (name=%s fd=%s rbuflen=%d wbuflen=%d eof=%d " \
107 "evmask=0x%x)>" % (self.__class__.__name__, id(self), self.name,
108 self.fd, len(self.rbuf), len(self.wbuf), self.eof, self.evmask)
109
111 """Close stream."""
112 if self.closefd and self.fd is not None:
113 os.close(self.fd)
114
116 """Return whether the stream is setup as readable."""
117 return self.evmask & E_READ
118
120 """Return whether the stream is setup as writable."""
121 return self.evmask & E_WRITE
122
123
125 """EngineClient's named stream dictionary."""
126
127 - def set_stream(self, sname, sfile=None, evmask=0, retain=True,
128 closefd=True):
129 """Set stream based on file object or file descriptor.
130
131 This method can be used to add a stream or update its
132 parameters.
133 """
134 engfile = dict.setdefault(self, sname, EngineClientStream(sname))
135 engfile.set_file(sfile, evmask, retain, closefd)
136 return engfile
137
138 - def set_reader(self, sname, sfile=None, retain=True, closefd=True):
139 """Set readable stream based on file object or file descriptor."""
140 self.set_stream(sname, sfile, E_READ, retain, closefd)
141
142 - def set_writer(self, sname, sfile=None, retain=True, closefd=True):
143 """Set writable stream based on file object or file descriptor."""
144 self.set_stream(sname, sfile, E_WRITE, retain, closefd)
145
147 """Close file object and remove it from this pool."""
148 self[key].close()
149 dict.pop(self, key)
150
153
155 """Clear File Pool"""
156 for stream in self.values():
157 stream.close()
158 dict.clear(self)
159
161 """Get an iterator on readable streams (with fd set)."""
162 return (s for s in self.readers() if s.fd is not None)
163
165 """Get an iterator on all streams setup as readable."""
166 return (s for s in self.values() if s.evmask & E_READ)
167
169 """Get an iterator on writable streams (with fd set)."""
170 return (s for s in self.writers() if s.fd is not None)
171
173 """Get an iterator on all streams setup as writable."""
174 return (s for s in self.values() if s.evmask & E_WRITE)
175
177 """Check whether this set of streams is retained.
178
179 Note on retain: an active stream with retain=True keeps the
180 engine client alive. When only streams with retain=False
181 remain, the engine client terminates.
182
183 Return:
184 True -- when at least one stream is retained
185 False -- when no retainable stream remain
186 """
187 for stream in self.values():
188 if stream.fd is not None and stream.retain:
189 return True
190 return False
191
192
194 """
195 Abstract class EngineClient.
196 """
197
198 - def __init__(self, worker, key, stderr, timeout, autoclose):
199 """EngineClient initializer.
200
201 Should be called from derived classes.
202
203 Arguments:
204 worker -- parent worker instance
205 key -- client key used by MsgTree (eg. node name)
206 stderr -- boolean set if stderr is on a separate stream
207 timeout -- client execution timeout value (float)
208 autoclose -- boolean set to indicate whether this engine
209 client should be aborted as soon as all other
210 non-autoclosing clients have finished.
211 """
212
213 EngineBaseTimer.__init__(self, timeout, -1, autoclose)
214
215 self._reg_epoch = 0
216
217
218 self.registered = False
219 self.delayable = True
220
221 self.worker = worker
222 if key is None:
223 key = id(worker)
224 self.key = key
225
226
227 self._stderr = stderr
228
229
230 self.streams = EngineClientStreamDict()
231
233
234 return '<%s.%s instance at 0x%x key %r>' % (self.__module__,
235 self.__class__.__name__,
236 id(self), self.key)
237
239 """
240 Fire timeout timer.
241 """
242 if self._engine:
243 self._engine.remove(self, abort=True, did_timeout=True)
244
246 """
247 Starts client and returns client instance as a convenience.
248 Derived classes (except EnginePort) must implement.
249 """
250 raise NotImplementedError("Derived classes must implement.")
251
252 - def _close(self, abort, timeout):
253 """
254 Close client. Called by the engine after client has been unregistered.
255 This method should handle both termination types (normal or aborted)
256 and should set timeout status accordingly.
257
258 Derived classes should implement.
259 """
260 for sname in list(self.streams):
261 self._close_stream(sname)
262
264 """
265 Close specific stream by name (internal, called by engine). This method
266 is the regular way to close a stream flushing read buffers accordingly.
267 """
268 self._flush_read(sname)
269
270
271 if sname in self.streams:
272 del self.streams[sname]
273
275 """
276 Set reading state.
277 """
278 self._engine.set_reading(self, sname)
279
281 """
282 Set writing state.
283 """
284 self._engine.set_writing(self, sname)
285
286 - def _read(self, sname, size=65536):
287 """
288 Read data from process.
289 """
290 result = os.read(self.streams[sname].fd, size)
291 if len(result) == 0:
292 raise EngineClientEOF()
293 self._set_reading(sname)
294 return result
295
297 """Called when stream is closing to flush read buffers."""
298 pass
299
301 """
302 Handle a read notification. Called by the engine as the result of an
303 event indicating that a read is available.
304 """
305 raise NotImplementedError("Derived classes must implement.")
306
308 """
309 Handle a write notification. Called by the engine as the result of an
310 event indicating that a write can be performed now.
311 """
312 wfile = self.streams[sname]
313 if not wfile.wbuf and wfile.eof:
314
315 self._engine.remove_stream(self, wfile)
316 elif len(wfile.wbuf) > 0:
317 try:
318 wcnt = os.write(wfile.fd, wfile.wbuf)
319 except OSError, exc:
320 if exc.errno == errno.EAGAIN:
321
322
323 self._set_writing(sname)
324 return
325 if exc.errno == errno.EPIPE:
326
327 LOGGER.warning('%r: %s', self, exc)
328 return
329 raise
330 if wcnt > 0:
331
332 wfile.wbuf = wfile.wbuf[wcnt:]
333
334 if wfile.eof and not wfile.wbuf:
335 self.worker._on_written(self.key, wcnt, sname)
336
337 self._engine.remove_stream(self, wfile)
338 else:
339 self._set_writing(sname)
340 self.worker._on_written(self.key, wcnt, sname)
341
343 """
344 Utility method to launch a command with stdin/stdout file
345 descriptors configured in non-blocking mode.
346 """
347 full_env = None
348 if env:
349 full_env = os.environ.copy()
350 full_env.update(env)
351
352 if self._stderr:
353 stderr_setup = PIPE
354 else:
355 stderr_setup = STDOUT
356
357
358 proc = Popen(commandlist, bufsize=0, stdin=PIPE, stdout=PIPE,
359 stderr=stderr_setup, shell=shell, env=full_env)
360
361 if self._stderr:
362 self.streams.set_stream(self.worker.SNAME_STDERR, proc.stderr,
363 E_READ)
364 self.streams.set_stream(self.worker.SNAME_STDOUT, proc.stdout, E_READ)
365 self.streams.set_stream(self.worker.SNAME_STDIN, proc.stdin, E_WRITE,
366 retain=False)
367
368 return proc
369
371 """Utility method to read client lines."""
372
373 readbuf = self._read(sname)
374 assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0"
375
376
377
378
379 rfile = self.streams[sname]
380
381 buf = rfile.rbuf + readbuf
382 lines = buf.splitlines(True)
383 rfile.rbuf = ""
384 for line in lines:
385 if line.endswith('\n'):
386 if line.endswith('\r\n'):
387 yield line[:-2]
388 else:
389
390 yield line[:-1]
391 else:
392
393 rfile.rbuf = line
394
395
396 - def _write(self, sname, buf):
397 """Add some data to be written to the client."""
398 wfile = self.streams[sname]
399 if self._engine and wfile.fd:
400 wfile.wbuf += buf
401
402 self._handle_write(sname)
403 else:
404
405 wfile.wbuf += buf
406
408 """Set EOF on specific writable stream."""
409 if sname not in self.streams:
410 LOGGER.debug("stream %s was already closed on client %s, skipping",
411 sname, self.key)
412 return
413
414 wfile = self.streams[sname]
415 wfile.eof = True
416 if self._engine and wfile.fd and not wfile.wbuf:
417
418 self._engine.remove_stream(self, wfile)
419
421 """Abort processing any action by this client."""
422 if self._engine:
423 self._engine.remove(self, abort=True)
424
426 """
427 An EnginePort is an abstraction object to deliver messages
428 reliably between tasks.
429 """
430
432 """Private class representing a port message.
433
434 A port message may be any Python object.
435 """
436
438 self._user_msg = user_msg
439 self._sync_msg = sync
440 self.reply_lock = thread.allocate_lock()
441 self.reply_lock.acquire()
442
444 """
445 Get and acknowledge message.
446 """
447 self.reply_lock.release()
448 return self._user_msg
449
451 """
452 Wait for message acknowledgment if needed.
453 """
454 if self._sync_msg:
455 self.reply_lock.acquire()
456
457 - def __init__(self, task, handler=None, autoclose=False):
458 """
459 Initialize EnginePort object.
460 """
461 EngineClient.__init__(self, None, None, False, -1, autoclose)
462 self.task = task
463 self.eh = handler
464
465 self.delayable = False
466
467
468 self._msgq = Queue.Queue(self.task.default("port_qlimit"))
469
470
471 (readfd, writefd) = os.pipe()
472
473 set_nonblock_flag(readfd)
474 set_nonblock_flag(writefd)
475 self.streams.set_stream('in', readfd, E_READ)
476 self.streams.set_stream('out', writefd, E_WRITE)
477
479 try:
480 fd_in = self.streams['in'].fd
481 except KeyError:
482 fd_in = None
483 try:
484 fd_out = self.streams['out'].fd
485 except KeyError:
486 fd_out = None
487 return "<%s at 0x%s (streams=(%d, %d))>" % (self.__class__.__name__, \
488 id(self), fd_in, fd_out)
489
491 """Start port."""
492 return self
493
494 - def _close(self, abort, timeout):
495 """Close port."""
496 if not self._msgq.empty():
497
498 try:
499 while not self._msgq.empty():
500 pmsg = self._msgq.get(block=False)
501 if self.task.info("debug", False):
502 self.task.info("print_debug")(self.task,
503 "EnginePort: dropped msg: %s" % str(pmsg.get()))
504 except Queue.Empty:
505 pass
506 self._msgq = None
507 del self.streams['out']
508 del self.streams['in']
509
511 """
512 Handle a read notification. Called by the engine as the result of an
513 event indicating that a read is available.
514 """
515 readbuf = self._read(sname, 4096)
516 for dummy_char in readbuf:
517
518 pmsg = self._msgq.get(block=False)
519 self.eh.ev_msg(self, pmsg.get())
520
521 - def msg(self, send_msg, send_once=False):
522 """
523 Port message send method that will wait for acknowledgement
524 unless the send_once parameter if set.
525
526 May be called from another thread. Will generate ev_msg() on
527 Port event handler (in Port task/thread).
528
529 Return False if the message cannot be sent (eg. port closed).
530 """
531 if self._msgq is None:
532 return False
533
534 pmsg = EnginePort._Msg(send_msg, not send_once)
535 self._msgq.put(pmsg, block=True, timeout=None)
536 try:
537 ret = os.write(self.streams['out'].fd, "M")
538 except OSError:
539 raise
540 pmsg.sync()
541 return ret == 1
542
544 """
545 Port message send-once method (no acknowledgement). See msg().
546
547 Return False if the message cannot be sent (eg. port closed).
548 """
549 return self.msg(send_msg, send_once=True)
550