# botlib/cmnds.py
#
#
""" botlib basic commands. """
from .object import Config, Object
from .space import runtime, users
from .space import kernel as _kernel
from .space import fleet as _fleet
from .utils import day, elapsed, get_day, get_hour, now, to_date, to_day, to_time
from .utils import name, sname, tname
from .compose import compose
from .error import ENODATE
from .clock import Timer
from .db import Db
from .trace import get_exception
import botlib.space
import threading
import _thread
import logging
import mailbox
import time
import json
import sys
import os
# column layout of the status lines printed by the fleet, kernel and ps commands.
format = "%-4s %-18s %-8s %-8s %-15s %-15s"
# stopwatch reference point: set when the module is loaded, reset by begin(), read by end().
starttime = time.time()
class Email(Object):
    """ a single mail message, as created and saved by the mbox command. """
class Entry(Object):
    """ common base class of the entry types (Log, Shop, Todo, Tomorrow, Watch). """
class Log(Entry):
    """ a log entry, created by the log command to note what happened. """
class Shop(Entry):
    """ a shopping list item, created by the shop command. """
class Todo(Entry):
    """ a todo item, created by the todo command. """
class Tomorrow(Entry):
    """ a todo item meant for tomorrow, created by the tomorrow command. """
class Watch(Entry):
    """ a watch entry (a file to watch), created by the watch command. """
def alias(event):
    """ register an alias for an existing command.

    expects "<cmnd> <aliased>" as the text after the command. the aliased
    value has to be a key of _kernel._names, otherwise the request is
    refused. on success the alias is stored in botlib.space.alias and saved.
    """
    cmnd, sep, value = event._parsed.rest.partition(" ")
    if not sep:
        # without a space there is only one word, so show the usage.
        event.reply('alias <cmnd> <aliased>')
        return
    if value in _kernel._names:
        botlib.space.alias[cmnd] = value
        botlib.space.alias.save()
        event.ok(cmnd)
    else:
        event.reply("only commands can be aliased.")
def announce(event):
    """ pass the text following the command on to _kernel.announce. """
    text = event._parsed.rest
    _kernel.announce(text)
def begin(event):
    """ start the stopwatch by resetting the module level starttime.

    replies with the new start time, or with an error text when to_time
    raises ENODATE (starttime is left untouched in that case).
    """
    global starttime
    try:
        stamp = to_time(now())
    except ENODATE:
        event.reply("can't detect date from %s" % event.txt)
        return
    starttime = stamp
    event.reply("time is %s" % time.ctime(stamp))
def cfg(event):
    """ show or edit a configuration stored on disk (OPER only).

    usage: cfg [<name> [<key> <value ...>]]

    - without arguments the main config (botlib.space.cfg) is shown.
    - with only a name the config loaded from disk under that name is shown.
    - with a name, key and value the key is set, the config is synced and
      the result is shown. a value made of several words is joined with
      single spaces.

    setting the "user" key to something containing "@" also fills in the
    "username" and "server" keys from the parts before and after the first "@".
    """
    if not users.allowed(event.origin, "OPER"):
        event.reply("you are not allowed to give the cfg command.")
        return
    args = event._parsed.args
    if not args:
        event.reply(botlib.space.cfg)
        return
    config = Config()
    config.fromdisk(args[0])
    if len(args) == 1:
        event.reply(config)
        return
    if len(args) == 2:
        event.reply("missing a value.")
        return
    key = args[1]
    # join all remaining words; formatting the raw list would store its repr.
    val = " ".join(args[2:])
    try:
        # round trip through json so escape sequences in the value are decoded.
        config[key] = json.loads('"%s"' % val)
    except (SyntaxError, ValueError) as ex:
        event.reply(str(ex))
        return
    if key == "user" and "@" in config[key]:
        # split on the first "@" only, so extra "@" characters cannot break the unpacking.
        config["username"], config["server"] = config[key].split("@", 1)
    config.sync()
    event.reply(config)
def cmnds(event):
    """ show the sorted list of commands.

    only commands whose module (as a string) contains the text given after
    the command are listed; replies "no modules loaded." when none match.
    """
    names = {
        cmnd
        for cmnd, mod in _kernel._names.items()
        if event._parsed.rest in str(mod)
    }
    listing = sorted(names)
    event.reply(listing or "no modules loaded.")
