"""Contains workflow classes that connect processes in a di-graph"""
__docformat__ = "restructuredtext"

import networkx as nx
from mrv.dge import Graph, ComputeError
import time
import weakref
import traceback
import logging
log = logging.getLogger('mrv.automation.workflow')


class TargetError( ValueError ):
	"""Thrown if a target is not supported by the workflow ( and thus cannot be made )"""


class DirtyException( Exception ):
	"""Exception thrown when the system is in dirty query mode and the process detects
	that it is dirty.

	The exception can also contain a report that will be returned using the
	makeReport function.

	:note: This exception class must NOT be derived from ComputeError as it will be caught
		by the DG engine and mis-interpreted - unknown exceptions will just be passed on by it
	"""
	__slots__ = "report"

	def __init__( self, report = '' ):
		Exception.__init__( self )	# cannot use super with exceptions apparently
		self.report = report

	def __str__( self ):
		return str( self.report )

	def makeReport( self ):
		"""
		:return: printable report, usually a string or some object that
			responds to str() appropriately"""
		return self.report
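
# Sketch of how a process could signal its dirty state ( illustrative only - the
# surrounding computation and the '_isDirty' helper are hypothetical, not part of
# this module ). The workflow catches DirtyException in _evaluateDirtyState below:
#
#   if self._isDirty():
#       raise DirtyException( report = "cache file is older than its source" )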


class Workflow( Graph ):
	"""Implements a workflow as connected processes

	:note: if you have to access the processes directly, use the DiGraph methods"""
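
	# Rough usage sketch ( hedged - the process instance and the target are
	# placeholders, their construction is not part of this module ):
	#
	#   wfl = Workflow( name = "example" )
	#   wfl.add_node( my_process )              # processes are dge nodes of this graph
	#   result = wfl.makeTarget( my_target )    # evaluate the best-rated output for the target
	#   cg = wfl.callgraph()                    # inspect which processes were called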

	class ProcessData( object ):
		"""Allows storing additional information with each process called during the workflow"""
		__slots__ = ( 'process','plug','mode', '_result', 'starttime', 'endtime','exception','index' )

		def __init__( self, process, plug, mode ):
			self.process = process
			self.plug = plug
			self.mode = mode
			self._result = None					# can be weakref or actual value
			self.starttime = time.clock()
			self.endtime = self.starttime
			self.exception = None				# stores exception on error
			self.index = 0						# index of the child - the graph stores nodes unordered

		def __repr__( self ):
			out = "%s.%s" % ( self.process, self.plug )
			if self.exception:
				out += "&ERROR"
			return out

		def elapsed( self ):
			""":return: time to process the call"""
			return self.endtime - self.starttime

		def setResult( self, result ):
			"""Set the given result

			:note: uses weak references as the tracking structure should not cause a possible
				mutation of the program flow ( as instances would stay alive although the code
				expects them to be deleted )"""
			if result is not None:
				try:
					self._result = weakref.ref( result )
				except TypeError:
					self._result = result

		def result( self ):
			""":return: result stored in this instance, or None if it is not present or not alive"""
			if self._result:
				if isinstance( self._result, weakref.ref ):
					return self._result()
				return self._result

			return None
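
		# Illustration of the weakref fallback above ( a sketch, not part of the API ):
		# instances of most user-defined classes can be weak-referenced, while many
		# builtin values such as lists or strings cannot and raise TypeError, in which
		# case the value itself is stored.
		#
		#   pdata = Workflow.ProcessData( process, plug, mode )
		#   pdata.setResult( SomeNodeInstance() )   # stored as weakref.ref
		#   pdata.setResult( [ 1, 2, 3 ] )          # TypeError -> stored directly
		#   pdata.result()                          # dereferences weakrefs transparently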

	class CallGraph( nx.DiGraph ):
		"""Simple wrapper storing a call graph, keeping the root at which the call started

		:note: this class is specialized to be used by workflows, it is not general purpose"""

		def __init__( self ):
			super( Workflow.CallGraph, self ).__init__( name="Callgraph" )
			self._call_stack = []
			self._root = None

		def startCall( self, pdata ):
			"""Add a call of a process"""
			if self._call_stack:
				curdata = self._call_stack[ -1 ]
				pdata.index = self.in_degree( curdata )
				self.add_edge( pdata, curdata )
			else:
				self.add_node( pdata )
				self._root = pdata

			self._call_stack.append( pdata )

		def endCall( self, result ):
			"""End the call started previously

			:param result: the result of the call"""
			lastprocessdata = self._call_stack.pop( )
			lastprocessdata.endtime = time.clock( )
			lastprocessdata.setResult( result )

		def callRoot( self ):
			""":return: root at which the call started"""
			return self._root

		def callstackSize( self ):
			""":return: length of the callstack"""
			return len( self._call_stack )

		def toCallList( self, reverse = True, pruneIfTrue = lambda x: False ):
			"""
			:return: flattened version of the graph as a list of ProcessData edges in call order,
				having the root as last element of the list
			:param pruneIfTrue: function taking a ProcessData, returning True if the node
				should be pruned from the result
			:param reverse: if True, the call list will be properly reversed ( taking children
				into account )"""
			def predecessors( node, nextNode, reverse, pruneIfTrue ):
				out = []

				predlist = self.predecessors( node )
				lenpredlist = len( predlist ) - 1
				if not reverse:
					lenpredlist *= -1 	# will keep the right, non reversed order

				predlist = [ ( lenpredlist - p.index, p ) for p in predlist ]
				predlist.sort()

				prednextnode = node
				pruneThisNode = pruneIfTrue( node )
				if pruneThisNode:
					prednextnode = nextNode

				for i,pred in predlist:
					out.extend( predecessors( pred, prednextnode, reverse, pruneIfTrue ) )

				if not pruneThisNode:
					out.append( ( node, nextNode ) )
				return out
			# END predecessors helper

			calllist = predecessors( self.callRoot(), None, reverse, pruneIfTrue )
			if not reverse:
				calllist.reverse() 	# actually brings it in the right order, starting at root
			return calllist
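
		# Sketch of flattening a small call graph ( illustrative only; process and
		# plug are simple stand-ins here, in real use they are dge nodes and plugs ):
		#
		#   cg = Workflow.CallGraph()
		#   root = Workflow.ProcessData( "exporter", "outFile", 0 )
		#   child = Workflow.ProcessData( "loader", "outData", 0 )
		#   cg.startCall( root )
		#   cg.startCall( child )     # called while computing root -> edge child -> root
		#   cg.endCall( None )
		#   cg.endCall( None )
		#   cg.toCallList()           # [ (child, root), (root, None) ] - root comes last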

	def __init__( self, **kwargs ):
		"""Initialize the base class"""
		super( Workflow, self ).__init__( **kwargs )

		self._callgraph = None
		self._mode = False

	def __str__( self ):
		return self.name

	def copyFrom( self, other ):
		"""Only mode is required"""
		self._mode = other._mode

		self._callgraph = other._callgraph

	def makeTarget( self, target ):
		""":param target: target to make - can be class or instance
		:return: result when producing the target"""
		import process
		pb = process.ProcessBase
		processmode = globalmode = pb.is_state | pb.target_state

		shell, result = self._evaluate( target, processmode, globalmode )
		return result

	def makeTargets( self, targetList, errstream=None, donestream=None ):
		"""Batch module compatible method allowing to make multiple targets at once

		:param targetList: iterable providing the targets to make
		:param errstream: object with file interface allowing to log errors that occurred
			during operation
		:param donestream: if list, targets successfully done will be appended to it, if
			it is a stream, the string representation will be written to it"""
		def to_stream(msg, stream):
			"""print msg to stream or use the logger instead"""
			if stream:
				stream.write( msg )
			else:
				log.info(msg)
		# END to_stream helper

		for target in targetList:
			try:
				self.makeTarget( target )
			except ComputeError,e:
				to_stream(str( e ) + "\n", errstream)
			except Exception, e:
				msg = "--> UNHANDLED EXCEPTION: " + str( e ) + "\n"
				msg += traceback.format_exc( )
				to_stream(msg, errstream)

			if donestream is None:
				continue

			# record the handled target
			if hasattr( donestream, "write" ):
				donestream.write( str( target ) + "\n" )
			else:
				donestream.append( target )
		# END for each target
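
	# Batch usage sketch ( hedged - the target instances are placeholders ):
	#
	#   errors = open( "errors.log", "w" )
	#   done = []
	#   wfl.makeTargets( [ target_a, target_b ], errstream = errors, donestream = done )
	#   # 'done' now lists the handled targets, error messages went to the stream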

	def _evaluateDirtyState( self, outputplug, processmode ):
		"""Evaluate the given plug in process mode and return a dirty report tuple
		as used by `makeDirtyReport`"""
		report = list( ( outputplug, None ) )
		try:
			outputplug.clearCache( clear_affected = False ) 		# assure it evaluates
			outputplug.get( processmode )	# trigger computation, might raise
		except DirtyException, e:
			report[ 1 ] = e					# remember report as well
		except Exception:
			excformat = traceback.format_exc( )
			report[ 1 ] = DirtyException( report = ''.join( excformat ) )

		return tuple( report )

	def makeDirtyReport( self, target, mode = "single" ):
		"""
		:return: list of tuple( shell, DirtyException|None )
			If a process ( shell.node ) is dirty, a dirty report will be given explaining
			why the process is dirty and needs an update
		:param target: target you wish to check for its dirty state
		:param mode:
			 * single - only the process assigned to evaluate target will be checked
			 * multi - as single, but the whole callgraph will be checked, starting
				at the first node, stepping down the callgraph. This gives a per
				node dirty report.
			 * deep - try to evaluate target, but fail if one process in the target's
				call history is dirty
		"""
		import process
		pb = process.ProcessBase
		processmode = pb.is_state | pb.dirty_check
		globalmode = None

		if mode == "deep":
			globalmode = processmode		# input processes may apply dirty check ( and fail )
		elif mode in ( "single", "multi" ):
			globalmode = pb.is_state		# input process should just return the current state, no checking
		else:
			raise AssertionError( "invalid mode: %s" % mode )

		outreports = []

		# evaluate the plug assigned to the target itself
		outputplug = self._setupProcess( target, globalmode )
		outreports.append( self._evaluateDirtyState( outputplug, processmode ) )

		# step down the callgraph if requested
		if mode == "multi":
			calllist = self._callgraph.toCallList( reverse = False )
			for s_pdata,d_pdata in calllist:
				if d_pdata is None:					# the root node ( we already called it ) has no destination
					continue

				outputshell = d_pdata.process.toShell( d_pdata.plug )
				outreports.append( self._evaluateDirtyState( outputshell, processmode ) )
			# END for each call edge
		# END multi mode handling

		return outreports
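
	# Dirty-check sketch ( hedged - the target is a placeholder ):
	#
	#   for shell, report in wfl.makeDirtyReport( target, mode = "multi" ):
	#       if report is not None:
	#           log.info( "%s is dirty: %s" % ( shell, report.makeReport() ) )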

	def _clearState( self, global_evaluation_mode ):
		"""Clear the previous state and re-initialize this instance, getting it ready
		for a new evaluation

		:param global_evaluation_mode: evaluation mode to be used"""
		self._callgraph = Workflow.CallGraph( )
		self._mode = global_evaluation_mode

	def _setupProcess( self, target, globalmode ):
		"""Set up the workflow's dg such that the returned output shell can be queried
		to evaluate target

		:param globalmode: mode with which all other processes will be handling
			their input calls
		"""
		# find the input shell rated best for the target
		inputshell = self.targetRating( target )[1]
		if inputshell is None:
			raise TargetError( "Cannot handle target %r" % target )

		# clear the previous callgraph and prepare all processes
		self._clearState( globalmode )

		for node in self.iterNodes( ):
			node.prepareProcess( )
		# END for each node

		# walk the affected shells downstream to find an output plug delivering the target
		these = lambda shell: not shell.plug.providesOutput() or shell.outputs( )
		allAffectedNodes = ( shell.node for shell in inputshell.iterShells( direction = "down", visit_once = 1, prune = these ) )
		outputshell = None

		for node in allAffectedNodes:
			try:
				shell = node.targetRating( target, check_input_plugs = False, raise_on_ambiguity = 0 )[1] # 1 == plug
			except TypeError,e:		# ambiguous outputs
				log.error(str( e ))
				continue
			# END handle ambiguous outputs

			if shell:
				outputshell = shell
				break
		# END for each affected node

		# fallback: check the output plugs affected by the input plug itself
		if not outputshell:
			outplugs = inputshell.plug.affected()

			if not outplugs:
				raise TargetError( "Plug %r takes target %r as input, but does not affect an output plug that would take the same target type" % ( str( inputshell ), target ) )

			is_valid_shell = lambda shell: not these( shell ) and shell.plug.attr.compatabilityRate( target )
			for plug in outplugs:
				shell = inputshell.node.toShell( plug )

				if is_valid_shell( shell ):
					log.warning("Did not find output plug delivering our target type - fallback to simple affected checks on node")
					outputshell = shell
					break
				# END valid shell found
			# END for each affected output plug
		# END fallback handling

		if not outputshell:
			raise TypeError( "Target %s cannot be handled by this workflow (%s) as a computable output for %s cannot be found" % ( target, self, str( inputshell ) ) )

		# set the target as value of the input plug and return the output shell to query
		inputshell.set( target, ignore_connection = True )
		return outputshell

	def _evaluate( self, target, processmode, globalmode ):
		"""Make or update the target using a process in our workflow

		:param processmode: the mode with which to call the initial process
		:return: tuple( shell, result ) - plugshell queried to get the result
		"""
		outputshell = self._setupProcess( target, globalmode )

		# trigger the computation by querying the output shell
		result = outputshell.get( processmode )

		if len( self._callgraph._call_stack ):
			raise AssertionError( "Callstack was not empty after calculations for %r were done" % target )

		return ( outputshell, result )

	def createReportInstance( self, reportType ):
		"""Create a report instance that describes how the previous target was made

		:param reportType: Report to populate with information - it must be a Plan based
			class that can be instantiated and populated with call information.
			A report analyses the call dependency graph generated during dg evaluation
			and presents it.
		:return: report instance whose makeReport method can be called to retrieve it"""
		return reportType( self._callgraph )
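
	# Report sketch ( hedged - 'PlanReport' stands in for any Plan based report class ):
	#
	#   wfl.makeTarget( target )                      # run once to populate the callgraph
	#   plan = wfl.createReportInstance( PlanReport )
	#   print plan.makeReport()                       # human readable description of the calls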

	def targetSupportList( self ):
		""":return: list of all supported target types
		:note: this method is for informational purposes only"""
		uniqueout = set()
		for node in self.iterNodes():
			try:
				uniqueout.update( set( node.supportedTargetTypes() ) )
			except Exception, e:
				raise AssertionError( "Process %r failed when calling supportedTargetTypes" % node, e )
		# END for each node
		return list( uniqueout )

	def targetRating( self, target ):
		"""
		:return: int range(0,255) indicating how well a target can be made,
			0 means not at all, 255 means perfect.

			Return value is tuple ( rate, PlugShell ), containing the process and plug with the
			highest rating or None if rate is 0

			Walk the dependency graph such that leaf nodes have higher ratings than
			non-leaf nodes
		:note: you can use the `process.ProcessBase` enumeration for comparison"""
		rescache = list()
		best_process = None

		for node in self.iterNodes( ):
			try:
				rate, shell = node.targetRating( target )
			except TypeError,e:
				continue
			# END handle nodes that cannot rate the target

			if not rate:
				continue

			if not node.connections( 0, 1 ):
				rate = rate * 2									# prefer leafs in the rating

			rescache.append( ( rate, shell ) )
		# END for each node

		rescache.sort()							# last is most suitable
		if not rescache or rescache[-1][0] == 0:
			return ( 0, None )

		bestpick = rescache[-1]

		# check whether there are several equally rated picks - raise if so
		allbestpicks = [ pick for pick in rescache if pick[0] == bestpick[0] ]
		if len( allbestpicks ) > 1:
			raise AssertionError( "There should only be one suitable process for %r, found %i (%s)" % ( target, len( allbestpicks ), allbestpicks ) )

		shell = bestpick[1]

		# return the node's own rating ( without the leaf bonus applied above )
		return shell.node.targetRating( target )
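
	# Rating sketch ( hedged - the target is a placeholder ):
	#
	#   rate, shell = wfl.targetRating( target )
	#   if not rate:
	#       raise TargetError( "workflow cannot build %r" % target )
	#   # 'shell' is the input plugshell on the process that would receive the target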

	def callgraph( self ):
		""":return: current callgraph instance
		:note: it is strictly read-only and may not be changed"""
		return self._callgraph

	def _isDryRun( self ):
		""":return: True if the current computation is a dry run"""
		return self._mode

	def _trackOutputQueryStart( self, process, plug, mode ):
		"""Called by the process base to indicate the start of a call of curProcess to targetProcess.
		This method tracks the actual call path taken through the graph ( which is dependent on the
		dirty state of the processes ), allowing to walk it depth first to resolve the calls.
		This also allows to create precise reports telling how to achieve a certain goal"""
		pdata = Workflow.ProcessData( process, plug, mode )

		self._callgraph.startCall( pdata )
		return pdata			# return so that decorators can use this information

	def _trackOutputQueryEnd( self, result = None ):
		"""Track that the process just finished its computation - thus the previously active process
		should be on top of the stack again"""
		self._callgraph.endCall( result )
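
	# Tracking protocol sketch ( normally driven by the process base class; the
	# compute step shown here is illustrative only ):
	#
	#   pdata = wfl._trackOutputQueryStart( process, plug, mode )
	#   result = compute_output()            # nested queries nest their own start/end calls
	#   wfl._trackOutputQueryEnd( result )    # pops the call stack, records time and result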

	def _populateFromGraph( self, graph ):
		"""Parse the networkx graph and populate ourselves with the respective process
		instances as described by the graph

		:param graph: networkx graph whose nodes are process names to be found in the processes
			module"""
		raise NotImplementedError( "TODO" )