1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """
22 Interface of underlying Task's Engine.
23
24 An Engine implements a loop your thread enters and uses to call event handlers
25 in response to incoming events (from workers, timers, etc.).
26 """
27
28 import errno
29 import heapq
30 import logging
31 import sys
32 import time
33 import traceback
34
35
36 LOGGER = logging.getLogger(__name__)
37
38
39 E_READ = 0x1
40 E_WRITE = 0x2
41
42
43 EPSILON = 1.0e-3
44
45
46 FANOUT_UNLIMITED = -1
47
48 FANOUT_DEFAULT = None
49
50
52 """
53 Base engine exception.
54 """
55
57 """
58 Raised on user abort.
59 """
63
65 """
66 Raised when a timeout is encountered.
67 """
68
70 """
71 Error raised when an illegal operation has been performed.
72 """
73
75 """
76 Error raised when the engine is already running.
77 """
78
80 """
81 Error raised when the engine mechanism is not supported.
82 """
86
87
89 """
90 Abstract class for ClusterShell's engine timer. Such a timer
91 requires a relative fire time (delay) in seconds (as float), and
92 supports an optional repeating interval in seconds (as float too).
93
94 See EngineTimer for more information about ClusterShell timers.
95 """
96
97 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
98 """
99 Create a base timer.
100 """
101 self.fire_delay = fire_delay
102 self.interval = interval
103 self.autoclose = autoclose
104 self._engine = None
105 self._timercase = None
106
108 """
109 Bind to engine, called by Engine.
110 """
111 if self._engine:
112
113 raise EngineIllegalOperationError("Already bound to engine.")
114
115 self._engine = engine
116
118 """
119 Invalidates a timer object, stopping it from ever firing again.
120 """
121 if self._engine:
122 self._engine.timerq.invalidate(self)
123 self._engine = None
124
126 """
127 Returns a boolean value that indicates whether an EngineTimer
128 object is valid and able to fire.
129 """
130 return self._engine is not None
131
133 """
134 Set the next firing delay in seconds for an EngineTimer object.
135
136 The optional paramater `interval' sets the firing interval
137 of the timer. If not specified, the timer fires once and then
138 is automatically invalidated.
139
140 Time values are expressed in second using floating point
141 values. Precision is implementation (and system) dependent.
142
143 It is safe to call this method from the task owning this
144 timer object, in any event handlers, anywhere.
145
146 However, resetting a timer's next firing time may be a
147 relatively expensive operation. It is more efficient to let
148 timers autorepeat or to use this method from the timer's own
149 event handler callback (ie. from its ev_timer).
150 """
151 if not self.is_valid():
152 raise EngineIllegalOperationError("Operation on invalid timer.")
153
154 self.fire_delay = fire_delay
155 self.interval = interval
156 self._engine.timerq.reschedule(self)
157
159 raise NotImplementedError("Derived classes must implement.")
160
161
163 """
164 Concrete class EngineTimer
165
166 An EngineTimer object represents a timer bound to an engine that
167 fires at a preset time in the future. Timers can fire either only
168 once or repeatedly at fixed time intervals. Repeating timers can
169 also have their next firing time manually adjusted.
170
171 A timer is not a real-time mechanism; it fires when the task's
172 underlying engine to which the timer has been added is running and
173 able to check if the timer's firing time has passed.
174 """
175
176 - def __init__(self, fire_delay, interval, autoclose, handler):
177 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose)
178 self.eh = handler
179 assert self.eh is not None, "An event handler is needed for timer."
180
183
185
187 """
188 Helper class that allows comparisons of fire times, to be easily used
189 in an heapq.
190 """
192 self.client = client
193 self.client._timercase = self
194
195 assert self.client.fire_delay > -EPSILON
196 self.fire_date = self.client.fire_delay + time.time()
197
199 return cmp(self.fire_date, other.fire_date)
200
201 - def arm(self, client):
202 assert client is not None
203 self.client = client
204 self.client._timercase = self
205
206 time_current = time.time()
207 if self.client.fire_delay > -EPSILON:
208 self.fire_date = self.client.fire_delay + time_current
209 else:
210 interval = float(self.client.interval)
211 assert interval > 0
212
213
214
215 self.fire_date += interval
216
217
218 if self.fire_date < time_current:
219 LOGGER.debug("Warning: passed interval time for %r "
220 "(long running event handler?)", self.client)
221
223 client = self.client
224 client._timercase = None
225 self.client = None
226 return client
227
229 return self.client is not None
230
231
233 """
234 Initializer.
235 """
236 self._engine = engine
237 self.timers = []
238 self.armed_count = 0
239
241 """
242 Return the number of active timers.
243 """
244 return self.armed_count
245
247 """
248 Insert and arm a client's timer.
249 """
250
251 if client.fire_delay > -EPSILON:
252 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client))
253 self.armed_count += 1
254 if not client.autoclose:
255 self._engine.evlooprefcnt += 1
256
265
267 """
268 Invalidate client's timer. Current implementation doesn't really remove
269 the timer, but simply flags it as disarmed.
270 """
271 if not client._timercase:
272
273 client.fire_delay = -1.0
274 client.interval = -1.0
275 return
276
277 if self.armed_count <= 0:
278 raise ValueError, "Engine client timer not found in timer queue"
279
280 client._timercase.disarm()
281 self.armed_count -= 1
282 if not client.autoclose:
283 self._engine.evlooprefcnt -= 1
284
286 """
287 Dequeue disarmed timers (sort of garbage collection).
288 """
289 while len(self.timers) > 0 and not self.timers[0].armed():
290 heapq.heappop(self.timers)
291
293 """
294 Remove expired timers from the queue and fire associated clients.
295 """
296 self._dequeue_disarmed()
297
298
299
300 expired_timercases = []
301 now = time.time()
302 while self.timers and (self.timers[0].fire_date - now) <= EPSILON:
303 expired_timercases.append(heapq.heappop(self.timers))
304 self._dequeue_disarmed()
305
306 for timercase in expired_timercases:
307
308
309 if not timercase.armed():
310 continue
311
312
313 client = timercase.disarm()
314
315
316 client.fire_delay = -1.0
317 client._fire()
318
319
320 if client.fire_delay >= -EPSILON or client.interval > EPSILON:
321 timercase.arm(client)
322 heapq.heappush(self.timers, timercase)
323 else:
324 self.armed_count -= 1
325 if not client.autoclose:
326 self._engine.evlooprefcnt -= 1
327
329 """
330 Return next timer fire delay (relative time).
331 """
332 self._dequeue_disarmed()
333 if len(self.timers) > 0:
334 return max(0., self.timers[0].fire_date - time.time())
335
336 return -1
337
339 """
340 Stop and clear all timers.
341 """
342 for timer in self.timers:
343 if timer.armed():
344 timer.client.invalidate()
345
346 self.timers = []
347 self.armed_count = 0
348
349
351 """
352 Base class for ClusterShell Engines.
353
354 Subclasses have to implement a runloop listening for client events.
355 Subclasses that override other than "pure virtual methods" should call
356 corresponding base class methods.
357 """
358
359 identifier = "(none)"
360
362 """Initialize base class."""
363
364 self.info = info
365
366
367 self.info['engine'] = self.identifier
368
369
370 self._clients = set()
371 self._ports = set()
372
373
374
375 self._reg_stats = {}
376
377
378
379 self.reg_clifds = {}
380
381
382 self._prev_fanout = 0
383
384
385
386
387 self._current_loopcnt = 0
388
389
390 self._current_stream = None
391
392
393 self.timerq = _EngineTimerQ(self)
394
395
396
397 self.evlooprefcnt = 0
398
399
400 self.running = False
401
402 self._exited = False
403
405 """Release engine-specific resources."""
406 pass
407
409 """Get a copy of clients set."""
410 return self._clients.copy()
411
413 """
414 Get a copy of ports set.
415 """
416 return self._ports.copy()
417
419 client, stream = self.reg_clifds.get(fd, (None, None))
420 if client:
421 if client._reg_epoch < self._current_loopcnt:
422 return client, stream
423 else:
424 LOGGER.debug("_fd2client: ignoring just re-used FD %d",
425 stream.fd)
426 return (None, None)
427
429 assert not client.registered
430
431 if not client.delayable or client.worker._fanout == FANOUT_UNLIMITED:
432 return True
433 elif client.worker._fanout is FANOUT_DEFAULT:
434 return self._reg_stats.get('default', 0) < self.info['fanout']
435 else:
436 worker = client.worker
437 return self._reg_stats.get(worker, 0) < worker._fanout
438
440 if client.worker._fanout is FANOUT_DEFAULT:
441 key = 'default'
442 else:
443 key = client.worker
444 self._reg_stats.setdefault(key, 0)
445 self._reg_stats[key] += offset
446
447 - def add(self, client):
448 """Add a client to engine."""
449
450 client._set_engine(self)
451
452 if client.delayable:
453
454 self._clients.add(client)
455 else:
456
457 self._ports.add(client)
458
459 if self.running and self._can_register(client):
460
461 self.register(client._start())
462
463
464 - def _remove(self, client, abort, did_timeout=False):
465 """Remove a client from engine (subroutine)."""
466
467 if client.registered or not client.delayable:
468 if client.registered:
469 self.unregister(client)
470
471 client._close(abort=abort, timeout=did_timeout)
472
473 - def remove(self, client, abort=False, did_timeout=False):
474 """
475 Remove a client from engine. Does NOT aim to flush individual stream
476 read buffers.
477 """
478 self._debug("REMOVE %s" % client)
479 if client.delayable:
480 self._clients.remove(client)
481 else:
482 self._ports.remove(client)
483 self._remove(client, abort, did_timeout)
484
485 self.start_clients()
486
488 """
489 Regular way to remove a client stream from engine, performing
490 needed read flush as needed. If no more retainable stream
491 remains for this client, this method automatically removes the
492 entire client from engine.
493
494 This function does nothing if the stream is not registered.
495 """
496 if stream.fd not in self.reg_clifds:
497 LOGGER.debug("remove_stream: %s not registered", stream)
498 return
499
500 self.unregister_stream(client, stream)
501
502
503 client._close_stream(stream.name)
504
505
506
507 if client in self._clients and not client.streams.retained():
508 self.remove(client)
509
510 - def clear(self, did_timeout=False, clear_ports=False):
511 """
512 Remove all clients. Does not flush read buffers.
513 Subclasses that override this method should call base class method.
514 """
515 all_clients = [self._clients]
516 if clear_ports:
517 all_clients.append(self._ports)
518
519 for clients in all_clients:
520 while len(clients) > 0:
521 client = clients.pop()
522 self._remove(client, True, did_timeout)
523
525 """
526 Register an engine client. Subclasses that override this method
527 should call base class method.
528 """
529 assert client in self._clients or client in self._ports
530 assert not client.registered
531
532 self._debug("REG %s (%s)(autoclose=%s)" % \
533 (client.__class__.__name__, client.streams,
534 client.autoclose))
535
536 client.registered = True
537 client._reg_epoch = self._current_loopcnt
538
539 if client.delayable:
540 self._update_reg_stats(client, 1)
541
542
543 for streams, ievent in ((client.streams.active_readers, E_READ),
544 (client.streams.active_writers, E_WRITE)):
545 for stream in streams():
546 self.reg_clifds[stream.fd] = client, stream
547 stream.events |= ievent
548 if not client.autoclose:
549 self.evlooprefcnt += 1
550 self._register_specific(stream.fd, ievent)
551
552
553 self.timerq.schedule(client)
554
556 """Unregister a stream from a client."""
557 self._debug("UNREG_STREAM stream=%s" % stream)
558 assert stream is not None and stream.fd is not None
559 assert stream.fd in self.reg_clifds, \
560 "stream fd %d not registered" % stream.fd
561 assert client.registered
562 self._unregister_specific(stream.fd, stream.events & stream.evmask)
563 self._debug("UNREG_STREAM unregistering stream fd %d (%d)" % \
564 (stream.fd, len(client.streams)))
565 stream.events &= ~stream.evmask
566 del self.reg_clifds[stream.fd]
567 if not client.autoclose:
568 self.evlooprefcnt -= 1
569
571 """Unregister a client"""
572
573 assert client.registered
574 self._debug("UNREG %s (%s)" % (client.__class__.__name__, \
575 client.streams))
576
577
578 self.timerq.invalidate(client)
579
580
581 for streams, ievent in ((client.streams.active_readers, E_READ),
582 (client.streams.active_writers, E_WRITE)):
583 for stream in streams():
584 if stream.fd in self.reg_clifds:
585 self._unregister_specific(stream.fd, stream.events & ievent)
586 stream.events &= ~ievent
587 del self.reg_clifds[stream.fd]
588 if not client.autoclose:
589 self.evlooprefcnt -= 1
590
591 client.registered = False
592 if client.delayable:
593 self._update_reg_stats(client, -1)
594
595 - def modify(self, client, sname, setmask, clearmask):
596 """Modify the next loop interest events bitset for a client stream."""
597 self._debug("MODEV set:0x%x clear:0x%x %s (%s)" % (setmask, clearmask,
598 client, sname))
599 stream = client.streams[sname]
600 stream.new_events &= ~clearmask
601 stream.new_events |= setmask
602
603 if self._current_stream is not stream:
604
605 self.set_events(client, stream)
606
608 """Engine-specific register fd for event method."""
609 raise NotImplementedError("Derived classes must implement.")
610
612 """Engine-specific unregister fd method."""
613 raise NotImplementedError("Derived classes must implement.")
614
616 """Engine-specific modify fd for event method."""
617 raise NotImplementedError("Derived classes must implement.")
618
620 """Set the active interest events bitset for a client stream."""
621 self._debug("SETEV new_events:0x%x events:0x%x for %s[%s]" % \
622 (stream.new_events, stream.events, client, stream.name))
623
624 if not client.registered:
625 LOGGER.debug("set_events: client %s not registered", self)
626 return
627
628 chgbits = stream.new_events ^ stream.events
629 if chgbits == 0:
630 return
631
632
633 for interest in (E_READ, E_WRITE):
634 if chgbits & interest:
635 assert stream.evmask & interest
636 status = stream.new_events & interest
637 self._modify_specific(stream.fd, interest, status)
638 if status:
639 stream.events |= interest
640 else:
641 stream.events &= ~interest
642
643 stream.new_events = stream.events
644
646 """Set client reading state."""
647
648 self.modify(client, sname, E_READ, 0)
649
651 """Set client writing state."""
652
653 self.modify(client, sname, E_WRITE, 0)
654
659
661 """Remove engine timer from engine."""
662 self.timerq.invalidate(timer)
663
665 """Fire expired timers for processing."""
666
667 if self.evlooprefcnt > 0:
668
669 self.timerq.fire_expired()
670
672 """Start and register all port clients."""
673
674 for port in self._ports:
675 if not port.registered:
676 self._debug("START PORT %s" % port)
677 self.register(port)
678
680 """Start and register regular engine clients in respect of fanout."""
681
682
683 fanout_diff = self.info['fanout'] - self._prev_fanout
684 if fanout_diff:
685 self._prev_fanout = self.info['fanout']
686
687 for client in self._clients:
688 if not client.registered and self._can_register(client):
689 self._debug("START CLIENT %s" % client.__class__.__name__)
690 self.register(client._start())
691
692 if fanout_diff == 0:
693
694 break
695
696 - def run(self, timeout):
739
741 """
742 Peek in ports for possible early pending messages.
743 This method simply tries to read port pipes in non-blocking mode.
744 """
745
746
747 ports = self._ports.copy()
748 for port in ports:
749 try:
750 port._handle_read('in')
751 except (IOError, OSError), ex:
752 if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
753
754 return
755
756 raise
757
759 """Engine specific run loop. Derived classes must implement."""
760 raise NotImplementedError("Derived classes must implement.")
761
768
770 """Returns True if the engine has exited the runloop once."""
771 return not self.running and self._exited
772
774 """library engine verbose debugging hook"""
775
776 pass
777