Package mrv :: Package automation :: Module workflow
[hide private]
[frames] | [no frames]

Source Code for Module mrv.automation.workflow

  1  # -*- coding: utf-8 -*- 
  2  """Contains workflow classes that connect processes in a di-graph """ 
  3  __docformat__ = "restructuredtext" 
  4   
  5  import networkx as nx 
  6  from mrv.dge import Graph, ComputeError 
  7  import time 
  8  import weakref 
  9  import traceback 
 10  import logging 
 11  log = logging.getLogger('mrv.automation.workflow') 
 12   
 13  ##################### 
 14  ## EXCEPTIONS ###### 
 15  ################### 
 16  #{ Exceptions 
class TargetError( ValueError ):
    """Thrown if target is not supported by the workflow ( and thus cannot be made )"""
class DirtyException( Exception ):
    """Raised while the system runs in dirty-query mode and a process finds
    itself dirty.

    An optional report may be attached; it is handed back through the
    `makeReport` method.

    :note: this exception class must NOT derive from ComputeError, since the
        DG engine catches ComputeError and would mis-interpret it - unknown
        exception types are simply passed through by the engine"""
    __slots__ = "report"

    def __init__( self, report = '' ):
        Exception.__init__( self )  # cannot use super with exceptions apparently
        self.report = report

    def __str__( self ):
        return str( self.report )

    #{ Interface
    def makeReport( self ):
        """
        :return: printable report, usually a string or some object that
            responds to str() appropriately"""
        return self.report

    #} END interface
