# zot/dispatcher.py
#
#
""" basic package for the program. """
__copyright__ = "Copyright 2015, B.H.J Thate"
## IMPORTS
from zot.utils import get_plugname, get_named, get_strace, error
from zot.errors import NoFunction, NoEvent, RemoteDisconnect
from zot.object import Object
import collections
import threading
import logging
import threading
import random
import queue
import time
## DEFINES
threads = []
## RUNNER
[docs]class RUNNER(threading.Thread):
""" the working unit of ZOTBOT. """
def __init__(zelf, *args, **kwargs):
threading.Thread.__init__(zelf, *args, **kwargs)
zelf._queue = queue.Queue()
def __iter__(zelf):
for x in dir(zelf): yield x
[docs] def run(zelf, *args, **kwargs):
""" loop that executes the (function, object) pairs. """
zelf.type = "RUNNER"
zelf._start = time.time()
zelf._last = time.time()
zelf._status = "wait"
while zelf._status:
arglist, kwargdict = zelf._queue.get()
if not arglist or not arglist[0]: break
zelf.func, *arguments = arglist
name = get_named(zelf.func)
zelf.setName(name)
zelf.func(*arguments, **kwargdict)
logging.info("# exec %s" % name)
if zelf._queue.empty(): break
[docs] def stop(zelf, *args, **kwargs): zelf.put((None, {}))
[docs] def put(zelf, *args, **kwargs):
""" put arguments/kwargs to the Runner. """
zelf._last = time.time()
zelf._queue.put((args, kwargs))
return zelf
## DISPATCH
[docs]class DISPATCHER(Object):
""" the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed. """
cc = "!"
def __init__(zelf, *args, **kwargs):
Object.__init__(zelf, *args, **kwargs)
zelf._input = queue.Queue()
zelf._output = queue.Queue()
zelf._ready = threading.Event()
## VIRTUAL
[docs] def event(zelf, *args, **kwargs): pass
## PROCES
[docs] def start(zelf, *args, **kwargs):
logging.warn("# start %s" % zelf.type)
status = "get"
while status:
try:
event = zelf.event()
if not event: break
zelf.handle_event(event)
except RemoteDisconnect: break
except (EOFError, KeyboardInterrupt): raise
except: error()
[docs] def stop(zelf, *args, **kwargs):
name = args[0]
for thr in threading.enumerate():
if name in str(thr): thr.exit()
[docs] def exit(zelf, *args, **kwargs):
zelf._status = ""
zelf.put(None)
## INPUT/OUTPUT
[docs] def put(zelf, *args, **kwargs):
runner = RUNNER()
runner.start()
return runner.put(*args, **kwargs)
## RESULTS
[docs] def collect(zelf, *args, **kwargs):
result = args[0]
for thr in result:
if thr: thr.join()
## DISPATCHING
[docs] def single(zelf, *args, **kwargs):
if not args[0]: return
event = Object()
event._target = zelf
event.origin = "zot@shell"
event.txt = args[0]
event.cc = zelf.cc
return zelf.handle_cmnd(event)
[docs] def handle_event(zelf, *args, **kwargs):
event = args[0]
if "txt" not in event or not event.txt: return []
thrs = zelf.handle_cmnd(event)
if thrs: zelf.collect(thrs)
[docs] def handle_cmnd(zelf, *args, **kwargs):
from zot.runtime import kernel
event = args[0]
event.update(event.parsed)
if "cc" in event and not event.txt.startswith(event.cc): return
cmnds = []
result = []
cmnds = kernel.find(event.cmnd)
if not cmnds:
try: cmnds = kernel[event.cmnd]
except KeyError: cmnds = []
for cmnd in cmnds:
try: o = zelf.put(cmnd.func, event) ; result.append(o)
except AttributeError: continue
except: error()
return result