mrv.automation.base
Covered: 89 lines
Missed: 32 lines
Skipped 72 lines
Percent: 73 %
  2
"""general methods and classes """
  3
__docformat__ = "restructuredtext"

from mrv.dge import PlugAlreadyConnected
import logging

# module level logger, used by the workflow loading functions below
log = logging.getLogger("mrv.automation.base")
 11
def _toSimpleType( stringtype ):
	"""Convert a string into the first simple type that is able to parse it.

	The conversion is attempted with ``int``, then ``float`` and finally ``str``.
	The result of the first conversion that does not raise ``ValueError`` is returned.

	:note: everything ``float`` can parse is converted to a float, which includes
		spellings like ``"nan"`` or ``"inf"``
	:param stringtype: the string to convert
	:return: the converted value
	:raise ValueError: if none of the conversions succeeded"""
	for converter in ( int, float, str ):
		try:
			return converter( stringtype )
		except ValueError:
			# not parseable as this type - fall through to the next candidate
			continue
	# END for each conversion candidate

	raise ValueError( "Could not convert %r to any simple type" % stringtype )
 20
def _getNodeInfo( node ):
	"""Parse the process type and its constructor arguments from a dot graph node.

	The node's name ( without quotes ) always becomes the first positional
	argument. The optional ``toplabel`` attribute provides additional positional
	arguments as comma separated values, the optional ``bottomlabel`` attribute
	provides keyword arguments as comma separated ``key=value`` pairs. All
	values are converted using `_toSimpleType`.

	The type name is the node's label without quotes, cut at the first ``(``.
	A label that is empty or None is returned unchanged.

	:param node: object providing ``get_name()``, ``get_attributes()`` and ``get_label()``
	:return: ( typename, args, kwargs ) - all arguments have been parsed
	:raise ValueError: if an entry of the bottomlabel is not a ``key=value`` pair"""
	args = [ node.get_name().strip( '"' ) ]
	nodeattrs = node.get_attributes()

	# POSITIONAL ARGUMENTS
	toplabel = nodeattrs.get( 'toplabel', '' ).strip( '"' )
	if toplabel:
		args.extend( _toSimpleType( value ) for value in toplabel.split( ',' ) )

	# KEYWORD ARGUMENTS
	kwargs = dict()
	bottomlabel = nodeattrs.get( 'bottomlabel', '' ).strip( '"' )
	if bottomlabel:
		for item in bottomlabel.split( ',' ):
			# split once only - the value itself may contain a '='
			pair = item.split( '=', 1 )
			if len( pair ) != 2:
				raise ValueError( "Expected 'key=value' in bottomlabel, got %r" % item )
			key, value = pair
			# whitespace around a key can never be part of a keyword argument name
			kwargs[ key.strip() ] = _toSimpleType( value )
		# END for each keyword argument
	# END if bottomlabel is set

	# TYPENAME - the label may carry a call-like suffix which is not part of the name
	typename = node.get_label()
	if typename:
		typename = typename.strip( '"' ).split( "(" )[0]

	return ( typename, args, kwargs )
 46
