# zbot/__init__.py
#
#
""" . """
__version__ = 47
## IMPORTS
# The star import stays first so the explicit stdlib imports below win any name clash.
from zbot.utils import *
import _thread
import collections
import errno
import fcntl
import hashlib
import imp
import json
import logging
import os
import queue
import random
import re
import select
import socket
import sys
import threading
import time
import types
import uuid
## ERRORS
# NOTE(review): Error derives from BaseException, so ``except Exception`` does
# NOT catch any of the errors below. Kept as-is because switching the base to
# Exception would change what existing handlers catch.
class Error(BaseException):
    """Root of the zbot error hierarchy."""


class OverloadError(Error):
    """Error subclass named ``OverloadError``."""


class MissingArgument(Error):
    """Error subclass named ``MissingArgument``."""


class MissingOutFunction(Error):
    """Error subclass named ``MissingOutFunction``."""


class NoText(Error):
    """Error subclass named ``NoText``."""


class NoDate(Error):
    """Error subclass named ``NoDate``."""


class NoPath(Error):
    """Error subclass named ``NoPath``."""


class NoJSON(Error):
    """Raised by ``Object.dump`` when the object cannot be serialised to JSON."""


class NoFileName(Error):
    """Error subclass named ``NoFileName``."""


class SignatureError(Error):
    """Error subclass named ``SignatureError``."""


class NoSuchBotType(Error):
    """Error subclass named ``NoSuchBotType``."""


class NoEvent(Error):
    """Error subclass named ``NoEvent``."""


class RemoteDisconnect(Error):
    """Error subclass named ``RemoteDisconnect``."""


class NotOne(Error):
    """Error subclass named ``NotOne``."""


class TargetMissing(Error):
    """Error subclass named ``TargetMissing``."""
