Package mrv :: Module batch
[hide private]
[frames] | [no frames]

Source Code for Module mrv.batch

  1  #!/usr/bin/env python  
  2  # -*- coding: utf-8 -*- 
  3  """This module contains utilities to perform operations in batch mode. 
  4  The module can be used from within python if required, but is more commonly used 
  5  from the commandline, possibly wrapped by a shell script to specialize its usage 
  6  """ 
  7  import sys,os 
  8  import signal 
  9  from collections import deque 
 10  import subprocess 
 11  import time 
 12   
 13  # module is meant to be run as a standalone program - setting __all__ to None makes 'from mrv.batch import *' fail
 14  __all__ = None 
 15   
def superviseJobs( jobs, returnIfLessThan, cmdinput, errorstream, donestream, sleeptime = 1.0 ):
    """Check on the jobs we have and wait for finished ones. Write information
    about them into the respective streams.

    Finished processes are removed from jobs. The stderr output of each finished
    process is written to errorstream. If the process exited with code 0, its input
    items are written to donestream, one newline-terminated item per line.

    :param jobs: list of subprocess.Popen instances, altered in place
    :param returnIfLessThan: return once we have less than the given amount of running jobs
    :param cmdinput: input items to report for finished processes which were not
        started by process(); processes started by process() report their own inputs
    :param errorstream: stream receiving the stderr output of finished processes, or None
    :param donestream: stream receiving the inputs of successful processes, or None
    :param sleeptime: seconds to wait between two checks of the running processes"""
    while jobs:
        # iterate a copy as finished processes are removed from jobs
        for proc in jobs[:]:
            # check if subprocess is done
            if proc.poll() is None:
                continue

            jobs.remove( proc )
            _reportFinishedJob( proc, cmdinput, errorstream, donestream )

            # can we return ?
            if len( jobs ) < returnIfLessThan:
                return
        # END for each job

        time.sleep( sleeptime )
    # END while there are jobs to supervise


def _reportFinishedJob( proc, cmdinput, errorstream, donestream ):
    """Write the stderr output of the finished proc, and its inputs if it succeeded, to the streams"""
    errlines = _collectStderr( proc )
    if errorstream and errlines:
        errorstream.writelines( errlines )
        errorstream.flush()
    # END handle stderr

    # append to the done list only if there is no error
    if donestream is not None and proc.returncode == 0:
        # prefer the inputs recorded for this very process over the caller's fallback
        items = getattr( proc, "_batchInput", cmdinput )
        if items:
            donestream.write( "\n".join( items ) + "\n" )
            donestream.flush()
        # END if there is something to report
    # END handle done items


def _collectStderr( proc ):
    """:return: list of lines the finished proc wrote to its stderr pipe, or an empty list
    if its stderr was not piped. The pipe is closed afterwards"""
    drain = getattr( proc, "_batchStderrDrain", None )
    if drain is not None:
        # a background thread reads the pipe - wait for it to hit EOF
        thread, lines = drain
        thread.join()
        return lines
    # END drained by background thread

    if proc.stderr is None:
        return list()
    try:
        return proc.stderr.readlines()
    finally:
        proc.stderr.close()


def _drainStderrInBackground( proc ):
    """Read the stderr pipe of proc on a daemon thread while the process runs.

    Reading the pipe only once the process finished would block it forever as soon
    as it wrote more than the pipe buffer can hold. The thread and the list receiving
    the lines are stored on proc for _collectStderr."""
    import threading
    lines = list()

    def readAll():
        try:
            lines.extend( proc.stderr.readlines() )
        finally:
            proc.stderr.close()
    # END reader

    thread = threading.Thread( target=readAll )
    thread.daemon = True
    thread.start()
    proc._batchStderrDrain = ( thread, lines )


def killProcess( process ):
    """Kill the given process.

    :param process: subprocess.Popen instance to kill
    :note: raises NotImplementedError if killing is not supported on your platform"""
    kill = getattr( process, "kill", None )
    if kill is not None:
        # Popen.kill is the portable way, it works on posix and windows alike
        kill()
        return
    # END use Popen implementation

    if not hasattr( os, "kill" ) or not hasattr( signal, "SIGKILL" ):
        raise NotImplementedError( "os module does not support 'kill'ing of processes on your platform" )
    os.kill( process.pid, signal.SIGKILL )


