# Source code for zot.dispatcher

# zot/dispatcher.py
#
#

""" dispatcher that maps events to commands and the runner threads that execute them. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from zot.utils import get_plugname, get_named, get_strace, error
from zot.errors import NoFunction, NoEvent, RemoteDisconnect
from zot.object import Object

import collections
import threading
import logging
import threading
import random
import queue
import time

## DEFINES

# NOTE(review): not referenced by any code visible in this file; presumably a
# module-level registry of threads used by other modules -- TODO confirm.
threads = []

## RUNNER

class RUNNER(threading.Thread):

    """ the working unit of ZOTBOT.

    A thread that owns a private queue of ``(args, kwargs)`` jobs. The first
    positional argument of a job is the function to call, the remaining
    positional arguments and the keyword arguments are passed to it.
    """

    def __init__(zelf, *args, **kwargs):
        """ initialise the thread and create the (empty) job queue. """
        threading.Thread.__init__(zelf, *args, **kwargs)
        zelf._queue = queue.Queue()

    def __iter__(zelf):
        """ yield the attribute names of this runner, as reported by dir(). """
        for x in dir(zelf):
            yield x

    def run(zelf, *args, **kwargs):
        """ loop that executes the (function, object) pairs.

        Blocks on the queue for the next job. The loop ends when a job has no
        positional arguments or a falsy first argument (the stop sentinel),
        or when the queue is empty after a job has been executed.
        """
        zelf.type = "RUNNER"
        zelf._start = time.time()
        zelf._last = time.time()
        zelf._status = "wait"
        while zelf._status:
            arglist, kwargdict = zelf._queue.get()
            # an empty job or a falsy function is the signal to stop.
            if not arglist or not arglist[0]:
                break
            zelf.func, *arguments = arglist
            name = get_named(zelf.func)
            # Thread.setName() is deprecated since Python 3.10; assigning the
            # ``name`` attribute is the supported equivalent.
            zelf.name = name
            zelf.func(*arguments, **kwargdict)
            logging.info("# exec %s", name)
            # nothing left to do, let the thread finish.
            if zelf._queue.empty():
                break

    def stop(zelf, *args, **kwargs):
        """ queue the stop sentinel so that run() leaves its loop.

        The sentinel is a job whose function is None. It has to be passed as
        a plain positional argument: wrapping it in a tuple would make the
        first argument truthy and run() would try to call it.
        """
        zelf.put(None)

    def put(zelf, *args, **kwargs):
        """ put arguments/kwargs to the Runner.

        Records the time of the call in ``_last``, queues ``(args, kwargs)``
        as one job and returns the runner itself.
        """
        zelf._last = time.time()
        zelf._queue.put((args, kwargs))
        return zelf

## DISPATCH
class DISPATCHER(Object):

    """ the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed. """

    # prefix a text must start with to be handled as a command.
    cc = "!"

    def __init__(zelf, *args, **kwargs):
        """ initialise the object and create the input/output queues and the ready flag. """
        Object.__init__(zelf, *args, **kwargs)
        zelf._input = queue.Queue()
        zelf._output = queue.Queue()
        zelf._ready = threading.Event()

    ## VIRTUAL

    def event(zelf, *args, **kwargs):
        """ return the next event to handle; this base version returns None. """
        pass

    ## PROCESS

    def start(zelf, *args, **kwargs):
        """ fetch events with event() and handle them until there are none.

        The loop ends on a falsy event or on RemoteDisconnect. EOFError and
        KeyboardInterrupt are re-raised to the caller, any other exception is
        passed to error() and the loop continues.
        """
        # logging.warn() is deprecated and removed in Python 3.13.
        logging.warning("# start %s" % zelf.type)
        while True:
            try:
                event = zelf.event()
                if not event:
                    break
                zelf.handle_event(event)
            except RemoteDisconnect:
                break
            except (EOFError, KeyboardInterrupt):
                raise
            except Exception:
                error()

    def stop(zelf, *args, **kwargs):
        """ stop every live thread whose string representation contains args[0].

        A thread is stopped through its exit() method when it has one and
        through its stop() method otherwise; threads offering neither are
        skipped (a plain threading.Thread has no exit() method).
        """
        name = args[0]
        for thr in threading.enumerate():
            if name not in str(thr):
                continue
            halt = getattr(thr, "exit", None) or getattr(thr, "stop", None)
            if halt:
                halt()

    def exit(zelf, *args, **kwargs):
        """ clear the status and push a None job through put(). """
        zelf._status = ""
        zelf.put(None)

    ## INPUT/OUTPUT

    def put(zelf, *args, **kwargs):
        """ start a fresh RUNNER, hand it the arguments as a job and return the runner. """
        runner = RUNNER()
        runner.start()
        return runner.put(*args, **kwargs)

    ## RESULTS

    def collect(zelf, *args, **kwargs):
        """ join every truthy thread in args[0], i.e. wait until they have finished. """
        result = args[0]
        for thr in result:
            if thr:
                thr.join()

    ## DISPATCHING

    def single(zelf, *args, **kwargs):
        """ run one command text (args[0]) as if it was typed at the shell.

        Builds an event with this dispatcher as target and "zot@shell" as
        origin and returns the result of handle_cmnd(); returns None when the
        text is empty.
        """
        if not args[0]:
            return
        event = Object()
        event._target = zelf
        event.origin = "zot@shell"
        event.txt = args[0]
        event.cc = zelf.cc
        return zelf.handle_cmnd(event)

    def handle_event(zelf, *args, **kwargs):
        """ dispatch the event in args[0] and wait for the started runners.

        Returns an empty list when the event carries no text, None otherwise.
        """
        event = args[0]
        if "txt" not in event or not event.txt:
            return []
        thrs = zelf.handle_cmnd(event)
        if thrs:
            zelf.collect(thrs)

    def handle_cmnd(zelf, *args, **kwargs):
        """ start a runner for every command registered for the event.

        Returns the list of started runners, or None when the event has a
        "cc" set and its text does not start with it.
        """
        # imported here rather than at module level -- presumably to avoid a
        # circular import with zot.runtime (TODO confirm).
        from zot.runtime import kernel
        event = args[0]
        event.update(event.parsed)
        if "cc" in event and not event.txt.startswith(event.cc):
            return
        result = []
        cmnds = kernel.find(event.cmnd)
        if not cmnds:
            try:
                cmnds = kernel[event.cmnd]
            except KeyError:
                cmnds = []
        for cmnd in cmnds:
            try:
                o = zelf.put(cmnd.func, event)
                result.append(o)
            except AttributeError:
                continue
            # not a bare except: KeyboardInterrupt has to reach start(),
            # which re-raises it on purpose.
            except Exception:
                error()
        return result