# Source code for zbot.__init__

# zbot/__init__.py
#
#

""" . """

__version__ = 47

## IMPORTS 

from zbot.utils import *

import collections
import threading
import logging
import _thread
import socket
import queue
import types
import errno
import select
import fcntl
import uuid
import json
import time
import imp
import sys
import os
import re

## ERRORS

class Error(BaseException):
    """Root of the zbot exception hierarchy."""
    # NOTE(review): deriving from BaseException means `except Exception`
    # handlers do not catch these errors -- confirm that is intended
    # before switching the base class to Exception.


class OverloadError(Error):
    pass


class MissingArgument(Error):
    pass


class MissingOutFunction(Error):
    pass


class NoText(Error):
    pass


class NoDate(Error):
    pass


class NoPath(Error):
    pass


class NoJSON(Error):
    pass


class NoFileName(Error):
    pass


class SignatureError(Error):
    pass


class NoSuchBotType(Error):
    pass


class NoEvent(Error):
    pass


class RemoteDisconnect(Error):
    pass


class NotOne(Error):
    pass


class TargetMissing(Error):
    pass


class NoTask(Error):
    """Raised by Dispatcher.put() when it is called without a task."""
    # NOTE(review): this name was raised in this module without being
    # defined here; verify zbot.utils does not already export a NoTask.


