Package mrv :: Package automation :: Module qa
[hide private]
[frames] | no frames]

Source Code for Module mrv.automation.qa

  1  # -*- coding: utf-8 -*- 
  2  """Specialization of workflow to provide quality assurance capabilities. 
  3   
  4  General Idiom of a quality assurance facility is to provide read-only checks for 
  5  possibly quaility issues and possibly a fix for them. 
  6   
  7  The interface is determined by plugs that define the capabilities of the node implementing 
  8  the checks. 
  9   
 10  The quality assurance framework is defined by: 
 11           * `QAWorkflow` 
 12           * `QAProcessBase` 
 13           * `QACheckResult` 
 14           * `QACheckAttribute` 
 15   
 16  They specialize the respective parts of the workflow""" 
 17  __docformat__ = "restructuredtext" 
 18   
 19  from workflow import Workflow 
 20  from process import ProcessBase 
 21  from mrv.util import EventSender, Event 
 22  from mrv.dge import Attribute, plug, ComputeFailed 
 23  from mrv.enum import create as enum 
 24  import sys 
 25   
 26  import logging 
 27  log = logging.getLogger("mrv.automation.qa") 
 28   
 29  #{ Exceptions 
30 -class CheckIncompatibleError( ComputeFailed ):
31 """Raised if a check cannot accomdate the requested mode and thus cannot run""" 32 pass
33 34 35 #} END exceptions 36 37
38 -class QAProcessBase( ProcessBase ):
39 """Quality Assurance Process including a specialized QA interface""" 40 41 # query: find issues and report them using `QACheckResult`, but do not attempt to fix 42 # fix: find issues and fix them, report fixed ( and possibly failed ) items by 43 eMode = enum( "query", "fix" ) # computation mode for QAProcessBasees 44 45 #( Configuration 46 # QA Processes do not require this feature due to their quite simplistic call structure 47 # If required, subclasses can override this though 48 track_compute_calls = False 49 #) END configuration 50 51 52 #{ Interface
53 - def assureQuality( self, check, mode, *args, **kwargs ):
54 """Called when the test identified by plug should be handled 55 56 :param check: QACheck to be checked for issues 57 :param mode: mode of the computation, see `QAProcessBase.eMode` 58 :return: QACheckResult instance keeping information about the outcome of the test""" 59 raise NotImplementedError( "To be implemented by subclass" )
60
61 - def listChecks( self, **kwargs ):
62 """:return: list( QACheck, ... ) list of our checks 63 :param kwargs: see `QAWorkflow.filterChecks`""" 64 return self.workflow().filterChecks( [ self ], **kwargs )
65 66 #} END interface 67
68 - def evaluateState( self, plug, mode, *args, **kwargs ):
69 """Prepares the call to the actual quality check implemenetation and assuring 70 test identified by plug can actually be run in the given mode""" 71 if mode is self.eMode.fix and not plug.attr.implements_fix: 72 raise CheckIncompatibleError( "Plug %s does not implement issue fixing" % plug ) 73 74 return self.assureQuality( plug, mode, *args, **kwargs )
75 76
77 -class QACheckAttribute( Attribute ):
78 """The Test Attribute represents an interface to a specific test as implemented 79 by the parent `QAProcessBase`. 80 The QA Attribute returns specialized quality assurance results and provides 81 additional information about the respective test 82 83 :note: as this class holds meta information about the respective test ( see `QACheck` ) 84 user interfaces may use it to adjust it's display 85 :note: this class depends on unknown mel implementations - on error we abort 86 but do not throw as this would cause class creation to fail and leave the whole 87 qa system unusable""" 88
89 - def __init__( self, annotation, has_fix = False, 90 flags = Attribute.computable ):
91 """Initialize attribute with meta information 92 93 :param annotation: information string describing the purpose of the test 94 :param has_fix: if True, the check must implement a fix for the issues it checks for, 95 if False, it can only report issues 96 :param flags: configuration flags for the plug - default to trigger computation even without 97 input""" 98 super( QACheckAttribute, self ).__init__( QACheckResult, flags ) 99 self.annotation = annotation 100 self.implements_fix = has_fix
101 102
103 -class QACheck( plug ):
104 """Defines a test suitable to be run and computed by a `QAProcessBase` 105 It's nothing more than a convenience class as the actual information is held by the 106 respective `QACheckAttribute`. 107 All non-plug calls are passed on to the underlying attribute, allowing it to 108 be treated like one""" 109 #{ Configuration 110 111 # class of the check attribute to use when instanciating this check 112 check_attribute_cls = QACheckAttribute 113 #} END configuration 114
115 - def __init__( self, *args, **kwargs ):
116 super( QACheck, self ).__init__( self.check_attribute_cls( *args, **kwargs ) )
117
118 - def __getattr__( self, attrname ):
119 return getattr( self.attr, attrname )
120 121
122 -class QAWorkflow( Workflow, EventSender ):
123 """Represents a workflow of QAProcessBase instances and allows to query them more 124 conveniently""" 125 126 #( Configuration 127 sender_as_argument = False 128 129 # if True, we will abort once the first error has been raised during check execution 130 # It is also held as instance variable so it can be set on per instance basis, allowing 131 # error check callbacks to adjust the error handling behaviour and abort the operation 132 abort_on_error = False 133 134 # as checks can take some time, it might be useful to have realtime results 135 # to std out in UI mode at least. It accompanies the feedback the workflow 136 # gives and keeps the default unittest style 137 info_to_stdout = True 138 #) END configuration 139 140 #( Filters 141 fIsQAProcessBase = staticmethod( lambda n: isinstance( n, QAProcessBase ) ) 142 fIsQAPlug = staticmethod( lambda p: isinstance( p, QACheck ) ) 143 #) END filters 144 145 #{ Events 146 # called before a check is run as func: func( event, check ) 147 e_preCheck = Event() 148 149 # called if a check fails with an error: func( event, check, exception, workflow ) 150 e_checkError = Event() 151 152 # called after a check has been run: func( event, check, result ) 153 e_postCheck = Event() 154 #} 155
156 - def __init__( self, *args, **kwargs ):
157 """Initialize our instance""" 158 super( QAWorkflow, self ).__init__( *args, **kwargs ) 159 160 # store abort on error as instance variable so that it can easily be overwritten 161 self.abort_on_error = QAWorkflow.abort_on_error
162
163 - def listQAProcessBasees( self, predicate = lambda p: True ):
164 """:return: list( Process, ... ) list of QA Processes known to this QA Workflow 165 :param predicate: include process p in result if func( p ) returns True""" 166 return self.iterNodes( predicate = lambda n: self.fIsQAProcessBase( n ) and predicate( n ) )
167
168 - def filterChecks( self, processes, predicate = lambda c: True ):
169 """As `listChecks`, but allows you do define the processes to use 170 171 :param predicate: func( p ) for plug p returns True for it to be included in the result""" 172 outchecks = list() 173 for node in processes: 174 outchecks.extend( node.toShells( node.plugs( lambda c: self.fIsQAPlug( c ) and predicate( c ) ) ) ) 175 return outchecks
176
177 - def listChecks( self, predicate = lambda c: True ):
178 """List all checks as supported by `QAProcessBase` es in this QA Workflow 179 180 :param predicate: include check c in result if func( c ) returns True""" 181 return self.filterChecks( self.listQAProcessBasees( ), predicate = predicate )
182
183 - def runChecks( self, checks, mode = QAProcessBase.eMode.query, clear_result = True ):
184 """Run the given checks in the given mode and return their results 185 186 :param checks: list( QACheckShell, ... ) as retrieved by `listChecks` 187 :param mode: `QAProcessBase.eMode` 188 :param clear_result: if True, the plug's cache will be removed forcing a computation 189 if False, you might get a cached value depending on the plug's setup 190 :return: list( tuple( QACheckShell, QACheckResult ), ... ) list of pairs of 191 QACheckShells and the check's result. The test result will be empty if the test 192 did not run or failed with an exception 193 :note: Sends the following events: ``e_preCheck`` , ``e_postCheck``, ``e_checkError`` 194 e_checkError may set the abort_on_error variable to True to cause the operation 195 not to proceed with other checks""" 196 # reset abort on error to class default 197 self.abort_on_error = self.__class__.abort_on_error 198 self._clearState( mode ) # assure we get a new callgraph 199 200 outresult = list() 201 for checkshell in checks: 202 if self.info_to_stdout: 203 checkplug = checkshell.plug 204 log.info( "Running %s: %s ... " % ( checkplug.name(), checkplug.annotation ) ) 205 # END extra info 206 207 self.e_preCheck.send( self.e_preCheck, checkshell ) 208 209 result = QACheckResult() # null value 210 if clear_result: 211 checkshell.clearCache( clear_affected = False ) 212 213 shellmode = mode 214 # some only can do check mode 215 if not checkshell.plug.implements_fix: 216 shellmode = checkshell.node.eMode.query 217 218 try: 219 result = checkshell.get( shellmode ) 220 except Exception, e: 221 self.e_checkError.send( self.e_checkError, checkshell, e, self ) 222 223 if self.abort_on_error: 224 raise 225 # END error handling 226 227 if self.info_to_stdout: 228 msg = "FAILED" 229 if result.isSuccessful(): 230 msg = "OK" 231 log.info(msg) 232 # END extra info 233 234 # record result 235 outresult.append( ( checkshell, result ) ) 236 self.e_postCheck.send( self.e_postCheck, checkshell, result ) 237 # END for each check to run 238 239 return outresult
240
241 -class QACheckResult( object ):
242 """Wrapper class declaring test results as a type that provides a simple interface 243 to retrieve the test results 244 245 :note: test results are only reqtrieved by QACheckAttribute plugs"""
246 - def __init__( self , fixed_items = None , failed_items = None, header = "" ):
247 """Initialize ourselves with default values 248 249 :param fixed_items: if list of items, the instance is initialized with it 250 :param failed_items: list of items that could not be fixed 251 :param header: optional string giving additional specialized information on the 252 outcome of the test. Tests must supply a header - otherwise the result will be treated 253 as failed check""" 254 self.header = header 255 self.fixed_items = ( isinstance( fixed_items, list ) and fixed_items ) or list() 256 self.failed_items = ( isinstance( failed_items, list ) and failed_items ) or list()
257
258 - def fixedItems( self ):
259 """ 260 :return: list( Item , ... ) list of items ( the exact type may differ 261 depending on the actual test ) which have been fixed so they represent the 262 desired state""" 263 return self.fixed_items
264
265 - def failedItems( self ):
266 """ 267 :return: ( list( Item, ... ) list of failed items being items that could not be 268 fixed and are not yet in the desired state""" 269 return self.failed_items
270
271 - def isNull( self ):
272 """:return: True if the test result is empty, and thus resembles a null value""" 273 return not self.header or ( not self.failed_items and not self.fixed_items )
274
275 - def isSuccessful( self ):
276 """:return: True if the check is successful, and False if there are at least some failed objects""" 277 if not self.header: 278 return False 279 280 # we are successful if there are no failed items left 281 return not self.failed_items
282
283 - def __str__( self ):
284 if not self.header: 285 return "No check-result available" 286 287 msg = self.header + "\n" 288 if self.fixed_items: 289 msg += ", ".join( str( i ) for i in self.fixed_items ) + "\n" 290 msg += ", ".join( str( i ) for i in self.failed_items ) 291 return msg
292