def deleted(event):
    """ show deleted records.

    sets event.nodel before selecting and displays every selected object
    with a counter starting at 1; replies "no results" when nothing matched.
    """
    event.nodel = True
    nr = 0
    for nr, item in enumerate(Db().selected(event), 1):
        event.display(item, event._parsed.args, str(nr))
    if nr == 0:
        event.reply("no results")
def dump(event):
    """ reply with the nice() rendering of every object selected by the event. """
    found = False
    for item in Db().selected(event):
        found = True
        event.reply(item.nice())
    if not found:
        event.reply("no results")
def end(event):
    """ stop the stopwatch: reply with the time elapsed since starttime. """
    diff = time.time() - starttime
    if not diff:
        # nothing to report when no time has passed.
        return
    event.reply("time elapsed is %s" % elapsed(diff))
def exit(event):
    """ stop the bot by interrupting the main thread (OPER only). """
    if users.allowed(event.origin, "OPER"):
        _thread.interrupt_main()
    else:
        event.reply("you are not allowed to give the exit command. ")
def fetcher(event):
    """ fetch all rss feeds.

    for every client registered under "RSS" in the runtime: compose it, run
    its fetcher, wait for the returned threads and reply with the results.
    """
    for client in runtime.get("RSS", []):
        threads = compose(client).fetcher()
        results = _kernel.waiter(threads)
        event.reply("fetched %s" % ",".join(map(str, results)))
def fleet(event):
    """ show one status line per bot in the fleet, numbered from 0. """
    for nr, bot in enumerate(_fleet):
        row = (
            nr,
            bot.id(),
            bot._state.printable(nokeys=True),
            "",  # the uptime column is left empty for bots.
            bot._counter.printable(),
            bot._error.printable(nokeys=True),
        )
        event.reply(format % row)
def find(event):
    """ display the objects selected by the event, numbered from 0.

    objects are displayed with direct=True; replies "no results" when the
    selection is empty.
    """
    count = 0
    for count, item in enumerate(Db().selected(event), 1):
        # the displayed index is zero based, the counter is one based.
        event.display(item, event._parsed.args, str(count - 1), direct=True)
    if count == 0:
        event.reply("no results")
def first(event):
    """ show the first record matching the given arguments, or "no result". """
    record = Db().first(*event._parsed.args)
    event.reply(record.nice() if record else "no result")
def kernel(event):
    """ show the kernel status line: state, uptime, counters and errors. """
    up = elapsed(int(time.time() - _kernel._time.start))
    fields = (
        0,
        "kernel",
        _kernel._state.printable(nokeys=True),
        up,
        _kernel._counter.printable(),
        _kernel._error.printable(),
    )
    event.reply(format % fields)
def last(event):
    """ show the last object matching the given arguments.

    replies with the usage text when no arguments are given and with
    "no result" when nothing matches.
    """
    args = event._parsed.args
    if not args:
        event.reply("last <prefix> <value>")
        return
    record = Db().last(*args)
    event.reply(record.nice() if record else "no result")
def log(event):
    """ save the text after the command as a Log entry. """
    text = event._parsed.rest
    if not text:
        event.reply("log <item>")
        return
    entry = Log()
    entry.log = text
    entry.save()
    event.ok(1)
def loglevel(event):
    """ show or set the loglevel.

    without an argument the configured level (botlib.space.cfg.loglevel) is
    shown. with an argument the level is stored, the kernel is synced to
    <workdir>/runtime/kernel and the logging level is switched.
    """
    from botlib.log import loglevel as _loglevel
    level = event._parsed.rest
    if level:
        # NOTE(review): the level is read from botlib.space.cfg.loglevel but
        # written to botlib.space.loglevel - confirm this is intended.
        botlib.space.loglevel = level
        target = os.path.join(botlib.space.cfg.workdir, "runtime", "kernel")
        _kernel.sync(target)
        _loglevel(level)
    else:
        event.reply(botlib.space.cfg.loglevel)
def loud(event):
    """ disable silent mode on the bot the event originated from. """
    for bot in _fleet:
        if event.id() != bot.id():
            continue
        bot.cfg.silent = False
        bot.cfg.sync()
        event.reply("silent mode disabled.")
def license(event):
    """ show the license text of the botlib package. """
    from botlib import __license__ as text
    event.reply(text)
