# -*- coding: utf-8 -*-
"""
This module holds core-like methods for library modules but not for the hole package
"""
from __future__ import print_function
# system
from future import standard_library
standard_library.install_aliases()
from builtins import filter
from builtins import zip
from builtins import range
from past.builtins import basestring
from builtins import object
import sys
import os
import inspect
import types
from time import time, sleep
import numpy as np
from fnmatch import fnmatch,fnmatchcase
from itertools import groupby
from collections import OrderedDict
from string import Formatter
formater = Formatter() # string formatter str.format
# ----------------------------BASIC FUNCTIONS---------------------------- #
[docs]class StdoutSIM(object):
"""
Simple logger to simulate stdout output adding a closed control
"""
def __init__(self, stdout = None, closed = False):
if stdout is None:
stdout = sys.stdout
self.stdout = stdout
self.closed = closed
[docs] def write(self, text, **kwargs):
if not self.closed:
self.stdout.write(text,**kwargs)
[docs] def printline(self, text, **kwargs):
if not self.closed:
self.stdout.printline(text, **kwargs)
[docs] def printlines(self, lines, **kwargs):
if not self.closed:
self.stdout.printlines(lines, **kwargs)
[docs] def flush(self):
if not self.closed:
self.stdout.flush()
[docs] def close(self):
"""
do nothing
"""
pass
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
stdout = StdoutSIM()
[docs]class StdoutMULTI(object):
"""
Enclose several file-like objects.
:param file_list = list of file-like objects
"""
def __init__(self, file_list):
self.file_list = file_list
[docs] def write(self, text, **kwargs):
for log in self.file_list:
log.write(text, **kwargs)
[docs] def printline(self, text, **kwargs):
for log in self.file_list:
log.printline(text, **kwargs)
[docs] def printlines(self, lines, **kwargs):
for log in self.file_list:
log.printlines(lines, **kwargs)
[docs] def flush(self,**kwargs):
for log in self.file_list:
log.flush(**kwargs)
[docs] def close(self,**kwargs):
for log in self.file_list:
log.close(**kwargs)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
[docs]class StdoutLOG(StdoutMULTI):
"""
Simple logger to save stdout output
so anything printed in the console is logged to a file.
:param path: path to logging file
:param mode: mode for opening the file.
:param add_stdout: if True closes previous logs
and continues with new log
"""
def __init__(self, path, mode = "w+", add_stdout = True):
self.closed = False
self._old = sys.stdout # register old logger
self._cleaned = False
if hasattr(path,"write"): # must be file-lie
self._file = path
self._path = None
self.mode = None
else:
self._file = open(path, mode)
self._path = path
self.mode = mode
# keep logs to write to
if add_stdout:
file_list = [self._old, self._file]
else:
file_list = [self._file]
super(StdoutLOG,self).__init__(file_list=file_list)
sys.stdout = self # register log
[docs] def close(self,**kwargs):
"""
Close StdoutLOG but it does not ensures cleanup to
restore sys.stdout
"""
if not self.closed:
# close file
self._file.close()
# unregister this log
if self._file in self.file_list:
# removes from file_list of loggers to prevent output
# but sys.stdout is not replaced with old to not effect
# anything done after the instance creating
self.file_list.remove(self._file)
self.closed = True
self._cleanup()
def _cleanup(self):
"""
Ensure first sys.stdout is restored after all
StdoutLOG are closed
"""
if not self._cleaned and self.closed:
# if possible restore sys.stdout as it was before
if sys.stdout is self: # output not replaced
sys.stdout = self._old # return old log
# at least this node has been cleaned
self._cleaned = True
try: # try to clean up old references
self._old._cleanup()
# overheats more but deletes circular references
if self._old._cleaned and self._old in self.file_list:
self.file_list.remove(self._old)
except AttributeError:
pass # old node not cleaned
# other node must have the old reference
[docs]def decorateInstanceMethods(self, decorator,excludeMth=("__init__"),includeMth=None):
"""
Decorate methods in an instance. It should be used in the __init__ method of a class.
:param self: class instance.
:param decorator: decorator function to apply to self.
:param excludeMth: list of methods to exclude.
:param includeMth: list of methods to include if not in exclude.
if excludeMth is None then decorateInstanceMethods checks for includeMth list.
if includeMth and excludeMth is None then all methods of self are decorated.
:return: self
.. note:: It must be used at instance initialization (i.e. inside __init__ method)
"""
classmethods = dict(inspect.getmembers(self, predicate=inspect.ismethod))
#if "__init__" in classmethods.keys(): del classmethods["__init__"] # init method should not be decorated
for key in classmethods:
if excludeMth and key in excludeMth:
continue
if includeMth is None or key in includeMth:
setattr(self, key, decorator(classmethods[key]))
return self
# ----------------------------DECORATORS---------------------------- #
[docs]def addto(instance,funcname=None):
"""
Decorator: Add function as method to instance.
:param instance: class instance.
:param funcname: name to register in instance.
:return:
"""
def decorator(fn):
fn = types.MethodType(fn, instance, instance.__class__) # convert to bound method
if funcname:
setattr(instance, funcname, fn)
else:
setattr(instance, fn.__name__, fn) # set fn method with name fn.func_name in instance
return fn
return decorator
# ----------------------------CLASS OBJECTS---------------------------- #
# ----------------------------DECORATED FUNCTIONS---------------------------- #
# ----------------------------SPECIALISED FUNCTIONS---------------------------- #
[docs]class FactorConvert(object):
"""
Keep track of factor and converts to any available factor.
"""
_factors = (("exa", "E", 1000000000000000000),
("peta","P",1000000000000000),
("tera","T",1000000000000),
("giga","G",1000000000),
("mega","M",1000000),
("kilo","k",1000),
("hecto","h",100),
("deca","da",10),
(None,"",1),
("deci","d",0.1),
("centi","c",0.01),
("milli","m",0.001),
("micro","u",0.000001),
("nano","n",0.000000001),
("pico","p",0.000000000001),
("femto","f",0.000000000000001),
("atto","a",0.000000000000000001))
def __init__(self, factor = None, abbreviate = True):
"""
:param factor: anything to look in factors (i.e. factor list with Factor structures).
:param abbreviate: index to return from Factor structure when factor is asked.
.. note: A factor structure is of the form ("Name","abbreviation",value)
"""
self._factor = None
self._factorsCache = None
self.abbreviate = abbreviate
self.factors = self._factors
self.factor = factor
@property
def factors(self):
return self._factors
@factors.setter
def factors(self, value):
self._factorsCache = list(zip(*value)) # itertools.izip is faster but this operation is one time
self._factors = value
@factors.deleter
def factors(self):
raise Exception("Property cannot be deleted")
@property
def factor(self):
return self._factor[self.abbreviate]
@factor.setter
def factor(self, value):
self._factor = self.getFactor(value) # transform units
@factor.deleter
def factor(self):
raise Exception("Property cannot be deleted")
[docs] def convert(self, factor, to = None):
"""
Convert from actual factor to another factor.
:param factor: number
:param to: factor to convert
:return: converted value, units
"""
to = self.getFactor(to)
return factor * self._factor[2] / float(to[2]), to[self.abbreviate] # converted value, factor
[docs] def convert2sample(self, factor, to = None):
"""
Convert to resemble sample.
:param factor: number
:param to: sample factor.
:return: converted value, units
"""
if to is None or to==0:
unit = None
else: # calculate as fast as possible
try:
unit = factor * self._factor[2] / to
except:
to = self.getFactor(to)[2]
unit = factor * self._factor[2] / to
return self.convert(factor, unit) # to units.
[docs] def exactFactorIndex(self, key):
"""
Find the index of a factor that contains a key.
:param key: anything to look in factors (i.e. factor name, factor value, abbreviation).
:return: factor structure, else None.
"""
for i in self._factorsCache:# try to find it
if key in i: return i.index(key)
[docs] def nearFactorIndex(self, factor):
"""
Find the index of nearest factor value.
:param factor: factor value.
:return: factor structure near factor value.
"""
return (np.abs(np.array(self._factorsCache[2]) - factor)).argmin()
[docs] def getFactor(self, key):
"""
Tries to find factor value in factors.
:param key: anything to look in factors (i.e. factor name, factor value, abbreviation).
If key is a factor value it will look for the nearest factor value.
:return: factor structure, else raises error.
"""
if key is None or isinstance(key, basestring):
index = self.exactFactorIndex(key)
if index is None: raise Exception("Factor not found for {}".format(key))
else:
index = self.nearFactorIndex(key)
return self._factors[index]
@staticmethod
[docs] def split(value):
"""
Get number fraction.
:param value: number
:return: integer, fraction
"""
left = int(value)
right = value-left
return left,right
@staticmethod
[docs] def parts(value, precision = 4):
"""
Get number parts.
:param value: number
:param precision: decimal precision
:return: ([... ,Hundreds, Tens, Ones],[Tenths, ...])
"""
p = "{{:0.{}f}}".format(precision).format(value).split(".")
if len(p)>1:
return [int(i) for i in p[0]],[int(i) for i in p[1]]
else:
return [int(i) for i in p[0]],[]
[docs]class Magnitude(object):
def __init__(self, value =0, factor = None, unit = None, precision = None, abbreviate = False):
self.value = value
self.precision = precision
self.factor = factor
self.unit = unit
self.abbreviate = abbreviate
def __str__(self):
return self.format_value(self.value)
[docs]class TimeCode(object):
"""
Context to profile code by printing a prelude and prologue with time.
:param msg: prelude or description message
:param factor: factor supported by FactorConvert class
:param precision: number of digits after a float point
:param abv: if True prints "s", if False "seconds" for time
:param endmsg: prologue message
:param enableMsg: (True) A flag specifying if context
should be printed or not.
:param printfunc: function to print messages. By default it
is sys.stdout.write
"""
def __init__(self, msg = None, factor = None, precision = None,
abv=None, endmsg = "{time}\n", enableMsg= True,
printfunc= None, profiler = None, profile_point = None):
self.msg = msg
self.factor = factor
self.precision = precision
self.abv = abv
self.endmsg = endmsg
self.enableMsg = enableMsg
self.printfunc = printfunc
self.time_start = None
self.time_end = None
self.profiler = profiler
self.profile_point = profile_point
@property
def time(self):
if self.time_start is None:
return 0
else:
return time()-self.time_start
@property
def time_end(self):
if self._time_end is None:
return self.time
return self._time_end
@time_end.setter
def time_end(self,value):
self._time_end = value
@time_end.deleter
def time_end(self):
del self._time_end
def __enter__(self):
if self.printfunc is None:
self.printfunc = sys.stdout.write
if self.enableMsg and self.msg is not None:
self.printfunc(self.msg)
self.time_start = time() # begin chronometer
self.time_end = None
if self.profiler is not None:
if self.profile_point is None:
msg = self.msg
else:
msg = self.profile_point
if isinstance(msg,basestring):
self.profile_point = self.profiler.open_point(msg = msg)
else:
self.profile_point = self.profiler.open_point(*msg)
elif isinstance(self.profile_point, Profiler):
self.profile_point.time_start = self.time_start
return self
def __exit__(self, type, value, traceback):
if isinstance(self.profile_point, Profiler):
self.profile_point.close()
t = self.time # end chronometer
self.time_end = t
if self.enableMsg and self.endmsg is not None:
# abbreviation
if self.abv is True:
u,i = "s",True
else:
u,i = " seconds",False
self.printfunc(self.endmsg.format(
time=Magnitude(value = t, precision=self.precision,
unit=u, factor=self.factor, abbreviate=i)))
[docs]class Profiler(object):
r"""
profiler for code points.
:param msg: custom comment for profiling point
:param tag: classification tag
:parameter space: (" ")
:parameter format_line: ("{space}{tag}{msg}{time}")
:parameter format_structure: ("\\n{space}[{tag}{msg}{time}{child}]{side}")
:parameter points: profile instaces which are divided in "side" or "children" points
according if they are side by side or are inside of the profiler.
"""
def __init__(self, msg = None, tag= None):
self.time_start = time()
self.msg = msg
self.tag = tag
self.space = " "
self.format_line = "{space}{tag}{msg}{time}"
self.format_structure = "\n{space}[{tag}{msg}{time}{child}]{side}"
self.format_tag = "({}) "
self.format_time = " -> {:0.4f} secs"
self.format_msg = "{!r}"
self.time_end = None
self.points = []
@property
def time(self):
"""
:return: overall time of profiling
"""
try:
return self.time_end -self.time_start
except:
points_diff = [i.time for i in self.points if i.time]
if points_diff:
return np.sum(points_diff)
return None
def _add_point(self, point):
"""
keep track of points and organize self.points (not intended for the user)
:param point: profile instance
"""
if self.points and not point in self.points:
child = self.points[-1]
if child.time_end is None:
child._add_point(point)
else:
self.points.append(point)
elif not self.points:
self.points.append(point)
def _close_point(self, point):
"""
close a points and adds to the list of points (not intended for the user)
:param point: profile instance
"""
point.close()
self._add_point(point)
[docs] def open_point(self, msg = None, tag= None):
"""
Open a profiling point to track time.
:param msg: custom comment for profiling point
:param tag: classification tag
:return:
"""
point = Profiler(msg, tag)
self._add_point(point)
return point
[docs] def close(self):
"""
close profiler and all their points
"""
for p in self.points:
p.close()
if self.time_end is None:
self.time_end = time()
def _collapse_structure(self, children, collapse = None):
"""
Collapse list of profiles (not intended for the user)
:param children: list of profiles
:param collapse: list for collapsing repeated tags or messages.
:return: filtered list
"""
# collapse them
if collapse is not None and children:
new_children = [] # init new list of collapsed children
indexes = dict() # keep cache of indexed children
for child in children: # loop over children
cmp = tuple(child[:2]) # comparison and key index
if new_children \
and (collapse is True or cmp[0] in collapse or cmp[1] in collapse) \
and cmp in indexes:
index = indexes[cmp] # return cached index
new_children[index][2] += child[2] # add time
new_children[index][3].extend(child[3]) # add children of child
else: # if not indexed
indexes[cmp] = len(new_children) # cache actual index
new_children.append(child) # append a new child
return new_children # replace for new children list
return children
[docs] def restructure(self, structure, collapse):
"""
reprocess an already created structure.
:param structure: structure.
:param collapse: list for collapsing repeated tags or messages.
:return: reprocessed structure
"""
tag, msg, time, children = structure
if children:
children = self._collapse_structure(children,collapse)
for i,child in enumerate(children):
children[i] = self.restructure(child, collapse)
structure[3] = children
return structure
[docs] def structure(self, collapse = None):
"""
profiling structure.
:param collapse: list for collapsing repeated tags or messages.
:return: structure with format [tag,msg,time,children]
"""
# collect structure of children
children = [point.structure(collapse) for point in self.points]
children = self._collapse_structure(children, collapse)
return [self.tag, self.msg, self.time, children] # permit item assignment
def _helper_lines_unformatted(self, struct, level=0):
"""
helper to generate lines (not intended for the user)
:param struct: profiling structure
:param level: structure level
:return: generator with outputs (level, tag, msg, time)
"""
tag,msg,time,children = struct
yield (level, tag, msg, time)
for child in children:
for i in self._helper_lines_unformatted(child, level + 1):
yield i
[docs] def string_lines(self):
"""
string with plain structure of profiling
"""
return "\n".join(self.lines_formatted())
def _helper_string_structured(self, structs, level=0):
"""
helper to generate string with structure of profiling
:param structs: list of structures
:param level: structure level
:return: string
"""
if structs:
mystr = self.format_structure
tag,msg,time,children = structs[0]
space,tag,msg,time = self.formatter(level,tag,msg,time)
mystr = mystr.format(space=space, tag=tag, msg=msg, time=time,
child=self._helper_string_structured(children, level + 1),
side=self._helper_string_structured(structs[1:], level))
return mystr
else:
return ""
[docs] def string_structured(self, collapse = None, structure = None):
"""
string with plain structure of profiling
:param collapse: list for collapsing repeated tags or messages.
:param structure: (None) uses and already created structure. If None
it creates the structure.
:return: string
"""
if structure is None:
return self._helper_string_structured([self.structure(collapse)])
else:
return self._helper_string_structured([self.restructure(structure, collapse)])
[docs]class Controlstdout(object):
"""
Context manager to control output to stdout
:param disable: if True suppress output.
:param buffer: (None) if True creates a buffer to collect all data
printed to the stdout which can be retrieved with self.buffered.
A file can be given but if it is write-only it cannot retrieve
data to self.buffered so "w+" is recommended to be used with self.buffered.
.. warning:: If a reference to sys.stdout is kept before the Controlstdout
instance then output can be printed trough it and cannot be
controlled by the Controlstdout context.
"""
def __init__(self, disable = True, buffer = None):
self.disable = disable
self.buffer = buffer
self.buffered = ""
self.stdout_old = None
self.stdout_new = None
def __enter__(self):
self.stdout_old = sys.stdout
self.stdout_new = StdoutSIM(self.disable)
if self.buffer:
if self.buffer is True: # in case it is not defined
import io
self.buffer = io.StringIO()
self.stdout_new = StdoutMULTI([self.stdout_new, self.buffer])
sys.stdout = self.stdout_new
return self
def __exit__(self, type, value, traceback):
if self.buffer:
try:
self.buffer.seek(0)
self.buffered = self.buffer.read()
self.buffer.close()
except:
pass
sys.stdout = self.stdout_old
[docs]def glob(path, contents="*", check = os.path.isfile):
"""
Return a list of paths matching a pathname pattern with valid files.
:param path: path to process ing glob filter
:param contents: If path is a folder then looks for contents using
:param check: function to filter contents. it must receive the path
and return True to let it pass and False to suppress it.
:return: return list of files
"""
from glob import glob
fns = glob(path)
# special case: Folder
if len(fns) == 1 and not os.path.isfile(fns[0]):
fns = glob(os.path.join(fns[0], contents))
return list(filter(check,fns)) # [p for p in fns if check(p)]
[docs]def ensureList(obj):
""" ensures that object is list """
if isinstance(obj,list):
return obj # returns original lis
elif hasattr(obj, '__iter__'): # for python 2.x check if obj is iterablet
return list(obj) # converts to list
else:
return [obj] # object is returned inside list
[docs]class globFilter(object):
'''glob filter for patterns'''
def __init__(self, include=None, exclude=None, case= False):
"""
:param include: permitted patterns
:param exclude: excluded patterns. it takes priority over includes
:param case: True or False for case sensitive patterns
"""
if include is None: include = []
self.include = ensureList(include)
if exclude is None: exclude = []
self.exclude = ensureList(exclude)
if case:
self.cmpfunc = fnmatchcase
else:
self.cmpfunc = fnmatch
def __call__(self, cmp=None):
"""
Evaluate filter.
:param cmp: iterator or string
:return: True or false if cmp pass filter test
"""
if hasattr(cmp,"__iter__"):
return [self(i) for i in cmp]
else:
cmpfunc = self.cmpfunc # prevents from accessing self
for pattern in self.exclude: # if exclude is [] don't do the test
if cmpfunc(cmp, pattern):
return False
if self.include: # test includes
for pattern in self.include:
if cmpfunc(cmp, pattern):
return True
else: # if include is False or None return True anyway i.e. equivalent to ['*']
return True
return False
[docs]def lookinglob(pattern, path=None, ext=None, forward=None,
filelist=None, aslist = False, raiseErr = False):
"""
Look for patterns in Path. It looks as {if path}{if pattern}{if forward}{if ext}.
:param pattern: string to look for pattern.
:param path: (None) path to look for pattern
:param ext: (None) extension of pattern in path
:param forward: (None) look changes after pattern and before ext parameter.
:param filelist: (None) simulates the files in path and look patterns in this list.
:param aslist: (False) if False it returns the first match case string
else the list of matching cases.
:param raiseErr: If true raises Exception if patter is not found in path or there
are more than one match
:return: matched case if returnAll is False else the list of matched cases
or if no match is found None
"""
if path is None and ext is None and forward is None:
# filter pure pattern
tests = [pattern]
else:
if path is None:
path = ""
# create a list of test with pattern
tests = [pattern, "*{pattern}", "{path}{pattern}", "{path}*{pattern}"]
if forward:
tests.extend(
["{pattern}*","*{pattern}*", "{path}{pattern}*", "{path}*{pattern}*"]
)
if ext is not None and not pattern.endswith(ext):
if not ext.startswith("."):
ext = "."+ext
tests = [i + ext for i in tests]
tests = set(tests) # eliminate repeated keys
if filelist is None:
# look in a real path
ress = []
for test in tests:
# get list of matches
res = glob(test.format(path=path, pattern=pattern))
if not res: # if not match continue
continue
# return first match
if not aslist and len(res)==1:
return res[0]
# keep acumulatting matches
ress.extend(res)
else:
# look in a simulated path with files in file_list
include = [test.format(path=path, pattern=pattern) for test in tests]
ress = list(filter(globFilter(case=True,include=include),filelist))
if len(ress)==1 and aslist:
return ress # return list anyways
elif len(ress)==1:
return ress[0] # return only matched
elif ress and raiseErr:
raise Exception("More than one file with pattern '{}'".format(pattern))
elif raiseErr:
#if file_list is not None:
# path = file_list
if path is None:
raise Exception("pattern '{}' not found".format(pattern))
raise Exception("pattern '{}' not in {}".format(pattern, path))
elif aslist:
return list(set(ress)) # return unique matches
return None
if __name__ == "__main__":
if False:
def myfc():
sleep(1)
return "======"*10000
with TimeCode("init",100,abv=True):
myfc()
if False:
fac = FactorConvert()
print(fac.convert(10,100))
print(fac.parts(1001010.01010101))
print("{:f} {}".format(*FactorConvert("m").convert2sample(36797.59, "m")))
print(FactorConvert("m").convert(36797.59))
print(fac.convert(1000))
if True:
pf = Profiler("Init")
p1 = pf.open_point("for loop")
for i in range(5):
with TimeCode("loop {}".format(i),profiler=pf):
pass
with TimeCode("loop {}".format(i),profiler=pf):
pass
p2 = pf.open_point("in level of p1")
p1.close()
p3 = pf.open_point("other process")
#print pf.string_structured()
print(pf.string_structured(True,pf.structure()))
# ---------------------------- EXCEPTIONS ---------------------------- #
[docs]class TimeOutException(Exception):
"""
Raise an exception when a process surpasses the timeout
"""
[docs]class TransferExeption(Exception):
"""
Raise an exception when transfered data is corrupt
"""
[docs]class VariableNotSettable(Exception):
"""
Exception for property not settable
"""
[docs]class VariableNotDeletable(Exception):
"""
Exception for property not deletable
"""
[docs]class VariableNotGettable(Exception):
"""
Exception for property not gettable
"""
[docs]class VariableNotAvailable(Exception):
"""
Exception for variable that is not available
"""
[docs]class NotConvertibleToInt(ValueError):
"""
Exception to denote that value cannot be represented as int
"""
[docs]class ClassNotAllowed(Exception):
"""
Exception to denote that given class is not allowed
"""
[docs]class NotCreatable(Exception):
"""
Defines objectGetter error: objectGetter cannot create new object.
"""
[docs]class NotCallable(Exception):
"""
Defines objectGetter error: given object is not callable.
"""
[docs]class NoParserFound(Exception):
"""
Raise when no parser is found to use in a shell i.e to interpret user
input
"""
[docs]class CorruptPersistent(EOFError,IOError):
"""
Used for persistent data read from disk like pickles to denote
it has been corrupted
"""
# ----------------------------GLOBAL VARIABLES---------------------------- #
[docs]class NameSpace(object):
"""
Used to store variables
"""