"""Contains base class and common methods for all processes """
__docformat__ = "restructuredtext"

import mrv.automation.base as wflbase
from mrv.dge import Attribute, NodeBase, PlugUnhandled
from mrv.dgfe import GraphNodeBase
from mrv.path import make_path
from mrv.util import Or
15
def track_output_call( func ):
    """Decorator wrapping the process' ``evaluateState`` style function, allowing to
    gather plenty of information about the call, as well as error statistics.

    :param func: callable with signature ``(self, plug, mode)``
    :return: wrapper function with the same signature that records the
        computation in the workflow's callgraph when tracking is enabled"""
    def track_func( self, plug, mode ):
        # return simple result if tracking is disabled
        if not self.track_compute_calls:
            return func( self, plug, mode )

        # open a query record before the actual computation runs
        self.workflow()._trackOutputQueryStart( self, plug, mode )

        try:
            result = func( self, plug, mode )
        except Exception:
            # close the query record even on failure - the dgengine handles
            # the exception itself, we just must not leave the record open
            self.workflow()._trackOutputQueryEnd( None )
            raise
        # END exception handling

        # record the successful result and hand it back to the caller
        self.workflow()._trackOutputQueryEnd( result )
        return result
    # END track_func

    return track_func
42
class ProcessBase( NodeBase ):
    """The base class for all processes, defining a common interface

    Inputs and Outputs of this node are statically described using plugs.

    :note: the process base is able to duplicate properly as it stores the
        arguments given to its constructor and reuses them when a copy of
        itself is created"""
    kNo, kGood, kPerfect = 0, 127, 255          # specify how good a certain target can be produced
    is_state, target_state, dirty_check = ( 1, 2, 4 )   # evaluation mode bit flags

    noun = "Noun ProcessBase,redefine in subclass"      # used in reports
    verb = "Verb ProcessBase,redefine in subclass"      # used in reports

    # if False, the computation results will not be tracked in a callgraph
    track_compute_calls = True

    # register in the module's public interface ( class body reads resolve to
    # the module-level __all__ list )
    __all__.append( "ProcessBase" )

    def __init__( self, id, *args, **kwargs ):
        """Initialize process with most common information

        * noun: noun describing the process, ( i.e. "Process" )
        * verb: verb describing the process, ( i.e. "processing" )
        * workflow: the workflow this instance is part of"""
        # keep the constructor arguments so createInstance can duplicate us
        self._args = args
        self._kwargs = kwargs
        NodeBase.__init__( self, id = id, *args, **kwargs ) # init last - need our info first !

    #{ iDuplicatable Interface
    def createInstance( self, *args, **kwargs ):
        """Create a copy of self and return it"""
        return self.__class__( self.id(), *self._args, **self._kwargs )

    def copyFrom( self, other, *args, **kwargs ):
        """Note: we have already given our args to the class during instance creation,
        thus we do not copy args again"""
        pass
    #} END iDuplicatable Interface

    def targetRating( self, target, check_input_plugs = True, **kwargs ):
        """:return: tuple( int, PlugShell )

            int between 0 and 255 - 255 means target matches perfectly, 0
            means complete incompatability. Any inbetweens indicate the target can be
            achieved, but maybe just in a basic way.

            If rate is 0, the object will be None, otherwise its a plugShell to the
            input attribute that can take target as input. In process terms this means
            that at least one output plug exists that produces the target.
        :param target: instance or class of target to check for compatability
        :param check_input_plugs: if True, input plugs will be checked for compatability of target,
            otherwise the output plugs
        :raise TypeError: if the result is ambiguous and raise_on_ambiguity = 1"""
        # query our plugs for a compatible attr
        if check_input_plugs:
            tarplugs = self.inputPlugs( )
        else:
            tarplugs = self.outputPlugs( )
        # END plug source selection

        plugrating = self.filterCompatiblePlugs( tarplugs, target, attr_as_source=False , **kwargs )

        if not plugrating: # no plug ?
            return ( 0, None )

        # remove all non-writable plugs - they can never be targets
        writableRatedPlugs = []
        for rate, plug in plugrating: # rate,plug tuple
            if plug.attr.flags & Attribute.readonly:
                continue # need to set the attribute

            # connected plugs are an option, but prefer the ones being open
            if self.toShell( plug ).input():
                rate /= 2.0
            # END lower rating of connected plugs
            writableRatedPlugs.append( (rate,plug) )
        # END writable only filters

        if not writableRatedPlugs:
            return ( 0, None )

        writableRatedPlugs.sort() # high comes last
        rate, plug = writableRatedPlugs[-1]
        return ( int(rate), self.toShell( plug ) )

    def supportedTargetTypes( self ):
        """:return: list target types that can be output

        :note: targetTypes are classes, not instances"""
        return [ p.attr.typecls for p in self.inputPlugs() ]

    def evaluateState( self, plug, mode ):
        """:return: an instance suitable to be stored in the given plug

        :param plug: plug that triggered the computation - use it to compare against
            your classes plugs to see which output is required and return a result suitable
            to be stored in that plug
        :param mode: bit flags as follows:

            is_state:
                your return value represents the current state of the process - your output will
                represent what actually is present. You may not alter the state of your environment,
                thus this operation is strictly read-only.
                According to your output, when called you need to setup a certain state
                and return the results according to that state. This flag means you are requested
                to return everything that is right according to the state you shall create.
                If this state is disabled, you should not return the current state, but behave
                according to the other ones.

            target_state:
                your return value must represent the 'should' state - thus you must assure
                that the environment is left in a state that matches your plug state - the result
                of that operation will be returned.
                Usually, but not necessarily, the is_state is also requested so that the output
                represents the complete new is_state ( the new state after you changed the environment
                to match the plug_state ).

            dirty_check:
                Always comes in conjunction with is_state. You are required to return the is_state
                but raise a DirtyException if your inputs would require you to adjust the environment
                to deliver the plug state. If the is_state of the environment is the plug_state,
                as there is nothing to do for you, do not raise and simply return your output.

        The call takes place as there is no cache for plugType.
        :note: needs to be implemented by subclasses, but subclasses can just call their
            superclass for all unhandled plugs resulting in consistent error messages"""
        raise PlugUnhandled( "Plug %s.%s cannot be handled - check your implementation" % ( self, str( plug ) ) )

    @track_output_call
    def compute( self, plug, mode = None ):
        """Base implementation of the output, called by `input` Method.
        Its used to have a general hook for the flow tracing.

        :param plug: plug to evaluate
        :param mode: the mode of the evaluation
        :return: result of the computation"""
        wfl = self.workflow()
        finalmode = wfl._mode # use global mode

        # if we are root, we take the mode given by the caller though
        if self.track_compute_calls:
            if wfl.callgraph().callRoot().process == self:
                # either use the explicit mode or the global one
                finalmode = mode or wfl._mode
            # END if we are the call root
        # END tracking compute calls

        # exceptions are handled by dgengine
        # call actually implemented method
        return self.evaluateState( plug, finalmode )

    # methods that drive the actual call
    def prepareProcess( self ):
        """Will be called on all processes of the workflow once before a target is
        actually being queried by someone.

        It should be used to do whatever you think is required to work as process.
        This usually is a special case for most processes."""
        pass

    def workflow( self ):
        """:return: the workflow instance we are connected with. Its used to query global data"""
        # the graph we are a node of is the workflow itself
        return self.graph
