1
2
"""This module contains utilities to perform operations in batch mode.
The module can be used from within python if required, but is more commonly used
from the commandline, possibly wrapped by a shell script to specialize its usage
"""
7 import sys,os
8 import signal
9 from collections import deque
10 import subprocess
11 import time
12
13
# NOTE(review): __all__ is normally a sequence of public names. With None,
# `from <module> import *` has nothing to iterate - presumably this is meant to
# discourage star imports; confirm the intent before changing it.
__all__ = None
15
def superviseJobs( jobs, returnIfLessThan, cmdinput, errorstream, donestream, sleeptime = 1.0 ):
    """Check on the jobs we have and wait for finished ones. Write information
    about them into the respective streams

    Finished jobs are removed from ``jobs`` in place. The function returns
    right away if ``jobs`` is empty, otherwise it keeps polling until fewer
    than ``returnIfLessThan`` jobs are left.

    :param jobs: list of subprocess.Popen like objects providing ``poll()``,
        ``returncode`` and ``stderr``; modified in place
    :param returnIfLessThan: return once we have less than the given amount of running jobs
    :param cmdinput: list of strings which is written newline terminated to
        donestream for every job that finished with a return code of 0
    :param errorstream: stream receiving everything a finished job wrote to its
        stderr pipe; nothing is forwarded if it evaluates to False
    :param donestream: stream receiving cmdinput for successfully finished jobs,
        or None to write nothing
    :param sleeptime: seconds to sleep between two polling passes
    :note: the very same cmdinput is reported for every job finishing during
        this call, the function does not know which inputs belong to which job"""
    if not jobs:
        return

    while True:
        # iterate a copy, finished jobs are removed from the original list
        for proc in jobs[:]:
            if proc.poll() is None:
                continue        # still running

            jobs.remove( proc )

            # forward the errors, and always release the pipe of the finished job
            errpipe = getattr( proc, "stderr", None )
            if errpipe is not None:
                try:
                    if errorstream:
                        errorstream.writelines( errpipe.readlines() )
                        errorstream.flush()
                finally:
                    errpipe.close()

            # append to the done list only on success
            if donestream is not None and proc.returncode == 0:
                donestream.write( "\n".join( cmdinput ) + "\n" )
                donestream.flush()

        if len( jobs ) < returnIfLessThan:
            return

        # wait a moment before checking the remaining jobs again
        time.sleep( sleeptime )
53
54
def killProcess( process ):
    """Kill the given process

    :param process: object providing the id of the process to kill in its
        ``pid`` attribute, like a subprocess.Popen instance
    :raise NotImplementedError: if the platform lacks ``os.kill`` or the
        ``SIGKILL`` signal
    :note: raises if kill is not supported by the os module"""
    # some platforms provide os.kill without SIGKILL - report both cases the
    # same way instead of failing with an AttributeError on the signal lookup
    if not hasattr( os, "kill" ) or not hasattr( signal, "SIGKILL" ):
        raise NotImplementedError( "os module does not support 'kill'ing of processes on your platform" )

    os.kill( process.pid, signal.SIGKILL )
62
63
64
def process( cmd, args, inputList, errorstream = None, donestream = None, inputsPerProcess = 1,
             numJobs=1):
    """Launch process at cmd with args and a list of input objects from inputList appended to args

    The inputs are handed to each process twice: appended to its commandline
    and written newline separated to its standard input.

    :param cmd: full path to tool you wish to start, like /bin/bash
    :param args: List of all argument strings to be passed to cmd
    :param inputList: list of input files to be passed as input to cmd
    :param errorstream: stream to which errors will be written to as they occur if not None
    :param donestream: stream to which items from input list will be passed once they
        have been processed if not None. Items are newline terminated, and only
        the items of processes that returned 0 are written
    :param inputsPerProcess: pass the given number of inputs to the cmd, or less if there
        are not enough items on the input list
    :param numJobs: number of processes we may run in parallel
    :raise ValueError: if inputsPerProcess is smaller than 1
    :note: on KeyboardInterrupt all running processes are killed and the
        remaining inputs are processed as usual
    :note: NOTE(review): stderr of a job is only read once it finished - a job
        writing more than the pipe can buffer may block; confirm this is acceptable"""
    if inputsPerProcess < 1:
        raise ValueError( "inputsPerProcess must be > 0, got %r" % ( inputsPerProcess, ) )

    jobs = list()
    launched = list()       # ( job, inputs ) of every job not yet reported, in start order

    def reportFinished():
        """Write the inputs of each job that left the job list to the donestream
        if it succeeded, and forget about it"""
        stillRunning = list()
        for job, jobinput in launched:
            if job in jobs:
                stillRunning.append( ( job, jobinput ) )
            elif donestream is not None and job.returncode == 0:
                donestream.write( "\n".join( jobinput ) + "\n" )
                donestream.flush()
        launched[:] = stillRunning

    for start in range( 0, len( inputList ), inputsPerProcess ):
        cmdinput = inputList[ start : start + inputsPerProcess ]       # deals with bounds
        callcmd = ( cmd, ) + tuple( args ) + tuple( cmdinput )
        # text mode pipes allow strings to be written and read on all python versions
        job = subprocess.Popen( callcmd, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                                env=os.environ, universal_newlines=True )

        jobs.append( job )
        launched.append( ( job, cmdinput ) )

        # fill stdin and make sure it is closed even if the write fails
        try:
            try:
                job.stdin.write( "\n".join( cmdinput ) )
                job.stdin.flush()
            finally:
                job.stdin.close()
        except IOError:
            pass        # could be closed already

        if len( jobs ) < numJobs:
            continue

        # we are full - wait until we have a finished job
        if len( jobs ) != numJobs:
            raise AssertionError( "invalid job count" )

        try:
            # The donestream is withheld as superviseJobs would report the
            # inputs of this chunk for whichever job finishes. The correct
            # inputs are written by reportFinished instead.
            superviseJobs( jobs, numJobs, cmdinput, errorstream, None )
        except KeyboardInterrupt:
            # jobs that completed before the interrupt still count as done
            reportFinished()
            for job in jobs:
                killProcess( job )
            del jobs[:]
            del launched[:]
            sys.stdout.write("Aborted all running processes - continuing\n")
        else:
            reportFinished()

    # queue is empty, wait for the jobs still running
    if jobs:
        superviseJobs( jobs, 1, list(), errorstream, None )
        reportFinished()
