# Source code for zot.plugs.info

# zot/plugs/info.py
#
#

""" show version. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from zot.utils import to_str, elapsed_days, error
from zot.runtime import kernel, mods, cfg
from zot import __version__, defaults

import threading
import time
import os

## info.version command

def info_version(event):
    """ reply to the event with the ZOTBOT version banner. """
    banner = "ZOTBOT #%s" % __version__
    event.reply(banner)


kernel.register("info.version", info_version)

## info.uptime command

## info.running command
def info_running(event):
    """ reply with the thread list returned by threading.enumerate(). """
    alive = threading.enumerate()
    event.reply("threads: %s" % alive)


kernel.register("info.running", info_running)

## info.ps command
def info_ps(event):
    """ reply with the output of `ps` for the current process.

        Runs `ps -p <pid>` with a fixed column list for os.getpid() and
        sends the stripped output back through event.reply().
    """
    cmd = "ps -p %s -o pid,tid,class,rtprio,ni,pri,psr,rss,pcpu,stat,wchan:14,comm" % os.getpid()
    # the with-block closes the pipe (and reaps the child) once the output is read.
    with os.popen(cmd) as pipe:
        data = pipe.read().strip()
    event.reply(data)


kernel.register("info.ps", info_ps)

## info.keys command
def info_keys(event):
    """ reply with the distinct keys found on the kernel's objects.

        Keys are gathered in first-seen order from obj.obj() of every object
        in kernel.objects().  Each word in event.args is an exclusion filter:
        a key that contains any of the words is dropped.  Nothing is sent
        when no keys remain.
    """
    seen = set()
    keys = []
    for obj in kernel.objects():
        for key in obj.obj():
            if key not in seen:
                seen.add(key)
                keys.append(key)
    excluded = list(event.args)
    # filter eagerly into a list: a chain of lazy filter() objects is always
    # truthy on Python 3 and its lambdas would all see only the last argument.
    keys = [key for key in keys if not any(arg in key for arg in excluded)]
    if keys:
        event.reply(" ".join(keys))


kernel.register("info.keys", info_keys)

## info.ls command
def info_ls(event):
    """ list registered names, or search them.

        Without arguments: reply with the sorted, de-duplicated last dotted
        component of every entry in kernel.names().
        With arguments: reply with the result of kernel.search(rest), joined
        by ", "; nothing is sent when the search result is empty.
    """
    rest = event.rest()
    if not rest:
        names = {x.split(".")[-1] for x in kernel.names()}
        event.reply(", ".join(sorted(names)))
        return
    keys = kernel.search(rest)
    if keys:
        event.reply(", ".join(keys))


kernel.register("info.ls", info_ls)

## info.mods command
def info_mods(event):
    """ list module names, or search them.

        Module names are shown with their first dotted component removed.
        Without arguments: reply with all of mods.names(), de-duplicated and
        sorted.  With arguments: reply with the matches of mods.search(rest);
        nothing is sent when there are no matches (same as info_ls).
    """
    def short(modname):
        # drop the leading component of the dotted name.
        return ".".join(modname.split(".")[1:])

    rest = event.rest()
    if not rest:
        event.reply(", ".join(sorted([short(x) for x in set(mods.names())])))
        return
    keys = mods.search(rest)
    if keys:
        event.reply(", ".join([short(x) for x in keys]))


kernel.register("info.mods", info_mods)

## info.cfg command
def info_cfg(event):
    """ show a config object and optionally set one of its keys.

        event.args is read as [ctype [key value]]:

        * no arguments -- show the runtime `cfg` object.
        * ctype        -- show kernel.last("cfg", ctype), falling back to
                          the `cfg_<ctype>` attribute of `defaults`.
        * key value    -- when the key already exists in the selected
                          object, store the value, set the object's prefix
                          to "cfg" and sync() it before showing it.

        Nothing is sent when no config object could be found.
    """
    obj = cfg
    args = event.args
    if args:
        ctype = args[0]
        obj = kernel.last("cfg", ctype)
        if not obj:
            # keep the falsy lookup result when defaults has no such attribute.
            obj = getattr(defaults, "cfg_%s" % ctype, obj)
        # only the argument lookup is guarded, so an IndexError raised while
        # storing or syncing is no longer silently dropped.
        try:
            key = args[1]
            value = args[2]
        except IndexError:
            key = None
        else:
            # skip the update when no object was found; `key in None`
            # would raise a TypeError.
            if obj and key in obj:
                obj[key] = value
                obj.prefix = "cfg"
                obj.sync()
    if obj:
        event.reply("\n".join(obj.show()))


kernel.register("info.cfg", info_cfg)

## info.kernel command
def info_kernel(event):
    """ reply with the text produced by kernel.pretty(). """
    dump = kernel.pretty()
    event.reply(dump)


kernel.register("info.kernel", info_kernel)

## info.where command
def info_where(event):
    """ reply with the plugin names that provide the matching commands.

        The event text is passed to kernel.search(); for every match the
        `plugname` of each entry in kernel.get(match) is collected.  The
        distinct names are sent sorted, so the reply order is stable.
        Nothing is sent when no plugin name was found.
    """
    plugs = set()
    for cmnd in kernel.search(event.rest()):
        plugs.update(x.plugname for x in kernel.get(cmnd))
    if plugs:
        event.reply(", ".join(sorted(plugs)))


kernel.register("info.where", info_where)

## HELPERS
def until(obj):
    """ return elapsed_days() of obj.sleep minus the seconds since obj._last.

        On failure error() is called and "" is returned.  Only Exception is
        caught, so SystemExit and KeyboardInterrupt are not swallowed.
    """
    try:
        remaining = int(obj.sleep) - int(time.time() - obj._last)
        return elapsed_days(remaining)
    except Exception:
        error()
        return ""
def upt(obj):
    """ return elapsed_days() of the seconds since obj._start, "" on failure.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        seconds = int(time.time()) - int(obj._start)
        return elapsed_days(seconds)
    except Exception:
        return ""
