Source code for core.utils

# core/utils.py
#
#

""" utils package. """

__copyright__ = "Copyright 2014 B.H.J Thate"

## IMPORT

from core import __version__
from core.defines import *

from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape

import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import types
import http
import json
import time
import math
import glob
import sys
import os
import re

## start_new_thread alias

# Short alias for the low-level thread starter of the stdlib ``_thread`` module.
run_thr = _thread.start_new_thread

## AGENT

def useragent():
    """Return the User-Agent string, with the package version filled in."""
    template = 'Mozilla/5.0 (X11; Linux x86_64); CORELIB %s; http://pikacode.com/bthate/corelib)'
    return template % __version__


## ISTR
class istr(str):
    """Plain ``str`` subclass; it adds no attributes or methods of its own."""


## UNESCAPE
def unescape(text):
    """Replace HTML character references in ``text`` by their characters.

    Uses ``html.unescape``; the ``HTMLParser.unescape`` method used before
    was deprecated in Python 3.4 and removed in Python 3.9.
    """
    return html.unescape(text)


## txt_parse function
def txt_parse(txt):
    """Split ``txt`` into plain words and ``key=value`` selectors.

    Returns a ``core.Object`` with:

    * ``args``       - list of plain words and of selector keys,
    * ``wanted``     - mapping of selector key to value,
    * ``not_wanted`` - always empty (kept for callers that read it),
    * ``switch``     - always empty (kept for callers that read it).

    A value ending in ``-`` has that dash stripped and its key is left out
    of ``args``.  Words that are not a well-formed ``key=value`` pair (no
    ``=``, more than one ``=``, empty key or empty value) are treated as
    plain words; an empty key or value used to raise an uncaught IndexError.
    """
    from core import Object
    o = Object()
    o.args = []
    o.wanted = Object()
    o.not_wanted = Object()
    o.switch = Object()
    if not txt:
        return o
    for word in txt.split():
        key, sep, value = word.partition("=")
        if not sep or not key or not value or "=" in value:
            o.args.append(word)
            continue
        hidden = value.endswith("-")
        if hidden:
            value = value[:-1]
        # NOTE(review): the original also had `if word == "!": ...` filling
        # o.switch here, but a word containing "=" can never equal "!", so
        # that branch never ran.  Confirm whether a trailing "!" on the key
        # was meant before reviving it.
        o.wanted[key] = value
        if hidden:
            continue
        o.args.append(key)
    return o


## SETS
def unique(a):
    """Return the distinct items of ``a`` as a list (order is not preserved)."""
    distinct = set(a)
    return list(distinct)
def intersect(a, b):
    """Return a list of the items that occur in both ``a`` and ``b``."""
    return list(set(a).intersection(set(b)))
def union(a, b):
    """Return a list of the items that occur in ``a``, in ``b`` or in both."""
    return list(set(a).union(set(b)))


## STATE/STATUS
def get_status(obj, type=""):
    """Copy every entry of ``obj._status`` into a new ``core.Object``.

    The ``type`` argument is accepted but not used.
    """
    from core import Object
    snapshot = Object()
    for name in obj._status:
        snapshot[name] = obj._status[name]
    return snapshot
def get_state(obj, type=""):
    """Copy ``obj._state`` into a new ``core.Object``, prettifying timestamps.

    Each value is first tried as a timestamp: it is run through
    ``time.ctime`` and ``short_date``.  When that yields a non-empty string
    the string is stored, otherwise the raw value is stored unchanged.
    The ``type`` argument is accepted but not used.
    """
    from core import Object
    o = Object()
    for key in obj._state:
        raw = obj._state[key]
        try:
            shown = short_date(time.ctime(raw))
        except Exception:
            # Not a usable timestamp; was a bare ``except`` which also
            # swallowed KeyboardInterrupt/SystemExit.
            shown = None
        o[key] = shown if shown else raw
    return o


## UPTIME
def get_uptime(obj):
    """Return a "/"-joined summary of the timers found in ``obj._state``.

    Parts, each rendered with ``str_day`` and only present when its key is:

    * ``boot``   - time elapsed since ``boot``,
    * ``output`` - time elapsed since ``output``,
    * ``sleep``  - ``sleep`` minus the time since ``boot`` modulo ``sleep``.

    Fixes: the ``output`` key was looked up as ``" output"`` (leading
    space) and so never matched; the ``sleep`` part read ``boot`` without
    checking that it exists.
    """
    res = []
    target = obj._state
    now = time.time()
    if "boot" in target:
        res.append(str_day(now - float(target.boot)))
    if "output" in target:
        res.append(str_day(now - float(target.output)))
    if "sleep" in target and "boot" in target:
        interval = float(target.sleep)
        res.append(str_day(interval - (now - float(target.boot)) % interval))
    return "/".join(res)


