#!/usr/bin/env python
# encoding: utf-8
# Author: sorin sbarnea
# License: public domain
from __future__ import print_function
from __future__ import unicode_literals
import logging, sys, subprocess, types, time, os, codecs, unittest
if sys.hexversion < 0x02060000:
sys.stderr.write("You need Python 2.6 or newer.\n")
sys.exit(-1)
if sys.version_info[0] == 3:
string_types = str,
else:
string_types = basestring,
global logger
global stdout
global stderr
global timing
global log_command
logger = None
stdout = False
stderr = False
timing = True # print execution time of each command in the log, just after the return code
log_command = True # outputs the command being executed to the log (before command output)
_sentinel = object()
def quote_command(cmd):
"""
This function does assure that the command line is entirely quoted.
This is required in order to prevent getting "The input line is too long" error message.
"""
if not (os.name == "nt" or os.name == "dos"):
return cmd # the escaping is required only on Windows platforms, in fact it will break cmd line on others
if '"' in cmd[1:-1]:
cmd = '"' + cmd + '"'
return cmd
[docs]def system2(cmd, cwd=None, logger=_sentinel, stdout=_sentinel, log_command=_sentinel, timing=_sentinel):
#def tee(cmd, cwd=None, logger=tee_logger, console=tee_console):
""" Works exactly like :func:`system` but it returns both the exit code and the output as a list of lines.
This method returns a tuple: (return_code, output_lines_as_list). The return code of 0 means success.
"""
t = time.clock()
output = []
if log_command is _sentinel: log_command = globals().get('log_command')
if timing is _sentinel: timing = globals().get('timing')
if logger is _sentinel: # default to python native logger if logger parameter is not used
logger = globals().get('logger')
if stdout is _sentinel:
stdout = globals().get('stdout')
#logging.debug("logger=%s stdout=%s" % (logger, stdout))
f = sys.stdout
if not f.encoding or f.encoding == 'ascii':
# `ascii` is not a valid encoding by our standards, it's better to output to UTF-8 because it can encoding any Unicode text
encoding = 'utf_8'
else:
encoding = f.encoding
def filelogger(msg):
try:
msg += '\n' # we'll use the same endline on all platforms, you like it or not
try:
f.write(msg)
except TypeError:
f.write(msg.encode("utf-8"))
except Exception as e:
import traceback
print(' ****** ERROR: Exception: %s\nencoding = %s' % (e, encoding))
traceback.print_exc(file=sys.stderr)
sys.exit(-1)
pass
def nop(msg):
pass
if not logger:
mylogger = nop
elif isinstance(logger, string_types):
f = codecs.open(logger, "a+b", 'utf_8')
mylogger = filelogger
elif isinstance(logger, (types.FunctionType, types.MethodType, types.BuiltinFunctionType)):
mylogger = logger
else:
method_write = getattr(logger, "write", None)
# if we can call write() we'll aceppt it :D
if hasattr(method_write,'__call__'): # this should work for filehandles
f = logger
mylogger = filelogger
else:
sys.exit("tee() does not support this type of logger=%s" % type(logger))
if cwd is not None and not os.path.isdir(cwd):
os.makedirs(cwd) # this throws exception if fails
cmd = quote_command(cmd) # to prevent _popen() bug
p = subprocess.Popen(cmd, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if log_command:
mylogger("Running: %s" % cmd)
while True:
line=""
try:
line = p.stdout.readline()
line = line.decode(encoding)
except Exception as e:
logging.error(e)
logging.error("The output of the command could not be decoded as %s\ncmd: %s\n line ignored: %s" %\
(encoding, cmd, repr(line)))
pass
output.append(line)
if not line:
break
line = line.rstrip('\n\r')
mylogger(line) # they are added by logging anyway
if stdout :
print(line)
returncode = p.wait()
if log_command :
if timing:
def secondsToStr(t):
from functools import reduce
return "%02d:%02d:%02d" % reduce(lambda ll,b : divmod(ll[0],b) + ll[1:], [(t*1000,),1000,60,60])[:3]
mylogger("Returned: %d (execution time %s)\n" % (returncode, secondsToStr(time.clock()-t)))
else:
mylogger("Returned: %d\n" % returncode)
if not returncode == 0: # running a tool that returns non-zero? this deserves a warning
logging.warning("Returned: %d from: %s\nOutput %s" % (returncode, cmd, '\n'.join(output)))
return returncode, output
[docs]def system(cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timing=_sentinel):
""" This works similar to :py:func:`os.system` but add some useful optional parameters.
* ``cmd`` - command to be executed
* ``cwd`` - optional working directory to be set before running cmd
* ``logger`` - None, 'log.txt', handle or a function like print or :py:meth:`logging.Logger.warning`
Returns the exit code reported by the execution of the command, 0 means success.
>>> import os, logging
>>> tee.system("echo test", logger=logging.error) # output using python logging
>>> tee.system("echo test", logger="log.txt") # output to a file
>>> f = open("log.txt", "w")
>>> tee.system("echo test", logger=f) # output to a filehandle
>>> tee.system("echo test", logger=print) # use the print() function for output
"""
(returncode, output) = system2(cmd, cwd=cwd, logger=logger, stdout=stdout, log_command=log_command, timing=timing)
return returncode
class testTee(unittest.TestCase):
def test_1(self):
"""
CMD os.system()
1 sort /? ok ok
2 "sort" /? ok ok
3 sort "/?" ok ok
4 "sort" "/?" ok [bad]
5 ""sort /?"" ok [bad]
6 "sort /?" [bad] ok
7 "sort "/?"" [bad] ok
8 ""sort" "/?"" [bad] ok
"""
quotes = {
'dir >nul': 'dir >nul',
'cd /D "C:\\Program Files\\"':'"cd /D "C:\\Program Files\\""',
'python -c "import os" dummy':'"python -c "import os" dummy"',
'sort':'sort',
}
# we fake the os name because we want to run the test on any platform
save = os.name
os.name = 'nt'
for key, value in quotes.iteritems():
resulted_value = quote_command(key)
self.assertEqual(value, resulted_value, "Returned <%s>, expected <%s>" % (resulted_value, value))
#ret = os.system(resulted_value)
#if not ret==0:
# print("failed")
os.name = save
if __name__ == '__main__':
import os
unittest.main()
"""
import colorer
import tempfile, os
logging.basicConfig(level=logging.NOTSET,
format='%(message)s')
# default (stdout)
print("#1")
system("python --version")
# function/method
print("#2")
system("python --version", logger=logging.error)
# function (this is the same as default)
print("#3")
system("python --version", logger=print)
# handler
print("#4")
f = tempfile.NamedTemporaryFile()
system("python --version", logger=f)
f.close()
# test with string (filename)
print("#5")
(f, fname) = tempfile.mkstemp()
system("python --version", logger=fname)
os.close(f)
os.unlink(fname)
print("#6")
stdout = False
logger = None
system("echo test")
print("#7")
stdout = True
system("echo test2")
"""