Package ClusterShell :: Package Engine :: Module Engine
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Engine

  1  # 
  2  # Copyright (C) 2007-2016 CEA/DAM 
  3  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
  4  # 
  5  # This file is part of ClusterShell. 
  6  # 
  7  # ClusterShell is free software; you can redistribute it and/or 
  8  # modify it under the terms of the GNU Lesser General Public 
  9  # License as published by the Free Software Foundation; either 
 10  # version 2.1 of the License, or (at your option) any later version. 
 11  # 
 12  # ClusterShell is distributed in the hope that it will be useful, 
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 15  # Lesser General Public License for more details. 
 16  # 
 17  # You should have received a copy of the GNU Lesser General Public 
 18  # License along with ClusterShell; if not, write to the Free Software 
 19  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 20   
 21  """ 
 22  Interface of underlying Task's Engine. 
 23   
 24  An Engine implements a loop your thread enters and uses to call event handlers 
 25  in response to incoming events (from workers, timers, etc.). 
 26  """ 
 27   
 28  import errno 
 29  import heapq 
 30  import logging 
 31  import sys 
 32  import time 
 33  import traceback 
 34   
 35   
# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)

# Engine client fd I/O event interest bits. They are bit flags: the engine
# combines and clears them on a stream's `events' bitset with |= and &= ~.
E_READ = 0x1
E_WRITE = 0x2

# Define epsilon value for time float arithmetic operations (tolerance used
# by the timer queue when comparing fire delays and fire dates)
EPSILON = 1.0e-3

# Special fanout value for unlimited
FANOUT_UNLIMITED = -1
# Special fanout value to use default Engine fanout
FANOUT_DEFAULT = None
 49   
 50   
class EngineException(Exception):
    """
    Base engine exception.

    The other engine exception classes defined alongside it derive from this
    class, so catching EngineException catches any of them.
    """
55
class EngineAbortException(EngineException):
    """
    Raised on user abort.

    Engine.abort() raises this exception when it is called while the engine
    is running.
    """
    def __init__(self, kill):
        """
        Initialize the exception.

        `kill' is stored unchanged as the `kill' attribute; Engine.abort()
        forwards its own `kill' argument here.
        """
        # explicit base call (no super()) keeps this working even where
        # exception classes are old-style
        EngineException.__init__(self)
        self.kill = kill
63
class EngineTimeoutException(EngineException):
    """
    Raised when a timeout is encountered.

    Engine.run() handles it by clearing all clients with did_timeout=True
    and then re-raising it to the caller.
    """
68
class EngineIllegalOperationError(EngineException):
    """
    Error raised when an illegal operation has been performed.

    For instance, EngineBaseTimer raises it when a timer is bound to a second
    engine or when set_nextfire() is called on an invalid timer.
    """
73
class EngineAlreadyRunningError(EngineIllegalOperationError):
    """
    Error raised when the engine is already running.

    Engine.run() raises it when called while its `running' flag is set.
    """
78
class EngineNotSupportedError(EngineException):
    """
    Error raised when the engine mechanism is not supported.
    """
    def __init__(self, engineid):
        """
        Initialize the exception.

        `engineid' is stored unchanged as the `engineid' attribute
        (presumably the identifier of the unsupported engine -- confirm
        against callers).
        """
        EngineException.__init__(self)
        self.engineid = engineid