## BASE
class Object(dict):
    """Dict whose items double as attributes.

    Attribute writes are stored as dict items (``__setattr__``); attribute
    reads fall back to the items (``__getattr__``) and yield ``""`` for
    anything that is missing.  A fixed set of names (``date``, ``time``,
    ``what``, ``modname``, ``plugname``, ``func``, ``how``, ``kind``,
    ``stdin``, ``stdout``, ``mdate``) is computed on every read in
    ``__getattribute__`` and therefore shadows an item stored under the
    same key.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)

    def __getattribute__(self, *args, **kwargs):
        """Return a computed attribute, or defer to ``dict.__getattribute__``.

        NOTE(review): the integers handed to get_plugname/get_func/get_how
        look like call-stack depths -- TODO confirm against zbot.utils.  For
        that reason these calls are kept directly in this method rather than
        moved into helpers or a dispatch table.
        """
        name = args[0]
        if name == "date":
            return self.Date or self.added or ""
        if name == "time":
            return time_stamp(self.date)
        if name == "what":
            return get_cls(self)
        if name == "modname":
            return get_modname(self)
        if name == "plugname":
            return get_plugname(3)
        if name == "func":
            return get_func(5)
        if name == "how":
            return get_how(2)
        if name == "kind":
            return mj(self.modname, self.what, self.func)
        if name == "stdin":
            return sys.stdin
        if name == "stdout":
            return sys.stdout
        if name == "mdate":
            return short_date(self.Date or self.added)
        return dict.__getattribute__(self, *args, **kwargs)

    def __getattr__(self, name):
        """Look ``name`` up as an item; missing names read as ``""``.

        ``_result``, ``_status`` and ``_ready`` are created on first access
        (an ``Object``, an ``Object`` and a ``threading.Event``).
        """
        try:
            return self[name]
        except KeyError:
            pass
        if name == "_result":
            self["_result"] = Object()
        elif name == "_status":
            self["_status"] = Object()
        elif name == "_ready":
            self["_ready"] = threading.Event()
        else:
            return ""
        return self[name]

    def __setattr__(self, name, value):
        return dict.__setitem__(self, name, value)

    def __exists__(self, a):
        """Return the item stored under ``a``, or ``False`` when it is absent."""
        try:
            return self[a]
        except KeyError:
            # The original evaluated a bare ``False`` here and fell through
            # to an implicit ``None``.
            return False

    def __lt__(self, a):
        # NOTE(review): reads the ``ddate`` item (double "d"), not the
        # computed ``date`` attribute -- confirm this is intended.
        return time_stamp(self.ddate) < time_stamp(a.ddate)

    ## TOUCH

    def touch(self, *args, **kwargs):
        """Return the attribute named by ``args[0]``, or ``None`` on failure.

        A missing argument (IndexError) is treated like the other failures
        instead of propagating.
        """
        try:
            return getattr(self, args[0])
        except (IndexError, TypeError, AttributeError):
            return None

    ## locators

    def get_root(self, *args, **kwargs):
        """Join the user's home directory with ``args`` using ``j``."""
        return j(os.path.expanduser("~"), *args)

    def get_roots(self, *args, **kwargs):
        """Return an empty list (no roots are collected here)."""
        return []

    def get_target(self):
        """Return the ``_target`` item when present, else ``None``."""
        if "_target" in self:
            return self._target
        return None

    ## output

    def make_json(self, *args, **kwargs):
        """Serialise ``self.reduced()`` to JSON, using ``smooth`` as fallback encoder."""
        return json.dumps(self.reduced(), *args, default=smooth, **kwargs)

    def make_full(self, *args, **kwargs):
        """Serialise every item to JSON, using ``smooth`` as fallback encoder."""
        return json.dumps(self, *args, default=smooth, **kwargs)

    def make_signature(self, sig=None):
        """Return the SHA-1 hex digest of ``str(sig or self)`` encoded as UTF-8."""
        return hashlib.sha1(str(sig or self).encode("utf-8")).hexdigest()

    def make_path(self, *args, **kwargs):
        """Resolve a path through ``self.get_path``, create it with ``make_dir``, return it.

        NOTE(review): no ``get_path`` is defined on this class in this file;
        without one the attribute fallback yields ``""`` and the call fails.
        """
        path = self.get_path(*args, **kwargs)
        make_dir(path)
        return path

    ## input

    def load(self, *args, **kwargs):
        """Build a path with ``make_path`` and load that file into ``self``."""
        if args:
            path = self.make_path(*args, **kwargs)
        else:
            path = self.make_path()
        return self.load_file(path)

    def load_file(self, *args, **kwargs):
        """Read the file at ``args[0]`` and merge its ``"data"`` member into ``self``.

        Returns ``self``.  ``json.loads`` errors propagate to the caller.
        """
        path = args[0]
        fromdisk = json.loads(self.read(path))
        if "data" in fromdisk:
            self.update(fromdisk["data"])
        return self

    def load_json(self, *args, **kwargs):
        """Merge the JSON document in ``args[0]`` into ``self``."""
        self.update(json.loads(args[0]))

    def read(self, *args, **kwargs):
        """Return the text of the file at ``args[0]`` without ``#`` comment lines.

        Returns ``"{}"`` when the file does not exist or holds nothing but
        comments/whitespace.  Other I/O errors propagate.
        """
        path = args[0]
        logging.info("read %s" % path)
        try:
            f = open(path, "r")
        except IOError as ex:
            if ex.errno == errno.ENOENT:
                return "{}"
            raise
        # The original set ``f.line_buffering = False`` when ``self.do_test``
        # was truthy; that attribute is read-only on text files, so the
        # assignment could only raise.  It is dropped here.
        # ``with`` closes the handle on every path, including the early
        # "{}" return and read errors, which the original leaked.
        with f:
            res = "".join(line for line in f if not line.strip().startswith("#"))
        if not res.strip():
            return "{}"
        return res

    ## persistence

    def dump(self, *args, **kwargs):
        """Return the on-disk JSON representation of this object.

        The payload wraps ``self.reduced()`` (plus ``date``) under ``data``
        and adds ``how``, ``create_type``, ``stime``, ``modname``,
        ``version`` and ``signature``.

        Raises:
            NoJSON: when serialisation fails with a TypeError.
        """
        # ``how``/``what``/``modname`` are read directly in this method on
        # purpose -- see the stack-depth note on ``__getattribute__``.
        todisk = Object()
        todisk.data = self.reduced()
        todisk.data.date = self.date
        todisk.how = self.how
        todisk.create_type = self.what
        todisk.stime = time.ctime(time.time())
        todisk.modname = self.modname
        todisk.version = __version__
        todisk.signature = self.make_signature()
        try:
            return todisk.make_json(indent=2, ensure_ascii=False)
        except TypeError as ex:
            raise NoJSON() from ex

    def save(self, *args, **kwargs):
        """Sync this object under every root in ``kernel.cfg.rootlist``.

        ``args[0]`` is the per-object path component; ``rtime()`` is used
        when it is omitted.
        """
        t = args[0] if args else rtime()
        for path in kernel.cfg.rootlist:
            root = self.get_root(path, t)
            make_dir(root)
            self.sync(root)

    def sync(self, *args, **kwargs):
        """Write this object to ``args[0]`` via a locked ``.tmp`` file and rename.

        Returns ``self``.  ``fcntl.flock`` is called non-blocking, so a lock
        held elsewhere raises instead of waiting.
        """
        path = args[0]
        fn = path.split(os.sep)[-1]
        todisk = self.dump(path, **kwargs)
        tmp = path + ".tmp"
        # ``with`` + ``finally`` release the handle and the lock even when a
        # write fails; the original leaked both on error.
        with open(tmp, "w") as datafile:
            fcntl.flock(datafile, fcntl.LOCK_EX | fcntl.LOCK_NB)
            try:
                datafile.write(headertxt % (fn, __version__, "%s characters" % len(todisk)))
                datafile.write(todisk)
                datafile.write("\n")
                # Push the data out while the lock is still held.
                datafile.flush()
            finally:
                fcntl.flock(datafile, fcntl.LOCK_UN)
        os.rename(tmp, path)
        logging.warning("saved %s" % path)
        return self

    ## state

    def register(self, *args, **kwargs):
        """Store ``args[1]`` under the key ``args[0]``."""
        name, obj = args[0], args[1]
        self[name] = obj

    def reduced(self, *args, **kwargs):
        """Return a copy without ``_``-prefixed keys and without args/rest/first."""
        res = Object()
        for key in self:
            if str(key).startswith("_") or key in ("args", "rest", "first"):
                continue
            res[key] = self[key]
        return res

    def pretty(self):
        """Return all items as JSON indented by two spaces."""
        return json.dumps(self, indent=2)

    ## objectors

    def objects(self, *args, **kwargs):
        """Load every file that ``list_files`` finds below the work directory.

        Files whose content raises ValueError while loading are logged and
        skipped.  Returns the list of loaded objects.
        """
        path = self.get_root(kernel.cfg.workdir, *args, **kwargs)
        res = []
        for fnn in list_files(path, **kwargs):
            try:
                obj = Object().load_file(fnn)
            except ValueError:
                logging.warning("no json in %s" % fnn)
                continue
            res.append(obj)
        logging.warning("%s objects found" % len(res))
        return res

    ## result

    def parse(self, *args, **kwargs):
        """Split ``txt`` at the first space into ``ucmnd`` and ``rest``.

        A leading ``.`` is stripped from ``ucmnd``.  Returns ``self``.
        """
        txt = self.txt
        try:
            self.ucmnd, _, self.rest = txt.partition(" ")
        except (AttributeError, TypeError):
            # Non-string text: keep the original fallback of storing it
            # unsplit, without a bare ``except``.
            self.ucmnd, self.rest = txt, ""
        if self.ucmnd and self.ucmnd[0] == ".":
            self.ucmnd = self.ucmnd[1:]
        return self

    def prepare(self, *args, **kwargs):
        """Fill ``opts``, ``args`` and ``knobs`` from ``txt``, then ``parse()``."""
        self.opts = get_opts(self.txt)
        self.args = get_args(self.txt)
        self.knobs = get_knobs(self.txt)
        self.parse()
        return self

    def ready(self):
        """Set the ``_ready`` event."""
        self._ready.set()

    def wait(self, sec=180.0):
        """Block until ``_ready`` is set or ``sec`` seconds have passed."""
        self._ready.wait(sec)

    def add(self, value):
        """Store ``value`` in ``_result`` keyed by the current ``time.time()``."""
        self._result[time.time()] = value

    def reply(self, txt):
        """Say ``txt`` on this object's ``channel``."""
        self.say(self.channel, txt)

    def show(self):
        """Return ``"key=value"`` strings for every item with a truthy value."""
        return ["%s=%s" % (a, b) for a, b in self.items() if b]

    def _raw(self, txt):
        self._target._raw(txt)

    def say(self, *args, **kwargs):
        """Write ``args[1]`` plus a newline to ``outer`` when set, else delegate to ``_target.say``."""
        out = self.outer
        if out:
            out.write(str(args[1]))
            out.write("\n")
            out.flush()
        else:
            self._target.say(*args, **kwargs)

    def display(self, *args, **kwargs):
        """Say every stored result, in ascending key order."""
        for res in sorted(self._result.keys()):
            self.say(self.channel or "", self._result[res])

    def put(self, *args, **kwargs):
        """Forward to ``_target.put``."""
        self._target.put(*args, **kwargs)

    def handle(self, *args, **kwargs):
        """Return ``resolve(self, *args, **kwargs)``."""
        return resolve(self, *args, **kwargs)
