Source code for zot.utils

# zot/utils.py
#
#

""" utils package. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORT

from zot import __version__, __copyright__
from zot.defines import *

from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape

import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import threading
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import types
import http
import json
import time
import math
import glob
import sys
import os
import re

## start_new_thread alias

# Module-level alias: run_thr(...) is _thread.start_new_thread(...).
run_thr = _thread.start_new_thread

## AGENT

def useragent():
    """Return the User-Agent string, with the package ``__version__`` filled in."""
    template = 'Mozilla/5.0 (X11; Linux x86_64); ZOTBOT %s; http://pikacode.com/bthate/zotbot)'
    return template % __version__


## ISTR
class istr(str):
    """A ``str`` subclass that adds no attributes or methods of its own."""


## UNESCAPE
def unescape(text):
    """Replace HTML character references in *text* with the characters they name.

    Uses ``html.unescape``; the ``HTMLParser.unescape`` method used previously
    was removed from the standard library in Python 3.9.

    :param text: string that may contain references such as ``&amp;`` or ``&#62;``.
    :returns: the string with those references converted.
    """
    return html.unescape(text)
def to_str(obj):
    """Render *obj* as text.

    An object with a ``json`` method is rendered as ``obj.json(indent=4)``.
    If that lookup/call raises ``AttributeError``, a plain ``list`` becomes
    its items joined with ``", "``, a module becomes ``str(module)``, and
    anything else becomes the empty string.
    """
    try:
        return obj.json(indent=4)
    except AttributeError:
        kind = type(obj)
    if kind is list:
        return ", ".join(str(item) for item in obj)
    if kind is types.ModuleType:
        return str(obj)
    return ""


## LOCK
def locked(func, *args, **kwargs):
    """Return a wrapper around *func* whose calls are serialised by a lock.

    Each call to ``locked`` allocates its own lock, so only calls made through
    the same returned wrapper exclude each other. The lock is not re-entrant:
    a wrapped function that calls its own wrapper will block.

    :param func: the callable to protect.
    :param args: accepted for signature compatibility; not used.
    :param kwargs: accepted for signature compatibility; not used.
    :returns: a function with the same call signature and metadata as *func*.
    """
    import functools

    lock = _thread.allocate_lock()

    @functools.wraps(func)
    def lockedfunc(*args, **kwargs):
        # ``with`` releases the lock on both normal return and exception.
        with lock:
            return func(*args, **kwargs)

    return lockedfunc


## txt_parse function
# txt_parse(txt[, cc]) -- parse a command line into an Object (zot.object.Object).
#
# args[0] is the text, args[1] the command character (falls back to "!" when
# it cannot be read).  The returned object always carries `args` (list) and
# `wanted`, `not_wanted`, `switch` (Objects); when the text is empty it is
# returned right after those are set.
#
# The first whitespace-separated word is stored as `cmnd`, minus its first
# character when that character equals `cc`.  Every later word is split on "=":
#   * key "i" with an integer value also sets `index`;
#   * a value ending in "-" has that character removed, and the key is then not
#     added to `args`;
#   * a key starting with "!" is stored (without the "!") in `switch`;
#   * a key ending in "-" is stored (without the "-") in `not_wanted`,
#     otherwise the key is stored in `wanted`.
# A word that does not split into exactly two parts raises ValueError, which is
# caught: the word itself is appended to `args`.
#
# NOTE(review): the original line breaks are lost in this listing, so the exact
# nesting (in particular whether `new_txt` accumulates only the non key=value
# words) is inferred -- confirm against the real file.
# NOTE(review): an empty key or value ("=x", "x=") indexes [-1] on an empty
# string; IndexError is not among the caught exceptions.
[docs]def txt_parse(*args, **kwargs): from zot.object import Object txt = args[0] try: cc = args[1] except: cc = "!" o = Object() o.args = [] o.wanted = Object() o.not_wanted = Object() o.switch = Object() if not txt: return o splitted = txt.split() c = 0 o.new_txt = "" for word in splitted: if not c: o.cmnd = word if word[0] == cc: o.cmnd = o.cmnd[1:] c += 1 continue try: key, value = word.split("=") op = key[-1] post = value[-1] last = word[-1] if key == "i": try: o.index = int(value) except: pass if post == "-": value = value[:-1] if key.startswith("!"): key = key[1:] ; o.switch[key] = value if op == "-": key = key[:-1] ; o.not_wanted[key] = value else: o.wanted[key] = value #if op == "-": continue if post == "-" : continue o.args.append(key) except ValueError: o.args.append(word) o.new_txt += " " + word o.new_txt = o.new_txt.strip() return o ## SETS
def unique(a):
    """Return the distinct items of *a* as a list (order is that of a set)."""
    distinct = set(a)
    return list(distinct)
def intersect(a, b):
    """Return, as a list, the items that occur in both *a* and *b*."""
    left, right = set(a), set(b)
    common = left.intersection(right)
    return list(common)
def union(a, b):
    """Return, as a list, the items that occur in *a*, in *b*, or in both."""
    left, right = set(a), set(b)
    combined = left.union(right)
    return list(combined)


## SIGNATURES
def make_signature(data):
    """Return the hexadecimal SHA-1 digest of ``str(data)`` encoded as UTF-8."""
    payload = str(data).encode("utf-8")
    digest = hashlib.sha1(payload)
    return str(digest.hexdigest())
def verify_signature(data, signature):
    """Check *signature* against the ``"data"`` entry of a JSON document.

    *data* is parsed with ``json.loads``; the value stored under its ``"data"``
    key is hashed with ``make_signature`` and compared with *signature*.

    The unused ``from zot import Object`` import that used to run on every call
    has been dropped; nothing in the function referenced it.

    :param data: JSON text whose top-level object has a ``"data"`` key.
    :param signature: the expected hex digest.
    :returns: ``True`` when the computed digest equals *signature*.
    :raises KeyError: when the document has no ``"data"`` key.
    """
    fromdisk = json.loads(data)
    return make_signature(fromdisk["data"]) == signature


## FILES
def list_files(*args, **kwargs):
    """Return the files found below the directory given as ``args[0]``.

    A glob pattern is built from the path: a trailing separator is added when
    missing, ``*<search>*`` is appended when a ``search`` keyword is given, and
    ``*`` is appended when the pattern holds no wildcard yet. Matching
    directories are descended into (with the same keywords); every other
    match is collected.
    """
    pattern = args[0]
    if not pattern.endswith(os.sep):
        pattern = pattern + os.sep
    if "search" in kwargs:
        pattern = pattern + "*%s*" % kwargs["search"]
    if "*" not in pattern:
        pattern = pattern + "*"
    found = []
    for entry in glob.glob(pattern):
        if os.path.isdir(entry):
            found.extend(list_files(entry, **kwargs))
        else:
            found.append(entry)
    return found


## JOINS
def j(*args):
    """Join the truthy arguments, converted to ``str``, with ``os.path.join``.

    Returns ``None`` when called without arguments.
    """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    return os.path.join(*parts)
def mj(*args):
    """Join the truthy arguments as a path, then turn separators into dots.

    Returns ``None`` when called without arguments.
    """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    joined = os.path.join(*parts)
    return joined.replace(os.sep, ".")
def dj(*args):
    """Join the truthy arguments as a path, then turn separators into underscores.

    Returns ``None`` when called without arguments.
    """
    if not args:
        return
    parts = [str(arg) for arg in args if arg]
    joined = os.path.join(*parts)
    return joined.replace(os.sep, "_")
def aj(sep=None, *args):
    """Join *sep* and *args* with ``j`` and return the absolute path.

    The previous version star-unpacked the joined string, which passed every
    character as a separate argument to ``os.path.abspath`` and raised
    ``TypeError`` for any path longer than one character.

    :param sep: first path component (falsy values are skipped by ``j``).
    :param args: further path components.
    :returns: the absolute form of the joined path.
    """
    return os.path.abspath(j(sep, *args))


## TIME
def dtime(stamp):
    """Convert the timestamp *stamp* to a local ``datetime.datetime``."""
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment
def ptime(daystr):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.datetime``."""
    layout = '%Y-%m-%d'
    return datetime.datetime.strptime(daystr, layout)
