mrv.automation.qa
Covered: 169 lines
Missed: 17 lines
Skipped 108 lines
Percent: 90 %
  2
"""Specialization of workflow to provide quality assurance capabilities.
  4
General Idiom of a quality assurance facility is to provide read-only checks for
  5
possible quality issues and, optionally, a fix for them.
  7
The interface is determined by plugs that define the capabilities of the node implementing
  8
the checks.
 10
The quality assurance framework is defined by:
 11
	 * `QAWorkflow`
 12
	 * `QAProcessBase`
 13
	 * `QACheckResult`
 14
	 * `QACheckAttribute`
 16
They specialize the respective parts of the workflow"""
 17
# Markup language used by the docstrings of this module.
__docformat__ = "restructuredtext"
 19
from workflow import Workflow
 20
from process import ProcessBase
 21
from mrv.util import EventSender, Event
 22
from mrv.dge import Attribute, plug, ComputeFailed
 23
from mrv.enum import create as enum
 24
import sys
 26
import logging
 27
# Module-level logger; QAWorkflow.runChecks reports check progress through it.
log = logging.getLogger("mrv.automation.qa")
 30
class CheckIncompatibleError( ComputeFailed ):
	"""Signals that a check cannot accommodate the requested mode and therefore
	cannot run.

	`QAProcessBase.evaluateState` raises it when a fix is requested from a check
	whose attribute does not implement one."""
 38
class QAProcessBase( ProcessBase ):
	"""Process specialization that adds a quality assurance interface.

	Subclasses implement `assureQuality`; `evaluateState` validates the requested
	mode against the check's capabilities before delegating to it."""

	# modes a check can be computed in: only report issues, or try to fix them
	eMode = enum( "query", "fix" )

	# NOTE(review): presumably evaluated by ProcessBase to decide whether compute
	# calls are recorded - confirm against the base class
	track_compute_calls = False

	def assureQuality( self, check, mode, *args, **kwargs ):
		"""Handle the test identified by the given check - to be provided by subclasses

		:param check: QACheck to be checked for issues
		:param mode: mode of the computation, see `QAProcessBase.eMode`
		:return: QACheckResult instance keeping information about the outcome of the test
		:raise NotImplementedError: always, unless overridden by a subclass"""
		raise NotImplementedError( "To be implemented by subclass" )

	def listChecks( self, **kwargs ):
		""":return: the checks of this process, as computed by `QAWorkflow.filterChecks`
			when handed this process only
		:param kwargs: passed on to `QAWorkflow.filterChecks`"""
		owning_workflow = self.workflow()
		return owning_workflow.filterChecks( [ self ], **kwargs )

	def evaluateState( self, plug, mode, *args, **kwargs ):
		"""Verify that the test identified by plug can run in the given mode, then
		hand the call over to `assureQuality`

		:param plug: plug identifying the test; its attribute's ``implements_fix`` flag
			is consulted if fix mode is requested
		:param mode: mode of the computation, see `QAProcessBase.eMode`
		:return: whatever `assureQuality` returns
		:raise CheckIncompatibleError: if fix mode is requested but the plug's attribute
			does not implement a fix"""
		fix_requested = mode is self.eMode.fix
		if fix_requested and not plug.attr.implements_fix:
			raise CheckIncompatibleError( "Plug %s does not implement issue fixing" % plug )
		# END verify mode

		return self.assureQuality( plug, mode, *args, **kwargs )
 77
class QACheckAttribute( Attribute ):
	"""Attribute describing one specific test as implemented by the parent
	`QAProcessBase`.

	Its value type is `QACheckResult`, and it carries meta information about the
	test: a describing annotation and whether a fix is implemented.

	:note: as this class holds meta information about the respective test ( see `QACheck` )
		user interfaces may use it to adjust their display
	:note: carried over from the original documentation, not evident from the code
		of this class: it depends on unknown mel implementations - on error we abort
		but do not throw as this would cause class creation to fail and leave the whole
		qa system unusable"""

	def __init__( self, annotation, has_fix = False, flags = Attribute.computable ):
		"""Initialize the attribute with meta information

		:param annotation: information string describing the purpose of the test
		:param has_fix: if True, the check must implement a fix for the issues it checks for,
			if False, it can only report issues
		:param flags: configuration flags for the plug, handed to the base attribute;
			defaults to ``Attribute.computable``"""
		super( QACheckAttribute, self ).__init__( QACheckResult, flags )

		# meta information, stored under the names other code reads them by
		self.implements_fix = has_fix
		self.annotation = annotation
