1 """ndg_httpsclient - module containing SSL peer verification class.
2 """
3 __author__ = "P J Kershaw (STFC)"
4 __date__ = "09/12/11"
5 __copyright__ = "(C) 2012 Science and Technology Facilities Council"
6 __license__ = "BSD - see LICENSE file in top-level directory"
7 __contact__ = "Philip.Kershaw@stfc.ac.uk"
8 __revision__ = '$Id$'
9 import re
10 import logging
11 log = logging.getLogger(__name__)
12
# subjectAltName matching depends on optional packages.  Record whether they
# could be imported so that the verification code can fall back to matching
# on the certificate subject only.
try:
    from ndg.httpsclient.subj_alt_name import SubjectAltName
    from pyasn1.codec.der import decoder as der_decoder
    SUBJ_ALT_NAME_SUPPORT = True
except ImportError:
    # The exception object itself is not needed, so it is not bound to a
    # name; this form of the clause is also valid on every Python version.
    SUBJ_ALT_NAME_SUPPORT = False
    SUBJ_ALT_NAME_SUPPORT_MSG = (
        'SubjectAltName support is disabled - check pyasn1 package '
        'installation to enable'
    )
    import warnings
    warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG)
"""Check server identity.  If hostname doesn't match, allow match of
host's Distinguished Name against server DN setting"""

# Look-up table pairing long DN field names with their short forms.  Both the
# keys and the values are fed into the parser expression below, so a DN string
# may use either form for its field names.
# NOTE(review): the 'organisation' / 'organisationalUnitName' spellings may not
# match the long names a certificate library reports - confirm.
DN_LUT = {
    'commonName': 'CN',
    'organisationalUnitName': 'OU',
    'organisation': 'O',
    'countryName': 'C',
    'emailAddress': 'EMAILADDRESS',
    'localityName': 'L',
    'stateOrProvinceName': 'ST',
    'streetAddress': 'STREET',
    'domainComponent': 'DC',
    'userid': 'UID'
}

# Extension name compared against each certificate extension's short name
SUBJ_ALT_NAME_EXT_NAME = 'subjectAltName'

# Expression matching '/<field name>=' so that a slash separated DN string can
# be split into alternating field names and values.  The keys and values are
# copied into lists before concatenation because dictionary views do not
# support the '+' operator.
PARSER_RE_STR = '/(%s)=' % '|'.join(
    list(DN_LUT.keys()) + list(DN_LUT.values())
)
PARSER_RE = re.compile(PARSER_RE_STR)

__slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')
47
"""Initialise the verifier with the criteria used to check the peer.

@type certDN: string
@param certDN: expected Distinguished Name of the server.  Setting this
avoids errors matching hostnames, which is useful where the hostname is
not fully qualified
@type hostname: string
@param hostname: hostname to match against the peer certificate
subjectAltNames or subject common name
@type subj_alt_name_match: bool
@param subj_alt_name_match: flag to enable/disable matching of hostname
against peer certificate subjectAltNames.  Nb. a setting of True is
overridden, with a logged warning, if subjectAltName support could not be
imported
"""
# Both criteria start out unset; values are only applied when the caller
# supplied them, and they go through the property setters for validation.
self.__hostname = None
self.__certDN = None

if certDN is not None:
    self.certDN = certDN

if hostname is not None:
    self.hostname = hostname

if not subj_alt_name_match:
    log.debug('Disabling peer verification with subject '
              'subjectAltNames!')
    self.__subj_alt_name_match = False
elif SUBJ_ALT_NAME_SUPPORT:
    self.__subj_alt_name_match = True
else:
    # Requested, but the optional dependencies are not available
    log.warning('Overriding "subj_alt_name_match" keyword setting: '
                'peer verification with subjectAltNames is disabled')
    self.__subj_alt_name_match = False
