Source code for kern.bots

# kern/bots/__init__.py
#
#

""" bots unit. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from kern.utils.parse import txt_parse
from kern.utils.time import short_date
from kern.defines import BLA, ENDC
from kern.dispatcher import Dispatcher
from kern import kernel, fleet
from kern.thing import Thing

from kern.errors import RemoteDisconnect, NotImplemented

import logging
import socket
import time

## BOTS

class Bot(Dispatcher):

    """ Basic Bot: a Dispatcher that keeps a channel list and prints what it says. """

    cc = ""
    default = ""

    def __init__(zelf, *args, **kwargs):
        """ initialise the Dispatcher base, record the start time and set the channel list. """
        Dispatcher.__init__(zelf, *args, **kwargs)
        zelf._start = time.time()
        zelf.channels = ["#kern"]

    def say(zelf, *args, **kwargs):
        """ print the text, which is the second positional argument; the first is not used here. """
        text = args[1]
        print(text)

    ## CHANNELS

    def announce(zelf, *args, **kwargs):
        """ say the first positional argument on every channel in zelf.channels. """
        zelf._last = time.time()
        joined = " ".join(zelf.channels)
        logging.info("# announce %s" % joined)
        for name in zelf.channels:
            # args[0] is looked up per channel, so it is never touched when there are no channels.
            zelf.say(name, args[0])

    def event(zelf, *args, **kwargs):
        """ return a Thing targeting this bot, with the first positional argument as its txt. """
        made = Thing()
        made._target = zelf
        made.origin = "root@zelf"
        made.txt = args[0]
        return made

    def join(zelf, *args, **kwargs):
        """ do nothing; placeholder that joining() calls for each channel. """

    def joining(zelf, *args, **kwargs):
        """ call zelf.join() once for every channel in zelf.channels. """
        for name in zelf.channels:
            zelf.join(name)

## TESTBOT
class TestBot(Bot):

    """ USE WITH Bot.put() and Bot.read().

    Bot variant that logs what it says at warning level instead of printing,
    and whose events carry the parsed form of their text.

    """

    testing = True
    cc = "!"
    default = ""

    def say(zelf, *args, **kwargs):
        """ log the text (second positional argument) at warning level. """
        # logging.warn is a deprecated alias that was removed in Python 3.13.
        logging.warning(args[1])

    def event(zelf, *args, **kwargs):
        """ return a Thing for the first positional argument, with its txt_parse() result attached. """
        o = Thing()
        o.origin = "test@kern"
        o._target = zelf
        o.loglevel = "warning"
        o.txt = args[0]
        # the bot itself is handed to the parser together with the text.
        o.parsed = txt_parse(zelf, o.txt)
        return o

## CMNDS
def join(event):
    """ join the channel named by the first argument on every bot in the fleet that has "join". """
    arguments = event.parsed.args
    if not arguments:
        return
    channel = arguments[0]
    for bot in fleet:
        # bots that do not contain "join" are passed over without a reply.
        if "join" in bot:
            bot.join(channel)
            event.reply("join %s (%s)" % (channel, bot.server))


# make the command available under the name "bot.join".
kernel.register("bot.join", join)
def part(event):
    """ part the channel named by the first argument on every bot in the fleet.

    Does nothing when the event carries no arguments. A bot whose part() call
    raises is skipped; every other bot gets a "part <channel> (<server>)" reply.

    """
    if not event.parsed.args:
        return
    # bind the channel once: it is needed for the part() call and for the reply text.
    channel = event.parsed.args[0]
    for bot in fleet:
        try:
            bot.part(channel)
        except Exception:
            # best effort: skip this bot, but let KeyboardInterrupt/SystemExit through.
            continue
        event.reply("part %s (%s)" % (channel, bot.server))


# make the command available under the name "bot.part".
kernel.register("bot.part", part)
def nick(event):
    """ pass the first argument to nick() on every bot in the fleet.

    Does nothing when the event carries no arguments. A bot whose nick() call
    raises is skipped; every other bot gets a "nick <server>" reply.

    """
    if not event.parsed.args:
        return
    for bot in fleet:
        try:
            bot.nick(event.parsed.args[0])
        except Exception:
            # best effort: skip this bot, but let KeyboardInterrupt/SystemExit through.
            continue
        event.reply("nick %s" % bot.server)


# make the command available under the name "bot.nick".
kernel.register("bot.nick", nick)
def quit(event):
    """ call quit() on every bot in the fleet, passing event.parsed.rest.

    Does nothing when the event carries no arguments. A bot whose quit() call
    raises is skipped; every other bot gets a "quit <server>" reply.

    """
    # the guard looks at parsed.args while the call below passes parsed.rest.
    if not event.parsed.args:
        return
    for bot in fleet:
        try:
            bot.quit(event.parsed.rest)
        except Exception:
            # best effort: skip this bot, but let KeyboardInterrupt/SystemExit through.
            continue
        event.reply("quit %s" % bot.server)


# make the command available under the name "bot.quit".
kernel.register("bot.quit", quit)