mrv.batch
Covered: 40 lines
Missed: 122 lines
Skipped 104 lines
Percent: 24 %
  3
"""This module contains utilities to perform operations in batch mode.
  4
The module can be used from within python if required, but is more commonly used
  5
from the commandline, possibly wrapped by a shell script to specialize its usage
  6
"""
  7
import sys,os
  8
import signal
  9
from collections import deque
 10
import subprocess
 11
import time
 14
# NOTE(review): __all__ is normally a sequence of public names; with None a
# wildcard import of this module will presumably fail - confirm this is intended
__all__ = None
 16
def superviseJobs( jobs, returnIfLessThan, cmdinput, errorstream, donestream ):
	"""Poll the given jobs until fewer than returnIfLessThan of them are left.

	Finished jobs are removed from jobs in place. The stderr output of each finished
	job is forwarded to errorstream, and if the job exited with return code 0, the
	items of cmdinput are written to donestream, each terminated by a newline.

	:param jobs: mutable list of subprocess.Popen like objects providing poll(),
		returncode and stderr. The function returns right away if it is empty
	:param returnIfLessThan: return once we have less than the given amount of running jobs
	:param cmdinput: list of strings written to donestream for every successful job
	:param errorstream: stream receiving the stderr output of finished jobs, or None
	:param donestream: stream receiving cmdinput for every successful job, or None
	:note: the same cmdinput is written for whichever job finishes successfully, the
		function has no means to tell which inputs a particular job was started with
	:note: stderr of a job is only read once the job has finished.
		NOTE(review): a job filling its stderr pipe before it exits will presumably
		block and never finish - confirm"""
	sleeptime = 1.0		 # seconds to wait between two polling passes

	if not jobs:
		return

	while True:
		# iterate a copy, finished jobs are removed from the original list
		for job in list( jobs ):
			if job.poll() is None:
				continue			# still running

			jobs.remove( job )

			errpipe = getattr( job, "stderr", None )
			if errpipe is not None:
				try:
					if errorstream:
						errorstream.writelines( errpipe.readlines() )
						errorstream.flush()
				finally:
					# release the pipe even if its content was not requested
					# or could not be written
					errpipe.close()
			# END handle stderr pipe

			if donestream is not None and job.returncode == 0:
				donestream.write( "\n".join( cmdinput ) + "\n" )
				donestream.flush()

			if len( jobs ) < returnIfLessThan:
				return
		# END for each job

		time.sleep( sleeptime )
 55
def killProcess( process ):
	"""Kill the given process

	The process object's own kill() method is preferred if it has one. Otherwise
	SIGKILL is sent to process.pid through the os module.

	:param process: subprocess.Popen like object, providing kill() or at least pid
	:note: raises NotImplementedError if the process object cannot kill itself and
		the platform provides no os.kill or no SIGKILL signal"""
	kill = getattr( process, "kill", None )
	if kill is not None:
		kill()
		return

	# os.kill may exist on platforms that do not define SIGKILL, check for both
	if not hasattr( os, "kill" ) or not hasattr( signal, "SIGKILL" ):
		raise NotImplementedError( "os module does not support 'kill'ing of processes on your platform" )

	os.kill( process.pid, signal.SIGKILL )
 65
def process( cmd, args, inputList, errorstream = None, donestream = None, inputsPerProcess = 1,
			 numJobs=1):
	"""Launch process at cmd with args and a list of input objects from inputList appended to args

	The inputs handed to one process are appended to its commandline and are also
	written, newline separated, to its standard input, which is closed afterwards.

	:param cmd: full path to tool you wish to start, like /bin/bash
	:param args: List of all argument strings to be passed to cmd
	:param inputList: list of input files to be passed as input to cmd
	:param errorstream: stream to which errors will be written to as they occur if not None
	:param donestream: stream to which items from input list will be passed once they
		have been processed if not None. Items are newline terminated. Only the inputs
		of processes which exited with return code 0 are written
	:param inputsPerProcess: pass the given number of inputs to the cmd, or less if there
		are not enough items on the input list
	:param numJobs: number of processes we may run in parallel
	:note: a KeyboardInterrupt received while waiting for a free job slot kills all
		running processes, the remaining inputs are processed afterwards
	"""
	jobs = list()			# running processes, finished ones are removed by superviseJobs
	unreported = list()		# ( job, inputs ) pairs whose outcome was not reported yet

	def reportFinished():
		"""Write the inputs of successfully finished jobs to donestream and forget
		about all jobs that are not running anymore"""
		running = list()
		for job, inputs in unreported:
			if job in jobs:
				running.append( ( job, inputs ) )
			elif donestream is not None and job.returncode == 0:
				donestream.write( "\n".join( inputs ) + "\n" )
				donestream.flush()
		# END for each unreported job
		unreported[:] = running

	for i in range( 0, len( inputList ), inputsPerProcess ):
		cmdinput = inputList[ i : i + inputsPerProcess ]	# deals with bounds
		callcmd = (cmd,)+tuple(args)+tuple(cmdinput)
		# universal_newlines gives us text pipes, allowing to write strings to stdin
		# and to pass stderr on to text streams on python 3 as well
		job = subprocess.Popen( callcmd, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
								env=os.environ, universal_newlines=True )
		jobs.append( job )
		unreported.append( ( job, cmdinput ) )

		try:
			job.stdin.write( '\n'.join( cmdinput ) )
			job.stdin.flush()
			job.stdin.close()
		except ( IOError, OSError ):
			pass 	# could be closed already

		# fill up all job slots before we start waiting
		if len( jobs ) < numJobs:
			continue

		if len( jobs ) != numJobs:
			raise AssertionError( "invalid job count" )

		# The done stream is handled here, as only we know which inputs belong to
		# which job - hence superviseJobs does not get it
		try:
			superviseJobs( jobs, numJobs, cmdinput, errorstream, None )
		except KeyboardInterrupt:
			for job in jobs:
				killProcess( job )
			del jobs[:]
			sys.stdout.write("Aborted all running processes - continuing\n")
		# END handle interrupt

		# killed jobs have no return code of 0, hence they are dropped silently
		reportFinished()
	# END for each chunk of inputs

	# wait for the remaining jobs
	superviseJobs( jobs, 1, list(), errorstream, None )
	reportFinished()
