# core/dispatcher.py
#
#
""" basic package for the program. """
__copyright__ = "Copyright 2015, B.H.J Thate"
## IMPORTS
from core.utils.trace import get_plugname, get_strace
from core.utils.parse import txt_parse
from core.utils.name import named
from core.errors import error
from core.errors import NoFunction, NoEvent, RemoteDisconnect
from core.thing import Thing
import collections
import threading
import logging
import random
import queue
import time
## DEFINES
threads = []
## RUNNER
[docs]class Task(threading.Thread):
""" the working unit of CORE. """
def __init__(zelf, *args, **kwargs):
threading.Thread.__init__(zelf, *args, **kwargs)
zelf._queue = queue.Queue()
def __iter__(zelf):
for x in dir(zelf): yield x
[docs] def run(zelf, *args, **kwargs):
""" loop that executes the (function, ding) pairs. """
zelf.type = "Task"
zelf._start = time.time()
zelf._last = time.time()
zelf._status = "starting"
while zelf._status:
arglist, kwargdict = zelf._queue.get()
if not arglist or not arglist[0]: break
zelf.func, *arguments = arglist
name = named(zelf.func)
zelf.setName(name)
logging.info("^ task %s" % name)
zelf._status = "running"
try: zelf.func(*arguments, **kwargdict)
except: error("", name) ; break
if arguments: logging.debug(arguments[0])
if zelf._queue.empty(): break
[docs] def stop(zelf, *args, **kwargs):
logging.warn("stop %s" % zelf.getName())
zelf._status = ""
zelf.put((None, {}))
[docs] def put(zelf, *args, **kwargs):
""" put arguments/kwargs to the Task. """
zelf._last = time.time()
zelf._queue.put((args, kwargs))
return zelf
## DISPATCH
[docs]class Dispatcher(Thing):
""" the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed. """
cc = "!"
default = ""
def __init__(zelf, *args, **kwargs):
Thing.__init__(zelf, *args, **kwargs)
zelf._input = queue.Queue()
zelf._output = queue.Queue()
zelf._ready = threading.Event()
## VIRTUAL
[docs] def event(zelf, *args, **kwargs): pass
## PROCES
[docs] def start(zelf, *args, **kwargs):
logging.warn("# start %s" % zelf.type)
zelf._status = "running"
while zelf._status:
try:
event = zelf.event()
logging.debug("! event %s" % event.json())
logging.debug("! target %s" % event._target)
if not zelf._status: break
if not event: continue
if "txt" in event: event.parsed = txt_parse(zelf, event.txt)
else: event.parsed = txt_parse(zelf, "")
zelf.dispatch(event)
except RemoteDisconnect: time.sleep(5.0) ; continue
except (EOFError, KeyboardInterrupt): raise
except: error()
logging.warn("# stop %s/%s" % zelf.type, event.cmnd)
[docs] def stop(zelf, *args, **kwargs):
name = args[0]
for thr in threading.enumerate():
if name in str(thr):
try: thr.cancel()
except AttributeError: thr.join()
logging.warn("stop %s" % str(thr))
return True
[docs] def exit(zelf, *args, **kwargs):
zelf._status = ""
zelf.put(None)
## INPUT/OUTPUT
[docs] def put(zelf, *args, **kwargs):
task = Task()
task.start()
return task.put(*args, **kwargs)
## RESULTS
[docs] def collect(zelf, *args, **kwargs):
result = args[0]
for thr in result:
if thr: thr.join()
## DISPATCHING
[docs] def single(zelf, *args, **kwargs):
if not args[0]: return
event = Thing()
event._target = zelf
event.origin = "core@shell"
event.txt = args[0]
event.parsed = txt_parse(zelf, event.txt)
return zelf.dispatch(event)
[docs] def dispatch(zelf, *args, **kwargs):
event = args[0]
if "txt" not in event: return
if "parsed" not in event: event.parsed = txt_parse(zelf, event.txt)
cmnds = zelf.find_cmnd(event.parsed.cmnd)
thrs = []
for cmnd in cmnds:
try: o = zelf.put(cmnd.func, event) ; thrs.append(o)
except AttributeError: continue
except: error()
if thrs: zelf.collect(thrs)
zelf.ready()
[docs] def find_cmnd(zelf, *args, **kwargs):
from core.kernel import kernel
cmnd = args[0]
result = []
cmnds = kernel.find(cmnd)
if not cmnds:
try: cmnds = kernel[cmnd]
except KeyError: cmnds = []
return cmnds