Package myproxy :: Package ws :: Package test :: Module test_myproxywsgi_with_paster
[hide private]

Source Code for Module myproxy.ws.test.test_myproxywsgi_with_paster

  1  #!/usr/bin/env python 
  2  """Unit tests for MyProxy WSGI Middleware classes and Application testing them 
  3  with Paster web application server.  The server is started from __init__ method 
  4  of the Test Case class and then called by the unit test methods.  The unit 
  5  test methods themselves using a bash script myproxy-ws-logon.sh to query the  
  6  MyProxy web application. 
  7  """ 
  8  __author__ = "P J Kershaw" 
  9  __date__ = "25/05/10" 
 10  __copyright__ = "(C) 2010 Science and Technology Facilities Council" 
 11  __license__ = "BSD - see LICENSE file in top-level directory" 
 12  __contact__ = "Philip.Kershaw@stfc.ac.uk" 
 13  __revision__ = '$Id$' 
 14  from os import path, listdir, remove 
 15  from getpass import getpass 
 16  from ConfigParser import SafeConfigParser, NoOptionError 
 17  import subprocess 
 18  import unittest 
 19  import socket 
 20  import logging 
 21  logging.basicConfig(level=logging.DEBUG) 
 22   
 23  from OpenSSL import SSL, crypto 
 24   
 25  from myproxy.ws.test import (test_ca_dir, logon_shell_script_path, 
 26                               get_trustroots_shell_script_path) 
 27  from myproxy.ws.test.server_utils import PasteDeployAppServer 
 28           
 29   
