2
"""Specialization of workflow to provide quality assurance capabilities.
4
General Idiom of a quality assurance facility is to provide read-only checks for
5
possible quality issues and possibly a fix for them.
7
The interface is determined by plugs that define the capabilities of the node implementing
10
The quality assurance framework is defined by:
16
They specialize the respective parts of the workflow"""
17
# Declares the markup language used by the docstrings of this module
__docformat__ = "restructuredtext"
19
import logging

from workflow import Workflow
from process import ProcessBase
from mrv.util import EventSender, Event
from mrv.dge import Attribute, plug, ComputeFailed
from mrv.enum import create as enum
27
# Module level logger, used by `QAWorkflow.runChecks` to report progress
log = logging.getLogger("mrv.automation.qa")
30
class CheckIncompatibleError( ComputeFailed ):
    """Signals that a check does not support the requested mode and therefore
    cannot run.

    `QAProcessBase.evaluateState` raises it if a fix is requested from a check
    that does not implement one."""
38
class QAProcessBase( ProcessBase ):
    """Process specialization providing the quality assurance interface.

    Subclasses implement `assureQuality`, which is reached through `evaluateState`
    once the requested mode has been validated against the check."""

    # Computation modes of QA processes:
    #   query: find issues and report them using `QACheckResult`, but do not attempt to fix
    #   fix:   find issues and fix them, report fixed ( and possibly failed ) items
    #          through the returned `QACheckResult`
    eMode = enum( "query", "fix" )

    # QA Processes do not require this feature due to their quite simplistic call structure.
    # Subclasses can override it if they need it.
    track_compute_calls = False

    def assureQuality( self, check, mode, *args, **kwargs ):
        """Called when the test identified by the given check should be handled.

        :param check: QACheck to be checked for issues
        :param mode: mode of the computation, see `QAProcessBase.eMode`
        :return: QACheckResult instance keeping information about the outcome of the test
        :raise NotImplementedError: always - subclasses have to provide the implementation"""
        raise NotImplementedError( "To be implemented by subclass" )

    def listChecks( self, **kwargs ):
        """:return: list( QACheck, ... ) list of our checks, as determined by the
            `filterChecks` method of our workflow when restricted to this process
        :param kwargs: see `QAWorkflow.filterChecks`"""
        only_this_process = [ self ]
        return self.workflow().filterChecks( only_this_process, **kwargs )

    def evaluateState( self, plug, mode, *args, **kwargs ):
        """Verify the test identified by plug can run in the given mode, and hand
        the call on to the actual quality check implementation.

        :param plug: plug identifying the test, its attribute tells whether a fix is implemented
        :param mode: mode of the computation, see `QAProcessBase.eMode`
        :return: whatever `assureQuality` returns
        :raise CheckIncompatibleError: if fix mode is requested, but the plug's
            attribute does not implement a fix"""
        fix_requested = mode is self.eMode.fix
        # the attribute is only consulted if a fix was actually asked for
        if fix_requested and not plug.attr.implements_fix:
            raise CheckIncompatibleError( "Plug %s does not implement issue fixing" % plug )

        return self.assureQuality( plug, mode, *args, **kwargs )
77
class QACheckAttribute( Attribute ):
    """Attribute representing the interface to a specific test as implemented
    by the parent `QAProcessBase`.

    The QA Attribute returns specialized quality assurance results and provides
    additional information about the respective test.

    :note: as this class holds meta information about the respective test ( see `QACheck` )
        user interfaces may use it to adjust their display
    :note: this class depends on unknown mel implementations - on error we abort
        but do not throw, as throwing would cause class creation to fail"""

    def __init__( self, annotation, has_fix = False,
                    flags = Attribute.computable ):
        """Initialize the attribute with meta information about the test.

        :param annotation: information string describing the purpose of the test
        :param has_fix: if True, the check must implement a fix for the issues it checks for,
            if False, it can only report issues
        :param flags: configuration flags for the plug, defaults to `Attribute.computable`"""
        # the base attribute is set up with QACheckResult and the given flags
        super( QACheckAttribute, self ).__init__( QACheckResult, flags )

        # meta information, read by `QAProcessBase.evaluateState` and `QAWorkflow.runChecks`
        self.implements_fix = has_fix
        self.annotation = annotation
