.. _fbf.drivers.tornado.bot: bot ~~~ .. automodule:: fbf.drivers.tornado.bot :show-inheritance: :members: :undoc-members: CODE ---- :: # fbf/drivers/tornado/bot.py # # """ tornado web bot. """ .. _fbf.drivers.tornado.bot_fbf_imports: fbf imports -------------- :: from fbf.lib.botbase import BotBase from fbf.lib.outputcache import add from fbf.utils.generic import toenc, fromenc, strippedtxt, stripcolor from fbf.utils.url import re_url_match from fbf.utils.timeutils import hourmin from fbf.lib.channelbase import ChannelBase from fbf.imports import getjson, gettornado from fbf.lib.container import Container from fbf.utils.exception import handle_exception from fbf.lib.eventbase import EventBase json = getjson() .. _fbf.drivers.tornado.bot_basic_imports: basic imports ---------------- :: import logging import re import cgi import urllib.request, urllib.parse, urllib.error import time import copy import functools .. _fbf.drivers.tornado.bot_tornado_import: tornado import ----------------- :: tornado = gettornado() import tornado.ioloop import tornado.web .. _fbf.drivers.tornado.bot_defines_: defines ---------- :: cpy = copy.deepcopy .. _fbf.drivers.tornado.bot_WebBot_class: WebBot class --------------- :: class TornadoBot(BotBase): """ TornadoBot just inherits from botbase for now. """ def __init__(self, cfg=None, users=None, plugs=None, botname="tornado-bot", *args, **kwargs): BotBase.__init__(self, cfg, users, plugs, botname, bottype="tornado", *args, **kwargs) assert self.cfg self.cfg.type = "tornado" self.websockets = {} def _raw(self, txt, target=None, how="normal", handler=None, end=""): """ put txt to the client. """ logging.warn("> %s (%s)" % (txt, self.cfg.name)) if not txt: return txt = txt + end if handler: handler.write(txt) else: print(txt) def outnocb(self, channel, txt, how=None, event=None, origin=None, response=None, dotime=False, *args, **kwargs): txt = self.normalize(txt) if event and event.how != "background": logging.info("%s - out - %s" % (self.cfg.name, txt)) if "http://" in txt or "https://" in txt: for item in re_url_match.findall(txt): logging.debug("web - raw - found url - %s" % item) url = '%s' % (item, item, item) try: txt = txt.replace(item, url) except ValueError: logging.error("web - invalid url - %s" % url) if response or (event and event.doweb): self._raw(txt, event.target, event.how, event.handler) else: if event: e = cpy(event) e.txt = txt e.channel = channel e.how = event.how or how else: e = EventBase() e.nick = self.cfg.nick e.userhost = self.cfg.nick + "@" + "bot" e.channel = channel e.txt = txt e.div = "content_div" e.origin = origin e.how = how or "overwrite" e.headlines = True e.update(kwargs) update_web(self, e) self.benice(event) def normalize(self, txt): txt = stripcolor(txt) txt = txt.replace("\n", "
"); txt = txt.replace("<", "<") txt = txt.replace(">", ">") txt = strippedtxt(txt) txt = txt.replace("<br>", "
") txt = txt.replace("<b>", "") txt = txt.replace("</b>", "") txt = txt.replace("<i>", "") txt = txt.replace("</i>", "") txt = txt.replace("<h2>", "

") txt = txt.replace("</h2>", "

") txt = txt.replace("<h3>", "

") txt = txt.replace("</h3>", "

") txt = txt.replace("<li>", "
  • ") txt = txt.replace("</li>", "
  • ") return txt def reconnect(self, *args): return True def update_web(bot, event, end="
    "): out = Container(event.userhost, event.tojson(), how="tornado").tojson() logging.warn("> %s - %s" % (event.userhost, event.txt)) logging.debug("> %s" % out) if event.channel not in bot.websockets: logging.info("no %s in websockets dict" % event.channel) ; return for c in bot.websockets[event.channel]: time.sleep(0.001) try: callback = functools.partial(c.write_message, out) bot.server.io_loop.add_callback(callback) except IOError as ex: logging.error("IOError occured: %s" % str(ex)) ; raise