import os
import subprocess
from multiprocessing import Process
import time
class Runner:
    """
    Launch and supervise runs of an external code binary.

    A run is executed in a child :class:`multiprocessing.Process`, optionally
    through ``mpirun``, and is asked to stop once ``runtime`` seconds elapse.
    """

    #: Codes accepted by the constructor (compared case-insensitively).
    SUPPORTED_CODES = ('abinit', 'vasp', 'octopus', 'dftb', 'fireball')
    #: Values accepted for ``environment`` (compared case-insensitively).
    ENVIRONMENTS = ('local', 'remote')
    #: Prefix of the VASP output line that makes ``run`` abort a VASP calculation.
    VASP_NOT_HERMITIAN = "WARNING: Sub-Space-Matrix is not hermitian in DAV"

    def __init__(self, code, code_bin, environment, use_mpi=True, nmpiproc=2, nconcurrent=1, runtime=3600):
        """
        :param code: (str) Name of the code, one of SUPPORTED_CODES (any case)
        :param code_bin: (str) Executable to launch
        :param environment: (str) 'local' or 'remote' (any case); only local runs are implemented
        :param use_mpi: (bool) Launch the executable through ``mpirun``
        :param nmpiproc: (int) Number of MPI processes passed to ``mpirun -np``
        :param nconcurrent: (int) Number of simultaneous workers used by ``run_multidirs``
        :param runtime: (float) Seconds after which ``run`` asks the code to stop
        :raises ValueError: If ``code`` or ``environment`` is not supported
        """
        if code.lower() not in self.SUPPORTED_CODES:
            raise ValueError('Code not supported: %s' % code)
        if environment.lower() not in self.ENVIRONMENTS:
            raise ValueError('Environment must be local or remote: %s' % environment)
        self.code = code.lower()
        self.environment = environment.lower()
        self.use_mpi = use_mpi
        self.nmpiproc = nmpiproc
        self.nconcurrent = nconcurrent
        self.code_bin = code_bin
        self.runtime = runtime

    def initialize(self, dirpath):
        """
        Create the working directory ``dirpath`` (including missing parents)
        if it does not exist yet. No code-specific input files are written.

        :param dirpath: (str) Directory where the runner will execute
        """
        if not os.path.isdir(dirpath):
            os.makedirs(dirpath)

    def run(self, dirpath='.', analyser=None, polltime=60):
        """
        Execute the code inside ``dirpath`` and block until it finishes.

        Standard output/error are appended to ``<code>.stdout`` and
        ``<code>.stderr`` in ``dirpath``. For ABINIT the standard input is
        read from ``abinit.in``. While the code runs, ``analyser`` (if given)
        is called every ``polltime`` seconds with ``dirpath`` as working
        directory and its result is printed. The run is stopped through
        ``_stop_run`` when ``runtime`` is exceeded or, for VASP, when the
        non-hermitian subspace-matrix warning appears in the last output lines.

        :param dirpath: (str) Directory where the code is executed
        :param analyser: (callable) Optional zero-argument callable
        :param polltime: (float) Seconds between two checks of the running code
        :return: The finished :class:`multiprocessing.Process` for a local
                 environment, None otherwise (remote runs are not implemented)
        """
        outfile = self.code + '.stdout'
        errfile = self.code + '.stderr'
        infile = 'abinit.in' if self.code == 'abinit' else None
        if self.environment != 'local':
            return None
        # A bound method (not a closure) so the target can be pickled under
        # the 'spawn'/'forkserver' start methods as well as 'fork'.
        p = Process(target=self._local_worker,
                    args=(dirpath, outfile, errfile, infile, analyser, polltime))
        p.start()
        p.join()
        return p

    def _command(self):
        """Return the argument list used to launch ``code_bin``."""
        if self.use_mpi:
            return ['mpirun', '-np', str(self.nmpiproc), '--map-by', 'socket:PE=2', self.code_bin]
        return [self.code_bin]

    def _local_worker(self, path, outfile, errfile, infile, analyser, polltime):
        """Run the code inside ``path`` and supervise it (target of the process started by ``run``)."""
        cwd = os.getcwd()
        os.chdir(path)
        try:
            with open(outfile, 'a') as outf, open(errfile, 'a') as errf:
                for handle in (outf, errf):
                    handle.write('' + (40 * '=') + ' New Run ' + (40 * '=') + '\n')
                    # The child writes straight to the file descriptor; flush now or
                    # the banner would only be written after the child's output.
                    handle.flush()
                stdin = open(infile, 'r') if infile is not None else None
                try:
                    initime = time.time()
                    childp = subprocess.Popen(self._command(), stdin=stdin, stdout=outf, stderr=errf)
                    self._supervise(childp, path, outfile, analyser, initime, polltime)
                finally:
                    if stdin is not None:
                        stdin.close()
        finally:
            os.chdir(cwd)

    def _supervise(self, childp, path, outfile, analyser, initime, polltime):
        """Poll ``childp`` until it exits; stop it on timeout or on the VASP hermiticity warning."""
        while childp.poll() is None:
            if self.code == 'vasp' and self._tail_has_warning(outfile, self.VASP_NOT_HERMITIAN):
                print("[WARNING: Sub-Space-Matrix is not hermitian in DAV] Stopping now...")
                self._stop_run(childp)
                break
            if time.time() - initime > self.runtime:
                self._stop_run(childp)
                break
            if analyser is not None:
                ret = analyser()
                print(os.path.basename(path), ret)
            time.sleep(polltime)
        # Reap the child so returncode is set even after terminate()/kill().
        childp.wait()
        print('The return code was', childp.returncode)

    @staticmethod
    def _tail_has_warning(filename, prefix, nlines=20):
        """
        Return True if one of the last ``nlines`` lines of ``filename``
        (ignoring surrounding whitespace) starts with ``prefix``.
        A missing or unreadable file yields False.
        """
        try:
            with open(filename, 'r') as rf:
                tail = rf.readlines()[-nlines:]
        except OSError:
            return False
        return any(line.strip().startswith(prefix) for line in tail)

    @staticmethod
    def _stop_run(childp, softtime=600, extratime=60):
        """
        Ask a running code to stop, escalating to terminate() and then kill().

        Creates an empty ``CHKPT`` file and a ``STOPCAR`` containing
        ``LSTOP = .TRUE.`` in the current directory (presumably the graceful
        stop requests understood by ABINIT and VASP respectively — confirm).
        It then waits up to ``softtime`` seconds for the process to exit,
        terminates it, waits up to ``extratime`` seconds more and finally
        kills it. Both stop files are removed afterwards.

        :param childp: (subprocess.Popen) Process to stop
        :param softtime: (float) Seconds allowed for a graceful exit
        :param extratime: (float) Seconds allowed after terminate() before kill()
        """
        def exited_within(seconds):
            # Returns as soon as the process ends instead of sleeping the full time.
            try:
                childp.wait(timeout=seconds)
            except subprocess.TimeoutExpired:
                return False
            return True

        print('Creating Stoping files')
        with open('CHKPT', 'w'):
            pass
        with open('STOPCAR', 'w') as wf:
            wf.write('LSTOP = .TRUE.\n')
        try:
            if not exited_within(softtime):
                print('Sending SIGTERM to process')
                childp.terminate()
                if not exited_within(extratime):
                    print('Sending SIGKILL to process')
                    childp.kill()
        finally:
            for stopfile in ('CHKPT', 'STOPCAR'):
                if os.path.isfile(stopfile):
                    os.remove(stopfile)

    def run_multidirs(self, workdirs, worker, checker, polltime=30):
        """
        Cycle over ``workdirs`` keeping up to ``nconcurrent`` worker processes
        alive, until every directory contains a ``COMPLETE`` file.

        Whenever a slot is free, the next directory (round-robin) is offered to
        ``checker``; if it returns a truthy value, ``worker(dirpath)`` is
        started in a new :class:`multiprocessing.Process` on that slot.

        :param workdirs: (list) Directories to process
        :param worker: (callable) Called as ``worker(dirpath)`` in a child process
        :param checker: (callable) Called as ``checker(dirpath)``; truthy means "launch"
        :param polltime: (float) Seconds between two scheduling rounds
        """
        if not workdirs:
            # Nothing to schedule; indexing workdirs below would fail.
            return
        slots = [None] * self.nconcurrent
        index = 0
        while True:
            for islot, proc in enumerate(slots):
                if proc is not None and proc.is_alive():
                    continue
                workdir = workdirs[index]
                if checker(workdir):
                    print('Launching for process ' + str(islot) + ' on dir ' + os.path.basename(
                        workdir) + ' index ' + str(index))
                    slots[islot] = Process(target=worker, args=(workdir,))
                    slots[islot].start()
                index = (index + 1) % len(workdirs)
            time.sleep(polltime)
            if all(os.path.isfile(os.path.join(idir, 'COMPLETE')) for idir in workdirs):
                print('Finishing...')
                break

    def run_multidirs_nonstop(self, workdirs, worker, checker):
        """
        Run ``run_multidirs`` in a background :class:`multiprocessing.Process`
        and return immediately.

        :return: The started process
        """
        # Bound method instead of a nested function so it is picklable under 'spawn'.
        proc = Process(target=self.run_multidirs, args=(workdirs, worker, checker))
        proc.start()
        return proc