1
"""Contains workflow classes that connect processes in a di-graph """
3 __docformat__ = "restructuredtext"
4
5 import networkx as nx
6 from mrv.dge import Graph, ComputeError
7 import time
8 import weakref
9 import traceback
10 import logging
11 log = logging.getLogger('mrv.automation.workflow')
12
13
14
15
16
"""Thrown if target is not supported by the workflow ( and thus cannot be made )"""
19
20
22 """Exception thrown when system is in dirty query mode and the process detects
23 that it is dirty.
24
25 The exception can also contain a report that will be returned using the
26 makeReport function.
27
	:note: This exception class must NOT be derived from ComputeError as it will be caught
29 by the DG engine and mis-interpreted - unknown exceptions will just be passed on by it
30 """
31 __slots__ = "report"
35
36
39
40
42 """
43 :return: printable report, usually a string or some object that
44 responds to str() appropriately"""
45 return self.report
46
47
48
49
50
51
52
53
54
56 """Implements a workflow as connected processes
57
58 :note: if you have to access the processes directly, use the DiGraph methods"""
59
60
62 """Allows to store additional information with each process called during the workflow"""
63 __slots__ = ( 'process','plug','mode', '_result', 'starttime', 'endtime','exception','index' )
64 - def __init__( self, process, plug, mode ):
73
75 out = "%s.%s" % ( self.process, self.plug )
76 if self.exception:
77 out += "&ERROR"
78 return out
79
81 """:return: time to process the call"""
82 return self.endtime - self.starttime
83
85 """Set the given result
86
87 :note: uses weak references as the tracking structure should not cause a possible
88 mutation of the program flow ( as instances stay alive although code expects it to be
89 deleted"""
90 if result is not None:
91 try:
92 self._result = weakref.ref( result )
93 except TypeError:
94 self._result = result
95
96
98 """:return: result stored in this instance, or None if it is not present or not alive"""
99 if self._result:
100 if isinstance( self._result, weakref.ref ):
101 return self._result()
102 return self._result
103
104 return None
105
106
108 """Simple wrapper storing a call graph, keeping the root at which the call started
109
110 :note: this class is specialized to be used by workflows, its not general for that purpose"""
115
117 """Add a call of a process"""
118
119 if self._call_stack:
120 curdata = self._call_stack[ -1 ]
121 pdata.index = self.in_degree( curdata )
122 self.add_edge( pdata, curdata )
123 else:
124
125 self.add_node( pdata )
126 self._root = pdata
127
128 self._call_stack.append( pdata )
129
131 """End the call start started previously
132
133 :param result: the result of the call"""
134 lastprocessdata = self._call_stack.pop( )
135 lastprocessdata.endtime = time.clock( )
136 lastprocessdata.setResult( result )
137
139 """:return: root at which the call started"""
140 return self._root
141
143 """:return: length of the callstack"""
144 return len( self._call_stack )
145
146 - def toCallList( self, reverse = True, pruneIfTrue = lambda x: False ):
147 """
148 :return: flattened version of graph as list of ProcessData edges in call order , having
149 the root as last element of the list
150
151 :param pruneIfTrue: Function taking ProcessData to return true if the node
152 should be pruned from the result
153 :param reverse: if true, the calllist will be properly reversed ( taking childre into account """
154
155 def predecessors( node, nextNode, reverse, pruneIfTrue ):
156 out = []
157
158
159
160
161 predlist = self.predecessors( node )
162 lenpredlist = len( predlist ) - 1
163 if not reverse:
164 lenpredlist *= -1
165
166 predlist = [ ( lenpredlist - p.index, p ) for p in predlist ]
167 predlist.sort()
168
169 prednextnode = node
170 pruneThisNode = pruneIfTrue( node )
171 if pruneThisNode:
172 prednextnode = nextNode
173
174
175 for i,pred in predlist:
176 out.extend( predecessors( pred, prednextnode, reverse, pruneIfTrue ) )
177
178 if not pruneThisNode:
179 out.append( ( node, nextNode ) )
180 return out
181
182 calllist = predecessors( self.callRoot(), None, reverse, pruneIfTrue )
183 if not reverse:
184 calllist.reverse()
185 return calllist
186
187
188
190 """Initalized base class"""
191 super( Workflow, self ).__init__( **kwargs )
192
193 self._callgraph = None
194 self._mode = False
195
196
199
201 """Only mode is required """
202 self._mode = other._mode
203
204 self._callgraph = other._callgraph
205
206
207
218
219 - def makeTargets( self, targetList, errstream=None, donestream=None ):
220 """batch module compatible method allowing to make mutliple targets at once
221
222 :param targetList: iterable providing the targets to make
223 :param errstream: object with file interface allowing to log errors that occurred
224 during operation
225 :param donestream: if list, targets successfully done will be appended to it, if
226 it is a stream, the string representation will be wrtten to it"""
227 def to_stream(msg, stream):
228 """print msg to stream or use the logger instead"""
229 if stream:
230 stream.write( msg )
231 else:
232 log.info(msg)
233
234
235
236 for target in targetList:
237 try:
238 self.makeTarget( target )
239 except ComputeError,e:
240 to_stream(str( e ) + "\n", errstream)
241 except Exception, e:
242
243 msg = "--> UNHANDLED EXCEPTION: " + str( e ) + "\n"
244 msg += traceback.format_exc( )
245 to_stream(msg, errstream)
246 if donestream is None:
247 continue
248
249
250 if hasattr( donestream, "write" ):
251 donestream.write( str( target ) + "\n" )
252 else:
253
254 donestream.append( target )
255
256
257
258
260 """Evaluate the given plug in process mode and return a dirty report tuple
261 as used by `makeDirtyReport`"""
262 report = list( ( outputplug, None ) )
263 try:
264 outputplug.clearCache( clear_affected = False )
265 outputplug.get( processmode )
266 except DirtyException, e:
267 report[ 1 ] = e
268 except Exception:
269
270 excformat = traceback.format_exc( )
271 report[ 1 ] = DirtyException( report = ''.join( excformat ) )
272
273 return tuple( report )
274
275
277 """
278 :return: list of tuple( shell, DirtyReport|None )
279 If a process ( shell.node ) is dirty, a dirty report will be given explaining
280 why the process is dirty and needs an update
281 :param target: target you which to check for it's dirty state
282 :param mode:
283 * single - only the process assigned to evaluate target will be checked
284 * multi - as single, but the whole callgraph will be checked, starting
285 at the first node, stepping down the callgraph. This gives a per
286 node dirty report.
287 * deep - try to evaluate target, but fail if one process in the target's
288 call history is dirty
289 """
290 import process
291 pb = process.ProcessBase
292 processmode = pb.is_state | pb.dirty_check
293 globalmode = None
294
295
296 if mode == "deep" :
297 globalmode = processmode
298 elif mode in ( "single", "multi" ):
299 globalmode = pb.is_state
300 else:
301 raise AssertionError( "invalid mode: %s" % mode )
302
303 outreports = []
304
305
306
307 outputplug = self._setupProcess( target, globalmode )
308 outreports.append( self._evaluateDirtyState( outputplug, processmode ) )
309
310
311
312 if mode == "multi":
313
314 calllist = self._callgraph.toCallList( reverse = False )
315 for s_pdata,d_pdata in calllist:
316 if d_pdata is None:
317 continue
318
319 outputshell = d_pdata.process.toShell( d_pdata.plug )
320 outreports.append( self._evaluateDirtyState( outputshell, processmode ) )
321
322
323 return outreports
324
325
327 """Clear the previous state and re-initialize this instance getting ready
328 for a new instance
329
330 :param global_evaluation_mode: evaluation mode to be used"""
331 self._callgraph = Workflow.CallGraph( )
332 self._mode = global_evaluation_mode
333
334
336 """Setup the workflow's dg such that the returned output shell can be queried
337 to evaluate target
338
339 :param globalmode: mode with which all other processes will be handling
340 their input calls
341 """
342
343 inputshell = self.targetRating( target )[1]
344 if inputshell is None:
345 raise TargetError( "Cannot handle target %r" % target )
346
347
348 self._clearState( globalmode )
349
350
351 for node in self.iterNodes( ):
352 node.prepareProcess( )
353
354
355
356
357
358
359
360
361
362
363 these = lambda shell: not shell.plug.providesOutput() or shell.outputs( )
364 allAffectedNodes = ( shell.node for shell in inputshell.iterShells( direction = "down", visit_once = 1, prune = these ) )
365 outputshell = None
366
367
368
369 for node in allAffectedNodes:
370 try:
371 shell = node.targetRating( target, check_input_plugs = False, raise_on_ambiguity = 0 )[1]
372 except TypeError,e:
373 log.error(str( e ))
374 continue
375
376
377 if shell:
378 outputshell = shell
379
380
381
382 break
383
384
385
386 if not outputshell:
387
388 outplugs = inputshell.plug.affected()
389
390 if not outplugs:
391 raise TargetError( "Plug %r takes target %r as input, but does not affect an output plug that would take the same target type" % ( str( inputshell ), target ) )
392
393 is_valid_shell = lambda shell: not these( shell ) and shell.plug.attr.compatabilityRate( target )
394 for plug in outplugs:
395 shell = inputshell.node.toShell( plug )
396
397 if is_valid_shell( shell ):
398 log.warning("Did not find output plug delivering our target type - fallback to simple affected checks on node")
399 outputshell = shell
400 break
401
402
403
404
405
406 if not outputshell:
407 raise TypeError( "Target %s cannot be handled by this workflow (%s) as a computable output for %s cannot be found" % ( target, self, str( inputshell ) ) )
408
409
410
411 inputshell.set( target, ignore_connection = True )
412 return outputshell
413
414
415 - def _evaluate( self, target, processmode, globalmode ):
416 """Make or update the target using a process in our workflow
417
418 :param processmode: the mode with which to call the initial process
419 :return: tuple( shell, result ) - plugshell queried to get the result
420 """
421 outputshell = self._setupProcess( target, globalmode )
422
423 result = outputshell.get( processmode )
424
425
426 if len( self._callgraph._call_stack ):
427 raise AssertionError( "Callstack was not empty after calculations for %r where done" % target )
428
429 return ( outputshell, result )
430
431
433 """Create a report instance that describes how the previous target was made
434
435 :param reportType: Report to populate with information - it must be a Plan based
436 class that can be instantiated and populated with call information.
437 A report analyses the call dependency graph generated during dg evaluation
438 and presents it.
439 :return: report instance whose makeReport method can be called to retrieve it"""
440
441 return reportType( self._callgraph )
442
443
444
445
446
447
449 """:return: list of all supported target type
450 :note: this method is for informational purposes only"""
451 uniqueout = set()
452 for node in self.iterNodes():
453 try:
454 uniqueout.update( set( node.supportedTargetTypes() ) )
455 except Exception, e:
456 raise AssertionError( "Process %r failed when calling supportedTargetTypes" % p, e )
457
458 return list( uniqueout )
459
460
462 """
463 :return: int range(0,255) indicating how well a target can be made
464 0 means not at all, 255 means perfect.
465
466 Return value is tuple ( rate, PlugShell ), containing the process and plug with the
467 highest rating or None if rate is 0
468
469 Walk the dependency graph such that leaf nodes have higher ratings than
470 non-leaf nodes
471 :note: you can use the `process.ProcessBase` enumeration for comparison"""
472 rescache = list()
473 best_process = None
474
475 for node in self.iterNodes( ):
476 try:
477 rate, shell = node.targetRating( target )
478 except TypeError,e:
479
480
481 continue
482
483
484 if not rate:
485 continue
486
487
488 if not node.connections( 0, 1 ):
489 rate = rate * 2
490
491 rescache.append( ( rate, shell ) )
492
493
494 rescache.sort()
495 if not rescache or rescache[-1][0] == 0:
496 return ( 0, None )
497
498 bestpick = rescache[-1]
499
500
501 allbestpicks = [ pick for pick in rescache if pick[0] == bestpick[0] ]
502 if len( allbestpicks ) > 1:
503 raise AssertionError( "There should only be one suitable process for %r, found %i (%s)" % ( target, len( allbestpicks ), allbestpicks ) )
504
505
506 shell = bestpick[1]
507
508 return shell.node.targetRating( target )
509
511 """:return: current callgraph instance
512 :note: its strictly read-only and may not be changed"""
513 return self._callgraph
514
515
516
517
518
520 """:return: True if the current computation is a dry run"""
521 return self._mode
522
524 """Called by process base to indicate the start of a call of curProcess to targetProcess
525 This method tracks the actual call path taken through the graph ( which is dependent on the
526 dirty state of the prcoesses, allowing to walk it depth first to resolve the calls.
527 This also allows to create precise reports telling how to achieve a certain goal"""
528 pdata = Workflow.ProcessData( process, plug, mode )
529
530
531 self._callgraph.startCall( pdata )
532 return pdata
533
535 """Track that the process just finished its computation - thus the previously active process
536 should be on top of the stack again"""
537
538 self._callgraph.endCall( result )
539
540
541
543 """Parse the networkx graph and populate ourselves with the respective process
544 instances as described by the graph
545
546 :param graph: networkx graph whose nodes are process names to be found in the processes
547 module """
548 raise NotImplementedError( "TODO" )
549