84
87 """Verify server certificate
88
89 @type connection: OpenSSL.SSL.Connection
90 @param connection: SSL connection object
91 @type peerCert: basestring
92 @param peerCert: server host certificate as OpenSSL.crypto.X509
93 instance
94 @type errorStatus: int
95 @param errorStatus: error status passed from caller. This is the value
96 returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up
97 x509_vfy.h in the OpenSSL source to get the meanings of the different
98 codes. PyOpenSSL doesn't help you!
99 @type errorDepth: int
100 @param errorDepth: a non-negative integer representing where in the
101 certificate chain the error occurred. If it is zero it occured in the
102 end entity certificate, one if it is the certificate which signed the
103 end entity certificate and so on.
104
105 @type preverifyOK: int
106 @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
107 SSL context irrespective of any verification checks done here. If this
108 function yields an OK status, it should enforce the preverifyOK value
109 so that any error set upstream overrides and is honoured.
110 @rtype: int
111 @return: status code - 0/False = Error, 1/True = OK
112 """
113 if peerCert.has_expired():
114
115 log.error('Certificate %r in peer certificate chain has expired',
116 peerCert.get_subject())
117
118 return False
119
120 elif errorDepth == 0:
121
122
123 peerCertSubj = peerCert.get_subject()
124 peerCertDN = peerCertSubj.get_components()
125 peerCertDN.sort()
126
127 if self.certDN is None:
128
129 if self.hostname is None:
130 log.error('No "hostname" or "certDN" set to check peer '
131 'certificate against')
132 return False
133
134
135 if self.__subj_alt_name_match:
136 dns_names = self._get_subj_alt_name(peerCert)
137 if self.hostname in dns_names:
138 return preverifyOK
139
140
141 if peerCertSubj.commonName == self.hostname:
142 return preverifyOK
143 else:
144 log.error('Peer certificate CN %r doesn\'t match the '
145 'expected CN %r', peerCertSubj.commonName,
146 self.hostname)
147 return False
148 else:
149 if peerCertDN == self.certDN:
150 return preverifyOK
151 else:
152 log.error('Peer certificate DN %r doesn\'t match the '
153 'expected DN %r', peerCertDN, self.certDN)
154 return False
155 else:
156 return preverifyOK
157
158 @classmethod
160 '''Extract subjectAltName DNS name settings from certificate extensions
161
162 @param peer_cert: peer certificate in SSL connection. subjectAltName
163 settings if any will be extracted from this
164 @type peer_cert: OpenSSL.crypto.X509
165 '''
166
167 dns_name = []
168 general_names = SubjectAltName()
169 for i in range(peer_cert.get_extension_count()):
170 ext = peer_cert.get_extension(i)
171 ext_name = ext.get_short_name()
172 if ext_name == cls.SUBJ_ALT_NAME_EXT_NAME:
173
174 ext_dat = ext.get_data()
175 decoded_dat = der_decoder.decode(ext_dat,
176 asn1Spec=general_names)
177
178 for name in decoded_dat:
179 if isinstance(name, SubjectAltName):
180 for entry in range(len(name)):
181 component = name.getComponentByPosition(entry)
182 dns_name.append(str(component.getComponent()))
183
184 return dns_name
185
188
# Setter logic for the "certDN" property.  Accepts either a DN string of
# the form '/<field>=<value>/<field>=<value>...' (optionally wrapped in
# double quotes) or a ready made list of two element field/value pairs.
# Raises TypeError for an unparseable string, a malformed pair or any other
# input type.
if isinstance(val, basestring):
    # Allow for a quoted DN
    certDN = val.strip('"')

    dnFields = self.__class__.PARSER_RE.split(certDN)
    if len(dnFields) < 2:
        raise TypeError('Error parsing DN string: "%s"' % certDN)

    # After the split, odd positions hold field names and the following
    # even positions their values.  The pairs are kept sorted because the
    # peer certificate's components are sorted before being compared with
    # this setting.  sorted() is used so the result is a list whatever
    # zip() returns.
    self.__certDN = sorted(zip(dnFields[1::2], dnFields[2::2]))

elif isinstance(val, list):
    # Previously this test was negated, which rejected every list and let
    # arbitrary other types through to the pair check below.
    for i in val:
        if len(i) != 2:
            raise TypeError('Expecting list of two element DN field, '
                            'DN field value pairs for "certDN" '
                            'attribute')
    self.__certDN = val
else:
    raise TypeError('Expecting list or string type for "certDN" '
                    'attribute')
211
# Managed attribute for the expected server DN: reads and writes are routed
# through the getter/setter pair; no deleter is provided.
certDN = property(_getCertDN,
                  _setCertDN,
                  None,
                  "Distinguished Name for Server Certificate")
215
216
219
# Setter logic for the "hostname" property: only string values are
# accepted, anything else raises TypeError.
if isinstance(val, basestring):
    self.__hostname = val
else:
    raise TypeError("Expecting string type for hostname "
                    "attribute")
225
# Managed attribute for the expected server hostname: reads and writes are
# routed through the getter/setter pair; no deleter is provided.
hostname = property(_getHostname,
                    _setHostname,
                    None,
                    "hostname of server")
229