119
120
121
122
124 """Print usage"""
125 sys.stdout.write("""python batch.py inputarg [inputarg ...] [-E fileForErrors|-] [-D fileForFinishedOutput|-] [-s numInputsPerProcess] -e cmd [cmdArg ...]
126 -E|D - means to use the default stream, either stderr or stdout
127 -I if specified, arguments will also be read from stdin until it is depleted as
128 newline separated list of names
129 Its particlularly important that the pipe to stdin closes once its done as
130 this command currently does not support streaming of input args
131 -e ends the parsing of commandline arguments for the batch process tool
132 and uses the rest of the commandline as direct input for your command
133 -s defines how many input arguments will be passed per command invocation
134 -j the number of processes to keep running in parallel, default 1
135
136 The given inputargs will be passed as arguments to the commands or into
137 the standardinput of the process""")
138 if msg:
139 sys.stdout.write(msg+"\n")
140
141 sys.exit(1)
142
143
145 """:return: stream according to arg
146 :param stream: stream to return if arg sais so """
147 if arg == "-":
148 return stream
149
150
151
152 try:
153 return open( arg, "w" )
154 except IOError:
155 _usageAndExit( "Stream at %s could not be opened for writing" % arg )
156
157
159 """pop an arg from argv and return with an error message on error"""
160 try:
161 return argv.popleft()
162 except IndexError:
163 _usageAndExit( errmsg )
164
165
def main( *args ):
    """Processes the arguments and runs the requested command in batches

    Recognized flags, everything else is taken as input item:

     * -e cmd [cmdArg ...] ends the parsing, the rest is the command to run
     * -E path|- stream for errors, - selects stderr
     * -D path|- stream for finished inputs, - selects stdout
     * -s number of inputs per process, must be > 0
     * -j number of parallel processes, must be > 0
     * -I additionally read input items from stdin, one per line, only once

    :param args: commandline arguments without the name of the executable
    :note: prints the usage and exits on invalid arguments"""
    if not args:
        _usageAndExit( )

    defaultStreams = ( ( "-E", sys.stderr ), ( "-D", sys.stdout ) )
    inputList = list()
    streams = [ None, None ]        # error stream, done stream

    numJobs = 1
    inputsPerProcess = 1
    cmd = None
    cmdargs = list()
    haveReadInput = False

    argv = deque( args )

    def popPositiveInt( flag ):
        """:return: the integer following flag, the usage is printed if it is
            missing, not a number or smaller than 1"""
        msg = "%s must be followed by a number > 0" % flag
        try:
            value = int( _popleftchecked( argv, msg ) )
        except ValueError:
            # not a number - show the usage instead of a traceback
            _usageAndExit( msg )
        if value < 1:
            _usageAndExit( msg )
        return value

    while argv:
        arg = argv.popleft()

        if arg == "-e":
            cmd = _popleftchecked( argv, "-e must be followed by the command to execute" )
            # everything that remains belongs to the command
            cmdargs.extend( argv )
            break
        elif arg in ( "-E", "-D" ):
            for index, ( flag, stream ) in enumerate( defaultStreams ):
                if arg == flag:
                    argval = _popleftchecked( argv, "%s must be followed by - or a filepath" % flag )
                    streams[ index ] = _toStream( argval, stream )
                    break
        elif arg == "-s":
            inputsPerProcess = popPositiveInt( "-s" )
        elif arg == "-j":
            numJobs = popPositiveInt( "-j" )
        elif arg == "-I":
            if haveReadInput:
                _usageAndExit( "-I may only be specified once" )
            haveReadInput = True
            inputList.extend( ( l.strip() for l in sys.stdin.readlines() ) )
        else:
            # its an input argument
            inputList.append( arg )

    if not cmd:
        _usageAndExit( "No command to execute - add it after the -e flag" )

    try:
        process( cmd, cmdargs, inputList, streams[0], streams[1], inputsPerProcess, numJobs )
    finally:
        # close the files we opened, but leave the default streams alone
        for stream in streams:
            if stream is None or stream is sys.stderr or stream is sys.stdout:
                continue
            stream.close()
259
260
261
# When executed as a script, hand all commandline arguments except the
# script name to main
if __name__ == "__main__":
    main( *sys.argv[1:] )
264
265
266