# Source code for zot.plugs.info
# zot/plugs/info.py
#
#
""" info.* commands: version, running, ps, keys, ls, mods, cfg, kernel, where and uptime. """
__copyright__ = "Copyright 2015, B.H.J Thate"
## IMPORTS
from zot.utils import to_str, elapsed_days, error
from zot.runtime import kernel, mods, cfg
from zot import __version__, defaults
import threading
import time
import os
## info.version command
def info_version(event):
    """ reply with the bot banner followed by the package version. """
    banner = "ZOTBOT #%s" % __version__
    event.reply(banner)
# hand info_version to the kernel under the name "info.version".
kernel.register("info.version", info_version)
## info.uptime command (implemented at the end of this module, after the HELPERS section)
## info.running command
def info_running(event):
    """ reply with the list of threads reported by threading.enumerate(). """
    alive = threading.enumerate()
    event.reply("threads: %s" % alive)
# hand info_running to the kernel under the name "info.running".
kernel.register("info.running", info_running)
## info.ps command
def info_ps(event):
    """ reply with the ps(1) output for the current process.

    Runs ``ps`` for the pid of this interpreter and sends the stripped
    output back as one reply.
    """
    cmd = "ps -p %s -o pid,tid,class,rtprio,ni,pri,psr,rss,pcpu,stat,wchan:14,comm" % os.getpid()
    # the pid comes from os.getpid(), not from user input, so the shell string is safe;
    # the with-block makes sure the pipe is closed (and the child reaped) after reading.
    with os.popen(cmd) as pipe:
        data = pipe.read().strip()
    event.reply(data)
# hand info_ps to the kernel under the name "info.ps".
kernel.register("info.ps", info_ps)
## info.keys command
def info_keys(event):
    """ reply with the distinct keys found on the kernel's objects.

    Keys are collected in first-seen order from ``obj.obj()`` of every object
    returned by ``kernel.objects()``.  For every argument given, keys that
    contain that argument are dropped.  Nothing is sent when no key is left.
    """
    result = []
    seen = set()
    for obj in kernel.objects():
        for key in obj.obj():
            if key in seen:
                continue
            seen.add(key)
            result.append(key)
    # Build a real list per argument: a lazy filter() object is always truthy
    # and its lambda would only see the last value of the loop variable.
    # NOTE(review): arguments EXCLUDE matching keys ("arg not in key") - confirm this is intended.
    for arg in event.args:
        result = [key for key in result if arg not in key]
    if result:
        event.reply(" ".join(result))
# hand info_keys to the kernel under the name "info.keys".
kernel.register("info.keys", info_keys)
## info.ls command
def info_ls(event):
    """ list names known to the kernel.

    Without arguments, reply with the sorted, de-duplicated last dotted
    component of every name in ``kernel.names()``.  With arguments, reply with
    whatever ``kernel.search()`` returns for the argument text; nothing is
    sent when the search result is empty.
    """
    rest = event.rest()
    if not rest:
        result = {x.split(".")[-1] for x in kernel.names()}
        event.reply(", ".join(sorted(result)))
        return
    keys = kernel.search(rest)
    if keys:
        event.reply(", ".join(keys))
# hand info_ls to the kernel under the name "info.ls".
kernel.register("info.ls", info_ls)
## info.mods command
def info_mods(event):
    """ list module names with their first dotted component removed.

    Without arguments, reply with the sorted names from ``mods.names()``.
    With arguments, reply with the names returned by ``mods.search()`` for the
    argument text, in the order the search returned them.
    """
    rest = event.rest()
    if not rest:
        # de-duplicate on the full name first, then strip and sort (same order of operations as before).
        stripped = [x.partition(".")[2] for x in set(mods.names())]
        event.reply(", ".join(sorted(stripped)))
        return
    keys = mods.search(rest)
    # partition(".")[2] is everything after the first dot, "" when there is no dot.
    event.reply(", ".join([x.partition(".")[2] for x in keys]))
# hand info_mods to the kernel under the name "info.mods".
kernel.register("info.mods", info_mods)
## info.cfg command
def info_cfg(event):
    """ show, and optionally edit, a configuration object.

    Without arguments the global ``cfg`` is shown.  With ``<type>`` the object
    from ``kernel.last("cfg", <type>)`` is used, falling back to the
    ``cfg_<type>`` attribute of ``defaults`` when that lookup is empty.  With
    ``<type> <key> <value>`` the key is set (only if it already exists in the
    object), the prefix is set to "cfg" and the object is synced before it is
    shown.  Nothing is sent when no config object could be found.
    """
    obj = cfg
    if event.args:
        ctype = event.args[0]
        obj = kernel.last("cfg", ctype)
        if not obj:
            try:
                obj = getattr(defaults, "cfg_%s" % ctype)
            except AttributeError:
                pass
        # only the argument lookup may raise IndexError; keep the update itself
        # outside the try so errors raised by sync() are not silently swallowed.
        try:
            key = event.args[1]
            value = event.args[2]
        except IndexError:
            pass
        else:
            # obj can still be empty/None here when neither lookup produced a config.
            if obj and key in obj:
                obj[key] = value
                obj.prefix = "cfg"
                obj.sync()
    if obj:
        event.reply("\n".join(obj.show()))
