Package ndg :: Package saml :: Package test :: Package binding :: Package soap :: Module test_attributeservice
[hide private]

Source Code for Module ndg.saml.test.binding.soap.test_attributeservice

  1  #!/usr/bin/env python 
  2  """Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface 
  3   
  4  NERC DataGrid Project 
  5  """ 
  6  __author__ = "P J Kershaw" 
  7  __date__ = "21/08/09" 
  8  __copyright__ = "(C) 2009 Science and Technology Facilities Council" 
  9  __license__ = "http://www.apache.org/licenses/LICENSE-2.0" 
 10  __contact__ = "Philip.Kershaw@stfc.ac.uk" 
 11  __revision__ = '$Id: test_attributeservice.py 7953 2011-12-15 08:39:16Z pjkersha $' 
 12  import unittest 
 13  from uuid import uuid4 
 14  from datetime import datetime, timedelta 
 15  from cStringIO import StringIO 
 16   
 17  from ndg.soap.etree import SOAPEnvelope 
 18   
 19  from ndg.saml.saml2.core import (Assertion, Attribute, AttributeStatement,  
 20                                   SAMLVersion, Subject, NameID, Issuer,  
 21                                   AttributeQuery, XSStringAttributeValue,  
 22                                   Conditions, StatusCode) 
 23  from ndg.saml.xml import XMLConstants 
 24  from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree 
 25  from ndg.saml.test.binding.soap import WithPasteFixtureBaseTestCase 
class TestAttributeServiceMiddleware(object):
    """Test Attribute Service interface stub

    WSGI middleware which, for every request, stores an attribute query
    callable in ``environ`` under the key named by the
    ``queryInterfaceKeyName`` configuration option and then passes the request
    on to the wrapped application.  The callable fills in a SAML response
    using the details of a single fixed test user.
    """
    # Name of the app_conf option holding the environ key for the callable
    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
    ISSUER_DN = '/O=Test/OU=Attribute Service/CN=Service Stub'

    FIRSTNAME_ATTRNAME = "urn:ndg:saml:firstname"
    LASTNAME_ATTRNAME = "urn:ndg:saml:lastname"
    EMAILADDRESS_ATTRNAME = "urn:ndg:saml:emailaddress"

    # Queries from any other issuer are answered with REQUEST_DENIED_URI
    VALID_QUERY_ISSUERS = (
        "/O=Site A/CN=Authorisation Service",
        "/O=Site B/CN=Authorisation Service"
    )

    # Queries about any other subject are answered with UNKNOWN_PRINCIPAL_URI
    VALID_SUBJECTS = ("https://openid.localhost/philip.kershaw", )

    # Attribute names for which the stub holds a value
    VALID_ATTR_NAME_URIS = (
        FIRSTNAME_ATTRNAME, LASTNAME_ATTRNAME, EMAILADDRESS_ATTRNAME
    )

    def __init__(self, app, global_conf, **app_conf):
        """Wrap ``app`` and read the environ key name from ``app_conf``.

        @param app: next WSGI application / middleware in the stack
        @param global_conf: global configuration settings (not used here)
        @param app_conf: application settings; must contain the
        ``queryInterfaceKeyName`` option
        @raise KeyError: if the ``queryInterfaceKeyName`` option is missing
        """
        self.queryInterfaceKeyName = app_conf[
            self.__class__.QUERY_INTERFACE_KEYNAME_OPTNAME]
        self._app = app

        # Details of the single test user known to this stub
        self.firstName = "Philip"
        self.lastName = "Kershaw"
        self.emailAddress = "pkershaw@somewhere.ac.uk"

    def __call__(self, environ, start_response):
        """Publish the attribute query callable in ``environ`` and delegate
        the request to the wrapped application, returning its response."""
        environ[self.queryInterfaceKeyName] = self.attributeQueryFactory()
        return self._app(environ, start_response)

    def _releasableValues(self, queryIssuer):
        """Return a dict mapping each attribute name which may be released to
        ``queryIssuer`` onto the test user's value for it.

        First and last name are available to every issuer; the e-mail address
        is only released to the first entry of ``VALID_QUERY_ISSUERS``.
        """
        cls = self.__class__
        values = {
            cls.FIRSTNAME_ATTRNAME: self.firstName,
            cls.LASTNAME_ATTRNAME: self.lastName,
        }
        if queryIssuer == cls.VALID_QUERY_ISSUERS[0]:
            values[cls.EMAILADDRESS_ATTRNAME] = self.emailAddress

        return values

    @staticmethod
    def _createStringAttribute(requested, value):
        """Return a new Attribute which echoes the name, name format and
        friendly name of the ``requested`` attribute and carries ``value`` as
        its single string attribute value."""
        attribute = Attribute()
        attribute.name = requested.name
        attribute.nameFormat = requested.nameFormat
        attribute.friendlyName = requested.friendlyName

        attributeValue = XSStringAttributeValue()
        attributeValue.value = value
        attribute.attributeValues.append(attributeValue)

        return attribute

    def attributeQueryFactory(self):
        """Makes the attribute query method

        @return: callable taking the query and the response to populate
        """

        def attributeQuery(query, response):
            """Attribute Query interface called by the next middleware in the
            stack, the SAML SOAP Query interface middleware instance
            (ndg.saml.saml2.binding.soap.server.wsgi.queryinterface.SOAPQueryInterfaceMiddleware)

            @param query: attribute query to be answered
            @param response: response object to fill in
            @return: the same ``response`` object, with its status code set
            and, on success, one assertion appended
            """
            cls = self.__class__

            response.issueInstant = datetime.utcnow()
            response.id = str(uuid4())
            response.inResponseTo = query.id

            if query.issuer.value not in cls.VALID_QUERY_ISSUERS:
                response.status.statusCode.value = \
                    StatusCode.REQUEST_DENIED_URI
                return response

            if query.subject.nameID.value not in cls.VALID_SUBJECTS:
                response.status.statusCode.value = \
                    StatusCode.UNKNOWN_PRINCIPAL_URI
                return response

            assertion = Assertion()

            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
            assertion.id = str(uuid4())
            assertion.issueInstant = response.issueInstant

            # The assertion is valid for eight hours from the issue instant
            assertion.conditions = Conditions()
            assertion.conditions.notBefore = assertion.issueInstant
            assertion.conditions.notOnOrAfter = \
                assertion.conditions.notBefore + timedelta(seconds=60*60*8)

            # The subject of the assertion mirrors the subject of the query
            assertion.subject = Subject()
            assertion.subject.nameID = NameID()
            assertion.subject.nameID.format = query.subject.nameID.format
            assertion.subject.nameID.value = query.subject.nameID.value

            attributeStatement = AttributeStatement()
            assertion.attributeStatements.append(attributeStatement)

            releasable = self._releasableValues(query.issuer.value)
            for attribute in query.attributes:
                if attribute.name not in releasable:
                    # Unknown attribute name, or one which this issuer is not
                    # entitled to: reject the whole query and do not attach
                    # the partially built assertion
                    response.status.statusCode.value = \
                        StatusCode.INVALID_ATTR_NAME_VALUE_URI
                    return response

                attributeStatement.attributes.append(
                    self._createStringAttribute(attribute,
                                                releasable[attribute.name]))

            response.assertions.append(assertion)
            response.status.statusCode.value = StatusCode.SUCCESS_URI

            return response

        return attributeQuery