## SIGNATURES
def make_signature(data):
    """Return the hex SHA-1 digest of ``str(data)`` encoded as UTF-8."""
    payload = str(data).encode("utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())
def verify_signature(data, signature):
    """Check ``signature`` against the ``"data"`` member of a JSON document.

    ``data`` is a JSON string; the signature of its ``"data"`` entry is
    recomputed with ``make_signature`` and compared to ``signature``.
    """
    from core import Object
    stored = json.loads(data)
    expected = make_signature(stored["data"])
    return expected == signature


## FILES
def list_files(*args, **kwargs):
    """Recursively list the files below the directory ``args[0]``.

    With a ``search`` keyword only entries whose name matches
    ``*<search>*`` are globbed at each level (this applies to
    sub-directories as well, so non-matching directories are not entered).
    """
    pattern = args[0]
    if not pattern.endswith(os.sep):
        pattern = pattern + os.sep
    if "search" in kwargs:
        pattern = pattern + "*%s*" % kwargs["search"]
    if "*" not in pattern:
        pattern = pattern + "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry):
            found.extend(list_files(entry, **kwargs))
        else:
            found.append(entry)
    return found


## JOINS
def j(*args):
    """Join the truthy arguments, converted to ``str``, with ``os.path.join``.

    Returns None when there is nothing to join.  Previously only the
    no-argument call returned None; a call with only falsy arguments
    (e.g. ``j(None)``) raised TypeError from an empty ``os.path.join()``.
    """
    todo = [str(part) for part in args if part]
    if not todo:
        return None
    return os.path.join(*todo)
def mj(*args):
    """Join the truthy arguments into a dotted name.

    The arguments are joined with ``os.path.join`` and every ``os.sep`` is
    then replaced by ``"."``.  Returns None when there is nothing to join
    (a call with only falsy arguments used to raise TypeError).
    """
    todo = [str(part) for part in args if part]
    if not todo:
        return None
    return os.path.join(*todo).replace(os.sep, ".")
def dj(*args):
    """Join the truthy arguments into an underscore-separated name.

    The arguments are joined with ``os.path.join`` and every ``os.sep`` is
    then replaced by ``"_"``.  Returns None when there is nothing to join
    (a call with only falsy arguments used to raise TypeError).
    """
    todo = [str(part) for part in args if part]
    if not todo:
        return None
    return os.path.join(*todo).replace(os.sep, "_")
def aj(sep=None, *args):
    """Return the absolute path of ``j(sep, *args)``.

    The joined path was previously star-unpacked into ``os.path.abspath``,
    which passes every character as a separate argument and raises
    TypeError for any path longer than one character.
    """
    return os.path.abspath(j(sep, *args))


## TIME
def dtime(stamp):
    """Return the local ``datetime`` for the POSIX timestamp ``stamp``."""
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime`` at midnight."""
    layout = '%Y-%m-%d'
    return datetime.datetime.strptime(daystr, layout)
def tdiff(d1, d2):
    """Return ``datetime.timedelta`` of ``d1`` days and ``d2`` seconds.

    NOTE(review): despite the name this does not subtract two dates; the
    arguments are the first two positional parameters of ``timedelta``.
    """
    return datetime.timedelta(days=d1, seconds=d2)
def rtime():
    """Return the current local time as a path-like string.

    The space between date and time becomes ``os.sep`` and each ``:``
    becomes ``_``.
    """
    stamp = str(datetime.datetime.now())
    stamp = stamp.replace(" ", os.sep)
    return stamp.replace(":", "_")
def ftime(datestr):
    """Turn a date string into a path-like string.

    Spaces become ``os.sep`` and colons become ``_``.
    """
    with_sep = datestr.replace(" ", os.sep)
    return str(with_sep.replace(":", "_"))
def hms():
    """Return the current local time of day as ``HH:MM:SS``."""
    return datetime.datetime.today().strftime("%H:%M:%S")
def day():
    """Return today's local date as ``YYYY-MM-DD``."""
    return datetime.datetime.today().date().isoformat()
def time_string(*args, **kwargs):
    """Return ``str(datetime)`` for the POSIX timestamp ``args[0]``.

    When the timestamp cannot be converted the failure is reported through
    ``error()`` and None is returned.  The handler was a bare ``except``,
    which also swallowed KeyboardInterrupt and SystemExit.
    """
    timestamp = args[0]
    try:
        return str(datetime.datetime.fromtimestamp(timestamp))
    except Exception:
        error()
    return None
def time_time(*args, **kwargs):
    """Return ``time_string(args[0])``; extra arguments are ignored."""
    return time_string(args[0])
def make_time(daystr):
    """Convert a ``"%a %b %d %H:%M:%S %Y"`` string to a POSIX timestamp."""
    parsed = time.strptime(daystr, "%a %b %d %H:%M:%S %Y")
    return time.mktime(parsed)
def a_time(daystr):
    """Convert a ``"%Y-%m-%d %H:%M:%S"`` string to a POSIX timestamp.

    Returns None when ``daystr`` is empty or None.
    """
    if not daystr:
        return None
    parsed = time.strptime(daystr, "%Y-%m-%d %H:%M:%S")
    return time.mktime(parsed)
def b_time(daystr):
    """Return the timestamp of the ``saved`` entry of ``daystr``, or 0.0.

    ``daystr`` is an object that supports ``in`` and has a ``saved``
    attribute (the parameter name is kept for compatibility).  The body
    referenced an undefined name ``obj`` instead of its parameter, so any
    call raised NameError.
    """
    obj = daystr
    if "saved" in obj:
        return a_time(obj.saved)
    return 0.0
def short_date(*args, **kwargs):
    """Normalise a textual date ``args[0]`` to ``YYYY-MM-DD HH:MM:SS``.

    The string is split on whitespace and four field layouts are tried in
    order; month names are translated through ``monthint``:

    1. ``Mon, 25 Oct 2010 18:05:33 -0700 (PDT)`` (year is the 4th field)
    2. ``Mon Oct 25 18:05:33 2010``              (year is the 5th field)
    3. ``13 Oct 2012 20:43:46 +0300``            (day month year time)
    4. ``13 Oct 2012``                           (day month year)

    Returns None for an empty argument and "" when no layout fits.  The
    last two layouts did not catch ValueError (non-numeric day), which
    escaped to the caller instead of yielding "".
    """
    date = args[0]
    if not date:
        return None
    res = date.split()
    try:
        # A year field must be a plain number without a sign; "+0300" or
        # "-0700" would otherwise pass int().
        if "+" in res[3] or "-" in res[3]:
            raise ValueError
        int(res[3])
        return "{:4}-{:#02}-{:#02} {:6}".format(res[3], monthint[res[2]], int(res[1]), res[4])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        if "+" in res[4] or "-" in res[4]:
            raise ValueError
        int(res[4])
        return "{:4}-{:#02}-{:02} {:6}".format(res[4], monthint[res[1]], int(res[2]), res[3])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        return "{:4}-{:#02}-{:02} {:6}".format(res[2], monthint[res[1]], int(res[0]), res[3])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        return "{:4}-{:#02}-{:02}".format(res[2], monthint[res[1]], int(res[0]))
    except (IndexError, KeyError, ValueError):
        return ""
def short_time(*args, **kwargs):
    """Extract the time-of-day field from a textual date ``args[0]``.

    The string is split on whitespace.  When the 4th field is an unsigned
    number (a year) the 5th field is returned; otherwise the 4th field is
    returned.  The result is left-aligned in a field of at least six
    characters.  Returns None for an empty argument and "" when the string
    has fewer than four fields.
    """
    date = args[0]
    if not date:
        return None
    fields = date.split()
    try:
        year = fields[3]
        if "+" in year or "-" in year:
            raise ValueError
        int(year)
        return "{:6}".format(fields[4])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        year = fields[4]
        if "+" in year or "-" in year:
            raise ValueError
        int(year)
        return "{:6}".format(fields[3])
    except (IndexError, KeyError, ValueError):
        pass
    try:
        return "{:6}".format(fields[3])
    except (IndexError, KeyError):
        return ""


## NAMES
def get_funcname(func):
    """Return the second whitespace-separated word of ``str(func)``.

    Returns None when ``str(func)`` has fewer than two words.
    """
    try:
        words = str(func).split()
        return words[1]
    except (AttributeError, ValueError, IndexError):
        return None
def get_clsname(obj):
    """Derive a class name from ``str(obj.__class__)``.

    The text after the last "." is taken and its final two characters are
    dropped.  For a class without a dot in its representation (builtins)
    the result is the whole representation minus two characters.  Falls
    back to the class's module name, then to None.
    """
    try:
        tail = str(obj.__class__).split(".")[-1]
        return tail[:-2]
    except (AttributeError, ValueError, IndexError):
        pass
    try:
        return obj.__class__.__module__
    except AttributeError:
        return None
def get_how(func):
    """Extract a short name from ``str(func)``.

    Three patterns are tried in order:

    * ``"<x y NAME of ...>"``   -> third word before " of ",
    * ``"<x NAME from ...>"``   -> words after the first, before " from "
      (only when " from " occurs exactly once),
    * ``"<x NAME at ...>"``     -> words after the first, before " at ".

    Returns "" when none matches.
    """
    text = str(func)
    try:
        head, _tail = text.split(" of ", 1)
        return head.split()[2]
    except ValueError:
        pass
    try:
        head, _tail = text.split(" from ")
        return " ".join(head.split()[1:])
    except ValueError:
        pass
    try:
        head, _tail = text.split(" at ", 1)
        return " ".join(head.split()[1:])
    except ValueError:
        return ""
def get_name(obj):
    """Return the dotted type name taken from ``str(type(obj))``.

    ``"<class 'a.B'>"`` yields ``"a.B"``.
    """
    quoted = str(type(obj)).split(" ")[1]
    return quoted[1:-2]


## HIGHEST
def get_highest(target, file_name):
    """Return ``file_name`` with the next free numeric extension.

    Entries of directory ``target`` whose name contains ``file_name`` are
    inspected; the largest integer found after the last "." is incremented
    and appended as ``"<file_name>.<n>"``.  Without numbered entries the
    result is ``"<file_name>.1"``.
    """
    top = 0
    for entry in os.listdir(target):
        if file_name not in entry:
            continue
        suffix = entry.rsplit('.', 1)[-1]
        try:
            number = int(suffix)
        except ValueError:
            continue
        if number > top:
            top = number
    return file_name + '.' + str(top + 1)


## STACK
def get_exception(*args, **kwargs):
    """Describe the exception currently being handled in one line.

    Every traceback entry becomes ``"<dotted file>:<line> <function> | "``
    where the dotted file is the file name minus its last three characters
    with ``os.sep`` replaced by ".".  The exception type and value are
    appended as ``"<type>: <value>"``.
    """
    exctype, excvalue, tb = sys.exc_info()
    hops = []
    for entry in traceback.extract_tb(tb):
        fname, linenr, func = entry[0], entry[1], entry[2]
        ownname = '.'.join(fname[:-3].split(os.sep))
        hops.append("%s:%s %s | " % (ownname, linenr, func))
    return "%s%s: %s" % ("".join(hops), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """Return the source file name of a frame further up the call stack.

    Starting at ``sys._getframe(depth)`` (``depth`` is ``args[0]``,
    default 1) the stack is climbed up to ``depth`` more frames; the file
    name of the last frame reached is returned, or "" when no step was
    taken.
    """
    depth = args[0] if args else 1
    current = sys._getframe(depth)
    if not current:
        return ""
    filename = ""
    while depth > 0:
        depth -= 1
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        filename = parent.f_code.co_filename
        current = parent
    return filename
def get_frame(search="code"):
    """Collect attributes of the caller's frame members matching ``search``.

    For every attribute of the calling frame whose name contains
    ``str(search)``, all attributes of that member (as listed by ``dir``)
    are copied into the returned dict.
    """
    caller = sys._getframe(1)
    needle = str(search)
    result = {}
    for attr in dir(caller):
        if needle not in attr:
            continue
        member = getattr(caller, attr)
        for name in dir(member):
            result[name] = getattr(member, name)
    return result
def get_strace(*args, **kwargs):
    """Return the call chain above a frame as ``"func:line | "`` segments.

    The walk starts at ``sys._getframe(depth)`` (``depth`` is ``args[0]``,
    default 1) and follows ``f_back`` to the outermost frame; the start
    frame itself is not included.
    """
    depth = args[0] if args else 1
    current = sys._getframe(depth)
    if not current:
        return ""
    hops = []
    while True:
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        hops.append("%s:%s | " % (parent.f_code.co_name, parent.f_lineno))
        current = parent
    return "".join(hops)
def get_trace(*args, **kwargs):
    """Return ``"func:line"`` for a single frame up the call stack.

    Starting at ``sys._getframe(depth)`` (``depth`` is ``args[0]``) the
    stack is climbed one frame per step while counting ``depth`` down; the
    frame reached when the counter hits zero (or the outermost frame) is
    reported.  Returns "" when the start frame has no parent.
    """
    depth = args[0]
    current = sys._getframe(depth)
    result = ""
    if not current:
        return result
    while True:
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        result = "%s:%s" % (parent.f_code.co_name, parent.f_lineno)
        if depth == 0:
            return result
        current = parent
        depth -= 1
    return result
def get_func(*args, **kwargs):
    """Return the function name of the parent of ``sys._getframe(args[0])``.

    Returns None when that frame has no parent.

    NOTE(review): the original loop counted ``depth`` down but never
    advanced to the parent frame, so every iteration produced the same
    name; this rewrite keeps exactly that result.  Confirm whether walking
    further up was intended.
    """
    depth = args[0]
    start = sys._getframe(depth)
    if not start:
        return ""
    try:
        parent = start.f_back
    except AttributeError:
        return None
    if not parent:
        return None
    return parent.f_code.co_name


## ERROR
def error(*args, **kwargs):
    """Log the current exception (via ``get_exception``) and return the text."""
    report = get_exception()
    logging.error(report)
    return report


## LOCATING
def get_source(mod, package):
    """Return the absolute path of resource ``package`` inside ``mod``.

    The path comes from ``pkg_resources.resource_filename`` and is also
    logged at warning level.  ``logging.warn`` is deprecated (removed in
    Python 3.13) and was replaced by ``logging.warning``.
    """
    import pkg_resources as p
    source = os.path.abspath(p.resource_filename(mod, package))
    logging.warning("source %s" % source)
    return source


## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """Resolve ``hostname`` (default: this machine's host name) to an IP.

    The socket default timeout is set to ``timeout`` for the lookup and is
    now always restored afterwards, also when the lookup raises.  Returns
    None on ``socket.timeout``; other lookup errors propagate as before.
    ``socket`` is imported locally because the module never imported it.
    """
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        ip = socket.gethostbyname(hostname or socket.gethostname())
    except socket.timeout:
        ip = None
    finally:
        socket.setdefaulttimeout(oldtimeout)
    return ip
def resolve_host(ip=None, timeout=1.0):
    """Reverse-resolve ``ip`` (default: ``resolve_ip()``) to a host name.

    The socket default timeout is set to ``timeout`` for the lookup and is
    now always restored afterwards, also when the lookup raises.  Returns
    None on ``socket.timeout``; other lookup errors propagate as before.
    ``socket`` is imported locally because the module never imported it.
    """
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        host = socket.gethostbyaddr(ip or resolve_ip())[0]
    except socket.timeout:
        host = None
    finally:
        socket.setdefaulttimeout(oldtimeout)
    return host


## DIRECTORIES
def touch(fname):
    """Create ``fname`` if it does not exist; an existing file is untouched.

    OS-level failures are reported through ``error()`` instead of being
    raised.  The handler was a bare ``except``; it now catches OSError only.
    """
    try:
        fd = os.open(fname, os.O_RDONLY | os.O_CREAT)
        os.close(fd)
    except OSError:
        error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """Make ``ddir`` owned by the current user with the expected mode.

    A missing path is first created with ``cdir``.  Ownership is changed
    when the owner differs from the current uid; the mode is changed when
    the permission bits differ from ``filemask`` (regular file) or
    ``dirmask`` (anything else).

    Only the permission bits of ``st_mode`` are compared now; the raw
    ``st_mode`` also carries the file-type bits, so the old comparison
    called ``os.chmod`` on every invocation.
    """
    uid = os.getuid()
    gid = os.getgid()
    try:
        info = os.stat(ddir)
    except OSError:
        cdir(ddir)
        info = os.stat(ddir)
    if info.st_uid != uid:
        os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    if (info.st_mode & 0o7777) != mask:
        os.chmod(ddir, mask)
def cdir(path):
    """Create every missing directory along ``path``; always returns True.

    The path is built up component by component; each prefix that is not
    yet a directory is created with ``os.mkdir`` and passed to
    ``check_permissions``.  OSError is reported through ``error()``.
    """
    parts = path.split(os.sep)
    for count in range(1, len(parts) + 1):
        prefix = os.sep.join(parts[:count]) + os.sep
        padje = os.path.abspath(prefix)
        if os.path.isdir(padje):
            continue
        try:
            os.mkdir(padje)
            check_permissions(padje)
        except OSError:
            error()
    return True


## HELPERS
def get_urls(data):
    """Return the distinct ``.html`` links found in the HTML text ``data``.

    For every ``<a>`` tag the ``href`` is stripped of its fragment and
    kept when it ends in ``.html``, does not contain ``..`` and is not a
    ``mailto`` link.  Order of first occurrence is preserved.
    ``logging.warn`` is deprecated (removed in Python 3.13) and was
    replaced by ``logging.warning``.
    """
    from bs4 import BeautifulSoup
    urls = []
    soup = BeautifulSoup(data)
    for tag in soup('a'):
        href = tag.get("href")
        if href:
            href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls
def stripbadchar(s):
    """Drop characters with a code point below 32 unless in ``allowedchars``."""
    kept = []
    for c in s:
        if ord(c) > 31 or c in allowedchars:
            kept.append(c)
    return "".join(kept)
def enc_char(s):
    """Encode every character of ``s`` that is not in ``allowedchars``.

    Allowed characters are copied; all others are replaced by
    ``enc_name(character)``.
    """
    return "".join([c if c in allowedchars else enc_name(c) for c in s])
def enc_needed(s):
    """Return the characters of ``s`` that are not in ``allowedchars``."""
    offending = []
    for c in s:
        if c not in allowedchars:
            offending.append(c)
    return offending
def enc_name(input):
    """Return the URL-safe base64 text of ``input`` encoded as UTF-8.

    ``base64`` is imported locally because the module's import block does
    not import it, which made every call raise NameError.
    """
    import base64
    raw = bytes(input, "utf-8")
    return str(base64.urlsafe_b64encode(raw), "utf-8")
def split_txt(what, l=375):
    """Split ``what`` into chunks of at least ``l`` characters.

    Each cut is made at the first space at or after ``l`` characters from
    the chunk start, so words are never broken; every chunk after the
    first therefore starts with that space.  Empty chunks are dropped.
    """
    chunks = []
    total = len(what)
    begin = 0
    probe = l
    for _ in range(int(total / probe + 1)):
        cut = what.find(' ', probe)
        if cut == -1:
            cut = total
        piece = what[begin:cut]
        if piece:
            chunks.append(piece)
        begin = cut
        probe = begin + l
    return chunks
def pretty(a):
    """Render ``a`` for output.

    Values whose type is in ``basic_types`` become UTF-8 bytes of
    ``"<a>\\n"``; anything else becomes the text of its type.
    """
    if type(a) in basic_types:
        return bytes("%s\n" % a, "utf-8")
    return str(type(a))
def smooth(a):
    """Return ``a`` if its type is in ``basic_types``, else ``str(type(a))``."""
    if type(a) in basic_types:
        return a
    return str(type(a))
def full(a):
    """Return ``a`` if its type is in ``basic_types``, else ``str(a)``."""
    if type(a) in basic_types:
        return a
    return str(a)
def verzin(a):
    """Like ``smooth`` but shows non-integral floats as dates.

    A float with a fractional part is treated as a timestamp and rendered
    through ``time.ctime`` and ``short_date``.  Other values are returned
    as-is when their type is in ``basic_types``, else as ``str(type(a))``.
    """
    kind = type(a)
    if kind is float and not a.is_integer():
        return short_date(time.ctime(a))
    if kind in basic_types:
        return a
    return str(kind)
def hello(*args):
    """Print a coloured banner ``"<name> <version> <copyright>"``.

    ``args[0]`` is the name.  The version is ``args[1]`` when exactly two
    arguments are given, otherwise the package ``__version__``.
    """
    ver = args[1] if len(args) == 2 else __version__
    print("%s%s %s %s%s\n" % (YELLOW, args[0], ver, copyright(), ENDC))
def list_eggs(filter="core"):
    """Yield the ``sys.path`` entries that contain ".egg".

    When ``filter`` is non-empty only entries containing it are yielded.
    """
    for entry in sys.path:
        if ".egg" in entry and not (filter and filter not in entry):
            yield entry
def show_eggs(filter="core"):
    """Log every path yielded by ``list_eggs(filter)`` at warning level.

    ``logging.warn`` is deprecated (removed in Python 3.13) and was
    replaced by ``logging.warning``.
    """
    for path in list_eggs(filter):
        logging.warning(path)
def stripped(input):
    """Return the part of ``input`` before the first "/".

    Values that cannot be split on a ``str`` separator (non-strings,
    bytes) are returned unchanged.  The handler was a bare ``except``; it
    now catches only the errors such values raise.
    """
    try:
        return input.split("/")[0]
    except (AttributeError, TypeError):
        return input


## HEADER
# Header template written at the top of files: the first %s is placed after
# "#" inside the parentheses, the second after the comma.
# NOTE(review): the line breaks inside this literal were reconstructed from
# a whitespace-collapsed listing; verify them against the repository copy.
headertxt = '''# this is an core (#%s) file, %s
#
# this file can be edited !!

'''

## FEEDER

# NOTE(review): a second definition of feed() with the same body appears
# further down in this module and replaces this one at import time.
def feed(text):
    """Turn each "\\r\\n"-separated chunk of ``text`` into a ``core.Object``.

    Every chunk is passed to ``Object().feed`` and the returned values are
    collected in a list.
    """
    from core import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]


## PARSER
def parse_email(fn):
    """Read the mbox-style file ``fn`` and return one ``core.Object`` per mail.

    Messages start at lines beginning with "From ".  Each message is
    parsed with ``email.message_from_string``; its headers are copied onto
    the object and the payload parts are concatenated into ``text``.
    Anything before the first "From " line is ignored.

    Fixes: the file is now closed; the last message of the file used to be
    dropped because it was never added to the list; ``logging.warn`` is
    deprecated and was replaced by ``logging.warning`` (which now counts
    only real messages).
    """
    from core import Object
    mails = []
    mess = ""
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                mails.append(mess)
                mess = line
                continue
            mess += line
    # The message still being collected when the file ends.
    mails.append(mess)
    result = []
    # mails[0] is whatever preceded the first "From " line (usually empty).
    for raw in mails[1:]:
        m = email.message_from_string(raw)
        o = Object()
        o.update(m.items())
        o.text = ""
        for load in m.get_payload():
            o.text += str(load)
        result.append(o)
    logging.warning("%s emails read" % len(result))
    return result


## STRIPPERS
def strip_html(text):
    """Return the plain text of ``str(text)`` as extracted by BeautifulSoup."""
    from bs4 import BeautifulSoup
    markup = str(text)
    return BeautifulSoup(markup).get_text()
def strip_wiki(text):
    """Remove common wiki markup from ``text``.

    Link and template brackets are deleted, HTML entities are resolved
    with ``unescape`` and ``<ref>`` elements are removed.
    """
    for marker in ("[[", "]]", "}}", "{{"):
        text = text.replace(marker, "")
    text = unescape(text)
    for pattern in ("<ref .*?/>", "<ref>.*?</ref>", "<ref .*?</ref>"):
        text = re.sub(pattern, "", text)
    return text


## ENCODING
def get_encoding(data):
    """Guess the character encoding of the document ``data``.

    Sources, in order:

    1. a ``charset=`` parameter in ``data.info['content-type']`` when
       ``data`` has a mapping-like ``info`` attribute,
    2. a ``<meta http-equiv="content-type" ... charset=...>`` tag,
    3. ``chardet`` detection, when that package is installed and ``data``
       is bytes,
    4. ``sys.getdefaultencoding()``.

    Fixes: ``chardet`` was referenced without being imported; regular
    expressions used invalid escapes in non-raw strings; an empty charset
    parameter raised IndexError; bytes input raised TypeError in the meta
    scan; a ``None`` detection result was returned as the encoding.
    """
    header = ""
    info = getattr(data, "info", None)
    if info is not None and 'content-type' in info:
        header = info['content-type'].lower()
    if 'charset' in header:
        charset = header.split('charset', 1)[1].strip()
        if charset.startswith('='):
            return charset[1:].strip().split(';')[0].strip()
    is_binary = isinstance(data, (bytes, bytearray))
    # latin-1 maps every byte to a character, so the ASCII markup can be
    # scanned without knowing the real encoding.
    text = data.decode("latin-1") if is_binary else data
    if '<meta' in text.lower():
        for meta in re.findall(r'<meta[^>]+>', text, re.I | re.M):
            equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if not equiv or equiv.group(1).lower() != 'content-type':
                continue
            if not re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I):
                continue
            found = re.search(r'charset\s*=\s*([^\s\'"]+)', meta, re.I)
            if found:
                return found.group(1)
    try:
        import chardet
    except ImportError:
        chardet = None
    if chardet is not None and is_binary:
        guess = chardet.detect(bytes(data)).get('encoding')
        if guess:
            return guess
    return sys.getdefaultencoding()


## SED
def run_sed(filename, sedstring):
    """Rewrite ``filename`` in place, line by line.

    With a ``sedstring`` of the form ``s#pattern#replacement#`` every
    match of the regular expression ``pattern`` is replaced.  With an
    empty ``sedstring`` trailing whitespace is stripped and every tab is
    replaced by a space.  Output goes to ``<filename>.tmp`` which then
    replaces the original.

    Fixes: both files are closed via ``with``; the rename used
    ``except WindowsError``, a name that does not exist outside Windows,
    and is now a single ``os.replace``; ``logging.warn`` is deprecated and
    was replaced by ``logging.warning``.
    """
    logging.warning("sed %s" % filename)
    tmp = filename + '.tmp'
    fr = to = None
    if sedstring:
        seds = sedstring.split("#")
        fr = seds[1]
        to = seds[2]
    with open(filename, 'r') as src, open(tmp, 'w') as dst:
        for line in src:
            if sedstring:
                dst.write(re.sub(fr, to, line))
            else:
                dst.write(re.sub("\t", " ", line.rstrip() + "\n"))
    os.replace(tmp, filename)


## URL
def fetch_url(type, url, myheaders={}, postdata={}, keyfile=None, certfile="", port=80):
    """Send an HTTP(S) request and return the ``http.client`` response.

    ``type`` is the request method.  Default headers (content type, accept,
    user agent) are merged with ``myheaders``.  A non-empty ``postdata``
    mapping is sent form-urlencoded as the request body.  ``keyfile``,
    ``certfile`` and ``port`` are accepted but not used.

    Fixes: the headers were built but never passed to the request;
    ``postdata`` was ignored; HTTPS was selected by a substring test on
    the whole URL instead of by the URL scheme.
    """
    import http.client
    headers = {'Content-Type': 'application/x-www-form-urlencoded',
               'Accept': 'text/plain; text/html; application/json',
               'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urllib.parse.urlparse(url)
    if urlparts.scheme == "https":
        connection = http.client.HTTPSConnection(urlparts[1])
    else:
        connection = http.client.HTTPConnection(urlparts[1])
    body = urllib.parse.urlencode(postdata) if postdata else None
    logging.info('%s %s' % (type, url))
    connection.request(type, url, body=body, headers=headers)
    resp = connection.getresponse()
    logging.info("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """Return the ``Location`` header of a 301 response, else None."""
    if resp.status != 301:
        return None
    return resp.getheader("Location")


## TO/FROM
def to_enc(what, encoding='utf-8'):
    """Return ``str(what)`` encoded with ``encoding``.

    Any falsy value (None, 0, "") is encoded as the empty string.
    """
    text = str(what) if what else ""
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """Decode ``txt`` to ``str`` using ``encoding``.

    Falsy input yields "" and ``str`` input is returned unchanged.  When
    strict decoding fails the text is decoded leniently by ``decode_char``.

    Fixes: the fallback called ``decodeperchar``, a name that is not
    defined (the helper is ``decode_char``); ``str`` subclasses were not
    recognised as text and failed on ``.decode``.
    """
    if not txt:
        txt = ""
    if isinstance(txt, str):
        return txt
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decode_char(txt, encoding, what)


## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """Decode ``txt`` with ``encoding``, skipping undecodable bytes.

    Each distinct undecodable byte sequence is logged once at info level.
    ``str`` input is returned unchanged; ``what`` is accepted but not used.

    The previous implementation iterated over the bytes object and called
    ``.decode`` on each item; in Python 3 those items are ints, so it
    raised AttributeError on the first byte.
    """
    if isinstance(txt, str):
        return txt
    data = bytes(txt)
    res = []
    nogo = []
    pos = 0
    while pos < len(data):
        try:
            res.append(data[pos:].decode(encoding))
            break
        except UnicodeDecodeError as ex:
            # ex.start/ex.end are offsets into the slice that was decoded.
            res.append(data[pos:pos + ex.start].decode(encoding))
            bad = data[pos + ex.start:pos + ex.end]
            if bad not in nogo:
                nogo.append(bad)
            # Always move forward, even if the codec reports an empty range.
            pos += max(ex.end, ex.start + 1)
    if nogo:
        logging.info("nogo: %s" % " ".join(repr(b) for b in nogo))
    return "".join(res)


## OPTIONS
def make_opts(options):
    """Build an ``optparse`` parser from ``options`` and parse ``sys.argv``.

    Each option is a sequence ``(short, long, type, default, dest, help)``.
    When the type contains "store" it is passed as the option's
    ``action``, otherwise as its ``type``.  Options that optparse rejects
    are logged and skipped.  Returns the ``(values, args)`` result of
    ``parse_args``.
    """
    from core import __version__
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=str(__version__))
    for option in options:
        kind, default, dest, helptxt = option[2:]
        selector = "action" if "store" in kind else "type"
        try:
            parser.add_option(option[0], option[1], default=default, dest=dest,
                              help=helptxt, **{selector: kind})
        except Exception as ex:
            logging.error("error: %s - option: %s" % (str(ex), option))
    args = parser.parse_args()
    return args


## PARSING
def parse_url(*args, **kwargs):
    """Split the URL ``args[0]`` into ``(basepath, base, root, file)``.

    Fields of ``urllib.parse.urlsplit`` used here:

        Attribute  Index  Value                     Value if not present
        scheme     0      URL scheme specifier      empty string
        netloc     1      Network location part     empty string
        path       2      Hierarchical path         empty string
        query      3      Query component           empty string
        fragment   4      Fragment identifier       empty string

    ``file`` is the last path segment when it contains a ".", else None;
    ``basepath`` is the path without that file and without a trailing "/";
    ``base`` is scheme + netloc + basepath; ``root`` is scheme + netloc.
    """
    url = args[0]
    scheme, netloc, path = urllib.parse.urlsplit(url)[:3]
    segments = path.split("/")
    if "." in segments[-1]:
        basepath, file = "/".join(segments[:-1]), segments[-1]
    else:
        basepath, file = path, None
    if basepath.endswith("/"):
        basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """Return the absolute ``.html`` links of a page that stay on its site.

    ``args`` is ``(url, txt)``: the page URL and its HTML text.  For every
    ``<a>`` tag the ``href`` is stripped of its fragment and kept when it
    ends in ``.html``, does not contain ``..`` and is not a ``mailto``
    link.  Links without "http" are made absolute against the site root
    (leading "/") or the page's base path.  Links that do not contain the
    site root are logged and skipped.  ``logging.warn`` is deprecated
    (removed in Python 3.13) and was replaced by ``logging.warning``.
    """
    import bs4
    url, txt = args
    basepath, base, root, file = parse_url(url)
    s = bs4.BeautifulSoup(txt)
    urls = []
    for tag in s('a'):
        href = tag.get("href")
        if href:
            href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if "http" not in href:
            if href.startswith("/"):
                href = root + href
            else:
                href = base + "/" + href
        if root not in href:
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls


## GENERICS
def reduced_keys(*args, **kwargs):
    """Filter the key list ``args[0]`` down to plain lowercase names.

    Keys are dropped when their text starts with "_", "X" or "x", is not
    all lowercase, or contains "-".  Remaining keys are returned without
    duplicates, in order of first occurrence.
    """
    res = []
    for key in args[0]:
        k = str(key)
        if k.startswith("_") or k.startswith("X") or k.startswith("x"):
            continue
        if not k.islower() or "-" in key:
            continue
        if k not in res:
            res.append(key)
        if k in ["args", "rest", "first"]:
            continue
        if k not in res:
            res.append(k)
    return res
def feed(text):
    """Turn each "\\r\\n"-separated chunk of ``text`` into a ``core.Object``.

    Every chunk is passed to ``Object().feed`` and the returned values are
    collected in a list.
    """
    from core import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
def dispatch(target, event, cmnd, *args, **kwargs):
    """Call every handler registered under ``cmnd`` in ``target``.

    ``target[cmnd]`` is an iterable of callables, each called with
    ``event``.  Returns ``event`` afterwards, or False when ``target`` has
    no entry for ``cmnd``.  Extra arguments are ignored.
    """
    try:
        handlers = target[cmnd]
    except KeyError:
        return False
    for handler in handlers:
        handler(event)
    return event
def resolve(*args, **kwargs):
    """Prepare the event ``args[0]`` and dispatch it on the core kernel.

    The command is ``event.ucmnd`` or, when that is empty, ``event.etype``.
    Returns whatever ``dispatch`` returns.
    """
    from core import kernel
    event = args[0]
    event.prepare()
    command = event.ucmnd or event.etype
    return dispatch(kernel, event, command, *args, **kwargs)
def need_skip(obj, black=[], white=[]):
    """Tell whether ``obj`` should be skipped based on its content type.

    Returns False when ``obj`` has no ``get_content_type`` method.
    Otherwise returns True when the content type is in ``black`` or is not
    in ``white``.

    NOTE(review): with the default empty ``white`` every content type is
    skipped; confirm that callers always pass a whitelist.
    """
    try:
        value = obj.get_content_type()
    except AttributeError:
        return False
    return value in black or value not in white


## day string
def nr_days(seconds):
    """Return the number of whole days in ``seconds`` (truncated toward 0)."""
    seconds_per_day = 60 * 60 * 24
    return int(seconds / seconds_per_day)
def str_day(seconds):
    """Render a duration in seconds as a short string.

    A year counts as 365 days.  The result is the largest non-zero unit:
    ``"<n>y"`` (followed by ``"<n>w"`` when there are also whole weeks),
    ``"<n>w"``, ``"<n>d"``, ``"<n>h"``, ``"<n>m"`` or ``"<n>s"``; a zero
    duration yields ``"0s"``.

    Fix: when the duration had years but no whole weeks, the year count
    was overwritten by the next smaller non-zero unit (1 year and 5
    seconds was rendered as "5s").
    """
    nsec = int(float(seconds))
    year = 365 * 24 * 60 * 60
    week = 7 * 24 * 60 * 60
    day = 24 * 60 * 60
    hour = 60 * 60
    minute = 60
    # int(a / b) truncates toward zero, which keeps the original behaviour
    # for negative durations.
    years = int(nsec / year)
    nsec -= years * year
    weeks = int(nsec / week)
    nsec -= weeks * week
    days = int(nsec / day)
    nsec -= days * day
    hours = int(nsec / hour)
    nsec -= hours * hour
    minutes = int(nsec / minute)
    sec = nsec - minutes * minute
    if years:
        txt = "%sy" % years
        if weeks:
            txt += "%sw" % weeks
        return txt
    if weeks:
        return "%sw" % weeks
    if days:
        return "%sd" % days
    if hours:
        return "%sh" % hours
    if minutes:
        return "%sm" % minutes
    if sec:
        return "%ss" % int(sec)
    return "0s"


## FORMAT
def format(*args, **kwargs):
    """Render the object ``args[0]`` as ``"<nr> <values> (<n> days)"``.

    ``keys`` (keyword, default ``obj.get_keys()``) selects the attributes
    to show; attributes that are missing or falsy are left out.  ``nr``
    (keyword, default 1) is the leading counter.  The day count comes from
    ``obj.get_days()``.

    Fix: the default for ``keys`` was read from an undefined name
    ``zelf``, so every call without ``keys`` raised NameError.
    """
    obj = args[0]
    try:
        keys = kwargs["keys"]
    except KeyError:
        keys = obj.get_keys()
    counter = kwargs.get("nr", 1)
    shown = [str(getattr(obj, x, None)) for x in keys if getattr(obj, x, None)]
    return "%s %s (%s days)" % (counter, " ".join(shown), obj.get_days())


## PARSE
def parse(txt):
    """Parse a command line into a ``core.Object`` of settings.

    For each whitespace-separated word:

    * a word starting with "." is stored as ``ucmnd``,
    * a word starting with "-" is stored as ``opt_<word>`` with value "",
    * ``key=value`` is stored as ``key`` -> ``value``,
    * any other word is stored as ``word`` -> "".

    Fixes: ``Object`` was used without being imported in this function;
    after a word without exactly one "=" the code went on to store
    ``key``/``value``, which were either unbound (UnboundLocalError) or
    left over from an earlier word.
    """
    from core import Object
    result = Object()
    for word in txt.split():
        if word.startswith("."):
            result["ucmnd"] = word
        if word.startswith("-"):
            result["opt_%s" % word] = ""
            continue
        try:
            key, value = word.split("=")
        except ValueError:
            result[word] = ""
            continue
        result[key] = value
    return result


## strtotime function
def get_day(daystr):
    """Convert the first ``D-M-YYYY`` date in ``daystr`` to a timestamp.

    Returns the POSIX timestamp of local midnight on that day, or 0 when
    no date is found or the date is invalid.

    Fixes: ``calendar`` was used without being imported; the pattern used
    an invalid escape in a non-raw string; a day beyond the month's length
    returned None instead of 0; day 0 is now rejected explicitly.  The
    timestamp is built directly from the numbers instead of a month-name
    round trip through ``strptime``.
    """
    import calendar
    try:
        found = re.search(r'(\d+)-(\d+)-(\d+)', daystr)
        day, month, year = (int(part) for part in found.groups())
        if 1 <= day <= calendar.monthrange(year, month)[1]:
            # tm_isdst=-1 lets mktime decide about daylight saving time.
            return time.mktime((year, month, day, 0, 0, 0, 0, 0, -1))
    except (AttributeError, ValueError, OverflowError):
        pass
    return 0
def get_hour(daystr):
    """Convert a time of day found in ``daystr`` to seconds since midnight.

    The first ``H:M:S`` occurrence is preferred; without one the first
    ``H:M`` occurrence is used.  Returns 0 when neither is present.

    Fix: the patterns used the invalid escape ``\\d`` in non-raw strings
    (a SyntaxWarning on current Python); they are raw strings now.
    """
    try:
        found = re.search(r'(\d+):(\d+):(\d+)', daystr)
        return (60 * 60 * int(found.group(1))
                + 60 * int(found.group(2))
                + int(found.group(3)))
    except (AttributeError, ValueError):
        pass
    try:
        found = re.search(r'(\d+):(\d+)', daystr)
        return 60 * 60 * int(found.group(1)) + 60 * int(found.group(2))
    except (AttributeError, ValueError):
        return 0


## today function
def today():
    """Return the timestamp of 0:00 on a computed "today".

    The current time is shifted by ``time.timezone`` (plus one hour when
    ``time.daylight`` is set) and rendered with ``time.ctime``; the day,
    month and year are picked out with the ``timere`` pattern and
    converted back with ``strptime``/``mktime``.  Returns None when the
    pattern does not match.

    NOTE(review): the shift is applied before ``ctime`` converts to local
    time, so the date may differ from the local calendar date - confirm
    the intended reference zone.
    """
    offset = int(time.timezone)
    if time.daylight:
        offset += 3600
    ttime = time.ctime(time.time() + offset)
    matched = re.search(timere, ttime)
    if not matched:
        return None
    temp = "%s %s %s" % (matched.group(3), matched.group(2), matched.group(7))
    return time.mktime(time.strptime(temp, "%d %b %Y"))