#!/usr/bin/env python
"""
#############################################################################
##
## project : Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2008 $
##
## copyleft : ALBA Synchrotron Controls Section, CELLS
## Bellaterra
## Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################
"""
__doc__ = """
fandango.functional::
contains functional programming methods for python, it should use only python main library methods and not be dependent of any other module
.. contents::
"""
import re
import random
import math
import time,datetime
from operator import isCallable
from functools import partial
from itertools import count,cycle,repeat,chain,groupby,islice,imap,starmap
from itertools import dropwhile,takewhile,ifilter,ifilterfalse,izip
try: from itertools import combinations,permutations,product
except: pass
__test__ = {}
#Load all by default
#__all__ = [
#'partial','first','last','anyone','everyone',
#'notNone','isTrue','join','splitList','contains',
#'matchAll','matchAny','matchMap','matchTuples','inCl','matchCl','searchCl',
#'toRegexp','isString','isRegexp','isNumber','isSequence','isDictionary','isIterable','isMapping',
#'list2str','char2int','int2char','int2hex','int2bin','hex2int',
#'bin2unsigned','signedint2bin',
#]
########################################################################
## Some miscellaneous logic methods
########################################################################
[docs]def first(seq,default=Exception):
"""Returns first element of sequence"""
try:
return seq[0]
except Exception,e:
try:
return seq.next()
except Exception,d:
if default is not Exception:
return default
else:
raise d
#raise e #if .next() also doesn't work throw unsubscriptable exception
return
[docs]def last(seq,MAX=1000,default=Exception):
"""Returns last element of sequence"""
try:
return seq[-1]
except Exception,e:
try:
n = seq.next()
except:
if default is not Exception:
return default
else:
raise e #if .next() also doesn't work throw unsubscriptable exception
try:
for i in range(1,MAX):
n = seq.next()
if i>(MAX-1):
raise IndexError('len(seq)>%d'%MAX)
except StopIteration,e: #It catches generators end
return n
return
max = max
min = min
[docs]def avg(seq):
seq = [float(s) for s in seq if s is not None]
if not bool(seq) or not len(seq): return 0
return sum(seq)/len(seq)
[docs]def rms(seq):
seq = [float(s)**2 for s in seq if s is not None]
if not bool(seq) or not len(seq): return 0
return math.sqrt(sum(seq)/float(len(seq)))
[docs]def randomize(seq):
done,result = list(range(len(seq))),[]
while done: result.append(seq[done.pop(random.randrange(len(done)))])
return result
[docs]def randpop(seq):
return seq.pop(random.randrange(len(seq)))
[docs]def floor(x,unit=1):
""" Returns greatest multiple of 'unit' below 'x' """
return unit*int(x/unit)
[docs]def xor(A,B):
"""Returns (A and not B) or (not A and B);
the difference with A^B is that it works also with different types and returns one of the two objects..
"""
return (A and not B) or (not A and B)
[docs]def reldiff(x,y,floor=None):
"""
Checks relative (%) difference <floor between x and y
floor would be a decimal value, e.g. 0.05
"""
d = x-y
if not d: return 0
ref = x or y
d = float(d)/ref
return d if not floor else (0,d)[abs(d)>=floor]
#return 0 if x*(1-r)<y<x*(1+r) else -1
[docs]def absdiff(x,y,floor=0.01):
"""
Checks absolute difference <floor between x and y
floor would be a decimal value, e.g. 0.05
"""
d = x-y
if not d: return 0
return d if not floor else (0,d)[abs(d)>=floor]
#return 0 if x-r<y<x+r else -1
[docs]def seqdiff(x,y,method=reldiff,floor=None):
"""
Being x and y two arrays it checks (method) difference <floor between the elements of them.
floor would be a decimal value, e.g. 0.05
"""
if not floor:
d = any(method(v,w) for v,w in zip(x,y))
else:
d = any(method(v,w,floor) for v,w in zip(x,y))
return d
[docs]def notNone(arg,default=None):
""" Returns arg if not None, else returns default. """
return [arg,default][arg is None]
[docs]def isTrue(arg):
""" Returns True if arg is not None, not False and not an empty iterable. """
if hasattr(arg,'__len__'): return len(arg)
else: return arg
[docs]def join(*seqs):
""" It returns a list containing the objects of all given sequences. """
if len(seqs)==1 and isSequence(seqs[0]):
seqs = seqs[0]
result = []
for seq in seqs:
if isSequence(seq): result.extend(seq)
else: result.append(seq)
# result += list(seq)
return result
[docs]def djoin(a,b):
""" This method merges dictionaries and/or lists """
if not any(map(isDictionary,(a,b))): return join(a,b)
other,dct = sorted((a,b),key=isDictionary)
if not isDictionary(other):
other = dict.fromkeys(other if isSequence(other) else [other,])
for k,v in other.items():
dct[k] = v if not k in dct else djoin(dct[k],v)
return dct
[docs]def kmap(method,keys,values=None,sort=True):
g = ((k,method((values or keys)[i])) for i,k in enumerate(keys))
return sorted(g) if sort else list(g)
__test__['kmap'] = [
{'args':[str.lower,'BCA','YZX',False],'result':[('A', 'x'), ('B', 'y'), ('C', 'z')]}
]
[docs]def splitList(seq,split):
"""splits a list in lists of 'split' size"""
#return [seq[split*i:split*(i+1)] for i in range(1+len(seq)/split)]
return [seq[i:i+split] for i in range(len(seq))[::split]]
[docs]def contains(a,b,regexp=True):
""" Returns a in b; using a as regular expression if wanted """
return inCl(a,b,regexp)
[docs]def anyone(seq,method=bool):
"""Returns first that is true or last that is false"""
if not seq: return False
s = None
for s in seq:
if method(s): return s
return s if not s else None
[docs]def everyone(seq,method=bool):
"""Returns last that is true or first that is false"""
if not seq: return False
for s in seq:
if not method(s): return s if not s else None
return seq[-1]
#Dictionary methods
[docs]def setitem(mapping,key,value):
mapping[key]=value
[docs]def getitem(mapping,key):
return mapping[key]
[docs]def setlocal(key,value):
setitem(locals(),key,value)
########################################################################
## Regular expressions
########################################################################
[docs]def matchAll(exprs,seq):
""" Returns a list of matched strings from sequence.
If sequence is list it returns exp as a list.
"""
exprs,seq = toSequence(exprs),toSequence(seq)
if anyone(isRegexp(e) for e in exprs):
exprs = [(e.endswith('$') and e or (e+'$')) for e in exprs]
return [s for s in seq if anyone(matchCl(e,s) for e in exprs)]
else:
return [s for s in seq if s in exprs]
[docs]def matchAny(exprs,seq):
""" Returns seq if any of the expressions in exp is matched, if not it returns None """
exprs = toSequence(exprs)
for exp in exprs:
if matchCl(exp,seq):
#print '===============> matchAny(): %s matched by %s' % (seq,exp)
return seq
return None
[docs]def matchMap(mapping,key,regexp=True,default=Exception):
""" from a mapping type (dict or tuples list) with strings as keys it returns the value from the matched key or raises KeyError exception """
if not mapping:
if default is not Exception:
return default
raise ValueError('mapping')
if hasattr(mapping,'items'): mapping = mapping.items()
if not isSequence(mapping) or not isSequence(mapping[0]): raise TypeError('dict or tuplelist required')
if not isString(key): key = str(key)
for tag,value in mapping:
if (matchCl(tag,key) if regexp else (key in tag)):
return value
if default is not Exception:
return default
raise KeyError(key)
[docs]def matchTuples(mapping,key,value):
""" mapping is a (regexp,[regexp]) tuple list where it is verified that value matches any from the matched key """
for k,regexps in mapping:
if re.match(k,key):
if any(re.match(e,value) for e in regexps):
return True
else:
return False
return True
[docs]def inCl(exp,seq,regexp=True):
""" Returns a caseless "in" boolean function, using regex if wanted """
if not seq:
return False
if isString(seq):
return searchCl(exp,seq) if (regexp and isRegexp(exp)) else exp.lower() in seq.lower()
elif isSequence(seq) and isString(seq[0]):
if regexp:
return any(matchCl(exp,s) for s in seq)
else:
return any(exp.lower()==s.lower() for s in seq)
else:
return exp in seq
[docs]def matchCl(exp,seq,terminate=False,extend=False):
""" Returns a caseless match between expression and given string """
if extend:
if '&' in exp:
return all(matchCl(e.strip(),seq,terminate=False,extend=True) for e in exp.split('&'))
if exp.startswith('!'):
return not matchCl(exp[1:],seq,terminate,extend=True)
return re.match(toRegexp(exp.lower(),terminate=terminate),seq.lower())
clmatch = matchCl #For backward compatibility
[docs]def searchCl(exp,seq,terminate=False,extend=False):
""" Returns a caseless regular expression search between expression and given string """
if extend:
if '&' in exp:
return all(searchCl(e.strip(),seq,terminate=False,extend=True) for e in exp.split('&'))
if exp.startswith('!'):
return not searchCl(exp[1:],seq,terminate,extend=True)
return re.search(toRegexp(exp.lower(),terminate=terminate),seq.lower())
clsearch = searchCl #For backward compatibility
[docs]def replaceCl(exp,repl,seq,regexp=True,lower=False):
"""
Replaces caseless expression exp by repl in string seq
repl can be string or callable(matchobj) ; to reuse matchobj.group(x) if needed in the replacement string
lower argument controls whether replaced string should be always lower case or not
"""
if lower and hasattr(repl,'lower'):
repl = repl.lower()
if regexp:
r,s = '',1
while s:
s = searchCl(exp,seq)
if s:
r+=seq[:s.start()]+repl
seq=seq[s.end():]
return r+seq
else:
return seq.lower().replace(exp.lower(),repl)
clsub = replaceCl
[docs]def splitCl(exp,seq,inclusive=False):
"""
Split an string by occurences of exp
"""
s,e = seq.lower(),exp.lower()
matches = re.finditer(e,s)
if not matches:
r = [seq]
else:
i,r = 0,[]
for m in matches:
l = seq[i:m.end() if inclusive else m.start()]
if l: r.append(l)
i = m.end()
if i<len(seq):
r.append(seq[i:])
return r
clsplit = splitCl
[docs]def sortedRe(iterator,order):
""" Returns a list sorted using regular expressions.
order = list of regular expressions to match ('[a-z]','[0-9].*','.*')
"""
if '.*' not in order: order=list(order)+['.*']
rorder = [re.compile(c) for c in order]
def sorter(k,ks=rorder):
k = str(k[0] if isinstance(k,tuple) else k).lower()
return str((i for i,r in enumerate(ks) if r.match(k)).next())+k
for i in sorted(iterator,key=sorter):
print '%s:%s' % (i,sorter(i))
return sorted(iterator,key=sorter)
[docs]def toCl(exp,terminate=False,wildcards=('*',' '),lower=True):
""" Replaces * by .* and ? by . in the given expression.
"""
## @PROTECTED: DO NOT MODIFY THIS METHOD, MANY, MANY APPS DEPEND ON IT
exp = str(exp).strip()
if lower: exp = exp.lower()
if not any(s in exp for s in ('.*','\*',']*')):
for w in wildcards:
exp = exp.replace(w,'.*')
if terminate and not exp.strip().endswith('$'): exp += '$'
exp = exp.replace('(?p<','(?P<') #Preventing missing P<name> clausses
return exp
[docs]def toRegexp(exp,terminate=False):
""" Case sensitive version of the previous one, for backwards compatibility """
return toCl(exp,terminate,wildcards=('*',),lower=False)
[docs]def filtersmart(seq,filters):
"""
filtersmart(sequence,filters=['any_filter','+all_filter','!neg_filter'])
appies a list of filters to a sequence of strings,
behavior of filters depends on first filter character:
'[a-zA-Z0-9] : an individual filter matches all strings that contain it, one matching filter is enough
'!' : negate, discards all matching values
'+' : complementary, it must match all complementaries and at least a 'normal filter' to be valid
'^' : matches string since the beginning (startswith instead of contains)
'$' : matches the end of strings
',' : will be used as filter separator if a single string is provided
"""
seq = seq if isSequence(seq) else (seq,)
if isString(filters):
filters = filters.split(',')
raw,comp,neg = [],[],[]
def parse(s):
s = toRegexp(s)
if '*' not in s:
if not s.startswith('^'): s='.*'+s
if not s.startswith('$'): s=s+'.*'
return s
for f in filters:
if f.startswith('+'): comp.append(parse(f[1:]))
elif f.startswith('!'): neg.append(parse(f[1:]))
else: raw.append(parse(f))
return [s for s in seq if (not any(matchCl(n,s) for n in neg)) and any(matchCl(r,s) for r in raw) and (not comp or all(matchCl(c,s) for c in comp))]
########################################################################
## Methods for piped iterators
## Inspired by Maxim Krikun [ http://code.activestate.com/recipes/276960-shell-like-data-processing/?in=user-1085177]
########################################################################
[docs]class Piped:
"""This class gives a "Pipeable" interface to a python method:
cat | Piped(method,args) | Piped(list)
list(method(args,cat))
e.g.:
class grep:
#keep only lines that match the regexp
def __init__(self,pat,flags=0):
self.fun = re.compile(pat,flags).match
def __ror__(self,input):
return ifilter(self.fun,input) #imap,izip,count,ifilter could ub useful
cat('filename') | grep('myname') | printlines
"""
import itertools
def __init__(self,method,*args,**kwargs):
self.process=partial(method,*args,**kwargs)
def __ror__(self,input):
return imap(self.process,input)
[docs]class iPiped:
""" Used to pipe methods that already return iterators
e.g.: hdb.keys() | iPiped(filter,partial(fandango.inCl,'elotech')) | plist
"""
def __init__(self,method,*args,**kwargs): self.process = partial(method,*args,**kwargs)
def __ror__(self,input): return self.process(input)
[docs]class zPiped:
"""
Returns a callable that applies elements of a list of tuples to a set of functions
e.g. [(1,2),(3,0)] | zPiped(str,bool) | plist => [('1',True),('3',False)]
"""
def __init__(self,*args): self.processes = args
def __ror__(self,input): return (tuple(p(i[j]) for j,p in enumerate(self.processes))+tuple(i[len(self.processes):]) for i in input)
pgrep = lambda exp: iPiped(lambda input: (x for x in input if inCl(exp,x)))
pmatch = lambda exp: iPiped(lambda input: (x for x in input if matchCl(exp,str(x))))
pfilter = lambda meth=bool,*args: iPiped(filter,partial(meth,*args))
ppass = Piped(lambda x:x)
plist = iPiped(list)
psorted = iPiped(sorted)
pdict = iPiped(dict)
ptuple = iPiped(tuple)
pindex = lambda i: Piped(lambda x:x[i])
pslice = lambda i,j: Piped(lambda x:x[i,j])
penum = iPiped(lambda input: izip(count(),input) )
pzip = iPiped(lambda i:izip(*i))
ptext = iPiped(lambda input: '\n'.join(imap(str,input)))
########################################################################
## Methods for identifying types
########################################################################
""" Note of the author:
This methods are not intended to be universal, are just practical for general Tango application purposes.
"""
reint = '[0-9]+'
refloat = '[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?'
[docs]def isString(seq):
if isinstance(seq,basestring): return True # It matches most python str-like classes
if any(s in str(type(seq)).lower() for s in ('vector','array','list',)): return False
if 'qstring' == str(type(seq)).lower(): return True # It matches QString
return False
WILDCARDS = '^$*+?{[]\|()' #r'
[docs]def isRegexp(seq,wildcards=WILDCARDS):
""" This function is just a hint, use it with care. """
return anyone(c in wildcards for c in seq)
[docs]def isNumber(seq):
#return operator.isNumberType(seq)
if isinstance(seq,bool): return False
try:
float(seq)
return True
except: return False
NaN = float('nan')
[docs]def isNaN(seq):
return isinstance(seq,(int,float)) and math.isnan(seq) or (isString(seq) and seq.lower().strip() == 'nan')
[docs]def isNone(seq):
return seq is None or (isString(seq) and seq.lower().strip() in ('none','null','nan',''))
[docs]def isFalse(seq):
return not seq or str(seq).lower().strip() in ('false','0','no')
[docs]def isGenerator(seq):
from types import GeneratorType
# A generator check must be added to the rest of methods in this module!
return isinstance(seq,GeneratorType)
[docs]def isSequence(seq,INCLUDE_GENERATORS = True):
""" It excludes Strings, dictionaries but includes generators"""
if any(isinstance(seq,t) for t in (list,set,tuple)):
return True
if isString(seq):
return False
if hasattr(seq,'items'):
return False
if INCLUDE_GENERATORS:
if hasattr(seq,'__iter__'):
return True
elif hasattr(seq,'__len__'):
return True
return False
[docs]def isDictionary(seq):
""" It includes dicts and also nested lists """
if isinstance(seq,dict): return True
if hasattr(seq,'items') or hasattr(seq,'iteritems'): return True
try:
if seq and isSequence(seq) and isSequence(seq[0]):
if seq[0] and not isSequence(seq[0][0]): return True #First element of tuple must be hashable
except: pass
return False
isMapping = isDictionary
[docs]def isIterable(seq):
""" It includes dicts and listlikes but not strings """
return hasattr(seq,'__iter__') and not isString(seq)
[docs]def isNested(seq,strict=False):
if not isIterable(seq) or not len(seq): return False
child = seq[0] if isSequence(seq) else seq.values()[0]
if not strict and isIterable(child): return True
if any(all(map(f,(seq,child))) for f in (isSequence,isDictionary)): return True
return False
[docs]def shape(seq):
"""
Returns the N dimensions of a python sequence
"""
if not isSequence(seq):
return []
else:
d = [len(seq)]
if isNested(seq):
d.extend(shape(seq[0]))
return d
[docs]def isBool(seq,is_zero=True):
codes = ['true','yes','false','no']
if is_zero: codes+=['0','1']
if seq in (True,False):
return True
elif isString(seq):
return seq.lower() in codes #none/nan will not be considered boolean
else:
return False
[docs]def isDate(seq):
try:
return str2time(seq)
except:
return False
###############################################################################
[docs]def str2int(seq):
""" It returns the first integer encountered in the string """
return int(re.search(reint,seq).group())
[docs]def str2float(seq):
""" It returns the first float (x.ye-z) encountered in the string """
return float(re.search(refloat,seq).group())
[docs]def str2bool(seq):
""" It parses true/yes/no/false/1/0 as booleans """
return seq.lower().strip() not in ('false','0','none','no')
[docs]def str2type(seq,use_eval=True,sep_exp='[,;\ ]+'):
"""
Tries to convert string to an standard python type.
If use_eval is True, then it tries to evaluate as code.
Lines separated by sep_exp will be automatically split
"""
seq = str(seq).strip()
m = sep_exp and (seq[0] not in '{[(') and re.search(sep_exp,seq)
if m:
return [str2type(s,use_eval) for s in str2list(seq,m.group())]
elif isBool(seq,is_zero=False):
return str2bool(seq)
elif use_eval:
try:
return eval(seq)
except:
return seq
elif isNumber(seq):
return str2float(seq)
else:
return seq
[docs]def doc2str(obj):
return obj.__name__+'\n\n'+obj.__doc__
[docs]def rtf2plain(t,e='[<][^>]*[>]'):
t = re.sub(e,'',t)
if re.search(e,t):
return rtf2plain(t,e)
else:
return t
[docs]def html2text(txt):
return rtf2plain(txt)
[docs]def dict2json(dct,filename=None,throw=False,recursive=True,encoding='latin-1'):
"""
It will check that all objects in dict are serializable.
If throw is False, a corrected dictionary will be returned.
If filename is given, dict will be saved as a .json file.
"""
import json
result = {}
for k,v in dct.items():
try:
json.dumps(v,encoding=encoding)
result[k] = v
except Exception,e:
if throw: raise e
if isString(v): result[k] = ''
elif isSequence(v):
try:
result[k] = toList(v)
json.dumps(result[k])
except:
result[k] = []
elif isMapping(v) and recursive:
result[k] = dict2json(v,None,False,True,encoding=encoding)
if filename:
json.dump(result,open(filename,'w'),encoding=encoding)
return result if not filename else filename
[docs]def unicode2str(obj):
"""
Converts an unpacked unicode object (json) to
nested python primitives (map,list,str)
"""
if isMapping(obj):
n = dict(unicode2python(t) for t in obj.items())
elif isSequence(obj):
n = list(unicode2python(t) for t in obj)
elif isString(obj):
n = str(obj)
else:
n = obj
return n
[docs]def toList(val,default=[],check=isSequence):
if val is None:
return default
elif hasattr(val,'__len__') and len(val)==0: #To prevent exceptions due to non evaluable numpy arrays
return []
elif not check(val): #You can use (lambda s:isinstance(s,list)) if you want
return [val]
elif not hasattr(val,'__len__'): #It forces the return type to have a fixed length
return list(val)
else:
return val
toSequence = toList
[docs]def toString(*val):
if len(val)==1: val = val[0]
if hasattr(val,'text'):
try: return val.text()
except: return val.text(0)
else:
return str(val)
[docs]def toStringList(seq):
return map(toString,seq)
[docs]def str2list(s,separator='',regexp=False,sep_offset=0):
""" Arguments allow to split by regexp and to keep or not the separator character
sep_offset = 0 : do not keep
sep_offset = -1 : keep with posterior
sep_offset = 1 : keep with precedent
"""
if not regexp:
return map(str.strip,s.split(separator) if separator else s.split())
elif not sep_offset:
return map(str.strip,re.split(separator,s) if separator else re.split('[\ \\n]',s))
else:
r,seps,m = [],[],1
while m:
m = clsearch(separator,s)
if m:
r.append(s[:m.start()])
seps.append(s[m.start():m.end()])
s = s[m.end():]
r.append(s)
for i,p in enumerate(seps):
if sep_offset<0: r[i]+=p
else: r[i+1] = p+r[i+1]
return r
[docs]def code2atoms(code):
begin = '[\[\(\{]'
end = '[\]\)\}]'
#ops = '[,]'
l0 = str2list(code,begin,1,1)
l0 = filter(bool,map(str.strip,l0))
l1 = [a for l in l0 for a in str2list(l,end,1,-1)]
l1 = filter(bool,map(str.strip,l1))
#l2 = [a for l in l1 for a in str2list(l,ops,1,-1)]
return l1
[docs]def text2list(s,separator='\n'):
return filter(bool,str2list(s,separator))
[docs]def str2lines(s,length=80,separator='\n'):
return separator.join(s[i:i+length] for i in range(0,len(s),length))
[docs]def list2str(s,separator='\t',MAX_LENGTH=255):
s = str(separator).join(str(t) for t in s)
if MAX_LENGTH>0 and separator not in ('\n','\r') and len(s)>MAX_LENGTH:
s = s[:MAX_LENGTH-4]+'... '
return s
[docs]def text2tuples(s,separator='\t'):
return [str2list(t,separator) for t in text2list(s)]
[docs]def tuples2text(s,separator='\t',lineseparator='\n'):
return list2str([list2str(t,separator) for t in s],lineseparator)
[docs]def dict2str(s,sep=':\t',linesep='\n',listsep='\n\t'):
return linesep.join(sorted(
sep.join((str(k),list2str(toList(v),listsep,0)))
for k,v in s.items()))
[docs]def obj2str(obj,sep=',',linesep='\n'):
if isMapping(obj): return dict2str(obj,sep,linesep)
elif isSequence(obj): return list2str(obj,sep)
else: return toString(obj)
########################################################################
## Number conversion
########################################################################
[docs]def negbin(old):
""" Given a binary number as an string, it returns all bits negated """
return ''.join(('0','1')[x=='0'] for x in old)
[docs]def char2int(c):
"""ord(c)"""
return ord(c)
[docs]def int2char(n):
"""unichr(n)"""
return unichr(n)
[docs]def int2hex(n): return hex(n)
[docs]def int2bin(n): return bin(n)
[docs]def hex2int(c): return int(c,16)
[docs]def bin2unsigned(c): return int(c,2)
[docs]def signedint2bin(x,N=16):
""" It converts an integer to an string with its binary representation """
if x>=0: bStr = bin(int(x))
else: bStr = bin(int(x)%2**16)
bStr = bStr.replace('0b','')
if len(bStr)<N: bStr='0'*(N-len(bStr))+bStr
return bStr[-N:]
[docs]def bin2signedint(x,N=16):
""" Converts an string with a binary number into a signed integer """
i = int(x,2)
if i>=2**(N-1): i=i-2**N
return i
[docs]def int2bool(dec,N=16):
"""Converts an integer to a binary represented as a boolean array"""
result,dec = [],int(dec)
for i in range(N):
result.append(bool(dec % 2))
dec = dec >> 1
return result
[docs]def bool2int(seq):
""" Converts a boolean array to an unsigned integer """
return fandango.bin2unsigned(''.join(map(str,map(int,reversed(seq)))))
########################################################################
## Time conversion
########################################################################
END_OF_TIME = 1024*1024*1024*2-1 #Jan 19 04:14:07 2038
TIME_UNITS = {'ns':1e-9,'us':1e-6,'ms':1e-3,'':1,'s':1,'m':60, 'h':3600,'d':86.4e3,'w':604.8e3,'y':31.536e6}
RAW_TIME = '^([+-]?[0-9]+[.]?(?:[0-9]+)?)(?: )?(%s)$'%'|'.join(TIME_UNITS) # e.g. 3600.5 s
[docs]def now():
return time.time()
[docs]def time2tuple(epoch=None):
if epoch is None: epoch = now()
elif epoch<0: epoch = now()-epoch
return time.localtime(epoch)
[docs]def tuple2time(tup):
return time.mktime(tup)
[docs]def date2time(date):
return tuple2time(date.timetuple())
[docs]def date2str(date):
#return time.ctime(date2time(date))
return time.strftime('%Y-%m-%d %H:%M:%S',time2tuple(date2time(date)))
[docs]def time2date(epoch=None):
if epoch is None: epoch = now()
elif epoch<0: epoch = now()-epoch
return datetime.datetime.fromtimestamp(epoch)
[docs]def time2str(epoch=None,cad='%Y-%m-%d %H:%M:%S'):
if epoch is None: epoch = now()
elif epoch<0: epoch = now()+epoch
return time.strftime(cad,time2tuple(epoch))
epoch2str = time2str
[docs]def str2time(seq='',cad=''):
"""
:param seq: Date must be in ((Y-m-d|d/m/Y) (H:M[:S]?)) format or -N [d/m/y/s/h]
See RAW_TIME and TIME_UNITS to see the units used for pattern matching.
The conversion itself is done by time.strptime method.
:param cad: You can pass a custom time format
"""
if seq in (None,''): return time.time()
seq = str(seq).strip()
m = re.match(RAW_TIME,seq)
if m:
#Converting from a time(unit) format
value,unit = m.groups()
try: return float(value)*TIME_UNITS[unit]
except: raise Exception('PARAMS_ERROR','time format cannot be parsed!: %s'%seq)
else:
#Converting from a date format
t,ms = None,re.match('.*(\.[0-9]+)$',seq) #Splitting the decimal part
if ms: ms,seq = float(ms.groups()[0]),seq.replace(ms.groups()[0],'')
else: ms = 0
time_fmts = ([cad] if cad else [None]+
[('%s%s%s'%(date.replace('-',dash),separator if hour else '',hour))
for date in ('%Y-%m-%d','%y-%m-%d','%d-%m-%Y','%d-%m-%y','%m-%d-%Y','%m-%d-%y')
for dash in ('-','/')
for separator in (' ','T')
for hour in ('%H:%M','%H:%M:%S','')
])
for tf in time_fmts:
try:
tf = (tf,) if tf else () #tf=None will try default system format
t = time.strptime(seq,*tf)
break
except: pass
if t is not None: return time.mktime(t)+ms
else: raise Exception('PARAMS_ERROR','date format cannot be parsed!: %s'%str(seq))
str2epoch = str2time
[docs]def time2gmt(epoch=None):
if epoch is None: epoch = now()
return tuple2time(time.gmtime(epoch))
[docs]def timezone():
t = now()
return int(t-time2gmt(t))/3600
#Auxiliary methods:
[docs]def ctime2time(time_struct):
try:
return (float(time_struct.tv_sec)+1e-6*float(time_struct.tv_usec))
except:
return -1
[docs]def mysql2time(mysql_time):
try:
return time.mktime(mysql_time.timetuple())
except:
return -1
########################################################################
## Extended eval
########################################################################
[docs]def iif(condition,truepart,falsepart=None,forward=False):
"""
if condition is boolean return (falsepart,truepart)[condition]
if condition is callable returns truepart if condition(tp) else falsepart
if forward is True condition(truepart) is returned instead of truepart
if forward is callable, forward(truepart) is returned instead
"""
if isCallable(condition):
v = condition(truepart)
if not v:
return falsepart
elif not condition:
return falsepart
if isCallable(forward):
return forward(truepart)
elif forward:
return v
else:
return truepart
[docs]def ifThen(condition,callback,falsables=tuple()):
"""
This function allows to execute a callable on an object only if it has a valid value.
ifThen(value,callable) will return callable(value) only if value is not in falsables.
It is a List-like method, it can be combined with fandango.excepts.trial
"""
if condition not in falsables:
if not isSequence(callback):
return callback(condition)
else:
return ifThen(callback[0](condition),callback[1:],falsables)
else:
return condition
[docs]def retry(callable,retries=3,pause=0,args=[],kwargs={}):
r = None
for i in range(retries):
try:
r = callable(*args,**kwargs)
break
except Exception,e:
if i==(retries-1): raise e
elif pause: time.sleep(pause)
return r
[docs]def retried(retries=3,pause=0):
"""
"""
def retrier(f):
def retried_f(*args,**kwargs):
return retry(f,retries=retries,pause=pause,args=args,kwargs=kwargs)
return retried_f
return retrier
[docs]def evalF(formula):
"""
Returns a function that executes the formula passes as argument.
The formula should use x,y,z as predefined arguments, or use args[..] array instead
e.g.:
map(evalF("x>2"),range(5)) : [False, False, False, True, True]
It is optimized to be efficient (but still 50% slower than a pure lambda)
"""
#return (lambda *args: eval(formula,locals={'args':args,'x':args[0],'y':args[1],'z':args[2]}))
c = compile(formula,formula,'eval') #returning a lambda that evals a compiled code makes the method 500% faster
return (lambda *args: eval(c,{'args':args,'x':args and args[0],'y':len(args)>1 and args[1],'z':len(args)>2 and args[2]}))
[docs]def testF(f,args=[],t=5.):
"""
it returns how many times f(*args) can be executed in t seconds
"""
args = toSequence(args)
ct,t0 = 0,time.time()
while time.time()<t0+t:
f(*args)
ct+=1
return ct
[docs]def evalX(target,_locals=None,modules=None,instances=None,_trace=False,_exception=Exception):
"""
evalX is an enhanced eval function capable of evaluating multiple types and import modules if needed.
The _locals/modules/instances dictionaries WILL BE UPDATED with the result of the code! (if '=' or import are used)
It is used by some fandango classes to send python code to remote threads; that will evaluate and return the values as pickle objects.
target may be:
- dictionary of built-in types (pickable): {'__target__':callable or method_name,'__args__':[],'__class_':'','__module':'','__class_args__':[]}
- string to eval: eval('import $MODULE' or '$VAR=code()' or 'code()')
- list if list[0] is callable: value = list[0](*list[1:])
- callable: value = callable()
"""
import imp,__builtin__
# Only if immutable types are passed as arguments these dictionaries will be preserved.
_locals = notNone(_locals,{})
modules = notNone(modules,{})
instances = notNone(instances,{})
def import_module(module,reload=False):
alias = module.split(' as ',1)[-1] if ' as ' in module else module
module = module.split('import ',1)[-1].strip().split()[0]
if reload or alias not in modules:
if '.' not in module:
modules[module] = imp.load_module(module,*imp.find_module(module))
else:
parent,child = module.rsplit('.',1)
print parent
mparent = import_module(parent)
setattr(mparent,child,imp.load_module(module,*imp.find_module(child,mparent.__path__)))
modules[module] = getattr(mparent,child)
if alias:
modules[alias] = modules[module]
_locals[alias] = modules[alias]
print '%s(%s) : %s' % (alias,module,modules[alias])
return modules[alias]
def get_instance(_module,_klass,_klass_args):
if (_module,_klass,_klass_args) not in instances:
instances[(_module,_klass,_klass_args)] = getattr(import_module(_module),klass)(*klass_args)
return instances[(_module,_klass,_klass_args)]
if 'import_module' not in _locals: _locals['import_module'] = lambda m: import_module(m,reload=True)
if isDictionary(target):
model = target
keywords = ['__args__','__target__','__class__','__module__','__class_args__']
args = model['__args__'] if '__args__' in model else dict((k,v) for k,v in model.items() if k not in keywords)
target = model.get('__target__',None)
module = model.get('__module__',None)
klass = model.get('__class__',None)
klass_args = model.get('__class_args__',tuple())
if isCallable(target):
target = model['__target__']
elif isString(target):
if module:
#module,subs = module.split('.',1)
if klass:
if _trace: print('evalX: %s.%s(%s).%s(%s)'%(module,klass,klass_args,target,args))
target = getattr(get_instance(module,klass,klass_args),target)
else:
if _trace: print('evalX: %s.%s(%s)'%(module,target,args))
target = getattr(import_module(module),target)
elif klass and klass in dir(__builtin__):
if _trace: print('evalX: %s(%s).%s(%s)'%(klass,klass_args,target,args))
instance = getattr(__builtin__,klass)(*klass_args)
target = getattr(instance,target)
elif target in dir(__builtin__):
if _trace: print('evalX: %s(%s)'%(target,args))
target = getattr(__builtin__,target)
else:
raise _exception('%s()_MethodNotFound'%target)
else:
raise _exception('%s()_NotCallable'%target)
value = target(**args) if isDictionary(args) else target(*args)
if _trace: print('%s: %s'%(model,value))
return value
else:
#Parse: method[0](*method[1:])
if isIterable(target) and isCallable(target[0]):
value = target[0](*target[1:])
#Parse: method()
elif isCallable(target):
value = target()
elif isString(target):
if _trace: print('evalX("%s")'%target)
#Parse: import $MODULE
if target.startswith('import ') or ' import ' in target:
import_module(target) #Modules dictionary is updated here
value = target
#Parse: $VAR = #code
elif ( '=' in target and
'='!=target.split('=',1)[1][0] and
re.match('[A-Za-z\._]+[A-Za-z0-9\._]*$',target.split('=',1)[0].strip())
):
var = target.split('=',1)[0].strip()
_locals[var]=eval(target.split('=',1)[1].strip(),modules,_locals)
value = var
#Parse: #code
else:
value = eval(target,modules,_locals)
else:
raise _exception('targetMustBeCallable, not %s(%s)'%(type(target),target))
if _trace: print('Out of evalX(%s): %s'%(target,value))
return value
from . import doc
__doc__ = doc.get_fn_autodoc(__name__,vars(),module_vars=['END_OF_TIME'])