Package mrv :: Package automation :: Module process
[hide private]
[frames] | [no frames]

Source Code for Module mrv.automation.process

  1  # -*- coding: utf-8 -*- 
  2  """Contains base class and common methods for all processes """ 
  3  __docformat__ = "restructuredtext" 
  4  __all__ = list() 
  5   
  6  from mrv.dge import NodeBase 
  7  from mrv.dgfe import GraphNodeBase 
  8  from mrv.dge import Attribute 
  9  import mrv.automation.base as wflbase 
 10   
 11  from mrv.path import make_path 
 12  from mrv.util import Or 
def track_output_call( func ):
	"""Wrap the process.evaluateState function allowing to gather plenty of information
	about the call, as well as error statistics.

	:param func: callable with signature ``func( self, plug, mode )`` - typically
		a process ``compute`` implementation
	:return: wrapped function which reports call start/end ( and exceptions ) to the
		workflow's call tracker whenever ``self.track_compute_calls`` is True"""
	from functools import wraps

	@wraps( func )		# keep name/docstring of the wrapped compute method intact
	def track_func( self, plug, mode ):
		# return simple result if tracking is disabled
		if not self.track_compute_calls:
			return func( self, plug, mode )

		pdata = self.workflow()._trackOutputQueryStart( self, plug, mode )

		try:
			result = func( self, plug, mode )
		except Exception as e:
			# record the failure on the tracking record, close the query, then
			# let the exception propagate to the engine
			pdata.exception = e
			self.workflow()._trackOutputQueryEnd( None )
			raise

		self.workflow()._trackOutputQueryEnd( result )
		return result
	# END track func

	return track_func
