Source code for mads.engine

# mads/engine.py
#
#

""" 
    An engine is a select.epoll event loop that is easily interruptible,
    especially compared to a blocking event loop.

 """

from mads.trace import get_exception
from mads.name import sname

from mads.launcher import Launcher
from mads.register import Register
from mads.handler import Handler
from mads.object import Object

import mads.errors

import multiprocessing
import collections
import logging
import select
import queue
import time

class Engine(Handler, Launcher):
    """Event loop driven by a ``select.epoll`` object.

    The loop in :meth:`select` keeps running while ``self._state.status``
    is truthy; :meth:`stop` clears that flag and closes the epoll object.

    NOTE(review): ``_state``, ``_times`` and ``_connected`` are not created
    here -- presumably they are provided by the base classes; confirm.
    """

    def __init__(self, *args, **kwargs):
        """Create the epoll object and mark the engine as running."""
        super().__init__(*args, **kwargs)
        self._poll = select.epoll()
        # Remembers the last registered fd so resume() can use it.
        self._resume = Object()
        self._state.status = "running"
        self._times.start = time.time()
        self._connected.clear()

    def dispatch(self, event):
        """Handle a single event by calling its ``handle()`` method."""
        event.handle()

    def select(self):
        """Run the event loop until ``self._state.status`` becomes falsy.

        Errors raised while dispatching an event are logged and the loop
        carries on with the next event.
        """
        while self._state.status:
            for event in self.events():
                if not event:
                    break
                try:
                    self.dispatch(event)
                except Exception:
                    # Only swallow ordinary errors: KeyboardInterrupt and
                    # SystemExit must propagate so the loop stays
                    # interruptible.
                    logging.error(get_exception())
                # NOTE(review): indentation was reconstructed from a
                # flattened listing; confirm this timestamp is meant to be
                # updated once per event.
                self._times.last = time.time()

    def events(self):
        """Wait on the epoll object, then yield one event from ``self.event()``.

        On a reset/broken connection (or ``mads.errors.EDISCONNECT``) nothing
        is yielded and ``self.connect()`` is called instead.
        """
        # The poll result is not inspected; the call only blocks until the
        # epoll object reports activity.
        self._poll.poll()
        try:
            yield self.event()
        except (ConnectionResetError, BrokenPipeError, mads.errors.EDISCONNECT):
            self.connect()

    def register_fd(self, f):
        """Register ``f`` with the epoll object and remember it for resume().

        :param f: an object with a ``fileno()`` method, or a raw descriptor.
        :returns: the descriptor that was registered.
        """
        try:
            fd = f.fileno()
        except Exception:
            # No usable fileno(): treat ``f`` itself as the descriptor.
            fd = f
        logging.warning("# engine on %s", fd)
        try:
            self._poll.register(fd, select.EPOLLET)
        except Exception:
            # Best effort: a failed registration is logged, not raised.
            logging.error(get_exception())
        self._resume.fd = fd
        return fd

    def resume(self):
        """Rebuild the epoll object from the remembered descriptor.

        :raises mads.errors.ERESUME: if no descriptor has been remembered.
        """
        if not self._resume.fd:
            raise mads.errors.ERESUME
        logging.info("# resume on %s", self._resume.fd)
        # NOTE(review): ``_resume.fd`` is the fd stored by register_fd(),
        # i.e. the watched descriptor, while ``epoll.fromfd`` expects an
        # epoll descriptor -- confirm this is intended.
        self._poll = select.epoll.fromfd(self._resume.fd)

    def start(self, *args, **kwargs):
        """Launch the event loop via ``self.launch``; arguments are ignored."""
        self.launch(self.select)

    def stop(self):
        """Stop the event loop, close the epoll object and stop the bases."""
        # select() loops on ``self._state.status``; clearing only
        # ``self._status`` (as before) never ended the loop.
        self._state.status = ""
        # Kept for backward compatibility with code reading ``_status``.
        self._status = ""
        self._poll.close()
        super().stop()