Source code for mads.engine
# mads/engine.py
#
#
"""
a engine is select.epoll event loop, easily interrup_table
esp. versus a blocking event loop.
"""
from mads.trace import get_exception
from mads.name import sname
from mads.launcher import Launcher
from mads.register import Register
from mads.handler import Handler
from mads.object import Object
import mads.errors
import multiprocessing
import collections
import logging
import select
import queue
import time
[docs]class Engine(Handler, Launcher):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._poll = select.epoll()
self._resume = Object()
self._state.status = "running"
self._times.start = time.time()
self._connected.clear()
[docs] def dispatch(self, event):
event.handle()
[docs] def select(self):
while self._state.status:
for event in self.events():
if not event:
break
try:
self.dispatch(event)
except:
logging.error(get_exception())
self._times.last = time.time()
[docs] def events(self):
res = self._poll.poll()
try:
yield self.event()
except (ConnectionResetError,
BrokenPipeError,
mads.errors.EDISCONNECT) as ex:
self.connect()
[docs] def register_fd(self, f):
try:
fd = f.fileno()
except:
fd = f
logging.warn("# engine on %s" % str(fd))
try: self._poll.register(fd, select.EPOLLET)
except:
logging.error(get_exception())
self._resume.fd = fd
return fd
[docs] def resume(self):
if not self._resume.fd:
raise mads.errors.ERESUME
logging.info("# resume on %s" % self._resume.fd)
self._poll = select.epoll.fromfd(self._resume.fd)
[docs] def start(self, *args, **kwargs):
self.launch(self.select)
[docs] def stop(self):
self._status = ""
self._poll.close()
super().stop()