def loadWorkflowFromDotFile( dotfile, workflowcls = None ):
	"""Create a graph from the given dotfile and create a workflow from it.
	The workflow will be fully initialized with connected process instances.

	Each node of the dot graph becomes a process instance that is added to the
	workflow. For each edge of the dot graph, all compatible plugs of the two
	connected processes are connected automatically.

	:param dotfile: path of the dot file to load. It must provide a ``namebase()``
		method, whose return value becomes the name of the workflow
	:param workflowcls: if not None, a dgengine.Graph compatible class to be used
		for workflow creation. Defaults to automation.workflow.Workflow.
	:return: the initialized workflow instance
	:raise AssertionError: if no graph could be read from the dot file, or if
		an edge of the graph did not result in at least one plug connection
	:raise TypeError: if a process type named in the graph does not exist in the
		``processes`` module, or if the process could not be created with the
		arguments parsed from its node"""
	import pydot
	import processes
	from workflow import Workflow

	wflclass = workflowcls or Workflow
	dotgraph = pydot.graph_from_dot_file( dotfile )

	if not dotgraph:
		raise AssertionError( "Returned graph from file %r was None" % dotfile )

	# maps the id of a node in the dot file to the process instance created for it
	edge_lut = {}
	wfl = wflclass( name=dotfile.namebase() )

	# CREATE PROCESSES
	####################
	for node in dotgraph.get_node_list():
		nodeid = node.get_name().strip( '"' )

		# NOTE(review): presumably this is the dot 'node [ ... ]' statement carrying
		# default attributes, which does not describe a process - confirm
		if nodeid == "node":
			continue

		processname, args, kwargs = _getNodeInfo( node )

		# nodes without a usable label do not name a process type
		if not isinstance( processname, basestring ):
			continue

		try:
			processcls = getattr( processes, processname )
		except AttributeError:
			raise TypeError( "Process '%s' not found in 'processes' module" % processname )

		try:
			processinst = processcls( *args, **kwargs )
		except TypeError:
			log.error( "Process %r could not be created as it required a different init call", processcls )
			raise

		edge_lut[ nodeid ] = processinst
		wfl.addNode( processinst )
	# END for each node in graph

	# CONNECT PROCESSES
	#####################
	for edge in dotgraph.get_edge_list():
		snode = edge_lut[ edge.get_source().strip( '"' ) ]
		dnode = edge_lut[ edge.get_destination().strip( '"' ) ]
		destplugs = dnode.inputPlugs()

		# counts the connections made for this edge, over all source plugs
		numConnections = 0
		for sourceplug in snode.outputPlugs():
			try:
				targetcandidates = snode.filterCompatiblePlugs( destplugs, sourceplug.attr, raise_on_ambiguity = 0, attr_affinity = False, attr_as_source = True )
			except ( TypeError, IndexError ) as e:	# could have no compatible plugs or is ambiguous
				log.debug( str( e.args ) )
				continue
			# END handle incompatible source plug

			# destination shells we could not connect to as they have an input already
			blockedDestinationShells = list()
			numplugconnections = 0
			for rate, targetplug in targetcandidates:
				try:
					sshell = snode.toShell( sourceplug )
					dshell = dnode.toShell( targetplug )
					sshell.connect( dshell )
				except PlugAlreadyConnected:
					blockedDestinationShells.append( dnode.toShell( targetplug ) )
				else:
					# allow several connections ( if no other claims one ... )
					numConnections += 1
					numplugconnections += 1
			# END for each candidate

			# Only if the plug did not get connected at all and several
			# destinations were blocked, we try to take one of them over
			if numplugconnections > 0 or len( blockedDestinationShells ) < 2:
				continue

			# Group the blocked destinations by the shell currently feeding them.
			# Each blocked shell has to be asked for its own input - reusing the
			# shell left over from the loop above would put all of them into
			# the same group, no matter what they are actually connected to
			sourcemap = dict()			# source->list( edge( s->d  ) ... )
			for blockedshell in blockedDestinationShells:
				inshell = blockedshell.input()
				sourcemap.setdefault( inshell, list() ).append( ( inshell, blockedshell ) )
			# END for each blocked destination

			# A source occupying at least two destinations has to give up one of them
			for sourceshell, edgelist in sourcemap.items():
				if len( edgelist ) < 2:
					continue
				sshell = snode.toShell( sourceplug )
				dshell = edgelist[-1][1]				# take the last edge as it possibly has lowest connection priority
				sshell.connect( dshell, force = 1 )	# connect breaking existing ones
				numConnections += 1
				break
			# END for each source with its edges
		# END for each output plug of the source node

		if numConnections == 0:
			raise AssertionError( "Found no compatible connection from %s to %s in workflow %s - check your processes" % ( snode, dnode, wfl ) )
	# END for each edge in graph

	return wfl
172
def addWorkflowsFromDotFiles( module, dotfiles, workflowcls = None ):
	"""Create workflows from a list of dot-files and add them to the module.

	Each workflow is stored on the module as an attribute named after the
	``namebase()`` of its dot file. Files whose name already is an attribute of
	the module are skipped, no workflow is created for them.

	:param module: object receiving the created workflows as attributes
	:param dotfiles: iterable of dot file paths, each providing a ``namebase()`` method
	:param workflowcls: see `loadWorkflowFromDotFile`
	:return: list of workflow instances created from the given files"""
	outwfls = list()
	for dotfile in dotfiles:
		wflname = dotfile.namebase()

		# never replace an attribute the module already has
		if hasattr( module, wflname ):
			continue

		wflinst = loadWorkflowFromDotFile( dotfile, workflowcls = workflowcls )
		setattr( module, wflname, wflinst )
		outwfls.append( wflinst )
	# END for each dot file

	return outwfls