# core/utils.py
#
#
""" utils package. """
__copyright__ = "Copyright 2014 B.H.J Thate"
## IMPORT
from core import __version__
from core.defines import *
from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape
import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import types
import http
import json
import time
import math
import glob
import sys
import os
import re
## start_new_thread alias
# run_thr is the very same callable as _thread.start_new_thread:
# run_thr(function, args[, kwargs]) starts function in a new low-level thread.
run_thr = _thread.start_new_thread
## AGENT
def useragent():
    """ return the User-Agent string, with the package __version__ filled in. """
    template = 'Mozilla/5.0 (X11; Linux x86_64); CORELIB %s; http://pikacode.com/bthate/corelib)'
    return template % __version__
## ISTR
class istr(str):
    """ plain str subclass; adds no attributes or methods of its own. """
## UNESCAPE
def unescape(text):
    """ replace HTML character references (&amp;, &#62;, ...) in text by the characters they name.

    Uses html.unescape; the HTMLParser.unescape method used before was removed in Python 3.9.
    """
    return html.unescape(text)
## txt_parse function
def txt_parse(txt):
    """ split txt into plain arguments and key=value selectors.

    Returns an Object with:
      args        words without exactly one "=", plus the keys of ordinary key=value words
      wanted      key -> value for every key=value word
      switch      key -> value for words written as key!=value (the "!" is stripped from the key)
      not_wanted  created empty; nothing in this function fills it

    A value ending in "-" has the "-" removed and its key is left out of args.
    """
    from core import Object
    o = Object()
    o.args = []
    o.wanted = Object()
    o.not_wanted = Object()
    o.switch = Object()
    if not txt: return o
    for word in txt.split():
        try: key, value = word.split("=")
        except ValueError:
            # zero or more than one "=": treat the word as a plain argument
            o.args.append(word)
            continue
        # slices instead of [-1] so "key=" and "=value" do not raise IndexError
        op = key[-1:]
        post = value[-1:]
        if post == "-": value = value[:-1]
        # the original tested `word == "!"`, which can never hold for a word containing "="
        if op == "!":
            key = key[:-1]
            o.switch[key] = value
        o.wanted[key] = value
        if post == "-": continue
        o.args.append(key)
    return o
## SETS
def unique(a):
    """ return the distinct items of a as a list; the order is whatever the set yields. """
    distinct = set(a)
    return list(distinct)
def intersect(a, b):
    """ return a list of the items present in both a and b (unordered, no duplicates). """
    common = set(a).intersection(set(b))
    return list(common)
def union(a, b):
    """ return a list of the items present in a or b (unordered, no duplicates). """
    combined = set(a).union(set(b))
    return list(combined)
## STATE/STATUS
def get_status(obj, type=""):
    """ return a new Object holding a copy of every entry of obj._status (type is not used). """
    from core import Object
    status = obj._status
    snapshot = Object()
    for name in status: snapshot[name] = status[name]
    return snapshot
def get_state(obj, type=""):
    """ return a new Object with the entries of obj._state, timestamps shown as short dates.

    Each value is passed through time.ctime and short_date; when that fails or
    gives an empty result the raw value is copied unchanged. type is not used.
    """
    from core import Object
    o = Object()
    for key in obj._state:
        raw = obj._state[key]
        # Exception instead of a bare except, so KeyboardInterrupt/SystemExit still propagate
        try: shown = short_date(time.ctime(raw))
        except Exception: shown = None
        o[key] = shown if shown else raw
    return o
## UPTIME
def get_uptime(obj):
    """ return "/"-joined durations derived from obj._state.

    In order, when the keys are present: time since "boot", time since "output",
    and the time left in the current "sleep" interval counted from boot.
    """
    res = []
    target = obj._state
    now = time.time()
    if "boot" in target: res.append(str_day(now - float(target.boot)))
    # the original tested the key " output" (leading space), which never matched
    if "output" in target: res.append(str_day(now - float(target.output)))
    # the sleep countdown needs boot as well, and a zero interval would divide by zero
    if "sleep" in target and "boot" in target:
        interval = float(target.sleep)
        if interval: res.append(str_day(interval - (now - float(target.boot)) % interval))
    return "/".join(res)