def tdiff(d1, d2):
    """Return ``datetime.timedelta`` built from *d1* days and *d2* seconds.

    NOTE(review): the name suggests a difference between two moments, but the
    arguments are handed to ``timedelta`` as its first two positional
    parameters (days, seconds) -- confirm what callers expect.
    """
    return datetime.timedelta(days=d1, seconds=d2)
def rtime():
    """Return the current local time as text usable as a relative path.

    The space between date and time becomes ``os.sep`` and every ``:`` becomes
    ``_``.
    """
    stamp = str(datetime.datetime.now())
    as_path = stamp.replace(" ", os.sep)
    return as_path.replace(":", "_")
def ftime(datestr):
    """Return *datestr* with spaces turned into ``os.sep`` and ``:`` into ``_``."""
    as_path = datestr.replace(" ", os.sep)
    return str(as_path.replace(":", "_"))
def hms():
    """Return the current local time of day as ``HH:MM:SS`` (no fraction)."""
    clock = str(datetime.datetime.today()).split()[1]
    return clock.split(".")[0]
def day():
    """Return the current local date as ``YYYY-MM-DD``."""
    stamp = str(datetime.datetime.today())
    return stamp.split()[0]
def time_string(*args, **kwargs):
    """Return ``str(datetime.fromtimestamp(args[0]))``.

    Any failure during the conversion is reported through ``error()`` and
    ``None`` is returned instead.
    """
    timestamp = args[0]
    try:
        return str(datetime.datetime.fromtimestamp(timestamp))
    except:  # deliberately catches everything, as the code always has
        error()
    return None
def time_time(*args, **kwargs):
    """Return ``time_string(args[0])``; the other arguments are ignored."""
    return time_string(args[0])
def make_time(daystr):
    """Convert text such as ``Mon Jan 05 10_00_00 2015`` to a timestamp.

    Underscores are first turned back into colons; the text is then parsed
    with the format ``%a %b %d %H:%M:%S %Y`` and passed to ``time.mktime``.
    """
    normalised = daystr.replace("_", ":")
    parsed = time.strptime(normalised, "%a %b %d %H:%M:%S %Y")
    return time.mktime(parsed)
def a_time(daystr):
    """Convert ``YYYY-MM-DD HH_MM_SS`` (or with colons) to a timestamp.

    Returns ``None`` when *daystr* is empty.
    """
    normalised = daystr.replace("_", ":")
    if not normalised:
        return None
    parsed = time.strptime(normalised, "%Y-%m-%d %H:%M:%S")
    return time.mktime(parsed)
def b_time(daystr):
    """Return the timestamp of the ``saved`` entry of *daystr*, else ``0.0``.

    The previous body tested and read an undefined name ``obj`` and therefore
    raised ``NameError`` on every call; it now uses its own parameter.

    :param daystr: container supporting ``"saved" in daystr`` and exposing the
        value as the attribute ``daystr.saved`` -- presumably a zot ``Object``;
        verify against callers.
    :returns: ``a_time(daystr.saved)`` when a ``saved`` entry is present,
        otherwise ``0.0``.
    """
    if "saved" in daystr:
        return a_time(daystr.saved)
    return 0.0
def short_date(*args, **kwargs):
    """Reduce a date string (``args[0]``) to ``YYYY-MM-DD HH_MM_SS`` form.

    Example inputs noted by the original author:
        Mon, 25 Oct 2010 18:05:33 -0700 (PDT)
        ['13', 'Oct', '2012', '20:43:46', '+0300']

    The text is split on whitespace and four field layouts are tried in turn;
    month names are looked up in ``monthint``. Returns ``None`` for an empty
    input and ``""`` when no layout fits. Colons in the result are replaced by
    underscores.
    """
    date = args[0]
    if not date:
        return None
    fields = date.replace("_", ":").split()
    short = ""
    try:
        # Layout 1: year in field 3 (must be a bare integer).
        if "+" in fields[3]:
            raise ValueError
        if "-" in fields[3]:
            raise ValueError
        int(fields[3])
        short = "{:4}-{:#02}-{:#02} {:6}".format(fields[3], monthint[fields[2]], int(fields[1]), fields[4])
    except (IndexError, KeyError, ValueError):
        try:
            # Layout 2: year in field 4.
            if "+" in fields[4]:
                raise ValueError
            if "-" in fields[4]:
                raise ValueError
            int(fields[4])
            short = "{:4}-{:#02}-{:02} {:6}".format(fields[4], monthint[fields[1]], int(fields[2]), fields[3])
        except (IndexError, KeyError, ValueError):
            try:
                # Layout 3: day, month, year, time.
                short = "{:4}-{:#02}-{:02} {:6}".format(fields[2], monthint[fields[1]], int(fields[0]), fields[3])
            except (IndexError, KeyError):
                try:
                    # Layout 4: day, month, year without a time.
                    short = "{:4}-{:#02}-{:02}".format(fields[2], monthint[fields[1]], int(fields[0]))
                except (IndexError, KeyError):
                    short = ""
    return short.replace(":", "_")
