Source code for botlib.kernel

# mad/kernel.py
#
#

""" module loading. """

from .db import Db
from .event import Event
from .launcher import Launcher
from .log import loglevel
from .object import Default, Object, Config
from .handler import Handler
from .register import Register
from .template import template
from .raw import RAW
from .utils import set_completer, pathname
from .trace import get_exception

from botlib import __license__

import botlib
import logging
import importlib
import os
import pkgutil
import queue
import shutil
import sys
import time
import types

[docs]class Kernel(Handler, Launcher, Db): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._booted = Object() self._cbs = Register() self._cmnds = [] self._finished = Object() self._names = Register() self._table = Object() self._time = Default(default=0)
[docs] def announce(self, txt): """ announce txt on all fleet bot. """ from .space import fleet for bot in fleet: bot.announce(txt)
[docs] def boot(self, cfgin): """ start the kernel. """ from .space import alias, cfg, fleet, load, users, ENOWORKDIR cfg.update(cfgin) self._time.start = time.time() cfg.logdir = cfg.logdir or os.path.join(cfg.homedir, ".botlog") if cfg.workdir: cfg.changed = True if cfg.user: cfg.banner = True cfg.license = True cfg.shell = True cfg.verbose = True cfg.loglevel = cfg.loglevel or "warn" if cfg.banner: print("%s #%s\n" % (cfg.name.upper(), cfg.version)) if cfg.test: cfg.workdir = os.path.abspath(os.path.join(os.getcwd(), "data")) cfg.changed = True if cfg.yes or cfg.shell: if cfgin.loglevel: cfg.verbose = True cfg.loglevel = cfg.loglevel or "error" cfg.init += ",cli" loglevel(cfg.loglevel or "error", cfg.logdir) if cfg.license: print(__license__) for fn in os.listdir(os.getcwd()): if fn.endswith(".egg"): if cfg.verbose: logging.warn("# load %s" % fn) sys.path.insert(0, fn) if not cfg.workdir: cfg.workdir = os.path.join(cfg.homedir, ".botdata") if not os.path.isdir(cfg.bootdir): os.mkdir(cfg.bootdir) if not os.path.isdir(cfg.workdir): os.mkdir(cfg.workdir) thrs = [] dosave = False load() if not self._names and not cfg.write: k = Kernel().load(os.path.join(cfg.bootdir, "runtime", "kernel")) self._names.update(k._names) c = Config().load(os.path.join(cfg.bootdir, "runtime", "cfg")) if cfg.scan: if "workdirs" in c and c.workdirs: cfg.workdirs = c.workdirs if cfg.write: cfg.workdirs = [] dosave = True if cfg.yes or cfg.write or not self._names: self.walk("botlib") self.sync(os.path.join(cfg.bootdir, "runtime", "kernel")) if cfg.name.lower() != "botlib": self.walk(cfg.name.lower()) cfg.packages.append(cfg.name.lower()) for name in cfg.mods.split(","): if name: self.walk(name) if cfg.all or cfg.init: for pname in cfg.packages: for modname in self.modules(pname): n = modname.split(".")[-1] if modname in cfg.ignore and n not in cfg.init: continue if cfg.all or n in cfg.init.split(","): thr = self.init(modname) if thr: thrs.append(thr) for modname in cfg.needed: thr = self.init(modname) thrs.append(thr) if not self._cmnds: self._cmnds = sorted(set([cmnd for cmnd in self._names.keys()])) set_completer(self._cmnds) if dosave: cfg.sync(os.path.join(cfg.bootdir, "runtime", "cfg")) self.start() if cfg.args: e = self.once(" ".join(cfg.args)) if e: e.wait() self.ready() return self results = self.waiter(thrs) for bot in fleet: bot._connected.wait() if cfg.verbose: print("") for bot in fleet: bot.prompt() if cfg.user: cfg.loglevel = "error" loglevel(cfg.loglevel, cfg.logdir) if cfg.shell or cfg.init: for object in results: if object: object.wait() self.ready() return self
[docs] def callcb(self, event): try: _cbs = self._cbs[event.origin] except KeyError: _cbs = [] for cb in _cbs: cb(event)
[docs] def cmnd(self, txt): """ execute a command based on txt. """ from .space import fleet bot = RAW() fleet.add(bot) event = Event() event.cc = "" event.origin = "root@shell" event.channel = "#botlib" event.server = "localhost" event.btype = bot.type event.txt = txt event.parse() return event
[docs] def direct(self, name, package=None): """ import a module directly, not storing it in the cache. """ return importlib.import_module(name, package)
[docs] def dispatch(self, event): """ dispatch an event. """ thr = event.handle() thr.join() self._time.last = time.time()
[docs] def get_funcs(self, cmnd): """ search for a function registered by command. """ from botlib.space import alias oldcmnd = cmnd cmnd = alias.get(cmnd, cmnd) funcs = self._handlers.get(cmnd, []) if not funcs: funcs = self._handlers.get(oldcmnd, []) if not funcs: modnames = self._names.get(cmnd, []) if not modnames and cmnd in self._cmnds: self.walk("botlib") for modname in modnames: self.loading(modname) funcs = self._handlers.get(cmnd, []) return funcs
[docs] def init(self, modname): """ initialize a module. """ from botlib.space import cfg event = Event() n = modname.split(".")[-1] mod = self._table.get(modname, None) if not mod or type(mod) == str: mod = self.load_mod(modname) if "init" in dir(mod): return self.launch(mod.init, event)
[docs] def list(self, name): """ list all functions found in a module. """ for modname in self.modules(name): mod = self.direct(modname) for key in dir(mod): if key in ["init", "shutdown"]: continue object = getattr(mod, key, None) if object and type(object) == types.FunctionType: if "event" in object.__code__.co_varnames: yield key
[docs] def load_mod(self, modname, force=True): """ load a module and if force is sset to True, put in in the cache, """ mod = self.direct(modname) if mod and force: self._table[modname] = mod return mod
[docs] def loading(self, modname, force=True): """ load a module. """ logging.warn("# loading %s" % modname) if force: mod = self.load_mod(modname) else: mod = self.direct(modname) res = Object() for key in dir(mod): if key.startswith("_"): continue object = getattr(mod, key, None) if object and type(object) == types.FunctionType: if "event" in object.__code__.co_varnames: if key.startswith("cb_"): k = key.split("cb_")[-1] self._cbs.register(key, object) else: self._names.register(key, modname) if key not in ["init", "shutdown"]: self._handlers.register(key, object) res[key] = object return res
[docs] def modules(self, name=""): """ return a list of modules in the named packages or cfg.packages if no module name is provided. """ from botlib.space import cfg if not name: names = cfg.packages else: names = [name, ] for name in names: package = self.direct(name) for pkg in pkgutil.walk_packages(package.__path__, name + "."): yield pkg[1]
[docs] def once(self, txt): """ run once command. """ try: e = self.cmnd(txt) self.dispatch(e) return e except: print(get_exception())
[docs] def put(self, *args, **kwargs): if args: logging.debug("# put %s" % args[0]) super().put(*args, **kwargs)
[docs] def register(self, key, value, force=True): """ register handlers. """ self._handlers.register(key, value, force)
[docs] def reload(self, name, force=False, event=None): """ reload module. """ e = event or template.get("event") try: self._table[name].shutdown(e) except (KeyError, AttributeError): pass self.loading(name, True) try: self._table[name].init(e) except (KeyError, AttributeError): pass if name in self._table: return self._table[name]
[docs] def shutdown(self, close=True): """ stop bot, services and plugins. """ from .space import cfg, fleet, kernel, runtime logging.warn("") event = Event() event.txt = "shutdown" event.server = "kernel" thrs = [] for bot in fleet: if "stop" in dir(bot): bot.stop() elif "exit" in dir(bot): bot.exit() bot.ready() for key, mod in self._table.items(): try: mod.shutdown(event) except AttributeError: continue if cfg.write: kernel.sync(os.path.join(cfg.bootdir, "runtime", "kernel")) cfg.sync(os.path.join(cfg.bootdir, "runtime", "cfg")) if cfg.test: for ex in exceptions: print(ex) self.ready()
[docs] def walk(self, name, init=False, force=False): """ return all modules in a package. """ mod = [] logging.warn("# walk %s" % name) for modname in sorted(list(self.modules(name))): self.loading(modname, force) if init: self.init(modname) mod.append(modname) return mod