Source code for kern.bots
# kern/bots/__init__.py
#
#
""" bots unit. """
__copyright__ = "Copyright 2015, B.H.J Thate"
## IMPORTS
from kern.utils.parse import txt_parse
from kern.utils.time import short_date
from kern.defines import BLA, ENDC
from kern.dispatcher import Dispatcher
from kern import kernel, fleet
from kern.thing import Thing
from kern.errors import RemoteDisconnect, NotImplemented
import logging
import socket
import time
## BOTS
[docs]class Bot(Dispatcher):
""" Basic Bot. """
default = ""
cc = ""
[docs] def __init__(zelf, *args, **kwargs):
Dispatcher.__init__(zelf, *args, **kwargs)
zelf._start = time.time()
zelf.channels = ["#kern", ]
[docs] def say(zelf, *args, **kwargs): print(args[1])
## CHANNELS
[docs] def announce(zelf, *args, **kwargs):
""" announce on channels. """
zelf._last = time.time()
logging.info("# announce %s" % " ".join(zelf.channels))
for channel in zelf.channels: zelf.say(channel, args[0])
[docs] def event(zelf, *args, **kwargs):
event = Thing()
event._target = zelf
event.origin = "root@zelf"
event.txt = args[0]
return event
[docs] def join(zelf, *args, **kwargs): pass
[docs] def joining(zelf, *args, **kwargs):
""" join channels. """
for channel in zelf.channels: zelf.join(channel)
## TESTBOT
[docs]class TestBot(Bot):
""" USE WITH Bot.put() and Bot.read(). """
testing = True
cc = "!"
default = ""
[docs] def say(zelf, *args, **kwargs): logging.warn(args[1])
[docs] def event(zelf, *args, **kwargs):
o = Thing()
o.origin = "test@kern"
o._target = zelf
o.loglevel = "warning"
o.txt = args[0]
o.parsed = txt_parse(zelf, o.txt)
return o
## CMNDS
[docs]def join(event):
if not event.parsed.args: return
channel = event.parsed.args[0]
for bot in fleet:
if "join" not in bot: continue
bot.join(channel)
event.reply("join %s (%s)" % (channel, bot.server))
kernel.register("bot.join", join)
[docs]def part(event):
if not event.parsed.args: return
for bot in fleet:
try: bot.part(event.parsed.args[0])
except: continue
event.reply("part %s (%s)" % (channel, bot.server))
kernel.register("bot.part", part)
[docs]def nick(event):
if not event.parsed.args: return
for bot in fleet:
try: bot.nick(event.parsed.args[0])
except: continue
event.reply("nick %s" % bot.server)
kernel.register("bot.nick", nick)
[docs]def quit(event):
if not event.parsed.args: return
for bot in fleet:
try: bot.quit(event.parsed.rest)
except: continue
event.reply("quit %s" % bot.server)
kernel.register("bot.quit", quit)