def short_time(*args, **kwargs):
    """Extract the time-of-day field from a date string (``args[0]``).

    The text is split on whitespace. When field 3 is a bare integer the time
    is taken from field 4; otherwise, when field 4 is a bare integer, or as a
    last resort, it is taken from field 3. The field is padded to at least
    six characters and colons become underscores. Returns ``None`` for an
    empty input and ``""`` when the needed field does not exist.
    """
    date = args[0]
    if not date:
        return None
    fields = date.replace("_", ":").split()
    clock = ""
    try:
        if "+" in fields[3]:
            raise ValueError
        if "-" in fields[3]:
            raise ValueError
        int(fields[3])
        clock = "{:6}".format(fields[4])
    except (IndexError, KeyError, ValueError):
        try:
            if "+" in fields[4]:
                raise ValueError
            if "-" in fields[4]:
                raise ValueError
            int(fields[4])
            clock = "{:6}".format(fields[3])
        except (IndexError, KeyError, ValueError):
            try:
                clock = "{:6}".format(fields[3])
            except (IndexError, KeyError):
                pass
    return clock.replace(":", "_")


## NAMES
def get_type(obj):
    """Return the part of ``get_named(obj)`` after its last dot."""
    dotted = get_named(obj)
    return dotted.split(".")[-1]
def get_named(obj):
    """Return the quoted name inside ``get_name(obj)``.

    Takes the last whitespace-separated word of ``get_name(obj)`` and strips
    its first character and its last two.
    """
    last_word = get_name(obj).split()[-1]
    return last_word[1:-2]
def get_name(obj):
    """Build a short ``<kind 'name'>`` label from the reprs of *obj* and its type.

    The branch is picked by substring tests on ``str(type(obj))``:
    "function" -> ``<function 'NAME'>``; "module" -> ``<module 'NAME'>``;
    "class" with "method" -> ``<method 'NAME'>``; any other "class" ->
    ``<object 'TYPENAME'>``; "built-in" -> the type text in angle brackets.
    The label text is cut out of ``str(obj)`` / ``str(type(obj))`` by
    splitting, so an unexpected repr raises ``ValueError``.
    """
    type_txt = str(type(obj))
    obj_txt = str(obj)
    if "function" in type_txt:
        head, _rest = obj_txt.split(" at ", 1)
        _kind, func_name = head.split()
        name = "<function '" + func_name + "'>"
    elif "module" in type_txt:
        head, _rest = obj_txt.split(" from ", 1)
        name = "<" + head[1:] + ">"
    elif "class" in type_txt:
        if "method" in type_txt:
            head, _rest = obj_txt.split(" of ", 1)
            name = "<method '" + head.split()[-1] + "'>"
        else:
            _head, tail = type_txt.split("class ", 1)
            name = "<object '" + tail[:-1][1:-1] + "'>"
    elif "built-in" in type_txt:
        name = "<" + type_txt + ">"
    return name


## HIGHEST
def get_highest(target, file_name):
    """ determine new file extension.

    Looks at every entry of directory *target* whose name contains
    *file_name*, reads the text after its last dot as a sequence number, and
    returns ``file_name + "." + str(highest + 1)``. Entries whose suffix is
    not an integer are ignored; with no numbered entry the result ends in
    ``.1``.
    """
    top = 0
    for entry in os.listdir(target):
        if file_name not in entry:
            continue
        suffix = entry.split('.')[-1]
        try:
            number = int(suffix)
        except ValueError:
            continue
        if number > top:
            top = number
    return file_name + '.' + str(top + 1)


## STACK
def get_exception(*args, **kwargs):
    """Summarise the exception currently being handled on one line.

    For each traceback entry the file name (minus its last three characters)
    is turned into a dotted name that starts at the innermost path component
    called ``zot`` (or at the path's first component when there is none),
    followed by line number and function name. The exception type and value
    are appended at the end.
    """
    exctype, excvalue, tb = sys.exc_info()
    pieces = []
    for entry in traceback.extract_tb(tb):
        fname, linenr, func = entry[0], entry[1], entry[2]
        # Walk the path backwards so the collected components stop at "zot".
        collected = []
        for part in reversed(fname[:-3].split(os.sep)):
            collected.append(part)
            if part == "zot":
                break
        collected.reverse()
        pieces.append("%s:%s %s | " % ('.'.join(collected), linenr, func))
    return "%s%s: %s" % ("".join(pieces), exctype, excvalue)
def get_plugname(*args, **kwargs):
    """Return a dotted name for the source file of a calling frame.

    Starting at ``sys._getframe(depth)`` (``depth`` is ``args[0]``, default 1)
    the function walks up to ``depth`` further frames back, remembering the
    file name of the last frame reached. The last three path components of
    that file name are joined with dots and the final three characters are
    cut off. With ``depth`` 0 no frame is visited and ``""`` is returned.
    """
    try:
        depth = args[0]
    except IndexError:
        depth = 1
    current = sys._getframe(depth)
    if not current:
        return ""
    filename = ""
    steps = depth
    while steps > 0:
        steps -= 1
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        filename = parent.f_code.co_filename
        current = parent
    del current
    tail = filename.split(os.sep)[-3:]
    return ".".join(tail)[:-3]
def get_frame(search="code"):
    """Collect attributes of the caller's frame into a dict.

    Every attribute of the calling frame whose name contains ``str(search)``
    is fetched, and all attributes of that object are stored under their own
    names (later matches overwrite earlier ones with the same name).
    """
    frame = sys._getframe(1)
    needle = str(search)
    collected = {}
    for attr in dir(frame):
        if needle not in attr:
            continue
        target = getattr(frame, attr)
        for member in dir(target):
            collected[member] = getattr(target, member)
    return collected