## BASE
class Object(dict):
    """Dictionary whose items can also be read and written as attributes.

    Attribute writes always store a dict item.  Attribute reads that are
    not real attributes fall back to the items and yield "" for unknown
    names, so a missing attribute never raises.  A fixed set of names
    (date, time, what, modname, plugname, func, how, kind, stdin, stdout,
    mdate) is computed on every read by __getattribute__.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)

    def __getattribute__(self, *args, **kwargs):
        """Serve the computed names; defer everything else to dict."""
        name = args[0]
        if name == "date":
            return self.Date or self.added or ""
        if name == "time":
            return time_stamp(self.date)
        if name == "what":
            return get_cls(self)
        if name == "modname":
            return get_modname(self)
        # NOTE(review): the integer arguments below are passed through
        # unchanged; their meaning is defined by the helpers -- confirm.
        if name == "plugname":
            return get_plugname(3)
        if name == "func":
            return get_func(5)
        if name == "how":
            return get_how(2)
        if name == "kind":
            return mj(self.modname, self.what, self.func)
        if name == "stdin":
            return sys.stdin
        if name == "stdout":
            return sys.stdout
        if name == "mdate":
            return short_date(self.Date or self.added)
        return dict.__getattribute__(self, *args, **kwargs)

    def __getattr__(self, name):
        """Return the item `name`, or "" when there is no such item.

        `_result`, `_status` and `_ready` are created on first access.
        """
        if name not in self:
            if name == "_result":
                self["_result"] = Object()
            elif name == "_status":
                self["_status"] = Object()
            elif name == "_ready":
                self["_ready"] = threading.Event()
            else:
                return ""
        return self[name]

    def __setattr__(self, name, value):
        return dict.__setitem__(self, name, value)

    def __exists__(self, a):
        """Return the item stored under `a`, or False when it is missing."""
        try:
            return self[a]
        except KeyError:
            return False

    def __lt__(self, a):
        # NOTE(review): reads `ddate`, which nothing in this class sets;
        # possibly meant `date` -- confirm before changing.
        return time_stamp(self.ddate) < time_stamp(a.ddate)

    ## TOUCH

    def touch(self, *args, **kwargs):
        """Return the attribute named by args[0], or None on lookup errors."""
        try:
            res = getattr(self, args[0])
        except (TypeError, AttributeError):
            res = None
        return res

    ## locators

    def get_root(self, *args, **kwargs):
        """Join the given path parts below the user's home directory."""
        return j(os.path.expanduser("~"), *args)

    def get_roots(self, *args, **kwargs):
        """Return an empty list."""
        result = []
        return result

    def get_target(self):
        """Return the `_target` item when present, otherwise None."""
        if "_target" in self:
            return self._target
        return None

    ## output

    def make_json(self, *args, **kwargs):
        """Serialize the reduced() view of this object to a JSON string."""
        return json.dumps(self.reduced(), default=smooth, *args, **kwargs)

    def make_full(self, *args, **kwargs):
        """Serialize every item of this object to a JSON string."""
        return json.dumps(self, default=smooth, *args, **kwargs)

    def make_signature(self, sig=None):
        """Return the SHA-1 hex digest of str(sig), or of str(self)."""
        # Imported here because the module's import block does not pull
        # in hashlib explicitly.
        import hashlib
        return hashlib.sha1(str(sig or self).encode("utf-8")).hexdigest()

    def make_path(self, *args, **kwargs):
        """Resolve a path through get_path() and create it with make_dir()."""
        # NOTE(review): get_path is not defined on this class here; it
        # is presumably supplied elsewhere -- confirm.
        path = self.get_path(*args, **kwargs)
        make_dir(path)
        return path

    ## input

    def load(self, *args, **kwargs):
        """Load this object from the file located by make_path()."""
        if args:
            path = self.make_path(*args, **kwargs)
        else:
            path = self.make_path()
        return self.load_file(path)

    def load_file(self, *args, **kwargs):
        """Update this object from the "data" section of the file args[0]."""
        path = args[0]
        ondisk = self.read(path)
        fromdisk = json.loads(ondisk)
        if "data" in fromdisk:
            self.update(fromdisk["data"])
        return self

    def load_json(self, *args, **kwargs):
        """Update this object from the JSON string args[0]; returns self."""
        self.update(json.loads(args[0]))
        return self

    def read(self, *args, **kwargs):
        """Return the text of file args[0] without its '#' comment lines.

        Returns "{}" when the file does not exist or holds no content
        after the comment lines are dropped.  Other I/O errors propagate.
        """
        path = args[0]
        logging.info("read %s" % path)
        try:
            f = open(path, "r")
        except IOError as ex:
            if ex.errno == errno.ENOENT:
                return "{}"
            raise
        # The context manager closes the file on every exit path,
        # including the early "{}" return below.
        with f:
            res = "".join(
                line for line in f if not line.strip().startswith("#")
            )
        if not res.strip():
            return "{}"
        return res

    ## persistence

    def dump(self, *args, **kwargs):
        """Build the JSON document that is written to disk for this object.

        Raises NoJSON when the object cannot be serialized.
        """
        todisk = Object()
        todisk.data = self.reduced()
        todisk.data.date = self.date
        todisk.how = self.how
        todisk.create_type = self.what
        todisk.stime = time.ctime(time.time())
        todisk.modname = self.modname
        todisk.version = __version__
        todisk.signature = self.make_signature()
        try:
            result = todisk.make_json(indent=2, ensure_ascii=False)
        except TypeError:
            raise NoJSON()
        return result

    def save(self, *args, **kwargs):
        """Sync this object below every root in kernel.cfg.rootlist.

        args[0] is used as the file name part; rtime() supplies one when
        no argument is given.
        """
        if not args:
            t = rtime()
        else:
            t = args[0]
        for path in kernel.cfg.rootlist:
            root = self.get_root(path, t)
            make_dir(root)
            self.sync(root)

    def sync(self, *args, **kwargs):
        """Write this object to the file args[0] and return self.

        The data goes to "<path>.tmp" under an exclusive, non-blocking
        lock and is then renamed over the target.
        """
        path = args[0]
        fn = path.split(os.sep)[-1]
        todisk = self.dump(path, **kwargs)
        tmp = path + ".tmp"
        with open(tmp, "w") as datafile:
            fcntl.flock(datafile, fcntl.LOCK_EX | fcntl.LOCK_NB)
            try:
                datafile.write(
                    headertxt % (fn, __version__, "%s characters" % len(todisk))
                )
                datafile.write(todisk)
                datafile.write("\n")
                datafile.flush()
            finally:
                # Always release the lock, even when a write fails.
                fcntl.flock(datafile, fcntl.LOCK_UN)
        os.rename(tmp, path)
        logging.warning("saved %s" % path)
        return self

    ## state

    def register(self, *args, **kwargs):
        """Store args[1] under the key args[0]."""
        name = args[0]
        obj = args[1]
        self[name] = obj

    def reduced(self, *args, **kwargs):
        """Return a copy without "_"-prefixed keys and args/rest/first."""
        res = Object()
        for key in self.keys():
            if str(key).startswith("_"):
                continue
            if key in ("args", "rest", "first"):
                continue
            res[key] = self[key]
        return res

    def pretty(self):
        """Return this object as indented JSON."""
        return json.dumps(self, indent=2)

    ## objectors

    def objects(self, *args, **kwargs):
        """Load every object file found below the configured workdir."""
        path = self.get_root(kernel.cfg.workdir, *args, **kwargs)
        flist = list_files(path, **kwargs)
        res = []
        for fnn in flist:
            try:
                obj = Object().load_file(fnn)
            except ValueError:
                logging.warning("no json in %s" % fnn)
                continue
            res.append(obj)
        logging.warning("%s objects found" % len(res))
        return res

    ## result

    def parse(self, *args, **kwargs):
        """Split `txt` into `ucmnd` and `rest`; drop a leading "." from ucmnd."""
        try:
            self.ucmnd, self.rest = self.txt.split(" ", 1)
        except (ValueError, AttributeError, TypeError):
            # No space to split on (or txt is not a str): the whole
            # text is the command.
            self.ucmnd = self.txt
            self.rest = ""
        if self.ucmnd and self.ucmnd[0] == ".":
            self.ucmnd = self.ucmnd[1:]
        return self

    def prepare(self, *args, **kwargs):
        """Fill opts, args and knobs from `txt`, then parse() it."""
        self.opts = get_opts(self.txt)
        self.args = get_args(self.txt)
        self.knobs = get_knobs(self.txt)
        self.parse()
        return self

    def ready(self):
        """Set the `_ready` event."""
        self._ready.set()

    def wait(self, sec=180.0):
        """Block until ready() is called or `sec` seconds have passed."""
        self._ready.wait(sec)

    def add(self, value):
        """Store `value` in `_result`, keyed by the current time."""
        self._result[time.time()] = value

    def reply(self, txt):
        """Say `txt` on this object's channel."""
        self.say(self.channel, txt)

    def show(self):
        """Return "key=value" strings for every item with a truthy value."""
        return ["%s=%s" % (a, b) for a, b in self.items() if b]

    def _raw(self, txt):
        self._target._raw(txt)

    def say(self, *args, **kwargs):
        """Write args[1] to `outer` when set, else delegate to the target."""
        if self.outer:
            o = self.outer
            o.write(str(args[1]))
            o.write("\n")
            o.flush()
        else:
            self._target.say(*args, **kwargs)

    def display(self, *args, **kwargs):
        """Say every stored result, in ascending key order."""
        for res in sorted(self._result.keys()):
            d = self._result[res]
            self.say(self.channel or "", d)

    def put(self, *args, **kwargs):
        """Forward to the target's put()."""
        self._target.put(*args, **kwargs)

    def handle(self, *args, **kwargs):
        """Return the result of resolve() for this object."""
        res = resolve(self, *args, **kwargs)
        return res


