Source code for bot.utils

# bot/utils.py
#
#

""" utils package. """

## IMPORT

from bot import __version__
from bot.defines import *

from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque 
from cgi import escape

import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import http
import time
import math
import glob
import sys
import os
import re

## SHUTDOWN

def shutdown():
    """ flush/close stdout, exit every bot in the fleet and all plugins, then hard-exit. """
    from bot import kernel
    # logging.warn is a deprecated alias; logging.warning is the supported name.
    logging.warning("shutdown is here !!")
    try:
        sys.stdout.flush()
        sys.stdout.close()
    except Exception:
        # best effort: stdout may already be closed or detached at this point.
        pass
    for bot in kernel.fleet:
        bot.exit()
    kernel.plugs.exit()
    # os._exit skips interpreter cleanup (atexit handlers, finally blocks).
    os._exit(0)

## AGENT
def useragent():
    """ return the User-Agent string, with the bot version filled in. """
    template = 'Mozilla/5.0 (X11; Linux x86_64); BOTJE %s; http://pikacode.com/milla/botje)'
    return template % __version__

## ISTR
class istr(str):
    """ plain str subclass that adds no behaviour of its own. """

## UNESCAPE
def unescape(text):
    """ replace HTML character references (e.g. ``&amp;``) in text with their characters. """
    # HTMLParser.unescape() was removed in Python 3.9; html.unescape() is the
    # supported replacement.
    return html.unescape(text)