## TASKS
class Runner(threading.Thread):
    """Daemon thread that executes queued ``(func, *args, **kwargs)`` jobs."""

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, None, self._loop, "thread.%s" % str(time.time()), args, kwargs)
        # ``daemon`` attribute instead of the deprecated ``setDaemon()``.
        self.daemon = True
        self._queue = queue.Queue()
        self._outqueue = queue.Queue()
        self._state = "idle"

    def _loop(self, *args, **kwargs):
        """Take jobs off the queue and run them until the state leaves the run set."""
        logging.warning("start %s" % str(self))
        while self._state in ["running", "idle", "callback", "once"]:
            o = self._queue.get()
            if self._state == "stop":
                break
            try:
                func = o.a[0]
                o.returned = func(*o.a[1:], **o.kw)
            except BaseException:
                # Catch-all kept from the original bare ``except``: zbot's
                # Error derives from BaseException, so ``except Exception``
                # would let those kill the worker.
                error()
            time.sleep(0.001)
        logging.warning("stop %s" % str(self))

    def stop(self, *args, **kwargs):
        """Ask the loop to exit and wake it if it is blocked on the queue."""
        self._state = "stop"
        # Without this wake-up item a thread waiting in ``queue.get()`` never
        # notices the state change; the loop checks the state before it
        # looks at the item.
        self._queue.put(None)

    def put(self, *args, **kwargs):
        """Queue a job: ``args[0]`` is called with ``args[1:]`` and ``kwargs``."""
        o = Object()
        o.a = args
        o.kw = kwargs
        self._queue.put(o)
