"""general methods and classes"""
__docformat__ = "restructuredtext"

import logging

from mrv.dge import PlugAlreadyConnected

log = logging.getLogger("mrv.automation.base")

def _toSimpleType( stringtype ):
    for cls in [ int, float, str ]:
        try:
            return cls( stringtype )
        except ValueError:
            pass
    # END for each simple type
    raise ValueError( "Could not convert %r to any simple type" % stringtype )
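# Illustration only (not part of the original module): conversion is attempted
# in the order int, float, str, so for example
#   _toSimpleType( "3" )   -> 3      (int)
#   _toSimpleType( "2.5" ) -> 2.5    (float)
#   _toSimpleType( "two" ) -> "two"  (str always succeeds)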

def _getNodeInfo( node ):
    """:return: ( nodename, args, kwargs ) - all arguments have been parsed"""
    args = [ node.get_name().strip('"') ]
    nodeattrs = node.get_attributes()

    tl = nodeattrs.get('toplabel', '').strip('"')
    if tl:
        args.extend( [ _toSimpleType( a ) for a in tl.split(',') ] )
    # END if toplabel is set

    kwargs = dict()
    bl = nodeattrs.get('bottomlabel', '').strip('"')
    if bl:
        for kwa in bl.split(','):
            k, v = tuple(kwa.split('='))
            kwargs[ k ] = _toSimpleType( v )
        # END for each kw value
    # END if bottom label is set

    # convert the label so that one can write nodename(args,kwargs) without
    # destroying the original node name
    typename = node.get_label()
    if typename:
        typename = typename.strip('"').split( "(" )[0]

    return ( typename, args, kwargs )
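# Sketch of the label convention handled above (hypothetical dot node, for
# illustration only): a node written as
#
#   "myNode" [ label="MyProcess", toplabel="1,2.5", bottomlabel="mode=fast" ]
#
# would roughly parse to ( "MyProcess", [ "myNode", 1, 2.5 ], { "mode": "fast" } ),
# i.e. the node name and toplabel entries become positional arguments, and the
# bottomlabel entries become keyword arguments for the process constructor.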

def loadWorkflowFromDotFile( dotfile, workflowcls = None ):
    """Create a graph from the given dotfile and create a workflow from it.
    The workflow will be fully initialized with connected process instances.
    All compatible plugs will automatically be connected for all processes
    connected in the dot file.

    :param workflowcls: if not None, a dgengine.Graph compatible class to be used
        for workflow creation. Defaults to automation.workflow.Workflow.
    :return: List of initialized workflow classes - as they can be nested, the
        creation of one workflow can actually create several of them"""
    import pydot
    import processes
    from workflow import Workflow

    wflclass = workflowcls or Workflow
    dotgraph = pydot.graph_from_dot_file( dotfile )

    if not dotgraph:
        raise AssertionError( "Returned graph from file %r was None" % dotfile )

    # use the filename as name
    edge_lut = {}                               # string -> processinst
    wfl = wflclass( name=dotfile.namebase() )

    for node in dotgraph.get_node_list():
        # can have initializers
        nodeid = node.get_name().strip( '"' )
        processname, args, kwargs = _getNodeInfo( node )

        # skip nodes with incorrect label - the parser returns one node each time it appears
        # in the file, although it is mentioned in connections, at least if labels are used
        if not isinstance( processname, basestring ):
            continue

        try:
            processcls = getattr( processes, processname )
        except AttributeError:
            raise TypeError( "Process '%s' not found in 'processes' module" % processname )

        # create instance and add to workflow
        try:
            processinst = processcls( *args, **kwargs )
        except TypeError:
            log.error( "Process %r could not be created as it required a different init call" % processcls )
            raise
        # END handle process creation

        edge_lut[ nodeid ] = processinst
        wfl.addNode( processinst )
    # END for each node in graph

    # create most suitable plug connections
    for edge in dotgraph.get_edge_list():
        snode = edge_lut[ edge.get_source().strip('"') ]
        dnode = edge_lut[ edge.get_destination().strip('"') ]
        destplugs = dnode.inputPlugs( )

        numConnections = 0
        for sourceplug in snode.outputPlugs():
            try:
                targetcandidates = snode.filterCompatiblePlugs( destplugs, sourceplug.attr, raise_on_ambiguity = 0, attr_affinity = False, attr_as_source = True )
            except ( TypeError, IndexError ), e:    # could have no compatible plugs or be ambiguous
                log.debug( str(e.args) )
                continue
            # if a plug is already connected, try another one
            blockedDestinationShells = list()
            numplugconnections = 0
            for rate, targetplug in targetcandidates:
                try:
                    sshell = snode.toShell( sourceplug )
                    dshell = dnode.toShell( targetplug )
                    sshell.connect( dshell )

                    numConnections += 1
                    numplugconnections += 1
                except PlugAlreadyConnected:
                    # remember the connected d-shell - we might disconnect it later
                    blockedDestinationShells.append( dnode.toShell( targetplug ) )
                # END try connect
                pass    # allow several connections ( if no other claims one ... )
            # END for each candidate
            # if we have no connections, and one already connected node has at least two from
            # the same plug, disconnect the node in question
            # Don't do anything if we are connected or have less than 2 blocked
            if numplugconnections > 0 or len( blockedDestinationShells ) < 2:
                continue

            # count connections by sourceshell
            sourcemap = dict()                  # source -> list( edge( s->d ) ... )
            for dshell in blockedDestinationShells:
                inshell = dshell.input()
                sourcemap.setdefault( inshell, list() ).append( ( inshell, dshell ) )

            # find multiple edges
            for sourceshell, edgelist in sourcemap.iteritems():
                if len( edgelist ) < 2:
                    continue

                sshell = snode.toShell( sourceplug )
                dshell = edgelist[-1][1]        # take the last edge as it possibly has lowest connection priority
                sshell.connect( dshell, force = 1 )     # connect, breaking existing connections

                numConnections += 1
                break
            # END for each sourceshell record
        # END for each output plug on snode

        # assure we have a connection
        if numConnections == 0:
            raise AssertionError( "Found no compatible connection from %s to %s in workflow %s - check your processes" % ( snode, dnode, wfl ) )
    # END for each edge

    return wfl
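# Minimal usage sketch (illustration only; "sample.dot" is a hypothetical file
# whose node labels name classes from the 'processes' module - the argument
# must be a path object providing namebase(), e.g. mrv's Path type):
#
#   from mrv.path import Path
#   wfl = loadWorkflowFromDotFile( Path( "sample.dot" ) )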

def addWorkflowsFromDotFiles( module, dotfiles, workflowcls = None ):
    """Create workflows from a list of dot-files and add them to the module

    :param workflowcls: see `loadWorkflowFromDotFile`
    :return: list of workflow instances created from the given files"""
    outwfls = list()
    for dotfile in dotfiles:
        wflname = dotfile.namebase()
        # it can be that a previous nested workflow already created the workflow
        # in which case we do not want to recreate it
        if hasattr( module, wflname ):
            continue

        wflinst = loadWorkflowFromDotFile( dotfile, workflowcls = workflowcls )
        setattr( module, wflname, wflinst )
        outwfls.append( wflinst )
    # END for each dotfile

    return outwfls
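# Sketch of how a package might publish its workflows (hypothetical call, for
# illustration only):
#
#   import sys
#   wfls = addWorkflowsFromDotFiles( sys.modules[ __name__ ], dot_files )
#
# Each created workflow then becomes an attribute of the given module, named
# after the dot file's namebase().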