## SIGNATURES
def make_signature(data):
    """ return the hexadecimal SHA-1 digest of str(data) encoded as UTF-8. """
    payload = str(data).encode("utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())

## FILES
def list_files(*args, **kwargs):
    """
        recursively list the files below args[0].

        The optional ``search`` keyword narrows the glob to names containing
        that text; it is applied at every directory level, so only matching
        sub directories are descended into.

    """
    pattern = args[0]
    if not pattern.endswith(os.sep):
        pattern += os.sep
    if "search" in kwargs:
        pattern += "*%s*" % kwargs["search"]
    if "*" not in pattern:
        pattern += "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry):
            found.extend(list_files(entry, **kwargs))
        else:
            found.append(entry)
    return found

## JOINS
def j(*args):
    """
        join the truthy arguments (converted to str) into one path.

        Returns None when called without arguments or when every argument is
        falsy; the latter used to raise TypeError from an empty os.path.join().

    """
    todo = [str(arg) for arg in args if arg]
    if not todo:
        return None
    return os.path.join(*todo)
def mj(*args):
    """
        join the truthy arguments into a path and replace os.sep with dots.

        Returns None when called without arguments or when every argument is
        falsy; the latter used to raise TypeError from an empty os.path.join().

    """
    todo = [str(arg) for arg in args if arg]
    if not todo:
        return None
    return os.path.join(*todo).replace(os.sep, ".")
def dj(*args):
    """
        join the truthy arguments into a path and replace os.sep with underscores.

        Returns None when called without arguments or when every argument is
        falsy; the latter used to raise TypeError from an empty os.path.join().

    """
    todo = [str(arg) for arg in args if arg]
    if not todo:
        return None
    return os.path.join(*todo).replace(os.sep, "_")
def aj(sep=None, *args):
    """
        return the absolute path of the joined, truthy arguments.

        The previous implementation star-unpacked the joined string into
        os.path.abspath(), which raised TypeError for any path longer than one
        character. Returns None when there is nothing to join.

    """
    todo = [str(part) for part in (sep,) + args if part]
    if not todo:
        return None
    return os.path.abspath(os.path.join(*todo))

## TIME
def dtime(stamp):
    """ convert a POSIX timestamp into a datetime (via datetime.fromtimestamp). """
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """ parse a ``YYYY-MM-DD`` string into a datetime; raises ValueError on mismatch. """
    parsed = datetime.datetime.strptime(daystr, "%Y-%m-%d")
    return parsed
def tdiff(d1, d2):
    """
        build a timedelta of d1 days and d2 seconds.

        NOTE(review): the name suggests "difference between two dates", but the
        arguments are passed to timedelta as (days, seconds) - confirm intent.

    """
    return datetime.timedelta(days=d1, seconds=d2)
def rtime():
    """ return the current local time as text, with the date/time space replaced by ``-=-``. """
    now = str(datetime.datetime.now())
    return "-=-".join(now.split(" "))
def hms():
    """ return the current local time of day as ``HH:MM:SS``. """
    now = datetime.datetime.today()
    return now.strftime("%H:%M:%S")
def day():
    """ return today's local date as ``YYYY-MM-DD``. """
    today = datetime.datetime.today()
    return today.date().isoformat()
def time_string(*args, **kwargs):
    """
        return str(datetime) for the POSIX timestamp in args[0], or None.

        The original read an undefined name (``stamp``) inside a bare except,
        so it returned None for every input.

    """
    timestamp = args[0]
    try:
        return str(datetime.datetime.fromtimestamp(timestamp))
    except (TypeError, ValueError, OverflowError, OSError):
        # not a number, or outside the range the platform can represent.
        return None
def time_stamp(*args, **kwargs):
    """
        convert the date string in args[0] to a POSIX timestamp.

        The string is rebuilt with every whitespace separated token followed by
        a single space, then tried against each entry of ``dayformats``.
        Returns 0.0 when no format matches.

    """
    daystr = args[0].strip()
    instring = "".join("%s " % token for token in daystr.split())
    for fmt in dayformats:
        try:
            return datetime.datetime.strptime(instring, fmt).timestamp()
        except ValueError:
            continue
    return 0.0
def short_date(*args, **kwargs):
    """
        reformat the date string in args[0] as ``YYYY-MM-DD time``.

        Tokens containing a comma are dropped. Two token layouts are tried in
        order - (day month year time) and the same shifted one position to the
        right; month names are looked up in ``monthint``. Returns None (after a
        debug log) when neither layout fits.

    """
    date = args[0]
    fields = [token for token in date.split() if "," not in str(token)]
    template = "{:4}-{:#02}-{:#02} {:6}"
    # (year, month, day, time) token positions for each layout.
    for year, month, dom, clock in ((2, 1, 0, 3), (3, 2, 1, 4)):
        try:
            return template.format(fields[year], monthint[fields[month]], int(fields[dom]), fields[clock])
        except (IndexError, KeyError, ValueError):
            continue
    logging.debug("can't parse date %s" % date)
    return None
def to_time(*args, **kwargs):
    """
        reformat the date string in args[0] as ``YYYY-MM-DD time``.

        Tokens containing a comma are dropped; the remaining tokens are read as
        (_, month, day, time, year) with the month name looked up in
        ``monthint``. Returns "" when a token is missing or the month is unknown.

    """
    date = args[0]
    fields = [token for token in date.split() if "," not in str(token)]
    try:
        return "{:4}-{:#02}-{:02} {:6}".format(fields[4], monthint[fields[1]], int(fields[2]), fields[3])
    except (IndexError, KeyError):
        return ""

## GETTERS
def get_opts(*args, **kwargs):
    """
        collect ``name=value`` tokens from the string in args[0] into an Object.

        Tokens that do not split into exactly two parts on "=" are ignored;
        any other failure is reported through error().

    """
    txt = args[0]
    from bot import Object
    result = Object()
    for token in txt.split():
        try:
            name, value = token.split("=")
            result[name] = str(value)
        except ValueError:
            pass
        except:
            error()
    return result
def get_knobs(*args, **kwargs):
    """ return the tokens of args[0] that start with "+", with the "+" removed. """
    txt = args[0]
    return [token[1:] for token in txt.split() if token.startswith("+")]
def get_args(*args, **kwargs):
    """
        return the tokens of args[0] that are not a single ``name=value`` pair.

        A token is kept when splitting it on "=" does not give exactly two
        parts (so ``a=b=c`` is kept as well); any other failure is reported
        through error().

    """
    txt = args[0]
    plain = []
    for token in txt.split():
        try:
            _name, _value = token.split("=")
        except ValueError:
            plain.append(token)
        except:
            error()
    return plain

## NAMES
def get_modname(obj):
    """ return the name of the module in which obj's class is defined. """
    return obj.__class__.__module__
def get_clsname(obj):
    """ return the dotted class name, cut out of the ``<class '...'>`` repr of obj's class. """
    shown = str(obj.__class__)
    quoted = shown.split(" ")[1]
    # drop the opening quote and the trailing "'>".
    return quoted[1:-2]
def get_cls(obj):
    """ return the last dotted component of get_clsname(obj). """
    dotted = get_clsname(obj)
    return dotted.rpartition(".")[2]

## STACK
def get_exception(*args, **kwargs):
    """
        describe the exception currently being handled as one line.

        Every traceback entry becomes ``dotted.file.path:lineno func | ``
        (the last three characters of the file name are cut off and os.sep is
        replaced by dots), followed by the exception type and value.

    """
    exctype, excvalue, tb = sys.exc_info()
    steps = []
    for entry in traceback.extract_tb(tb):
        fname, linenr, func = entry[0], entry[1], entry[2]
        ownname = ".".join(fname[:-3].split(os.sep))
        steps.append("%s:%s %s | " % (ownname, linenr, func))
    return "%s%s: %s" % ("".join(steps), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """
        return the file name of the code that called the frame at depth args[0].

        Returns "" when depth is zero or negative, or when that frame has no
        caller.

        NOTE(review): the original looped ``depth`` times without ever moving
        to the next frame, so only the immediate caller was inspected; that
        behaviour is kept here - confirm whether a deeper walk was intended.

    """
    depth = args[0]
    start = sys._getframe(depth)
    if depth <= 0:
        return ""
    caller = start.f_back
    if not caller:
        return ""
    return caller.f_code.co_filename
def get_frame(search="code"):
    """
        dump attributes of the caller's frame into a dict.

        For every frame attribute whose name contains ``search``, all
        attributes of that attribute's value are copied into the result
        (later matches overwrite earlier ones).

    """
    frame = sys._getframe(1)
    needle = str(search)
    result = {}
    for attr in dir(frame):
        if needle not in attr:
            continue
        target = getattr(frame, attr)
        for name in dir(target):
            result[name] = getattr(target, name)
    return result
def get_strace(*args, **kwargs):
    """
        return ``func:lineno | `` for every frame above the frame at depth args[0].

        The walk starts at that frame's caller and runs to the outermost frame.

    """
    depth = args[0]
    frame = sys._getframe(depth).f_back
    hops = []
    while frame:
        hops.append("%s:%s | " % (frame.f_code.co_name, frame.f_lineno))
        frame = frame.f_back
    return "".join(hops)
def get_how(*args, **kwargs):
    """
        return ``func:lineno`` of a frame above the frame at depth args[0].

        Starting at that frame's caller, the walk moves ``depth`` more frames
        up (or stops at the outermost frame) and reports where it ended.
        Returns "" when the starting frame has no caller.

    """
    depth = args[0]
    frame = sys._getframe(depth).f_back
    result = ""
    while frame:
        result = "%s:%s" % (frame.f_code.co_name, frame.f_lineno)
        if depth == 0:
            break
        depth -= 1
        frame = frame.f_back
    return result
def get_func(*args, **kwargs):
    """
        return the function name of the caller of the frame at depth args[0].

        Returns None when that frame has no caller.

        NOTE(review): the original loop decremented ``depth`` but never moved
        to the next frame, so it always reported the immediate caller; that
        behaviour is kept here - confirm whether a deeper walk was intended.

    """
    depth = args[0]
    caller = sys._getframe(depth).f_back
    if not caller:
        return None
    return caller.f_code.co_name
def error(*args, **kwargs):
    """ log the exception currently being handled at error level and return its description. """
    trace = get_exception()
    logging.error("error detected:\n\n%s\n" % trace)
    return trace

## LOCATING
def get_source(mod, package):
    """ return the absolute path pkg_resources.resource_filename(mod, package) resolves to. """
    import pkg_resources
    location = pkg_resources.resource_filename(mod, package)
    source = os.path.abspath(location)
    logging.info("source is %s" % source)
    return source

## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """
        resolve hostname (default: this machine's host name) to an IP address.

        The process-wide default socket timeout is set to ``timeout`` for the
        duration of the lookup and is now restored even when the lookup raises.
        Returns None on socket.timeout; other lookup errors propagate.

    """
    # imported here because the module's import block does not import socket.
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyname(hostname or socket.gethostname())
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(oldtimeout)
def resolve_host(ip=None, timeout=1.0):
    """
        reverse-resolve ip (default: the result of resolve_ip()) to a host name.

        The process-wide default socket timeout is set to ``timeout`` for the
        duration of the lookup and is now restored even when the lookup raises.
        Returns None on socket.timeout; other lookup errors propagate.

    """
    # imported here because the module's import block does not import socket.
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyaddr(ip or resolve_ip())[0]
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(oldtimeout)

## DIRECTORIES
def touch(fname):
    """ create fname if it does not exist yet; failures are reported through error(). """
    flags = os.O_RDONLY | os.O_CREAT
    try:
        os.close(os.open(fname, flags))
    except:
        error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """
        make sure ddir is owned by the current user and carries the expected mode.

        A path that cannot be stat'ed is first passed to make_dir(). Regular
        files are compared against ``filemask``, everything else against
        ``dirmask``.

    """
    import stat as stat_mod
    uid = os.getuid()
    gid = os.getgid()
    try:
        info = os.stat(ddir)
    except OSError:
        make_dir(ddir)
        info = os.stat(ddir)
    if info.st_uid != uid:
        os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    # st_mode also carries the file-type bits, so comparing it directly to a
    # permission mask always differed; compare the permission bits only.
    if stat_mod.S_IMODE(info.st_mode) != mask:
        os.chmod(ddir, mask)
def make_dir(path):
    """
        create every directory on the way to path, starting at os.sep.

        The last component of path is not created. Directories that cannot be
        created (for instance because they exist) are logged at debug level
        and skipped; newly created ones are passed to check_permissions().

    """
    target = os.sep
    for item in path.split(os.sep)[:-1]:
        target = j(target, item)
        try:
            os.mkdir(target)
        except OSError as ex:
            logging.debug(ex)
        else:
            check_permissions(target)
    return path

## HELPERS
def stripbadchar(s):
    """ drop characters with a code point below 32 unless they are in ``allowedchars``. """
    return "".join(c for c in s if ord(c) > 31 or c in allowedchars)
def enc_char(s):
    """ keep characters found in ``allowedchars`` and replace every other one by enc_name(char). """
    return "".join(c if c in allowedchars else enc_name(c) for c in s)
def enc_needed(s):
    """ return the characters of s (in order, duplicates included) that are not in ``allowedchars``. """
    offenders = []
    for c in s:
        if c not in allowedchars:
            offenders.append(c)
    return offenders
def enc_name(input):
    """ return the URL-safe base64 encoding of input (UTF-8 encoded) as a str. """
    # imported here because the module's import block does not import base64.
    import base64
    encoded = base64.urlsafe_b64encode(bytes(input, "utf-8"))
    return str(encoded, "utf-8")
def split_txt(what, l=375):
    """
        split what into chunks of roughly l characters.

        A chunk ends just after the first ">" found at or beyond the l-th
        character when a closing tag ("</") follows that point, otherwise at
        the first space found there, otherwise at the end of the text.

    """
    chunks = []
    total = len(what)
    begin, probe = 0, l
    for _ in range(int(total / probe + 1)):
        if what.find("</", probe) != -1:
            cut = what.find('>', probe) + 1
        else:
            cut = what.find(' ', probe)
            if cut == -1:
                cut = total
        piece = what[begin:cut]
        if piece:
            chunks.append(piece)
        begin = cut
        probe = begin + l
    return chunks
def smooth(a):
    """ return a unchanged when its exact type is in ``basic_types``, else its class name. """
    if type(a) in basic_types:
        return a
    return get_cls(a)
def make_version(name=""):
    """ return a banner with name, bot version and the current time, wrapped in YELLOW/ENDC. """
    now = time.ctime(time.time())
    return "%s%s %s -=- ! %s%s" % (YELLOW, name, __version__, now, ENDC)
def hello(name=""):
    """ print the make_version() banner followed by an empty line. """
    banner = make_version(name)
    print(banner + "\n")
def list_eggs(filter=""):
    """ yield the sys.path entries that contain ".egg" and, when given, the filter text. """
    for entry in sys.path:
        if ".egg" in entry and not (filter and filter not in entry):
            yield entry
def show_eggs(filter="bot"):
    """ log (at warning level) every egg on sys.path that matches filter. """
    for egg in list_eggs(filter):
        # logging.warn is a deprecated alias; logging.warning is the supported name.
        logging.warning("%s egg: %s" % (filter, egg))
def stripped(input):
    """ return the part of input before the first "/"; input itself when it cannot be split. """
    try:
        head = input.split("/")[0]
    except:
        return input
    return head

## HEADER
# NOTE(review): the literal is reproduced exactly as it appears in this
# (whitespace-collapsed) source; its original line breaks may have been lost -
# confirm against the real file before relying on the layout.
headertxt = '''# %s # # this is an bot (#%s) file, %s # # this file can be edited !! '''

## FEEDER

def feed(text):
    """ split text on CRLF and return Object().feed(chunk) for every chunk. """
    from bot import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]

## PARSER
def parse_email(fn):
    """
        read the mbox-style file fn and return one Object per message.

        Messages are split on lines starting with "From ". Each Object gets
        the message headers plus a ``text`` attribute holding the stringified
        payload. Text before the first "From " line is discarded.

        Compared to the original: the file is closed after reading, the last
        message of the file is no longer dropped, and the logged count no
        longer includes the discarded leading entry.

    """
    from bot import Object
    mails = []
    mess = ""
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                mails.append(mess)
                mess = line
                continue
            mess += line
    # the message still being collected when the file ends belongs in the list too.
    mails.append(mess)
    result = []
    for mess in mails:
        m = email.message_from_string(mess)
        o = Object()
        o.update(m.items())
        o.text = ""
        for load in m.get_payload():
            o.text += str(load)
        result.append(o)
    # the first entry is whatever preceded the first "From " line.
    found = result[1:]
    logging.warning("%s emails read" % len(found))
    return found

## STRIPPERS
def strip_html(text):
    """ return the first ``<text>`` element BeautifulSoup finds in text as a str, or "". """
    from bs4 import BeautifulSoup
    matches = BeautifulSoup(text).findAll("text")
    return str(matches[0]) if len(matches) else ""
def strip_wiki(text):
    """ remove wiki link/template brackets and ``<ref>`` elements, and unescape HTML entities. """
    for marker in ("[[", "]]", "}}", "{{"):
        text = text.replace(marker, "")
    text = unescape(text)
    for pattern in ("<ref .*?/>", "<ref>.*?</ref>", "<ref .*?</ref>"):
        text = re.sub(pattern, "", text)
    return text

## ENCODING
def get_encoding(data):
    """
        guess the character encoding of data.

        Tried in order:
        1. a ``charset=`` value in ``data.info['content-type']`` (``info`` is
           used as a mapping attribute here, it is not called);
        2. a ``charset=`` value inside a ``<meta http-equiv="content-type">``
           tag that also has a ``content`` attribute;
        3. ``chardet.detect(data)`` when ``chardet`` is truthy;
        4. sys.getdefaultencoding().

        NOTE(review): ``chardet`` is not imported in the visible part of this
        module - presumably it comes from ``bot.defines``; confirm.

    """
    if hasattr(data, 'info') and 'content-type' in data.info and 'charset' in data.info['content-type'].lower():
        charset = data.info['content-type'].lower().split('charset', 1)[1].strip()
        # startswith() instead of charset[0]: no IndexError on an empty value.
        if charset.startswith('='):
            charset = charset[1:].strip()
            if ';' in charset:
                return charset.split(';')[0].strip()
            return charset
    if '<meta' in data.lower():
        # raw strings: "\s" in a plain literal is an invalid escape sequence.
        for meta in re.findall(r'<meta[^>]+>', data, re.I | re.M):
            test_http_equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if not test_http_equiv or test_http_equiv.group(1).lower() != 'content-type':
                continue
            if not re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I):
                continue
            test_charset = re.search(r'charset\s*=\s*([^\s\'"]+)', meta, re.I)
            if test_charset:
                return test_charset.group(1)
    if chardet:
        test = chardet.detect(data)
        if 'encoding' in test:
            return test['encoding']
    return sys.getdefaultencoding()

## URL RELATED
def do_url(type, url, myheaders={}, postdata={}, keyfile=None, certfile="", port=80):
    """
        send one HTTP(S) request and return the http.client response object.

        ``type`` is the request method. ``myheaders`` is merged over the
        default headers and ``postdata`` is url-encoded and sent as the body.
        Only the path component of ``url`` is requested. ``keyfile``,
        ``certfile`` and ``port`` are accepted but not used. The mutable
        defaults are never modified.

    """
    import http.client
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'text/plain; text/html',
        'User-Agent': useragent(),
    }
    headers.update(myheaders)
    urlparts = urllib.parse.urlparse(url)
    # decide on the scheme, not on "https" appearing anywhere in the URL
    # (which also matched e.g. http://host/https-page).
    if urlparts.scheme == "https":
        connection = http.client.HTTPSConnection(urlparts[1])
    else:
        connection = http.client.HTTPConnection(urlparts[1])
    body = urllib.parse.urlencode(postdata)
    logging.warning('%s %s' % (type, url))
    connection.request(type, urlparts[2], body, headers)
    resp = connection.getresponse()
    logging.warning("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """ return the Location header of a 301 response; None for any other status. """
    if resp.status != 301:
        return None
    return resp.getheader("Location")

## TO/FROM
def to_enc(what, encoding='utf-8'):
    """ return str(what) encoded with encoding; any falsy value becomes empty bytes. """
    text = str(what) if what else ""
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """
        return txt as a str.

        Falsy input gives "". str instances (including subclasses) are
        returned unchanged; anything else is decoded with ``encoding``,
        falling back to decode_char() when that raises UnicodeDecodeError.

        Compared to the original: the fallback called an undefined name
        (``decodeperchar``), and str subclasses were sent to ``.decode``.

    """
    if not txt:
        return ""
    if isinstance(txt, str):
        return txt
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decode_char(txt, encoding, what)

## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """
        decode txt one byte at a time, dropping bytes that do not decode.

        Every distinct byte that fails is logged once at warning level.
        str input is returned unchanged. ``what`` is accepted but not used.

        Compared to the original: iterating bytes yields ints in Python 3, so
        ``i.decode`` raised AttributeError for every non-empty input.

    """
    if isinstance(txt, str):
        return txt
    res = []
    nogo = []
    for pos in range(len(txt)):
        # slicing keeps a bytes object of length one; indexing would give an int.
        unit = txt[pos:pos + 1]
        try:
            res.append(unit.decode(encoding))
        except UnicodeDecodeError:
            if unit not in nogo:
                nogo.append(unit)
    if nogo:
        logging.warning("nogo: %s" % " ".join(repr(unit) for unit in nogo))
    return "".join(res)

## OPTIONS
def make_opts():
    """
        build an optparse parser from ``options`` and parse the command line.

        Every entry of ``options`` is (short, long, kind, default, dest, help);
        a kind containing "store" is registered as an action, anything else as
        a value type. Entries optparse rejects are logged and skipped.
        Returns what parser.parse_args() returns.

    """
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=__version__)
    for option in options:
        kind, default, dest, helptxt = option[2:]
        keyword = "action" if "store" in kind else "type"
        try:
            parser.add_option(option[0], option[1], default=default, dest=dest, help=helptxt, **{keyword: kind})
        except Exception as ex:
            logging.error("error: %s - option: %s" % (str(ex), option))
    return parser.parse_args()

## PARSING
def parse_url(*args, **kwargs):
    """
        split the URL in args[0] into (basepath, base, root, file).

        Fields of urllib.parse.urlsplit() used here:

        Attribute  Index  Value                     Value if not present
        scheme     0      URL scheme specifier      empty string
        netloc     1      Network location part     empty string
        path       2      Hierarchical path         empty string

        ``file`` is the last path component when it contains a dot, else None;
        ``basepath`` is the path without that file and without a trailing "/";
        ``base`` is scheme + netloc + basepath and ``root`` is scheme + netloc.

    """
    url = args[0]
    scheme, netloc, path = urllib.parse.urlsplit(url)[:3]
    head, _, last = path.rpartition("/")
    if "." in last:
        basepath, file = head, last
    else:
        basepath, file = path, None
    if basepath.endswith("/"):
        basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """
        collect the ``.html`` links below the page's own site from its markup.

        args is (url, txt): the page URL and its HTML. Fragments are cut off;
        links that are empty, do not end in ".html", contain ".." or start
        with "mailto" are skipped. Relative links are made absolute against
        the page's root ("/..." links) or base path. Links outside the root
        are logged and skipped. Returns the unique URLs in document order.

    """
    import bs4
    url, txt = args
    basepath, base, root, file = parse_url(url)
    soup = bs4.BeautifulSoup(txt)
    urls = []
    for tag in soup('a'):
        href = tag.get("href")
        if not href:
            continue
        href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        # test the scheme prefix: a bare '"http" in href' also matched relative
        # links that merely contain "http" in their name.
        if not href.startswith(("http://", "https://")):
            if href.startswith("/"):
                href = root + href
            else:
                href = base + "/" + href
        if root not in href:
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls

## GENERICS
def reduced_keys(*args, **kwargs):
    """
        return the unique keys of args[0] that look like plain lowercase names.

        Keys whose str() starts with "_", "X" or "x", is not all lowercase, or
        that contain "-" are skipped.

        NOTE(review): "args", "rest" and "first" are added before the check
        that appears meant to exclude them, so they are NOT filtered out -
        behaviour kept as is; confirm the intent.

    """
    inlist = args[0]
    kept = []
    for key in inlist:
        name = str(key)
        if name.startswith(("_", "X", "x")):
            continue
        if not name.islower():
            continue
        if "-" in key:
            continue
        if name not in kept:
            kept.append(key)
        if name in ["args", "rest", "first"]:
            continue
        if name not in kept:
            kept.append(name)
    return kept
def feed(text):
    """ split text on CRLF and return Object().feed(chunk) for every chunk. """
    from bot import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
def dispatch(target, event, cmnd, *args, **kwargs):
    """
        call every function registered under target[cmnd] with event.

        Returns False when target has no entry for cmnd, otherwise the event.
        Extra positional and keyword arguments are accepted but not passed on.

    """
    try:
        handlers = target[cmnd]
    except KeyError:
        return False
    for handler in handlers:
        handler(event)
    return event
def resolve(*args, **kwargs):
    """
        prepare the event in args[0] and dispatch it on the kernel.

        The handler key is event.ucmnd, or event.etype when ucmnd is falsy.
        Returns what dispatch() returns.

    """
    from bot import kernel
    event = args[0]
    event.prepare()
    command = event.ucmnd or event.etype
    return dispatch(kernel, event, command, *args, **kwargs)
def need_skip(obj, black=[], white=[]):
    """
        tell whether obj should be skipped based on obj.get_content_type().

        Returns False when obj has no get_content_type(); otherwise True when
        the content type is in ``black`` or is not in ``white``.

        NOTE(review): with the default empty ``white`` list every content type
        is skipped - confirm this is intended.

    """
    try:
        content_type = obj.get_content_type()
    except AttributeError:
        return False
    return content_type in black or content_type not in white
def do_objects(*args, **kwargs):
    """
        select the objects of the event in args[0] that match its options.

        With options: an object is kept when the last option it has an
        attribute for is contained in str(attribute); the first option that
        does not match rejects the object, and options naming a missing
        attribute are ignored. Without options: an object is kept when any of
        event.args is ``in`` the object.

    """
    event = args[0]
    opts = event.opts
    wanted = event.args
    selected = []
    for obj in event.objects():
        keep = False
        for opt in opts:
            try:
                value = getattr(obj, opt)
            except AttributeError:
                continue
            try:
                keep = opts[opt] in str(value)
            except (TypeError, KeyError):
                keep = False
                continue
            if not keep:
                break
        if not opts:
            for arg in wanted:
                if arg in obj:
                    keep = True
        if keep:
            selected.append(obj)
    return selected
def format_obj(*args, **kwargs):
    """
        join attribute values of an object into one `` -=- `` separated line.

        args is (obj, names). Values are taken in order until the first name
        that cannot be read from obj; whitespace runs inside each value are
        squeezed to one space. Returns None when no value was collected.

    """
    obj = args[0]
    names = args[1]
    parts = []
    for name in names:
        try:
            value = getattr(obj, name)
        except Exception:
            # stop at the first attribute that cannot be read.
            break
        # raw string: "\s" in a plain literal is an invalid escape sequence.
        parts.append(re.sub(r"\s+", " ", value))
    if parts:
        return " -=- ".join(parts).strip()