## TASKS
class Runner(threading.Thread):
    """Daemon worker thread that runs queued callables one at a time."""

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(
            self, None, self._loop, "thread.%s" % str(time.time()), args, kwargs
        )
        self.daemon = True
        self._queue = queue.Queue()
        self._outqueue = queue.Queue()
        self._state = "idle"

    def _loop(self, *args, **kwargs):
        """Take jobs from the queue and run them until stop() is called."""
        logging.warning("start %s" % str(self))
        while self._state in ["running", "idle", "callback", "once"]:
            o = self._queue.get()
            # stop() queues a wake-up item; the state check comes first
            # so that item is never treated as a job.
            if self._state == "stop":
                break
            try:
                func = o.a[0]
                o.returned = func(*o.a[1:], **o.kw)
            except BaseException:
                # Deliberately broad: zbot's own Error derives from
                # BaseException and a failing job must not end the worker.
                error()
            time.sleep(0.001)
        logging.warning("stop %s" % str(self))

    def stop(self, *args, **kwargs):
        """Ask the worker to exit and wake it if it is waiting for work."""
        self._state = "stop"
        # Without a queued item the loop would stay blocked in get()
        # and never see the new state.
        self._queue.put(None)

    def put(self, *args, **kwargs):
        """Queue a job: args[0] is the callable, the rest are its arguments."""
        o = Object()
        o.a = args
        o.kw = kwargs
        self._queue.put(o)


