# bot/utils.py
#
#
""" utils package. """
## IMPORT
from bot import __version__
from bot.defines import *
from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape
import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import http
import time
import math
import glob
import sys
import os
import re
## SHUTDOWN
def shutdown():
    """Flush stdout, stop every bot and the plugins, then end the process.

    The process is terminated with os._exit(0), so this function never
    returns to its caller.
    """
    from bot import kernel
    logging.warning("shutdown is here !!")
    # Best effort only: stdout may already be closed or detached.
    try:
        sys.stdout.flush()
        sys.stdout.close()
    except Exception:
        pass
    for bot in kernel.fleet:
        bot.exit()
    kernel.plugs.exit()
    os._exit(0)
## AGENT
def useragent():
    """Return the User-Agent string with the bot version filled in."""
    template = 'Mozilla/5.0 (X11; Linux x86_64); BOTJE %s; http://pikacode.com/milla/botje)'
    return template % __version__
## ISTR
class istr(str):
    """Plain str subclass that adds no behaviour of its own."""
## UNESCAPE
def unescape(text):
    """Replace HTML character references in *text* with their characters.

    Uses html.unescape(); the HTMLParser.unescape() method this relied on
    no longer exists in Python 3.9 and later.
    """
    return html.unescape(text)
## SIGNATURES
def make_signature(data):
    """Return the hexadecimal SHA-1 digest of str(data) encoded as UTF-8."""
    payload = str(data).encode("utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())
## FILES
def list_files(*args, **kwargs):
    """Recursively collect the files matched below a directory.

    args[0] is the directory.  The optional keyword ``search`` narrows the
    glob pattern to ``*<search>*``; without any wildcard in the path a
    trailing ``*`` is added.  Directories that match are descended into with
    the same keyword arguments; everything else is returned as found.
    """
    pattern = args[0]
    if not pattern.endswith(os.sep):
        pattern = pattern + os.sep
    if "search" in kwargs:
        pattern = pattern + "*%s*" % kwargs["search"]
    if "*" not in pattern:
        pattern = pattern + "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry):
            found.extend(list_files(entry, **kwargs))
        else:
            found.append(entry)
    return found
## JOINS
def j(*args):
    """Join the truthy arguments into a path with os.path.join.

    Falsy components (None, "", 0) are dropped and the remaining ones are
    converted with str().  Returns None when nothing is left to join, which
    also covers the case where every argument is falsy (previously a
    TypeError from os.path.join).
    """
    parts = [str(arg) for arg in args if arg]
    if not parts:
        return None
    return os.path.join(*parts)
def mj(*args):
    """Join the truthy arguments as a path, then swap os.sep for dots.

    Falsy components are dropped and the rest converted with str().
    Returns None when nothing is left to join, which also covers the case
    where every argument is falsy (previously a TypeError).
    """
    parts = [str(arg) for arg in args if arg]
    if not parts:
        return None
    return os.path.join(*parts).replace(os.sep, ".")
def dj(*args):
    """Join the truthy arguments as a path, then swap os.sep for underscores.

    Falsy components are dropped and the rest converted with str().
    Returns None when nothing is left to join, which also covers the case
    where every argument is falsy (previously a TypeError).
    """
    parts = [str(arg) for arg in args if arg]
    if not parts:
        return None
    return os.path.join(*parts).replace(os.sep, "_")
def aj(sep=None, *args):
    """Return the absolute path of the truthy arguments joined together.

    Falsy components are dropped and the rest converted with str().
    Returns None when there is nothing to join.  The joined path is passed
    to os.path.abspath as one string; unpacking it character by character
    (as was done before) raises TypeError for any path longer than one
    character.
    """
    parts = [str(part) for part in (sep,) + args if part]
    if not parts:
        return None
    return os.path.abspath(os.path.join(*parts))
## TIME
def dtime(stamp):
    """Convert a POSIX timestamp into a local datetime object."""
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """Parse a 'YYYY-MM-DD' string into a datetime (midnight of that day)."""
    layout = '%Y-%m-%d'
    return datetime.datetime.strptime(daystr, layout)
def tdiff(d1, d2):
    """Build a timedelta from two positional values.

    The values are handed to datetime.timedelta positionally, i.e. d1 is
    taken as days and d2 as seconds.
    NOTE(review): the name suggests a difference between two moments, but
    no subtraction happens here - confirm against the callers.
    """
    delta = datetime.timedelta(d1, d2)
    return delta
def rtime():
    """Return the current local time as text, with spaces turned into '-=-'."""
    now = str(datetime.datetime.now())
    return "-=-".join(now.split(" "))
def hms():
    """Return the current local time of day as 'HH:MM:SS'."""
    now = datetime.datetime.today()
    return now.strftime("%H:%M:%S")
def day():
    """Return today's local date as 'YYYY-MM-DD'."""
    now = datetime.datetime.today()
    return now.date().isoformat()
def time_string(*args, **kwargs):
    """Render a POSIX timestamp (args[0]) as a local date/time string.

    Returns None when the value cannot be converted.  The conversion now
    reads the timestamp that was passed in; it used to reference an
    undefined name, and the resulting error was swallowed, so the function
    always returned None.
    """
    timestamp = args[0]
    try:
        return str(datetime.datetime.fromtimestamp(timestamp))
    except (TypeError, ValueError, OverflowError, OSError):
        # Not a number, or outside the range the platform can represent.
        return None
def time_stamp(*args, **kwargs):
    """Turn a date string (args[0]) into a POSIX timestamp.

    The whitespace-separated tokens are re-joined with one space after each
    token (so the text ends with a space) and tried against every pattern in
    ``dayformats``.  The first pattern that parses wins; 0.0 is returned
    when none does.
    """
    tokens = args[0].strip().split()
    normalized = "".join("%s " % token for token in tokens)
    for fmt in dayformats:
        try:
            return datetime.datetime.strptime(normalized, fmt).timestamp()
        except ValueError:
            continue
    return 0.0
def short_date(*args, **kwargs):
    """Reformat a textual date (args[0]) as 'YYYY-MM-DD time'.

    Tokens containing a comma are dropped.  Two layouts are tried in order:
    day/month/year/time at positions 0-3, then the same fields shifted one
    position to the right.  Month names are looked up in ``monthint``.
    Returns None (after a debug log line) when neither layout fits.
    """
    date = args[0]
    fields = [token for token in date.split() if "," not in str(token)]
    layout = "{:4}-{:#02}-{:#02} {:6}"
    try:
        return layout.format(fields[2], monthint[fields[1]], int(fields[0]), fields[3])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        return layout.format(fields[3], monthint[fields[2]], int(fields[1]), fields[4])
    except (IndexError, KeyError, ValueError):
        logging.debug("can't parse date %s" % date)
    return None
def to_time(*args, **kwargs):
    """Reformat a textual date (args[0]) as 'YYYY-MM-DD time'.

    Tokens containing a comma are dropped.  Of the remaining tokens, index 1
    is the month name (looked up in ``monthint``), index 2 the day, index 3
    the time and index 4 the year.  Returns "" when the text does not fit
    that layout, including a non-numeric day, which used to escape as a
    ValueError.
    """
    date = args[0]
    fields = [token for token in date.split() if "," not in str(token)]
    try:
        return "{:4}-{:#02}-{:02} {:6}".format(fields[4], monthint[fields[1]], int(fields[2]), fields[3])
    except (IndexError, KeyError, ValueError):
        return ""
## GETTERS
def get_opts(*args, **kwargs):
    """Collect ``name=value`` tokens from args[0] into an Object.

    Only tokens with exactly one '=' are stored (values as str); all other
    tokens are ignored.  Unexpected failures are reported through error().
    """
    text = args[0]
    from bot import Object
    found = Object()
    for token in text.split():
        try:
            key, val = token.split("=")
            found[key] = str(val)
        except ValueError:
            pass
        except:
            error()
    return found
def get_knobs(*args, **kwargs):
    """Return the tokens of args[0] that start with '+', without the '+'."""
    text = args[0]
    return [token[1:] for token in text.split() if token.startswith("+")]
def get_args(*args, **kwargs):
    """Return the tokens of args[0] that are not a ``name=value`` pair.

    A token counts as a pair only when splitting it on '=' gives exactly
    two parts; every other token is returned in order.  Unexpected failures
    are reported through error().
    """
    text = args[0]
    plain = []
    for token in text.split():
        try:
            key, val = token.split("=")
        except ValueError:
            plain.append(token)
        except:
            error()
    return plain
## NAMES
def get_modname(obj):
    """Return the name of the module that defines obj's class."""
    return obj.__class__.__module__
def get_clsname(obj):
    """Return the class name of obj as it appears in str(obj.__class__).

    The text looks like ``<class 'pkg.mod.Name'>``; the part between the
    quotes is returned.
    """
    shown = str(obj.__class__)
    quoted = shown.split(" ")[1]
    # Drop the opening quote and the closing "'>".
    return quoted[1:-2]
def get_cls(obj):
    """Return the last dotted component of get_clsname(obj)."""
    qualified = get_clsname(obj)
    return qualified.rsplit(".", 1)[-1]
## STACK
def get_exception(*args, **kwargs):
    """Describe the exception currently being handled on one line.

    Each traceback entry becomes ``dotted.file.path:lineno function | ``,
    where the dotted path is the file name minus its last three characters
    with os.sep replaced by dots.  The exception type and value follow.
    """
    exctype, excvalue, tb = sys.exc_info()
    pieces = []
    for entry in traceback.extract_tb(tb):
        dotted = ".".join(entry.filename[:-3].split(os.sep))
        pieces.append("%s:%s %s | " % (dotted, entry.lineno, entry.name))
    return "%s%s: %s" % ("".join(pieces), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """Return the file name of the code that called the frame at args[0].

    The frame ``depth`` levels up is fetched and the file name of its parent
    frame is returned.  "" is returned when depth is zero or negative, or
    when there is no parent frame.
    NOTE(review): depth only decides whether a lookup happens; exactly one
    f_back hop is made whatever its size - confirm this is intended.
    """
    depth = args[0]
    start = sys._getframe(depth)
    if not start:
        return ""
    if depth <= 0:
        return ""
    try:
        parent = start.f_back
    except AttributeError:
        return ""
    if not parent:
        return ""
    return parent.f_code.co_filename
def get_frame(search="code"):
    """Dump attributes reachable from the caller's frame into a dict.

    For every attribute of the caller's frame whose name contains *search*,
    all attributes of that attribute's value are copied into the result,
    keyed by attribute name.
    """
    caller = sys._getframe(1)
    needle = str(search)
    collected = {}
    for attr in dir(caller):
        if needle not in attr:
            continue
        target = getattr(caller, attr)
        collected.update((name, getattr(target, name)) for name in dir(target))
    return collected
def get_strace(*args, **kwargs):
    """Return a 'function:lineno | ' trail of the callers above a frame.

    Starts at the frame args[0] levels up and walks f_back links until the
    outermost frame, adding one entry per parent frame.
    """
    depth = args[0]
    current = sys._getframe(depth)
    if not current:
        return ""
    hops = []
    while True:
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        hops.append("%s:%s | " % (parent.f_code.co_name, parent.f_lineno))
        current = parent
    return "".join(hops)
def get_how(*args, **kwargs):
    """Return 'function:lineno' for a caller further up the stack.

    Starts at the frame args[0] levels up and follows f_back links,
    counting args[0] down once per hop; the entry reached when the counter
    hits zero is returned.  If the stack ends first, the last entry seen
    (or "") is returned.
    """
    remaining = args[0]
    current = sys._getframe(remaining)
    if not current:
        return ""
    where = ""
    while True:
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        where = "%s:%s" % (parent.f_code.co_name, parent.f_lineno)
        current = parent
        if remaining == 0:
            return where
        remaining -= 1
    return where
def get_func(*args, **kwargs):
    """Return the name of the function that called the frame at args[0].

    The frame ``depth`` levels up is fetched and the code name of its parent
    frame is returned; None when there is no parent frame.
    NOTE(review): only one f_back hop is ever made, whatever the depth -
    confirm this is intended.
    """
    depth = args[0]
    start = sys._getframe(depth)
    if not start:
        return ""
    try:
        parent = start.f_back
    except AttributeError:
        return None
    if not parent:
        return None
    return parent.f_code.co_name
def error(*args, **kwargs):
    """Log the exception currently being handled and return its description."""
    description = get_exception()
    logging.error("error detected:\n\n%s\n", description)
    return description
## LOCATING
def get_source(mod, package):
    """Return the absolute filesystem path of a packaged resource.

    The lookup is delegated to pkg_resources.resource_filename(mod, package)
    and the resulting path is logged at info level.
    """
    import pkg_resources as p
    located = p.resource_filename(mod, package)
    source = os.path.abspath(located)
    logging.info("source is %s" % source)
    return source
## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """Resolve *hostname* (default: this machine's host name) to an IPv4 address.

    The process-wide default socket timeout is set to *timeout* during the
    lookup and is always put back afterwards, including when the lookup
    raises.  Returns None on socket.timeout; other lookup errors propagate.
    """
    import socket  # local import: this module has no top-level socket import
    previous = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyname(hostname or socket.gethostname())
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(previous)
def resolve_host(ip=None, timeout=1.0):
    """Reverse-resolve *ip* (default: the result of resolve_ip()) to a host name.

    The process-wide default socket timeout is set to *timeout* during the
    lookup and is always put back afterwards, including when the lookup
    raises.  Returns None on socket.timeout; other lookup errors propagate.
    """
    import socket  # local import: this module has no top-level socket import
    previous = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyaddr(ip or resolve_ip())[0]
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(previous)
## DIRECTORIES
def touch(fname):
    """Create *fname* if it does not exist yet; existing content is kept.

    Any failure is reported through error() instead of being raised.
    """
    try:
        handle = os.open(fname, os.O_CREAT | os.O_RDONLY)
        os.close(handle)
    except:
        error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """Make sure *ddir* exists, is owned by this process and has the right mode.

    A missing path is created through make_dir().  Ownership is changed to
    the current uid/gid when the owner differs.  Regular files are compared
    against *filemask*, everything else against *dirmask*.
    """
    import stat as stat_module
    uid = os.getuid()
    gid = os.getgid()
    try:
        info = os.stat(ddir)
    except OSError:
        make_dir(ddir)
        info = os.stat(ddir)
    if info.st_uid != uid:
        os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    # st_mode also carries the file-type bits, so compare permission bits
    # only; the raw comparison differed every time and always ran chmod.
    # NOTE(review): assumes the masks hold permission bits only - confirm.
    if stat_module.S_IMODE(info.st_mode) != mask:
        os.chmod(ddir, mask)
def make_dir(path):
    """Create every parent directory of *path*, starting from os.sep.

    The last component of *path* is not created.  Each directory that gets
    created is passed to check_permissions(); directories that cannot be
    created (for example because they exist) are logged at debug level and
    skipped.  Returns *path* unchanged.
    """
    current = os.sep
    parents = path.split(os.sep)[:-1]
    for name in parents:
        current = j(current, name)
        try:
            os.mkdir(current)
        except OSError as ex:
            logging.debug(ex)
            continue
        check_permissions(current)
    return path
## HELPERS
def stripbadchar(s):
    """Drop characters with a code point below 32 unless they are in ``allowedchars``."""
    return "".join(char for char in s if ord(char) > 31 or char in allowedchars)
def enc_char(s):
    """Encode every character of *s* that is not in ``allowedchars``.

    Allowed characters are copied as they are; each other character is
    replaced by enc_name() of that single character.
    """
    return "".join(char if char in allowedchars else enc_name(char) for char in s)
def enc_needed(s):
    """Return the characters of *s* that are not in ``allowedchars``, in order."""
    offenders = []
    for char in s:
        if char not in allowedchars:
            offenders.append(char)
    return offenders
def enc_name(input):
    """Return the URL-safe base64 encoding of *input* (UTF-8) as text."""
    import base64  # local import: this module has no top-level base64 import
    encoded = base64.urlsafe_b64encode(bytes(input, "utf-8"))
    return str(encoded, "utf-8")
def split_txt(what, l=375):
    """Cut *what* into pieces of roughly *l* characters.

    Each cut is placed at or after the *l*-character mark: when a closing
    tag ("</") occurs later in the text the cut goes just past the next
    '>', otherwise at the next space, otherwise at the end of the text.
    Empty pieces are left out.
    """
    pieces = []
    total = len(what)
    begin, mark = 0, l
    for _ in range(int(total / mark + 1)):
        if what.find("</", mark) != -1:
            cut = what.find('>', mark) + 1
        else:
            cut = what.find(' ', mark)
            if cut == -1:
                cut = total
        piece = what[begin:cut]
        if piece:
            pieces.append(piece)
        begin = cut
        mark = begin + l
    return pieces
def smooth(a):
    """Return *a* itself when its exact type is in ``basic_types``, else its class name."""
    return a if type(a) in basic_types else get_cls(a)
def make_version(name=""):
    """Build the banner line: name, version and the current time, wrapped in YELLOW/ENDC."""
    now = time.ctime(time.time())
    fields = (YELLOW, name, __version__, now, ENDC)
    return "%s%s %s -=- ! %s%s" % fields
def hello(name=""):
    """Print the version banner followed by an empty line."""
    banner = make_version(name)
    print("%s\n" % banner)
def list_eggs(filter=""):
    """Yield the sys.path entries that contain '.egg'.

    When *filter* is non-empty, only entries that also contain it are
    yielded.
    """
    for entry in sys.path:
        is_egg = ".egg" in entry
        wanted = not filter or filter in entry
        if is_egg and wanted:
            yield entry
def show_eggs(filter="bot"):
    """Log every egg on sys.path whose path contains *filter*.

    Uses logging.warning(); the logging.warn() alias is deprecated.
    """
    for egg in list_eggs(filter):
        logging.warning("%s egg: %s" % (filter, egg))
def stripped(input):
    """Return the part of *input* before the first '/'.

    Values that cannot be split that way (for example None or numbers) are
    returned unchanged.  Only ordinary exceptions are swallowed, so
    KeyboardInterrupt and SystemExit still propagate.
    """
    try:
        return input.split("/")[0]
    except Exception:
        return input
## HEADER
# Comment-style header template with three %s placeholders, to be filled in
# by the caller (presumably written at the top of files the bot generates;
# the meaning of each placeholder is not visible in this module).
headertxt = '''# %s
#
# this is an bot (#%s) file, %s
#
# this file can be edited !!
'''
## FEEDER
def feed(text):
    """Split *text* on CRLF and feed each chunk into a fresh Object.

    Returns the list of values returned by Object().feed(), one per chunk.
    NOTE(review): a function with the same name and body is defined again
    further down in this module and replaces this one at import time.
    """
    from bot import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
## PARSER
def parse_email(fn):
    """Read an mbox-style file and return one Object per message.

    A new message starts at every line beginning with "From ".  Text before
    the first such line is not a message and is skipped.  Each Object gets
    the message headers plus a ``text`` attribute holding the payload.

    Fixes over the previous version: the file is closed after reading, the
    last message in the file is no longer dropped, and the logged count is
    the number of messages actually returned.
    """
    from bot import Object
    chunks = []
    current = ""
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                chunks.append(current)
                current = line
                continue
            current += line
    # Flush the message that was still being collected at end of file.
    chunks.append(current)
    result = []
    # chunks[0] is whatever preceded the first "From " line.
    for raw in chunks[1:]:
        m = email.message_from_string(raw)
        o = Object()
        o.update(m.items())
        # The payload is either a string or a list of sub-messages.
        o.text = "".join(str(load) for load in m.get_payload())
        result.append(o)
    logging.warning("%s emails read" % len(result))
    return result
## STRIPPERS
def strip_html(text):
    """Return the first <text> element found in *text* as a string, or ""."""
    from bs4 import BeautifulSoup
    matches = BeautifulSoup(text).findAll("text")
    return str(matches[0]) if len(matches) else ""
def strip_wiki(text):
    """Remove wiki link/template brackets and <ref> markup from *text*.

    The bracket pairs are removed first, then HTML entities are unescaped,
    then self-closing, plain and attributed <ref> elements are deleted.
    """
    for marker in ("[[", "]]", "}}", "{{"):
        text = text.replace(marker, "")
    text = unescape(text)
    for pattern in ("<ref .*?/>", "<ref>.*?</ref>", "<ref .*?</ref>"):
        text = re.sub(pattern, "", text)
    return text
## ENCODING
def get_encoding(data):
    """Guess the character encoding of *data*.

    Checked in order:
    1. a ``charset=`` value in ``data.info['content-type']`` when *data*
       has an ``info`` mapping;
    2. a ``<meta http-equiv="content-type" ...>`` tag in the text;
    3. chardet, when it is installed and *data* is bytes;
    4. sys.getdefaultencoding().

    Fixes over the previous version: regular expressions are raw strings,
    bytes input no longer crashes the <meta> scan, a missing chardet module
    falls through to the default instead of raising NameError, and an empty
    charset value no longer raises IndexError.
    """
    if hasattr(data, 'info') and 'content-type' in data.info and 'charset' in data.info['content-type'].lower():
        charset = data.info['content-type'].lower().split('charset', 1)[1].strip()
        if charset.startswith('='):
            charset = charset[1:].strip()
            if ';' in charset:
                return charset.split(';')[0].strip()
            return charset
    is_binary = isinstance(data, (bytes, bytearray))
    # latin-1 maps every byte to a character, so the markup scan cannot fail.
    text = bytes(data).decode("latin-1") if is_binary else data
    if '<meta' in text.lower():
        for meta in re.findall(r'<meta[^>]+>', text, re.I | re.M):
            test_http_equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if not test_http_equiv or test_http_equiv.group(1).lower() != 'content-type':
                continue
            if not re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I):
                continue
            test_charset = re.search(r'charset\s*=\s*([^\s\'"]+)', meta, re.I)
            if test_charset:
                return test_charset.group(1)
    try:
        import chardet
    except ImportError:
        chardet = None
    # Statistical detection only makes sense on raw bytes.
    if chardet and is_binary:
        guess = chardet.detect(bytes(data))
        if guess.get('encoding'):
            return guess['encoding']
    return sys.getdefaultencoding()
## URL RELATED
def do_url(type, url, myheaders={}, postdata={}, keyfile=None, certfile="", port=80):
    """Send one HTTP(S) request and return the response object.

    *type* is the HTTP method.  *myheaders* is merged over the default
    headers and *postdata* is form-encoded into the request body.  The
    keyfile, certfile and port parameters are accepted but not used.  The
    default dictionaries are never modified here, so sharing them between
    calls is harmless.

    Fixes over the previous version: HTTPS is chosen from the URL scheme
    rather than from "https" appearing anywhere in the URL, the query
    string is sent along with the path, and an empty path is requested as
    "/".
    """
    import http.client
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urllib.parse.urlparse(url)
    if urlparts.scheme.lower() == "https":
        connection = http.client.HTTPSConnection(urlparts.netloc)
    else:
        connection = http.client.HTTPConnection(urlparts.netloc)
    body = urllib.parse.urlencode(postdata)
    target = urlparts.path or "/"
    if urlparts.query:
        target = "%s?%s" % (target, urlparts.query)
    logging.warning('%s %s' % (type, url))
    connection.request(type, target, body, headers)
    resp = connection.getresponse()
    logging.warning("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """Return the Location header of a 301 response, otherwise None."""
    if resp.status != 301:
        return None
    return resp.getheader("Location")
## TO/FROM
def to_enc(what, encoding='utf-8'):
    """Encode str(what) to bytes; any falsy value becomes empty bytes."""
    text = str(what) if what else ""
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """Decode *txt* to str.

    Falsy input gives "".  Strings, including str subclasses, are returned
    as they are.  Bytes are decoded with *encoding*; when that fails, the
    decoding is retried piece by piece through decode_char().  The fallback
    used to call a function name that does not exist in this module.
    """
    if not txt:
        return ""
    if isinstance(txt, str):
        return txt
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decode_char(txt, encoding, what)
## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """Decode *txt* one byte at a time, dropping bytes that do not decode.

    Falsy input gives "" and str input is returned as it is.  Every distinct
    byte that cannot be decoded on its own is reported once in a warning.
    Bytes are sliced rather than iterated, because iterating bytes yields
    integers, which have no decode() method.
    """
    if not txt:
        return ""
    if isinstance(txt, str):
        return txt
    data = bytes(txt)
    good = []
    bad = []
    for pos in range(len(data)):
        unit = data[pos:pos + 1]
        try:
            good.append(unit.decode(encoding))
        except UnicodeDecodeError:
            if unit not in bad:
                bad.append(unit)
    if bad:
        logging.warning("nogo: %s" % " ".join(repr(unit) for unit in bad))
    return "".join(good)
## OPTIONS
def make_opts():
    """Build an optparse parser from ``options`` and parse the command line.

    Each entry of ``options`` is (short flag, long flag, kind, default,
    dest, help).  A kind containing "store" is registered as an optparse
    action, any other kind as a value type.  Entries that optparse rejects
    are logged and skipped.  Returns whatever parser.parse_args() returns.
    """
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=__version__)
    for option in options:
        kind, default, dest, helptext = option[2:]
        keyword = "action" if "store" in kind else "type"
        settings = {keyword: kind, "default": default, "dest": dest, "help": helptext}
        try:
            parser.add_option(option[0], option[1], **settings)
        except Exception as ex:
            logging.error("error: %s - option: %s" % (str(ex), option))
            continue
    return parser.parse_args()
## PARSING
def parse_url(*args, **kwargs):
    """Split the URL in args[0] into (basepath, base, root, file).

    The URL is split with urllib.parse.urlsplit, whose result is indexed as:

        scheme    0   URL scheme specifier
        netloc    1   Network location part
        path      2   Hierarchical path
        query     3   Query component
        fragment  4   Fragment identifier

    basepath is the path without a trailing slash and without its last
    segment when that segment contains a dot; that segment is then returned
    as file (otherwise file is None).  base is scheme + netloc + basepath
    and root is scheme + netloc.
    """
    url = args[0]
    scheme, netloc, path = urllib.parse.urlsplit(url)[:3]
    segments = path.split("/")
    last = segments[-1]
    if "." in last:
        basepath, file = "/".join(segments[:-1]), last
    else:
        basepath, file = path, None
    if basepath.endswith("/"):
        basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """Collect the .html links of a page that stay on the page's own site.

    args is (url, txt): the page address and its HTML.  Fragments are cut
    off; links that are empty, do not end in ".html", contain ".." or start
    with "mailto" are skipped.  Relative links are made absolute against
    the page root (leading "/") or the page base path.  Links that leave
    the site are logged and skipped.  Each URL is returned once, in
    document order.

    Fixes over the previous version: a link is treated as absolute only
    when it starts with an http(s) scheme, so relative links that merely
    contain "http" are no longer rejected, and duplicates are tracked in a
    set.
    """
    import bs4
    url, txt = args
    basepath, base, root, file = parse_url(url)
    soup = bs4.BeautifulSoup(txt)
    urls = []
    seen = set()
    for tag in soup('a'):
        href = tag.get("href")
        if not href:
            continue
        href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if not href.lower().startswith(("http://", "https://")):
            if href.startswith("/"):
                href = root + href
            else:
                href = base + "/" + href
        if root not in href:
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in seen:
            seen.add(href)
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls
## GENERICS
def reduced_keys(*args, **kwargs):
    """Filter the keys in args[0] down to plain lower-case names.

    Keys are skipped when their text starts with "_", "X" or "x", is not
    entirely lower case, or contains "-".  Each remaining key is kept once,
    in input order.
    NOTE(review): the check against "args", "rest" and "first" runs after
    the key has already been stored, so it does not exclude them - confirm
    whether it was meant to.
    """
    inlist = args[0]
    kept = []
    for key in inlist:
        name = str(key)
        if name.startswith(("_", "X", "x")) or not name.islower():
            continue
        if "-" in key:
            continue
        if name not in kept:
            kept.append(key)
        if name in ["args", "rest", "first"]:
            continue
        if name not in kept:
            kept.append(name)
    return kept
def feed(text):
    """Split *text* on CRLF and feed each chunk into a fresh Object.

    Returns the list of values returned by Object().feed(), one per chunk.
    """
    from bot import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
def dispatch(target, event, cmnd, *args, **kwargs):
    """Call every handler registered under *cmnd* in *target* with *event*.

    Returns False when *target* has no entry for *cmnd*, otherwise the
    event after all handlers have run.  The extra positional and keyword
    arguments are accepted but not passed on.
    """
    try:
        handlers = target[cmnd]
    except KeyError:
        return False
    for handler in handlers:
        handler(event)
    return event
def resolve(*args, **kwargs):
    """Prepare the event in args[0] and dispatch it on the kernel.

    The command is ``event.ucmnd`` when set, otherwise ``event.etype``.
    Returns the result of dispatch().
    """
    from bot import kernel
    event = args[0]
    event.prepare()
    command = event.ucmnd or event.etype
    return dispatch(kernel, event, command, *args, **kwargs)
def need_skip(obj, black=[], white=[]):
    """Tell whether *obj* should be skipped based on its content type.

    Objects without get_content_type() are never skipped.  Otherwise the
    object is skipped when its content type is in *black* or is missing
    from *white*.  The default lists are only read, never modified.
    """
    try:
        content_type = obj.get_content_type()
    except AttributeError:
        return False
    return content_type in black or content_type not in white
def do_objects(*args, **kwargs):
    """Select the objects of the event in args[0] that match its options.

    With options set, an object is kept when the last option examined
    matched, i.e. the option value is a substring of str() of the
    same-named attribute; the first mismatch stops the check and rejects
    the object, and options naming a missing attribute are ignored.
    Without options, an object is kept when any of ``event.args`` is
    contained in it.
    """
    event = args[0]
    wanted = event.opts
    words = event.args
    selected = []
    for candidate in event.objects():
        keep = False
        for name in wanted:
            try:
                value = getattr(candidate, name)
            except AttributeError:
                continue
            try:
                keep = wanted[name] in str(value)
            except (TypeError, KeyError):
                keep = False
                continue
            if not keep:
                break
        if not wanted:
            for word in words:
                if word in candidate:
                    keep = True
        if keep:
            selected.append(candidate)
    return selected