class TaskRunner(Runner):
    """Runner whose jobs are events handed to ``kernel.cb.handle_cb``."""

    def _loop(self, *args, **kwargs):
        """Feed the first argument of each queued job to ``kernel.cb.handle_cb``."""
        logging.warning("start %s" % str(self))
        while self._state in ["running", "idle", "callback", "once"]:
            o = self._queue.get()
            if self._state == "stop":
                break
            logging.info("run %s" % str(o))
            try:
                kernel.cb.handle_cb(o.a[0])
            except BaseException:
                # Same policy as Runner._loop: report through ``error()`` and
                # keep the worker alive instead of letting one bad job end
                # the thread.
                error()
            time.sleep(0.001)
        logging.warning("stop %s" % str(self))
class Spider(Runner):
    """Runner that crawls links reachable from a start URL.

    NOTE(review): block nesting in ``get_urls`` was reconstructed from a
    listing that had lost its indentation -- confirm against the original.
    """

    def __init__(self, *args, **kwargs):
        Runner.__init__(self, *args, **kwargs)
        self.errors = []
        self.urls = []          # URLs fetched so far
        self.url = Object()     # parsed pieces of the first URL seen
        self.followed = []
        self.speed = 0          # seconds slept after a fetch; grows by 0.1 each time
        self.depth = 5          # maximum number of "/"-separated parts in a URL

    def crawl(self, *args, **kwargs):
        """Start the worker thread and fetch the first URL."""
        self.start()
        self.get_urls(*args, **kwargs)

    def get_urls(self, *args, **kwargs):
        """Fetch ``args[0]``, save the links found and queue the unseen ones.

        Returns the list of links found, ``[]`` for a URL that was fetched
        before, or ``None`` when the URL exceeds ``self.depth``.
        """
        url = args[0]
        urls = []
        if not self.url:
            self.url.url = url
            self.url.basepath, self.url.base, self.url.root, self.url.file = parse_url(self.url.url)
        pnr = len(url.split("/"))
        if pnr > self.depth:
            # Report the configured limit rather than a hard-coded 5.
            logging.warning("%s depth > %s" % (url, self.depth))
            return
        if url not in self.urls:
            self.urls.append(url)
            content = do_url("GET", url)
            newurl = need_redirect(content)
            if newurl:
                content = do_url("GET", newurl)
                logging.warning("redirecting to %s" % newurl)
            newurl2 = need_redirect(content)
            if newurl2:
                # Follow the second redirect; the original fetched ``url``
                # again here.
                content = do_url("GET", newurl2)
                logging.warning("redirecting to %s" % newurl2)
            time.sleep(self.speed)
            self.speed += 0.1
            urls = parse_urls(url, content.read())
            o = Object()
            o.spider = True
            o.orig_url = url
            o.urls = urls
            o.save()
        for u in urls:
            if u in self.urls:
                continue
            if self.url.base not in u:
                continue
            if u in self.errors:
                continue
            self.put(self.get_urls, u)
        return urls
