Package ndg :: Package saml :: Package test :: Package binding :: Package soap :: Module test_authzservice
[hide private]

Source Code for Module ndg.saml.test.binding.soap.test_authzservice

  1  #!/usr/bin/env python 
  2  """Unit tests for WSGI SAML 2.0 SOAP Authorisation Decision Query Interface 
  3   
  4  NERC DataGrid Project 
  5  """ 
  6  __author__ = "P J Kershaw" 
  7  __date__ = "15/02/2010" 
  8  __copyright__ = "(C) 2010 Science and Technology Facilities Council" 
  9  __license__ = "http://www.apache.org/licenses/LICENSE-2.0" 
 10  __contact__ = "Philip.Kershaw@stfc.ac.uk" 
 11  __revision__ = '$Id: test_authzservice.py 7154 2010-07-01 15:59:59Z pjkersha $' 
 12  import unittest 
 13  from uuid import uuid4 
 14  from datetime import datetime, timedelta 
 15  from cStringIO import StringIO 
 16   
 17  from ndg.soap.etree import SOAPEnvelope 
 18   
 19  from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer,  
 20                                   AuthzDecisionQuery, AuthzDecisionStatement,  
 21                                   Status, StatusCode, StatusMessage,  
 22                                   DecisionType, Action, Conditions, Assertion) 
 23  from ndg.saml.xml.etree import (AuthzDecisionQueryElementTree,  
 24                                  ResponseElementTree) 
 25  from ndg.saml.test.binding.soap import WithPasteFixtureBaseTestCase 
 26   
 27   
