Source code for point.utils

# point/utils.py
#
#

""" utils package. """

## IMPORT

from point import __version__
from point.defines import *

from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape

import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import types
import http
import json
import time
import math
import glob
import sys
import os
import re

## start_new_thread alias

run_thr = _thread.start_new_thread

## AGENT

def useragent():
    """ return the User-Agent string, with the package version filled in. """
    template = 'Mozilla/5.0 (X11; Linux x86_64); POINT %s; http://pikacode.com/bthate/point)'
    return template % __version__

## ISTR
class istr(str):

    """ str subclass that adds no behaviour of its own. """

## UNESCAPE
def unescape(text):
    """ replace HTML character references in text (``&amp;``, ``&#62;``, ...) with their characters. """
    # HTMLParser.unescape() was deprecated in Python 3.4 and removed in 3.9;
    # html.unescape() is the documented replacement.
    return html.unescape(text)

## txt_parse function
def txt_parse(txt):
    """ split txt into plain arguments and key=value selectors.

        Returns an Object with three attributes:

        args       - list of bare words and selector keys, without duplicates
        wanted     - key -> value for ``key=value`` words
        not_wanted - key -> value for ``key-=value`` words

        A value ending in ``-`` has that dash removed and keeps its key out
        of ``args``.

    """
    from point import Object
    parsed = Object()
    parsed.args = []
    parsed.wanted = Object()
    parsed.not_wanted = Object()
    for token in txt.split():
        try:
            # anything other than exactly one "=" raises ValueError and is
            # handled below as a bare word.
            key, value = token.split("=")
            exclude = key[-1] == "-"
            silent = value[-1] == "-"
            if silent:
                value = value[:-1]
            if exclude:
                key = key[:-1]
                parsed.not_wanted[key] = value
            else:
                parsed.wanted[key] = value
            if silent:
                continue
            if key not in parsed.args:
                parsed.args.append(key)
        except ValueError:
            if token not in parsed.args:
                parsed.args.append(token)
    return parsed

## SETS
def unique(a):
    """ return the elements of a as a list without duplicates (order not preserved). """
    distinct = set(a)
    return list(distinct)
def intersect(a, b):
    """ return a list of the elements present in both a and b. """
    left = set(a)
    right = set(b)
    return list(left.intersection(right))
def union(a, b):
    """ return a list of the elements present in a, in b, or in both. """
    left = set(a)
    right = set(b)
    return list(left.union(right))

## BLAET
def blaet(target):
    """ render the timing fields found on target as str_day() durations joined by "/".

        time_start and time_in are shown as time elapsed since that stamp;
        time_sleep is shown as the sleep time minus the time elapsed since
        time_in.

    """
    parts = []
    if "time_start" in target:
        elapsed = time.time() - float(target.time_start)
        parts.append("%s" % str_day(elapsed))
    if "time_in" in target:
        elapsed = time.time() - float(target.time_in)
        parts.append("%s" % str_day(elapsed))
    if "time_sleep" in target:
        # NOTE(review): reads target.time_in without checking that it is present.
        remaining = float(target.time_sleep) - (time.time() - target.time_in)
        parts.append("%s" % str_day(remaining))
    return "/".join(parts)