## DISPATCH
class Dispatcher(Object):
    """Hands work to a pool of ``TaskRunner`` threads."""

    max = 50                        # upper bound on the number of runners
    # Class-level deque: the pool is shared by every Dispatcher instance.
    runners = collections.deque()

    def stop(self, name=None):
        """Stop runners whose thread name contains ``name``; all of them when omitted.

        The original stopped nothing when ``name`` was left at its default.
        """
        for runner in self.runners:
            if not name or name in runner.name:
                runner.stop()

    def put(self, *args, **kwargs):
        """Queue ``args`` on a runner and return ``args[0]``."""
        if not args:
            # NOTE(review): ``NoTask`` is not defined in this file; unless
            # zbot.utils exports it this line raises NameError -- confirm.
            raise NoTask()
        target = self.get_target()
        target.put(*args, **kwargs)
        return args[0]

    def get_target(self):
        """Return the last idle runner in the pool, or a new one from ``makenew``."""
        target = None
        for runner in self.runners:
            if runner._queue and runner._state == "idle":
                target = runner
        if not target:
            target = self.makenew()
        return target

    def makenew(self, *args, **kwargs):
        """Start and return a new ``TaskRunner``, or reuse a random one at the limit.

        The original measured ``self._runners``, which does not exist and
        reads as ``""``, so the limit never applied and the pool grew without
        bound.
        """
        if len(self.runners) < self.max:
            runner = TaskRunner(*args, **kwargs)
            runner.start()
            self.runners.append(runner)
        else:
            runner = random.choice(self.runners)
        return runner

    def cleanup(self, dojoin=False):
        """Stop and drop runners that are stopped or have an empty queue.

        ``dojoin`` is accepted for compatibility and currently unused.  The
        original read ``runner.stopped`` and ``len(runner.queue)``, neither
        of which exists on a Runner.
        """
        todo = [
            runner for runner in self.runners
            if runner._state == "stop" or runner._queue.empty()
        ]
        for runner in todo:
            runner.stop()
            self.runners.remove(runner)
## CALLBACKS
class Callbacks(Dispatcher):
    """Registry that maps a type name to a list of callback functions."""

    def register(self, cbtype, cb, *args, **kwargs):
        """Append ``cb`` to the callbacks stored under ``cbtype``."""
        logging.debug("register %s" % cbtype)
        if cbtype not in self:
            self[cbtype] = []
        self[cbtype].append(cb)

    def handle_cb(self, *args, **kwargs):
        """Run every callback registered for ``get_cls(event)`` on ``args[0]``.

        A callback may carry a ``pre`` attribute; when ``pre(event)`` is
        falsy the remaining callbacks are skipped and ``None`` is returned.
        Otherwise the event is returned.
        """
        event = args[0]
        logging.info(event)
        target = get_cls(event)
        logging.info("cb %s %s" % (target, event.show()))
        # Iterate over a copy so callbacks may register more callbacks.
        functions = list(self[target]) if target in self else []
        for func in functions:
            pre = getattr(func, "pre", None)
            if pre and not pre(event):
                logging.debug("pre %s" % str(func))
                # NOTE(review): this aborts the whole chain and returns
                # None rather than the event -- confirm it is intended.
                return
            try:
                func(event)
            except KeyboardInterrupt:
                # handle_loop re-raises KeyboardInterrupt on purpose; the
                # original bare ``except`` swallowed it again here.
                raise
            except BaseException:
                # zbot's Error derives from BaseException, so a plain
                # ``except Exception`` would miss it.
                error()
        return event
