#!/usr/bin/env python
#############################################################################
##
## project : Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2008 $
##
## copyleft : ALBA Synchrotron Controls Section, CELLS
## Bellaterra
## Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################
"""
Example of usage:
class logged_class(Logger):
def __init__(self,name,system):
#parent must be also an instance of a Logger object
self.call__init__(Logger,name,parent=system)
pass
...
Example of logging:
In [17]: import logging
In [18]: l = logging.getLogger("something")
In [19]: l.debug("message")
In [20]: l.error("message")
No handlers could be found for logger "something"
In [21]: l.addHandler(logging.StreamHandler())
In [22]: l.error("message")
message
"""
import time, logging, weakref, traceback, sys
from objects import Object,Decorator
from pprint import pprint
from functional import \
time2str,first,matchCl,isSequence,isMapping,isCallable,isString
import warnings
[docs]def printf(*args):
# This is a 'lambdable' version of print
print(''.join(map(str,args)))
[docs]def printerr(*args):
sys.stderr.write(*args)
[docs]def shortstr(s,max_len=144):
s = str(s)
if max_len>0 and len(s) > max_len:
s = s[:max_len-3]+'...'
return s
[docs]def except2str(e=None,max_len=int(7.5*80)):
if e is None: e = traceback.format_exc()
e = str(e)
if 'desc=' in e or 'desc =' in e:
r,c = '',0
for i in range(e.count('desc')):
c = e.index('desc',c)+1
r+=e[c-15:c+max_len-18]+'...\n'
result = r
else:
result = str(e)[-(max_len-3):]+'...'
return result or e[:max_len]
[docs]def test2str(obj,meth='',args=[],kwargs={}):
"""
Executes a method providing a verbose output.
For usage examples see fandango.device.FolderDS.FolderAPI.__test__()
"""
fs = str(obj) if not meth else '%s.%s'%(obj,meth)
r = 'Testing %s(*%s,**%s)\n\n' % (fs,args,kwargs)
v = None
try:
f = getattr(obj,meth) if meth and isString(meth) else (meth or obj)
v = f(*args,**kwargs)
if isMapping(v):
s = '\n'.join(map(str,v.items()))
elif isSequence(v):
s = '\n'.join(map(str,v))
else: s = str(v)
except:
s = traceback.format_exc()
r += '\n'.join('\t%s'%l for l in s.split('\n'))+'\n\n'
return r,v
[docs]def printtest(obj,meth='',args=[],kwargs={}):
"""
Executes a method providing a verbose output.
For usage examples see fandango.device.FolderDS.FolderAPI.__test__()
"""
r,v = test2str(obj,meth,args,kwargs)
print(r)
return v
ERROR,WARNING,INFO,DEBUG = logging.ERROR,logging.WARNING,logging.INFO,logging.DEBUG
LogLevels = {'ERROR':ERROR,'WARNING':WARNING,'INFO':INFO,'DEBUG':DEBUG,}
[docs]class FakeLogger():
"""
This class just simulates a Logger using prints with date and header, it doesn't allow any customization
"""
_instances = []
def __init__(self,header='',keep=False):
self.LogLevel = 1
self.header = '%s: '%header if header else ''
if keep: self._instances.append(self)
[docs] def setLogLevel(self,s):
self.LogLevel = str(s).lower()!='DEBUG'
[docs] def trace(self,s):
if not self.LogLevel:
print time2str()+' '+'TRACE\t'+self.header+s
[docs] def debug(self,s):
if not self.LogLevel:
print time2str()+' '+'DEBUG\t'+self.header+s
[docs] def info(self,s):print time2str()+' '+'INFO\t'+self.header+s
[docs] def warning(self,s):print time2str()+' '+'WARNING\t'+self.header+s
[docs] def error(self,s):print time2str()+' '+'ERROR\t'+self.header+s
[docs]class Logger(Object):
"""
This class provides logging methods (debug,info,error,warning) to all classes inheriting it.
To use it you must inherit from it and add it within your __init__ method:
class MyTangoDevice(Device_4Impl,Logger):
def __init__(self,cl, name):
PyTango.Device_4Impl.__init__(self,cl,name)
self.call__init__(Logger,name,format='%(levelname)-8s %(asctime)s %(name)s: %(message)s')
Constructor arguments allow to customize the output format:
* name='fandango.Logger' #object name to appear at the beginning
* parent=None
* format='%(levelname)-8s %(asctime)s %(name)s: %(message)s'\
* use_tango=True #Use Tango Logger if available
* use_print=True #Use printouts instead of linux logger (use_tango will override this option)
* level='INFO' #default log level
* max_len=0 #max length of log strings
"""
root_inited = False
Error = ERROR
Warning = WARNING
Info = INFO
Debug = DEBUG
def __init__(self, name='fandango.Logger', parent=None,\
format='%(levelname)-8s %(asctime)s %(name)s: %(message)s',\
use_tango=True,use_print=True,level='INFO',max_len=0):
self.max_len = max_len
self.call__init__(Object)
self.__levelAliases = LogLevels.copy()
self.log_name = name
if parent is not None:
self.full_name = '%s.%s' % (parent.full_name, name)
else:
self.full_name = name
self.log_obj = logging.getLogger(self.full_name)
self.log_handlers = []
self.use_tango = use_tango and hasattr(self,'debug_stream')
self._ForcePrint = use_print
self.parent = None
self.children = []
if parent is not None:
self.parent = weakref.ref(parent)
parent.addChild(self)
self.setLogLevel(level)
if not Logger.root_inited:
#print 'log format is ',format
self.initRoot(format)
Logger.root_inited = True
def __del__(self):
parent = self.getParent()
if parent is not None:
parent.delChild(self)
[docs] def initRoot(self,_format='%(threadName)-12s %(levelname)-8s %(asctime)s %(name)s: %(message)s'):
logging.basicConfig(level=logging.INFO,format=_format)
#logging.basicConfig(level=logging.DEBUG,
# format='%(threadName)-12s %(levelname)-8s %(asctime)s %(name)s: %(message)s')
[docs] def setLogPrint(self,force):
''' This method enables/disables a print to be executed for each log call '''
self._ForcePrint=force
[docs] def getTimeString(self,t=None):
if t is None: t=time.time()
cad='%Y-%m-%d %H:%M:%S'
s = time.strftime(cad,time.localtime(t))
ms = int((t-int(t))*1e3)
return '%s.%d'%(s,ms)
[docs] def logPrint(self,prio,msg):
name = self.log_name or ''
l = self.__levelAliases.get(prio,prio)
if l<self.log_obj.level: return
print ('%s %7s %s: %s'%(name,prio,self.getTimeString(),str(msg).replace('\r','')))
[docs] def setLogLevel(self,level):
''' This method allows to change the default logging level'''
#if isinstance(level,basestring): level = level.upper()
if type(level)==type(logging.NOTSET):
self.log_obj.setLevel(level)
self.debug('log.Logger: Logging level set to %s'%str(level).upper())
else:
l = self.getLogLevel(level)
if l is not None:
self.log_obj.setLevel(l)
self.debug('log.Logger: Logging level set to "%s" = %s'%(level,l))
else:
self.warning('log.Logger: Logging level cannot be set to "%s"'%level)
return level
setLevel = setLogLevel
[docs] def setLevelAlias(self,alias,level):
''' setLevelAlias(alias,level), allows to setup predefined levels for different tags '''
self.__levelAliases[alias]=level
[docs] def getLogLevel(self,alias=None):
if alias is None:
l = self.log_obj.level
for k,v in self.__levelAliases.items():
if v==l:
l = k
return l
else:
if not isinstance(alias,basestring):
try: return (k for k,v in self.__levelAliases.iteritems() if v==alias).next()
except: return None
elif alias.lower() in ('debug','info','warning','error'):
return logging.__dict__.get(alias.upper())
else:
try: return (v for k,v in self.__levelAliases.iteritems() if k.lower()==alias.lower()).next()
except: return None
return
[docs] def getRootLog(self):
return logging.getLogger()
[docs] def getTangoLog(self):
if not self.use_tango: return None
if getattr(self,'__tango_log',None): return self.tango_obj
try:
#import PyTango
#if PyTango.Util.instance().is_svr_starting(): return None
self.get_name() #Will trigger exception if Tango object is not ready
self.__tango_log = self
except:
print(traceback.format_exc())
self.warning('Unable to setup tango logging for %s'%self.log_name)
self.__tango_log = None
return self.__tango_log
[docs] def getParent(self):
if self.parent is None:
return None
return self.parent()
[docs] def getChildren(self):
children = []
for ref in self.children:
child = ref()
if child is not None:
children.append(child)
return children
[docs] def addChild(self, child):
ref = weakref.ref(child)
if not ref in self.children:
self.children.append(ref)
[docs] def delChild(self, child):
ref = weakref.ref(child)
if ref in self.children:
self.children.remove(ref)
def __eq__(self, other):
return self is other
[docs] def addLogHandler(self, handler):
self.log_obj.addHandler(handler)
self.log_handlers.append(handler)
[docs] def copyLogHandlers(self, other):
for handler in other.log_handlers:
self.addLogHandler(handler)
[docs] def output(self, msg, *args, **kw):
self.log_obj.log(Logger.Output, msg, *args, **kw)
[docs] def debug(self, msg, *args, **kw):
self.sendToStream(msg,'debug',3,*args,**kw)
[docs] def trace(self, msg, *args, **kw):
self.sendToStream(msg,'debug',3,*args,**kw)
[docs] def info(self, msg, *args, **kw):
self.sendToStream(msg,'info',2,*args,**kw)
[docs] def warning(self, msg, *args, **kw):
self.sendToStream(msg,'warning',1,*args,**kw)
[docs] def error(self, msg, *args, **kw):
self.sendToStream(msg,'error',0,*args,**kw)
[docs] def sendToStream(self,msg,level,prio,*args,**kw):
#stream should be a number in trace=4,debug=3,info=2,warning=1,error=0
try:
prio = min(prio,3)
if self.max_len>0: msg = shortstr(msg,self.max_len)
msg = str(msg).replace('\r','').replace('%','%%')
obj = self.getTangoLog()
if obj:
stream = (obj.error_stream,obj.warn_stream,obj.info_stream,obj.debug_stream)[prio]
stream(msg, *args, **kw)
elif self._ForcePrint:
self.logPrint(level.upper(),msg)
else:
stream = (self.log_obj.error,self.log_obj.warning,self.log_obj.info,self.log_obj.debug)[prio]
stream(msg, *args, **kw)
except Exception,e:
print 'Exception in Logger.%s! \nmsg:%s\ne:%s\nargs:%s\nkw:%s'%(level,msg,e,str(args),str(kw))
print traceback.format_exc()
[docs] def deprecated(self, msg, *args, **kw):
filename, lineno, func = self.log_obj.findCaller()
depr_msg = warnings.formatwarning(msg, DeprecationWarning, filename, lineno)
self.log_obj.warning(depr_msg, *args, **kw)
[docs] def flushOutput(self):
self.syncLog()
[docs] def syncLog(self):
logger = self
synced = []
while logger is not None:
for handler in logger.log_handlers:
if handler in synced:
continue
try:
sync = getattr(handler, 'sync')
except:
continue
sync()
synced.append(handler)
logger = logger.getParent()
[docs] def changeLogName(self,name):
"""Change the log name."""
p = self.getParent()
if p is not None:
self.full_name = '%s.%s' % (p.full_name, name)
else:
self.full_name = name
self.log_obj = logging.getLogger(self.full_name)
for handler in self.log_handlers:
self.log_obj.addHandler(handler)
for child in self.getChildren():
self.changeLogName(child.log_name)
[docs]class LogFilter(logging.Filter):
def __init__(self, level):
self.filter_level = level
logging.Filter.__init__(self)
[docs] def filter(self, record):
ok = (record.levelno == self.filter_level)
return ok
__doc__ += """
fandango.logger submodule provides a default Logger instance
and its info/debug/warning/error/trace methods directly available
as module methods.
import fandango.log
fandango.log.info('just a test')
fandango.Logger.INFO 2016-02-19 11:49:55.609 just a test
"""
_LogLevel = 'INFO'
for a in sys.argv:
if a.startswith('--log-level='):
_LogLevel = a.split('=')[-1].upper()
_Logger = Logger(level=_LogLevel)
info = _Logger.info
debug = _Logger.debug
warning = _Logger.warning
error = _Logger.error
trace = _Logger.trace
[docs]class InOutLogged(Decorator):
"""
This class provides an easy way to trace whenever python enter/leaves
a function.
"""
def __call__(self,*args,**kwargs):
debug('In %s(%s,%s)'%(self.f.__name__,args,kwargs,))
r = self.f(*args,**kwargs)
debug('Out of '+self.f.__name__)
return r
from . import doc
__doc__ = doc.get_fn_autodoc(__name__,vars())