class ProcessBase( NodeBase ):
	"""The base class for all processes, defining a common interface.
	Inputs and Outputs of this node are statically described using plugs.

	:note: the process base is able to duplicate properly as it stores its
		constructor arguments accordingly"""
	# rating constants: how well a certain target can be produced ( see targetRating )
	kNo, kGood, kPerfect = 0, 127, 255
	# bit flags passed as 'mode' to evaluateState - see its docstring for semantics
	is_state, target_state, dirty_check = ( 1,2,4 )

	noun = "Noun ProcessBase,redefine in subclass"		# used in reports
	verb = "Verb ProcessBase,redefine in subclass"		# used in reports

	#{ Configuration
	# if False, the computation results will not be tracked in a callgraph
	track_compute_calls = True
	#} END configuration

	__all__.append( "ProcessBase" )

	def __init__( self, id, *args, **kwargs ):
		"""Initialize process with most common information.

		:param id: id of this node, passed on to NodeBase
		:param kwargs:
			 * noun: noun describing the process, ( i.e. "Process" )
			 * verb: verb describing the process, ( i.e. "processing" )
			 * workflow: the workflow this instance is part of"""
		# store args/kwargs so createInstance can duplicate us faithfully
		self._args = args
		self._kwargs = kwargs
		NodeBase.__init__( self, id = id, *args, **kwargs )		# init last - need our info first !

	#{ iDuplicatable Interface
	def createInstance( self, *args, **kwargs ):
		"""Create a copy of self and return it - rebuilds the instance from the
		constructor arguments recorded in __init__"""
		return self.__class__( self.id(), *self._args, **self._kwargs )

	def copyFrom( self, other, *args, **kwargs ):
		"""
		Note: we have already given our args to the class during instance creation,
		thus we do not copy args again"""
		pass
	#} END iDuplicatable


	#{ Query

	def targetRating( self, target, check_input_plugs = True, **kwargs ):
		"""
		:return: tuple( int, PlugShell )
			int between 0 and 255 - 255 means target matches perfectly, 0
			means complete incompatibility. Any inbetweens indicate the target can be
			achieved, but maybe just in a basic way

			If rate is 0, the object will be None, otherwise its a plugShell to the
			input attribute that can take target as input. In process terms this means
			that at least one output plug exists that produces the target.
		:param target: instance or class of target to check for compatibility
		:param check_input_plugs: if True, input plugs will be checked for compatibility of target,
			otherwise the output plugs
		:param kwargs: passed on to filterCompatiblePlugs
		:raise TypeError: if the result is ambiguous and raise_on_ambiguity = 1"""
		# query our output plugs for a compatible attr
		tarplugs = None
		if check_input_plugs:
			tarplugs = self.inputPlugs( )
		else:
			tarplugs = self.outputPlugs( )

		plugrating = self.filterCompatiblePlugs( tarplugs, target, attr_as_source=False , **kwargs )

		if not plugrating:		# no plug ?
			return ( 0 , None )

		# remove all non-writable plugs - they can never be targets
		writableRatedPlugs = []
		for rate,plug in plugrating:		# rate,plug tuple
			if plug.attr.flags & Attribute.readonly:
				continue		# need to set the attribute

			# connected plugs are an option, but prefer the ones being open
			# ( halving the rate demotes them in the sort below )
			if self.toShell( plug ).input():
				rate /= 2.0

			writableRatedPlugs.append( (rate,plug) )
		# END writable only filters

		if not writableRatedPlugs:
			return ( 0, None )

		writableRatedPlugs.sort()		# high comes last

		rate, plug = writableRatedPlugs[-1]
		return ( int(rate), self.toShell( plug ) )

	def supportedTargetTypes( self ):
		""":return: list of target types that can be output
		:note: targetTypes are classes, not instances"""
		return [ p.attr.typecls for p in self.inputPlugs() ]

	#} END query

	#{ Interface

	def evaluateState( self, plug, mode ):
		""":return: an instance suitable to be stored in the given plug

		:param plug: plug that triggered the computation - use it to compare against
			your classes plugs to see which output is required and return a result suitable
			to be stored in plug
		:param mode: bit flags as follows:

			is_state:
				your return value represents the current state of the process - your output will
				represent what actually is present. You may not alter the state of your environment,
				thus this operation is strictly read-only.
				According to your output, when called you need to setup a certain state
				and return the results according to that state. This flag means you are requested
				to return everything that is right according to the state you shall create.
				If this state is disabled, you should not return the current state, but behave
				according to the other ones.

			target_state:
				your return value must represent the 'should' state - thus you must assure
				that the environment is left in a state that matches your plug state - the result
				of that operation will be returned.
				Usually, but not necessarily, the is_state is also requested so that the output
				represents the complete new is_state ( the new state after you changed the environment
				to match the plug_state )

			dirty_check:
				Always comes in conjunction with is_state. You are required to return the is_state
				but raise a DirtyException if your inputs would require you to adjust the environment
				to deliver the plug state. If the is_state of the environment already is the plug_state
				and there is nothing to do for you, do not raise and simply return your output.

			The call takes place as there is no cache for plugType.
		:raise PlugUnhandled: if the plug is not handled by this implementation
		:note: needs to be implemented by subclasses, but subclasses can just call their
			superclass for all unhandled plugs resulting in consistent error messages"""
		# NOTE(review): PlugUnhandled is not among this module's visible imports -
		# presumably provided by the surrounding package ( mrv.dge ); confirm
		raise PlugUnhandled( "Plug %s.%s cannot be handled - check your implementation" % ( self, str( plug ) ) )

	# } END interface

	@track_output_call
	def compute( self, plug, mode = None ):
		"""Base implementation of the output, called by `input` Method.
		Its used to have a general hook for the flow tracing.

		:param plug: plug to evaluate
		:param mode: the mode of the evaluation
		:return: result of the computation"""
		wfl = self.workflow()
		finalmode = wfl._mode		# use global mode

		# if we are root, we take the mode given by the caller though
		if self.track_compute_calls:
			if wfl.callgraph().callRoot().process == self:
				finalmode = mode
		else:
			# either use the explicit mode or the global one
			finalmode = mode or wfl._mode

		# exceptions are handled by dgengine
		# call actually implemented method
		return self.evaluateState( plug, finalmode )


	#{ Base
	# methods that drive the actual call

	def prepareProcess( self ):
		"""Will be called on all processes of the workflow once before a target is
		actually being queried by someone.
		It should be used to do whatever you think is required to work as process.
		This usually is a special case for most processes"""
		pass

	def workflow( self ):
		""":return: the workflow instance we are connected with. Its used to query global data"""
		return self.graph

	#} END base