## BOTS
class Bot(Callbacks):
    """Base event loop: fetch events with ``get_one`` and dispatch them.

    ``connect``, ``get_one``, ``read_some`` and ``get_prompt`` do nothing
    useful here; presumably concrete bots override them.
    """

    def connect(self, *args, **kwargs):
        """No-op in the base class."""
        pass

    def exit(self, *args, **kwargs):
        """Flush and close stdout, then terminate the process with ``os._exit(0)``."""
        try:
            sys.stdout.flush()
        except Exception as ex:
            logging.warning(str(ex))
        sys.stdout.close()
        os._exit(0)

    def get_one(self):
        """Return the next event; the base class has none and returns ``None``."""
        return None

    def read_some(self, *args, **kwargs):
        """No-op in the base class."""
        pass

    def put_some(self, *args, **kwargs):
        """Drain ``args[0]`` if it becomes readable within 0.05 seconds.

        Always returns ``False``.
        NOTE(review): the data read is discarded and the flag is never set
        to True -- looks unfinished; behaviour kept as found.
        """
        source = args[0]
        readable, _, _ = select.select([source], [], [], 0.05)
        gotcha = False
        if readable:
            readable[0].read()
        return gotcha

    def get_prompt(self, *args, **kwargs):
        """Return the prompt string; empty in the base class."""
        return ""

    def begin(self, *args, **kwargs):
        """Connect, default the state to ``"running"`` and enter ``run_forever``."""
        self.connect()
        if not self._state:
            self._state = "running"
        self.run_forever(*args, **kwargs)

    def run_forever(self, *args, **kwargs):
        """Poll ``get_one`` and dispatch events while the state allows it.

        Stops after an event whose ``_state`` is ``"once"``, then sets this
        bot's ready event.
        NOTE(review): placement of ``self.ready()`` after the loop is
        reconstructed from a listing without indentation.
        """
        while self._state in ["running", "idle", "once", "callback"]:
            time.sleep(0.001)
            e = self.get_one()
            if e:
                self.do_one(e)
                if e._state == "once":
                    break
        self.ready()

    def do_one(self, *args, **kwargs):
        """Dispatch ``args[0]`` and wait for it; return whether that worked.

        KeyboardInterrupt propagates; a missing argument yields ``False``;
        other exceptions are reported with ``error()`` and yield ``False``.
        """
        ok = True
        try:
            event = args[0]
            event._target = self
            self.put(event)
            event.wait()
        except KeyboardInterrupt:
            raise
        except IndexError:
            ok = False
        except Exception:
            error()
            ok = False
        return ok
## PLUGINS
class Plugins(Object):
    """Registry of loaded plugin modules, keyed by plugin name."""

    def get_names(self, plugsdir):
        """Return the names (without ``.py``) of the Python files in ``plugsdir``."""
        return [x[:-3] for x in os.listdir(plugsdir) if x.endswith(".py")]

    def load_plugs(self, path):
        """Load every plugin in ``path``, skipping names containing ``__``.

        A plugin that fails to load is reported with ``error()`` and skipped.
        """
        logging.warning("plugs %s" % path)
        for plugname in self.get_names(path):
            if "__" in plugname:
                continue
            try:
                self.load_mod(plugname, path, force=True)
            except KeyboardInterrupt:
                # Let Ctrl-C through, as do_one/handle_loop do.
                raise
            except BaseException:
                # zbot's Error derives from BaseException.
                error()

    def load_package(self, modname):
        """Import ``modname`` and load the plugins in its ``plugs`` directory."""
        __import__(modname)
        # ``__import__("a.b")`` returns package ``a``; the module that was
        # asked for is the one registered in ``sys.modules``.
        mod = sys.modules[modname]
        path, fn = os.path.split(mod.__file__)
        path += os.sep + "plugs"
        self.load_plugs(path)

    def load_mod(self, plugname, pdir="", force=False):
        """Load (or, with ``force``, reload) a plugin and run its ``init`` hook.

        An already loaded plugin is returned as-is unless ``force`` is true.
        ``pdir`` defaults to ``<self.root>/plugs``.
        NOTE(review): running ``init`` after both branches is reconstructed
        from a listing without indentation.
        """
        logging.info("load %s - %s" % (self.what.lower(), plugname))
        if plugname in self:
            if not force:
                return self[plugname]
            self[plugname] = imp.reload(self[plugname])
        else:
            if not pdir:
                pdir = j(self.root, "plugs")
            fobj, pathname, description = imp.find_module(plugname, [pdir])
            try:
                self[plugname] = imp.load_module(plugname, fobj, pathname, description)
            finally:
                # The caller of find_module owns the returned file object.
                if fobj is not None:
                    fobj.close()
        self.plug_exec(plugname, "init")
        return self[plugname]

    def plug_exec(self, plugname, item):
        """Call the plugin's ``item`` hook if it defines one.

        Only a missing hook is ignored; an AttributeError raised inside the
        hook now propagates instead of being silently dropped.
        """
        todo = getattr(self[plugname], item, None)
        if todo is None:
            return
        todo()
        logging.info("exec %s" % get_name(todo))

    def unload(self, plugname):
        """Run the plugin's ``shutdown`` hook and remove it from the registry."""
        self.plug_exec(plugname, "shutdown")
        del self[plugname]

    def reload(self, plugname, force=False):
        """Unload and load ``plugname`` again; return the module."""
        self.unload(plugname)
        # ``force`` must go by keyword: positionally it landed in ``pdir``.
        return self.load_mod(plugname, force=force)
