# core/dispatch.py
#
#
""" dispatching of work: Runner threads and the Dispatcher that feeds them. """
__copyright__ = "Copyright 2014 B.H.J Thate"
## IMPORTS
import collections
import logging
import queue
import random
import threading
import time

from core import Object
from core.utils import get_funcname, get_clsname, get_name, get_strace, get_how
## RUNNER
class Runner(threading.Thread):
    """ the working unit of CORELIB.

    A Runner is a daemon thread that takes ``(args, kwargs)`` jobs from its
    own queue and calls ``args[0](*args[1:], **kwargs)`` for each of them.
    When the job carries a second positional argument, that object's
    ``ready()`` method is called once the job is done.
    """

    # number of Runners created so far; used to build the thread name.
    nr = 0

    def __init__(zelf, *args, **kwargs):
        """ set up the queues, the ready flag and the state/status records. """
        threading.Thread.__init__(zelf, None, zelf._loop, "Runner-%s" % str(Runner.nr), args, kwargs)
        # `setDaemon()` is deprecated; the attribute does the same thing.
        zelf.daemon = True
        zelf._queue = queue.Queue()
        zelf._outqueue = queue.Queue()
        zelf._ready = threading.Event()
        zelf._state = Object()
        zelf._state.boot = time.time()
        zelf._state.output = time.time()
        zelf._state.input = time.time()
        zelf._status = Object()
        zelf._status.status = "starting"
        zelf._status.cmnd = "none"
        # bump the counter on the class: `zelf.nr += 1` only created an
        # instance attribute, so every Runner ended up named "Runner-0".
        Runner.nr += 1

    def _loop(zelf, *args, **kwargs):
        """ loop that executes the (function, object) pairs.

        The arguments are ignored. They are accepted because the constructor
        passes its own args/kwargs on to the thread target.
        """
        zelf._status.status = "running"
        while zelf._status.status:
            job_args, job_kwargs = zelf._queue.get()
            # exit() queues (None, None) to wake up a Runner blocked in get().
            if job_args is None:
                break
            func = job_args[0]
            try:
                obj = job_args[1]
            except IndexError:
                obj = Object()
            # stopped while this job was still queued: release it unexecuted.
            if not zelf._status.status:
                obj.ready()
                break
            zelf._status.cmnd = get_how(func)
            zelf._status.status = zelf._status.cmnd
            zelf._state.input = time.time()
            logging.info("func %s" % zelf._status.cmnd)
            try:
                func(*job_args[1:], **job_kwargs)
            except Exception:
                # a failing job must not kill the Runner.
                logging.exception("error in %s" % zelf._status.cmnd)
            finally:
                # always release whoever waits on the job's object.
                obj.ready()
            time.sleep(0.01)
        # the status may have been overwritten by a job that was starting
        # while exit() ran; make the stopped state visible.
        zelf._status.status = ""
        zelf.ready()
        # NOTE(review): the original ended with `_thread.interrupt_main()`.
        # `_thread` is not imported in this module, so that call could only
        # raise NameError. It is left out instead of imported, because stopping
        # a Runner would otherwise raise KeyboardInterrupt in the main thread.

    ## WAITERS

    def ready(zelf):
        """ signal to ready state. """
        logging.debug("ready %s %s" % (get_name(zelf), get_strace()))
        zelf._ready.set()

    def clear(zelf):
        """ clear the ready state. """
        logging.debug("clear %s %s" % (get_name(zelf), get_strace()))
        zelf._ready.clear()

    def wait(zelf, sec=180.0):
        """ wait for ready state, for at most `sec` seconds.

        An error or a keyboard interrupt during the wait ends the wait
        instead of propagating; it is logged at debug level.
        """
        logging.debug("wait %s %s" % (get_name(zelf), get_strace()))
        try:
            zelf._ready.wait(sec)
        except (Exception, KeyboardInterrupt) as ex:
            logging.debug("wait interrupted %s" % repr(ex))

    def exit(zelf, *args, **kwargs):
        """ stop the Runner.

        Empties the status the loop tests (the original wrote to
        `_state.status`, which the loop never reads) and queues a wake-up
        item so a Runner blocked on an empty queue notices the stop.
        """
        zelf._status.status = ""
        zelf._queue.put_nowait((None, None))

    def put(zelf, *args, **kwargs):
        """ put arguments/kwargs to the Runner; args[0] is the function to call. """
        zelf._queue.put_nowait((args, kwargs))

    def get_name(zelf, *args, **kwargs):
        """ return "<thread name>/<current command>". """
        return "%s/%s" % (zelf.name, zelf._status.cmnd)