153
class SOAPAttributeInterfaceMiddlewareTestCase(WithPasteFixtureBaseTestCase):
    """Test SAML Attribute Query over SOAP Binding querying a test attribute
    server served using Paste Paster over HTTPS"""
    CONFIG_FILENAME = 'attribute-interface.ini'
    SERVICE_URI = '/attributeauthority'

    @staticmethod
    def _createAttributeQuery(issuer="/O=Site A/CN=Authorisation Service",
                              subject="https://openid.localhost/philip.kershaw"):
        """Helper to create a query

        The query asks for the first name, last name and e-mail address
        attributes.

        @param issuer: value for the query issuer
        @param subject: value for the name ID of the query subject
        @return: the populated attribute query
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuer

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = "urn:ndg:saml:test:openid"
        attributeQuery.subject.nameID.value = subject

        # (name, name format, friendly name) of each attribute to request
        requestedAttributes = (
            (TestAttributeServiceMiddleware.FIRSTNAME_ATTRNAME,
             "http://www.w3.org/2001/XMLSchema#string",
             "FirstName"),
            (TestAttributeServiceMiddleware.LASTNAME_ATTRNAME,
             "http://www.w3.org/2001/XMLSchema#string",
             "LastName"),
            (TestAttributeServiceMiddleware.EMAILADDRESS_ATTRNAME,
             XMLConstants.XSD_NS+"#"+XSStringAttributeValue.TYPE_LOCAL_NAME,
             "emailAddress"),
        )
        for name, nameFormat, friendlyName in requestedAttributes:
            attribute = Attribute()
            attribute.name = name
            attribute.nameFormat = nameFormat
            attribute.friendlyName = friendlyName

            attributeQuery.attributes.append(attribute)

        return attributeQuery

    @classmethod
    def _makeRequest(cls, attributeQuery=None, **kw):
        """Convenience method to construct queries for tests

        @param attributeQuery: query to send; if None a query is created
        with ``_createAttributeQuery``
        @param kw: keywords passed to ``_createAttributeQuery`` when no query
        is given
        @return: serialised SOAP envelope with the query in its body
        """
        if attributeQuery is None:
            attributeQuery = cls._createAttributeQuery(**kw)

        elem = AttributeQueryElementTree.toXML(attributeQuery)
        soapRequest = SOAPEnvelope()
        soapRequest.create()
        soapRequest.body.elem.append(elem)

        return soapRequest.serialize()

    @staticmethod
    def _getSAMLResponse(responseBody):
        """Parse a SOAP response body and return the SAML response built from
        the first child element of the SOAP body

        @param responseBody: serialised SOAP envelope returned by the service
        """
        soapResponse = SOAPEnvelope()

        responseStream = StringIO()
        responseStream.write(responseBody)
        responseStream.seek(0)

        soapResponse.parse(responseStream)

        print("Parsed response ...")
        print(soapResponse.serialize())

        return ResponseElementTree.fromXML(soapResponse.body.elem[0])

    def _postAttributeQuery(self, request):
        """POST a serialised SOAP request to the attribute service and return
        the parsed SAML response

        @param request: serialised SOAP envelope, see ``_makeRequest``
        """
        header = {
            'soapAction': "http://www.oasis-open.org/committees/security",
            'Content-length': str(len(request)),
            'Content-type': 'text/xml'
        }

        # NOTE(review): self.app is not defined in this class - presumably it
        # is the Paste fixture test application set up by the base class, in
        # which case status=200 is the HTTP status the fixture should expect
        response = self.app.post(self.__class__.SERVICE_URI,
                                 params=request,
                                 headers=header,
                                 status=200)

        print("Response status=%d" % response.status)

        return self._getSAMLResponse(response.body)

    def test01ValidQuery(self):
        """A query made with the default issuer and subject succeeds and the
        response refers back to the query"""
        attributeQuery = self._createAttributeQuery()
        request = self._makeRequest(attributeQuery=attributeQuery)

        samlResponse = self._postAttributeQuery(request)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.SUCCESS_URI)
        self.assertEqual(samlResponse.inResponseTo, attributeQuery.id)
        self.assertEqual(samlResponse.assertions[0].subject.nameID.value,
                         attributeQuery.subject.nameID.value)

    # NOTE(review): the def line of this test was missing from the reviewed
    # copy of the file; the name is reconstructed from the surrounding
    # test01/test04/test05 numbering - confirm against the repository
    def test02AttributeReleaseDenied(self):
        """The default attribute set requested by the Site B issuer is
        answered with an invalid attribute name / value status"""
        request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service")

        samlResponse = self._postAttributeQuery(request)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    # NOTE(review): the def line of this test was missing from the reviewed
    # copy of the file; the name is reconstructed from the surrounding
    # test01/test04/test05 numbering - confirm against the repository
    def test03InvalidAttributesRequested(self):
        """A query including an unsupported attribute name is answered with
        an invalid attribute name / value status"""
        attributeQuery = self._createAttributeQuery()

        # Add an unsupported Attribute name
        attribute = Attribute()
        attribute.name = "urn:my:attribute"
        attribute.nameFormat = XMLConstants.XSD_NS+"#"+\
            XSStringAttributeValue.TYPE_LOCAL_NAME
        attribute.friendlyName = "myAttribute"
        attributeQuery.attributes.append(attribute)

        request = self._makeRequest(attributeQuery=attributeQuery)

        samlResponse = self._postAttributeQuery(request)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test04InvalidQueryIssuer(self):
        """A query from an issuer the service does not accept is answered
        with a request denied status"""
        request = self._makeRequest(issuer="/CN=My Attribute Query Issuer")

        samlResponse = self._postAttributeQuery(request)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test05UnknownPrincipal(self):
        """A query about a subject the service does not know is answered
        with an unknown principal status"""
        request = self._makeRequest(subject="Joe.Bloggs")

        samlResponse = self._postAttributeQuery(request)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)
if __name__ == "__main__":
    # Executed as a script: hand over to the unittest command line runner
    unittest.main()