28 -class TestAuthorisationServiceMiddleware(object):
29 """Test Authorisation Service interface stub""" 30 QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName' 31 RESOURCE_URI = 'http://localhost/dap/data/' 32 ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub' 33
34 - def __init__(self, app, global_conf, **app_conf):
38
39 - def __call__(self, environ, start_response):
40 environ[self.queryInterfaceKeyName] = self.authzDecisionQueryFactory() 41 return self._app(environ, start_response)
42
44 """Makes the authorisation decision""" 45 46 def authzDecisionQuery(query, response): 47 """Authorisation Decision Query interface called by the next 48 middleware in the stack the SAML SOAP Query interface middleware 49 instance 50 (ndg.saml.saml2.binding.soap.server.wsgi.queryinterface.SOAPQueryInterfaceMiddleware) 51 """ 52 now = datetime.utcnow() 53 response.issueInstant = now 54 55 # Make up a request ID that this response is responding to 56 response.inResponseTo = query.id 57 response.id = str(uuid4()) 58 response.version = SAMLVersion(SAMLVersion.VERSION_20) 59 60 response.status = Status() 61 response.status.statusCode = StatusCode() 62 response.status.statusCode.value = StatusCode.SUCCESS_URI 63 response.status.statusMessage = StatusMessage() 64 response.status.statusMessage.value = \ 65 "Response created successfully" 66 67 assertion = Assertion() 68 assertion.version = SAMLVersion(SAMLVersion.VERSION_20) 69 assertion.id = str(uuid4()) 70 assertion.issueInstant = now 71 72 authzDecisionStatement = AuthzDecisionStatement() 73 74 # Make some simple logic to simulate a full access policy 75 if query.resource == self.__class__.RESOURCE_URI: 76 if query.actions[0].value == Action.HTTP_GET_ACTION: 77 authzDecisionStatement.decision = DecisionType.PERMIT 78 else: 79 authzDecisionStatement.decision = DecisionType.DENY 80 else: 81 authzDecisionStatement.decision = DecisionType.INDETERMINATE 82 83 authzDecisionStatement.resource = query.resource 84 85 authzDecisionStatement.actions.append(Action()) 86 authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI 87 authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION 88 assertion.authzDecisionStatements.append(authzDecisionStatement) 89 90 # Add a conditions statement for a validity of 8 hours 91 assertion.conditions = Conditions() 92 assertion.conditions.notBefore = now 93 assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8) 94 95 assertion.subject = Subject() 96 assertion.subject.nameID = NameID() 97 assertion.subject.nameID.format = query.subject.nameID.format 98 assertion.subject.nameID.value = query.subject.nameID.value 99 100 assertion.issuer = Issuer() 101 assertion.issuer.format = Issuer.X509_SUBJECT 102 assertion.issuer.value = \ 103 TestAuthorisationServiceMiddleware.ISSUER_DN 104 105 response.assertions.append(assertion) 106 return response
107 108 return authzDecisionQuery
109 110
111 -class SOAPAuthzDecisionInterfaceMiddlewareTestCase( 112 WithPasteFixtureBaseTestCase):
113 CONFIG_FILENAME = 'authz-decision-interface.ini' 114 RESOURCE_URI = TestAuthorisationServiceMiddleware.RESOURCE_URI 115
116 - def _createAuthzDecisionQuery(self, 117 issuer="/O=Site A/CN=PEP", 118 subject="https://openid.localhost/philip.kershaw", 119 resource=RESOURCE_URI, 120 action=Action.HTTP_GET_ACTION, 121 actionNs=Action.GHPP_NS_URI):
122 query = AuthzDecisionQuery() 123 query.version = SAMLVersion(SAMLVersion.VERSION_20) 124 query.id = str(uuid4()) 125 query.issueInstant = datetime.utcnow() 126 127 query.issuer = Issuer() 128 query.issuer.format = Issuer.X509_SUBJECT 129 query.issuer.value = issuer 130 131 query.subject = Subject() 132 query.subject.nameID = NameID() 133 query.subject.nameID.format = "urn:ndg:saml:test:openid" 134 query.subject.nameID.value = subject 135 136 query.resource = resource 137 138 query.actions.append(Action()) 139 query.actions[0].namespace = actionNs 140 query.actions[0].value = action 141 142 return query
143
144 - def _makeRequest(self, query=None, **kw):
145 """Convenience method to construct queries for tests""" 146 147 if query is None: 148 query = self._createAuthzDecisionQuery(**kw) 149 150 elem = AuthzDecisionQueryElementTree.toXML(query) 151 soapRequest = SOAPEnvelope() 152 soapRequest.create() 153 soapRequest.body.elem.append(elem) 154 155 request = soapRequest.serialize() 156 157 return request
158
159 - def _getSAMLResponse(self, responseBody):
160 """Deserialise response string into ElementTree element""" 161 soapResponse = SOAPEnvelope() 162 163 responseStream = StringIO() 164 responseStream.write(responseBody) 165 responseStream.seek(0) 166 167 soapResponse.parse(responseStream) 168 169 print("Parsed response ...") 170 print(soapResponse.serialize()) 171 # print(prettyPrint(soapResponse.elem)) 172 173 response = ResponseElementTree.fromXML(soapResponse.body.elem[0]) 174 175 return response
176
177 - def test01AccessGranted(self):
178 query = self._createAuthzDecisionQuery() 179 request = self._makeRequest(query=query) 180 181 header = { 182 'soapAction': "http://www.oasis-open.org/committees/security", 183 'Content-length': str(len(request)), 184 'Content-type': 'text/xml' 185 } 186 response = self.app.post('/authorisationservice/', 187 params=request, 188 headers=header, 189 status=200) 190 print("Response status=%d" % response.status) 191 samlResponse = self._getSAMLResponse(response.body) 192 193 self.assert_(samlResponse.status.statusCode.value == \ 194 StatusCode.SUCCESS_URI) 195 self.assert_(samlResponse.inResponseTo == query.id) 196 self.assert_(samlResponse.assertions[0].subject.nameID.value == \ 197 query.subject.nameID.value) 198 self.assert_(samlResponse.assertions[0]) 199 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0]) 200 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0 201 ].decision == DecisionType.PERMIT)
202
203 - def test02AccessDenied(self):
204 query = self._createAuthzDecisionQuery(action=Action.HTTP_POST_ACTION) 205 request = self._makeRequest(query=query) 206 207 header = { 208 'soapAction': "http://www.oasis-open.org/committees/security", 209 'Content-length': str(len(request)), 210 'Content-type': 'text/xml' 211 } 212 response = self.app.post('/authorisationservice/', 213 params=request, 214 headers=header, 215 status=200) 216 print("Response status=%d" % response.status) 217 218 samlResponse = self._getSAMLResponse(response.body) 219 220 self.assert_(samlResponse.status.statusCode.value == \ 221 StatusCode.SUCCESS_URI) 222 self.assert_(samlResponse.inResponseTo == query.id) 223 self.assert_(samlResponse.assertions[0].subject.nameID.value == \ 224 query.subject.nameID.value) 225 self.assert_(samlResponse.assertions[0]) 226 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0]) 227 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0 228 ].decision == DecisionType.DENY)
229
231 query = self._createAuthzDecisionQuery( 232 resource=self.__class__.RESOURCE_URI + 'invalid') 233 request = self._makeRequest(query=query) 234 235 header = { 236 'soapAction': "http://www.oasis-open.org/committees/security", 237 'Content-length': str(len(request)), 238 'Content-type': 'text/xml' 239 } 240 response = self.app.post('/authorisationservice/', 241 params=request, 242 headers=header, 243 status=200) 244 print("Response status=%d" % response.status) 245 246 samlResponse = self._getSAMLResponse(response.body) 247 248 self.assert_(samlResponse.status.statusCode.value == \ 249 StatusCode.SUCCESS_URI) 250 self.assert_(samlResponse.inResponseTo == query.id) 251 self.assert_(samlResponse.assertions[0].subject.nameID.value == \ 252 query.subject.nameID.value) 253 self.assert_(samlResponse.assertions[0]) 254 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0]) 255 self.assert_(samlResponse.assertions[0].authzDecisionStatements[0 256 ].decision == DecisionType.INDETERMINATE)
257 258 259 if __name__ == "__main__": 260 unittest.main() 261