123
def _usageAndExit( msg = None ):
124
	"""Print usage"""
125
	sys.stdout.write("""python batch.py inputarg [inputarg ...] [-E fileForErrors|-] [-D fileForFinishedOutput|-] [-s numInputsPerProcess] -e cmd [cmdArg ...]
126
-E|D - 	means to use the default stream, either stderr or stdout
127
-I	if specified, arguments will also be read from stdin until it is depleted as
128
	newline separated list of names
129
	Its particlularly important that the pipe to stdin closes once its done as
130
	this command currently does not support streaming of input args
131
-e 	ends the parsing of commandline arguments for the batch process tool
132
	and uses the rest of the commandline as direct input for your command
133
-s	defines how many input arguments will be passed per command invocation
134
-j	the number of processes to keep running in parallel, default 1
136
	The given inputargs will be passed as arguments to the commands or into
137
	the standardinput of the process""")
138
	if msg:
139
		sys.stdout.write(msg+"\n")
141
	sys.exit(1)
144
def _toStream( arg, stream ):
145
	""":return: stream according to arg
146
	:param stream: stream to return if arg sais so """
147
	if arg == "-":
148
		return stream
152
	try:
153
		return open( arg, "w" )
154
	except IOError:
155
		_usageAndExit( "Stream at %s could not be opened for writing" % arg )
158
def _popleftchecked( argv, errmsg ):
159
	"""pop an arg from argv and return with an error message on error"""
160
	try:
161
		return argv.popleft()
162
	except IndexError:
163
		_usageAndExit( errmsg )
166
def _closeStream( stream ):
	"""Close stream unless it is None or one of the interpreter's standard streams"""
	if stream is None or stream is sys.stdout or stream is sys.stderr:
		return
	stream.close()


def _popintchecked( argv, errmsg ):
	"""Pop an arg from argv and return it as integer > 0, print the usage together
	with errmsg and exit if that is not possible"""
	value = _popleftchecked( argv, errmsg )
	try:
		value = int( value )
	except ValueError:
		_usageAndExit( errmsg )

	if value < 1:
		_usageAndExit( errmsg )
	return value


def main( *args ):
	"""Parse the given commandline arguments and run the batch process accordingly

	Everything in front of -e is either a flag of this tool or an input item,
	everything behind -e is the command to execute followed by its arguments.
	Flags are -E and -D ( error and done stream ), -s ( inputs per process ),
	-j ( number of parallel processes ) and -I ( read more input items from stdin ).

	:param args: commandline arguments, not including the name of the program
	:note: prints the usage and exits if the arguments are invalid or if there is
		no command to execute"""
	if not args:
		_usageAndExit( )

	# flag -> ( index into streams, stream to use if the flag's value is "-" )
	streamdefaults = { "-E" : ( 0, sys.stderr ), "-D" : ( 1, sys.stdout ) }

	inputList = list()
	streams = [ None, None ]		# errorstream, donestream
	numJobs = 1
	inputsPerProcess = 1
	cmd = None
	cmdargs = list()
	haveReadInput = False

	argv = deque( args )
	while argv:
		arg = argv.popleft()

		if arg == "-e":
			# the rest of the commandline belongs to the command
			cmd = _popleftchecked( argv, "-e must be followed by the command to execute" )
			cmdargs.extend( argv )
			break
		elif arg in streamdefaults:
			index, default = streamdefaults[ arg ]
			argval = _popleftchecked( argv, "%s must be followed by - or a filepath" % arg )
			_closeStream( streams[ index ] )		# flag may be given multiple times
			streams[ index ] = _toStream( argval, default )
		elif arg == "-s":
			inputsPerProcess = _popintchecked( argv, "-s must be followed by a number > 0" )
		elif arg == "-j":
			numJobs = _popintchecked( argv, "-j must be followed by a number > 0" )
		elif arg == "-I":
			if haveReadInput:
				_usageAndExit( "-I may only be specified once" )
			haveReadInput = True

			# blank lines do not name an input
			for line in sys.stdin.readlines():
				name = line.strip()
				if name:
					inputList.append( name )
			# END for each line
		else:
			# everything else is an input argument
			inputList.append( arg )
		# END handle arg
	# END while there are args

	if not cmd:
		_usageAndExit( "No command to execute - add it after the -e flag" )

	try:
		process( cmd, cmdargs, inputList, streams[0], streams[1], inputsPerProcess, numJobs )
	finally:
		# files we opened need to be closed to assure their content is written
		for stream in streams:
			_closeStream( stream )
262
# run the batch tool with the commandline arguments if executed as a script
if __name__ == "__main__":
	main( *sys.argv[1:] )