Package ClusterShell :: Package Worker :: Module EngineClient
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.EngineClient

  1  # 
  2  # Copyright (C) 2009-2016 CEA/DAM 
  3  # Copyright (C) 2016 Stephane Thiell <sthiell@stanford.edu> 
  4  # 
  5  # This file is part of ClusterShell. 
  6  # 
  7  # ClusterShell is free software; you can redistribute it and/or 
  8  # modify it under the terms of the GNU Lesser General Public 
  9  # License as published by the Free Software Foundation; either 
 10  # version 2.1 of the License, or (at your option) any later version. 
 11  # 
 12  # ClusterShell is distributed in the hope that it will be useful, 
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 15  # Lesser General Public License for more details. 
 16  # 
 17  # You should have received a copy of the GNU Lesser General Public 
 18  # License along with ClusterShell; if not, write to the Free Software 
 19  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 20   
 21  """ 
 22  EngineClient 
 23   
 24  ClusterShell engine's client interface. 
 25   
 26  An engine client is similar to a process, you can start/stop it, read data from 
 27  it and write data to it. Multiple data channels are supported (eg. stdin, stdout 
 28  and stderr, or even more...) 
 29  """ 
 30   
 31  import errno 
 32  import logging 
 33  import os 
 34  import Queue 
 35  import thread 
 36   
 37  from ClusterShell.Worker.fastsubprocess import Popen, PIPE, STDOUT, \ 
 38      set_nonblock_flag 
 39   
 40  from ClusterShell.Engine.Engine import EngineBaseTimer, E_READ, E_WRITE 
 41   
 42   
 43  LOGGER = logging.getLogger(__name__) 
 44   
 45   
class EngineClientException(Exception):
    """Root of the EngineClient exception hierarchy."""
    pass
48
class EngineClientEOF(EngineClientException):
    """Raised when end-of-file is reached on a client stream."""
    pass
51
class EngineClientError(EngineClientException):
    """Common base class of EngineClient error exceptions."""
    pass
54
class EngineClientNotSupportedError(EngineClientError):
    """Raised when an operation is not supported by the EngineClient."""
    pass
57 58
class EngineClientStream(object):
    """EngineClient I/O stream object.

    Internal object used by EngineClient to manage its Engine-registered I/O
    streams. Each EngineClientStream is bound to a file object (file
    descriptor). It can be either an input, an output or a bidirectional
    stream (not used for now).
    """

    def __init__(self, name, sfile=None, evmask=0):
        """Initialize an EngineClientStream object.

        @param name: Name of stream.
        @param sfile: File object or file descriptor.
        @param evmask: Config I/O event bitmask.
        """
        self.name = name
        self.fd = None
        self.rbuf = ""          # read buffer (pending partial data)
        self.wbuf = ""          # write buffer (data not yet written)
        self.eof = False
        self.evmask = evmask
        self.events = 0
        self.new_events = 0
        # set_file() below applies its own defaults (True) to both flags
        self.retain = True
        self.closefd = True
        # Forward evmask explicitly: set_file() defaults it to 0 and would
        # otherwise silently discard the mask requested by the caller.
        self.set_file(sfile, evmask)

    def set_file(self, sfile, evmask=0, retain=True, closefd=True):
        """
        Set the stream file and event mask for this object.
        sfile should be a file object or a file descriptor.
        Event mask can be either E_READ, E_WRITE or both.
        Currently does NOT retain file object.

        @param sfile: File object (anything with fileno()) or file
            descriptor; None leaves the stream without descriptor.
        @param evmask: I/O event bitmask.
        @param retain: Whether this stream keeps its client alive.
        @param closefd: Whether close() should close the descriptor.
        """
        try:
            # file object: only its descriptor is kept
            self.fd = sfile.fileno()
        except AttributeError:
            # already a file descriptor (or None)
            self.fd = sfile
        # Set I/O event mask
        self.evmask = evmask
        # Set retain flag
        self.retain = retain
        # Set closefd flag
        self.closefd = closefd

    def __repr__(self):
        # id() is rendered with %x so that the "0x" prefix is truthful
        return "<%s at 0x%x (name=%s fd=%s rbuflen=%d wbuflen=%d eof=%d " \
               "evmask=0x%x)>" % (self.__class__.__name__, id(self),
                                  self.name, self.fd, len(self.rbuf),
                                  len(self.wbuf), self.eof, self.evmask)

    def close(self):
        """Close stream (only closes the descriptor when closefd is set)."""
        if self.closefd and self.fd is not None:
            os.close(self.fd)

    def readable(self):
        """Return whether the stream is setup as readable."""
        return self.evmask & E_READ

    def writable(self):
        """Return whether the stream is setup as writable."""
        return self.evmask & E_WRITE
