Source code for meds.engine
# meds/engine
#
#
"""
An engine is a epoll driven event handler.
"""
from meds.utils.trace import get_exception, get_strace
from meds.scheduler import Scheduler
from meds.errors import EDISCONNECT, ERESUME
from meds.utils.name import sname, naam
from meds.object import Object
import threading
import logging
import select
import time
import sys
[docs]class Engine(Scheduler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._connected = Object()
self._poll = select.epoll()
self._resume = Object()
self._status = "running"
self._starttime = time.time()
thr = self.launch(self._start, name="%s.engine" % sname(self), daemon=True)
self._thrs.append(thr)
[docs] def _start(self):
self._connected.wait()
while self._status:
for event in self.events():
self.put(event)
[docs] def events(self):
self._poll.poll()
try: yield self.event()
except (ConnectionResetError, BrokenPipeError, EDISCONNECT) as ex:
self.connect()
[docs] def register_fd(self, f):
try: fd = f.fileno()
except: fd = f
logging.warn("# engine on %s" % str(fd))
self._poll.register(fd, select.EPOLLET)
self._resume.fd = fd
return fd
[docs] def resume(self):
if not self._resume.fd: raise ERESUME
logging.info("! resume on %s" % self._resume.fd)
self._poll = select.epoll.fromfd(self._resume.fd)
[docs] def stop(self):
self._status = ""
self._poll.close()
super().stop()