## DISPATCH
class Dispatcher(Object):
    """ the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed.

    Callbacks are stored on the Dispatcher itself, keyed by callback type;
    each entry is a list of objects with `cbtype` and `func` attributes.
    """

    def __init__(zelf, *args, **kwargs):
        """ set up the Runner pool and the state/status records. """
        Object.__init__(zelf, *args, **kwargs)
        zelf._runners = collections.deque()
        zelf._state = Object()
        zelf._state.boot = time.time()
        zelf._state.input = time.time()
        zelf._state.output = time.time()
        # NOTE(review): `_status` is not created in this class (Runner creates
        # its own) - presumably the Object base provides it; confirm.
        zelf._status.status = "starting"
        zelf._status.cmnd = ""
        # upper bound on the number of Runners kept in the pool.
        zelf._max = 50

    def is_alive(zelf, *args, **kwargs):
        """ whether any Runners running. """
        return any(runner.is_alive() for runner in zelf._runners)

    def register(zelf, *args, **kwargs):
        """ callback type with corresponding callback function.

        args[0] is the callback type, args[1] the function to call for it.
        """
        o = Object()
        o.cbtype = args[0]
        o.func = args[1]
        if o.cbtype not in zelf:
            zelf[o.cbtype] = []
        zelf[o.cbtype].append(o)

    def denied(zelf, *args, **kwargs):
        """ whether args[0] equals the "denied" value of one of zelf.objects("denied"). """
        return any(args[0] == obj.get("denied", None) for obj in zelf.objects("denied"))

    def allowed(zelf, *args, **kwargs):
        """ whether args[0] equals the "allowed" value of one of zelf.objects("allowed"). """
        return any(args[0] == obj.get("allowed", None) for obj in zelf.objects("allowed"))

    def dispatch(zelf, *args, **kwargs):
        """ dispatch an event onto a Runner. """
        event = args[0]
        return zelf.execute(event.get_cmnd(), event)

    def execute(zelf, *args, **kwargs):
        """ execute a command, event pair.

        args[0] is the command, args[1] the event. Returns the event, or
        None when the event's origin is denied. An event that is not going
        to be handled (unknown command, denied origin) is marked ready so
        nothing keeps waiting on it.
        """
        cmnd = args[0]
        event = args[1]
        if not cmnd or cmnd not in zelf:
            event.ready()
            return event
        if "origin" in event:
            # an explicit allow takes precedence over a deny.
            if zelf.allowed(event.origin):
                logging.warning("! allow %s" % event.origin)
            elif zelf.denied(event.origin):
                logging.warning("! deny %s" % event.origin)
                event.ready()
                return None
        logging.info("> exec %s" % cmnd)
        for functor in zelf[cmnd]:
            zelf.put(functor.func, event)
        return event

    def run_func(zelf, *args, **kwargs):
        """ execute command/event pair in the calling thread.

        Returns the event, or None when the command is not registered.
        """
        cmnd = args[0]
        event = args[1]
        if cmnd not in zelf:
            return None
        for functor in zelf[cmnd]:
            functor.func(event)
        return event

    def exit(zelf, name=None):
        """ stop the Runners.

        When `name` is given only Runners whose thread name contains it are
        stopped. Each Runner is asked to exit and then joined for at most
        0.1 second.
        """
        # iterate over a copy so the pool may change meanwhile.
        for runner in list(zelf._runners):
            if name and name not in runner.name:
                continue
            runner.exit()
            runner.join(0.1)

    def put(zelf, *args, **kwargs):
        """ put load to the Runner and return the Runner that received it. """
        zelf.cleanup()
        target = zelf.make_new()
        target.put(*args, **kwargs)
        return target

    def make_new(zelf, *args, **kwargs):
        """ create a Runner.

        While the pool holds fewer than `_max` Runners a new one is created
        and started; otherwise a random existing Runner is returned.
        """
        if len(zelf._runners) < zelf._max:
            runner = Runner(*args, **kwargs)
            zelf._runners.append(runner)
            runner.start()
        else:
            runner = random.choice(zelf._runners)
        return runner

    def cleanup(zelf, dojoin=False):
        """ remove idle Runners.

        A Runner is dropped from the pool when its status is empty or its
        queue holds no jobs. `dojoin` is accepted but not used.
        """
        # collect first: the deque must not change while it is iterated.
        idle = [runner for runner in zelf._runners
                if not runner._status.status or not runner._queue.qsize()]
        for runner in idle:
            runner.exit()
            zelf._runners.remove(runner)