Home | Trees | Indices | Help |
|
---|
|
1 #!/usr/bin/env python 2 """SOAP module unit test module 3 4 NERC DataGrid Project 5 """ 6 __author__ = "P J Kershaw" 7 __date__ = "24/07/09" 8 __copyright__ = "(C) 2009 Science and Technology Facilities Council" 9 __contact__ = "Philip.Kershaw@stfc.ac.uk" 10 __license__ = "http://www.apache.org/licenses/LICENSE-2.0" 11 __contact__ = "Philip.Kershaw@stfc.ac.uk" 12 __revision__ = "$Id: test_soap.py 7134 2010-06-30 13:49:40Z pjkersha $" 13 import logging 14 logging.basicConfig(level=logging.DEBUG) 15 16 import unittest 17 import socket 18 from cStringIO import StringIO 19 from os import path 20 import paste.fixture 21 from urllib2 import HTTPHandler, URLError 22 23 from ndg.soap import SOAPFaultBase 24 from ndg.soap.etree import SOAPEnvelope, SOAPFault, SOAPFaultException 25 from ndg.soap.client import UrlLib2SOAPClient, UrlLib2SOAPRequest 26 from ndg.soap.test import PasteDeployAppServer 27 2830 """Simple WSGI interface for SOAP service""" 3146 4733 requestFile = environ['wsgi.input'] 34 35 print("Server received request from client:\n\n%s" % 36 requestFile.read()) 37 38 soapResponse = SOAPEnvelope() 39 soapResponse.create() 40 41 response = soapResponse.serialize() 42 start_response("200 OK", 43 [('Content-length', str(len(response))), 44 ('Content-type', 'text/xml')]) 45 return [response]49 EG_SOAPFAULT_CODE = "%s:%s" % (SOAPFaultBase.DEFAULT_ELEMENT_NS_PREFIX, 50 "MustUnderstand") 51 EG_SOAPFAULT_STRING = "Can't process element X set with mustUnderstand" 52142 14354 envelope = SOAPEnvelope() 55 envelope.create() 56 soap = envelope.serialize() 57 58 self.assert_(len(soap) > 0) 59 self.assert_("Envelope" in soap) 60 self.assert_("Body" in soap) 61 self.assert_("Header" in soap) 62 63 print(envelope.prettyPrint()) 64 stream = StringIO() 65 stream.write(soap) 66 stream.seek(0) 67 68 envelope2 = SOAPEnvelope() 69 envelope2.parse(stream) 70 soap2 = envelope2.serialize() 71 self.assert_(soap2 == soap)7274 75 fault = SOAPFaultBase(self.__class__.EG_SOAPFAULT_STRING, 76 self.__class__.EG_SOAPFAULT_CODE) 77 78 self.assert_(fault.faultCode == self.__class__.EG_SOAPFAULT_CODE) 79 self.assert_(fault.faultString == self.__class__.EG_SOAPFAULT_STRING)8082 fault = SOAPFault(self.__class__.EG_SOAPFAULT_STRING, 83 self.__class__.EG_SOAPFAULT_CODE) 84 fault.create() 85 return fault8688 # Use ElementTree implementation 89 fault = self._createSOAPFault() 90 faultStr = fault.serialize() 91 print(faultStr) 92 self.assert_(self.__class__.EG_SOAPFAULT_STRING in faultStr)9395 fault = self._createSOAPFault() 96 faultStr = fault.serialize() 97 stream = StringIO() 98 stream.write(faultStr) 99 stream.seek(0) 100 101 fault2 = SOAPFault() 102 fault2.parse(stream) 103 self.assert_(fault2.faultCode == fault.faultCode) 104 self.assert_(fault2.faultString == fault.faultString)105107 try: 108 raise SOAPFaultException("bad request", SOAPFault.CLIENT_FAULT_CODE) 109 110 except SOAPFaultException, e: 111 self.assert_(e.fault.faultString == "bad request") 112 self.assert_(SOAPFault.CLIENT_FAULT_CODE in e.fault.faultCode) 113 e.fault.create() 114 self.assert_("bad request" in e.fault.serialize()) 115 self.assert_(SOAPFault.CLIENT_FAULT_CODE in e.fault.serialize()) 116 return 117 118 self.fail("Expecting SOAPFaultException raised")119121 # Create full SOAP Response containing a SOAP Fault 122 envelope = SOAPEnvelope() 123 envelope.body.fault = self._createSOAPFault() 124 envelope.create() 125 soap = envelope.serialize() 126 127 self.assert_(len(soap) > 0) 128 self.assert_("Envelope" in soap) 129 self.assert_("Body" in soap) 130 self.assert_("Header" in soap) 131 self.assert_("Fault" in soap) 132 133 print(envelope.prettyPrint()) 134 stream = StringIO() 135 stream.write(soap) 136 stream.seek(0) 137 138 envelope2 = SOAPEnvelope() 139 envelope2.parse(stream) 140 soap2 = envelope2.serialize() 141 self.assert_(soap2 == soap)145 SOAP_SERVICE_PORTNUM = 10080 146 ENDPOINT = 'http://localhost:%d/soap' % SOAP_SERVICE_PORTNUM 147 THIS_DIR = path.abspath(path.dirname(__file__)) 148222 223 224 if __name__ == "__main__": 225 unittest.main() 226150 """Use paste.fixture to test client/server SOAP interface""" 151 self.services = [] 152 self.disableServiceStartup = False 153 154 wsgiApp = SOAPBindingMiddleware() 155 self.app = paste.fixture.TestApp(wsgiApp) 156 157 super(SOAPServiceTestCase, self).__init__(*args, **kwargs)158160 requestEnvelope = SOAPEnvelope() 161 requestEnvelope.create() 162 request = requestEnvelope.serialize() 163 164 response = self.app.post('/my-soap-endpoint', 165 params=request, 166 status=200) 167 print(response.headers) 168 print(response.status) 169 print(response.body)170172 173 # Paster based service is threaded from this call 174 self.addService(app=SOAPBindingMiddleware(), 175 port=self.__class__.SOAP_SERVICE_PORTNUM) 176 177 client = UrlLib2SOAPClient() 178 179 # ElementTree based envelope class 180 client.responseEnvelopeClass = SOAPEnvelope 181 182 request = UrlLib2SOAPRequest() 183 request.url = self.__class__.ENDPOINT 184 request.envelope = SOAPEnvelope() 185 request.envelope.create() 186 187 client.openerDirector.add_handler(HTTPHandler()) 188 try: 189 response = client.send(request) 190 except URLError, e: 191 self.fail("soap_server.py must be running for this test") 192 193 print("Response from server:\n\n%s" % response.envelope.serialize())194196 """Utility for setting up threads to run Paste HTTP based services with 197 unit tests 198 199 @param arg: tuple contains ini file path setting for the service 200 @type arg: tuple 201 @param kw: keywords including "port" - port number to run the service 202 from 203 @type kw: dict 204 """ 205 if self.disableServiceStartup: 206 return 207 208 try: 209 self.services.append(PasteDeployAppServer(*arg, **kw)) 210 self.services[-1].startThread() 211 212 except socket.error: 213 pass214216 """Stop any services started with the addService method and clean up 217 the CA directory following the trust roots call 218 """ 219 if hasattr(self, 'services'): 220 for service in self.services: 221 service.terminateThread()
Home | Trees | Indices | Help |
|
---|
Generated by Epydoc 3.0.1 on Wed Apr 4 22:19:49 2012 | http://epydoc.sourceforge.net |