## BOOT
def boot(*args, **kwargs):
    """Configure the global ``kernel`` from ``args[0]`` and return it.

    Merges the given config into ``kernel.cfg``, sets up the work directory
    and logging, logs startup banners when ``kernel.cfg.loglevel`` is set
    and registers ``handle_loop`` for ``"Object"`` events.
    NOTE(review): which statements sit inside the banner ``if`` blocks is
    reconstructed from a listing without indentation; ``show_eggs()`` is
    called unconditionally here.
    """
    cfg = args[0]
    kernel.cfg.update(cfg)
    if cfg.do_shell:
        hello("ZBOT")
    set_workdir(kernel.cfg.workdir)
    if not kernel.cfg.loglevel:
        # Only the caller's cfg gets the default; kernel.cfg.loglevel stays
        # empty, so the banners below remain off.
        cfg.loglevel = "error"
    from zbot.log import log_config
    log_config(cfg.loglevel)
    if kernel.cfg.loglevel:
        logging.warning("E G G S")
        logging.warning("")
    show_eggs()
    if kernel.cfg.loglevel:
        logging.warning("")
        logging.warning("C O N F I G")
        logging.warning("")
        for line in kernel.cfg.show():
            logging.warning(line)
    if kernel.cfg.loglevel:
        logging.warning("")
        logging.warning("B O O T")
        logging.warning("")
    kernel.cb.register("Object", handle_loop)
    return kernel
## HANDLER
def handle_loop(event):
    """Prepare, handle and mark ``event`` ready, in that order.

    Exceptions, KeyboardInterrupt included, propagate to the caller; the
    original ``try``/``except KeyboardInterrupt: raise`` did the same.
    """
    event.prepare()
    event.handle()
    event.ready()
## WORKDIR
def set_workdir(*args, **kwargs):
    """Derive the work directory and the list of save roots for ``kernel.cfg``.

    ``args[0]`` is the requested work directory (``"zbot.workdir"`` when
    empty).  ``kernel.cfg.workdir`` is only set when it is still empty;
    ``kernel.cfg.rootlist`` is always rebuilt and the work directory is
    created with ``make_dir``.
    """
    workdir = args[0] or "zbot.workdir"
    parts = workdir.split(os.sep)
    if not parts[-1]:
        # Trailing separator: take the component in front of it.
        wd = parts[-2]
    else:
        # NOTE(review): a path WITHOUT a trailing separator discards its own
        # last component in favour of "zbot.workdir".  This looks like a bug
        # but is preserved, because changing it would move the Public/Dropbox
        # roots below -- confirm before fixing.
        wd = "zbot.workdir"
    if not kernel.cfg.workdir:
        kernel.cfg.workdir = j(os.getcwd(), wd)
    kernel.cfg.rootlist = [
        kernel.cfg.workdir,
        j("Public", wd, day(), ""),
        j("Dropbox", "backups", wd, day(), ""),
    ]
    make_dir(kernel.cfg.workdir)
## VARS
# Global kernel object: configuration, plugin registry and the three
# callback registries (events, commands, tests).
kernel = Object(
    cfg=Object(),
    plugs=Plugins(),
    cb=Callbacks(),
    cmnds=Callbacks(),
    tests=Callbacks(),
)