# Source code for kuai.backends.threaded
from collections import namedtuple, defaultdict
import threading
import queue
from atexit import register
from kuai.backends import WeakCallback, Singleton, singleton_object
@singleton_object
class ThreadedBackend(metaclass=Singleton):
    """Event backend that dispatches queued events on a background thread.

    Handlers are registered per event name with ``addHandler``; events are
    queued with ``handleEvent`` and delivered, in queue order, by a single
    daemon worker thread. The worker is started lazily on the first call to
    either method and is asked to stop at interpreter exit.
    """

    # Event name -> set of WeakCallback wrappers registered for that event.
    handlers = defaultdict(set)
    # Events waiting to be dispatched by the worker thread.
    events = queue.Queue()
    # True while the worker thread is expected to keep polling ``events``.
    isRunning = False
    # Record type for queued events; built once instead of on every call.
    _Event = namedtuple("Event", "name args kwargs")

    def startThread(self):
        """Start the daemon worker thread and register its atexit stop hook."""
        self.__thread = threading.Thread(target=self.__engine, daemon=True)
        self.__thread.start()
        register(self.__stop)

    def __ensureRunning(self):
        """Start the worker thread if it is not currently marked as running."""
        if not self.isRunning:
            # The flag must be set before the thread starts: the engine loop
            # exits immediately when it observes ``isRunning`` as False.
            self.isRunning = True
            self.startThread()

    def addHandler(self, event, callback, *args, **kwargs):
        """Register *callback* to be invoked whenever *event* is dispatched.

        The callback is stored wrapped in ``WeakCallback``. Extra positional
        and keyword arguments are accepted but not used.
        """
        self.__ensureRunning()
        self.handlers[event].add(WeakCallback(callback))

    def handleEvent(self, event, *args, **kwargs):
        """Queue *event* for dispatch; handlers receive *args* and *kwargs*."""
        self.__ensureRunning()
        self.events.put(self._Event(event, args, kwargs))

    def __stop(self):
        """Ask the worker loop to exit and wait for the thread to finish."""
        self.isRunning = False
        self.__thread.join()

    def __engine(self):
        """Worker loop: pull events off the queue and call their handlers.

        An exception raised by a handler is printed and re-raised, which
        terminates this worker thread. In that case the event is still
        marked done and ``isRunning`` is reset, so the next ``addHandler`` or
        ``handleEvent`` call starts a fresh worker instead of queueing events
        that nothing will ever consume.
        """
        try:
            while self.isRunning:
                try:
                    # Short timeout so a stop request is noticed promptly.
                    currentEvent = self.events.get(timeout=0.3)
                except queue.Empty:
                    continue
                try:
                    # ``.get`` avoids creating an empty set for unknown events.
                    callbacks = self.handlers.get(currentEvent.name)
                    if callbacks is not None:
                        # Iterate over a snapshot: handlers may be added from
                        # other threads while dispatch is in progress.
                        for callback in list(callbacks):
                            try:
                                callback(*currentEvent.args,
                                         **currentEvent.kwargs)
                            except Exception as e:
                                print("Kuai Exception in `threaded` backend : ", e)
                                raise
                finally:
                    # Always balance the ``get`` so ``events.join()`` cannot
                    # hang on an event whose handler raised.
                    self.events.task_done()
        finally:
            # Keep the flag truthful once the loop is no longer running.
            self.isRunning = False
def setup(app):
    """Register ``ThreadedBackend`` with *app* under the name ``'threaded'``."""
    app.register_backend('threaded', ThreadedBackend)
# Calls ThreadedBackend() at import time and binds the result to `test`.
# NOTE(review): looks like a leftover smoke check; given the Singleton
# metaclass this presumably yields the shared backend instance - confirm
# nothing imports `test` before removing it.
test = ThreadedBackend()