Source code for core.dispatch

# core/dispatch.py
#
#

""" basic package for the program. """

__copyright__ = "Copyright 2014 B.H.J Thate"

## IMPORTS

from core.utils import get_funcname, get_clsname, get_name, get_strace, get_how
from core import Object

import collections
import threading
import logging
import queue
import time

## RUNNER

[docs]class Runner(threading.Thread): """ the working unit of CORELIB. """ nr = 0 def __init__(zelf, *args, **kwargs): threading.Thread.__init__(zelf, None, zelf._loop, "Runner-%s" % str(zelf.nr), args, kwargs) zelf.setDaemon(True) zelf._queue = queue.Queue() zelf._outqueue = queue.Queue() zelf._ready = threading.Event() zelf._state = Object() zelf._state.boot = time.time() zelf._state.output = time.time() zelf._state.input = time.time() zelf._status = Object() zelf._status.status = "starting" zelf._status.cmnd = "none" zelf.nr += 1 def _loop(zelf): """ loop that executes the (function, object) pairs. """ zelf._status.status = "running" while zelf._status.status: args, kwargs = zelf._queue.get() func = args[0] try: obj = args[1] except: obj = Object() if not zelf._status.status: obj.ready() ; break zelf._status.cmnd = get_how(func) zelf._status.status = zelf._status.cmnd zelf._state.input = time.time() logging.info("func %s" % get_how(func)) func(*args[1:], **kwargs) obj.ready() time.sleep(0.01) zelf.ready() _thread.interrupt_main() ## WAITERS
[docs] def ready(zelf): """ signal to ready state. """ logging.debug("ready %s %s" % (get_name(zelf), get_strace())) zelf._ready.set()
[docs] def clear(zelf): """ clear the ready state. """ logging.debug("clear %s %s" % (get_name(zelf), get_strace())) zelf._ready.clear()
[docs] def wait(zelf, sec=180.0): """ wait for ready state. """ logging.debug("wait %s %s" % (get_name(zelf), get_strace())) try: zelf._ready.wait(sec) except: pass
[docs] def exit(zelf, *args, **kwargs): """ stop the Runner. """ zelf._state.status = ""
[docs] def put(zelf, *args, **kwargs): """ put arguments/kwargs to the Runner. """ zelf._queue.put_nowait((args, kwargs))
[docs] def get_name(zelf, *args, **kwargs): return "%s/%s" % (zelf.name, zelf._status.cmnd) ## DISPATCH
[docs]class Dispatcher(Object): """ the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed. """ def __init__(zelf, *args, **kwargs): Object.__init__(zelf, *args, **kwargs) zelf._runners = collections.deque() zelf._state = Object() zelf._state.boot = time.time() zelf._state.input = time.time() zelf._state.output = time.time() zelf._status.status = "starting" zelf._status.cmnd = "" zelf._max = 50
[docs] def is_alive(zelf, *args, **kwargs): """ whether any Runners running. """ for runner in zelf._runners: if runner.is_alive(): return True return False
[docs] def register(zelf, *args, **kwargs): """ callback type with corresponding callback function. """ o = Object() o.cbtype = args[0] o.func = args[1] if o.cbtype not in zelf: zelf[o.cbtype] = [] zelf[o.cbtype].append(o)
[docs] def denied(zelf, *args, **kwargs): for obj in zelf.objects("denied"): if args[0] == obj.get("denied", None): return True return False
[docs] def allowed(zelf, *args, **kwargs): for obj in zelf.objects("allowed"): if args[0] == obj.get("allowed", None): return True return False
[docs] def dispatch(zelf, *args, **kwargs): """ dispatch an event onto a Runner. """ event = args[0] return zelf.execute(event.get_cmnd(), event)
[docs] def execute(zelf, *args, **kwargs): """ execute a command, event pair. """ cmnd = args[0] event = args[1] if not cmnd or cmnd not in zelf: event.ready() ; return event if "origin" in event: if zelf.allowed(event.origin): logging.warn("! allow %s" % event.origin) elif zelf.denied(event.origin): logging.warn("! deny %s" % event.origin) return logging.info("> exec %s" % cmnd) for functor in zelf[cmnd]: zelf.put(functor.func, event) return event
[docs] def run_func(zelf, *args, **kwargs): """ execute command/event pair. """ cmnd = args[0] event = args[1] if cmnd not in zelf: return for functor in zelf[cmnd]: functor.func(event) return event
[docs] def exit(zelf, name=None): """ stop the Runners. """ for runner in zelf._runners: if name and name not in runner.name: continue runner.join(0.1)
[docs] def put(zelf, *args, **kwargs): """ put load to the Runner. """ zelf.cleanup() target = zelf.make_new() target.put(*args, **kwargs) return target
[docs] def make_new(zelf, *args, **kwargs): """ create a Runner. """ if len(zelf._runners) < zelf._max: runner = Runner(*args, **kwargs) zelf._runners.append(runner) runner.start() else: runner = random.choice(zelf._runners) return runner
[docs] def cleanup(zelf, dojoin=False): """ remove idle Runners. """ todo = [] for runner in zelf._runners: if not runner._status.status or not runner._queue.qsize(): todo.append(runner) for runner in todo: runner.exit() ; zelf._runners.remove(runner)