86 87
class EngineBaseTimer:
    """
    Abstract base for timers driven by a ClusterShell engine.

    A timer is described by a relative fire time (`fire_delay', in seconds
    as float) and by an optional repeating `interval' (in seconds as float
    too). Concrete subclasses have to provide _fire().

    See EngineTimer for more information about ClusterShell timers.
    """

    def __init__(self, fire_delay, interval=-1.0, autoclose=False):
        """
        Create a base timer that is not bound to any engine yet.
        """
        # no engine and no timer queue entry until an engine takes the timer
        self._engine = None
        self._timercase = None
        # user-provided settings
        self.fire_delay = fire_delay
        self.interval = interval
        self.autoclose = autoclose

    def _set_engine(self, engine):
        """
        Bind this timer to `engine' (called by Engine).

        Raise EngineIllegalOperationError if the timer is already bound.
        """
        if not self._engine:
            self._engine = engine
            return

        # A timer can be registered to only one engine at a time.
        raise EngineIllegalOperationError("Already bound to engine.")

    def invalidate(self):
        """
        Invalidate the timer so that it never fires again. This is a no-op
        when the timer is not bound to an engine.
        """
        bound_engine = self._engine
        if not bound_engine:
            return

        bound_engine.timerq.invalidate(self)
        self._engine = None

    def is_valid(self):
        """
        Tell whether the timer is valid and able to fire, that is, whether
        it is currently bound to an engine.
        """
        return not (self._engine is None)

    def set_nextfire(self, fire_delay, interval=-1):
        """
        Set the next firing delay in seconds for an EngineTimer object.

        The optional parameter `interval' sets the firing interval of the
        timer. If not specified, the timer fires once and then is
        automatically invalidated.

        Time values are expressed in seconds using floating point values.
        Precision is implementation (and system) dependent.

        It is safe to call this method from the task owning this timer
        object, in any event handlers, anywhere.

        However, resetting a timer's next firing time may be a relatively
        expensive operation. It is more efficient to let timers autorepeat
        or to use this method from the timer's own event handler callback
        (ie. from its ev_timer).

        Raise EngineIllegalOperationError if the timer is not valid.
        """
        if self.is_valid():
            self.fire_delay = fire_delay
            self.interval = interval
            # let the engine's timer queue take the new values into account
            self._engine.timerq.reschedule(self)
        else:
            raise EngineIllegalOperationError("Operation on invalid timer.")

    def _fire(self):
        """
        Fire the timer. Abstract: concrete timer classes must override it.
        """
        raise NotImplementedError("Derived classes must implement.")
160 161
class EngineTimer(EngineBaseTimer):
    """
    Concrete class EngineTimer

    An EngineTimer object is a timer bound to an engine that fires at a
    preset time in the future, either only once or repeatedly at fixed time
    intervals. The next firing time of a repeating timer can also be
    adjusted manually.

    A timer is not a real-time mechanism: it fires when the engine it has
    been added to is running and able to check whether the timer's firing
    time has passed.
    """

    def __init__(self, fire_delay, interval, autoclose, handler):
        """
        Create a timer whose firing is reported to `handler' through its
        ev_timer() method. `handler' must not be None.
        """
        EngineBaseTimer.__init__(self, fire_delay, interval=interval,
                                 autoclose=autoclose)
        self.eh = handler
        assert self.eh is not None, "An event handler is needed for timer."

    def _fire(self):
        """Notify the event handler that this timer has fired."""
        handler = self.eh
        handler.ev_timer(self)