103
class QACheck( plug ):
    """Defines a test suitable to be run and computed by a `QAProcessBase`.

    It's nothing more than a convenience class as the actual information is held by the
    respective `QACheckAttribute`.
    All non-plug calls are passed on to the underlying attribute, allowing it to
    be treated like one"""

    # class of the check attribute to use when instantiating this check
    check_attribute_cls = QACheckAttribute

    def __init__( self, *args, **kwargs ):
        """Initialize the plug with a new instance of `check_attribute_cls`.

        :param args: passed on to `check_attribute_cls`
        :param kwargs: passed on to `check_attribute_cls`"""
        super( QACheck, self ).__init__( self.check_attribute_cls( *args, **kwargs ) )

    def __getattr__( self, attrname ):
        """Pass lookups which failed on the plug on to the underlying attribute.

        :param attrname: name of the attribute that could not be found on the plug
        :return: value of attrname as found on our attribute
        :raise AttributeError: if the underlying attribute is not available yet, or
            if it does not have attrname either"""
        # If `attr` itself is looked up here, the instance has not been initialized
        # ( e.g. while it is being copied or unpickled ). Reading self.attr would
        # re-enter this method without end, hence we fail the usual way.
        if attrname == "attr":
            raise AttributeError( attrname )

        return getattr( self.attr, attrname )
122
class QAWorkflow( Workflow, EventSender ):
    """Workflow of `QAProcessBase` instances which additionally allows to list the
    checks of its processes, and to run them while sending events about the progress."""

    # EventSender configuration
    # NOTE(review): presumably keeps the sender from being passed to listeners -
    # confirm against `mrv.util.EventSender`
    sender_as_argument = False

    # if True, we will abort once the first error has been raised during check execution
    # It is also held as instance variable so it can be set on per instance basis, allowing
    # error check callbacks to adjust the error handling behaviour and abort the operation
    abort_on_error = False

    # as checks can take some time, it might be useful to have realtime feedback.
    # If True, `runChecks` reports each check and its outcome through the module logger
    info_to_stdout = True

    # filters identifying QA processes and QA checks
    fIsQAProcessBase = staticmethod( lambda n: isinstance( n, QAProcessBase ) )
    fIsQAPlug = staticmethod( lambda p: isinstance( p, QACheck ) )

    # called before a check is run as func: func( event, check )
    e_preCheck = Event()

    # called if a check fails with an error: func( event, check, exception, workflow )
    e_checkError = Event()

    # called after a check has been run: func( event, check, result )
    e_postCheck = Event()

    def __init__( self, *args, **kwargs ):
        """Initialize our instance, all arguments are passed on to our base classes"""
        super( QAWorkflow, self ).__init__( *args, **kwargs )

        # store abort on error as instance variable so that it can easily be overwritten
        self.abort_on_error = QAWorkflow.abort_on_error

    def listQAProcessBasees( self, predicate = lambda p: True ):
        """:return: the QA Processes known to this QA Workflow, as provided by `iterNodes`
        :param predicate: include process p in result if func( p ) returns True"""
        return self.iterNodes( predicate = lambda n: self.fIsQAProcessBase( n ) and predicate( n ) )

    def filterChecks( self, processes, predicate = lambda c: True ):
        """As `listChecks`, but allows you to define the processes to use

        :param processes: iterable of processes whose checks should be gathered
        :param predicate: func( p ) for plug p returns True for it to be included in the result
        :return: list of the shells of all matching checks, in order of the given processes"""
        is_wanted = lambda c: self.fIsQAPlug( c ) and predicate( c )

        outchecks = list()
        for node in processes:
            outchecks.extend( node.toShells( node.plugs( is_wanted ) ) )
        # END for each process
        return outchecks

    def listChecks( self, predicate = lambda c: True ):
        """List all checks as supported by `QAProcessBase` es in this QA Workflow

        :param predicate: include check c in result if func( c ) returns True
        :return: see `filterChecks`"""
        return self.filterChecks( self.listQAProcessBasees( ), predicate = predicate )

    def runChecks( self, checks, mode = QAProcessBase.eMode.query, clear_result = True ):
        """Run the given checks in the given mode and return their results

        :param checks: list( QACheckShell, ... ) as retrieved by `listChecks`
        :param mode: `QAProcessBase.eMode`, checks which do not implement a fix are
            always run in query mode
        :param clear_result: if True, the plug's cache will be removed forcing a computation
            if False, you might get a cached value depending on the plug's setup
        :return: list( tuple( QACheckShell, QACheckResult ), ... ) list of pairs of
            QACheckShells and the check's result. The test result will be empty if the test
            did not run or failed with an exception
        :raise Exception: the error of a failing check is re-raised if abort_on_error
            is set, in which case no results are returned
        :note: Sends the following events: ``e_preCheck`` , ``e_postCheck``, ``e_checkError``
            e_checkError may set the abort_on_error variable to True to cause the operation
            not to proceed with other checks"""
        # reset abort on error to class default
        self.abort_on_error = self.__class__.abort_on_error
        self._clearState( mode )    # assure we get a new callgraph

        outresult = list()
        for checkshell in checks:
            if self.info_to_stdout:
                checkplug = checkshell.plug
                log.info( "Running %s: %s ... " % ( checkplug.name(), checkplug.annotation ) )
            # END extra info

            self.e_preCheck.send( self.e_preCheck, checkshell )

            result = QACheckResult()    # null value, kept if the check fails
            if clear_result:
                checkshell.clearCache( clear_affected = False )

            # some only can do check mode
            shellmode = mode
            if not checkshell.plug.implements_fix:
                shellmode = checkshell.node.eMode.query

            try:
                result = checkshell.get( shellmode )
            except Exception as e:
                self.e_checkError.send( self.e_checkError, checkshell, e, self )

                # listeners of e_checkError may have requested the abort just now
                if self.abort_on_error:
                    raise
            # END error handling

            if self.info_to_stdout:
                msg = "FAILED"
                if result.isSuccessful():
                    msg = "OK"
                log.info( msg )
            # END extra info

            # record result
            outresult.append( ( checkshell, result ) )
            self.e_postCheck.send( self.e_postCheck, checkshell, result )
        # END for each check to run

        return outresult