class TaskRunner(Runner):
    """Runner that hands each queued item to kernel.cb.handle_cb()."""

    def _loop(self, *args, **kwargs):
        """Dispatch queued events to the kernel callbacks until stopped."""
        logging.warning("start %s" % str(self))
        while self._state in ["running", "idle", "callback", "once"]:
            o = self._queue.get()
            if self._state == "stop":
                break
            logging.info("run %s" % str(o))
            try:
                kernel.cb.handle_cb(o.a[0])
            except BaseException:
                # Same policy as Runner._loop: log the failure and keep
                # the worker alive for the next item.
                error()
            time.sleep(0.001)
        logging.warning("stop %s" % str(self))


class Spider(Runner):
    """Runner that crawls a site by queueing get_urls() for each new link."""

    def __init__(self, *args, **kwargs):
        Runner.__init__(self, *args, **kwargs)
        self.errors = []
        self.urls = []
        self.url = Object()
        self.followed = []
        self.speed = 0
        self.depth = 5

    def crawl(self, *args, **kwargs):
        """Start the worker thread and fetch the first url (args[0])."""
        self.start()
        self.get_urls(*args, **kwargs)

    def get_urls(self, *args, **kwargs):
        """Fetch args[0], save the links found and queue unseen ones.

        Returns the list of links on the page, or None when the url has
        more "/"-separated parts than self.depth.
        """
        url = args[0]
        urls = []
        if not self.url:
            # The first url seen becomes the reference for the crawl.
            self.url.url = url
            (self.url.basepath, self.url.base,
             self.url.root, self.url.file) = parse_url(self.url.url)
        pnr = len(url.split("/"))
        if pnr > self.depth:
            logging.warning("%s depth > %s" % (url, self.depth))
            return
        if url not in self.urls:
            self.urls.append(url)
        content = do_url("GET", url)
        newurl = need_redirect(content)
        if newurl:
            content = do_url("GET", newurl)
            logging.warning("redirecting to %s" % newurl)
            newurl2 = need_redirect(content)
            if newurl2:
                # Follow the second redirect target, not the original url.
                content = do_url("GET", newurl2)
                logging.warning("redirecting to %s" % newurl2)
        # The delay grows by 0.1s with every page fetched.
        time.sleep(self.speed)
        self.speed += 0.1
        urls = parse_urls(url, content.read())
        o = Object()
        o.spider = True
        o.orig_url = url
        o.urls = urls
        o.save()
        for u in urls:
            if u in self.urls:
                continue
            if self.url.base not in u:
                continue
            if u in self.errors:
                continue
            self.put(self.get_urls, u)
        return urls


## DISPATCH
class Dispatcher(Object):
    """Object that hands work to a pool of TaskRunner threads."""

    # Upper bound on the number of runners makenew() will create.
    max = 50
    # Class-level pool: shared by every Dispatcher instance and subclass.
    runners = collections.deque()

    def stop(self, name=None):
        """Stop every runner whose thread name contains `name`."""
        # NOTE(review): with the default name=None nothing is stopped;
        # confirm whether "stop all" was intended.
        for runner in self.runners:
            if name and name in runner.name:
                runner.stop()

    def put(self, *args, **kwargs):
        """Queue a task on a runner and return args[0].

        Raises NoTask when called without positional arguments.
        """
        if not args:
            raise NoTask()
        target = self.get_target()
        target.put(*args, **kwargs)
        return args[0]

    def get_target(self):
        """Return an idle runner, creating one when none is available."""
        target = None
        for runner in self.runners:
            # NOTE(review): `runner._queue` is a Queue object and always
            # truthy, so only the state decides here -- confirm intent.
            if runner._queue and runner._state == "idle":
                target = runner
        if not target:
            target = self.makenew()
        return target

    def makenew(self, *args, **kwargs):
        """Start a new TaskRunner, or pick a random one once `max` is reached."""
        # Imported here because the module's import block does not pull
        # in random explicitly.
        import random
        if len(self.runners) < self.max:
            runner = TaskRunner(*args, **kwargs)
            runner.start()
            self.runners.append(runner)
        else:
            runner = random.choice(self.runners)
        return runner

    def cleanup(self, dojoin=False):
        """Stop and drop runners that are stopped or have an empty queue.

        `dojoin` is accepted but not used.
        """
        todo = [
            runner for runner in self.runners
            if runner._state == "stop" or runner._queue.empty()
        ]
        for runner in todo:
            runner.stop()
            self.runners.remove(runner)


