Source code for kuai.backends.priority

from collections import namedtuple, defaultdict

from kuai.backends import WeakCallback, Singleton, singleton_object


@singleton_object
class PriorityBackend(metaclass=Singleton):
    handlers = handlers = defaultdict(list)

    def addHandler(self, event, callback, priority=5):
        PriorityEvent = namedtuple("PriorityEvent", "priority callback")
        self.handlers[event].append(PriorityEvent(priority, WeakCallback(callback)))

    def handleEvent(self, event, *args, **kwargs):
        handlers = self.handlers.get(event)
        if handlers is None:
            return
        else:
            for handler in sorted(handlers, key=lambda t: t.priority):
                handler.callback(*args, **kwargs)


[docs]def setup(app): app.register_backend('priority', PriorityBackend)