def _abortJobs( jobs ):
    """Kill all processes in jobs, reap them and empty the list"""
    # we do not know which one hangs - kill all of them
    for proc in jobs:
        try:
            killProcess( proc )
        except OSError:
            pass    # the process exited on its own in the meanwhile
        proc.wait()
    # END for each job
    del jobs[:]


def _launchJob( baseCmd, cmdinput, stderrTarget ):
    """Start baseCmd with cmdinput appended to its arguments and also written to its stdin.

    :param baseCmd: tuple of the command and its fixed arguments
    :param cmdinput: list of input strings for this process
    :param stderrTarget: subprocess.PIPE to capture stderr, or an open file to send it to
    :return: the started subprocess.Popen instance"""
    proc = subprocess.Popen( baseCmd + tuple( cmdinput ), stderr=stderrTarget, stdin=subprocess.PIPE,
                             env=os.environ, universal_newlines=True )
    # remember the inputs so superviseJobs reports the ones of this very process
    proc._batchInput = cmdinput
    if stderrTarget == subprocess.PIPE:
        # start reading stderr before we possibly block on writing stdin
        _drainStderrInBackground( proc )

    # fill our input arguments additionally to stdin, one newline-terminated item per line
    try:
        try:
            proc.stdin.write( "".join( item + "\n" for item in cmdinput ) )
            proc.stdin.flush()
        finally:
            proc.stdin.close()
    except ( IOError, OSError ):
        pass    # the process may have exited or closed its stdin already
    return proc


def process( cmd, args, inputList, errorstream = None, donestream = None, inputsPerProcess = 1,
            numJobs=1, pollInterval = 1.0 ):
    """Launch process at cmd with args and a list of input objects from inputList appended to args.
    The same input objects are also written to the standard input of the process, one per line.

    :param cmd: full path to tool you wish to start, like /bin/bash
    :param args: List of all argument strings to be passed to cmd
    :param inputList: list of input strings, like file paths, to be passed as input to cmd
    :param errorstream: stream to which the stderr output of each process will be written
        once the process finished, if not None. If None, the stderr output is discarded
    :param donestream: stream to which items from input list will be passed once they
        have been processed successfully (exit code 0) if not None. Items are newline terminated
    :param inputsPerProcess: pass the given number of inputs to the cmd, or less if there
        are not enough items on the input list
    :param numJobs: number of processes we may run in parallel
    :param pollInterval: seconds to wait between two checks of the running processes
    :raise ValueError: if inputsPerProcess or numJobs is smaller than 1"""
    if inputsPerProcess < 1 or numJobs < 1:
        raise ValueError( "inputsPerProcess and numJobs must be at least 1, got %r and %r" % ( inputsPerProcess, numJobs ) )

    baseCmd = ( cmd, ) + tuple( args )

    # Without errorstream, stderr is discarded. Send it to the null device rather than
    # into a pipe nobody reads, which could fill up and block the process forever
    devnull = None
    stderrTarget = subprocess.PIPE
    if errorstream is None:
        devnull = open( os.devnull, "w" )
        stderrTarget = devnull
    # END handle stderr target

    jobs = list()
    try:
        for i in range( 0, len( inputList ), inputsPerProcess ):
            cmdinput = inputList[ i : i + inputsPerProcess ]     # deals with bounds
            jobs.append( _launchJob( baseCmd, cmdinput, stderrTarget ) )

            # get another job ?
            if len( jobs ) < numJobs:
                continue

            if len( jobs ) != numJobs:
                raise AssertionError( "invalid job count" )

            # we have a full queue now - get a new one asap
            try:
                superviseJobs( jobs, numJobs, cmdinput, errorstream, donestream, pollInterval )
            except KeyboardInterrupt:
                _abortJobs( jobs )
                sys.stdout.write("Aborted all running processes - continuing\n")
        # END for each chunk of inputs

        # queue is empty, finalize our pending jobs
        superviseJobs( jobs, 1, list(), errorstream, donestream, pollInterval )
    finally:
        if devnull is not None:
            devnull.close()