def ls(event):
    """ list the entries of the working directory, separated by spaces. """
    entries = os.listdir(botlib.space.cfg.workdir)
    event.reply(" ".join(entries))
def mbox(event):
    """ import the mails of a mbox file or a maildir as Email objects.

    the first argument is the path ("~" is expanded). a directory is read
    as a Maildir, a regular file as a mbox. every message is stored as an
    Email with its headers as attributes and the text/plain payloads joined
    in the text attribute. when a date can be derived from the Date header
    the object is saved under that date. messages that fail to convert are
    logged and skipped. replies with the number of saved mails, if any.
    """
    if not event._parsed.rest:
        event.reply("mbox <path>")
        return
    fn = os.path.expanduser(event._parsed.args[0])
    event.reply("reading from %s" % fn)
    if os.path.isdir(fn):
        thing = mailbox.Maildir(fn, create=False)
    elif os.path.isfile(fn):
        thing = mailbox.mbox(fn, create=False)
    else:
        event.reply("need a mbox or maildir.")
        return
    nr = 0
    try:
        try:
            thing.lock()
        except FileNotFoundError:
            # best effort: read without a lock when it cannot be created.
            pass
        for m in thing:
            try:
                o = Email()
                o.update(m.items())
                try:
                    sdate = os.sep.join(to_date(o.Date).split())
                except AttributeError:
                    # no usable Date header, save under the current time.
                    sdate = None
                o.text = ""
                for load in m.walk():
                    if load.get_content_type() == 'text/plain':
                        o.text += load.get_payload()
                # turn literal backslash-n sequences into real newlines.
                o.text = o.text.replace("\\n", "\n")
                if sdate:
                    o.save(stime=sdate)
                else:
                    o.save()
                nr += 1
            except Exception:
                # one broken mail must not abort the import.
                logging.error(get_exception())
    finally:
        # close() also releases the lock, so no lock is left behind.
        thing.close()
    if nr:
        event.ok(nr)
def pid(event):
    """ reply with the process id of the running bot. """
    event.reply("%d" % os.getpid())
def ps(event):
    """ show running threads.

    prints one status line per thread returned by the kernel, sorted by
    thread name and numbered from 1. threads without a start time are
    skipped. for threads with a sleep attribute the time column is computed
    from sleep and the latest timestamp instead of the start time.
    """
    nr = 1
    for thr in sorted(_kernel.running(), key=tname):
        info = Object(vars(thr))
        if "start" not in info._time:
            continue
        stamp = time.time()
        if "sleep" in info:
            up = elapsed(info.sleep - int(stamp - info._time.latest))
        else:
            up = elapsed(int(stamp - info._time.start))
        row = (
            nr,
            tname(thr),
            info._state.printable(nokeys=True),
            up,
            info._counter.printable(),
            info._error.printable(nokeys=True),
        )
        event.reply((format % row).rstrip())
        nr += 1
def real_reboot():
    """ shut the kernel down, reset, and re-exec the program with "-r" appended. """
    from botlib.utils import reset
    _kernel.shutdown()
    reset()
    program = sys.argv[0]
    argv = sys.argv + ["-r"]
    os.execl(program, *argv)
def reboot(event):
    """ reboot the bot, allowing a stateful reboot (keeping connections alive).

    only works when reboot is enabled in the main config and the origin has
    the OPER permission; announces "rebooting" before the actual reboot.
    """
    if not botlib.space.cfg.reboot:
        event.reply("# reboot is not enabled.")
    elif not users.allowed(event.origin, "OPER"):
        event.reply("you are not allowed to give the reboot command. ")
    else:
        event.announce("rebooting")
        real_reboot()
def reload(event):
    """ reload plugins (OPER only).

    without an argument the short names of the kernel's modules are listed.
    with an argument every module whose name contains it is reloaded and
    the result of each reload is replied.
    """
    if not users.allowed(event.origin, "OPER"):
        event.reply("you are not allowed to give the reload command.")
        return
    modnames = _kernel.modules()
    if not event._parsed.rest:
        event.reply(",".join(x.rsplit(".", 1)[-1] for x in modnames))
        return
    for modname in modnames:
        if event._parsed.rest in modname:
            event.reply(_kernel.reload(modname, True))
def restore(event):
    """ clear the deleted flag on the selected records.

    sets event.nodel before selecting. replies "no results" when nothing
    was selected; the count is passed to event.ok in every case.
    """
    db = Db()
    event.nodel = True
    count = 0
    for item in db.selected(event):
        item.deleted = False
        item.sync()
        count += 1
    if count == 0:
        event.reply("no results")
    event.ok(count)