def get_strace(*args, **kwargs):
    """Return ``func:line | func:line | ...`` for the frames above a caller.

    Starts at ``sys._getframe(depth)`` (``depth`` is ``args[0]``, default 1)
    and lists every frame reached by following ``f_back`` until the chain
    ends.
    """
    try:
        depth = args[0]
    except IndexError:
        depth = 1
    current = sys._getframe(depth)
    if not current:
        return ""
    entries = []
    while 1:
        try:
            parent = current.f_back
        except AttributeError:
            break
        if not parent:
            break
        entries.append("%s:%s" % (parent.f_code.co_name, parent.f_lineno))
        current = parent
    del current
    return " | ".join(entries)
def get_func(*args, **kwargs):
    """Return ``name:line`` for the frame directly above ``sys._getframe(depth)``.

    ``depth`` is ``args[0]`` when given, otherwise 2. When that frame has no
    parent the result is ``"None:0"``.

    NOTE(review): the original loop re-read ``f_back`` of the same frame on
    every pass (it never advanced), so it always reported the immediate
    parent; this version states that directly. If walking ``depth`` frames
    further was intended, that is a behaviour change to make deliberately.
    """
    if args:
        depth = args[0]
    else:
        depth = 2
    start = sys._getframe(depth)
    if not start:
        return ""
    func, linenr = None, 0
    parent = getattr(start, "f_back", None)
    if parent:
        linenr = parent.f_lineno
        func = parent.f_code.co_name
    del start
    return "%s:%s" % (func, linenr)


## ERROR
def error(*args, **kwargs):
    """Log the exception currently being handled via ``zot.log.log``.

    The message is ``"^ "`` followed by ``get_exception()``. The level is
    ``args[0].get("loglevel", "error")`` when a truthy first argument is
    given, otherwise ``"error"``.
    """
    from zot.log import log
    txt = "^ " + get_exception()
    options = args[0] if args else None
    level = options.get("loglevel", "error") if options else "error"
    log(level, txt)


## LOCATING
def get_source(mod, package):
    """Return the absolute filesystem path of a packaged resource.

    Uses ``pkg_resources.resource_filename(mod, package)`` and logs the result
    at WARNING level. ``logging.warn`` was replaced by ``logging.warning``:
    the former is a deprecated alias that no longer exists in Python 3.13.

    :param mod: module/package name passed to ``resource_filename`` first.
    :param package: resource name passed to ``resource_filename`` second.
    :returns: the absolute path as a string.
    """
    import pkg_resources as p
    source = os.path.abspath(p.resource_filename(mod, package))
    logging.warning("source %s" % source)
    return source


## RESOLVING
def resolve_ip(hostname=None, timeout=1.0):
    """Resolve *hostname* (default: this machine's host name) to an IPv4 address.

    ``socket`` is imported locally because the module's import block does not
    import it. The process-wide default socket timeout is set to *timeout*
    for the lookup and is now restored in a ``finally`` clause, so it no
    longer stays changed when the lookup raises something other than
    ``socket.timeout``.

    :param hostname: name to resolve; falsy means ``socket.gethostname()``.
    :param timeout: temporary default socket timeout in seconds.
    :returns: the address string, or ``None`` on ``socket.timeout``.
    :raises OSError: other resolver errors (e.g. ``socket.gaierror``) propagate.
    """
    import socket

    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyname(hostname or socket.gethostname())
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(oldtimeout)
def resolve_host(ip=None, timeout=1.0):
    """Reverse-resolve *ip* (default: ``resolve_ip()``) to a host name.

    ``socket`` is imported locally because the module's import block does not
    import it. The process-wide default socket timeout is set to *timeout*
    for the lookup and is now restored in a ``finally`` clause, so it no
    longer stays changed when the lookup raises something other than
    ``socket.timeout``.

    :param ip: address to look up; falsy means the result of ``resolve_ip()``.
    :param timeout: temporary default socket timeout in seconds.
    :returns: the primary host name, or ``None`` on ``socket.timeout``.
    :raises OSError: other resolver errors (e.g. ``socket.herror``) propagate.
    """
    import socket

    oldtimeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return socket.gethostbyaddr(ip or resolve_ip())[0]
    except socket.timeout:
        return None
    finally:
        socket.setdefaulttimeout(oldtimeout)


## DIRECTORIES
def touch(fname):
    """Create *fname* if it does not exist; leave an existing file untouched.

    The attempt is logged at WARNING level (``logging.warning`` replaces the
    deprecated ``logging.warn`` alias, which is gone in Python 3.13).
    Failures are reported through ``error()`` instead of being raised; the
    handler now catches ``Exception`` rather than everything, so
    ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed.

    :param fname: path of the file to create.
    """
    logging.warning("touch %s" % fname)
    try:
        fd = os.open(fname, os.O_RDONLY | os.O_CREAT)
        os.close(fd)
    except Exception:
        error()
def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
    """Make *ddir* owned by the current user and give it the expected mode.

    When *ddir* cannot be stat'ed it is first created with ``cdir``. Ownership
    is changed when the owner differs from the current uid; the mode is set
    to *filemask* for a regular file and *dirmask* otherwise, whenever
    ``st_mode`` differs from that mask.

    NOTE(review): ``st_mode`` also carries the file-type bits, so the
    comparison presumably always differs from a permission-only mask and
    ``chmod`` runs every time -- confirm the values of the masks.
    """
    uid, gid = os.getuid(), os.getgid()
    try:
        info = os.stat(ddir)
    except OSError:
        cdir(ddir)
        info = os.stat(ddir)
    if info.st_uid != uid:
        os.chown(ddir, uid, gid)
    mask = filemask if os.path.isfile(ddir) else dirmask
    if info.st_mode != mask:
        os.chmod(ddir, mask)
def cdir(path):
    """Create every directory along *path*, component by component.

    Each prefix of the path is normalised and made absolute; prefixes that
    are already directories are skipped. An already-existing entry is ignored,
    any other ``OSError`` is reported through ``error()``. Always returns
    ``True``.
    """
    prefix = ""
    for component in path.split(os.sep):
        prefix = prefix + "%s%s" % (component, os.sep)
        candidate = os.path.abspath(os.path.normpath(prefix))
        if os.path.isdir(candidate):
            continue
        try:
            os.mkdir(candidate)
        except FileExistsError:
            pass
        except OSError:
            error()
    return True