183
class _EngineTimerQ:
    """
    Engine timer queue.

    Timers (and clients with a timeout) are wrapped into _EngineTimerCase
    objects kept in a heap ordered by absolute fire date. Invalidated timers
    are only flagged as disarmed and lazily removed from the heap.
    """

    class _EngineTimerCase:
        """
        Helper class that allows comparisons of fire times, to be easily used
        in an heapq.
        """
        def __init__(self, client):
            """Wrap `client' and arm it for the first time."""
            self.client = client
            self.client._timercase = self
            # arm timer (first time)
            assert self.client.fire_delay > -EPSILON
            self.fire_date = self.client.fire_delay + time.time()

        def __lt__(self, other):
            """
            Order timer cases by fire date. heapq only relies on "<", and
            Python 3 ignores __cmp__, so this method is what keeps the heap
            ordered there.
            """
            return self.fire_date < other.fire_date

        def __cmp__(self, other):
            """
            Three-way comparison of fire dates (Python 2 protocol), written
            without the cmp() builtin which no longer exists in Python 3.
            """
            return ((self.fire_date > other.fire_date) -
                    (self.fire_date < other.fire_date))

        def arm(self, client):
            """
            Re-arm this timer case for `client' and compute its next fire
            date, from client.fire_delay if set, otherwise by adding
            client.interval to the previous fire date.
            """
            assert client is not None
            self.client = client
            self.client._timercase = self
            # setup next firing date
            time_current = time.time()
            if self.client.fire_delay > -EPSILON:
                self.fire_date = self.client.fire_delay + time_current
            else:
                interval = float(self.client.interval)
                assert interval > 0
                # Keep it simple: increase fire_date by interval even if
                # fire_date stays in the past, as in that case it's going to
                # fire again at next runloop anyway.
                self.fire_date += interval
                # Just print a debug message that could help detect issues
                # coming from a long-running timer handler.
                if self.fire_date < time_current:
                    LOGGER.debug("Warning: passed interval time for %r "
                                 "(long running event handler?)", self.client)

        def disarm(self):
            """Detach and return the client, leaving this case disarmed."""
            client = self.client
            client._timercase = None
            self.client = None
            return client

        def armed(self):
            """Return True if a client is still attached to this case."""
            return self.client is not None

    def __init__(self, engine):
        """
        Initializer.
        """
        self._engine = engine
        # heap of _EngineTimerCase objects (may contain disarmed entries)
        self.timers = []
        # number of armed timers, ie. excluding disarmed heap entries
        self.armed_count = 0

    def __len__(self):
        """
        Return the number of active timers.
        """
        return self.armed_count

    def schedule(self, client):
        """
        Insert and arm a client's timer. Nothing is done when the client's
        fire_delay is not set (negative).
        """
        # arm only if fire is set
        if client.fire_delay > -EPSILON:
            heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client))
            self.armed_count += 1
            # timers without autoclose retain the engine event loop
            if not client.autoclose:
                self._engine.evlooprefcnt += 1

    def reschedule(self, client):
        """
        Re-insert client's timer. Nothing is done here when the client has
        no timer case (eg. while it is being fired): fire_expired() re-arms
        it from its updated fire_delay/interval instead.
        """
        if client._timercase:
            self.invalidate(client)
            self._dequeue_disarmed()
            self.schedule(client)

    def invalidate(self, client):
        """
        Invalidate client's timer. Current implementation doesn't really remove
        the timer, but simply flags it as disarmed.

        Raise ValueError if the client has a timer case while the queue
        accounts for no armed timer.
        """
        if not client._timercase:
            # if timer is being fire, invalidate its values
            client.fire_delay = -1.0
            client.interval = -1.0
            return

        if self.armed_count <= 0:
            # call form of raise: valid on both Python 2 and Python 3
            raise ValueError("Engine client timer not found in timer queue")

        client._timercase.disarm()
        self.armed_count -= 1
        if not client.autoclose:
            self._engine.evlooprefcnt -= 1

    def _dequeue_disarmed(self):
        """
        Dequeue disarmed timers (sort of garbage collection). Only disarmed
        entries sitting at the top of the heap are removed.
        """
        while self.timers and not self.timers[0].armed():
            heapq.heappop(self.timers)

    def fire_expired(self):
        """
        Remove expired timers from the queue and fire associated clients.
        """
        self._dequeue_disarmed()

        # Build a queue of expired timercases. Any expired (and still armed)
        # timer is fired, but only once per call.
        expired_timercases = []
        now = time.time()
        while self.timers and (self.timers[0].fire_date - now) <= EPSILON:
            expired_timercases.append(heapq.heappop(self.timers))
            self._dequeue_disarmed()

        for timercase in expired_timercases:
            # Be careful to recheck and skip any disarmed timers (eg. timer
            # could be invalidated from another timer's event handler)
            if not timercase.armed():
                continue

            # Disarm timer
            client = timercase.disarm()

            # Fire timer (fire_delay is reset first so that the handler may
            # set a new one)
            client.fire_delay = -1.0
            client._fire()

            # Rearm it if needed - Note: fire=0 is valid, interval=0 is not
            if client.fire_delay >= -EPSILON or client.interval > EPSILON:
                timercase.arm(client)
                heapq.heappush(self.timers, timercase)
            else:
                self.armed_count -= 1
                if not client.autoclose:
                    self._engine.evlooprefcnt -= 1

    def nextfire_delay(self):
        """
        Return next timer fire delay (relative time), never negative, or -1
        if the queue holds no timer.
        """
        self._dequeue_disarmed()
        if self.timers:
            return max(0., self.timers[0].fire_date - time.time())

        return -1

    def clear(self):
        """
        Stop and clear all timers.
        """
        for timer in self.timers:
            if timer.armed():
                timer.client.invalidate()

        self.timers = []
        self.armed_count = 0
