Source code for meds.engine

# meds/engine
#
#

"""
    An engine is a epoll driven event handler.

"""

from meds.utils.trace import get_exception, get_strace
from meds.scheduler import Scheduler
from meds.errors import EDISCONNECT, ERESUME
from meds.utils.name import sname, naam
from meds.object import Object

import threading
import logging
import select
import time
import sys

[docs]class Engine(Scheduler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._connected = Object() self._poll = select.epoll() self._resume = Object() self._status = "running" self._starttime = time.time() thr = self.launch(self._start, name="%s.engine" % sname(self), daemon=True) self._thrs.append(thr)
[docs] def _start(self): self._connected.wait() while self._status: for event in self.events(): self.put(event)
[docs] def events(self): self._poll.poll() try: yield self.event() except (ConnectionResetError, BrokenPipeError, EDISCONNECT) as ex: self.connect()
[docs] def event(self): pass
[docs] def register_fd(self, f): try: fd = f.fileno() except: fd = f logging.warn("# engine on %s" % str(fd)) self._poll.register(fd, select.EPOLLET) self._resume.fd = fd return fd
[docs] def resume(self): if not self._resume.fd: raise ERESUME logging.info("! resume on %s" % self._resume.fd) self._poll = select.epoll.fromfd(self._resume.fd)
[docs] def stop(self): self._status = "" self._poll.close() super().stop()