Source code for core.bots

# core/bots/__init__.py
#
#

""" bots unit. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from core.utils.parse import txt_parse
from core.utils.time import short_date
from core.defines import BLA, ENDC
from core.dispatcher import Dispatcher
from core.kernel import kernel, fleet
from core.thing import Thing

from core.errors import RemoteDisconnect, NotImplemented

import logging
import socket
import time

## BOTS

class Bot(Dispatcher):

    """
        Basic Bot.

        A Dispatcher that keeps a list of channels and writes outgoing text
        to stdout. Subclasses override say() and join() to talk to a real
        backend.

    """

    # NOTE(review): presumably the default channel/target and the command
    # character prefix; neither is read inside this class - confirm with callers.
    default = ""
    cc = ""

    def __init__(zelf, *args, **kwargs):
        """ set up the Dispatcher base, record the start time and the channel list. """
        Dispatcher.__init__(zelf, *args, **kwargs)
        zelf._start = time.time()
        zelf.channels = ["#core"]

    def say(zelf, *args, **kwargs):
        """ print the text (second positional argument); the first argument is not used here. """
        text = args[1]
        print(text)

    ## CHANNELS

    def announce(zelf, *args, **kwargs):
        """ announce on channels. """
        zelf._last = time.time()
        joined = " ".join(zelf.channels)
        logging.info("# announce %s" % joined)
        for target in zelf.channels:
            # args[0] is the text to announce; it is only looked up when
            # there is at least one channel to say it on.
            zelf.say(target, args[0])

    def event(zelf, *args, **kwargs):
        """ return a Thing targeted at this bot, carrying the given text (first positional argument). """
        thing = Thing()
        thing._target = zelf
        thing.origin = "root@zelf"
        thing.txt = args[0]
        return thing

    def join(zelf, *args, **kwargs):
        """ join a channel; does nothing in the basic bot. """
        return None

    def joining(zelf, *args, **kwargs):
        """ join channels. """
        for target in zelf.channels:
            zelf.join(target)

## TESTBOT
class TestBot(Bot):

    """
        Bot used for testing: output goes to the log instead of stdout.

        USE WITH Bot.put() and Bot.read().

    """

    testing = True
    cc = "!"
    default = ""

    def say(zelf, *args, **kwargs):
        """ log the text (second positional argument) at warning level. """
        # logging.warn is a deprecated alias that was removed in Python 3.13;
        # logging.warning is the supported spelling and logs identically.
        logging.warning(args[1])

    def event(zelf, *args, **kwargs):
        """ return a parsed test event carrying the given text (first positional argument). """
        o = Thing()
        o.origin = "test@core"
        o._target = zelf
        o.loglevel = "warning"
        o.txt = args[0]
        # parsing needs the bot itself as well as the text.
        o.parsed = txt_parse(zelf, o.txt)
        return o

## CMNDS
def join(event):
    """ join the channel given as first argument on every bot in the fleet that has a "join" entry. """
    arguments = event.parsed.args
    if not arguments:
        return
    channel = arguments[0]
    for bot in fleet:
        # bots without a "join" entry are skipped silently.
        if "join" in bot:
            bot.join(channel)
            event.reply("join %s (%s)" % (channel, bot.server))

kernel.register("bot.join", join)
def part(event):
    """
        leave the channel given as first argument on every bot in the fleet.

        Bots on which part() fails are skipped; a reply is sent for each bot
        that left the channel. Does nothing when no argument is given.

    """
    if not event.parsed.args:
        return
    # bind the channel once: the reply below formats it, and the name was
    # previously never assigned, which raised NameError after the first part.
    channel = event.parsed.args[0]
    for bot in fleet:
        try:
            bot.part(channel)
        except Exception:
            # best effort: a bot that cannot part is skipped, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            continue
        event.reply("part %s (%s)" % (channel, bot.server))

kernel.register("bot.part", part)
def nick(event):
    """
        change the nick to the first argument on every bot in the fleet.

        Bots on which nick() fails are skipped; a reply naming the bot's
        server is sent for each bot that accepted the call. Does nothing
        when no argument is given.

    """
    if not event.parsed.args:
        return
    # the requested nick is the same for every bot, so look it up once.
    wanted = event.parsed.args[0]
    for bot in fleet:
        try:
            bot.nick(wanted)
        except Exception:
            # best effort: skip bots that cannot change nick, without
            # swallowing KeyboardInterrupt/SystemExit as a bare except would.
            continue
        event.reply("nick %s" % bot.server)

kernel.register("bot.nick", nick)
def quit(event):
    """
        call quit() on every bot in the fleet, passing event.parsed.rest.

        Does nothing when no argument is given. Bots on which the call
        fails are skipped; a reply naming the bot's server is sent for
        each bot that quit.

    """
    if event.parsed.args:
        for bot in fleet:
            # NOTE(review): the bare except is kept as in the original;
            # narrowing it could change shutdown behaviour if a bot's quit()
            # raises SystemExit - confirm before tightening.
            try:
                bot.quit(event.parsed.rest)
            except:
                pass
            else:
                event.reply("quit %s" % bot.server)

kernel.register("bot.quit", quit)