1
"""Unit tests for the MyProxy WSGI middleware classes and application, run
against a Paster web application server. The server is started from the
__init__ method of the test case class and is then called by the unit test
methods. The unit test methods themselves use bash client scripts such as
myproxy-ws-logon.sh to query the MyProxy web application.
"""
# Module-level metadata attributes.
__author__ = "P J Kershaw"
__date__ = "25/05/10"
__copyright__ = "(C) 2010 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id$'
from os import path, listdir, remove
from getpass import getpass
from ConfigParser import SafeConfigParser, NoOptionError
import errno
import subprocess
import unittest
import socket
import logging
logging.basicConfig(level=logging.DEBUG)

from OpenSSL import SSL, crypto

from myproxy.ws.test import (test_ca_dir, logon_shell_script_path,
                             get_trustroots_shell_script_path)
from myproxy.ws.test.server_utils import PasteDeployAppServer
28
29
class MyProxyLogonAppWithPasterTestCase(unittest.TestCase):
    """Test MyProxy Logon App WSGI in Paster web application server container
    with bash shell script clients.  For POSIX-like systems ONLY
    """
    THIS_DIR = path.abspath(path.dirname(__file__))
    CA_DIRNAME = 'ca'
    CA_DIR = test_ca_dir
    CA_ENV_VARNAME = 'X509_CERT_DIR'

    # Directory exported to the client scripts through CA_ENV_VARNAME
    tmp_ca_dir = path.join(THIS_DIR, 'tmp_ca')

    # Files in CA_DIR that the clean-up in __del__ must leave in place
    RESERVED_CA_DIR_FILENAMES = ('3d41aba9.0', )

    INI_FILENAME = 'myproxywsgi.ini'
    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME)
    CONFIG_FILENAME = 'test_myproxywsgi.cfg'
    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME)
    # Both names are joined onto THIS_DIR by addService
    SSLCERT_FILEPATH = 'localhost.crt'
    SSLKEY_FILEPATH = 'localhost.key'

    SERVICE_PORTNUM = 10443
    LOGON_SCRIPT_CMD = logon_shell_script_path
    LOGON_SCRIPT_USER_OPTNAME = '-l'
    LOGON_SCRIPT_STDIN_PASS_OPTNAME = '-S'

    SCRIPT_URI_OPTNAME = '-U'

    GET_TRUSTROOTS_SCRIPT_CMD = get_trustroots_shell_script_path
    GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME = '-b'

    def __init__(self, *arg, **kw):
        """Read settings from a config file and create thread for paster
        based MyProxy Web Service app running over HTTPS
        """
        super(MyProxyLogonAppWithPasterTestCase, self).__init__(*arg, **kw)
        self.services = []
        self.disableServiceStartup = False

        self.cfg = SafeConfigParser({'here': self.__class__.THIS_DIR})
        # Keep option names exactly as written in the file (no lower-casing)
        self.cfg.optionxform = str
        self.cfg.read(self.__class__.CONFIG_FILEPATH)

        self.addService(cfgFilePath=self.__class__.INI_FILEPATH,
                        port=self.__class__.SERVICE_PORTNUM,
                        withSSL=True,
                        withLoggingConfig=False)

    def _runScript(self, cmd, stdinData=None):
        """Run a client script and return whatever it wrote to stdout.

        The script is run with an environment containing only
        CA_ENV_VARNAME, set to tmp_ca_dir.  The test fails if the script
        writes anything to stderr.

        @param cmd: script path followed by its command line arguments
        @type cmd: tuple
        @param stdinData: text to feed to the script's standard input, or
        None to leave standard input untouched
        @type stdinData: basestring / None
        @return: the script's standard output
        @rtype: basestring
        @raise OSError: if the script could not be launched
        """
        stdin = None
        if stdinData is not None:
            stdin = subprocess.PIPE

        env = {self.__class__.CA_ENV_VARNAME: self.__class__.tmp_ca_dir}
        try:
            proc = subprocess.Popen(cmd,
                                    stdin=stdin,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    env=env)
        except OSError as e:
            # Give a helpful hint for the most likely launch failure, then
            # let any other OSError propagate unchanged
            self.assertFalse(e.errno == errno.EACCES,
                             'Check that the %r script is set with '
                             'execute permissions' % cmd[0])
            raise

        stdoutdata, stderrdata = proc.communicate(stdinData)
        self.assertFalse(len(stderrdata) > 0,
                         "An error message was returned: %s" % stderrdata)
        return stdoutdata

    def test01GetTrustRootsScript(self):
        """Call the get trust roots client script in bootstrap mode"""
        # Config section name - fixed text, independent of the method name
        optName = 'MyProxyLogonAppWithPasterTestCase.test02GetTrustRootsScript'
        uri = self.cfg.get(optName, 'uri')

        # Option and value are passed as separate argv items so that the
        # script does not receive the value with a leading space
        cmd = (
            self.__class__.GET_TRUSTROOTS_SCRIPT_CMD,
            self.__class__.SCRIPT_URI_OPTNAME, uri,
            self.__class__.GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME
        )

        stdoutdata = self._runScript(cmd)
        print("stdout = %s" % stdoutdata)

    def test02LogonScript(self):
        """Call the logon client script and check that the output parses as
        a PEM certificate with a subject common name
        """
        optName = 'MyProxyLogonAppWithPasterTestCase.test02LogonScript'
        username = self.cfg.get(optName, 'username')
        try:
            password = self.cfg.get(optName, 'password')
        except NoOptionError:
            # No password in the config file - prompt for it instead
            password = getpass(optName + ' password: ')

        uri = self.cfg.get(optName, 'uri')

        cmd = (
            self.__class__.LOGON_SCRIPT_CMD,
            self.__class__.SCRIPT_URI_OPTNAME, uri,
            self.__class__.LOGON_SCRIPT_USER_OPTNAME, username,
            self.__class__.LOGON_SCRIPT_STDIN_PASS_OPTNAME
        )

        # Write the password straight to the script's stdin followed by a
        # newline.  This keeps it off the process list and avoids a helper
        # process that would otherwise have to be reaped.
        stdoutdata = self._runScript(cmd, stdinData=password + '\n')
        print("stdout = %s" % stdoutdata)

        cert = crypto.load_certificate(crypto.FILETYPE_PEM, stdoutdata)
        subj = cert.get_subject()
        self.assertTrue(subj)
        self.assertTrue(subj.CN)
        print("Returned certificate subject CN=%r" % subj.CN)

    def addService(self, *arg, **kw):
        """Utility for setting up threads to run Paste HTTP based services with
        unit tests

        @param arg: tuple contains ini file path setting for the service
        @type arg: tuple
        @param kw: keywords including "port" - port number to run the service
        from.  "withSSL" is consumed here: when true an SSL context built
        from SSLCERT_FILEPATH / SSLKEY_FILEPATH is passed on as "ssl_context"
        @type kw: dict
        """
        if self.disableServiceStartup:
            return

        withSSL = kw.pop('withSSL', False)
        if withSSL:
            certFilePath = path.join(self.__class__.THIS_DIR,
                                     self.__class__.SSLCERT_FILEPATH)
            priKeyFilePath = path.join(self.__class__.THIS_DIR,
                                       self.__class__.SSLKEY_FILEPATH)

            sslContext = SSL.Context(SSL.SSLv23_METHOD)
            sslContext.set_options(SSL.OP_NO_SSLv2)

            sslContext.use_privatekey_file(priKeyFilePath)
            sslContext.use_certificate_file(certFilePath)
            kw['ssl_context'] = sslContext

        try:
            self.services.append(PasteDeployAppServer(*arg, **kw))
            self.services[-1].startThread()

        except socket.error:
            # Deliberately non-fatal, but no longer silent.
            # NOTE(review): presumably the port is already bound by a service
            # started from another instance of this test case - confirm
            logging.getLogger(__name__).warning(
                'Service start-up skipped following socket error',
                exc_info=True)

    def __del__(self):
        """Stop any services started with the addService method and clean up
        the CA directory following the trust roots call
        """
        for service in getattr(self, 'services', ()):
            service.terminateThread()

        parentObj = super(MyProxyLogonAppWithPasterTestCase, self)
        if hasattr(parentObj, '__del__'):
            parentObj.__del__()

        # NOTE(review): the client scripts are pointed at tmp_ca_dir whereas
        # this clean-up operates on CA_DIR - confirm that is intended
        caDir = self.__class__.CA_DIR
        reserved = self.__class__.RESERVED_CA_DIR_FILENAMES
        for fileName in listdir(caDir):
            if fileName in reserved or fileName.startswith('.'):
                continue

            filePath = path.join(caDir, fileName)
            if path.isdir(filePath):
                # remove() would raise on a directory and abandon the rest
                # of the clean-up
                continue

            remove(filePath)
198
199
# Run the tests with the default unittest runner when executed as a script.
if __name__ == "__main__":
    unittest.main()
202