"""This module contains utilities to perform operations in batch mode.

The module can be used from within Python if required, but is more commonly used
from the command line, possibly wrapped by a shell script to specialize its usage."""
from collections import deque
# This module is meant to be used as a standalone program, so 'from x import *' is prevented
def superviseJobs( jobs, returnIfLessThan, cmdinput, errorstream, donestream ):
    """Check on the jobs we have and wait for finished ones. Write information
    about them into the respective streams.

    :param jobs: list of running subprocess.Popen objects; finished processes are
        removed from it in place
    :param returnIfLessThan: return once we have less than the given amount of running jobs
    :param cmdinput: fallback list of input items reported to donestream for a finished
        process that does not carry its own ``batchInputs`` attribute
    :param errorstream: stream receiving the stderr lines of finished processes, or None
    :param donestream: stream receiving the newline-terminated input items of processes
        that exited with code 0, or None
    :note: polls the jobs once per second; returns immediately once no jobs remain"""
    sleeptime = 1.0     # wait one second in the main loop before checking the processes

    while True:
        for process in jobs[:]:     # iterate a copy as finished jobs get removed
            if process.poll() is None:
                continue            # still running

            jobs.remove( process )

            # the process finished - drain and close its stderr pipe.
            # NOTE: stderr is only read after exit; a child writing more than the OS
            # pipe buffer to stderr blocks until read (see subprocess docs)
            errlines = list()
            if process.stderr is not None:
                errlines = process.stderr.readlines()
                process.stderr.close()
            if errorstream is not None:
                errorstream.writelines( errlines )

            # record the inputs as done only if the process succeeded. Prefer the
            # inputs this very process was started with - cmdinput belongs to the most
            # recently launched batch and is empty when draining the final jobs
            if donestream is not None and process.returncode == 0:
                doneinputs = getattr( process, "batchInputs", cmdinput )
                if doneinputs:
                    donestream.write( "\n".join( doneinputs ) + "\n" )
        # END for each job

        if not jobs or len( jobs ) < returnIfLessThan:
            return
        time.sleep( sleeptime )
def killProcess( process ):
    """Kill the given process forcefully.

    Uses ``Popen.kill()`` when available, which works on POSIX and Windows alike.
    Objects without a ``kill`` method are killed through ``os.kill(pid, SIGKILL)``.

    :param process: subprocess.Popen instance, or any object providing a ``pid``
    :note: raises NotImplementedError if killing is not supported on this platform"""
    if hasattr( process, "kill" ):
        process.kill()
        return

    # SIGKILL is not defined on every platform (e.g. Windows), so check both
    if not hasattr( os, "kill" ) or not hasattr( signal, "SIGKILL" ):
        raise NotImplementedError( "os module does not support 'kill'ing of processes on your platform" )

    os.kill( process.pid, signal.SIGKILL )
def process( cmd, args, inputList, errorstream = None, donestream = None, inputsPerProcess = 1,
             numJobs = 1 ):
    """Launch process at cmd with args and a list of input objects from inputList appended to args

    :param cmd: full path to tool you wish to start, like /bin/bash
    :param args: List of all argument strings to be passed to cmd
    :param inputList: list of input files to be passed as input to cmd
    :param errorstream: stream to which errors will be written to as they occur if not None
    :param donestream: stream to which items from input list will be passed once they
        have been processed if not None. Items are newline terminated
    :param inputsPerProcess: pass the given number of inputs to the cmd, or less if there
        are not enough items on the input list
    :param numJobs: number of processes we may run in parallel
    :raise ValueError: if inputsPerProcess or numJobs is smaller than 1
    :note: each chunk of inputs is appended to the command line and additionally written,
        newline separated, to the process' stdin. Every launched process gets a
        ``batchInputs`` attribute holding its chunk.
    :note: a KeyboardInterrupt while launching or waiting for a free slot kills all
        running jobs; processing then continues with the next chunk"""
    if inputsPerProcess < 1:
        raise ValueError( "inputsPerProcess must be at least 1, got %r" % inputsPerProcess )
    if numJobs < 1:
        raise ValueError( "numJobs must be at least 1, got %r" % numJobs )

    jobs = list()
    for i in range( 0, len( inputList ), inputsPerProcess ):
        try:
            cmdinput = inputList[ i : i + inputsPerProcess ]      # slicing deals with bounds
            callcmd = (cmd,) + tuple( args ) + tuple( cmdinput )
            # text mode so str can be written to stdin and stderr yields str on Python 3
            proc = subprocess.Popen( callcmd, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                                     env=os.environ, universal_newlines=True )
            # remember the inputs so finished processes can be reported accurately
            proc.batchInputs = cmdinput
            jobs.append( proc )

            # feed our input arguments additionally to stdin, then close it so the
            # child sees EOF
            try:
                proc.stdin.write( "\n".join( cmdinput ) )
                proc.stdin.close()
            except IOError:
                pass    # the child may have exited and closed its end already

            if len( jobs ) < numJobs:
                continue

            # the queue is full (it may briefly exceed numJobs after an interrupt left
            # killed jobs in it) - wait until a slot frees up
            superviseJobs( jobs, numJobs, cmdinput, errorstream, donestream )
        except KeyboardInterrupt:
            # kill all processes - we do not know which one hangs. They stay in the
            # queue and are reaped by the next superviseJobs call
            for job in jobs:
                killProcess( job )
            sys.stdout.write("Aborted all running processes - continuing\n")
    # END for each chunk of inputs

    # queue is empty, finalize our pending jobs
    superviseJobs( jobs, 1, list(), errorstream, donestream )