103
class QACheck( plug ):
	"""Defines a test suitable to be run and computed by a `QAProcessBase`

	It is nothing more than a convenience class as the actual information is held by
	the respective `QACheckAttribute`, which is created from the constructor arguments.
	Attribute lookups that fail on the plug are passed on to that attribute, allowing
	the plug to be treated like one."""

	# attribute type instantiated for each check; it is looked up through the
	# instance, so a subclass can substitute its own type
	check_attribute_cls = QACheckAttribute

	def __init__( self, *args, **kwargs ):
		"""Create the check attribute from the given arguments and initialize the plug with it

		:param args: positional arguments for ``check_attribute_cls``
		:param kwargs: keyword arguments for ``check_attribute_cls``"""
		super( QACheck, self ).__init__( self.check_attribute_cls( *args, **kwargs ) )

	def __getattr__( self, attrname ):
		"""Delegate lookups of names unknown to the plug to the underlying attribute

		:param attrname: name that could not be found on the plug itself
		:return: value of the equally named attribute of ``self.attr``
		:raise AttributeError: if the underlying attribute does not have the name either,
			or if the plug has no ``attr`` yet"""
		# This hook only runs after the regular lookup failed. If 'attr' itself is the
		# missing name ( e.g. on an instance that is being copied or unpickled and did
		# not run __init__ ), reading self.attr below would re-enter this method
		# without end - fail cleanly instead.
		if attrname == "attr":
			raise AttributeError( attrname )
		# END handle missing attr

		return getattr( self.attr, attrname )
122
class QAWorkflow( Workflow, EventSender ):
	"""Represents a workflow of QAProcessBase instances and allows to query them more
	conveniently"""

	# NOTE(review): presumably an EventSender setting that keeps the sender from being
	# passed to listeners implicitly - confirm against mrv.util.EventSender
	sender_as_argument = False

	# class-level default; copied onto the instance in __init__ and again at the
	# start of each runChecks call
	abort_on_error = False

	# if True, runChecks logs which check runs and whether it succeeded
	# ( through the module logger )
	info_to_stdout = True

	# predicates identifying QA processes and QA plugs
	fIsQAProcessBase = staticmethod( lambda n: isinstance( n, QAProcessBase ) )
	fIsQAPlug = staticmethod( lambda p: isinstance( p, QACheck ) )

	# sent by runChecks before a check runs: ( event, checkshell )
	e_preCheck = Event()

	# sent by runChecks if a check raised: ( event, checkshell, exception, workflow )
	e_checkError = Event()

	# sent by runChecks after a check was handled: ( event, checkshell, result )
	e_postCheck = Event()

	def __init__( self, *args, **kwargs ):
		"""Initialize our instance, all arguments are passed on to the base classes"""
		super( QAWorkflow, self ).__init__( *args, **kwargs )

		# start out with the default of our actual class, exactly as runChecks does
		self.abort_on_error = self.__class__.abort_on_error

	def listQAProcessBasees( self, predicate = lambda p: True ):
		""":return: the QA processes known to this QA workflow, in the form `iterNodes`
			returns them ( presumably an iterator - verify against Workflow )
		:param predicate: include process p in result if func( p ) returns True"""
		return self.iterNodes( predicate = lambda n: self.fIsQAProcessBase( n ) and predicate( n ) )

	def filterChecks( self, processes, predicate = lambda c: True ):
		"""As `listChecks`, but allows you to define the processes to use

		:param processes: iterable of processes whose QA plugs should be gathered
		:param predicate: func( c ) for QA plug c returns True for it to be included in the result
		:return: list of the matching plugs of all processes, converted by each process's ``toShells``"""
		outchecks = list()
		for node in processes:
			outchecks.extend( node.toShells( node.plugs( lambda c: self.fIsQAPlug( c ) and predicate( c ) ) ) )
		return outchecks

	def listChecks( self, predicate = lambda c: True  ):
		"""List all checks as supported by `QAProcessBase` es in this QA Workflow

		:param predicate: include check c in result if func( c ) returns True
		:return: list of checks, see `filterChecks`"""
		return self.filterChecks( self.listQAProcessBasees( ), predicate = predicate )

	def runChecks( self, checks, mode = QAProcessBase.eMode.query, clear_result = True ):
		"""Run the given checks in the given mode and return their results

		:param checks: list( QACheckShell, ... ) as retrieved by `listChecks`
		:param mode: `QAProcessBase.eMode`; checks that do not implement a fix are
			always run in query mode
		:param clear_result: if True, the plug's cache will be removed forcing a computation
			if False, you might get a cached value depending on the plug's setup
		:return: list( tuple( QACheckShell, QACheckResult ), ... ) list of pairs of
			QACheckShells and the check's result. The test result will be empty if the test
			did not run or failed with an exception
		:raise Exception: the exception raised by a check is passed on if ``abort_on_error``
			is True after ``e_checkError`` was sent
		:note: Sends the following events: ``e_preCheck`` , ``e_postCheck``, ``e_checkError``
			e_checkError may set the abort_on_error variable to True to cause the operation
			not to proceed with other checks"""
		# reset to the class default - listeners may have changed it during a previous run
		self.abort_on_error = self.__class__.abort_on_error
		self._clearState( mode )	# assure we get a new callgraph

		outresult = list()
		for checkshell in checks:
			if self.info_to_stdout:
				checkplug = checkshell.plug
				log.info( "Running %s: %s ... ", checkplug.name(), checkplug.annotation )
			# END extra info

			self.e_preCheck.send( self.e_preCheck, checkshell )

			result = QACheckResult()	 	# null value, kept if the check raises
			if clear_result:
				checkshell.clearCache( clear_affected = False )

			# checks unable to fix issues are run in query mode instead of failing
			shellmode = mode
			if not checkshell.plug.implements_fix:
				shellmode = checkshell.node.eMode.query

			try:
				result = checkshell.get( shellmode )
			except Exception:
				# fetch the exception through sys.exc_info to stay independent of the
				# version specific 'except ..., e' / 'except ... as e' syntax
				self.e_checkError.send( self.e_checkError, checkshell, sys.exc_info()[1], self )

				# listeners received ourselves and may have requested the abort
				if self.abort_on_error:
					raise
			# END error handling

			if self.info_to_stdout:
				msg = "FAILED"
				if result.isSuccessful():
					msg = "OK"
				log.info(msg)
			# END extra info

			# record result
			outresult.append( ( checkshell, result ) )
			self.e_postCheck.send( self.e_postCheck, checkshell, result )
		# END for each check to run

		return outresult