122 123
class EngineClientStreamDict(dict):
    """Dictionary of an EngineClient's streams, indexed by stream name."""

    def set_stream(self, sname, sfile=None, evmask=0, retain=True,
                   closefd=True):
        """Add the named stream, or update it if it already exists.

        sfile is a file object or a file descriptor. The (possibly new)
        stream object is returned.
        """
        stream = dict.setdefault(self, sname, EngineClientStream(sname))
        stream.set_file(sfile, evmask, retain, closefd)
        return stream

    def set_reader(self, sname, sfile=None, retain=True, closefd=True):
        """Register sfile (file object or descriptor) as a readable stream."""
        self.set_stream(sname, sfile, evmask=E_READ, retain=retain,
                        closefd=closefd)

    def set_writer(self, sname, sfile=None, retain=True, closefd=True):
        """Register sfile (file object or descriptor) as a writable stream."""
        self.set_stream(sname, sfile, evmask=E_WRITE, retain=retain,
                        closefd=closefd)

    def destroy(self, key):
        """Close the stream stored under key, then drop it from the dict."""
        stream = self[key]
        stream.close()
        dict.pop(self, key)

    def __delitem__(self, key):
        # deleting an entry always closes the underlying stream first
        self.destroy(key)

    def clear(self):
        """Close every stream, then empty the dictionary."""
        for entry in self.values():
            entry.close()
        dict.clear(self)

    def active_readers(self):
        """Return an iterator on readable streams having a descriptor set."""
        return (rstream for rstream in self.readers()
                if rstream.fd is not None)

    def readers(self):
        """Return an iterator on every stream configured as readable."""
        return (entry for entry in self.values() if entry.readable())

    def active_writers(self):
        """Return an iterator on writable streams having a descriptor set."""
        return (wstream for wstream in self.writers()
                if wstream.fd is not None)

    def writers(self):
        """Return an iterator on every stream configured as writable."""
        return (entry for entry in self.values() if entry.writable())

    def retained(self):
        """Tell whether this set of streams is retained.

        Note on retain: an active stream with retain=True keeps the
        engine client alive. When only streams with retain=False
        remain, the engine client terminates.

        Return:
            True -- when at least one stream is retained
            False -- when no retainable stream remain
        """
        return any(entry.fd is not None and entry.retain
                   for entry in self.values())