## CALLBACKS
class Callbacks(Dispatcher):
    """Dispatcher that stores lists of callbacks keyed by callback type."""

    def register(self, cbtype, cb, *args, **kwargs):
        """Append `cb` to the callbacks kept under `cbtype`."""
        logging.debug("register %s" % cbtype)
        self.setdefault(cbtype, []).append(cb)

    def handle_cb(self, *args, **kwargs):
        """Run the callbacks registered for the class of event args[0].

        A callback may carry a `pre` attribute; when `pre(event)` is
        falsy, processing stops and None is returned.  Otherwise the
        event is returned after all callbacks ran; a failing callback is
        reported through error() and does not stop the others.
        """
        event = args[0]
        logging.info(event)
        target = get_cls(event)
        logging.info("cb %s %s" % (target, event.show()))
        for func in list(self.get(target, [])):
            precondition = getattr(func, "pre", None)
            if precondition and not precondition(event):
                logging.debug("pre %s" % str(func))
                return
            try:
                func(event)
            except:
                error()
        return event


## BOTS
class Bot(Callbacks):
    """Base bot: connect, then turn incoming events into queued tasks.

    connect(), get_one(), read_some() and get_prompt() are stubs here.
    """

    def connect(self, *args, **kwargs):
        pass

    def exit(self, *args, **kwargs):
        """Flush and close stdout, then end the process with os._exit(0)."""
        try:
            sys.stdout.flush()
        except Exception as ex:
            logging.warning(str(ex))
        sys.stdout.close()
        os._exit(0)

    def get_one(self):
        """Return the next event; this base implementation has none."""
        return None

    def read_some(self, *args, **kwargs):
        pass

    def put_some(self, *args, **kwargs):
        """Drain args[0] when it becomes readable within 0.05s.

        The text read is discarded and the result is always False.
        """
        source = args[0]
        readable, _, _ = select.select([source], [], [], 0.05)
        if readable:
            readable[0].read()
        return False

    def get_prompt(self, *args, **kwargs):
        return ""

    def begin(self, *args, **kwargs):
        """Connect, default the state to "running" and enter the main loop."""
        self.connect()
        if not self._state:
            self._state = "running"
        self.run_forever(*args, **kwargs)

    def run_forever(self, *args, **kwargs):
        """Poll get_one() and process events until the state leaves the run set.

        The loop also ends after an event whose `_state` is "once".
        Calls ready() on the bot when the loop is left.
        """
        while self._state in ["running", "idle", "once", "callback"]:
            time.sleep(0.001)
            e = self.get_one()
            if e:
                self.do_one(e)
                if e._state == "once":
                    break
        self.ready()

    def do_one(self, *args, **kwargs):
        """Queue event args[0] with this bot as target and wait for it.

        Returns True on success, False when no event was passed or an
        exception was reported through error().  KeyboardInterrupt is
        re-raised.
        """
        ok = True
        try:
            event = args[0]
            event._target = self
            self.put(event)
            event.wait()
        except KeyboardInterrupt:
            raise
        except IndexError:
            ok = False
        except Exception:
            error()
            ok = False
        return ok