225
class WorkflowProcessBase( GraphNodeBase, ProcessBase ):
    """A process wrapping a workflow, allowing workflows to be nested.

    Derive from this class and initialize it with the workflow you would like to have wrapped.
    The process works by transmitting relevant calls to its underlying workflow, allowing
    nodeInsideNestedWorkflow -> thisworkflow.node.plug connections.

    Workflows are standin nodes - they can connect anything their wrapped nodes can connect.

    :note: to prevent dependency issues, the workflow instance will be bound on first use"""
    # register in the module's public interface
    __all__.append( "WorkflowProcessBase" )

    # configuration to be overridden by subclasses
    workflow_file = "name of the workflow dot file ( incl. extension )"
    workflow_directory = "directory containing workflows to load "

    exclude_connected_plugs = True # if true, all plugs that are connected will be pruned
    duplicate_wrapped_graph = False # we load our copies directly and thus have a copy
242
def __init__( self, id, wflInstance=None, **kwargs ):
    """Will take all important configuration variables from its class variables
    - you should override these with your subclass.

    :param id: id of this process node
    :param wflInstance: if given, this instance will be used instead of creating
        a new workflow. Used by copy constructor.
    :param kwargs: all arguments required to initialize the ProcessBase"""
    wrappedwfl = wflInstance
    if wrappedwfl is None:
        # no instance provided - load the workflow from its dot file as
        # configured by the class variables
        wrappedwfl = self._createWrappedWfl( self.workflow_directory, self.workflow_file )
    # END create default workflow

    # NOTE: baseclass stores wrapped wfl for us
    GraphNodeBase.__init__( self, wrappedwfl, **kwargs )
    ProcessBase.__init__( self, id, **kwargs )

    # adjust the ids of wrapped graph nodes with the name of their graph
    # NO: if this is done, some recursive facades have issues with their attribute
    # names - although this could possibly be solved, renaming the nodes is
    # left out deliberately
    #for node in self.wgraph.iterNodes():
    #   node.setID( "%s.%s" % ( id, node.id() ) )

    # override name - per instance in our case
    self.noun = wrappedwfl.name
    self.verb = "internally computing"
