"""Contains workflow classes that connect processes in a di - graph """
__docformat__ = "restructuredtext"
# NOTE(review): imports below `from mrv.dge ...` were lost to file corruption and are
# restored here - every one of them is referenced by the code in this module
# ( time.clock, weakref.ref, traceback.format_exc, nx.DiGraph, process.ProcessBase ).
import time
import weakref
import traceback
import logging

import networkx as nx

from mrv.dge import Graph, ComputeError
import process

log = logging.getLogger('mrv.automation.workflow')
class TargetError( ValueError ):
    """Thrown if target is not supported by the workflow ( and thus cannot be made )"""
class DirtyException( Exception ):
    """Exception thrown when system is in dirty query mode and the process detects
    that it is dirty.

    The exception can also contain a report that will be returned using the
    makeReport function.

    :note: This exception class must NOT be derived from ComputeError as it will be caught
        by the DG engine and mis-interpreted - unknown exceptions will just be passed on by it"""
    __slots__ = "report"

    def __init__( self, report = '' ):
        """Store the optional printable report describing the dirty state"""
        Exception.__init__( self ) # cannot use super with exceptions apparently
        self.report = report

    def __str__( self ):
        return str( self.report )

    #{ Interface

    def makeReport( self ):
        """
        :return: printable report, usually a string or some object that
            responds to str() appropriately"""
        return self.report

    #} END interface