## PLUGINS
class Plugins(Object):
    """Registry of loaded plugin modules, keyed by plugin name."""

    def get_names(self, plugsdir):
        """Return the names of the .py files in `plugsdir`, without suffix."""
        return [x[:-3] for x in os.listdir(plugsdir) if x.endswith(".py")]

    def load_plugs(self, path):
        """(Re)load every plugin in `path`, skipping names containing "__".

        A plugin that fails to load is reported through error() and
        does not stop the remaining ones.
        """
        logging.warning("plugs %s" % path)
        for plugname in self.get_names(path):
            if "__" in plugname:
                continue
            try:
                self.load_mod(plugname, path, force=True)
            except BaseException:
                # Deliberately broad: zbot's own Error derives from
                # BaseException.
                error()
                continue

    def load_package(self, modname):
        """Load the plugins in the "plugs" directory of package `modname`."""
        # __import__ returns the top-level package for dotted names.
        mod = __import__(modname)
        path, fn = os.path.split(mod.__file__)
        path += os.sep + "plugs"
        self.load_plugs(path)

    def load_mod(self, plugname, pdir="", force=False):
        """Load (or with force=True reload) a plugin and run its init().

        An already loaded plugin is returned as-is unless `force` is
        set.  `pdir` defaults to the "plugs" directory below self.root.
        """
        logging.info("load %s - %s" % (self.what.lower(), plugname))
        if plugname in self:
            if not force:
                return self[plugname]
            self[plugname] = imp.reload(self[plugname])
        else:
            if not pdir:
                pdir = j(self.root, "plugs")
            fileobj, pathname, description = imp.find_module(plugname, [pdir])
            try:
                self[plugname] = imp.load_module(
                    plugname, fileobj, pathname, description
                )
            finally:
                # imp leaves closing the file to the caller.
                if fileobj:
                    fileobj.close()
        self.plug_exec(plugname, "init")
        return self[plugname]

    def plug_exec(self, plugname, item):
        """Call the plugin's function `item`; AttributeError is ignored."""
        try:
            todo = getattr(self[plugname], item)
            todo()
            logging.info("exec %s" % get_name(todo))
        except AttributeError:
            pass

    def unload(self, plugname):
        """Run the plugin's shutdown() and remove it from the registry."""
        self.plug_exec(plugname, "shutdown")
        del self[plugname]

    def reload(self, plugname, force=False):
        """Unload the plugin and load it again; returns the module."""
        self.unload(plugname)
        # Pass force by keyword: the second positional slot is pdir.
        mod = self.load_mod(plugname, force=force)
        return mod


## BOOT
def boot(*args, **kwargs):
    """Configure the global kernel from args[0] and return it.

    Merges the config into kernel.cfg, sets up the workdir and logging,
    logs the boot banners when a loglevel is configured, and registers
    handle_loop as the "Object" callback.
    """
    cfg = args[0]
    kernel.cfg.update(cfg)
    if cfg.do_shell:
        hello("ZBOT")
    set_workdir(kernel.cfg.workdir)
    if not kernel.cfg.loglevel:
        cfg.loglevel = "error"
    from zbot.log import log_config
    log_config(cfg.loglevel)
    # NOTE(review): the nesting below was reconstructed from a flattened
    # listing -- confirm which statements belong inside each `if`.
    if kernel.cfg.loglevel:
        logging.warning("E G G S")
        logging.warning("")
        show_eggs()
    if kernel.cfg.loglevel:
        logging.warning("")
        logging.warning("C O N F I G")
        logging.warning("")
        for line in kernel.cfg.show():
            logging.warning(line)
    if kernel.cfg.loglevel:
        logging.warning("")
        logging.warning("B O O T")
        logging.warning("")
    kernel.cb.register("Object", handle_loop)
    return kernel


## HANDLER
def handle_loop(event):
    """Prepare and handle `event`, then mark it ready.

    ready() is called even when prepare() or handle() raises, so a
    caller blocked in event.wait() is released immediately instead of
    waiting for its timeout.  The exception itself still propagates.
    """
    try:
        event.prepare()
        event.handle()
    finally:
        event.ready()


## WORKDIR
def set_workdir(*args, **kwargs):
    """Set kernel.cfg.workdir and kernel.cfg.rootlist, and create the workdir.

    args[0] is the requested workdir; "zbot.workdir" is used when it is
    empty.  An already configured kernel.cfg.workdir is kept.
    """
    workdir = args[0] or "zbot.workdir"
    parts = workdir.split(os.sep)
    # NOTE(review): reconstructed from a flattened listing.  As written,
    # the directory name is only taken from the path when it ends with a
    # separator; otherwise the default name is used -- confirm intent.
    wd = "zbot.workdir" if parts[-1] else parts[-2]
    cfg = kernel.cfg
    if not cfg.workdir:
        cfg.workdir = j(os.getcwd(), wd)
    cfg.rootlist = [
        cfg.workdir,
        j("Public", wd, day(), ""),
        j("Dropbox", "backups", wd, day(), ""),
    ]
    make_dir(cfg.workdir)


## VARS
# Process-wide state: configuration, loaded plugins and the callback
# registries for events, commands and tests.
kernel = Object(
    cfg=Object(),
    plugs=Plugins(),
    cb=Callbacks(),
    cmnds=Callbacks(),
    tests=Callbacks(),
)