348 349
class Engine:
    """
    Base class for ClusterShell Engines.

    Subclasses have to implement a runloop listening for client events.
    Subclasses that override other than "pure virtual methods" should call
    corresponding base class methods.
    """

    identifier = "(none)"

    def __init__(self, info):
        """
        Initialize base class.

        `info' is a dict shared with the caller: the engine keeps a reference
        on it, stores its identifier under the 'engine' key and reads the
        'fanout' key when starting clients.
        """
        # take a reference on info dict
        self.info = info

        # and update engine id
        self.info['engine'] = self.identifier

        # keep track of all clients
        self._clients = set()
        self._ports = set()

        # keep track of the number of registered clients per worker
        # (this does not include ports)
        self._reg_stats = {}

        # keep track of registered file descriptors in a dict where keys
        # are fileno and values are (EngineClient, EngineClientStream) tuples
        self.reg_clifds = {}

        # fanout cache used to speed up client launch when fanout changed
        self._prev_fanout = 0 # fanout_diff != 0 the first time

        # Current loop iteration counter. It is the number of performed engine
        # loops in order to keep track of client registration epoch, so we can
        # safely process FDs by chunk and re-use FDs (see Engine._fd2client).
        self._current_loopcnt = 0

        # Current stream being processed
        self._current_stream = None

        # timer queue to handle both timers and clients timeout
        self.timerq = _EngineTimerQ(self)

        # reference count to the event loop (must include registered
        # clients and timers configured WITHOUT autoclose)
        self.evlooprefcnt = 0

        # running state
        self.running = False
        # runloop-has-exited flag
        self._exited = False

    def release(self):
        """Release engine-specific resources (nothing to do in base class)."""
        pass

    def clients(self):
        """Get a copy of clients set."""
        return self._clients.copy()

    def ports(self):
        """
        Get a copy of ports set.
        """
        return self._ports.copy()

    def _fd2client(self, fd):
        """
        Return the (client, stream) tuple registered for file descriptor
        `fd', or (None, None) if `fd' is not registered or if its client has
        been registered during the current loop iteration (re-used FD).
        """
        client, stream = self.reg_clifds.get(fd, (None, None))
        if client:
            if client._reg_epoch < self._current_loopcnt:
                return client, stream
            else:
                LOGGER.debug("_fd2client: ignoring just re-used FD %d",
                             stream.fd)
        return (None, None)

    def _can_register(self, client):
        """
        Tell whether the (not yet registered) client may be registered now
        with respect to the fanout that applies to it.
        """
        assert not client.registered

        if not client.delayable or client.worker._fanout == FANOUT_UNLIMITED:
            # ports and unlimited fanout workers are never held back
            return True
        elif client.worker._fanout is FANOUT_DEFAULT:
            # engine-wide fanout, shared by all default fanout workers
            return self._reg_stats.get('default', 0) < self.info['fanout']
        else:
            # worker-specific fanout
            worker = client.worker
            return self._reg_stats.get(worker, 0) < worker._fanout

    def _update_reg_stats(self, client, offset):
        """
        Add `offset' to the registered clients counter that applies to
        `client': the shared 'default' counter or its worker's own counter.
        """
        if client.worker._fanout is FANOUT_DEFAULT:
            key = 'default'
        else:
            key = client.worker
        self._reg_stats.setdefault(key, 0)
        self._reg_stats[key] += offset

    def add(self, client):
        """
        Add a client to engine. If the engine is running and fanout allows
        it, the client is started and registered right away.
        """
        # bind to engine
        client._set_engine(self)

        if client.delayable:
            # add to regular client set
            self._clients.add(client)
        else:
            # add to port set (non-delayable)
            self._ports.add(client)

        if self.running and self._can_register(client):
            # in-fly add if running
            self.register(client._start())

    def _remove(self, client, abort, did_timeout=False):
        """Remove a client from engine (subroutine)."""
        # be careful to also remove ports when engine has not started yet
        if client.registered or not client.delayable:
            if client.registered:
                self.unregister(client)
            # care should be taken to ensure correct closing flags
            client._close(abort=abort, timeout=did_timeout)

    def remove(self, client, abort=False, did_timeout=False):
        """
        Remove a client from engine. Does NOT aim to flush individual stream
        read buffers.
        """
        self._debug("REMOVE %s" % client)
        if client.delayable:
            self._clients.remove(client)
        else:
            self._ports.remove(client)
        self._remove(client, abort, did_timeout)
        # we just removed a client, so start pending client(s)
        self.start_clients()

    def remove_stream(self, client, stream):
        """
        Regular way to remove a client stream from engine, performing
        needed read flush as needed. If no more retainable stream
        remains for this client, this method automatically removes the
        entire client from engine.

        This function does nothing if the stream is not registered.
        """
        if stream.fd not in self.reg_clifds:
            LOGGER.debug("remove_stream: %s not registered", stream)
            return

        self.unregister_stream(client, stream)

        # _close_stream() will flush pending read buffers so may generate events
        client._close_stream(stream.name)

        # client may have been removed by previous events, if not check whether
        # some retained streams still remain
        if client in self._clients and not client.streams.retained():
            self.remove(client)

    def clear(self, did_timeout=False, clear_ports=False):
        """
        Remove all clients. Does not flush read buffers.
        Subclasses that override this method should call base class method.

        Ports are only removed when `clear_ports' is true. Clients are closed
        with the abort flag set, and with the timeout flag set to
        `did_timeout'.
        """
        all_clients = [self._clients]
        if clear_ports:
            all_clients.append(self._ports)

        for clients in all_clients:
            # pop() rather than iterate, as the set is emptied on the way
            while clients:
                client = clients.pop()
                self._remove(client, True, did_timeout)

    def register(self, client):
        """
        Register an engine client. Subclasses that override this method
        should call base class method.
        """
        assert client in self._clients or client in self._ports
        assert not client.registered

        self._debug("REG %s (%s)(autoclose=%s)" %
                    (client.__class__.__name__, client.streams,
                     client.autoclose))

        client.registered = True
        # remember the loop iteration of registration (see _fd2client)
        client._reg_epoch = self._current_loopcnt

        if client.delayable:
            self._update_reg_stats(client, 1)

        # set interest event bits...
        for streams, ievent in ((client.streams.active_readers, E_READ),
                                (client.streams.active_writers, E_WRITE)):
            for stream in streams():
                self.reg_clifds[stream.fd] = client, stream
                stream.events |= ievent
                if not client.autoclose:
                    self.evlooprefcnt += 1
                self._register_specific(stream.fd, ievent)

        # start timeout timer
        self.timerq.schedule(client)

    def unregister_stream(self, client, stream):
        """Unregister a stream from a client."""
        self._debug("UNREG_STREAM stream=%s" % stream)
        assert stream is not None and stream.fd is not None
        assert stream.fd in self.reg_clifds, \
            "stream fd %d not registered" % stream.fd
        assert client.registered
        self._unregister_specific(stream.fd, stream.events & stream.evmask)
        self._debug("UNREG_STREAM unregistering stream fd %d (%d)" %
                    (stream.fd, len(client.streams)))
        stream.events &= ~stream.evmask
        del self.reg_clifds[stream.fd]
        if not client.autoclose:
            self.evlooprefcnt -= 1

    def unregister(self, client):
        """
        Unregister a client: invalidate its timeout timer and unregister
        all of its active streams that are still registered.
        """
        # sanity check
        assert client.registered
        self._debug("UNREG %s (%s)" % (client.__class__.__name__,
                                       client.streams))

        # remove timeout timer
        self.timerq.invalidate(client)

        # clear interest events...
        for streams, ievent in ((client.streams.active_readers, E_READ),
                                (client.streams.active_writers, E_WRITE)):
            for stream in streams():
                if stream.fd in self.reg_clifds:
                    self._unregister_specific(stream.fd, stream.events & ievent)
                    stream.events &= ~ievent
                    del self.reg_clifds[stream.fd]
                    if not client.autoclose:
                        self.evlooprefcnt -= 1

        client.registered = False
        if client.delayable:
            self._update_reg_stats(client, -1)

    def modify(self, client, sname, setmask, clearmask):
        """Modify the next loop interest events bitset for a client stream."""
        self._debug("MODEV set:0x%x clear:0x%x %s (%s)" % (setmask, clearmask,
                                                           client, sname))
        stream = client.streams[sname]
        stream.new_events &= ~clearmask
        stream.new_events |= setmask

        if self._current_stream is not stream:
            # modifying a non processing stream, apply new_events now
            self.set_events(client, stream)

    def _register_specific(self, fd, event):
        """Engine-specific register fd for event method."""
        raise NotImplementedError("Derived classes must implement.")

    def _unregister_specific(self, fd, ev_is_set):
        """Engine-specific unregister fd method."""
        raise NotImplementedError("Derived classes must implement.")

    def _modify_specific(self, fd, event, setvalue):
        """Engine-specific modify fd for event method."""
        raise NotImplementedError("Derived classes must implement.")

    def set_events(self, client, stream):
        """
        Set the active interest events bitset for a client stream, ie. make
        stream.events match stream.new_events. Nothing is done if the client
        is not registered.
        """
        self._debug("SETEV new_events:0x%x events:0x%x for %s[%s]" %
                    (stream.new_events, stream.events, client, stream.name))

        if not client.registered:
            # log the client the message talks about (not the engine)
            LOGGER.debug("set_events: client %s not registered", client)
            return

        chgbits = stream.new_events ^ stream.events
        if chgbits == 0:
            return

        # configure interest events as appropriate
        for interest in (E_READ, E_WRITE):
            if chgbits & interest:
                assert stream.evmask & interest
                status = stream.new_events & interest
                self._modify_specific(stream.fd, interest, status)
                if status:
                    stream.events |= interest
                else:
                    stream.events &= ~interest

        stream.new_events = stream.events

    def set_reading(self, client, sname):
        """Set client reading state."""
        # listen for readable events
        self.modify(client, sname, E_READ, 0)

    def set_writing(self, client, sname):
        """Set client writing state."""
        # listen for writable events
        self.modify(client, sname, E_WRITE, 0)

    def add_timer(self, timer):
        """Add a timer instance to engine."""
        timer._set_engine(self)
        self.timerq.schedule(timer)

    def remove_timer(self, timer):
        """Remove engine timer from engine."""
        self.timerq.invalidate(timer)

    def fire_timers(self):
        """Fire expired timers for processing."""
        # Only fire timers if runloop is still retained
        if self.evlooprefcnt > 0:
            # Fire once any expired timers
            self.timerq.fire_expired()

    def start_ports(self):
        """Start and register all port clients."""
        # Ports are special, non-delayable engine clients
        for port in self._ports:
            if not port.registered:
                self._debug("START PORT %s" % port)
                self.register(port)

    def start_clients(self):
        """Start and register regular engine clients in respect of fanout."""
        # check if engine fanout has changed
        # NOTE: worker._fanout live changes not supported (see #323)
        fanout_diff = self.info['fanout'] - self._prev_fanout
        if fanout_diff:
            self._prev_fanout = self.info['fanout']

        for client in self._clients:
            if not client.registered and self._can_register(client):
                self._debug("START CLIENT %s" % client.__class__.__name__)
                self.register(client._start())
                # if first time or engine fanout has changed, we do a full scan
                if fanout_diff == 0:
                    # if engine fanout has not changed, we only start 1 client
                    break

    def run(self, timeout):
        """
        Run engine in calling thread.

        Start ports and clients, then enter the engine-specific runloop.
        On any exception, clients are cleared and the exception is re-raised.
        Raise EngineAlreadyRunningError if the engine is already running.
        """
        # change to running state
        if self.running:
            raise EngineAlreadyRunningError()

        # note: try-except-finally not supported before python 2.5
        try:
            self.running = True
            try:
                # start port clients
                self.start_ports()
                # peek in ports for early pending messages
                self.snoop_ports()
                # start all other clients
                self.start_clients()
                # run loop until all clients and timers are removed
                self.runloop(timeout)
            except EngineTimeoutException:
                self.clear(did_timeout=True)
                raise
            except: # MUST use BaseException as soon as possible (py2.5+)
                # The game is over.
                exc_t, exc_val, exc_tb = sys.exc_info()
                try:
                    # Close Engine clients
                    self.clear()
                except:
                    # self.clear() may still generate termination events that
                    # may raises exceptions, overriding the other one above.
                    # In the future, we should block new user events to avoid
                    # that. Also, such cases could be better handled with
                    # BaseException. For now, print a backtrace in debug to
                    # help detect the problem.
                    tbexc = traceback.format_exception(exc_t, exc_val, exc_tb)
                    LOGGER.debug(''.join(tbexc))
                    raise
                raise
        finally:
            # cleanup
            self.timerq.clear()
            self.running = False
            self._prev_fanout = 0

    def snoop_ports(self):
        """
        Peek in ports for possible early pending messages.
        This method simply tries to read port pipes in non-blocking mode.
        """
        # make a copy so that early messages on installed ports may
        # lead to new ports
        ports = self._ports.copy()
        for port in ports:
            try:
                port._handle_read('in')
            except (IOError, OSError):
                # Fetch the exception through sys.exc_info(): unlike the
                # "except X, ex" and "except X as ex" forms, this is valid
                # syntax on every Python version.
                ex = sys.exc_info()[1]
                if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # no pending message
                    # NOTE(review): this returns at the first port without
                    # pending data, so remaining ports are not read --
                    # confirm this is intended.
                    return
                # raise any other error
                raise

    def runloop(self, timeout):
        """Engine specific run loop. Derived classes must implement."""
        raise NotImplementedError("Derived classes must implement.")

    def abort(self, kill):
        """
        Abort runloop.

        If the engine is running, raise EngineAbortException(kill).
        Otherwise clear all clients, and also ports if `kill' is true.
        """
        if self.running:
            raise EngineAbortException(kill)

        self.clear(clear_ports=kill)

    def exited(self):
        """Returns True if the engine has exited the runloop once."""
        # _exited is only initialized (to False) in this class; it is
        # presumably set by engine implementations when leaving their runloop
        return not self.running and self._exited

    def _debug(self, s):
        """library engine verbose debugging hook"""
        # no-op by default; log `s' with LOGGER.debug() here to trace engine
        # operations
        pass
777