191 192
class EngineClient(EngineBaseTimer):
    """
    Abstract class EngineClient.

    An engine client owns a set of named I/O streams (self.streams) and
    relies on the engine it is bound to (self._engine) for I/O event
    registration. Derived classes must implement _start() and
    _handle_read().
    """

    def __init__(self, worker, key, stderr, timeout, autoclose):
        """EngineClient initializer.

        Should be called from derived classes.

        Arguments:
            worker -- parent worker instance
            key -- client key used by MsgTree (eg. node name)
            stderr -- boolean set if stderr is on a separate stream
            timeout -- client execution timeout value (float)
            autoclose -- boolean set to indicate whether this engine
                client should be aborted as soon as all other
                non-autoclosing clients have finished.
        """

        EngineBaseTimer.__init__(self, timeout, -1, autoclose)

        self._reg_epoch = 0     # registration generation number

        # read-only public
        self.registered = False # registered on engine or not
        self.delayable = True   # subject to fanout limit

        self.worker = worker
        if key is None:
            # no explicit key: fall back to the worker identity
            key = id(worker)
        self.key = key

        # boolean indicating whether stderr is on a separate fd
        self._stderr = stderr

        # streams associated with this client
        self.streams = EngineClientStreamDict()

    def __repr__(self):
        # added repr(self.key)
        return '<%s.%s instance at 0x%x key %r>' % (self.__module__,
                                                    self.__class__.__name__,
                                                    id(self), self.key)

    def _fire(self):
        """
        Fire timeout timer: ask the engine (if any) to remove this client
        as aborted and timed out.
        """
        if self._engine:
            self._engine.remove(self, abort=True, did_timeout=True)

    def _start(self):
        """
        Starts client and returns client instance as a convenience.
        Derived classes (except EnginePort) must implement.
        """
        raise NotImplementedError("Derived classes must implement.")

    def _close(self, abort, timeout):
        """
        Close client. Called by the engine after client has been unregistered.
        This method should handle both termination types (normal or aborted)
        and should set timeout status accordingly.

        Derived classes should implement. This default implementation only
        closes every stream of the client.
        """
        # iterate on a copy: _close_stream() removes entries from the dict
        for sname in list(self.streams):
            self._close_stream(sname)

    def _close_stream(self, sname):
        """
        Close specific stream by name (internal, called by engine). This method
        is the regular way to close a stream flushing read buffers accordingly.
        """
        self._flush_read(sname)
        # flush_read() is useful but may generate user events (ev_read) that
        # could lead to worker abort and then ev_close. Be careful there.
        if sname in self.streams:
            del self.streams[sname]

    def _set_reading(self, sname):
        """
        Set reading state.
        """
        self._engine.set_reading(self, sname)

    def _set_writing(self, sname):
        """
        Set writing state.
        """
        self._engine.set_writing(self, sname)

    def _read(self, sname, size=65536):
        """
        Read data from process.

        Read at most size bytes from the named stream and re-arm its reading
        state. Raise EngineClientEOF when no data is returned.
        """
        result = os.read(self.streams[sname].fd, size)
        if len(result) == 0:
            raise EngineClientEOF()
        self._set_reading(sname)
        return result

    def _flush_read(self, sname):
        """Called when stream is closing to flush read buffers."""
        pass # derived classes may implement

    def _handle_read(self, sname):
        """
        Handle a read notification. Called by the engine as the result of an
        event indicating that a read is available.
        """
        raise NotImplementedError("Derived classes must implement.")

    def _handle_write(self, sname):
        """
        Handle a write notification. Called by the engine as the result of an
        event indicating that a write can be performed now.

        Write as much pending data as possible, notify the worker through
        _on_written() and remove the stream from the engine once EOF has been
        requested and the write buffer is empty. OSError other than EAGAIN
        and EPIPE is propagated.
        """
        wfile = self.streams[sname]
        if not wfile.wbuf and wfile.eof:
            # remove stream from engine (not directly)
            self._engine.remove_stream(self, wfile)
        elif len(wfile.wbuf) > 0:
            try:
                wcnt = os.write(wfile.fd, wfile.wbuf)
            except OSError as exc:
                if exc.errno == errno.EAGAIN:
                    # _handle_write() is not only called by the engine but also
                    # by _write(), so this is legit: we just try again later
                    self._set_writing(sname)
                    return
                if exc.errno == errno.EPIPE:
                    # broken pipe: log warning message and do NOT retry
                    LOGGER.warning('%r: %s', self, exc)
                    return
                raise
            if wcnt > 0:
                # dequeue written buffer
                wfile.wbuf = wfile.wbuf[wcnt:]
                # check for possible ending
                if wfile.eof and not wfile.wbuf:
                    self.worker._on_written(self.key, wcnt, sname)
                    # remove stream from engine (not directly)
                    self._engine.remove_stream(self, wfile)
                else:
                    self._set_writing(sname)
                    self.worker._on_written(self.key, wcnt, sname)

    def _exec_nonblock(self, commandlist, shell=False, env=None):
        """
        Utility method to launch a command with stdin/stdout file
        descriptors configured in non-blocking mode.

        env, when set, is merged on top of a copy of os.environ. The process
        streams are registered in self.streams and the Popen object is
        returned.
        """
        full_env = None
        if env:
            full_env = os.environ.copy()
            full_env.update(env)

        if self._stderr:
            stderr_setup = PIPE
        else:
            # stderr is merged into stdout
            stderr_setup = STDOUT

        # Launch process in non-blocking mode
        proc = Popen(commandlist, bufsize=0, stdin=PIPE, stdout=PIPE,
                     stderr=stderr_setup, shell=shell, env=full_env)

        if self._stderr:
            self.streams.set_stream(self.worker.SNAME_STDERR, proc.stderr,
                                    E_READ)
        self.streams.set_stream(self.worker.SNAME_STDOUT, proc.stdout, E_READ)
        # stdin alone must not keep the client alive (retain=False)
        self.streams.set_stream(self.worker.SNAME_STDIN, proc.stdin, E_WRITE,
                                retain=False)

        return proc

    def _readlines(self, sname):
        """Utility method to read client lines.

        Generator yielding each complete line read from the named stream,
        without its LF or CRLF terminator. A trailing unterminated line is
        kept in the stream read buffer until more data is read.
        """
        # read a chunk of data, may raise eof
        readbuf = self._read(sname)
        assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0"

        # Current version implements line-buffered reads. If needed, we could
        # easily provide direct, non-buffered, data reads in the future.

        rfile = self.streams[sname]

        # Split on LF only. splitlines() also breaks on a lone CR, and such
        # fragments used to be mistaken for the trailing partial line: each
        # one overwrote the previous, and the survivor was prepended to the
        # next read, so data was lost or reordered.
        lines = (rfile.rbuf + readbuf).split('\n')
        # last item is the unterminated remainder ('' if data ends with LF)
        partial = lines.pop()
        rfile.rbuf = ""
        for line in lines:
            if line.endswith('\r'):
                yield line[:-1] # trim CR of CRLF (LF already removed)
            else:
                yield line
        # keep partial line in buffer
        rfile.rbuf = partial

    def _write(self, sname, buf):
        """Add some data to be written to the client."""
        wfile = self.streams[sname]
        # always bufferize; data stays queued until the pipe is ready
        wfile.wbuf += buf
        # fd is compared to None because descriptor 0 is a valid fd
        if self._engine and wfile.fd is not None:
            # give it a try now (will set writing flag anyhow)
            self._handle_write(sname)

    def _set_write_eof(self, sname):
        """Set EOF on specific writable stream."""
        if sname not in self.streams:
            LOGGER.debug("stream %s was already closed on client %s, skipping",
                         sname, self.key)
            return

        wfile = self.streams[sname]
        wfile.eof = True
        # fd is compared to None because descriptor 0 is a valid fd
        if self._engine and wfile.fd is not None and not wfile.wbuf:
            # sendq empty, remove stream now
            self._engine.remove_stream(self, wfile)

    def abort(self):
        """Abort processing any action by this client."""
        if self._engine:
            self._engine.remove(self, abort=True)