## SIGNATURES
def make_signature(data):
    """ return the hex SHA-1 digest of str(data), encoded as UTF-8. """
    payload = bytes(str(data), "utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())
def verify_signature(data, signature):
    """ check a JSON document against an expected signature.

        data is a JSON string whose decoded form has a "data" entry; the
        result is True when make_signature() of that entry equals signature.
        Raises whatever json.loads() raises on malformed input, and KeyError
        when the "data" entry is missing.

    """
    # the original imported point.Object here without using it; dropped so
    # verification does not depend on importing the package.
    fromdisk = json.loads(data)
    return make_signature(fromdisk["data"]) == signature

## FILES
def list_files(*args, **kwargs):
    """ return the files found under the directory args[0], descending into subdirectories.

        With a ``search`` keyword only names matching ``*search*`` are
        globbed at each level; this applies to directories as well, so
        non-matching directories are not descended into.

    """
    pattern = args[0]
    if not pattern.endswith(os.sep):
        pattern += os.sep
    if "search" in kwargs:
        pattern += "*%s*" % kwargs["search"]
    if "*" not in pattern:
        pattern += "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry):
            found.extend(list_files(entry, **kwargs))
        else:
            found.append(entry)
    return found

## JOINS
def j(*args):
    """ join the truthy arguments, converted to str, into one path; None when called without arguments. """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    return os.path.join(*parts)
def mj(*args):
    """ like a path join of the truthy arguments, but with os.sep replaced by dots. """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    joined = os.path.join(*parts)
    return joined.replace(os.sep, ".")
def dj(*args):
    """ like a path join of the truthy arguments, but with os.sep replaced by underscores. """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    joined = os.path.join(*parts)
    return joined.replace(os.sep, "_")
def aj(sep=None, *args):
    """ join the truthy arguments into one path and return it as an absolute path.

        Falsy arguments are skipped and the others converted with str(),
        the same rule j() applies.  Returns None when nothing is left to
        join.

    """
    # the original called os.path.abspath(*j(...)), which unpacked the joined
    # string into single characters and raised TypeError for any path longer
    # than one character.
    parts = [str(part) for part in (sep,) + args if part]
    if not parts:
        return None
    return os.path.abspath(os.path.join(*parts))

## TIME
def dtime(stamp):
    """ convert a timestamp in seconds to a local datetime.datetime. """
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """ parse a ``YYYY-MM-DD`` string into a datetime.datetime at midnight. """
    parse = datetime.datetime.strptime
    return parse(daystr, '%Y-%m-%d')
def tdiff(d1, d2):
    """ build a datetime.timedelta of d1 days and d2 seconds.

        NOTE(review): the name suggests a difference between two moments,
        but the arguments are passed to timedelta as (days, seconds) -
        confirm that this is intended.

    """
    return datetime.timedelta(days=d1, seconds=d2)
def rtime():
    """ return the current local date and time as text, with the space replaced by ``-=-``. """
    stamp = str(datetime.datetime.now())
    return "-=-".join(stamp.split(" "))
def hms():
    """ return the current local time of day as ``HH:MM:SS``. """
    now = datetime.datetime.today()
    return now.strftime("%H:%M:%S")
def day():
    """ return today's local date as ``YYYY-MM-DD``. """
    now = datetime.datetime.today()
    return now.date().isoformat()
def time_string(*args, **kwargs):
    """ return the timestamp args[0] as local date/time text.

        The text is str() of the corresponding datetime.  When the value
        cannot be converted the failure is reported through error() and
        None is returned.

    """
    timestamp = args[0]
    try:
        return str(datetime.datetime.fromtimestamp(timestamp))
    # was a bare "except:", which also swallowed KeyboardInterrupt and
    # SystemExit; these are the errors fromtimestamp() raises for bad or
    # out-of-range input.
    except (TypeError, ValueError, OverflowError, OSError):
        error()
    return None
def time_time(*args, **kwargs):
    """ return time_string() of the timestamp args[0]. """
    return time_string(args[0])
def time_stamp(*args, **kwargs):
    """ convert the date string args[0] to a timestamp by trying each pattern in dayformats.

        The words of the input are re-joined with single spaces, each
        followed by a space, before parsing.  Returns the timestamp of the
        first pattern that matches, or 0.0 when none does.

    """
    words = args[0].strip().split()
    candidate = "".join("%s " % word for word in words)
    for pattern in dayformats:
        logging.info("trying %s" % pattern)
        try:
            return datetime.datetime.strptime(candidate, pattern).timestamp()
        except ValueError:
            continue
    return 0.0
def make_time(daystr):
    """ convert a ``Thu Jan  1 12:00:00 2015`` style string to a local timestamp. """
    parsed = time.strptime(daystr, "%a %b %d %H:%M:%S %Y")
    return time.mktime(parsed)
def a_time(daystr):
    """ convert ``YYYY-MM-DD HH:MM:SS[.fraction]`` to a local timestamp.

        Everything from the first dot on is ignored.  A parse failure is
        reported through error() and None is returned.

    """
    if "." in daystr:
        daystr = daystr.split(".")[0]
    try:
        parsed = time.strptime(daystr, "%Y-%m-%d %H:%M:%S")
        return time.mktime(parsed)
    except Exception:
        error()
def short_date(*args, **kwargs):
    """ rewrite the date string args[0] as ``YYYY-MM-DD time``.

        Words containing a comma are dropped.  The remaining words are first
        read as ``day month year time``; if that fails they are read as the
        same fields shifted one word to the right.  Month names are looked
        up in monthint.  Returns None for empty input or when neither
        layout fits.

    """
    date = args[0]
    if not date:
        return None
    fields = [word for word in date.split() if "," not in str(word)]
    layout = "{:4}-{:#02}-{:#02} {:6}"
    unparsable = (IndexError, KeyError, ValueError)
    try:
        return layout.format(fields[2], monthint[fields[1]], int(fields[0]), fields[3])
    except unparsable:
        pass
    try:
        return layout.format(fields[3], monthint[fields[2]], int(fields[1]), fields[4])
    except unparsable:
        logging.debug("can't parse date %s" % date)
    return None
def to_time(*args, **kwargs):
    """ rewrite a ``weekday month day time year`` string as ``YYYY-MM-DD time``.

        Words containing a comma are dropped; the month name is looked up
        in monthint.  Returns "" when a word is missing or the month is
        unknown.

    """
    fields = [word for word in args[0].split() if "," not in str(word)]
    try:
        return "{:4}-{:#02}-{:02} {:6}".format(fields[4], monthint[fields[1]], int(fields[2]), fields[3])
    except (IndexError, KeyError):
        return ""

## NAMES
def get_modname(obj):
    """ return the name of the module in which obj's class is defined. """
    return obj.__class__.__module__
def get_clsname(obj):
    """ return the dotted class name of obj, taken from str(obj.__class__). """
    rendered = str(obj.__class__)
    # "<class 'pkg.Name'>" -> "'pkg.Name'>" -> "pkg.Name"
    quoted = rendered.split(" ")[1]
    return quoted[1:-2]
def get_cls(obj):
    """ return the class name of obj without its module path. """
    dotted = get_clsname(obj)
    return dotted.rsplit(".", 1)[-1]
def get_funcname(str_in):
    """ return the second whitespace-separated word of str_in, e.g. the name in ``<function name at 0x...>``. """
    words = str_in.split(None, 2)
    return words[1]

## STACK
def get_exception(*args, **kwargs):
    """ describe the exception currently being handled on one line.

        Each traceback entry becomes ``dotted.path:lineno function | ``,
        where the dotted path is the file name minus its last three
        characters with os.sep replaced by dots.  The entries are followed
        by ``exception type: exception value``.

    """
    exctype, excvalue, tb = sys.exc_info()
    hops = []
    for entry in traceback.extract_tb(tb):
        fname, linenr, func = entry[0], entry[1], entry[2]
        dotted = ".".join(fname[:-3].split(os.sep))
        hops.append("%s:%s %s | " % (dotted, linenr, func))
    return "%s%s: %s" % ("".join(hops), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """ return the file name of the code that called the frame ``args[0]`` levels up.

        Returns "" for a depth of 0 or less, or when that frame has no
        caller.

        NOTE(review): the loop re-reads the caller of the same frame on
        every pass, so a depth above 1 gives the same answer as 1 - confirm
        whether walking further up was intended.

    """
    depth = args[0]
    anchor = sys._getframe(depth)
    if not anchor:
        return ""
    filename = ""
    while depth > 0:
        depth -= 1
        try:
            caller = anchor.f_back
        except AttributeError:
            break
        if not caller:
            break
        filename = caller.f_code.co_filename
    del anchor
    return filename
def get_frame(search="code"):
    """ collect attributes reachable from the caller's stack frame.

        For every attribute of the caller's frame whose name contains
        search, all attributes of that attribute's value are copied into
        the returned dict, keyed by name.

    """
    caller = sys._getframe(1)
    needle = str(search)
    collected = {}
    for attr in dir(caller):
        if needle not in attr:
            continue
        holder = getattr(caller, attr)
        for member in dir(holder):
            collected[member] = getattr(holder, member)
    return collected
def get_strace(*args, **kwargs):
    """ return the call chain above the frame ``args[0]`` levels up as ``function:lineno | `` entries, nearest caller first. """
    cursor = sys._getframe(args[0])
    if not cursor:
        return ""
    hops = []
    while True:
        try:
            caller = cursor.f_back
        except AttributeError:
            break
        if not caller:
            break
        hops.append("%s:%s | " % (caller.f_code.co_name, caller.f_lineno))
        cursor = caller
    del cursor
    return "".join(hops)
def get_how(*args, **kwargs):
    """ return ``function:lineno`` for a caller found by walking up from the frame ``args[0]`` levels up.

        The walk climbs one caller per pass and stops once the depth
        counter has reached 0, or at the top of the stack; the last entry
        seen is returned ("" if there was none).

    """
    depth = args[0]
    cursor = sys._getframe(depth)
    if not cursor:
        return ""
    latest = ""
    while True:
        try:
            caller = cursor.f_back
        except AttributeError:
            break
        if not caller:
            break
        latest = "%s:%s" % (caller.f_code.co_name, caller.f_lineno)
        cursor = caller
        if depth == 0:
            return latest
        depth -= 1
    del cursor
    return latest
def get_func(*args, **kwargs):
    """ return the name of the function that called the frame ``args[0]`` levels up, or None.

        NOTE(review): the loop re-reads the caller of the same frame on
        every pass, so the depth only controls how often the same name is
        fetched - confirm whether walking further up was intended.

    """
    depth = args[0]
    anchor = sys._getframe(depth)
    if not anchor:
        return ""
    name = None
    while True:
        try:
            caller = anchor.f_back
        except AttributeError:
            break
        if not caller:
            break
        name = caller.f_code.co_name
        depth -= 1
        if depth <= 0:
            break
    del anchor
    return name
def error(*args, **kwargs):
    """ log the exception currently being handled at error level and return the logged text. """
    described = get_exception()
    logging.error(described)
    return described

## LOCATING
def get_source(mod, package):
    """ return the absolute path pkg_resources reports for resource ``package`` of ``mod``; the path is logged at info level. """
    import pkg_resources
    located = pkg_resources.resource_filename(mod, package)
    source = os.path.abspath(located)
    logging.info("source is %s" % source)
    return source

## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """ resolve hostname (default: this machine's host name) to an IP address string.

        The process-wide socket default timeout is set to ``timeout`` for
        the duration of the lookup and restored afterwards.  Returns None
        when the lookup times out; other lookup errors propagate to the
        caller.

    """
    # imported here because the module's import block does not visibly
    # provide socket.
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyname(hostname or socket.gethostname())
    except socket.timeout:
        return None
    finally:
        # the original skipped this when the lookup raised anything other
        # than socket.timeout, leaving the global default changed.
        socket.setdefaulttimeout(oldtimeout)
def resolve_host(ip=None, timeout=1.0):
    """ resolve an IP address (default: the result of resolve_ip()) to its primary host name.

        The process-wide socket default timeout is set to ``timeout`` for
        the duration of the lookup and restored afterwards.  Returns None
        when the lookup times out; other lookup errors propagate to the
        caller.

    """
    # imported here because the module's import block does not visibly
    # provide socket.
    import socket
    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyaddr(ip or resolve_ip())[0]
    except socket.timeout:
        return None
    finally:
        # the original skipped this when the lookup raised anything other
        # than socket.timeout, leaving the global default changed.
        socket.setdefaulttimeout(oldtimeout)

## DIRECTORIES
def touch(fname):
    """ make sure the file fname exists, creating it empty when needed.

        An existing file is opened read-only and closed again, so its
        content is left alone.  Failures are reported through error()
        instead of being raised.

    """
    try:
        fd = os.open(fname, os.O_RDONLY | os.O_CREAT)
        os.close(fd)
    # was a bare "except:", which also swallowed KeyboardInterrupt and
    # SystemExit.
    except Exception:
        error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """ make the path ddir owned by the current user and give it the expected mode.

        A path that cannot be stat'ed is first created with make_dir().
        Regular files are compared against filemask, everything else
        against dirmask.

        NOTE(review): st_mode is compared to the mask as a whole; if the
        masks hold permission bits only, the comparison never matches and
        chmod runs on every call - confirm.

    """
    uid = os.getuid()
    gid = os.getgid()
    try:
        info = os.stat(ddir)
    except OSError:
        make_dir(ddir)
        info = os.stat(ddir)
    if info.st_uid != uid:
        os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    if info.st_mode != mask:
        os.chmod(ddir, mask)
def make_dir(path):
    """ create the directories leading up to path and return path.

        The path is split on os.sep and every component except the last is
        created in turn, starting from os.sep.  Components that cannot be
        created (for example because they already exist) are logged at
        debug level and skipped; newly created ones are passed to
        check_permissions().

    """
    current = os.sep
    for component in path.split(os.sep)[:-1]:
        current = j(current, component)
        try:
            os.mkdir(current)
        except OSError as ex:
            logging.debug(ex)
        else:
            check_permissions(current)
    return path

## HELPERS
def stripbadchar(s):
    """ drop characters with a code point of 31 or lower from s, unless they are listed in allowedchars. """
    kept = []
    for char in s:
        if ord(char) > 31 or char in allowedchars:
            kept.append(char)
    return "".join(kept)
def enc_char(s):
    """ return s with every character not in allowedchars replaced by its enc_name() encoding. """
    return "".join([char if char in allowedchars else enc_name(char) for char in s])
def enc_needed(s):
    """ return the list of characters in s that are not in allowedchars (empty when s needs no encoding). """
    offending = []
    for char in s:
        if char not in allowedchars:
            offending.append(char)
    return offending
def enc_name(input):
    """ return the URL-safe base64 encoding of the text input, as a str.

        The text is encoded as UTF-8 before being base64 encoded.

    """
    # imported here because the module's import block does not visibly
    # provide base64.
    import base64
    raw = bytes(input, "utf-8")
    return str(base64.urlsafe_b64encode(raw), "utf-8")
def split_txt(what, l=375):
    """ split what into a list of chunks of roughly l characters.

        Each chunk is cut at the first space at or after l characters from
        its start.  While a closing tag marker ``</`` still occurs at or
        after that point, the cut is made just behind the next ``>``
        instead.  Empty chunks are left out.

    """
    chunks = []
    total = len(what)
    begin, probe = 0, l
    for _ in range(int(total / l + 1)):
        if what.find("</", probe) != -1:
            cut = what.find('>', probe) + 1
        else:
            cut = what.find(' ', probe)
            if cut == -1:
                cut = total
        piece = what[begin:cut]
        if piece:
            chunks.append(piece)
        begin = cut
        probe = begin + l
    return chunks
def smooth(a):
    """ return a unchanged when its type is listed in basic_types, otherwise its bare class name. """
    if type(a) in basic_types:
        return a
    return get_cls(a)
def make_version(name=""):
    """ return a banner of name, package version and current time, wrapped in the YELLOW and ENDC markers. """
    fields = (YELLOW, name, __version__, time.ctime(time.time()), ENDC)
    return "%s%s #%s ! %s%s" % fields
def hello(name=""):
    """ print the make_version() banner for name, followed by an empty line. """
    banner = make_version(name)
    print(banner + "\n")
def list_eggs(filter=""):
    """ yield the sys.path entries that contain ``.egg`` and, when filter is given, also contain filter. """
    for entry in sys.path:
        is_egg = ".egg" in entry
        if is_egg and (not filter or filter in entry):
            yield entry
def show_eggs(filter="point"):
    """ log, at warning level, every egg on sys.path whose path contains filter. """
    for egg in list_eggs(filter):
        # logging.warn() was a deprecated alias and is gone in Python 3.13.
        logging.warning("%s egg: %s" % (filter, egg))
def stripped(input):
    """ return the part of input before the first ``/``.

        Values that cannot be split this way (anything that is not a str)
        are returned unchanged.

    """
    try:
        return input.split("/")[0]
    # was a bare "except:"; these are the errors a non-str value produces
    # (no split method, or a split that rejects a str separator).
    except (AttributeError, TypeError):
        return input

## HEADER
headertxt = '''# %s # # this is an p (#%s) file, %s # # this file can be edited !! ''' ## FEEDER def feed(text): from point import Object result = [] chunks = text.split("\r\n") for chunk in chunks: obj = Object().feed(chunk) result.append(obj) return result ## PARSER
def parse_email(fn):
    """ read the mbox style file fn and return one Object per message.

        Messages are separated on lines starting with ``From ``.  Each
        Object is updated with the message headers and gets the payload as
        its ``text`` attribute.  Anything before the first separator line
        is parsed as well but left out of the result.

    """
    from point import Object
    mails = []
    mess = ""
    # "with" closes the file; the original never closed it.
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                mails.append(mess)
                mess = line
                continue
            mess += line
    # flush the message still being collected at end of file; the original
    # dropped it, so the last mail of every file was lost.
    mails.append(mess)
    result = []
    for mess in mails:
        m = email.message_from_string(mess)
        o = Object()
        o.update(m.items())
        o.text = ""
        for load in m.get_payload():
            o.text += str(load)
        result.append(o)
    # logging.warn() was a deprecated alias and is gone in Python 3.13.
    logging.warning("%s emails read" % len(result))
    # the first entry is the text before the first "From " line.
    return result[1:]

## STRIPPERS
def strip_html(text):
    """ return the text content of the HTML in str(text), as extracted by BeautifulSoup. """
    from bs4 import BeautifulSoup
    return BeautifulSoup(str(text)).get_text()
def strip_wiki(text):
    """ remove wiki markup from text.

        Link and template brackets are deleted, HTML character references
        are unescaped, and ``<ref>`` elements are removed.

    """
    for marker in ("[[", "]]", "}}", "{{"):
        text = text.replace(marker, "")
    text = unescape(text)
    for pattern in ("<ref .*?/>", "<ref>.*?</ref>", "<ref .*?</ref>"):
        text = re.sub(pattern, "", text)
    return text

## ENCODING
def get_encoding(data):
    """ guess the character encoding of fetched web data.

        Tried in order:

        1. a ``charset=`` value in ``data.info['content-type']``, when
           data has an ``info`` mapping,
        2. a ``charset=`` value inside a ``<meta http-equiv="content-type">``
           tag in data,
        3. chardet's detection result, when chardet is available,
        4. sys.getdefaultencoding().

    """
    if hasattr(data, 'info') and 'content-type' in data.info and 'charset' in data.info['content-type'].lower():
        charset = data.info['content-type'].lower().split('charset', 1)[1].strip()
        # startswith() instead of charset[0]: nothing may follow the word
        # "charset", and indexing an empty string raised IndexError.
        if charset.startswith('='):
            charset = charset[1:].strip()
            if ';' in charset:
                return charset.split(';')[0].strip()
            return charset
    if '<meta' in data.lower():
        # raw strings: "\s" in a normal string literal is an invalid escape
        # sequence, which newer Python versions warn about.
        for meta in re.findall(r'<meta[^>]+>', data, re.I | re.M):
            test_http_equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if not test_http_equiv or test_http_equiv.group(1).lower() != 'content-type':
                continue
            if not re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I):
                continue
            test_charset = re.search(r'charset\s*=\s*([^\s\'"]+)', meta, re.I)
            if test_charset:
                return test_charset.group(1)
    # NOTE(review): chardet is not imported in the visible part of this
    # module; presumably it is provided elsewhere (possibly as None when
    # the package is missing) - confirm.
    if chardet:
        test = chardet.detect(data)
        if 'encoding' in test:
            return test['encoding']
    return sys.getdefaultencoding()

## URL RELATED
def do_url(type, url, myheaders={}, postdata={}, keyfile=None, certfile="", port=80):
    """ send one HTTP(S) request and return the http.client response object.

        type      - request method, e.g. "GET" or "POST"
        url       - target url; an https scheme selects an HTTPS connection
        myheaders - extra headers, merged over the default ones
        postdata  - mapping that is url-encoded and sent as request body

        keyfile, certfile and port are accepted but not used.  The request
        and the response status are logged at warning level.

    """
    # http.client is a submodule that a plain "import http" does not load.
    import http.client
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    # the dict defaults in the signature are only read, never modified.
    headers.update(myheaders)
    urlparts = urllib.parse.urlparse(url)
    # test the parsed scheme; the original substring test ("https" in url)
    # also matched plain http urls that merely contain "https" somewhere.
    if urlparts.scheme == "https":
        connection = http.client.HTTPSConnection(urlparts[1])
    else:
        connection = http.client.HTTPConnection(urlparts[1])
    postdata = urllib.parse.urlencode(postdata)
    # logging.warn() was a deprecated alias and is gone in Python 3.13.
    logging.warning('%s %s' % (type, url))
    # NOTE(review): only the path component is sent; a query string in url
    # is dropped - confirm that this is intended.
    connection.request(type, urlparts[2], postdata, headers)
    resp = connection.getresponse()
    logging.warning("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """ return the ``Location`` header of a 301 response; None for any other status. """
    if resp.status != 301:
        return None
    return resp.getheader("Location")

## TO/FROM
def to_enc(what, encoding='utf-8'):
    """ return str(what) encoded to bytes; falsy values give empty bytes. """
    text = str(what) if what else str("")
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """ return txt as str.

        Falsy input gives "".  A str (or str subclass) is returned as is.
        Anything else is decoded with encoding; when that fails the data
        is decoded byte by byte with decode_char(), which drops the bytes
        that cannot be decoded.

    """
    if not txt:
        txt = ""
    # isinstance() so that str subclasses are not sent to .decode(), which
    # str does not have.
    if isinstance(txt, str):
        return txt
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        # the original called decodeperchar(), a name this module does not
        # define; decode_char() is the per-character decoder with the
        # matching signature.
        return decode_char(txt, encoding, what)

## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """ decode the bytes txt one byte at a time, leaving out bytes that cannot be decoded.

        The bytes that were left out are logged once, as hex, at warning
        level.  A str is returned unchanged.  The ``what`` argument is
        accepted but not used.

    """
    if isinstance(txt, str):
        return txt
    res = []
    nogo = []
    for index in range(len(txt)):
        # slice rather than iterate: iterating bytes yields ints in
        # Python 3, and ints have no decode().
        unit = bytes(txt[index:index + 1])
        try:
            res.append(unit.decode(encoding))
        except UnicodeDecodeError:
            if unit not in nogo:
                nogo.append(unit)
    if nogo:
        # hex text, because bytes cannot be joined with a str separator;
        # logging.warn() is gone in Python 3.13.
        logging.warning("nogo: %s" % " ".join(unit.hex() for unit in nogo))
    return "".join(res)

## OPTIONS
def make_opts():
    """ build an optparse parser from the entries in options and parse the command line.

        Every entry is ``(short, long, type, default, dest, help)``.  A
        type containing "store" is registered as the option's action, any
        other type as its value type.  Entries optparse rejects are logged
        at error level and skipped.  Returns what parse_args() returns.

    """
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=__version__)
    for option in options:
        kind, default, dest, help_text = option[2:]
        selector = "action" if "store" in kind else "type"
        try:
            parser.add_option(option[0], option[1], default=default, dest=dest, help=help_text, **{selector: kind})
        except Exception as ex:
            logging.error("error: %s - option: %s" % (str(ex), option))
    return parser.parse_args()

## PARSING
def parse_url(*args, **kwargs):
    """ split the url args[0] into (basepath, base, root, file).

        basepath - path without a trailing file name or trailing slash
        base     - scheme and network location followed by basepath
        root     - scheme and network location only
        file     - last path segment if it contains a dot, else None

    """
    parts = urllib.parse.urlsplit(args[0])
    scheme, netloc, path = parts[0], parts[1], parts[2]
    segments = path.split("/")
    last = segments[-1]
    if "." in last:
        basepath, file = "/".join(segments[:-1]), last
    else:
        basepath, file = path, None
    if basepath.endswith("/"):
        basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """ return the .html links on a page that stay within the page's own site.

        args is ``(url, txt)``: the address of the page and its HTML.
        Fragments are cut off; links that do not end in ``.html``, contain
        ``..`` or start with ``mailto`` are skipped.  Links without "http"
        in them are made absolute against the page's root or base url.
        Links outside the root are logged and skipped.  The result has no
        duplicates.

    """
    import bs4
    url, txt = args
    basepath, base, root, file = parse_url(url)
    s = bs4.BeautifulSoup(txt)
    urls = []
    for tag in s('a'):
        href = tag.get("href")
        if href:
            href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if "http" not in href:
            if href.startswith("/"):
                href = root + href
            else:
                href = base + "/" + href
        if root not in href:
            # logging.warn() was a deprecated alias and is gone in
            # Python 3.13.
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls

## GENERICS
def reduced_keys(*args, **kwargs):
    """ filter the keys in args[0] down to the plain lowercase ones.

        Keys whose text starts with ``_``, ``X`` or ``x``, is not all
        lowercase, or contains a dash are skipped.  The result has no
        duplicates and keeps the input order.

    """
    picked = []
    for key in args[0]:
        name = str(key)
        if name.startswith(("_", "X", "x")) or not name.islower():
            continue
        if "-" in key:
            continue
        if name not in picked:
            picked.append(key)
        if name in ("args", "rest", "first"):
            continue
        if name not in picked:
            picked.append(name)
    return picked
def feed(text):
    """ split text on CRLF and return the list of ``Object().feed(chunk)`` results, one per chunk. """
    from point import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]
def dispatch(target, event, cmnd, *args, **kwargs):
    """ call every function registered in target under cmnd with event.

        Returns event after the calls, or False when target has no entry
        for cmnd.  Extra positional and keyword arguments are accepted but
        not passed on.

    """
    try:
        handlers = target[cmnd]
    except KeyError:
        return False
    else:
        for handler in handlers:
            handler(event)
        return event
def resolve(*args, **kwargs):
    """ prepare the event args[0] and dispatch it on the kernel.

        The command is the event's ``ucmnd`` or, when that is empty, its
        ``etype``.  Returns the result of dispatch().

    """
    from point import kernel
    event = args[0]
    event.prepare()
    command = event.ucmnd or event.etype
    return dispatch(kernel, event, command, *args, **kwargs)
def need_skip(obj, black=[], white=[]):
    """ tell whether obj should be skipped based on its content type.

        True when the content type is listed in black or is not listed in
        white.  Objects without a get_content_type() method are never
        skipped.

    """
    try:
        content_type = obj.get_content_type()
    except AttributeError:
        return False
    return content_type in black or content_type not in white

## day string
def str_day(seconds):
    """ format a number of seconds as a compact duration such as ``1y2d3h4m5s``.

        Leading units that are zero are left out; the largest unit shown
        is years, followed by days, hours, minutes and seconds.  The total
        is first reduced by the leapfactor fraction.

    """
    year = 365*24*60*60
    day = 24*60*60
    hour = 60*60
    minute = 60
    nsec = int(float(seconds))
    nsec -= nsec * leapfactor
    years = int(nsec/year)
    nsec -= years*year
    # the original also subtracted whole weeks here but never showed them,
    # so 8 days came out as "1d..." and exactly 7 days as "0s"; days now
    # covers everything below a year.
    days = int(nsec/day)
    nsec -= days*day
    hours = int(nsec/hour)
    nsec -= hours*hour
    minutes = int(nsec/minute)
    sec = int(nsec - minutes*minute)
    if years:
        return "%sy%sd%sh%sm%ss" % (years, days, hours, minutes, sec)
    if days:
        return "%sd%sh%sm%ss" % (days, hours, minutes, sec)
    if hours:
        return "%sh%sm%ss" % (hours, minutes, sec)
    if minutes:
        return "%sm%ss" % (minutes, sec)
    return "%ss" % sec

## FORMAT
# NOTE(review): the nesting below was reconstructed from a flattened listing;
# in particular the extent of the "values" branch should be confirmed against
# the original file.
def format(*args, **kwargs):
    """ render the object args[0] as text; args[1] is the list of keys used by the "values" format.

        Strings are returned as is, functions and methods as str(obj), and
        lists or generators as a time-prefixed list of class names.  Any
        other object is rendered according to its own ``format`` entry
        ("raw" when it has none): "type", "keys", "show" and "values" have
        their own rendering, everything else is str(obj).

    """
    obj = args[0]
    keys = args[1]
    tijd = to_time(time.ctime(time.time()))
    txt = "%s " % tijd
    logging.debug("format %s" % type(obj))
    if type(obj) in [str, ]: return obj
    if type(obj) in [types.FunctionType, types.MethodType, types.BuiltinFunctionType, types.BuiltinMethodType]: return str(obj)
    if type(obj) in [list, types.GeneratorType]: return txt + " ".join([get_clsname(x) for x in sorted(obj)])
    # tijd is overwritten here but not read again below.
    try: tijd = obj["timed"]
    except KeyError: pass
    # from here on the local name "format" hides this function.
    if "format" in obj: format = obj.format
    else: format = "raw"
    if format == "timed": return str(obj)
    if format == "dump": return str(obj)
    if format == "type": return " ".join([str(type(x)) for x in obj.values()])
    if format == "keys": return " ".join(sorted(obj.clean_keys()))
    if format == "show": return " ".join(obj.show())
    if format == "values":
        # concatenate the non-empty values of the requested keys
        res = ""
        for key in keys:
            if key not in obj: continue
            if not obj[key]: continue
            res += "%s " % obj[key]
        if not res: return res
        if "timed" in obj: res = "%s %s" % (obj["timed"], res)
        if "timed" in obj:
            # append the age of the "timed" stamp in whole days
            t1 = time.time()
            t2 = a_time(obj.timed)
            time_diff = float(t1 - t2)
            res += " (%s days)" % int(time_diff/(24*60*60))
        return res
    return str(obj)

## PARSE
def parse(txt):
    """ parse a command line into an Object.

        For every whitespace separated word:

        .word     - stored under "ucmnd", and also handled as below
        -word     - stored as key "opt_-word" with an empty value
        key=value - stored as key with value
        word      - stored as key with an empty value

    """
    # imported locally, as the other functions in this module do.
    from point import Object
    result = Object()
    for word in txt.split():
        if word.startswith("."):
            result["ucmnd"] = word
        if word.startswith("-"):
            result["opt_%s" % word] = ""
            continue
        try:
            key, value = word.split("=")
        except ValueError:
            result[word] = ""
        else:
            # only store the pair when the split worked; the original ran
            # this line after a failed split too, reusing the previous
            # word's key and value or raising NameError on the first word.
            result[key] = value
    return result

## strtotime function
# NOTE(review): the nesting below was reconstructed from a flattened listing;
# confirm against the original file.
def strtotime(what):
    """ convert string to time.

        Looks for a ``D-M-Y`` or ``D-M`` date and a ``H:M:S`` or ``H:M``
        time in str(what).  Returns the time of day in seconds, or None
        when a date or time part is out of range or cannot be converted.

    """
    daymonthyear = 0
    hoursmin = 0
    try:
        # date part: day-month-year, or day-month in the current year
        dmyre = re.search('(\d+)-(\d+)-(\d+)', str(what))
        if dmyre:
            (day, month, year) = dmyre.groups()
            day = int(day)
            month = int(month)
            year = int(year)
            # NOTE(review): calendar is not imported in the visible part of
            # this module; a NameError here would be swallowed by the
            # catch-all handler at the end - confirm.
            if day <= calendar.monthrange(year, month)[1]:
                date = "%s %s %s" % (day, bdmonths[month], year)
                daymonthyear = time.mktime(time.strptime(date, "%d %b %Y"))
            else: return None
        else:
            dmre = re.search('(\d+)-(\d+)', str(what))
            if dmre:
                year = time.localtime()[0]
                (day, month) = dmre.groups()
                day = int(day)
                month = int(month)
                if day <= calendar.monthrange(year, month)[1]:
                    date = "%s %s %s" % (day, bdmonths[month], year)
                    daymonthyear = time.mktime(time.strptime(date, "%d %b %Y"))
                else: return None
        # time part: hours:minutes:seconds, or hours:minutes
        hmsre = re.search('(\d+):(\d+):(\d+)', str(what))
        if hmsre:
            (h, m, s) = hmsre.groups()
            h = int(h)
            m = int(m)
            s = int(s)
            if h > 24 or h < 0 or m > 60 or m < 0 or s > 60 or s < 0: return None
            hours = 60 * 60 * (int(hmsre.group(1)))
            hoursmin = hours + int(hmsre.group(2)) * 60
            hms = hoursmin + int(hmsre.group(3))
        else:
            hmre = re.search('(\d+):(\d+)', str(what))
            if hmre:
                (h, m) = hmre.groups()
                h = int(h)
                m = int(m)
                if h > 24 or h < 0 or m > 60 or m < 0: return None
                hours = 60 * 60 * (int(hmre.group(1)))
                hms = hours + int(hmre.group(2)) * 60
            else: hms = 0
        # NOTE(review): daymonthyear is computed above but is not part of
        # the returned value - confirm whether the date should be added.
        return hms
    except OverflowError: return None
    except ValueError:return None
    # any other exception is silently ignored and None is returned.
    except Exception as ex: pass

## today function
def today():
    """ return time of 0:00 today.

        The current time is shifted by time.timezone (plus one hour when
        time.daylight is set), rendered with time.ctime() and matched
        against timere; the date fields of the match are converted back to
        a timestamp.  Returns None when the pattern does not match.

    """
    stamp = time.time() + int(time.timezone)
    if time.daylight:
        stamp += 3600
    matched = re.search(timere, time.ctime(stamp))
    if not matched:
        return None
    temp = "%s %s %s" % (matched.group(3), matched.group(2), matched.group(7))
    return time.mktime(time.strptime(temp, "%d %b %Y"))