# hand info_cfg to the kernel under the name "info.cfg".
kernel.register("info.cfg", info_cfg)
## info.kernel command
def info_kernel(event):
    """ reply with the output of kernel.pretty(). """
    overview = kernel.pretty()
    event.reply(overview)
# hand info_kernel to the kernel under the name "info.kernel".
kernel.register("info.kernel", info_kernel)
## info.where command
def info_where(event):
    """ reply with the plugnames of the handlers found for the argument text.

    Every name returned by ``kernel.search()`` is looked up with
    ``kernel.get()`` and the ``plugname`` of each returned item is collected.
    The distinct plugnames are sent sorted, so the reply is stable between
    calls; nothing is sent when none were found.
    """
    plugs = set()
    for cmnd in kernel.search(event.rest()):
        plugs.update(func.plugname for func in kernel.get(cmnd))
    if plugs:
        event.reply(", ".join(sorted(plugs)))
# hand info_where to the kernel under the name "info.where".
kernel.register("info.where", info_where)
## HELPERS
def until(obj):
    """ return obj.sleep minus the time passed since obj._last, formatted by elapsed_days.

    On any ordinary error (missing attribute, bad value) error() is called and
    "" is returned.  SystemExit and KeyboardInterrupt are not swallowed.
    """
    try:
        passed = int(time.time() - obj._last)
        return elapsed_days(int(obj.sleep) - passed)
    except Exception:
        error()
        return ""
def upt(obj):
    """ return the time passed since obj._start, formatted by elapsed_days.

    Returns "" on any ordinary error (e.g. the object has no _start);
    SystemExit and KeyboardInterrupt are not swallowed.
    """
    try:
        return elapsed_days(int(time.time()) - int(obj._start))
    except Exception:
        return ""
def waiting(obj):
    """ return "<until>/<sleeptime>" for objects that have a sleep time, "" otherwise. """
    if not sleeptime(obj):
        return ""
    return "%s/%s" % (until(obj), sleeptime(obj))
def idle(obj):
    """ return the span between obj._start and obj._last, formatted by elapsed_days.

    Returns "" on any ordinary error (e.g. a missing attribute); SystemExit
    and KeyboardInterrupt are not swallowed.
    """
    try:
        return elapsed_days(int(obj._last - obj._start))
    except Exception:
        return ""
def since(obj):
    """ return the time passed since obj._last, formatted by elapsed_days.

    Returns "" on any ordinary error (e.g. the object has no _last);
    SystemExit and KeyboardInterrupt are not swallowed.
    """
    try:
        return elapsed_days(int(time.time()) - int(obj._last))
    except Exception:
        return ""
def status(obj):
    """ return obj._status, or "" when it cannot be read.

    Only ordinary exceptions are turned into ""; SystemExit and
    KeyboardInterrupt raised while reading the attribute propagate.
    """
    try:
        return obj._status
    except Exception:
        return ""
def name(obj):
    """ return obj._name, or "" when it cannot be read.

    Only ordinary exceptions are turned into ""; SystemExit and
    KeyboardInterrupt raised while reading the attribute propagate.
    """
    try:
        return obj._name
    except Exception:
        return ""
def running(obj):
    """ return obj._running, or "" when it cannot be read.

    Only ordinary exceptions are turned into ""; SystemExit and
    KeyboardInterrupt raised while reading the attribute propagate.
    """
    try:
        return obj._running
    except Exception:
        return ""
def sleeptime(obj):
    """ return obj.sleep formatted by elapsed_days.

    Returns "" on any ordinary error (e.g. the object has no sleep attribute);
    SystemExit and KeyboardInterrupt are not swallowed.
    """
    try:
        return elapsed_days(obj.sleep)
    except Exception:
        return ""
def plugname(obj):
    """ return obj.plugname, or "" when it cannot be read.

    Only ordinary exceptions are turned into ""; SystemExit and
    KeyboardInterrupt raised while reading the attribute propagate.
    """
    try:
        return obj.plugname
    except Exception:
        return ""
## info.uptime command
# helpers whose non-empty results make up one line of the info.uptime reply, in this order.
funcs = [status, upt, waiting]


def info_uptime(event):
    """ reply with one line per thread that has a status.

    Threads from ``threading.enumerate()`` with a non-empty ``status()`` are
    handled in order of their ``name()``.  Each line is "<name>: " followed by
    the non-empty results of the helpers in ``funcs``, each with a trailing
    space.
    """
    threads = [thr for thr in threading.enumerate() if status(thr)]
    for thread in sorted(threads, key=name):
        # when the thread runs a bound method, report on the object owning that
        # method; otherwise fall back to the thread itself.
        try:
            target = thread.func.__self__
        except Exception:
            target = thread
        res = "%s: " % name(thread)
        for func in funcs:
            value = func(target)
            if not value:
                continue
            try:
                res += "%s " % value
            except Exception:
                error()
        # one reply per thread: res is rebuilt on every iteration.
        event.reply(res)
# hand info_uptime to the kernel under the name "info.uptime".
kernel.register("info.uptime", info_uptime)