class Workflow( Graph ):
    """Implements a workflow as connected processes

    :note: if you have to access the processes directly, use the DiGraph methods"""

    #{ Utility Classes
    class ProcessData( object ):
        """Allows to store additional information with each process called during the workflow"""
        __slots__ = ( 'process','plug','mode', '_result', 'starttime', 'endtime','exception','index' )

        def __init__( self, process, plug, mode ):
            self.process = process
            self.plug = plug
            self.mode = mode
            self._result = None                 # can be weakref or actual value
            self.starttime = time.clock()
            self.endtime = self.starttime
            self.exception = None               # stores exception on error
            self.index = 0                      # index of the child - graph stores nodes unordered

        def __str__( self ):
            out = "%s.%s" % ( self.process, self.plug )
            if self.exception:
                out += "&ERROR"
            return out

        def elapsed( self ):
            """:return: time to process the call"""
            return self.endtime - self.starttime

        def setResult( self, result ):
            """Set the given result

            :note: uses weak references as the tracking structure should not cause a possible
                mutation of the program flow ( as instances stay alive although code expects it to be
                deleted )"""
            if result is not None:
                try:
                    self._result = weakref.ref( result )
                except TypeError:
                    # value types ( int, str, ... ) cannot be weak-referenced - keep them directly
                    self._result = result
            # END if result not None

        def result( self ):
            """:return: result stored in this instance, or None if it is not present or not alive"""
            if self._result:
                if isinstance( self._result, weakref.ref ):
                    return self._result()
                return self._result
            return None


    class CallGraph( nx.DiGraph ):
        """Simple wrapper storing a call graph, keeping the root at which the call started

        :note: this class is specialized to be used by workflows, its not general for that purpose"""
        def __init__( self ):
            super( Workflow.CallGraph, self ).__init__( name="Callgraph" )
            self._call_stack = []
            # NOTE(review): root bookkeeping reconstructed - callRoot() must return the
            # first call made, confirm against original behaviour
            self._root = None

        def startCall( self, pdata ):
            """Add a call of a process"""
            # keep the call graph
            if self._call_stack:
                curdata = self._call_stack[ -1 ]
                pdata.index = self.in_degree( curdata )
                self.add_edge( pdata, curdata )
            else:
                # its the first call, thus we add it as node - would work on first edge add too though
                self.add_node( pdata )
                self._root = pdata
            # END if call stack is not empty

            self._call_stack.append( pdata )

        def endCall( self, result ):
            """End the call started previously

            :param result: the result of the call"""
            lastprocessdata = self._call_stack.pop( )
            lastprocessdata.endtime = time.clock( )
            lastprocessdata.setResult( result )

        def callRoot( self ):
            """:return: root at which the call started"""
            return self._root

        def callstackSize( self ):
            """:return: length of the callstack"""
            return len( self._call_stack )

        def toCallList( self, reverse = True, pruneIfTrue = lambda x: False ):
            """
            :return: flattened version of graph as list of ProcessData edges in call order , having
                the root as last element of the list
            :param pruneIfTrue: Function taking ProcessData to return true if the node
                should be pruned from the result
            :param reverse: if true, the calllist will be properly reversed ( taking children into account )"""
            def predecessors( node, nextNode, reverse, pruneIfTrue ):
                out = []

                # invert the callorder - each predecessor list defines the input calls
                # a process has made - to properly reproduce that, the call order needs to be
                # inverted as well
                predlist = self.predecessors( node )
                lenpredlist = len( predlist ) - 1
                if not reverse:
                    lenpredlist *= -1       # will keep the right, non reversed order

                predlist = [ ( lenpredlist - p.index, p ) for p in predlist ]
                predlist.sort()

                pruneThisNode = pruneIfTrue( node )
                if pruneThisNode:
                    prednextnode = nextNode
                else:
                    prednextnode = node
                # END prune handling

                # enumerate the other way round, as the call list needs to be inverted
                for i,pred in predlist:
                    out.extend( predecessors( pred, prednextnode, reverse, pruneIfTrue ) )
                # END for each predecessor

                if not pruneThisNode:
                    out.append( ( node, nextNode ) )
                return out
            # END predecessors helper

            calllist = predecessors( self.callRoot(), None, reverse, pruneIfTrue )
            if not reverse:
                calllist.reverse()  # actually brings it in the right order, starting at root
            return calllist

    #} END utility classes


    def __init__( self, **kwargs ):
        """Initialize base class"""
        super( Workflow, self ).__init__( **kwargs )

        self._callgraph = None
        self._mode = False

    def copyFrom( self, other ):
        """Only mode is required """
        self._mode = other._mode
        # shallow copy callgraph
        self._callgraph = other._callgraph

    #{ Main Interface

    def makeTarget( self, target ):
        """
        :param target: target to make - can be class or instance
        :return: result when producing the target"""
        # generate the mode with which all processes will be called
        pb = process.ProcessBase
        processmode = globalmode = pb.is_state | pb.target_state

        shell, result = self._evaluate( target, processmode, globalmode )
        return result

    def makeTargets( self, targetList, errstream=None, donestream=None ):
        """batch module compatible method allowing to make multiple targets at once

        :param targetList: iterable providing the targets to make
        :param errstream: object with file interface allowing to log errors that occurred
            during computation
        :param donestream: if list, targets successfully done will be appended to it, if
            it is a stream, the string representation will be written to it"""
        def to_stream(msg, stream):
            """print msg to stream or use the logger instead"""
            if stream is not None:
                stream.write(msg)
            else:
                log.error(msg)
        # END errstream handling

        for target in targetList:
            try:
                self.makeTarget( target )
            except ComputeError as e:
                to_stream(str( e ) + "\n", errstream)
                continue        # failed targets must not be marked done
            except Exception as e:
                msg = "--> UNHANDLED EXCEPTION: " + str( e ) + "\n"
                msg += traceback.format_exc( )
                to_stream(msg, errstream)
                continue        # failed targets must not be marked done
            # END exception handling

            if donestream is None:
                continue

            # all clear, put item to done list
            if hasattr( donestream, "write" ):
                donestream.write( str( target ) + "\n" )
            else:
                # assume it is a list
                donestream.append( target )
        # END for each target

    def _evaluateDirtyState( self, outputplug, processmode ):
        """Evaluate the given plug in process mode and return a dirty report tuple
        as used by `makeDirtyReport`"""
        report = list( ( outputplug, None ) )
        try:
            outputplug.clearCache( clear_affected = False ) # assure it evaluates
            outputplug.get( processmode )                   # trigger computation, might raise
        except DirtyException as e:
            report[ 1 ] = e                                 # remember report as well
        except Exception:
            # Remember normal errors , put them into a dirty report, as well as stacktrace
            excformat = traceback.format_exc( )
            report[ 1 ] = DirtyException( report = ''.join( excformat ) )
        # END exception handling

        return tuple( report )

    def makeDirtyReport( self, target, mode = "single" ):
        """
        :return: list of tuple( shell, DirtyReport|None )
            If a process ( shell.node ) is dirty, a dirty report will be given explaining
            why the process is dirty and needs an update
        :param target: target you which to check for it's dirty state
        :param mode:
            * single - only the process assigned to evaluate target will be checked
            * multi - as single, but the whole callgraph will be checked, starting
                at the first node, stepping down the callgraph. This gives a per
                node dirty report.
            * deep - try to evaluate target, but fail if one process in the target's
                call history is dirty"""
        pb = process.ProcessBase
        processmode = pb.is_state | pb.dirty_check

        # lets make the mode clear
        if mode == "deep":
            globalmode = processmode        # input processes may apply dirty check ( and fail )
        elif mode in ( "single", "multi" ):
            globalmode = pb.is_state        # input process should just return the current state, no checking
        else:
            raise AssertionError( "invalid mode: %s" % mode )
        # END mode handling

        outreports = []

        # GET INITIAL REPORT
        ######################
        outputplug = self._setupProcess( target, globalmode )
        outreports.append( self._evaluateDirtyState( outputplug, processmode ) )

        # STEP THE CALLGRAPH ?
        if mode == "multi":
            # walk the callgraph and get dirty reports from each node
            calllist = self._callgraph.toCallList( reverse = False )
            for s_pdata,d_pdata in calllist:
                if d_pdata is None:     # the root node ( we already called it ) has no destination
                    continue

                outputshell = d_pdata.process.toShell( d_pdata.plug )
                outreports.append( self._evaluateDirtyState( outputshell, processmode ) )
            # END for each s_pdata, d_pdata
        # END if multi handling

        return outreports

    def _clearState( self, global_evaluation_mode ):
        """Clear the previous state and re-initialize this instance getting ready
        for a new evaluation

        :param global_evaluation_mode: evaluation mode to be used"""
        self._callgraph = Workflow.CallGraph( )
        self._mode = global_evaluation_mode

    def _setupProcess( self, target, globalmode ):
        """Setup the workflow's dg such that the returned output shell can be queried
        to evaluate target

        :param globalmode: mode with which all other processes will be handling
            their input calls"""
        # find suitable process
        inputshell = self.targetRating( target )[1]
        if inputshell is None:
            raise TargetError( "Cannot handle target %r" % target )

        # clear previous callgraph
        self._clearState( globalmode )

        # prepare all processes
        for node in self.iterNodes( ):
            node.prepareProcess( )
        # END reset dg handling

        # OUTPUT SHELL HANDLING
        #########################
        # Find a shell that we can query to trigger the graph to evaluate
        # get output plug that can be queried to get the target - follow the flow
        # of the graph downstream and get the first compatible plug that would
        # return the same type that we put in
        # NOTE: requires an unconnected output plug by convention !
        these = lambda shell: not shell.plug.providesOutput() or shell.outputs( )
        allAffectedNodes = ( shell.node for shell in inputshell.iterShells( direction = "down", visit_once = 1, prune = these ) )
        outputshell = None

        # use last compatible node in the chain -
        for node in allAffectedNodes:
            try:
                shell = node.targetRating( target, check_input_plugs = False, raise_on_ambiguity = 0 )[1]   # 1 == plug
            except TypeError as e:      # ambiguous outputs
                continue
            # END handle exceptions

            if shell:
                outputshell = shell
                # so here it comes - take this break away, and workflow might not work anymore
                # perhaps we should add a flag to define whether workflows should keep on looking
                # or not - if not, one has to carefully plane the output plugs ...
                break
        # END for each affected node try to get a valid shell

        # AFFECTED PLUGS HANDLING
        if outputshell is None:
            # try to use just the affected ones - that would be the best we have
            outplugs = inputshell.plug.affected()

            if not outplugs:
                raise TargetError( "Plug %r takes target %r as input, but does not affect an output plug that would take the same target type" % ( str( inputshell ), target ) )

            is_valid_shell = lambda shell: not these( shell ) and shell.plug.attr.compatabilityRate( target )
            for plug in outplugs:
                shell = inputshell.node.toShell( plug )

                if is_valid_shell( shell ):
                    log.warning("Did not find output plug delivering our target type - fallback to simple affected checks on node")
                    outputshell = shell
                    break
                # END valid shell check
            # END for each plug in node output plugs
        # END try to get output shell by checking affects of current node

        if outputshell is None:
            raise TypeError( "Target %s cannot be handled by this workflow (%s) as a computable output for %s cannot be found" % ( target, self, str( inputshell ) ) )

        # we do not care about ambiguity, simply pull one
        # QUESTION: should we warn about multiple affected plugs ?
        inputshell.set( target, ignore_connection = True )
        return outputshell

    def _evaluate( self, target, processmode, globalmode ):
        """Make or update the target using a process in our workflow

        :param processmode: the mode with which to call the initial process
        :return: tuple( shell, result ) - plugshell queried to get the result"""
        outputshell = self._setupProcess( target, globalmode )
        ######################################################
        result = outputshell.get( processmode )
        ######################################################

        if len( self._callgraph._call_stack ):
            raise AssertionError( "Callstack was not empty after calculations for %r where done" % target )
        # END callstack sanity check

        return ( outputshell, result )

    def createReportInstance( self, reportType ):
        """Create a report instance that describes how the previous target was made

        :param reportType: Report to populate with information - it must be a Plan based
            class that can be instantiated and populated with call information.
            A report analyses the call dependency graph generated during dg evaluation
            and presents it.
        :return: report instance whose makeReport method can be called to retrieve it"""
        # make the target as dry run
        return reportType( self._callgraph )

    #} END main interface

    #{ Query

    def targetSupportList( self ):
        """
        :return: list of all supported target type
        :note: this method is for informational purposes only"""
        uniqueout = set()
        for node in self.iterNodes():
            try:
                uniqueout.update( set( node.supportedTargetTypes() ) )
            except Exception as e:
                # fixed: error message previously referenced undefined name 'p'
                raise AssertionError( "Process %r failed when calling supportedTargetTypes" % node, e )
            # END exception handling
        # END for each p in nodes iter
        return list( uniqueout )

    def targetRating( self, target ):
        """
        :return: int range(0,255) indicating how well a target can be made
            0 means not at all, 255 means perfect.

            Return value is tuple ( rate, PlugShell ), containing the process and plug with the
            highest rating or None if rate is 0

            Walk the dependency graph such that leaf nodes have higher ratings than
            non-leaf nodes
        :note: you can use the `process.ProcessBase` enumeration for comparison"""
        rescache = list()

        for node in self.iterNodes( ):
            try:
                rate, shell = node.targetRating( target )
            except TypeError as e:
                # could be that there is a node having ambigous plugs, but we are not
                # going to take it anyway
                continue
            # END try-except TypeError

            if not rate:
                continue

            # is leaf node ? ( no output connections )
            if not node.connections( 0, 1 ):
                rate = rate * 2         # prefer leafs in the rating
            # END leaf node handling

            rescache.append( ( rate, shell ) )
        # END for each process

        rescache.sort()                 # last is most suitable
        if not rescache or rescache[-1][0] == 0:
            return ( 0, None )

        bestpick = rescache[-1]

        # check if we have several best picks - raise if so
        allbestpicks = [ pick for pick in rescache if pick[0] == bestpick[0] ]
        if len( allbestpicks ) > 1:
            raise AssertionError( "There should only be one suitable process for %r, found %i (%s)" % ( target, len( allbestpicks ), allbestpicks ) )
        # END ambiguity check

        shell = bestpick[1]
        # recompute rate as we might have changed it
        return shell.node.targetRating( target )

    def callgraph( self ):
        """:return: current callgraph instance
        :note: its strictly read-only and may not be changed"""
        return self._callgraph

    #} END query

    #{ Internal Process Interface

    def _isDryRun( self ):
        """:return: True if the current computation is a dry run"""
        # NOTE(review): returns the raw evaluation mode set by _clearState - confirm
        # whether a specific ProcessBase flag should be tested here instead
        return self._mode

    def _trackOutputQueryStart( self, process, plug, mode ):
        """Called by process base to indicate the start of a call of curProcess to targetProcess
        This method tracks the actual call path taken through the graph ( which is dependent on the
        dirty state of the processes ), allowing to walk it depth first to resolve the calls.
        This also allows to create precise reports telling how to achieve a certain goal"""
        pdata = Workflow.ProcessData( process, plug, mode )

        # keep the call graph
        self._callgraph.startCall( pdata )
        return pdata    # return so that decorators can use this information

    def _trackOutputQueryEnd( self, result = None ):
        """Track that the process just finished its computation - thus the previously active process
        should be on top of the stack again"""
        # update last data and its call time
        self._callgraph.endCall( result )

    #} END internal process interface

    def _populateFromGraph( self, graph ):
        """Parse the networkx graph and populate ourselves with the respective process
        instances as described by the graph

        :param graph: networkx graph whose nodes are process names to be found in the processes
            module"""
        raise NotImplementedError( "TODO" )