class WorkflowProcessBase( GraphNodeBase, ProcessBase ):
	"""A process wrapping a workflow, allowing workflows to be nested.
	Derive from this class and initialize it with the workflow you would like to have wrapped.
	The process works by transmitting relevant calls to its underlying workflow, allowing
	nodeInsideNestedWorkflow -> thisworkflow.node.plug connections.

	Workflows are standin nodes - they can connect anything their wrapped nodes can connect.

	:note: to prevent dependency issues, the workflow instance will be bound on first use"""
	__all__.append( "WorkflowProcessBase" )
	# override these in your subclass to point at the workflow to be wrapped
	workflow_file = "name of the workflow dot file ( incl. extension )"
	workflow_directory = "directory containing workflows to load "

	exclude_connected_plugs = True		# if true, all plugs that are connected will be pruned
	duplicate_wrapped_graph = False		# we load our copies directly and thus have a copy

	def __init__( self, id, wflInstance=None, **kwargs ):
		"""Will take all important configuration variables from its class variables
		- you should override these with your subclass.

		:param id: id of this process node
		:param wflInstance: if given, this instance will be used instead of creating
			a new workflow. Used by copy constructor.
		:param kwargs: all arguments required to initialize the ProcessBase"""
		wrappedwfl = wflInstance
		if not wrappedwfl:
			wrappedwfl = self._createWrappedWfl( self.workflow_directory, self.workflow_file )

		# NOTE: baseclass stores wrapped wfl for us
		# init bases
		GraphNodeBase.__init__( self, wrappedwfl, **kwargs )
		ProcessBase.__init__( self, id, **kwargs )

		# adjust the ids of wrapped graph nodes with the name of their graph
		# NO: if this is done, some recursive facades have issues with their attribute
		# names - although this could possibly be solved, renaming the nodes is in
		# fact not required
		#for node in self.wgraph.iterNodes():
		#	node.setID( "%s.%s" % ( id, node.id() ) )

		# override name - per instance in our case
		self.noun = wrappedwfl.name
		self.verb = "internally computing"

	def createInstance( self, *args, **kwargs ):
		"""Create a copy of self and return it - required due to our very special constructor"""
		return self.__class__( self.id(), wflInstance = self.wgraph )

	def _createWrappedWfl( self, wfldir, wflname ):
		"""
		:param wfldir: directory containing the workflow dot file
		:param wflname: file name of the workflow dot file
		:return: our wrapped workflow instance as created by a method loading a workflow
			from a file"""
		wfl = wflbase.loadWorkflowFromDotFile( make_path( wfldir ) / wflname )
		return wfl

	def prepareProcess( self ):
		"""As we have different callgraphs, but want proper reports, just swap in the
		callgraph of our own workflow to allow it to be maintained correctly when the nodes
		of the wrapped graph evaluate.

		:raise AssertionError: if the callgraph of the parent workflow is not empty
		:note: this requires that we get called after the callgraph has been initialized"""
		if self.graph._callgraph.number_of_nodes():
			raise AssertionError( "Callgraph of parent workflow %r was not empty" % self.graph )

		self.wgraph.copyFrom( self.graph )		# copies required attributes

		# Prepare all our wrapped nodes
		for node in self.wgraph.iterNodes( ):
			node.prepareProcess( )

		# ProcessBase.prepareProcess( self )

	def _iterNodes( self ):
		""":return: generator for nodes that have no output connections or no input connections"""
		noOutput = lambda node: not node.connections( 0, 1 )
		noInput = lambda node: not node.connections( 1, 0 )
		return self.wgraph.iterNodes( predicate = Or( noInput, noOutput ) )

	def _getNodePlugs( self ):
		"""Override the base method, filtering its output so that only unconnected plugs
		will be returned"""
		outset = super( WorkflowProcessBase, self )._getNodePlugs( )

		if self.exclude_connected_plugs:
			finalset = set()
			for node, plug in outset:
				shell = node.toShell( plug )
				if not shell.isConnected():
					finalset.add( ( node , plug ) )
				# END if shell is unconnected
			# END for each node,plug pair

			outset = finalset
			self._addIncludeNodePlugs( outset )		# assure we never filter include plugs
		# END exclude connected plugs

		return outset