def rm(event):
    """ set the deleted flag on the selected objects and report how many were flagged. """
    count = 0
    for item in Db().selected(event):
        item.deleted = True
        item.sync()
        count += 1
    event.ok(count)
def silent(event):
    """ put the bot the event originated from into silent mode. """
    for bot in _fleet:
        if event.id() != bot.id():
            continue
        bot.cfg.silent = True
        bot.cfg.sync()
        event.reply("silent mode enabled.")
def shop(event):
    """ save the text after the command as a Shop item. """
    text = event._parsed.rest
    if not text:
        event.reply("shop <item>")
        return
    item = Shop()
    item.shop = text
    item.save()
    event.ok(1)
def show(event):
    """ show a runtime object from botlib.space, or one of its attributes.

    usage: show <item> [<attribute>]

    - without arguments the names in botlib.space.__all__ are listed.
    - "bot" shows the bot the event originated from.
    - "fleet" shows the id of every bot in the fleet.
    - any other item is looked up on botlib.space; with an attribute name
      that attribute of the item is shown. missing names show as None.
    """
    import botlib.space
    rest = event._parsed.rest
    if not rest:
        event.reply("choose one of %s" % ",".join(botlib.space.__all__))
        return
    # partition never raises, so no catch-all except is needed here.
    item, _, value = rest.partition(" ")
    if item == "bot":
        event.reply(_fleet.get_bot(event.id()))
        return
    if item == "fleet":
        for bot in _fleet:
            event.reply(bot.id())
        return
    obj = getattr(botlib.space, item, None)
    if value:
        event.reply(getattr(obj, value, None))
    else:
        event.reply(obj)
def save(event):
    """ sync the alias, kernel and seen objects of botlib.space, then confirm. """
    for attr in ("alias", "kernel", "seen"):
        getattr(botlib.space, attr).sync()
    event.reply("saved")
def start(event):
    """ start a plugin (OPER only).

    without an argument the sorted short names of the kernel's modules are
    shown. otherwise the module "botlib.<argument>" is force-reloaded with
    the event passed along, and its lowercased short name is confirmed.
    """
    if not users.allowed(event.origin, "OPER"):
        event.reply("you are not allowed to give the start command. ")
        return
    modnames = _kernel.modules()
    if not event._parsed.rest:
        event.reply(sorted({x.split(".")[-1] for x in modnames}))
        return
    target = "botlib.%s" % event._parsed.args[0]
    if target in modnames:
        mod = _kernel.reload(target, force=True, event=event)
        event.ok(sname(mod).lower())
    else:
        event.reply("no %s module found." % target)
def stop(event):
    """ stop a plugin (OPER only).

    looks up "botlib.<argument>" in the kernel's table and calls its stop,
    exit and shutdown functions (each only when present, in that order)
    with the event. afterwards the kernel is asked to kill the uppercased
    module name.
    """
    if not users.allowed(event.origin, "OPER"):
        event.reply("you are not allowed to give the stop command.")
        return
    if not event._parsed.rest:
        event.reply("stop what ?")
        return
    target = "botlib.%s" % event._parsed.args[0]
    try:
        mod = _kernel._table[target]
        for hook in ("stop", "exit", "shutdown"):
            if hook in dir(mod):
                getattr(mod, hook)(event)
    except (KeyError, ImportError):
        event.reply("no %s module found." % target)
        return
    _kernel.kill(target.upper())
    event.reply("stopped %s" % target)
def synchronize(event):
    """ synchronize every "RSS" client in the runtime and report the number of urls. """
    for client in runtime.get("RSS", []):
        seen = compose(client).synchronize()
        event.reply("%s urls updated" % len(seen.urls))
def test(event):
    """ greet the origin of the event, passed through stripped() first. """
    from .utils import stripped
    who = stripped(event.origin)
    event.reply("hello %s" % who)