241
class QACheckResult( object ):
    """Wrapper class declaring test results as a type that provides a simple interface
    to retrieve the test results

    :note: test results are only retrieved by QACheckAttribute plugs"""

    def __init__( self , fixed_items = None , failed_items = None, header = "" ):
        """Initialize ourselves with default values

        :param fixed_items: if list of items, the instance is initialized with it
        :param failed_items: list of items that could not be fixed
        :param header: optional string giving additional specialized information on the
            outcome of the test. Tests must supply a header - a result without one is
            a null value ( see `isNull` ) and is never successful"""
        self.header = header
        self.fixed_items = self._itemListOrNew( fixed_items )
        self.failed_items = self._itemListOrNew( failed_items )

    @staticmethod
    def _itemListOrNew( items ):
        """:return: items itself if it is a non-empty list, a new empty list otherwise"""
        if isinstance( items, list ) and items:
            return items
        return list()

    def fixedItems( self ):
        """:return: list( Item , ... ) list of items ( the exact type may differ
            depending on the actual test ) which have been fixed"""
        return self.fixed_items

    def failedItems( self ):
        """:return: list( Item, ... ) list of failed items being items that could not be
            fixed and are not yet in the desired state"""
        return self.failed_items

    def isNull( self ):
        """:return: True if the test result is empty, and thus resembles a null value"""
        return not self.header or ( not self.failed_items and not self.fixed_items )

    def isSuccessful( self ):
        """:return: True if the check is successful, and False if there are at least
            some failed objects or if the result does not have a header"""
        # a result without header was never filled in by a test, for instance because
        # the test raised - this must not be reported as success
        if not self.header:
            return False

        # we are successful if there are no failed items left
        return not self.failed_items

    def __str__( self ):
        """:return: header, fixed items and failed items, each on their own line, or
            a fixed message if this result is a null value"""
        if self.isNull():
            return "No check-result available"

        msg = self.header + "\n"
        msg += ", ".join( str( i ) for i in self.fixed_items ) + "\n"
        msg += ", ".join( str( i ) for i in self.failed_items )
        return msg