## SIGNATURES
def make_signature(data):
    """ return the SHA-1 hex digest of str(data) encoded as UTF-8. """
    payload = str(data).encode("utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())
def verify_signature(data, signature):
    """ return True when signature matches the "data" entry of the JSON document in data.

    data is a JSON string; its "data" member is hashed with make_signature and
    compared to signature. The unused project import was dropped.
    """
    fromdisk = json.loads(data)
    return make_signature(fromdisk["data"]) == signature
## FILES
def list_files(*args, **kwargs):
    """ return the files below the directory args[0], descending into sub directories.

    With search=<text> only entries whose name contains <text> are considered at
    each level; this applies to directories too, so non matching ones are not entered.
    """
    pattern = args[0]
    if not pattern.endswith(os.sep): pattern += os.sep
    if "search" in kwargs: pattern += "*%s*" % kwargs["search"]
    if "*" not in pattern: pattern += "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry): found.extend(list_files(entry, **kwargs))
        else: found.append(entry)
    return found
## JOINS
def j(*args):
    """ join the truthy arguments, each converted with str, into one path.

    Returns None when there is nothing to join. Before, arguments that were all
    falsy (for example j(None)) raised TypeError from os.path.join.
    """
    todo = [str(part) for part in args if part]
    if not todo: return None
    return os.path.join(*todo)
def mj(*args):
    """ join the truthy arguments into a path and replace os.sep by ".".

    Returns None when there is nothing to join (all-falsy arguments used to raise TypeError).
    """
    todo = [str(part) for part in args if part]
    if not todo: return None
    return os.path.join(*todo).replace(os.sep, ".")
def dj(*args):
    """ join the truthy arguments into a path and replace os.sep by "_".

    Returns None when there is nothing to join (all-falsy arguments used to raise TypeError).
    """
    todo = [str(part) for part in args if part]
    if not todo: return None
    return os.path.join(*todo).replace(os.sep, "_")
def aj(sep=None, *args):
    """ join sep and args (truthy ones, converted with str) and return the absolute path.

    Returns None when there is nothing to join. The original passed *j(...) to
    os.path.abspath, which unpacks the joined string into single characters and
    raises TypeError for any path longer than one character.
    """
    todo = [str(part) for part in (sep,) + args if part]
    if not todo: return None
    return os.path.abspath(os.path.join(*todo))
## TIME
def dtime(stamp):
    """ return the local datetime.datetime for the epoch timestamp stamp. """
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """ parse a "YYYY-MM-DD" string into a datetime.datetime at midnight. """
    layout = '%Y-%m-%d'
    return datetime.datetime.strptime(daystr, layout)
def tdiff(d1, d2):
    """ return datetime.timedelta(days=d1, seconds=d2).

    NOTE(review): the name suggests a difference between two moments, but the
    code builds a timedelta from d1 days and d2 seconds - confirm with callers.
    """
    return datetime.timedelta(days=d1, seconds=d2)
def rtime():
    """ return the current local time as text with " " replaced by os.sep and ":" by "_". """
    stamp = str(datetime.datetime.now())
    stamp = stamp.replace(" ", os.sep)
    return stamp.replace(":", "_")
def ftime(datestr):
    """ return datestr with " " replaced by os.sep and ":" replaced by "_". """
    pathlike = datestr.replace(" ", os.sep)
    pathlike = pathlike.replace(":", "_")
    return str(pathlike)
def hms():
    """ return the current local time of day as "HH:MM:SS". """
    now = datetime.datetime.today()
    return now.strftime("%H:%M:%S")
def day():
    """ return the current local date as "YYYY-MM-DD". """
    now = datetime.datetime.today()
    return now.date().isoformat()
def time_string(*args, **kwargs):
    """ return str() of the local datetime for the epoch timestamp args[0].

    When the timestamp cannot be converted the failure is logged through error()
    and None is returned. Only conversion errors are caught; the bare except
    used before also swallowed KeyboardInterrupt and SystemExit.
    """
    timestamp = args[0]
    try: return str(datetime.datetime.fromtimestamp(timestamp))
    except (TypeError, ValueError, OverflowError, OSError):
        error()
        return None
def time_time(*args, **kwargs):
    """ return time_string() of the timestamp args[0]; the other arguments are ignored. """
    return time_string(args[0])
def make_time(daystr):
    """ convert a time.ctime() style string ("Thu Jan  1 12:00:00 2015") to an epoch timestamp. """
    parsed = time.strptime(daystr, "%a %b %d %H:%M:%S %Y")
    return time.mktime(parsed)
def a_time(daystr):
    """ convert "YYYY-MM-DD HH:MM:SS" to an epoch timestamp; return None for an empty daystr. """
    if not daystr: return None
    parsed = time.strptime(daystr, "%Y-%m-%d %H:%M:%S")
    return time.mktime(parsed)
def b_time(daystr):
    """ return a_time() of the "saved" entry of daystr, or 0.0 when there is none.

    Despite its name the parameter is an object with a "saved" attribute, not a
    string. The original body read an undefined name `obj` and therefore always
    raised NameError.
    """
    if "saved" in daystr: return a_time(daystr.saved)
    return 0.0
def short_date(*args, **kwargs):
    """ reformat a date string (args[0]) as "YYYY-MM-DD HH:MM:SS".

    Recognised layouts, tried in this order on the whitespace separated fields:
      Mon, 25 Oct 2010 18:05:33 -0700 (PDT)   year in field 3
      Thu Jan  1 12:00:00 2015                year in field 4
      13 Oct 2012 20:43:46 +0300              day month year time
      13 Oct 2012                             day month year
    Returns None for an empty input and "" when no layout fits. Month names are
    looked up in monthint.
    """
    date = args[0]
    if not date: return None
    res = date.split()
    ddd = ""
    try:
        # a field containing "+" or "-" is a timezone offset, not a year
        if "+" in res[3]: raise ValueError
        if "-" in res[3]: raise ValueError
        int(res[3])
        ddd = "{:4}-{:#02}-{:#02} {:6}".format(res[3], monthint[res[2]], int(res[1]), res[4])
    except (IndexError, KeyError, ValueError):
        try:
            if "+" in res[4]: raise ValueError
            if "-" in res[4]: raise ValueError
            int(res[4])
            ddd = "{:4}-{:#02}-{:02} {:6}".format(res[4], monthint[res[1]], int(res[2]), res[3])
        except (IndexError, KeyError, ValueError):
            # ValueError added to the last two fallbacks: int(res[0]) on a non
            # numeric first field used to escape to the caller
            try: ddd = "{:4}-{:#02}-{:02} {:6}".format(res[2], monthint[res[1]], int(res[0]), res[3])
            except (IndexError, KeyError, ValueError):
                try: ddd = "{:4}-{:#02}-{:02}".format(res[2], monthint[res[1]], int(res[0]))
                except (IndexError, KeyError, ValueError): ddd = ""
    return ddd
def short_time(*args, **kwargs):
    """ return the time of day field of a date string (args[0]), padded to 6 characters.

    When field 3 is a plain number (a year, no "+" or "-") and a field 4 exists,
    field 4 is the time; otherwise field 3 is. Returns None for an empty input
    and "" when there are fewer than four fields.
    """
    date = args[0]
    if not date: return None
    fields = date.split()
    if len(fields) < 4: return ""
    candidate = fields[3]
    if len(fields) > 4 and "+" not in candidate and "-" not in candidate:
        try: int(candidate)
        except ValueError: pass
        else: return "{:6}".format(fields[4])
    return "{:6}".format(candidate)
## NAMES
def get_funcname(func):
    """ return the second whitespace separated word of str(func), or None when there is none. """
    try: words = str(func).split()
    except (AttributeError, ValueError, IndexError): return None
    if len(words) < 2: return None
    return words[1]
def get_clsname(obj):
    """ return the class name of obj.

    Reads obj.__class__.__name__ directly. The original sliced str(obj.__class__),
    which returned "<class 'int" for classes printed without a dotted module path.
    Falls back to the module name, then None, when the attribute is missing.
    """
    try: return obj.__class__.__name__
    except AttributeError:
        try: return obj.__class__.__module__
        except AttributeError: return None
def get_how(func):
    """ extract a short name from the str() of a callable or module.

    "<bound method A.b of ...>"  -> "A.b"   (third word before " of ")
    "<module 'x' from '...'>"    -> "'x'"   (words after the first, before " from ")
    "<function f at 0x...>"      -> "f"     (words after the first, before " at ")
    Returns "" when none of the markers is present.

    Changes from the original: fewer than three words before " of " no longer
    raises IndexError (the other markers are tried instead), and " from " is
    split on its first occurrence only, like the other two markers.
    """
    s = str(func)
    if " of " in s:
        words = s.split(" of ", 1)[0].split()
        if len(words) > 2: return words[2]
    for marker in (" from ", " at "):
        if marker in s:
            return " ".join(s.split(marker, 1)[0].split()[1:])
    return ""
def get_name(obj):
    """ return the quoted part of str(type(obj)), e.g. "int" or "datetime.timedelta". """
    shown = str(type(obj))
    quoted = shown.split(" ")[1]
    # drop the leading quote and the trailing "'>"
    return quoted[1:-2]
## HIGHEST
def get_highest(target, file_name):
    """ determine new file extension.

    Looks in directory target for entries named "<file_name>.<number>" and
    returns "<file_name>.<highest number + 1>" ("<file_name>.1" when there are none).

    The original accepted any entry merely containing file_name, so unrelated
    files such as "catalogdata.9" raised the counter for "data".
    """
    highest = 0
    for entry in os.listdir(target):
        stem, dot, ext = entry.rpartition(".")
        if not dot or stem != file_name: continue
        try: highest = max(highest, int(ext))
        except ValueError: pass
    return file_name + '.' + str(highest + 1)
## STACK
def get_exception(*args, **kwargs):
    """ describe the exception currently being handled on one line.

    Every traceback entry becomes "<file path with os.sep as dots, last 3 chars cut>:<line> <function> | ",
    followed by "<exception type>: <exception value>".
    """
    exctype, excvalue, tb = sys.exc_info()
    pieces = []
    for entry in traceback.extract_tb(tb):
        fname, linenr, func = entry[0], entry[1], entry[2]
        # fname[:-3] cuts the last three characters (".py" for source files)
        ownname = '.'.join(fname[:-3].split(os.sep))
        pieces.append("%s:%s %s | " % (ownname, linenr, func))
    return "%s%s: %s" % ("".join(pieces), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """ return the source file name of a frame further up the call stack.

    depth is args[0] (default 1). Starting at the frame depth levels above this
    function, up to depth more parent frames are followed; the file name of the
    last one reached is returned ("" when none was followed).
    """
    depth = args[0] if args else 1
    current = sys._getframe(depth)
    filename = ""
    if not current: return ""
    while depth > 0:
        depth -= 1
        parent = getattr(current, "f_back", None)
        if not parent: break
        filename = parent.f_code.co_filename
        current = parent
    # drop the frame reference explicitly, as the original did
    del current
    return filename
def get_frame(search="code"):
    """ collect attributes of the caller's frame into a dict.

    For every attribute of the calling frame whose name contains str(search)
    (the default "code" selects f_code), all names from dir() of that attribute
    are stored with their values.
    """
    frame = sys._getframe(1)
    needle = str(search)
    result = {}
    for attr in dir(frame):
        if needle not in attr: continue
        target = getattr(frame, attr)
        for name in dir(target): result[name] = getattr(target, name)
    return result
def get_strace(*args, **kwargs):
    """ return "<function>:<line> | " for every frame above the starting frame.

    The starting frame is depth (args[0], default 1) levels above this function;
    its parents are listed from nearest to outermost.
    """
    depth = args[0] if args else 1
    current = sys._getframe(depth)
    if not current: return ""
    pieces = []
    while True:
        parent = getattr(current, "f_back", None)
        if not parent: break
        pieces.append("%s:%s | " % (parent.f_code.co_name, parent.f_lineno))
        current = parent
    # drop the frame reference explicitly, as the original did
    del current
    return "".join(pieces)
def get_trace(*args, **kwargs):
    """ return "<function>:<line>" of a single frame up the call stack.

    depth is args[0]; it now defaults to 1 like get_strace and get_plugname
    (calling without arguments used to raise IndexError). Starting at the frame
    depth levels above this function, depth + 1 parents are followed and the
    last one reached is reported.
    """
    depth = args[0] if args else 1
    loopframe = sys._getframe(depth)
    result = ""
    if not loopframe: return result
    while True:
        frame = getattr(loopframe, "f_back", None)
        if not frame: break
        result = "%s:%s" % (frame.f_code.co_name, frame.f_lineno)
        loopframe = frame
        if depth == 0: return result
        depth -= 1
    del loopframe
    return result
def get_func(*args, **kwargs):
    """ return the name of the function one level above the starting frame.

    depth is args[0]; it now defaults to 1 like get_strace and get_plugname
    (calling without arguments used to raise IndexError). The starting frame is
    depth levels above this function. Returns None when it has no parent.

    NOTE(review): the original loop decremented depth but never moved to the
    parent frame, so it always reported the direct parent of the starting frame.
    That result is kept here as a single lookup - confirm that walking further
    was not intended.
    """
    depth = args[0] if args else 1
    loopframe = sys._getframe(depth)
    if not loopframe: return ""
    frame = getattr(loopframe, "f_back", None)
    func = frame.f_code.co_name if frame else None
    del loopframe
    return func
## ERROR
def error(*args, **kwargs):
    """ log the exception currently being handled at error level and return the logged text. """
    msg = get_exception()
    logging.error(msg)
    return msg
## LOCATING
def get_source(mod, package):
    """ return the absolute file system path of a packaged resource.

    mod and package are handed to pkg_resources.resource_filename as its first
    and second argument. The path is also logged at warning level, now through
    logging.warning instead of the deprecated logging.warn alias.
    """
    import pkg_resources as p
    source = os.path.abspath(p.resource_filename(mod, package))
    logging.warning("source %s" % source)
    return source
## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """ return the IPv4 address of hostname (this machine's host name when omitted), or None on failure.

    The default socket timeout is set to timeout during the lookup and always
    restored afterwards. Any OSError (lookup failure as well as timeout) gives
    None; the original caught only socket.timeout.
    """
    # the file never imports socket at module level
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try: return socket.gethostbyname(hostname or socket.gethostname())
    except OSError: return None
    finally: socket.setdefaulttimeout(oldtimeout)
def resolve_host(ip=None, timeout=1.0):
    """ return the host name for ip (this machine's address when omitted), or None on failure.

    The default socket timeout is set to timeout during the lookup and always
    restored afterwards. Any OSError (lookup failure as well as timeout) gives
    None; the original caught only socket.timeout.
    """
    # the file never imports socket at module level
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        addr = ip or resolve_ip()
        if not addr: return None
        return socket.gethostbyaddr(addr)[0]
    except OSError: return None
    finally: socket.setdefaulttimeout(oldtimeout)
## DIRECTORIES
def touch(fname):
    """ create fname when it does not exist yet; an existing file is left as it is.

    OS level failures are logged through error() instead of being raised. Only
    OSError is caught now; the bare except used before also swallowed
    KeyboardInterrupt and SystemExit.
    """
    try:
        fd = os.open(fname, os.O_RDONLY | os.O_CREAT)
        os.close(fd)
    except OSError: error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """ make sure ddir exists, is owned by the current user and carries the wanted mode.

    A missing path is created with cdir(). filemask is applied to regular
    files, dirmask to everything else.
    """
    import stat as stat_mod
    uid = os.getuid()
    gid = os.getgid()
    try: info = os.stat(ddir)
    except OSError: cdir(ddir) ; info = os.stat(ddir)
    if info.st_uid != uid: os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    # st_mode also carries the file type bits, so comparing it whole with a
    # permission mask never matched and chmod ran on every call.
    # assumes the masks hold permission bits only - TODO confirm in core.defines
    if stat_mod.S_IMODE(info.st_mode) != mask: os.chmod(ddir, mask)
def cdir(path):
    """ create every missing directory along path and return True.

    Each component of path is treated as a directory. New directories are passed
    to check_permissions(); an OSError while creating one is logged via error().
    """
    built = ""
    for part in path.split(os.sep):
        built = built + part + os.sep
        target = os.path.abspath(built)
        if not os.path.isdir(target):
            try:
                os.mkdir(target)
                check_permissions(target)
            except OSError: error()
    return True
## HELPERS
def get_urls(data):
    """ return the distinct ".html" link targets found in the HTML text data.

    Fragments ("#...") are cut off; links containing ".." or starting with
    "mailto" are skipped. The parser is now named explicitly ("html.parser") so
    the result does not depend on which optional parsers are installed, and the
    count is logged with logging.warning instead of the deprecated logging.warn.
    """
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(data, "html.parser")
    urls = []
    for tag in soup('a'):
        href = tag.get("href")
        if not href: continue
        href = href.split("#")[0]
        if not href: continue
        if not href.endswith(".html"): continue
        if ".." in href: continue
        if href.startswith("mailto"): continue
        if href not in urls: urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls
def stripbadchar(s):
    """ return s without characters whose code is 31 or lower, unless they are in allowedchars. """
    kept = []
    for c in s:
        if ord(c) > 31 or c in allowedchars: kept.append(c)
    return "".join(kept)
def enc_char(s):
    """ return s with every character outside allowedchars replaced by its enc_name() form. """
    return "".join(c if c in allowedchars else enc_name(c) for c in s)
def enc_needed(s):
    """ return the list of characters of s that are not in allowedchars (empty when none). """
    offending = []
    for c in s:
        if c not in allowedchars: offending.append(c)
    return offending
def enc_name(input):
    """ return the URL-safe base64 text of input (a str, encoded as UTF-8). """
    # the file never imports base64 at module level
    import base64
    encoded = base64.urlsafe_b64encode(bytes(input, "utf-8"))
    return str(encoded, "utf-8")
def split_txt(what, l=375):
    """ cut what into pieces of at least l characters, breaking at the next space.

    Each piece ends just before the first space found at or after l characters,
    so pieces after the first start with that space and "".join(pieces) gives
    back what. Empty pieces are left out.
    """
    pieces = []
    total = len(what)
    begin = 0
    limit = l
    for _ in range(int(total / limit + 1)):
        cut = what.find(' ', limit)
        if cut == -1: cut = total
        piece = what[begin:cut]
        if piece: pieces.append(piece)
        begin = cut
        limit = begin + l
    return pieces
def pretty(a):
    """ return a plus a newline as UTF-8 bytes when its type is in basic_types, else str(type(a)). """
    if type(a) in basic_types: return bytes("%s\n" % a, "utf-8")
    return str(type(a))
def smooth(a):
    """ return a unchanged when its type is in basic_types, else str(type(a)). """
    if type(a) in basic_types: return a
    return str(type(a))
def full(a):
    """ return a unchanged when its type is in basic_types, else str(a). """
    if type(a) in basic_types: return a
    return str(a)
def verzin(a):
    """ like smooth(), except that a float with a fractional part is shown as a short date.

    Such floats are treated as epoch timestamps and passed through time.ctime
    and short_date.
    """
    kind = type(a)
    if kind is float and not a.is_integer(): return short_date(time.ctime(a))
    if kind in basic_types: return a
    return str(kind)
def copyright():
    """ return the copyright notice of this package. """
    notice = "Copyright 2014 B.H.J Thate"
    return notice
def hello(*args):
    """ print a coloured banner: name (args[0]), version and the copyright notice.

    The version is args[1] when exactly two arguments are given, otherwise the
    package __version__.
    """
    ver = args[1] if len(args) == 2 else __version__
    banner = "%s%s %s %s%s\n" % (YELLOW, args[0], ver, copyright(), ENDC)
    print(banner)
def list_eggs(filter="core"):
    """ yield the sys.path entries that contain ".egg" and, when filter is non-empty, filter. """
    yield from (
        entry for entry in sys.path
        if ".egg" in entry and not (filter and filter not in entry)
    )
def show_eggs(filter="core"):
    """ log every egg path yielded by list_eggs(filter) at warning level.

    Uses logging.warning; logging.warn is a deprecated alias.
    """
    for path in list_eggs(filter): logging.warning(path)
def stripped(input):
    """ return the part of input before the first "/"; anything that is not a str comes back unchanged.

    Only the errors a non-str argument produces are caught; the bare except
    used before also swallowed KeyboardInterrupt and SystemExit.
    """
    try: return input.split("/")[0]
    except (AttributeError, TypeError): return input
## HEADER
# Comment header template with two %s placeholders, to be filled with the
# % operator. Nothing in this part of the file uses it; presumably it is written
# at the top of files the package saves - TODO confirm what the two values are.
headertxt = '''# this is an core (#%s) file, %s
#
# this file can be edited !!
'''
## FEEDER
def feed(text):
    """ split text on "\\r\\n" and return a list with Object().feed(chunk) for every chunk.

    NOTE: this module defines feed twice with the same body; the later
    definition is the one that stays bound after import.
    """
    from core import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
## PARSER
def parse_email(fn):
    """ read the mbox style file fn and return one Object per message.

    Messages start at lines beginning with "From ". Each Object holds the
    message headers plus a text attribute with the concatenated payload.
    Anything before the first "From " line is discarded.

    Fixes: the file is closed again (with-statement), and the last message of
    the file is returned too - the original never collected the message still
    being built when the file ended.
    """
    from core import Object
    mails = []
    current = []
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                mails.append("".join(current))
                current = [line]
                continue
            current.append(line)
    mails.append("".join(current))
    result = []
    # mails[0] is the text in front of the first "From " line, not a message
    for mess in mails[1:]:
        m = email.message_from_string(mess)
        o = Object()
        o.update(m.items())
        o.text = ""
        for load in m.get_payload(): o.text += str(load)
        result.append(o)
    logging.warning("%s emails read" % len(result))
    return result
## STRIPPERS
def strip_html(text):
    """ return the text content of the HTML in str(text), with all tags removed.

    The parser is named explicitly ("html.parser") so the result does not
    depend on which optional parsers are installed.
    """
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(str(text), "html.parser")
    return soup.get_text()
def strip_wiki(text):
    """ remove wiki markup from text: [[ ]] {{ }} brackets, HTML entities and <ref> elements.

    Self-closing refs are matched with [^>]* so they cannot run past their own
    tag. The original `<ref .*?/>` started at an earlier `<ref name=..>` and
    swallowed everything up to the next "/>". Entities are decoded with
    html.unescape.
    """
    for token in ("[[", "]]", "}}", "{{"):
        text = text.replace(token, "")
    text = html.unescape(text)
    text = re.sub(r"<ref [^>]*/>", "", text)
    text = re.sub(r"<ref>.*?</ref>", "", text)
    text = re.sub(r"<ref .*?</ref>", "", text)
    return text
## ENCODING
def get_encoding(data):
    """ guess the character encoding of a fetched document.

    Tried in order:
      1. a charset in the content-type entry of data.info (mapping, or a
         callable returning one)
      2. a <meta http-equiv="content-type" ... charset=...> tag in the text
      3. chardet, when it is installed and data is bytes
      4. sys.getdefaultencoding()

    Fixes: chardet was an undefined name; regexes are raw strings; an empty
    charset no longer raises IndexError; bytes and non-text objects no longer
    fail on data.lower().
    """
    info = getattr(data, 'info', None)
    if callable(info): info = info()
    if info is not None and 'content-type' in info and 'charset' in info['content-type'].lower():
        charset = info['content-type'].lower().split('charset', 1)[1].strip()
        if charset.startswith('='):
            charset = charset[1:].split(';')[0].strip()
            if charset: return charset
    if isinstance(data, (bytes, bytearray)): text = bytes(data).decode("latin-1")
    elif isinstance(data, str): text = data
    else: text = ""
    if '<meta' in text.lower():
        for meta in re.findall(r'<meta[^>]+>', text, re.I | re.M):
            test_http_equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if test_http_equiv and test_http_equiv.group(1).lower() == 'content-type':
                test_content = re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
                if test_content:
                    test_charset = re.search(r'charset\s*=\s*([^\s\'"]+)', meta, re.I)
                    if test_charset: return test_charset.group(1)
    if isinstance(data, (bytes, bytearray)):
        # chardet is optional and only accepts bytes
        try: import chardet
        except ImportError: chardet = None
        if chardet:
            guess = chardet.detect(bytes(data)).get('encoding')
            if guess: return guess
    return sys.getdefaultencoding()
## SED
def run_sed(filename, sedstring):
    """ rewrite filename in place, line by line.

    With a sedstring of the form "s#from#to#", every regex match of "from" is
    replaced by "to". With an empty sedstring, trailing whitespace is stripped
    from each line and tabs become a single space. A malformed sedstring raises
    IndexError, now before any file is opened.

    Fixes: both files are closed (with-statement); the result replaces the
    original through os.replace, which overwrites on every platform - the
    original caught WindowsError, a name that does not exist outside Windows;
    logging.warning instead of the deprecated logging.warn.
    """
    logging.warning("sed %s" % filename)
    fr = to = None
    if sedstring:
        seds = sedstring.split("#")
        fr = seds[1]
        to = seds[2]
    tmp = filename + '.tmp'
    with open(filename, 'r') as f, open(tmp, 'w') as fout:
        for line in f:
            if fr is not None: fout.write(re.sub(fr, to, line))
            else: fout.write(re.sub("\t", " ", line.rstrip() + "\n"))
    os.replace(tmp, filename)
## URL
def fetch_url(type, url, myheaders=None, postdata=None, keyfile=None, certfile="", port=80):
    """ send one HTTP(S) request and return the http.client response object.

    type is the request method ("GET", "POST", ...). myheaders are merged over
    the default headers; postdata, when given, is sent form-urlencoded as the
    request body. keyfile, certfile and port are accepted but not used.

    Fixes: the headers and postdata were built but never passed to the request;
    HTTPS is chosen from the URL scheme instead of "https" appearing anywhere
    in the URL; the mutable {} defaults became None.
    """
    import http.client
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html; application/json', 'User-Agent': useragent()}
    if myheaders: headers.update(myheaders)
    urlparts = urllib.parse.urlparse(url)
    if urlparts.scheme == "https": connection = http.client.HTTPSConnection(urlparts.netloc)
    else: connection = http.client.HTTPConnection(urlparts.netloc)
    body = urllib.parse.urlencode(postdata) if postdata else None
    logging.info('%s %s' % (type, url))
    connection.request(type, url, body=body, headers=headers)
    resp = connection.getresponse()
    logging.info("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """ return the Location header of resp when its status is 301, otherwise None. """
    if resp.status != 301: return None
    return resp.getheader("Location")
## TO/FROM
def to_enc(what, encoding='utf-8'):
    """ return str(what) encoded with encoding; any falsy what gives empty bytes. """
    text = str(what) if what else ""
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """ return txt as str: str passes through, bytes are decoded with encoding.

    Falsy input gives "". Bytes that cannot be decoded are dropped instead of
    raising. The original fallback called decodeperchar, a name that does not
    exist in this file. what is not used.
    """
    if not txt: return ""
    if isinstance(txt, str): return txt
    try: return txt.decode(encoding)
    except UnicodeDecodeError: return txt.decode(encoding, "ignore")
## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """ decode bytes one byte at a time, dropping every byte that does not decode on its own.

    With a multi-byte encoding such as UTF-8 this drops all non-ASCII
    characters, because their bytes are not valid one by one. The dropped bytes
    are logged at info level. str input is returned unchanged, falsy input
    gives "". what is not used.

    Fixes: iterating bytes yields ints in Python 3, so the original's
    i.decode(...) raised AttributeError; the log line joined non-str items.
    """
    if not txt: return ""
    if isinstance(txt, str): return txt
    res = []
    nogo = []
    for value in bytes(txt):
        unit = bytes([value])
        try: res.append(unit.decode(encoding))
        except UnicodeDecodeError:
            if unit not in nogo: nogo.append(unit)
    if nogo: logging.info("nogo: %s" % " ".join(repr(unit) for unit in nogo))
    return "".join(res)
## OPTIONS
def make_opts(options):
    """ build an optparse parser from options, parse sys.argv and return (values, args).

    Every option is a tuple (short flag, long flag, type or action, default,
    dest, help). When the third item contains "store" it is passed as action=,
    otherwise as type=. An option that optparse rejects is logged and skipped.
    """
    from core import __version__
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=str(__version__))
    for option in options:
        kind, default, dest, helptxt = option[2:]
        keyword = "action" if "store" in kind else "type"
        try: parser.add_option(option[0], option[1], default=default, dest=dest, help=helptxt, **{keyword: kind})
        except Exception as ex: logging.error("error: %s - option: %s" % (str(ex), option))
    return parser.parse_args()
## PARSING
def parse_url(*args, **kwargs):
    """ split the URL args[0] into (basepath, base, root, file).

    basepath  path without a trailing file name and without a trailing "/"
    base      scheme + network location + basepath
    root      scheme + network location
    file      last path segment when it contains a ".", otherwise None

    Fields of urllib.parse.urlsplit that are used:

    Attribute Index Value                 Value if not present
    scheme    0     URL scheme specifier  empty string
    netloc    1     Network location part empty string
    path      2     Hierarchical path     empty string
    """
    parsed = urllib.parse.urlsplit(args[0])
    scheme, netloc, path = parsed[0], parsed[1], parsed[2]
    head, _, last = path.rpartition("/")
    if "." in last: basepath, file = head, last
    else: basepath, file = path, None
    if basepath.endswith("/"): basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """ return the distinct ".html" links of an HTML page that stay on the page's site.

    args is (url, txt): the page URL and its HTML text. Relative links are made
    absolute with the root or base of url; fragments are cut off; links with
    "..", mailto links and links outside the site root are skipped.

    Fixes: the parser is named explicitly ("html.parser"); a link counts as
    absolute only when it starts with http:// or https:// (the original looked
    for "http" anywhere in it); logging.warning replaces the deprecated logging.warn.
    """
    import bs4
    url, txt = args
    basepath, base, root, file = parse_url(url)
    soup = bs4.BeautifulSoup(txt, "html.parser")
    urls = []
    for tag in soup('a'):
        href = tag.get("href")
        if not href: continue
        href = href.split("#")[0]
        if not href: continue
        if not href.endswith(".html"): continue
        if ".." in href: continue
        if href.startswith("mailto"): continue
        if not href.startswith(("http://", "https://")):
            if href.startswith("/"): href = root + href
            else: href = base + "/" + href
        if root not in href:
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in urls: urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls
## GENERICS
def reduced_keys(*args, **kwargs):
    """ return the keys of args[0] worth showing, as strings, without duplicates.

    Skipped: keys starting with "_", "X" or "x", keys that are not all lower
    case, keys containing "-", and the names "args", "rest" and "first".

    Fixes: the args/rest/first check ran only after the key had been appended,
    so it never excluded anything; the "-" test ran on the raw key and raised
    TypeError for non-strings; a non-str key was appended twice (raw and as str).
    """
    res = []
    for key in args[0]:
        k = str(key)
        if k.startswith(("_", "X", "x")): continue
        if not k.islower(): continue
        if "-" in k: continue
        if k in ("args", "rest", "first"): continue
        if k not in res: res.append(k)
    return res
def feed(text):
    """ split text on "\\r\\n" and return a list with Object().feed(chunk) for every chunk.

    NOTE: this module defines feed twice with the same body; this later
    definition is the one that stays bound after import.
    """
    from core import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
def dispatch(target, event, cmnd, *args, **kwargs):
    """ call every function registered under target[cmnd] with event.

    Returns event afterwards, or False when target has no entry for cmnd. The
    extra positional and keyword arguments are not passed on.
    """
    try:
        handlers = target[cmnd]
    except KeyError:
        return False
    else:
        for handler in handlers:
            handler(event)
        return event
def resolve(*args, **kwargs):
    """ prepare the event args[0] and dispatch it on the core kernel.

    The handlers are looked up under event.ucmnd, or event.etype when ucmnd is
    empty. Returns what dispatch() returns.
    """
    from core import kernel
    event = args[0]
    event.prepare()
    return dispatch(kernel, event, event.ucmnd or event.etype, *args, **kwargs)
def need_skip(obj, black=[], white=[]):
    """ return True when the content type of obj is in black or is not in white.

    Objects without a get_content_type() method are never skipped.
    NOTE(review): with the default empty white list every content type is
    skipped - confirm callers always pass white.
    """
    try: value = obj.get_content_type()
    except AttributeError: return False
    return value in black or value not in white
## day string
def nr_days(seconds):
    """ return the number of whole days in seconds, truncated towards zero. """
    seconds_per_day = 60 * 60 * 24
    return int(seconds / seconds_per_day)
def str_day(seconds):
    """ return a short duration text for seconds, e.g. "3d", "5h" or "1y2w".

    The result is the number of whole years (if any) followed by the largest
    non-zero smaller unit: w(eeks), d(ays), h(ours), m(inutes) or s(econds).
    Zero gives "0s". A year is counted as 365 days.

    Fix: only the weeks branch appended to the year prefix; days, hours, minutes
    and seconds assigned the text instead, so "1 year and 3 days" came out as "3d".
    """
    remaining = int(float(seconds))
    units = (
        ("y", 365*24*60*60),
        ("w", 7*24*60*60),
        ("d", 24*60*60),
        ("h", 60*60),
        ("m", 60),
        ("s", 1),
    )
    counts = []
    for suffix, size in units:
        count = int(remaining / size)
        remaining -= count * size
        counts.append((suffix, count))
    years = counts[0][1]
    txt = "%sy" % years if years else ""
    for suffix, count in counts[1:]:
        if count: return txt + "%s%s" % (count, suffix)
    return txt or "0s"
## FORMAT
def parse(txt):
    """ turn the words of txt into an Object.

    ".word"      stored under "ucmnd" (and, like any word without "=", as a key with value "")
    "-word"      stored as key "opt_-word" with value ""
    "key=value"  stored as key -> value
    other words  stored as key with value ""

    Fixes: Object is imported here like in the other helpers (it was an
    undefined name); a word without exactly one "=" no longer falls through to
    `result[key] = value`, which raised NameError or repeated the previous pair.
    """
    from core import Object
    result = Object()
    for word in txt.split():
        if word.startswith("."): result["ucmnd"] = word
        if word.startswith("-"):
            result["opt_%s" % word] = ""
            continue
        try: key, value = word.split("=")
        except ValueError: result[word] = ""
        else: result[key] = value
    return result
## strtotime function
def get_day(daystr):
    """ convert string to time.

    Finds the first "day-month-year" (numeric, e.g. "25-10-2010") in daystr and
    returns the epoch timestamp of local midnight on that day. Returns 0 when
    nothing matches, the date does not exist or the year has fewer than four digits.

    Fixes: the undefined name calendar and the month name round trip through
    strptime are replaced by datetime validation; the regex is a raw string; an
    out-of-range day returns 0 like every other failure, not None.
    """
    try:
        dmyre = re.search(r'(\d+)-(\d+)-(\d+)', daystr)
        day, month, year = (int(part) for part in dmyre.groups())
        # the strptime("%Y") call used before only accepted four digit years
        if year < 1000: return 0
        moment = datetime.datetime(year, month, day)
        return time.mktime(moment.timetuple())
    except (AttributeError, ValueError, OverflowError): return 0
def get_hour(daystr):
    """ return the time of day found in daystr as seconds since midnight.

    The first "H:M:S" anywhere in the text wins; if there is none, the first
    "H:M" is used. Returns 0 when neither is present. The regexes are raw
    strings now, as "\\d" in a normal string literal is an invalid escape sequence.
    """
    hmsre = re.search(r'(\d+):(\d+):(\d+)', daystr)
    if hmsre:
        hours, minutes, seconds = (int(part) for part in hmsre.groups())
        return hours * 60 * 60 + minutes * 60 + seconds
    hmre = re.search(r'(\d+):(\d+)', daystr)
    if hmre:
        hours, minutes = (int(part) for part in hmre.groups())
        return hours * 60 * 60 + minutes * 60
    return 0
## today function
def today():
    """ return time of 0:00 today.

    The current time is shifted by time.timezone (plus one hour when
    time.daylight is set), formatted with time.ctime and matched against timere;
    the day, month and year groups are turned back into a timestamp. Returns 0
    when timere does not match - the original raised UnboundLocalError there.

    NOTE(review): time.daylight only says that the zone defines DST, not that
    DST is in effect now, and the shift changes which calendar day is picked -
    confirm this is the intended notion of "today".
    """
    if time.daylight: ttime = time.ctime(time.time() + int(time.timezone) + 3600)
    else: ttime = time.ctime(time.time() + int(time.timezone))
    matched = re.search(timere, ttime)
    if not matched: return 0
    temp = "%s %s %s" % (matched.group(3), matched.group(2), matched.group(7))
    return time.mktime(time.strptime(temp, "%d %b %Y"))