def timer(event):
    """ schedule the given text to be shown at a later time.

    a word of the form "+<seconds>" sets a delay relative to now. without
    such a word the target time is derived from the text with get_day (or
    to_day(day()) when no day is found) plus get_hour when an hour is found.

    replies with the usage text on empty input, with an error when the
    "+<seconds>" word is not an integer, and with "already passed given
    time." when no future target time could be determined. otherwise the
    timer event is saved, a Timer is launched through the kernel and the
    target time is confirmed.
    """
    if not event._parsed.rest:
        event.reply("timer <string with time>")
        return
    seconds = 0
    for word in event._parsed.args:
        if not word.startswith("+"):
            continue
        try:
            seconds = int(word[1:])
        except ValueError:
            # report the offending text, not the (still zero) seconds value.
            event.reply("%s is not an integer" % word[1:])
            return
    # start at 0 so a failed date lookup ends in the "already passed" reply
    # below instead of an unbound local.
    target = 0
    if seconds:
        target = time.time() + seconds
    else:
        try:
            target = get_day(event._parsed.rest)
        except ENODATE:
            try:
                target = to_day(day())
            except ENODATE:
                pass
        try:
            hour = get_hour(event._parsed.rest)
            if hour:
                target += hour
        except ENODATE:
            pass
    if not target or time.time() > target:
        event.reply("already passed given time.")
        return
    # NOTE(review): Event is not among the imports visible at the top of this
    # file - confirm it is in scope before relying on this command.
    e = Event(event)
    e.services = "clock"
    e._prefix = "timer"
    e.txt = event._parsed.rest
    e.time = target
    e.done = False
    e.save()
    clock = Timer(target - time.time(), e.direct, e.txt)
    _kernel.launch(clock.start)
    event.ok(time.ctime(target))
def today(event):
    """ show the objects in the range from to_day(day()) up to now.

    the range is stored on event._parsed (start/end) before selecting;
    results are numbered from 1, "no results" is replied when there are none.
    """
    db = Db()
    parsed = event._parsed
    parsed.start = to_day(day())
    parsed.end = time.time()
    nr = 0
    for nr, item in enumerate(db.selected(event), 1):
        event.display(item, event._parsed.args, str(nr))
    if nr == 0:
        event.reply("no results")
def todo(event):
    """ save the text after the command as a Todo item. """
    text = event._parsed.rest
    if not text:
        event.reply("todo <item>")
        return
    item = Todo()
    item.todo = text
    item.save()
    event.ok(1)
def tomorrow(event):
    """ save a todo item for tomorrow.

    the stored text is event.txt with every occurrence of the word
    "tomorrow" removed and the surrounding whitespace stripped.
    """
    if not event._parsed.rest:
        event.reply("tomorrow <item>")
        return
    text = event.txt.replace("tomorrow", "")
    item = Tomorrow()
    item.tomorrow = text.strip()
    item.save()
    event.ok(1)
def uptime(event):
    """ reply with the time elapsed since botlib._starttime. """
    running = time.time() - botlib._starttime
    event.reply("uptime is %s" % elapsed(running))
def version(event):
    """ reply with the version and description text of botlib. """
    from botlib import __txt__, __version__
    banner = "BOTLIB #%s - %s" % (__version__, __txt__)
    event.reply(banner)
def watch(event):
    """ save the text after the command as a Watch entry. """
    text = event._parsed.rest
    if not text:
        event.reply("watch <item>")
        return
    entry = Watch()
    entry.watch = text
    entry.save()
    event.ok(1)
def week(event):
    """ show the objects of the last seven days.

    the range starts seven days (in seconds) before to_day(day()) and ends
    now; results are numbered from 1, "no results" is replied when empty.
    """
    db = Db()
    parsed = event._parsed
    parsed.start = to_day(day()) - 7 * 24 * 60 * 60
    parsed.end = time.time()
    nr = 0
    for nr, item in enumerate(db.selected(event), 1):
        event.display(item, event._parsed.args, str(nr))
    if nr == 0:
        event.reply("no results")
def whoami(event):
    """ reply with the origin of the event. """
    origin = event.origin
    event.reply(origin)
def yesterday(event):
    """ show the objects added yesterday.

    the range runs from 24 hours (in seconds) before to_day(day()) up to
    to_day(day()); results are numbered from 1, "no results" is replied
    when empty.
    """
    db = Db()
    parsed = event._parsed
    parsed.start = to_day(day()) - 24 * 60 * 60
    parsed.end = to_day(day())
    nr = 0
    for nr, item in enumerate(db.selected(event), 1):
        event.display(item, event._parsed.args, str(nr))
    if nr == 0:
        event.reply("no results")