119 120 121 #{ Command Line Tool 122
def _usageAndExit( msg = None ):
    """Print the command line usage, followed by msg if given, and exit with status 1.

    :param msg: optional message describing what was wrong with the arguments"""
    sys.stdout.write("""python batch.py inputarg [inputarg ...] [-I] [-E fileForErrors|-] [-D fileForFinishedOutput|-] [-s numInputsPerProcess] [-j numJobs] -e cmd [cmdArg ...]
-E|D - means to use the default stream, either stderr or stdout
-I if specified, arguments will also be read from stdin until it is depleted as
   newline separated list of names
   It's particularly important that the pipe to stdin closes once it's done as
   this command currently does not support streaming of input args
-e ends the parsing of commandline arguments for the batch process tool
   and uses the rest of the commandline as direct input for your command
-s defines how many input arguments will be passed per command invocation, default 1
-j the number of processes to keep running in parallel, default 1

The given inputargs will be passed as arguments to the command and
written to the standard input of the process
""")
    # the usage text ends with a newline, so msg starts on its own line
    if msg:
        sys.stdout.write(msg+"\n")

    sys.exit(1)
142 143
def _toStream( arg, stream ):
    """Map the value of a stream flag to a writable stream.

    :param arg: either "-" or the path of a file to open for writing
    :param stream: the stream to hand back when arg is "-"
    :return: stream if arg is "-", otherwise a new file object opened for writing at arg"""
    if arg != "-":
        # any other value names a file which is created or truncated
        try:
            fileStream = open( arg, "w" )
        except IOError:
            _usageAndExit( "Stream at %s could not be opened for writing" % arg )
        else:
            return fileStream
    else:
        return stream
156 157
def _popleftchecked( argv, errmsg ):
    """Remove and return the leftmost argument of argv.

    :param argv: deque holding the remaining command line arguments
    :param errmsg: message handed to _usageAndExit if argv is exhausted"""
    try:
        leftmost = argv.popleft()
    except IndexError:
        _usageAndExit( errmsg )
    else:
        return leftmost
164 165
def _popPositiveInt( argv, msg ):
    """Pop the next argument from argv as an integer > 0, exiting with msg if it is missing or invalid"""
    try:
        number = int( _popleftchecked( argv, msg ) )
    except ValueError:
        number = 0      # not a number - rejected like any other invalid value below
    if number < 1:
        _usageAndExit( msg )
    return number


def main( *args ):
    """Parse the given command line arguments and run the batch process accordingly.

    :param args: command line arguments, not including the program name
    :note: invalid arguments lead to _usageAndExit, which exits the interpreter"""
    if not args:
        _usageAndExit( )

    inputList = list()
    streams = [ None, None ]    # errorstream, donestream as passed to process()
    # flag -> ( index into streams, stream to use for the value "-" )
    streamFlags = { "-E" : ( 0, sys.stderr ), "-D" : ( 1, sys.stdout ) }

    numJobs = 1
    inputsPerProcess = 1
    cmd = None
    cmdargs = list()
    haveReadInput = False

    # PARSE ARGUMENTS
    ##################
    argv = deque( args )
    while argv:
        arg = argv.popleft()

        if arg == "-e":
            # COMMAND TO EXECUTE - everything after it belongs to the command
            cmd = _popleftchecked( argv, "-e must be followed by the command to execute" )
            cmdargs.extend( argv )
            break
        elif arg in streamFlags:
            # STREAMS
            index, defaultStream = streamFlags[ arg ]
            argval = _popleftchecked( argv, "%s must be followed by - or a filepath" % arg )
            streams[ index ] = _toStream( argval, defaultStream )
        elif arg == "-s":
            inputsPerProcess = _popPositiveInt( argv, "-s must be followed by a number > 0" )
        elif arg == "-j":
            numJobs = _popPositiveInt( argv, "-j must be followed by a number > 0" )
        elif arg == "-I":
            # INPUT ARGUMENTS FROM STDIN
            if haveReadInput:
                _usageAndExit( "-I may only be specified once" )
            haveReadInput = True
            # read stripped lines from stdin - blank lines do not name any input
            inputList.extend( line.strip() for line in sys.stdin.readlines() if line.strip() )
        else:
            # its an input argument
            inputList.append( arg )
        # END handle argument
    # END for each argument

    if not cmd:
        _usageAndExit( "No command to execute - add it after the -e flag" )

    # have everything, transfer control to the actual batch method
    try:
        process( cmd, cmdargs, inputList, streams[0], streams[1], inputsPerProcess, numJobs )
    finally:
        # close the files opened by _toStream, but never the standard streams
        for stream in streams:
            if stream is not None and stream is not sys.stdout and stream is not sys.stderr:
                stream.close()
        # END for each stream
if __name__ == "__main__":
    # hand every argument after the program name to the command line parser
    commandLine = sys.argv[1:]
    main( *commandLine )

#} END command line tool