# point/utils.py
#
#
""" utils package. """
## IMPORT
from point import __version__
from point.defines import *
from queue import Queue, Empty as QueueEmpty
from traceback import format_exc
from collections import deque
from cgi import escape
import urllib.request, urllib.error, urllib.parse
import urllib.parse
import html.parser
import traceback
import mailbox
import datetime
import optparse
import _thread
import hashlib
import logging
import urllib
import string
import email
import html
import types
import http
import json
import time
import math
import glob
import sys
import os
import re
## start_new_thread alias
run_thr = _thread.start_new_thread
## AGENT
[docs]def useragent(): return 'Mozilla/5.0 (X11; Linux x86_64); POINT %s; http://pikacode.com/bthate/point)' % __version__
## ISTR
[docs]class istr(str): pass
## UNESCAPE
[docs]def unescape(text): return html.parser.HTMLParser().unescape(text)
## txt_parse fucntion
[docs]def txt_parse(txt):
from point import Object
o = Object()
o.args = []
o.wanted = Object()
o.not_wanted = Object()
for word in txt.split():
try:
key, value = word.split("=")
op = key[-1]
post = value[-1]
if post == "-": value = value[:-1]
if op == "-": key = key[:-1] ; o.not_wanted[key] = value
else: o.wanted[key] = value
if post == "-" : continue
if key not in o.args: o.args.append(key)
except ValueError:
if word not in o.args: o.args.append(word)
return o
## SETS
[docs]def unique(a):
""" return the list with duplicate elements removed """
return list(set(a))
[docs]def intersect(a, b):
""" return the intersection of two lists """
return list(set(a) & set(b))
[docs]def union(a, b):
""" return the union of two lists """
return list(set(a) | set(b))
## BLAET
[docs]def blaet(target):
res = []
if "time_start" in target: res.append("%s" % str_day(time.time() - float(target.time_start)))
if "time_in" in target: res.append("%s" % str_day(time.time() - float(target.time_in)))
if "time_sleep" in target: res.append("%s" % str_day(float(target.time_sleep) - (time.time() - target.time_in)))
return "/".join(res)
## SIGNATURES
[docs]def make_signature(data): return str(hashlib.sha1(bytes(str(data), "utf-8")).hexdigest())
[docs]def verify_signature(data, signature):
from point import Object
fromdisk = json.loads(data)
signature2 = make_signature(fromdisk["data"])
return signature2 == signature
## FILES
[docs]def list_files(*args, **kwargs):
path = args[0]
res = []
if not path.endswith(os.sep): path += os.sep
if "search" in kwargs: path += "*%s*" % kwargs["search"]
if "*" not in path: path += "*"
for fnn in glob.glob(path):
if os.path.isdir(fnn): res.extend(list_files(fnn, **kwargs)) ; continue
else: res.append(fnn)
return res
## JOINS
[docs]def j(*args):
if not args: return
todo = list(map(str, filter(None, args)))
return os.path.join(*todo)
[docs]def mj(*args):
if not args: return
todo = list(map(str, filter(None, args)))
return os.path.join(*todo).replace(os.sep, ".")
[docs]def dj(*args):
if not args: return
todo = list(map(str, filter(None, args)))
return os.path.join(*todo).replace(os.sep, "_")
[docs]def aj(sep=None, *args): return os.path.abspath(*j(sep, *args))
## TIME
[docs]def dtime(stamp): return datetime.datetime.fromtimestamp(stamp)
[docs]def ptime(daystr): return datetime.datetime.strptime(daystr, '%Y-%m-%d')
[docs]def tdiff(d1, d2): return datetime.timedelta(d1, d2)
[docs]def rtime(): return str(datetime.datetime.now()).replace(" ", "-=-")
[docs]def hms(): return str(datetime.datetime.today()).split()[1].split(".")[0]
[docs]def day(): return str(datetime.datetime.today()).split()[0]
[docs]def time_string(*args, **kwargs):
timestamp = args[0]
result = None
try: result = str(datetime.datetime.fromtimestamp(timestamp))
except: error()
return result
[docs]def time_time(*args, **kwargs):
stamp = args[0]
time_str = time_string(stamp)
return time_str
[docs]def time_stamp(*args, **kwargs):
daystr = args[0].strip()
instring = ""
for spl in daystr.split():
instring += "%s " % spl
for format in dayformats:
logging.info("trying %s" % format)
try: res = datetime.datetime.strptime(instring, format).timestamp() ; return res
except ValueError: continue
return 0.0
[docs]def make_time(daystr): return time.mktime(time.strptime(daystr, "%a %b %d %H:%M:%S %Y"))
[docs]def a_time(daystr):
if "." in daystr: daystr = daystr.split(".")[0]
try: return time.mktime(time.strptime(daystr, "%Y-%m-%d %H:%M:%S"))
except Exception as ex: error()
[docs]def short_date(*args, **kwargs):
date = args[0]
if not date: return None
res = []
for d in date.split():
if "," in str(d): continue
res.append(d)
ddd = None
try: ddd = "{:4}-{:#02}-{:#02} {:6}".format(res[2], monthint[res[1]], int(res[0]), res[3])
except (IndexError, KeyError, ValueError):
try: ddd = "{:4}-{:#02}-{:#02} {:6}".format(res[3], monthint[res[2]], int(res[1]), res[4])
except (IndexError, KeyError, ValueError): logging.debug("can't parse date %s" % date)
return ddd
[docs]def to_time(*args, **kwargs):
date = args[0]
res = []
for d in date.split():
if "," in str(d): continue
res.append(d)
ddd = None
try: ddd = "{:4}-{:#02}-{:02} {:6}".format(res[4], monthint[res[1]], int(res[2]), res[3])
except (IndexError, KeyError): ddd = ""
return ddd
## NAMES
[docs]def get_modname(obj):
name = obj.__class__.__module__
return name
[docs]def get_clsname(obj):
name = str(obj.__class__)
return name.split(" ")[1][1:-2]
[docs]def get_cls(obj): return get_clsname(obj).split(".")[-1]
[docs]def get_funcname(str_in):
return str_in.split()[1]
## STACK
[docs]def get_exception(*args, **kwargs):
exctype, excvalue, tb = sys.exc_info()
trace = traceback.extract_tb(tb)
result = ""
for i in trace:
fname = i[0]
linenr = i[1]
func = i[2]
plugfile = fname[:-3].split(os.sep)
mod = []
for i in plugfile[::-1]: mod.append(i)
ownname = '.'.join(mod[::-1])
result += "%s:%s %s | " % (ownname, linenr, func)
del trace
return "%s%s: %s" % (result, exctype, excvalue)
[docs]def get_plugname(*args, **kwargs):
result = ""
depth = args[0]
loopframe = sys._getframe(depth)
if not loopframe: return result
res = []
fn = ""
frame = None
while 1:
if depth <= 0: break
depth -= 1
try: frame = loopframe.f_back
except AttributeError: break
if not frame: break
fn = frame.f_code.co_filename
del loopframe
return fn
[docs]def get_frame(search="code"):
result = {}
frame = sys._getframe(1)
search = str(search)
for i in dir(frame):
if search in i:
target = getattr(frame, i)
for j in dir(target):
result[j] = getattr(target, j)
return result
[docs]def get_strace(*args, **kwargs):
result = ""
depth = args[0]
loopframe = sys._getframe(depth)
if not loopframe: return result
while 1:
try: frame = loopframe.f_back
except AttributeError: break
if not frame: break
linenr = frame.f_lineno
func = frame.f_code.co_name
result += "%s:%s | " % (func, linenr)
loopframe = frame
del loopframe
return result
[docs]def get_how(*args, **kwargs):
result = ""
depth = args[0]
loopframe = sys._getframe(depth)
if not loopframe: return result
while 1:
try: frame = loopframe.f_back
except AttributeError: break
if not frame: break
linenr = frame.f_lineno
func = frame.f_code.co_name
result = "%s:%s" % (func, linenr)
loopframe = frame
if depth == 0: return result
depth -= 1
del loopframe
return result
[docs]def get_func(*args, **kwargs):
result = ""
depth = args[0]
loopframe = sys._getframe(depth)
if not loopframe: return result
func = None
while 1:
try: frame = loopframe.f_back
except AttributeError: break
if not frame: break
linenr = frame.f_lineno
func = frame.f_code.co_name
depth -= 1
if depth <= 0: break
del loopframe
return func
[docs]def error(*args, **kwargs): msg = get_exception() ; logging.error(msg) ; return msg
## LOCATING
[docs]def get_source(mod, package):
import pkg_resources as p
source = os.path.abspath(p.resource_filename(mod, package))
logging.info("source is %s" % source)
return source
## RESOLVING
[docs]def resolve_ip(hostname=None, timeout=1.0):
oldtimeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try: ip = socket.gethostbyname(hostname or socket.gethostname())
except socket.timeout: ip = None
socket.setdefaulttimeout(oldtimeout)
return ip
[docs]def resolve_host(ip=None, timeout=1.0):
oldtimeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try: host = socket.gethostbyaddr(ip or resolve_ip())[0]
except socket.timeout: host = None
socket.setdefaulttimeout(oldtimeout)
return host
## DIRECTORIES
[docs]def touch(fname):
try: fd = os.open(fname, os.O_RDONLY | os.O_CREAT) ; os.close(fd)
except: error()
[docs]def check_permissions(ddir, dirmask=dirmask, filemask=filemask):
uid = os.getuid()
gid = os.getgid()
try: stat = os.stat(ddir)
except OSError: make_dir(ddir) ; stat = os.stat(ddir)
if stat.st_uid != uid: os.chown(ddir, uid, gid)
if os.path.isfile(ddir): mask = filemask
else: mask = dirmask
if stat.st_mode != mask: os.chmod(ddir, mask)
[docs]def make_dir(path):
target = os.sep
for item in path.split(target)[:-1]:
target = j(target, item)
try: os.mkdir(target)
except OSError as ex: logging.debug(ex) ; continue
check_permissions(target)
return path
## HELPERS
[docs]def stripbadchar(s): return "".join([c for c in s if ord(c) > 31 or c in allowedchars])
[docs]def enc_char(s):
result = []
for c in s:
if c in allowedchars: result.append(c)
else: result.append(enc_name(c))
return "".join(result)
[docs]def enc_needed(s): return [c for c in s if c not in allowedchars]
[docs]def enc_name(input): return str(base64.urlsafe_b64encode(bytes(input, "utf-8")), "utf-8")
[docs]def split_txt(what, l=375):
txtlist = []
start = 0
end = l
length = len(what)
for i in range(int(length/end+1)):
starttag = what.find("</", end)
if starttag != -1: endword = what.find('>', end) + 1
else:
endword = what.find(' ', end)
if endword == -1: endword = length
res = what[start:endword]
if res: txtlist.append(res)
start = endword
end = start + l
return txtlist
[docs]def smooth(a):
if type(a) not in basic_types: return get_cls(a)
else: return a
[docs]def make_version(name=""): return "%s%s #%s ! %s%s" % (YELLOW, name, __version__, time.ctime(time.time()), ENDC)
[docs]def hello(name=""): print(make_version(name) + "\n")
[docs]def list_eggs(filter=""):
for f in sys.path:
if ".egg" not in f: continue
if filter and filter not in f: continue
yield f
[docs]def show_eggs(filter="point"):
for egg in list_eggs(filter): logging.warn("%s egg: %s" % (filter, egg))
[docs]def stripped(input):
try: return input.split("/")[0]
except: return input
## HEADER
headertxt = '''# %s
#
# this is an p (#%s) file, %s
#
# this file can be edited !!
'''
## FEEDER
def feed(text):
from point import Object
result = []
chunks = text.split("\r\n")
for chunk in chunks:
obj = Object().feed(chunk)
result.append(obj)
return result
## PARSER
[docs]def parse_email(fn):
from point import Object
f = open(fn ,"r", errors="replace", encoding="utf-8")
mails = []
result = []
mess = ""
nr = 0
go = True
for line in f:
if line.startswith("From "): mails.append(mess) ; mess = line ; continue
mess += line
for mess in mails:
m = email.message_from_string(mess)
o = Object()
o.update(m.items())
o.text = ""
for load in m.get_payload(): o.text += str(load)
result.append(o)
logging.warn("%s emails read" % len(result))
return result[1:]
## STRIPPERS
[docs]def strip_html(text):
from bs4 import BeautifulSoup
soup = BeautifulSoup(str(text))
return soup.get_text()
[docs]def strip_wiki(text):
text = text.replace("[[", "")
text = text.replace("]]", "")
text = text.replace("}}", "")
text = text.replace("{{", "")
text = unescape(text)
text = re.sub("<ref .*?/>", "", text)
text = re.sub("<ref>.*?</ref>", "", text)
text = re.sub("<ref .*?</ref>", "", text)
return text
## ENCODING
[docs]def get_encoding(data):
if hasattr(data, 'info') and 'content-type' in data.info and 'charset' in data.info['content-type'].lower():
charset = data.info['content-type'].lower().split('charset', 1)[1].strip()
if charset[0] == '=':
charset = charset[1:].strip()
if ';' in charset: return charset.split(';')[0].strip()
return charset
if '<meta' in data.lower():
metas = re.findall('<meta[^>]+>', data, re.I | re.M)
if metas:
for meta in metas:
test_http_equiv = re.search('http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
if test_http_equiv and test_http_equiv.group(1).lower() == 'content-type':
test_content = re.search('content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
if test_content:
test_charset = re.search('charset\s*=\s*([^\s\'"]+)', meta, re.I)
if test_charset: return test_charset.group(1)
if chardet:
test = chardet.detect(data)
if 'encoding' in test: return test['encoding']
return sys.getdefaultencoding()
## URL RELATED
[docs]def do_url(type, url, myheaders={}, postdata={}, keyfile=None, certfile="", port=80):
headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
headers.update(myheaders)
urlparts = urllib.parse.urlparse(url)
if "https" in url: connection = http.client.HTTPSConnection(urlparts[1]) # keyfile, certfile)
else: connection = http.client.HTTPConnection(urlparts[1])
postdata = urllib.parse.urlencode(postdata)
logging.warn('%s %s' % (type, url))
connection.request(type, urlparts[2], postdata, headers)
resp = connection.getresponse()
logging.warn("status %s (%s)" % (resp.status, resp.reason))
return resp
[docs]def need_redirect(resp):
if resp.status == 301: url = resp.getheader("Location") ; return url
## TO/FROM
[docs]def to_enc(what, encoding='utf-8'):
if not what: what= ""
w = str(what)
return w.encode(encoding)
[docs]def from_enc(txt, encoding='utf-8', what=""):
if not txt: txt = ""
if type(txt) == str: return txt
try: return txt.decode(encoding)
except UnicodeDecodeError: return decodeperchar(txt, encoding, what)
## PER CHARACTER
[docs]def decode_char(txt, encoding='utf-8', what=""):
res = [] ; nogo = []
for i in txt:
try: res.append(i.decode(encoding))
except UnicodeDecodeError:
if i not in nogo: nogo.append(i)
if nogo: logging.warn("nogo: %s" % " ".join(nogo))
return "".join(res)
## OPTIONS
[docs]def make_opts():
parser = optparse.OptionParser(usage='usage: %prog [options]', version=__version__)
for option in options:
type, default, dest, help = option[2:]
if "store" in type:
try: parser.add_option(option[0], option[1], action=type, default=default, dest=dest, help=help)
except Exception as ex: logging.error("error: %s - option: %s" % (str(ex), option)) ; continue
else:
try: parser.add_option(option[0], option[1], type=type, default=default, dest=dest, help=help)
except Exception as ex: logging.error("error: %s - option: %s" % (str(ex), option)) ; continue
args = parser.parse_args()
return args
## PARSING
[docs]def parse_url(*args, **kwargs):
"""
Attribute Index Value Value if not present
scheme 0 URL scheme specifier empty string
netloc 1 Network location part empty string
path 2 Hierarchical path empty string
query 3 Query component empty string
fragment 4 Fragment identifier empty string
"""
url = args[0]
parsed = urllib.parse.urlsplit(url)
target = parsed[2].split("/")
if "." in target[-1]: basepath = "/".join(target[:-1]) ; file = target[-1]
else: basepath = parsed[2] ; file = None
if basepath.endswith("/"): basepath = basepath[:-1]
base = urllib.parse.urlunsplit((parsed[0], parsed[1], basepath , "", ""))
root = urllib.parse.urlunsplit((parsed[0], parsed[1], "", "", ""))
return (basepath, base, root, file)
[docs]def parse_urls(*args, **kwargs):
import bs4
url, txt = args
basepath, base, root, file = parse_url(url)
s = bs4.BeautifulSoup(txt)
urls = []
tags = s('a')
for tag in tags:
href = tag.get("href")
if href:
href = href.split("#")[0]
if not href: continue
if not href.endswith(".html"): continue
if ".." in href: continue
if href.startswith("mailto"): continue
if not "http" in href:
if href.startswith("/"): href = root + href
else: href = base + "/" + href
if not root in href: logging.warn("%s not in %s" % (root, href)) ; continue
if href not in urls: urls.append(href)
logging.warn("found %s urls" % len(urls))
return urls
## GENERICS
[docs]def reduced_keys(*args, **kwargs):
inlist = args[0]
res = []
for key in inlist:
k = str(key)
if k.startswith("_"): continue
if k.startswith("X"): continue
if k.startswith("x"): continue
if not k.islower(): continue
if "-" in key: continue
if k not in res: res.append(key)
if k in ["args", "rest", "first"]: continue
if k not in res: res.append(k)
return res
[docs]def feed(text):
from point import Object
result = []
chunks = text.split("\r\n")
for chunk in chunks:
obj = Object().feed(chunk)
result.append(obj)
return result
[docs]def dispatch(target, event, cmnd, *args, **kwargs):
try: functions = target[cmnd]
except KeyError: return False
for func in functions: func(event)
return event
[docs]def resolve(*args, **kwargs):
from point import kernel
event = args[0]
event.prepare()
e = None
e = dispatch(kernel, event, event.ucmnd or event.etype, *args, **kwargs)
return e
[docs]def need_skip(obj, black=[], white=[]):
needskip = False
try: value = obj.get_content_type()
except AttributeError: return False
if value in black: needskip = True
if value not in white: needskip = True
return needskip
## day string
[docs]def str_day(seconds):
nsec = int(float(seconds))
year = 365*24*60*60
week = 7*24*60*60
day = 24*60*60
hour = 60*60
minute = 60
nsec -= nsec * leapfactor
years = int(nsec/year)
nsec -= years*year
weeks = int(nsec/week)
nsec -= weeks*week
days = int(nsec/day)
nsec -= days*day
hours = int(nsec/hour)
nsec -= hours*hour
minutes = int(nsec/minute)
sec = int(nsec - minutes*minute)
if years: return "%sy%sd%sh%sm%ss" % (years, days, hours, minutes, sec)
if days: return "%sd%sh%sm%ss" % (days, hours, minutes, sec)
if hours: return "%sh%sm%ss" % (hours, minutes, sec)
if minutes: return "%sm%ss" % (minutes, sec)
return "%ss" % sec
## FORMAT
[docs]def parse(txt):
result = Object()
for word in txt.split():
if word.startswith("."): result["ucmnd"] = word
if word.startswith("-"): result["opt_%s" % word] = "" ; continue
try: key, value = word.split("=")
except ValueError: result[word] = ""
result[key] = value
return result
## strtotime function
[docs]def strtotime(what):
""" convert string to time. """
daymonthyear = 0
hoursmin = 0
try:
dmyre = re.search('(\d+)-(\d+)-(\d+)', str(what))
if dmyre:
(day, month, year) = dmyre.groups()
day = int(day)
month = int(month)
year = int(year)
if day <= calendar.monthrange(year, month)[1]:
date = "%s %s %s" % (day, bdmonths[month], year)
daymonthyear = time.mktime(time.strptime(date, "%d %b %Y"))
else: return None
else:
dmre = re.search('(\d+)-(\d+)', str(what))
if dmre:
year = time.localtime()[0]
(day, month) = dmre.groups()
day = int(day)
month = int(month)
if day <= calendar.monthrange(year, month)[1]:
date = "%s %s %s" % (day, bdmonths[month], year)
daymonthyear = time.mktime(time.strptime(date, "%d %b %Y"))
else: return None
hmsre = re.search('(\d+):(\d+):(\d+)', str(what))
if hmsre:
(h, m, s) = hmsre.groups()
h = int(h)
m = int(m)
s = int(s)
if h > 24 or h < 0 or m > 60 or m < 0 or s > 60 or s < 0: return None
hours = 60 * 60 * (int(hmsre.group(1)))
hoursmin = hours + int(hmsre.group(2)) * 60
hms = hoursmin + int(hmsre.group(3))
else:
hmre = re.search('(\d+):(\d+)', str(what))
if hmre:
(h, m) = hmre.groups()
h = int(h)
m = int(m)
if h > 24 or h < 0 or m > 60 or m < 0: return None
hours = 60 * 60 * (int(hmre.group(1)))
hms = hours + int(hmre.group(2)) * 60
else: hms = 0
return hms
except OverflowError: return None
except ValueError:return None
except Exception as ex: pass
## today function
[docs]def today():
""" return time of 0:00 today. """
if time.daylight: ttime = time.ctime(time.time() + int(time.timezone) + 3600)
else: ttime = time.ctime(time.time() + int(time.timezone))
matched = re.search(timere, ttime)
if matched:
temp = "%s %s %s" % (matched.group(3), matched.group(2), matched.group(7))
timestring = time.strptime(temp, "%d %b %Y")
result = time.mktime(timestring)
return result