Source code for monty.subprocess
# coding: utf-8
from __future__ import absolute_import, print_function, division, \
unicode_literals
__author__ = 'Matteo Giantomass'
__copyright__ = "Copyright 2014, The Materials Virtual Lab"
__version__ = '0.1'
__maintainer__ = 'Matteo Giantomassi'
__email__ = 'gmatteo@gmail.com'
__date__ = '10/26/14'
[docs]class Command(object):
"""
Enables to run subprocess commands in a different thread with TIMEOUT
option.
Based on jcollado's solution:
http://stackoverflow.com/questions/1191374/subprocess-with-timeout/4825933#4825933
and
https://gist.github.com/kirpit/1306188
.. attribute:: retcode
Return code of the subprocess
.. attribute:: killed
True if subprocess has been killed due to the timeout
.. attribute:: output
stdout of the subprocess
.. attribute:: error
stderr of the subprocess
Example:
com = Command("sleep 1").run(timeout=2)
print(com.retcode, com.killed, com.output, com.output)
"""
def __init__(self, command):
from .string import is_string
if is_string(command):
import shlex
command = shlex.split(command)
self.command = command
self.process = None
self.retcode = None
self.output, self.error = '', ''
self.killed = False
def __str__(self):
return "command: %s, retcode: %s" % (self.command, self.retcode)
[docs] def run(self, timeout=None, **kwargs):
"""
Run a command in a separated thread and wait timeout seconds.
kwargs are keyword arguments passed to Popen.
Return: self
"""
from subprocess import Popen, PIPE
def target(**kw):
try:
#print('Thread started')
self.process = Popen(self.command, **kw)
self.output, self.error = self.process.communicate()
self.retcode = self.process.returncode
#print('Thread stopped')
except:
import traceback
self.error = traceback.format_exc()
self.retcode = -1
# default stdout and stderr
if 'stdout' not in kwargs:
kwargs['stdout'] = PIPE
if 'stderr' not in kwargs:
kwargs['stderr'] = PIPE
# thread
import threading
thread = threading.Thread(target=target, kwargs=kwargs)
thread.start()
thread.join(timeout)
if thread.is_alive():
#print("Terminating process")
self.process.terminate()
self.killed = True
thread.join()
return self