## HELPERS
def get_urls(data):
    """Return the distinct ``.html`` link targets found in the markup *data*.

    Every ``<a>`` tag's ``href`` is taken without its ``#fragment``. Links are
    skipped when they are empty, do not end in ``.html``, contain ``..`` or
    start with ``mailto``. The number found is logged at WARNING level
    (``logging.warning`` replaces the deprecated ``logging.warn`` alias, which
    is gone in Python 3.13).

    :param data: HTML text handed to ``BeautifulSoup``.
    :returns: list of hrefs in document order, without duplicates.
    """
    from bs4 import BeautifulSoup

    urls = []
    soup = BeautifulSoup(data)
    for tag in soup('a'):
        href = tag.get("href")
        if not href:
            continue
        href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls
def stripbadchar(s):
    """Drop characters with code point 31 or lower unless in ``allowedchars``."""
    kept = (c for c in s if ord(c) > 31 or c in allowedchars)
    return "".join(kept)
def enc_char(s):
    """Return *s* with every character outside ``allowedchars`` run through ``enc_name``."""
    return "".join(c if c in allowedchars else enc_name(c) for c in s)
def enc_needed(s):
    """Return the list of characters in *s* that are not in ``allowedchars``."""
    offending = []
    for c in s:
        if c not in allowedchars:
            offending.append(c)
    return offending
def enc_name(input):
    """Return the URL-safe base64 encoding of *input* as text.

    ``base64`` is imported locally because the module's import block does not
    import it.

    :param input: the string to encode (encoded as UTF-8 first).
    :returns: the base64 text, including any ``=`` padding.
    """
    import base64

    encoded = base64.urlsafe_b64encode(bytes(input, "utf-8"))
    return str(encoded, "utf-8")
[docs]def split_txt(what, l=375): txtlist = [] start = 0 end = l length = len(what) for i in range(int(length/end+1)): endword = what.find(' ', end) if endword == -1: endword = length res = what[start:endword] if res: txtlist.append(res) start = endword end = start + l return txtlist
def pretty(a):
    """Return ``"<a>\\n"`` as UTF-8 bytes for basic types, else ``get_name(a)``."""
    if type(a) in basic_types:
        return bytes("%s\n" % a, "utf-8")
    return get_name(a)
def smooth(a):
    """Return *a* itself for basic types, else the text of its type."""
    return a if type(a) in basic_types else str(type(a))
def full(a):
    """Return *a* itself for basic types, else ``str(a)``."""
    return a if type(a) in basic_types else str(a)
def verzin(a):
    """Return a display value for *a*.

    A float with a fractional part is treated as a timestamp and rendered
    with ``short_date(time.ctime(a))``. Otherwise basic types are returned
    unchanged and everything else becomes the text of its type.
    """
    kind = type(a)
    if kind is float and not a.is_integer():
        return short_date(time.ctime(a))
    if kind in basic_types:
        return a
    return str(kind)
def hello(*args):
    """Print a banner: ``args[0]``, a version and the copyright line.

    The version is ``args[1]`` when exactly two arguments are given, otherwise
    the package ``__version__``. The line is wrapped in the ``YELLOW`` and
    ``ENDC`` markers.
    """
    ver = args[1] if len(args) == 2 else __version__
    banner = "%s%s %s %s%s\n" % (YELLOW, args[0], ver, __copyright__, ENDC)
    print(banner)
def list_eggs(filter="zot"):
    """Yield the ``sys.path`` entries that contain ``.egg``.

    When *filter* is truthy, only entries that also contain *filter* are
    yielded.
    """
    for entry in sys.path:
        is_egg = ".egg" in entry
        if is_egg and not (filter and filter not in entry):
            yield entry
def show_eggs(filter="zot"):
    """Log, at WARNING level, every path produced by ``list_eggs(filter)``.

    ``logging.warning`` replaces the deprecated ``logging.warn`` alias, which
    no longer exists in Python 3.13.

    :param filter: substring passed through to ``list_eggs``.
    """
    for path in list_eggs(filter):
        logging.warning(path)
def stripped(input):
    """Return the part of *input* before its first ``/``.

    When that cannot be computed for any reason, *input* is returned unchanged.
    """
    try:
        head = input.split("/")[0]
    except:  # deliberately catches everything, as the code always has
        return input
    return head


## HEADER
# headertxt: text template with a single %s placeholder; presumably the
# comment header written at the top of files this package saves -- confirm
# against the code that uses it.
# NOTE(review): the template was a multi-line string; its line breaks are lost
# in this listing, so the exact whitespace must be taken from the real file.
# NOTE(review): the `feed` definition after "## FEEDER" on this line is the
# same function that is defined again further down in the module; the later
# definition is the one that stays bound to the name.
headertxt = '''# this is an ZOTBOT file, %s # # this file can be edited !! ''' ## FEEDER def feed(text): from zot.object import Object result = [] chunks = text.split("\r\n") for chunk in chunks: obj = Object().feed(chunk) result.append(obj) return result ## PARSER
def parse_email(fn):
    """Read an mbox-style file and return one ``Object`` per message.

    Messages are separated by lines starting with ``"From "``. Each message's
    headers are copied into the object and its payload is stored, as text, in
    ``text``.

    Changes from the previous version:
      * the file is closed (it used to be left open);
      * the last message in the file is included -- it used to be dropped
        because a message was only stored when the *next* separator was seen;
      * text before the first separator is skipped up front instead of being
        parsed and sliced off afterwards, so the logged count is the number
        of messages returned;
      * ``logging.warning`` replaces the deprecated ``logging.warn``.

    :param fn: path of the mailbox file (read as UTF-8, bad bytes replaced).
    :returns: list of ``Object`` instances, in file order.
    """
    from zot import Object

    chunks = []   # chunks[0] is whatever precedes the first "From " line
    current = []
    with open(fn, "r", errors="replace", encoding="utf-8") as f:
        for line in f:
            if line.startswith("From "):
                chunks.append("".join(current))
                current = [line]
            else:
                current.append(line)
    # The final message has no separator after it, so store it explicitly.
    chunks.append("".join(current))

    result = []
    for mess in chunks[1:]:
        m = email.message_from_string(mess)
        o = Object()
        o.update(m.items())
        # get_payload() is a list of parts for multipart mail and a string
        # otherwise; joining str() of each item covers both.
        o.text = "".join(str(load) for load in m.get_payload())
        result.append(o)
    logging.warning("%s emails read" % len(result))
    return result