424
class EnginePort(EngineClient):
    """
    An EnginePort is an abstraction object to deliver messages
    reliably between tasks.

    Messages are queued in an internal queue and the port task is woken up
    by writing one byte per message to an internal pipe.
    """

    class _Msg:
        """Private class representing a port message.

        A port message may be any Python object.
        """

        def __init__(self, user_msg, sync):
            self._user_msg = user_msg
            self._sync_msg = sync
            # lock held until the message is acknowledged by get()
            self.reply_lock = thread.allocate_lock()
            self.reply_lock.acquire()

        def get(self):
            """
            Get and acknowledge message.
            """
            self.reply_lock.release()
            return self._user_msg

        def sync(self):
            """
            Wait for message acknowledgment if needed.
            """
            if self._sync_msg:
                self.reply_lock.acquire()

    def __init__(self, task, handler=None, autoclose=False):
        """
        Initialize EnginePort object.

        Arguments:
            task -- task owning this port
            handler -- event handler receiving ev_msg() events
            autoclose -- autoclose flag passed to EngineClient
        """
        EngineClient.__init__(self, None, None, False, -1, autoclose)
        self.task = task
        self.eh = handler
        # ports are no subject to fanout
        self.delayable = False

        # Port messages queue
        self._msgq = Queue.Queue(self.task.default("port_qlimit"))

        # Request pipe
        (readfd, writefd) = os.pipe()
        # Set nonblocking flag
        set_nonblock_flag(readfd)
        set_nonblock_flag(writefd)
        self.streams.set_stream('in', readfd, E_READ)
        self.streams.set_stream('out', writefd, E_WRITE)

    def __repr__(self):
        try:
            fd_in = self.streams['in'].fd
        except KeyError:
            fd_in = None
        try:
            fd_out = self.streams['out'].fd
        except KeyError:
            fd_out = None
        # %s (not %d) for fds: they are None once the streams are gone
        return "<%s at 0x%x (streams=(%s, %s))>" % (self.__class__.__name__,
                                                    id(self), fd_in, fd_out)

    def _start(self):
        """Start port."""
        return self

    def _close(self, abort, timeout):
        """Close port: drop pending messages and close both pipe ends."""
        # purge msgq
        try:
            while not self._msgq.empty():
                pmsg = self._msgq.get(block=False)
                # Always acknowledge a dropped message, whatever the debug
                # setting: get() releases the lock a synchronous sender is
                # waiting on in msg().
                dropped = pmsg.get()
                if self.task.info("debug", False):
                    self.task.info("print_debug")(self.task,
                        "EnginePort: dropped msg: %s" % str(dropped))
        except Queue.Empty:
            pass
        self._msgq = None
        del self.streams['out']
        del self.streams['in']

    def _handle_read(self, sname):
        """
        Handle a read notification. Called by the engine as the result of an
        event indicating that a read is available.
        """
        readbuf = self._read(sname, 4096)
        # one byte has been written to the pipe for each queued message
        for dummy_char in readbuf:
            # raise Empty if empty (should never happen)
            pmsg = self._msgq.get(block=False)
            # acknowledge first so the sender is released even when no
            # handler is attached to this port
            user_msg = pmsg.get()
            if self.eh is not None:
                self.eh.ev_msg(self, user_msg)

    def msg(self, send_msg, send_once=False):
        """
        Port message send method that will wait for acknowledgement
        unless the send_once parameter if set.

        May be called from another thread. Will generate ev_msg() on
        Port event handler (in Port task/thread).

        Return False if the message cannot be sent (eg. port closed).
        OSError raised while writing to the port pipe is propagated.
        """
        if self._msgq is None: # called after port closed?
            return False

        pmsg = EnginePort._Msg(send_msg, not send_once)
        self._msgq.put(pmsg, block=True, timeout=None)
        # wake up the port task: one byte per queued message
        ret = os.write(self.streams['out'].fd, "M")
        pmsg.sync()
        return ret == 1

    def msg_send(self, send_msg):
        """
        Port message send-once method (no acknowledgement). See msg().

        Return False if the message cannot be sent (eg. port closed).
        """
        return self.msg(send_msg, send_once=True)
550