def _usageAndExit( msg = None ):
125
sys.stdout.write("""python batch.py inputarg [inputarg ...] [-E fileForErrors|-] [-D fileForFinishedOutput|-] [-s numInputsPerProcess] -e cmd [cmdArg ...]
126
-E|D - means to use the default stream, either stderr or stdout
127
-I if specified, arguments will also be read from stdin until it is depleted as
128
newline separated list of names
129
Its particlularly important that the pipe to stdin closes once its done as
130
this command currently does not support streaming of input args
131
-e ends the parsing of commandline arguments for the batch process tool
132
and uses the rest of the commandline as direct input for your command
133
-s defines how many input arguments will be passed per command invocation
134
-j the number of processes to keep running in parallel, default 1
136
The given inputargs will be passed as arguments to the commands or into
137
the standardinput of the process""")
139
sys.stdout.write(msg+"\n")
144
def _toStream( arg, stream ):
145
""":return: stream according to arg
146
:param stream: stream to return if arg sais so """
151
# arg should be a file
153
return open( arg, "w" )
155
_usageAndExit( "Stream at %s could not be opened for writing" % arg )
158
def _popleftchecked( argv, errmsg ):
159
"""pop an arg from argv and return with an error message on error"""
161
return argv.popleft()
163
_usageAndExit( errmsg )
167
"""Processes the arguments"""
# NOTE(review): fragment of the command line entry point (presumably main(), as
# called at the bottom of the file); its `def` line and several statements of the
# argument loop are not visible in this excerpt. The accepted flags are described
# by the usage text printed by _usageAndExit.
173
# streams[0] is handed to process() as errorstream (-E), streams[1] as donestream
# (-D); None means the stream was not requested
streams = list( ( None, None ) )
179
# presumably set once -I has read stdin; -I may only be given once (checked below)
haveReadInput = False
189
#####################
191
# -e: the next argument is the command to run ...
cmd = _popleftchecked( argv, "-e must be followed by the command to execute" )
195
# ... and every remaining argument is passed on to that command
cmdargs.append( rarg )
204
# -E / -D: per the usage text, "-" selects sys.stderr / sys.stdout, anything else is
# a file path that _toStream opens for writing
for i,(flag,stream) in enumerate( ( ( "-E",sys.stderr ), ( "-D", sys.stdout ) ) ):
206
argval = _popleftchecked( argv, "%s must be followed by - or a filepath" % flag )
207
streams[ i ] = _toStream( argval, stream )
211
# END for each stream arg
213
if flagfound: continue
216
# -s: number of input items passed to each command invocation
msg = "-s must be followed by a number > 0"
217
# NOTE(review): a non-numeric value makes int() raise ValueError instead of
# printing the usage message
inputsPerProcess = int( _popleftchecked( argv, msg ) )
219
if inputsPerProcess < 1:
223
if flagfound: continue
225
# -j: number of processes kept running in parallel
msg = "-j must be followed by a number > 0"
226
numJobs = int( _popleftchecked( argv, msg ) )
232
if flagfound: continue
234
# INPUT ARGUMENTS FROM STDIN
238
_usageAndExit( "-I may only be specified once" )
241
# read stripped lines from stdin
242
# NOTE(review): blank lines on stdin become empty-string inputs
inputList.extend( ( l.strip() for l in sys.stdin.readlines() ) )
245
if flagfound: continue
247
# it's an input argument
248
inputList.append( arg )
250
# END for each argument
254
_usageAndExit( "No command to execute - add it after the -e flag" )
257
# have everything, transfer control to the actual batch method
258
process( cmd, cmdargs, inputList, streams[0], streams[1], inputsPerProcess, numJobs )
if __name__ == "__main__":
    # invoked as a script - forward every argument after the script name to main
    commandlineArgs = sys.argv[1:]
    main( *commandlineArgs )

#} END command line tool