## STRIPPERS
def strip_html(text):
    """Return the text content of ``str(text)`` parsed as HTML by BeautifulSoup."""
    from bs4 import BeautifulSoup
    return BeautifulSoup(str(text)).get_text()
def strip_wiki(text):
    """Remove wiki link/template brackets and ``<ref>`` elements from *text*.

    ``[[``, ``]]``, ``}}`` and ``{{`` are deleted (in that order), HTML
    character references are converted with ``unescape``, and three forms of
    ``<ref ...>`` markup are removed.
    """
    for token in ("[[", "]]", "}}", "{{"):
        text = text.replace(token, "")
    text = unescape(text)
    for pattern in ("<ref .*?/>", "<ref>.*?</ref>", "<ref .*?</ref>"):
        text = re.sub(pattern, "", text)
    return text


## ENCODING
# get_encoding(data) -- guess the character encoding of a response or document.
#
# Steps, in order:
#   1. If `data` has an `info` attribute containing 'content-type' with a
#      'charset' in it, the text after 'charset' (minus a leading '=' and
#      anything after ';') is returned.
#   2. If the lower-cased data contains '<meta', every <meta ...> tag is
#      examined; for a tag whose http-equiv is 'content-type' and which has a
#      content attribute, the value following 'charset=' is returned.
#   3. If `chardet` is truthy, chardet.detect(data) is called and its
#      'encoding' entry is returned when present.
#   4. Otherwise sys.getdefaultencoding() is returned.
#
# NOTE(review): the original line breaks are lost in this listing, so the
# nesting inside step 1 is inferred -- confirm against the real file.
# NOTE(review): `chardet` is not imported in the visible import block;
# presumably it comes from `from zot.defines import *` -- if not, step 3
# raises NameError.  TODO confirm.
# NOTE(review): the patterns use `\s` inside ordinary (non-raw) string
# literals, which newer Python versions warn about as invalid escapes.
[docs]def get_encoding(data): if hasattr(data, 'info') and 'content-type' in data.info and 'charset' in data.info['content-type'].lower(): charset = data.info['content-type'].lower().split('charset', 1)[1].strip() if charset[0] == '=': charset = charset[1:].strip() if ';' in charset: return charset.split(';')[0].strip() return charset if '<meta' in data.lower(): metas = re.findall('<meta[^>]+>', data, re.I | re.M) if metas: for meta in metas: test_http_equiv = re.search('http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I) if test_http_equiv and test_http_equiv.group(1).lower() == 'content-type': test_content = re.search('content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I) if test_content: test_charset = re.search('charset\s*=\s*([^\s\'"]+)', meta, re.I) if test_charset: return test_charset.group(1) if chardet: test = chardet.detect(data) if 'encoding' in test: return test['encoding'] return sys.getdefaultencoding() ## SED
def run_sed(filename, sedstring):
    """Rewrite *filename* in place, line by line.

    With a *sedstring* of the form ``s#FROM#TO#`` (split on ``#``; fields 1
    and 2 are used) every line has regular expression FROM replaced by TO.
    With an empty *sedstring* each line instead loses its trailing whitespace
    and has every tab replaced by a space.

    Changes from the previous version:
      * both files are closed via ``with`` (the input used to stay open);
      * *sedstring* is parsed before any file is opened, so a malformed one
        no longer leaves a stray ``.tmp`` file behind;
      * ``os.replace`` swaps the result in; the old ``except WindowsError``
        fallback raised ``NameError`` on non-Windows systems, where that name
        does not exist;
      * ``logging.warning`` replaces the deprecated ``logging.warn``.

    :param filename: path of the file to rewrite.
    :param sedstring: ``#``-separated substitution spec, or empty.
    :raises IndexError: when a non-empty *sedstring* has fewer than three fields.
    """
    logging.warning("sed %s" % filename)
    if sedstring:
        seds = sedstring.split("#")
        fr, to = seds[1], seds[2]
    tmp = filename + '.tmp'
    with open(filename, 'r') as f, open(tmp, 'w') as fout:
        for line in f:
            if sedstring:
                fout.write(re.sub(fr, to, line))
            else:
                fout.write(re.sub("\t", " ", line.rstrip() + "\n"))
    # Atomic where the platform allows it, and overwrites an existing target.
    os.replace(tmp, filename)


## URL
def fetch_url(type, url, myheaders=None, postdata=None, keyfile=None, certfile="", port=80):
    """Send one HTTP(S) request and return the response object.

    Changes from the previous version:
      * the assembled headers (defaults updated with *myheaders*) are now
        actually sent -- they used to be built and then dropped;
      * HTTPS is chosen from the URL's scheme instead of from the substring
        ``"https"`` appearing anywhere in the URL;
      * ``http.client`` is imported explicitly (the module only has
        ``import http``);
      * the mutable ``{}`` defaults became ``None``.

    :param type: HTTP method, e.g. ``"GET"``.
    :param url: absolute URL; its network location selects the server.
    :param myheaders: optional mapping of extra/overriding request headers.
    :param postdata: accepted for compatibility; not used.
    :param keyfile: accepted for compatibility; not used.
    :param certfile: accepted for compatibility; not used.
    :param port: accepted for compatibility; not used.
    :returns: the ``http.client.HTTPResponse`` for the request.
    """
    import http.client

    headers = {'Content-Type': 'application/x-www-form-urlencoded',
               'Accept': 'text/plain; text/html; application/json',
               'User-Agent': useragent()}
    headers.update(myheaders or {})
    urlparts = urllib.parse.urlparse(url)
    if urlparts[0].lower() == "https":
        connection = http.client.HTTPSConnection(urlparts[1])
    else:
        connection = http.client.HTTPConnection(urlparts[1])
    logging.info('%s %s' % (type, url))
    connection.request(type, url, headers=headers)
    resp = connection.getresponse()
    logging.info("status %s (%s)" % (resp.status, resp.reason))
    return resp
