Source code for core.dispatcher

# core/dispatcher.py
#
#

""" basic package for the program. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from core.utils.trace import get_plugname, get_strace
from core.utils.parse import txt_parse
from core.utils.name import named
from core.errors import error

from core.errors import NoFunction, NoEvent, RemoteDisconnect
from core.thing import Thing

import collections
import threading
import logging
import random
import queue
import time

## DEFINES

threads = []

## RUNNER

[docs]class Task(threading.Thread): """ the working unit of CORE. """ def __init__(zelf, *args, **kwargs): threading.Thread.__init__(zelf, *args, **kwargs) zelf._queue = queue.Queue() def __iter__(zelf): for x in dir(zelf): yield x
[docs] def run(zelf, *args, **kwargs): """ loop that executes the (function, ding) pairs. """ zelf.type = "Task" zelf._start = time.time() zelf._last = time.time() zelf._status = "starting" while zelf._status: arglist, kwargdict = zelf._queue.get() if not arglist or not arglist[0]: break zelf.func, *arguments = arglist name = named(zelf.func) zelf.setName(name) logging.info("^ task %s" % name) zelf._status = "running" try: zelf.func(*arguments, **kwargdict) except: error("", name) ; break if arguments: logging.debug(arguments[0]) if zelf._queue.empty(): break
[docs] def stop(zelf, *args, **kwargs): logging.warn("stop %s" % zelf.getName()) zelf._status = "" zelf.put((None, {}))
[docs] def put(zelf, *args, **kwargs): """ put arguments/kwargs to the Task. """ zelf._last = time.time() zelf._queue.put((args, kwargs)) return zelf ## DISPATCH
[docs]class Dispatcher(Thing): """ the dispatcher delegates the workload to the Runners. Runner gets instantiated when needed. """ cc = "!" default = "" def __init__(zelf, *args, **kwargs): Thing.__init__(zelf, *args, **kwargs) zelf._input = queue.Queue() zelf._output = queue.Queue() zelf._ready = threading.Event() ## VIRTUAL
[docs] def event(zelf, *args, **kwargs): pass ## PROCES
[docs] def start(zelf, *args, **kwargs): logging.warn("# start %s" % zelf.type) zelf._status = "running" while zelf._status: try: event = zelf.event() logging.debug("! event %s" % event.json()) logging.debug("! target %s" % event._target) if not zelf._status: break if not event: continue if "txt" in event: event.parsed = txt_parse(zelf, event.txt) else: event.parsed = txt_parse(zelf, "") zelf.dispatch(event) except RemoteDisconnect: time.sleep(5.0) ; continue except (EOFError, KeyboardInterrupt): raise except: error() logging.warn("# stop %s/%s" % zelf.type, event.cmnd)
[docs] def stop(zelf, *args, **kwargs): name = args[0] for thr in threading.enumerate(): if name in str(thr): try: thr.cancel() except AttributeError: thr.join() logging.warn("stop %s" % str(thr)) return True
[docs] def exit(zelf, *args, **kwargs): zelf._status = "" zelf.put(None) ## INPUT/OUTPUT
[docs] def put(zelf, *args, **kwargs): task = Task() task.start() return task.put(*args, **kwargs) ## RESULTS
[docs] def collect(zelf, *args, **kwargs): result = args[0] for thr in result: if thr: thr.join() ## DISPATCHING
[docs] def single(zelf, *args, **kwargs): if not args[0]: return event = Thing() event._target = zelf event.origin = "core@shell" event.txt = args[0] event.parsed = txt_parse(zelf, event.txt) return zelf.dispatch(event)
[docs] def dispatch(zelf, *args, **kwargs): event = args[0] if "txt" not in event: return if "parsed" not in event: event.parsed = txt_parse(zelf, event.txt) cmnds = zelf.find_cmnd(event.parsed.cmnd) thrs = [] for cmnd in cmnds: try: o = zelf.put(cmnd.func, event) ; thrs.append(o) except AttributeError: continue except: error() if thrs: zelf.collect(thrs) zelf.ready()
[docs] def find_cmnd(zelf, *args, **kwargs): from core.kernel import kernel cmnd = args[0] result = [] cmnds = kernel.find(cmnd) if not cmnds: try: cmnds = kernel[cmnd] except KeyError: cmnds = [] return cmnds