270
def createInstance( self, *args, **kwargs ):
    """Create a copy of self and return it - required due to our very special constructor"""
    cls = self.__class__
    # hand our wrapped workflow to the copy instead of loading a fresh one
    return cls( self.id(), wflInstance = self.wgraph )
274
def _createWrappedWfl( self, wfldir, wflname ):
    """:return: our wrapped workflow instance as created by a method loading a workflow
        from a dot file

    :param wfldir: directory expected to contain the workflow file
    :param wflname: name of the workflow dot file ( incl. extension )"""
    wfl = wflbase.loadWorkflowFromDotFile( make_path( wfldir ) / wflname )
    return wfl
281
def prepareProcess( self ):
    """As we have different callgraphs, but want proper reports, just swap in the
    callgraph of our own workflow to allow it to be maintained correctly when the nodes
    of the wrapped graph evaluate.

    :note: this requires that we get called after the callgraph has been initialized"""
    # the parent workflow must not have recorded any calls yet
    if self.graph._callgraph.number_of_nodes():
        raise AssertionError( "Callgraph of parent workflow %r was not empty" % self.graph )

    self.wgraph.copyFrom( self.graph ) # copies required attributes

    # give every node of the wrapped workflow the chance to prepare as well
    for wrapped_node in self.wgraph.iterNodes( ):
        wrapped_node.prepareProcess( )
    # END for each wrapped node

    # ProcessBase.prepareProcess( self )
298
def _iterNodes( self ):
    """:return: generator for nodes that have no output connections or no input connections"""
    # connections( incoming, outgoing ) - a node lacking either end is a
    # boundary node of the wrapped graph
    def lacks_outputs( node ):
        return not node.connections( 0, 1 )
    def lacks_inputs( node ):
        return not node.connections( 1, 0 )
    return self.wgraph.iterNodes( predicate = Or( lacks_inputs, lacks_outputs ) )
304
def _getNodePlugs( self ):
305
"""Override the base method, filtering it's output so that only unconnected plugs
307
outset = super( WorkflowProcessBase, self )._getNodePlugs( )
309
if self.exclude_connected_plugs:
311
for node, plug in outset:
312
shell = node.toShell( plug )
313
if not shell.isConnected():
314
finalset.add( ( node , plug ) )
315
# END if shell is unconnected
316
# END for each node,plug pair
319
self._addIncludeNodePlugs( outset ) # assure we never filter include plugs
320
# END exclude connected plugs