46 47 #} END interface 48 49 #} END exceptions 50 51 ##################### 52 ## CLASSES ###### 53 ################### 54
class Workflow( Graph ):
    """Implements a workflow as connected processes

    :note: if you have to access the processes directly, use the DiGraph methods"""

    #{ Utility Classes
    class ProcessData( object ):
        """Allows to store additional information with each process called during the workflow"""
        __slots__ = ( 'process', 'plug', 'mode', '_result', 'starttime', 'endtime', 'exception', 'index' )

        def __init__( self, process, plug, mode ):
            self.process = process
            self.plug = plug
            self.mode = mode
            self._result = None             # can be weakref or actual value
            self.starttime = time.clock()
            self.endtime = self.starttime
            self.exception = None           # stores exception on error
            self.index = 0                  # index of the child - graph stores nodes unordered

        def __repr__( self ):
            out = "%s.%s" % ( self.process, self.plug )
            if self.exception:
                out += "&ERROR"
            return out

        def elapsed( self ):
            """:return: time needed to process the call

            :note: fixed missing ``self`` parameter - previously any call of this
                method raised a TypeError"""
            return self.endtime - self.starttime

        def setResult( self, result ):
            """Set the given result

            :note: uses weak references as the tracking structure should not cause a possible
                mutation of the program flow ( as instances stay alive although code expects
                them to be deleted )"""
            if result is not None:
                try:
                    self._result = weakref.ref( result )
                except TypeError:
                    # not every type is weakly referencable - keep a strong reference then
                    self._result = result
            # END if result not None

        def result( self ):
            """:return: result stored in this instance, or None if it is not present or not alive"""
            if self._result:
                if isinstance( self._result, weakref.ref ):
                    return self._result()   # may be None if the referent has died
                return self._result

            return None


    class CallGraph( nx.DiGraph ):
        """Simple wrapper storing a call graph, keeping the root at which the call started

        :note: this class is specialized to be used by workflows, its not general for that purpose"""

        def __init__( self ):
            super( Workflow.CallGraph, self ).__init__( name="Callgraph" )
            self._call_stack = []
            self._root = None

        def startCall( self, pdata ):
            """Add a call of a process"""
            # keep the call graph
            if self._call_stack:
                curdata = self._call_stack[ -1 ]
                # the graph stores nodes unordered - remember the call order on the child
                pdata.index = self.in_degree( curdata )
                self.add_edge( pdata, curdata )
            else:
                # its the first call, thus we add it as node - would work on first edge add too though
                self.add_node( pdata )
                self._root = pdata

            self._call_stack.append( pdata )

        def endCall( self, result ):
            """End the call started previously

            :param result: the result of the call"""
            lastprocessdata = self._call_stack.pop( )
            lastprocessdata.endtime = time.clock( )
            lastprocessdata.setResult( result )

        def callRoot( self ):
            """:return: root at which the call started"""
            return self._root

        def callstackSize( self ):
            """:return: length of the callstack"""
            return len( self._call_stack )

        def toCallList( self, reverse = True, pruneIfTrue = lambda x: False ):
            """
            :return: flattened version of graph as list of ProcessData edges in call order,
                having the root as last element of the list
            :param pruneIfTrue: Function taking ProcessData to return true if the node
                should be pruned from the result
            :param reverse: if true, the calllist will be properly reversed ( taking
                children into account )"""
            def predecessors( node, nextNode, reverse, pruneIfTrue ):
                out = []

                # invert the callorder - each predecessor list defines the input calls
                # a process has made - to properly reproduce that, the call order needs to be
                # inverted as well
                # NOTE(review): assumes networkx 1.x where predecessors() returns a list
                predlist = self.predecessors( node )
                lenpredlist = len( predlist ) - 1
                if not reverse:
                    lenpredlist *= -1   # will keep the right, non reversed order

                predlist = [ ( lenpredlist - p.index, p ) for p in predlist ]
                predlist.sort()

                prednextnode = node
                pruneThisNode = pruneIfTrue( node )
                if pruneThisNode:
                    prednextnode = nextNode

                # enumerate the other way round, as the call list needs to be inverted
                for i, pred in predlist:
                    out.extend( predecessors( pred, prednextnode, reverse, pruneIfTrue ) )

                if not pruneThisNode:
                    out.append( ( node, nextNode ) )
                return out
            # END predecessors

            calllist = predecessors( self.callRoot(), None, reverse, pruneIfTrue )
            if not reverse:
                calllist.reverse()  # actually brings it in the right order, starting at root
            return calllist

    #} END utility classes


    def __init__( self, **kwargs ):
        """Initialize base class"""
        super( Workflow, self ).__init__( **kwargs )

        self._callgraph = None
        self._mode = False

    def __str__( self ):
        return self.name

    def copyFrom( self, other ):
        """Only mode is required"""
        self._mode = other._mode
        # shallow copy callgraph
        self._callgraph = other._callgraph

    #{ Main Interface

    def makeTarget( self, target ):
        """:param target: target to make - can be class or instance
        :return: result when producing the target"""
        # generate mode
        import process
        pb = process.ProcessBase
        processmode = globalmode = pb.is_state | pb.target_state

        shell, result = self._evaluate( target, processmode, globalmode )
        return result

    def makeTargets( self, targetList, errstream=None, donestream=None ):
        """batch module compatible method allowing to make multiple targets at once

        :param targetList: iterable providing the targets to make
        :param errstream: object with file interface allowing to log errors that occurred
            during operation
        :param donestream: if list, targets successfully done will be appended to it, if
            it is a stream, the string representation will be written to it"""
        def to_stream( msg, stream ):
            """print msg to stream or use the logger instead"""
            if stream:
                stream.write( msg )
            else:
                log.info( msg )
        # END utility

        for target in targetList:
            try:
                self.makeTarget( target )
            except ComputeError as e:
                to_stream( str( e ) + "\n", errstream )
                # fixed: failed targets must not be reported as done
                continue
            except Exception as e:
                # except all
                msg = "--> UNHANDLED EXCEPTION: " + str( e ) + "\n"
                msg += traceback.format_exc( )
                to_stream( msg, errstream )
                # fixed: failed targets must not be reported as done
                continue
            # END exception handling

            if donestream is None:
                continue

            # all clear, put item to done list
            if hasattr( donestream, "write" ):
                donestream.write( str( target ) + "\n" )
            else:
                # assume its a list
                donestream.append( target )
        # END for each target

    def _evaluateDirtyState( self, outputplug, processmode ):
        """Evaluate the given plug in process mode and return a dirty report tuple
        as used by `makeDirtyReport`"""
        report = [ outputplug, None ]
        try:
            outputplug.clearCache( clear_affected = False )     # assure it evaluates
            outputplug.get( processmode )                       # trigger computation, might raise
        except DirtyException as e:
            report[ 1 ] = e     # remember report as well
        except Exception:
            # remember normal errors, put them into a dirty report, as well as stacktrace
            excformat = traceback.format_exc( )
            report[ 1 ] = DirtyException( report = ''.join( excformat ) )

        return tuple( report )

    def makeDirtyReport( self, target, mode = "single" ):
        """
        :return: list of tuple( shell, DirtyReport|None )
            If a process ( shell.node ) is dirty, a dirty report will be given explaining
            why the process is dirty and needs an update
        :param target: target you wish to check for its dirty state
        :param mode:
            * single - only the process assigned to evaluate target will be checked
            * multi - as single, but the whole callgraph will be checked, starting
              at the first node, stepping down the callgraph. This gives a per
              node dirty report.
            * deep - try to evaluate target, but fail if one process in the target's
              call history is dirty
        """
        import process
        pb = process.ProcessBase
        processmode = pb.is_state | pb.dirty_check
        globalmode = None

        # lets make the mode clear
        if mode == "deep":
            globalmode = processmode    # input processes may apply dirty check ( and fail )
        elif mode in ( "single", "multi" ):
            globalmode = pb.is_state    # input process should just return the current state, no checking
        else:
            raise AssertionError( "invalid mode: %s" % mode )

        outreports = []

        # GET INITIAL REPORT
        ######################
        outputplug = self._setupProcess( target, globalmode )
        outreports.append( self._evaluateDirtyState( outputplug, processmode ) )


        # STEP THE CALLGRAPH ?
        if mode == "multi":
            # walk the callgraph and get dirty reports from each node
            calllist = self._callgraph.toCallList( reverse = False )
            for s_pdata, d_pdata in calllist:
                if d_pdata is None:     # the root node ( we already called it ) has no destination
                    continue

                outputshell = d_pdata.process.toShell( d_pdata.plug )
                outreports.append( self._evaluateDirtyState( outputshell, processmode ) )
            # END for each s_pdata, d_pdata
        # END if multi handling
        return outreports

    def _clearState( self, global_evaluation_mode ):
        """Clear the previous state and re-initialize this instance getting ready
        for a new run

        :param global_evaluation_mode: evaluation mode to be used"""
        self._callgraph = Workflow.CallGraph( )
        self._mode = global_evaluation_mode

    def _setupProcess( self, target, globalmode ):
        """Setup the workflow's dg such that the returned output shell can be queried
        to evaluate target

        :param globalmode: mode with which all other processes will be handling
            their input calls
        """
        # find suitable process
        inputshell = self.targetRating( target )[1]
        if inputshell is None:
            raise TargetError( "Cannot handle target %r" % target )

        # clear previous callgraph
        self._clearState( globalmode )

        # prepare all processes
        for node in self.iterNodes( ):
            node.prepareProcess( )
        # END reset dg handling


        # OUTPUT SHELL HANDLING
        #########################
        # Find a shell that we can query to trigger the graph to evaluate
        # get output plug that can be queried to get the target - follow the flow
        # of the graph downstream and get the first compatible plug that would
        # return the same type that we put in
        # NOTE: requires an unconnected output plug by convention !
        these = lambda shell: not shell.plug.providesOutput() or shell.outputs( )
        allAffectedNodes = ( shell.node for shell in inputshell.iterShells( direction = "down", visit_once = 1, prune = these ) )
        outputshell = None

        # AFFECTED NODES
        ##################
        # use last compatible node in the chain
        for node in allAffectedNodes:
            try:
                shell = node.targetRating( target, check_input_plugs = False, raise_on_ambiguity = 0 )[1]   # 1 == plug
            except TypeError as e:  # ambiguous outputs
                log.error(str( e ))
                continue
            # END handle exceptions

            if shell:
                outputshell = shell
                # so here it comes - take this break away, and workflow might not work anymore
                # perhaps we should add a flag to define whether workflows should keep on looking
                # or not - if not, one has to carefully plan the output plugs ...
                break
        # END for each affected node try to get a valid shell

        # AFFECTED PLUGS HANDLING
        if not outputshell:
            # try to use just the affected ones - that would be the best we have
            outplugs = inputshell.plug.affected()

            if not outplugs:
                raise TargetError( "Plug %r takes target %r as input, but does not affect an output plug that would take the same target type" % ( str( inputshell ), target ) )

            is_valid_shell = lambda shell: not these( shell ) and shell.plug.attr.compatabilityRate( target )
            for plug in outplugs:
                shell = inputshell.node.toShell( plug )

                if is_valid_shell( shell ):
                    log.warning("Did not find output plug delivering our target type - fallback to simple affected checks on node")
                    outputshell = shell
                    break
                # END valid shell check
            # END for each plug in node output plugs
        # END try to get output shell by checking affects of current node

        # still no shell ?
        if not outputshell:
            raise TypeError( "Target %s cannot be handled by this workflow (%s) as a computable output for %s cannot be found" % ( target, self, str( inputshell ) ) )

        # we do not care about ambiguity, simply pull one
        # QUESTION: should we warn about multiple affected plugs ?
        inputshell.set( target, ignore_connection = True )
        return outputshell

    def _evaluate( self, target, processmode, globalmode ):
        """Make or update the target using a process in our workflow

        :param processmode: the mode with which to call the initial process
        :return: tuple( shell, result ) - plugshell queried to get the result
        """
        outputshell = self._setupProcess( target, globalmode )
        ######################################################
        result = outputshell.get( processmode )
        ######################################################

        if self._callgraph.callstackSize():
            raise AssertionError( "Callstack was not empty after calculations for %r where done" % target )

        return ( outputshell, result )

    def createReportInstance( self, reportType ):
        """Create a report instance that describes how the previous target was made

        :param reportType: Report to populate with information - it must be a Plan based
            class that can be instantiated and populated with call information.
            A report analyses the call dependency graph generated during dg evaluation
            and presents it.
        :return: report instance whose makeReport method can be called to retrieve it"""
        # make the target as dry run
        return reportType( self._callgraph )

    #} END main interface


    #{ Query

    def targetSupportList( self ):
        """:return: list of all supported target types
        :note: this method is for informational purposes only"""
        uniqueout = set()
        for node in self.iterNodes():
            try:
                uniqueout.update( set( node.supportedTargetTypes() ) )
            except Exception as e:
                # fixed: previously referenced an undefined name 'p' here,
                # raising a NameError instead of the intended AssertionError
                raise AssertionError( "Process %r failed when calling supportedTargetTypes" % node, e )
        # END for each node in nodes iter
        return list( uniqueout )

    def targetRating( self, target ):
        """
        :return: int range(0,255) indicating how well a target can be made
            0 means not at all, 255 means perfect.

            Return value is tuple ( rate, PlugShell ), containing the process and plug with the
            highest rating or None if rate is 0

            Walk the dependency graph such that leaf nodes have higher ratings than
            non-leaf nodes
        :note: you can use the `process.ProcessBase` enumeration for comparison"""
        rescache = list()
        best_process = None

        for node in self.iterNodes( ):
            try:
                rate, shell = node.targetRating( target )
            except TypeError:
                # could be that there is a node having ambiguous plugs, but we are not
                # going to take it anyway
                continue
            # END try-except TypeError

            if not rate:
                continue

            # is leaf node ? ( no output connections )
            if not node.connections( 0, 1 ):
                rate = rate * 2     # prefer leafs in the rating

            rescache.append( ( rate, shell ) )
        # END for each process

        rescache.sort()     # last is most suitable
        if not rescache or rescache[-1][0] == 0:
            return ( 0, None )

        bestpick = rescache[-1]

        # check if we have several best picks - raise if so
        allbestpicks = [ pick for pick in rescache if pick[0] == bestpick[0] ]
        if len( allbestpicks ) > 1:
            raise AssertionError( "There should only be one suitable process for %r, found %i (%s)" % ( target, len( allbestpicks ), allbestpicks ) )


        shell = bestpick[1]
        # recompute rate as we might have changed it
        return shell.node.targetRating( target )

    def callgraph( self ):
        """:return: current callgraph instance
        :note: its strictly read-only and may not be changed"""
        return self._callgraph

    #} END query

    #{ Internal Process Interface

    def _isDryRun( self ):
        """:return: True if the current computation is a dry run"""
        return self._mode

    def _trackOutputQueryStart( self, process, plug, mode ):
        """Called by process base to indicate the start of a call of curProcess to targetProcess
        This method tracks the actual call path taken through the graph ( which is dependent on the
        dirty state of the processes ), allowing to walk it depth first to resolve the calls.
        This also allows to create precise reports telling how to achieve a certain goal"""
        pdata = Workflow.ProcessData( process, plug, mode )

        # keep the call graph
        self._callgraph.startCall( pdata )
        return pdata    # return so that decorators can use this information

    def _trackOutputQueryEnd( self, result = None ):
        """Track that the process just finished its computation - thus the previously active process
        should be on top of the stack again"""
        # update last data and its call time
        self._callgraph.endCall( result )
    #}


    def _populateFromGraph( self, graph ):
        """Parse the networkx graph and populate ourselves with the respective process
        instances as described by the graph

        :param graph: networkx graph whose nodes are process names to be found in the processes
            module"""
        raise NotImplementedError( "TODO" )
549