def waiting(obj):
    """ return "<until>/<sleeptime>" for obj, or "" when sleeptime(obj) is empty. """
    if not sleeptime(obj):
        return ""
    return "%s/%s" % (until(obj), sleeptime(obj))
def idle(obj):
    """ return elapsed_days() of obj._last minus obj._start, "" on failure.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        seconds = int(obj._last - obj._start)
        return elapsed_days(seconds)
    except Exception:
        return ""
def since(obj):
    """ return elapsed_days() of the seconds since obj._last, "" on failure.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        seconds = int(time.time()) - int(obj._last)
        return elapsed_days(seconds)
    except Exception:
        return ""
def status(obj):
    """ return obj._status, or "" when it cannot be read.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        return obj._status
    except Exception:
        return ""
def name(obj):
    """ return obj._name, or "" when it cannot be read.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        return obj._name
    except Exception:
        return ""
def running(obj):
    """ return obj._running, or "" when it cannot be read.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        return obj._running
    except Exception:
        return ""
def sleeptime(obj):
    """ return elapsed_days() of obj.sleep, "" on failure.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        seconds = obj.sleep
        return elapsed_days(seconds)
    except Exception:
        return ""
def plugname(obj):
    """ return obj.plugname, or "" when it cannot be read.

        Only Exception is caught, so SystemExit and KeyboardInterrupt are not
        swallowed.
    """
    try:
        return obj.plugname
    except Exception:
        return ""


## info.uptime command
# helpers whose non-empty results make up one info.uptime line, in order.
funcs = [status, upt, waiting]


def info_uptime(event):
    """ reply with one line per thread that reports a status.

        Threads from threading.enumerate() for which status() is non-empty
        are visited sorted by name().  Each line starts with "<name>: ",
        followed by the non-empty result of every helper in `funcs`.  The
        helpers are applied to obj.func.__self__ when the thread has one,
        otherwise to the thread itself.
    """
    threads = [x for x in threading.enumerate() if status(x)]
    for thr in sorted(threads, key=name):
        try:
            target = thr.func.__self__
        except Exception:
            # no bound `func` on this thread; inspect the thread itself.
            target = thr
        line = "%s: " % name(thr)
        for func in funcs:
            value = func(target)
            if not value:
                continue
            try:
                line += "%s " % value
            except Exception:
                error()
        event.reply(line)


kernel.register("info.uptime", info_uptime)