def need_redirect(resp):
    """Return the ``Location`` header of a 301 response, else ``None``."""
    if resp.status != 301:
        return None
    return resp.getheader("Location")


## TO/FROM
def to_enc(what, encoding='utf-8'):
    """Return ``str(what)`` encoded with *encoding*; falsy input gives ``b""``."""
    text = str(what) if what else ""
    return text.encode(encoding)
def from_enc(txt, encoding='utf-8', what=""):
    """Return *txt* as ``str``, decoding bytes with *encoding*.

    Changes from the previous version:
      * the fallback calls ``decode_char`` -- the name used before,
        ``decodeperchar``, is not defined in this module, so any undecodable
        input raised ``NameError``;
      * ``isinstance`` is used for the string test, so ``str`` subclasses
        (such as ``istr``) are returned as they are instead of failing on a
        missing ``decode`` method.

    :param txt: ``str`` or bytes-like value; falsy values give ``""``.
    :param encoding: codec used for decoding.
    :param what: passed through to ``decode_char``.
    :returns: the decoded text.
    """
    if not txt:
        txt = ""
    if isinstance(txt, str):
        return txt
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decode_char(txt, encoding, what)


## PER CHARACTER
def decode_char(txt, encoding='utf-8', what=""):
    """Decode *txt* one byte at a time, dropping bytes that cannot be decoded.

    Iterating over ``bytes`` yields integers in Python 3, so the previous
    ``i.decode(...)`` raised ``AttributeError`` on the first byte. The input
    is now walked in one-byte slices, which keeps the byte-wise behaviour.
    Because every byte is decoded on its own, multi-byte sequences are
    dropped as well. Distinct rejected bytes are logged at INFO level.

    :param txt: bytes-like value; a ``str`` is returned unchanged.
    :param encoding: codec used for each byte.
    :param what: accepted for compatibility; not used.
    :returns: the decodable part of *txt* as ``str``.
    """
    if isinstance(txt, str):
        return txt
    res = []
    nogo = []
    for index in range(len(txt)):
        unit = bytes(txt[index:index + 1])
        try:
            res.append(unit.decode(encoding))
        except UnicodeDecodeError:
            if unit not in nogo:
                nogo.append(unit)
    if nogo:
        logging.info("nogo: %s" % " ".join(repr(unit) for unit in nogo))
    return "".join(res)


## OPTIONS
def make_opts(options):
    """Build an ``optparse`` parser from *options* and parse the command line.

    Each entry is ``(short, long, kind, default, dest, help)``. A *kind*
    containing ``"store"`` is registered as the option's ``action``; any other
    *kind* is registered as its ``type``. Entries that ``add_option`` rejects
    are logged and skipped. Returns the result of ``parser.parse_args()``.
    """
    from zot import __version__
    parser = optparse.OptionParser(usage='usage: %prog [options]', version=str(__version__))
    for option in options:
        kind, default, dest, helptxt = option[2:]
        selector = "action" if "store" in kind else "type"
        try:
            parser.add_option(option[0], option[1], default=default, dest=dest,
                              help=helptxt, **{selector: kind})
        except Exception as ex:
            logging.error("error: %s - option: %s" % (str(ex), option))
            continue
    return parser.parse_args()


## PARSING
def parse_url(*args, **kwargs):
    """Split the URL in ``args[0]`` into ``(basepath, base, root, file)``.

    ``urlsplit`` fields used: 0 scheme, 1 netloc, 2 path.

    * ``file`` is the last path segment when it contains a dot, else ``None``.
    * ``basepath`` is the path without that file and without a trailing slash.
    * ``base`` is scheme + netloc + basepath.
    * ``root`` is scheme + netloc.
    """
    scheme, netloc, path = urllib.parse.urlsplit(args[0])[:3]
    segments = path.split("/")
    last = segments[-1]
    if "." in last:
        basepath, file = "/".join(segments[:-1]), last
    else:
        basepath, file = path, None
    if basepath.endswith("/"):
        basepath = basepath[:-1]
    base = urllib.parse.urlunsplit((scheme, netloc, basepath, "", ""))
    root = urllib.parse.urlunsplit((scheme, netloc, "", "", ""))
    return (basepath, base, root, file)
def parse_urls(*args, **kwargs):
    """Return the distinct absolute ``.html`` links found in a page.

    ``args`` is ``(url, txt)``: the page's own URL and its markup. Every
    ``<a>`` tag's ``href`` is taken without its ``#fragment``; links that are
    empty, do not end in ``.html``, contain ``..`` or start with ``mailto``
    are skipped. Links without ``http`` in them are made absolute: against the
    site root when they start with ``/``, otherwise against the page's base
    path. Links that do not contain the site root are logged and skipped.

    ``logging.warning`` replaces the deprecated ``logging.warn`` alias, which
    no longer exists in Python 3.13.

    :returns: list of URLs in document order, without duplicates.
    """
    import bs4

    url, txt = args
    basepath, base, root, file = parse_url(url)
    soup = bs4.BeautifulSoup(txt)
    urls = []
    for tag in soup('a'):
        href = tag.get("href")
        if not href:
            continue
        href = href.split("#")[0]
        if not href:
            continue
        if not href.endswith(".html"):
            continue
        if ".." in href:
            continue
        if href.startswith("mailto"):
            continue
        if "http" not in href:
            if href.startswith("/"):
                href = root + href
            else:
                href = base + "/" + href
        if root not in href:
            logging.warning("%s not in %s" % (root, href))
            continue
        if href not in urls:
            urls.append(href)
    logging.warning("found %s urls" % len(urls))
    return urls


## GENERICS
def reduced_keys(*args, **kwargs):
    """Filter the keys in ``args[0]`` down to the "plain" ones.

    A key is skipped when its text starts with ``_``, ``X`` or ``x``, is not
    all lower case, or when the key contains ``-``. Remaining keys are
    collected once each; when the key itself differs from its text, the text
    is collected as well, except for ``args``, ``rest`` and ``first``.
    """
    kept = []
    for key in args[0]:
        name = str(key)
        if name.startswith(("_", "X", "x")) or not name.islower():
            continue
        if "-" in key:
            continue
        if name not in kept:
            kept.append(key)
        if name in ("args", "rest", "first"):
            continue
        if name not in kept:
            kept.append(name)
    return kept
