.. _fbf.drivers.console.bot: bot ~~~ .. automodule:: fbf.drivers.console.bot :show-inheritance: :members: :undoc-members: CODE ---- :: # fbf/console/bot.py # # """ console bot. """ .. _fbf.drivers.console.bot_fbf_imports: fbf imports -------------- :: from fbf.lib.datadir import getdatadir from fbf.utils.generic import waitforqueue from fbf.lib.errors import NoSuchCommand, NoInput from fbf.lib.botbase import BotBase from fbf.lib.exit import globalshutdown from fbf.utils.generic import strippedtxt, waitevents from fbf.utils.exception import handle_exception from fbf.lib.eventhandler import mainhandler from .event import ConsoleEvent .. _fbf.drivers.console.bot_basic_imports: basic imports ---------------- :: import time import queue import logging import sys import code import os import readline import atexit import getpass import re import copy .. _fbf.drivers.console.bot_defines_: defines ---------- :: cpy = copy.deepcopy histfilepath = os.path.expanduser(getdatadir() + os.sep + "run" + os.sep + "console-history") .. _fbf.drivers.console.bot_HistoryConsole_class: HistoryConsole class ----------------------- :: class HistoryConsole(code.InteractiveConsole): def __init__(self, locals=None, filename="", histfile=histfilepath): self.fname = histfile code.InteractiveConsole.__init__(self, locals, filename) self.init_history(histfile) def init_history(self, histfile): readline.parse_and_bind("tab: complete") if hasattr(readline, "read_history_file"): try: readline.read_history_file(histfile) except IOError: pass def save_history(self, histfile=None): readline.write_history_file(histfile or self.fname) .. _fbf.drivers.console.bot_the_console: the console -------------- :: console = HistoryConsole() .. _fbf.drivers.console.bot_ConsoleBot_: ConsoleBot ------------- :: class ConsoleBot(BotBase): ERASE_LINE = '\033[2K' BOLD='\033[1m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' GREEN = '\033[92m' ENDC = '\033[0m' COMMON = '\003[9' def __init__(self, cfg=None, users=None, plugs=None, botname=None, *args, **kwargs): BotBase.__init__(self, cfg, users, plugs, botname, *args, **kwargs) self.type = "console" def startshell(self, connect=True): """ start the console bot. """ self.start(False) while not self.stopped: try: input = console.raw_input("\n> ") if self.stopped: return event = ConsoleEvent() event.parse(self, input, console) event.nooutput = True event.nodispatch = False e = self.put(event) res = e.wait() if res: txt = self.makeresponse(res, dot="
") self.out(e.userhost, txt) mainhandler.handle_one() except IOError: break except NoInput: continue except (KeyboardInterrupt, EOFError): break except Exception as ex: handle_exception() ; break console.save_history() def outnocb(self, printto, txt, *args, **kwargs): assert printto assert txt if not self.cfg.uuid in printto: logging.error("%s is not the owner of this shell . not printing." % printto) ; return txt = self.normalize(txt) self._raw(txt) def _raw(self, txt): """ do raw output to the console. """ logging.info("%s - out - %s" % (self.cfg.name, txt)) sys.stdout.write(txt) sys.stdout.write('\n') def action(self, channel, txt, event=None): txt = self.normalize(txt) self._raw(txt) def notice(self, channel, txt): txt = self.normalize(txt) self._raw(txt) def exit(self, *args, **kwargs): """ called on exit. """ console.save_history() def normalize(self, what): what = strippedtxt(what) what = what.replace("", self.BLUE) what = what.replace("", self.ENDC) what = what.replace("", self.YELLOW) what = what.replace("", self.ENDC) what = what.replace("

", self.GREEN) what = what.replace("

", self.ENDC) what = what.replace("

", self.GREEN) what = what.replace("

", self.ENDC) what = what.replace("<b>", self.BOLD) what = what.replace("</b>", self.ENDC) what = what.replace("
", "\n") what = what.replace("
  • ", "* ") what = what.replace("
  • ", "\n") if what.count(self.ENDC) % 2: what = "%s%s" % (self.ENDC, what) return what