30 -class MyProxyLogonAppWithPasterTestCase(unittest.TestCase):
31 """Test MyProxy Logon App WSGI in Paster web application server container 32 with bash shell script clients. For POSIX-like systems ONLY 33 """ 34 THIS_DIR = path.abspath(path.dirname(__file__)) 35 CA_DIRNAME = 'ca' 36 CA_DIR = test_ca_dir 37 CA_ENV_VARNAME = 'X509_CERT_DIR' 38 39 tmp_ca_dir = path.join(THIS_DIR, 'tmp_ca') 40 41 # CA files retrieved by the getTrustRoots unittest are cleared out 42 # afterwards by this classes' __del__' method but some CA file(s) need to be 43 # reserved to allow verification of the paster web service's SSL certificate 44 RESERVED_CA_DIR_FILENAMES = ('3d41aba9.0', ) 45 46 INI_FILENAME = 'myproxywsgi.ini' 47 INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME) 48 CONFIG_FILENAME = 'test_myproxywsgi.cfg' 49 CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME) 50 SSLCERT_FILEPATH = 'localhost.crt' 51 SSLKEY_FILEPATH = 'localhost.key' 52 53 SERVICE_PORTNUM = 10443 54 LOGON_SCRIPT_CMD = logon_shell_script_path 55 LOGON_SCRIPT_USER_OPTNAME = '-l' 56 LOGON_SCRIPT_STDIN_PASS_OPTNAME = '-S' 57 58 SCRIPT_URI_OPTNAME = '-U' 59 60 GET_TRUSTROOTS_SCRIPT_CMD = get_trustroots_shell_script_path 61 GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME = '-b' 62
63 - def __init__(self, *arg, **kw):
64 """Read settings from a config file and create thread for paster 65 based MyProxy Web Service app running over HTTPS 66 """ 67 super(MyProxyLogonAppWithPasterTestCase, self).__init__(*arg, **kw) 68 self.services = [] 69 self.disableServiceStartup = False 70 71 self.cfg = SafeConfigParser({'here': self.__class__.THIS_DIR}) 72 self.cfg.optionxform = str 73 self.cfg.read(self.__class__.CONFIG_FILEPATH) 74 75 # Start the MyProxy web service 76 self.addService(cfgFilePath=self.__class__.INI_FILEPATH, 77 port=self.__class__.SERVICE_PORTNUM, 78 withSSL=True, 79 withLoggingConfig=False)
80
82 # Test curl/base64 based client script 83 optName = 'MyProxyLogonAppWithPasterTestCase.test02GetTrustRootsScript' 84 uri = self.cfg.get(optName, 'uri') 85 86 cmd = ( 87 self.__class__.GET_TRUSTROOTS_SCRIPT_CMD, 88 "%s %s" % (self.__class__.SCRIPT_URI_OPTNAME, uri), 89 "%s" % self.__class__.GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME 90 ) 91 92 try: 93 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 94 stderr=subprocess.PIPE, 95 env={self.__class__.CA_ENV_VARNAME: 96 self.__class__.tmp_ca_dir}) 97 except OSError, e: 98 self.failIf(e.errno == 13, 'Check that the %r script is set with ' 99 'execute permissions' % 100 self.__class__.GET_TRUSTROOTS_SCRIPT_CMD) 101 raise 102 103 stdoutdata, stderrdata = proc.communicate() 104 self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 105 stderrdata) 106 print("stdout = %s" % stdoutdata)
107
108 - def test02LogonScript(self):
109 # Test curl/openssl based client script access 110 optName = 'MyProxyLogonAppWithPasterTestCase.test02LogonScript' 111 username = self.cfg.get(optName, 'username') 112 try: 113 password = self.cfg.get(optName, 'password') 114 except NoOptionError: 115 password = getpass(optName + ' password: ') 116 117 uri = self.cfg.get(optName, 'uri') 118 119 cmd = ( 120 self.__class__.LOGON_SCRIPT_CMD, 121 "%s %s"%(self.__class__.SCRIPT_URI_OPTNAME, uri), 122 "%s %s"%(self.__class__.LOGON_SCRIPT_USER_OPTNAME, username), 123 self.__class__.LOGON_SCRIPT_STDIN_PASS_OPTNAME 124 ) 125 126 p1 = subprocess.Popen(["echo", password], stdout=subprocess.PIPE) 127 try: 128 p2 = subprocess.Popen(cmd, stdin=p1.stdout, stdout=subprocess.PIPE, 129 stderr=subprocess.PIPE, 130 env={self.__class__.CA_ENV_VARNAME: 131 self.__class__.tmp_ca_dir}) 132 except OSError, e: 133 self.failIf(e.errno == 13, 'Check that the %r script is set with ' 134 'execute permissions' % self.__class__.LOGON_SCRIPT_CMD) 135 raise 136 137 stdoutdata, stderrdata = p2.communicate() 138 self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 139 stderrdata) 140 print("stdout = %s" % stdoutdata) 141 142 cert = crypto.load_certificate(crypto.FILETYPE_PEM, stdoutdata) 143 subj = cert.get_subject() 144 self.assert_(subj) 145 self.assert_(subj.CN) 146 print("Returned certificate subject CN=%r" % subj)
147
148 - def addService(self, *arg, **kw):
149 """Utility for setting up threads to run Paste HTTP based services with 150 unit tests 151 152 @param arg: tuple contains ini file path setting for the service 153 @type arg: tuple 154 @param kw: keywords including "port" - port number to run the service 155 from 156 @type kw: dict 157 """ 158 if self.disableServiceStartup: 159 return 160 161 withSSL = kw.pop('withSSL', False) 162 if withSSL: 163 certFilePath = path.join(self.__class__.THIS_DIR, 164 self.__class__.SSLCERT_FILEPATH) 165 priKeyFilePath = path.join(self.__class__.THIS_DIR, 166 self.__class__.SSLKEY_FILEPATH) 167 168 kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD) 169 kw['ssl_context'].set_options(SSL.OP_NO_SSLv2) 170 171 kw['ssl_context'].use_privatekey_file(priKeyFilePath) 172 kw['ssl_context'].use_certificate_file(certFilePath) 173 174 try: 175 self.services.append(PasteDeployAppServer(*arg, **kw)) 176 self.services[-1].startThread() 177 178 except socket.error: 179 pass
180
181 - def __del__(self):
182 """Stop any services started with the addService method and clean up 183 the CA directory following the trust roots call 184 """ 185 if hasattr(self, 'services'): 186 for service in self.services: 187 service.terminateThread() 188 189 parentObj = super(MyProxyLogonAppWithPasterTestCase, self) 190 if hasattr(parentObj, '__del__'): 191 parentObj.__del__() 192 193 for fileName in listdir(self.__class__.CA_DIR): 194 if (fileName not in self.__class__.RESERVED_CA_DIR_FILENAMES and 195 fileName[0] != '.'): 196 filePath = path.join(self.__class__.CA_DIR, fileName) 197 remove(filePath)
198 199 200 if __name__ == "__main__": 201 unittest.main() 202