241
class QACheckResult( object ):
	"""Wrapper class declaring test results as a type that provides a simple interface
	to retrieve the test results

	:note: test results are only retrieved by QACheckAttribute plugs"""

	def __init__( self , fixed_items = None , failed_items = None, header = "" ):
		"""Initialize ourselves with default values

		:param fixed_items: if list of items, the instance is initialized with it,
			anything else is replaced by a new empty list
		:param failed_items: list of items that could not be fixed, anything but a list
			is replaced by a new empty list
		:param header: optional string giving additional specialized information on the
			outcome of the test. Tests must supply a header - otherwise the result will be treated
			as failed check"""
		self.header = header

		# Keep the very list object the caller handed in, even if it is still empty,
		# so items the caller appends afterwards show up in this result as well.
		self.fixed_items = fixed_items if isinstance( fixed_items, list ) else list()
		self.failed_items = failed_items if isinstance( failed_items, list ) else list()

	def fixedItems( self ):
		"""
		:return: list( Item , ... ) list of items ( the exact type may differ
			depending on the actual test ) which have been fixed so they represent the
			desired state"""
		return self.fixed_items

	def failedItems( self ):
		"""
		:return: list( Item, ... ) list of failed items being items that could not be
			fixed and are not yet in the desired state"""
		return self.failed_items

	def isNull( self ):
		""":return: True if the test result is empty, and thus resembles a null value.
			This is the case if there is no header, or if there are neither failed
			nor fixed items"""
		return not self.header or ( not self.failed_items and not self.fixed_items )

	def isSuccessful( self ):
		""":return: True if the check is successful, and False if there are at least some
			failed objects or if no header was supplied"""
		if not self.header:
			return False
		# END handle missing header

		return not self.failed_items

	def __str__( self ):
		""":return: the header followed by one line of comma separated fixed items ( if
			there are any ) and the comma separated failed items, or a placeholder text
			if there is no header"""
		if not self.header:
			return "No check-result available"
		# END handle missing header

		msg = self.header + "\n"
		if self.fixed_items:
			msg += ", ".join( str( i ) for i in self.fixed_items ) + "\n"
		msg += ", ".join( str( i ) for i in self.failed_items )
		return msg