def feed(text):
    """Split *text* on ``"\\r\\n"`` and feed each chunk to a fresh ``Object``.

    Returns the list of whatever ``Object().feed(chunk)`` returns, one entry
    per chunk.
    """
    from zot.object import Object
    return [Object().feed(chunk) for chunk in text.split("\r\n")]


## day string
def nr_days(seconds):
    """Return the number of whole days in *seconds* (truncated toward zero)."""
    day_length = 60 * 60 * 24
    return int(seconds / day_length)
def elapsed(seconds):
    """Format *seconds* as ``<y>y<w>w<d>d <h>h<m>m<s>s``, zero fields included.

    A year counts as 365 days; weeks, days, hours and minutes are taken from
    what remains after the larger units.
    """
    remaining = int(float(seconds))
    counts = []
    for size in (365*24*60*60, 7*24*60*60, 24*60*60, 60*60, 60):
        amount = int(remaining / size)
        remaining -= amount * size
        counts.append(amount)
    years, weeks, days, hours, minutes = counts
    parts = [
        "%sy" % years,
        "%sw" % weeks,
        "%sd" % days,
        " %sh" % hours,
        "%sm" % minutes,
        "%ss" % int(remaining),
    ]
    txt = "".join(parts)
    return txt if txt else "0s"
def elapsed_days(seconds):
    """Format *seconds* compactly as years, days, hours, minutes and seconds.

    Weeks are folded into the day count. Zero fields are omitted, and seconds
    are shown only when the hour field is zero. A year counts as 365 days.

    The ``"0s"`` fallback used to be unreachable: the separator space was
    always part of the text, so the emptiness test ran before stripping and a
    zero duration came back as ``""``. The text is now stripped first.

    :param seconds: duration; anything ``float()`` accepts.
    :returns: e.g. ``"1d 1h1m"``, ``"1m1s"`` or ``"0s"``.
    """
    nsec = int(float(seconds))
    counts = []
    for size in (365*24*60*60, 7*24*60*60, 24*60*60, 60*60, 60):
        amount = int(nsec / size)
        nsec -= amount * size
        counts.append(amount)
    years, weeks, days, hours, minutes = counts
    sec = nsec
    days += weeks * 7

    txt = ""
    if years:
        txt += "%sy" % years
    if days:
        txt += "%sd" % days
    txt += " "   # separates the date part from the time-of-day part
    if hours:
        txt += "%sh" % hours
    if minutes:
        txt += "%sm" % minutes
    if sec and not hours:
        txt += "%ss" % sec
    return txt.strip() or "0s"
def str_day(seconds):
    """Describe *seconds* by a single dominant unit.

    Order of precedence: weeks (prefixed by the year count when there is one),
    then days, hours, minutes, seconds. A year count on its own is returned
    only when every smaller unit is zero; the result is ``"0s"`` when
    everything is zero. A year counts as 365 days.
    """
    remaining = int(float(seconds))
    counts = []
    for size in (365*24*60*60, 7*24*60*60, 24*60*60, 60*60, 60):
        amount = int(remaining / size)
        remaining -= amount * size
        counts.append(amount)
    years, weeks, days, hours, minutes = counts
    sec = remaining

    txt = ""
    if years:
        txt = "%sy" % years
    if weeks:
        return txt + "%sw" % weeks
    if days:
        return "%sd" % days
    if hours:
        return "%sh" % hours
    if minutes:
        return "%sm" % minutes
    if sec:
        txt = "%ss" % int(sec)
    return txt if txt else "0s"


## strtotime function
def get_day(daystr):
    """ convert string to time.

    Finds the first ``D-M-Y`` group of digits in *daystr* and returns the
    timestamp of that date at 00:00 local time. Month numbers are mapped to
    month names through ``bdmonths``.

    Changes from the previous version:
      * ``calendar`` is imported locally (the module's import block does not
        import it);
      * the pattern is a raw string, avoiding invalid-escape warnings;
      * a day beyond the end of the month returns ``0`` like every other
        failure, instead of falling off the end and returning ``None``.

    :param daystr: text containing a ``day-month-year`` date.
    :returns: the timestamp as a float, or ``0`` when no valid date is found.
    """
    import calendar

    try:
        dmyre = re.search(r'(\d+)-(\d+)-(\d+)', daystr)
        (day, month, year) = dmyre.groups()
        day = int(day)
        month = int(month)
        year = int(year)
        if day <= calendar.monthrange(year, month)[1]:
            date = "%s %s %s" % (day, bdmonths[month], year)
            return time.mktime(time.strptime(date, "%d %b %Y"))
    except AttributeError:
        # No match: re.search returned None.
        return 0
    except ValueError:
        # Impossible month (calendar) or unparsable date (strptime).
        return 0
    return 0
def get_hour(daystr):
    """Return the seconds since midnight for the first clock time in *daystr*.

    ``H:M:S`` is tried first, then ``H:M``. The patterns are now raw strings:
    ``\\d`` inside an ordinary literal is an invalid escape sequence that
    newer Python versions warn about.

    :param daystr: text containing a time such as ``10:20:30`` or ``10:20``.
    :returns: the number of seconds as an int, or ``0`` when no time is found.
    """
    found = re.search(r'(\d+):(\d+):(\d+)', daystr)
    if found:
        hours, minutes, secs = (int(part) for part in found.groups())
        return hours * 60 * 60 + minutes * 60 + secs
    found = re.search(r'(\d+):(\d+)', daystr)
    if found:
        hours, minutes = (int(part) for part in found.groups())
        return hours * 60 * 60 + minutes * 60
    return 0


## today function
def today():
    """ return time of 0:00 today.

    The current time, shifted by ``time.timezone`` (plus one hour when
    ``time.daylight`` is set), is formatted with ``time.ctime`` and matched
    against the ``timere`` pattern; groups 3, 2 and 7 are used as day, month
    name and year. Returns ``None`` when the pattern does not match.
    """
    shifted = time.time() + int(time.timezone)
    if time.daylight:
        shifted = shifted + 3600
    ttime = time.ctime(shifted)
    matched = re.search(timere, ttime)
    if not matched:
        return None
    temp = "%s %s %s" % (matched.group(3), matched.group(2), matched.group(7))
    midnight = time.strptime(temp, "%d %b %Y")
    return time.mktime(midnight)