Package ndg :: Package httpsclient :: Module ssl_peer_verification
[hide private]

Source Code for Module ndg.httpsclient.ssl_peer_verification

  1  """ndg_httpsclient - module containing SSL peer verification class. 
  2  """ 
  3  __author__ = "P J Kershaw (STFC)" 
  4  __date__ = "09/12/11" 
  5  __copyright__ = "(C) 2012 Science and Technology Facilities Council" 
  6  __license__ = "BSD - see LICENSE file in top-level directory" 
  7  __contact__ = "Philip.Kershaw@stfc.ac.uk" 
  8  __revision__ = '$Id$' 
  9  import re 
 10  import logging 
 11  log = logging.getLogger(__name__) 
 12   
 13  try: 
 14      from ndg.httpsclient.subj_alt_name import SubjectAltName 
 15      from pyasn1.codec.der import decoder as der_decoder 
 16      SUBJ_ALT_NAME_SUPPORT = True 
 17  except ImportError, e: 
 18      SUBJ_ALT_NAME_SUPPORT = False 
 19      SUBJ_ALT_NAME_SUPPORT_MSG = ( 
 20          'SubjectAltName support is disabled - check pyasn1 package ' 
 21          'installation to enable' 
 22      ) 
 23      import warnings 
 24      warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG) 
class ServerSSLCertVerification(object):
    """Check server identity.  If hostname doesn't match, allow match of
    host's Distinguished Name against server DN setting.

    Instances are callables with the signature of a PyOpenSSL verification
    callback: (connection, peerCert, errorStatus, errorDepth, preverifyOK).
    """
    # Long X.509 attribute names mapped to their short forms.  Both forms are
    # accepted as field names when parsing a "/FIELD=value/..." DN string.
    DN_LUT = {
        'commonName':               'CN',
        'organisationalUnitName':   'OU',
        'organisation':             'O',
        'countryName':              'C',
        'emailAddress':             'EMAILADDRESS',
        'localityName':             'L',
        'stateOrProvinceName':      'ST',
        'streetAddress':            'STREET',
        'domainComponent':          'DC',
        'userid':                   'UID'
    }
    SUBJ_ALT_NAME_EXT_NAME = 'subjectAltName'

    # list() is required on Python 3, where keys()/values() return views that
    # do not support "+".  The order of the alternatives is irrelevant because
    # every alternative must be followed by "=".
    PARSER_RE_STR = '/(%s)=' % '|'.join(list(DN_LUT.keys()) +
                                        list(DN_LUT.values()))
    PARSER_RE = re.compile(PARSER_RE_STR)

    # Text string types for isinstance() checks: basestring on Python 2,
    # str on Python 3 (where basestring does not exist).
    try:
        _STRING_TYPES = (basestring,)
    except NameError:
        _STRING_TYPES = (str,)

    __slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')

    def __init__(self, certDN=None, hostname=None, subj_alt_name_match=True):
        """Set the expected server identity to verify peers against.

        @type certDN: string or list
        @param certDN: Set the expected Distinguished Name of the
        server to avoid errors matching hostnames.  This is useful
        where the hostname is not fully qualified
        @type hostname: string
        @param hostname: hostname to match against peer certificate
        subjectAltNames or subject common name
        @type subj_alt_name_match: bool
        @param subj_alt_name_match: flag to enable/disable matching of hostname
        against peer certificate subjectAltNames.  Nb. A setting of True will
        be ignored if the pyasn1 package is not installed
        @raise TypeError: if certDN or hostname have an unsupported type
        """
        self.__certDN = None
        self.__hostname = None

        # Assign through the properties so that the values are validated
        if certDN is not None:
            self.certDN = certDN

        if hostname is not None:
            self.hostname = hostname

        if subj_alt_name_match and not SUBJ_ALT_NAME_SUPPORT:
            log.warning('Overriding "subj_alt_name_match" keyword setting: '
                        'peer verification with subjectAltNames is disabled')
            self.__subj_alt_name_match = False
        elif subj_alt_name_match:
            self.__subj_alt_name_match = True
        else:
            log.debug('Disabling peer verification with subject '
                      'subjectAltNames!')
            self.__subj_alt_name_match = False

    def __call__(self, connection, peerCert, errorStatus, errorDepth,
                 preverifyOK):
        """Verify server certificate

        @type connection: OpenSSL.SSL.Connection
        @param connection: SSL connection object
        @type peerCert: OpenSSL.crypto.X509
        @param peerCert: certificate from the peer's chain currently being
        verified
        @type errorStatus: int
        @param errorStatus: error status passed from caller.  This is the value
        returned by the OpenSSL C function X509_STORE_CTX_get_error().  Look-up
        x509_vfy.h in the OpenSSL source to get the meanings of the different
        codes.  PyOpenSSL doesn't help you!
        @type errorDepth: int
        @param errorDepth: a non-negative integer representing where in the
        certificate chain the error occurred.  If it is zero it occurred in the
        end entity certificate, one if it is the certificate which signed the
        end entity certificate and so on.

        @type preverifyOK: int
        @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
        SSL context irrespective of any verification checks done here.  If this
        function yields an OK status, it should enforce the preverifyOK value
        so that any error set upstream overrides and is honoured.
        @rtype: int
        @return: status code - 0/False = Error, 1/True = OK
        """
        if peerCert.has_expired():
            # Any expired certificate in the chain should result in an error
            log.error('Certificate %r in peer certificate chain has expired',
                      peerCert.get_subject())
            return False

        if errorDepth != 0:
            # Identity checks apply to the end entity certificate only; for
            # the rest of the chain defer to the upstream status
            return preverifyOK

        peerCertSubj = peerCert.get_subject()

        if self.certDN is not None:
            # Compare the full DN.  Both sides are sorted so field order is
            # irrelevant.  Components delivered as bytes are decoded so they
            # can compare equal to the text held in certDN.
            peerCertDN = sorted(
                (self._to_text(name), self._to_text(value))
                for name, value in peerCertSubj.get_components())

            if peerCertDN == self.certDN:
                return preverifyOK

            log.error('Peer certificate DN %r doesn\'t match the '
                      'expected DN %r', peerCertDN, self.certDN)
            return False

        # No DN set - check hostname against the peer certificate instead
        if self.hostname is None:
            log.error('No "hostname" or "certDN" set to check peer '
                      'certificate against')
            return False

        # Check for subject alternative names
        if self.__subj_alt_name_match:
            dns_names = self._get_subj_alt_name(peerCert)
            if self.hostname in dns_names:
                return preverifyOK

        # No subjectAltName matched (or matching is disabled): default to
        # check of subject Common Name
        if peerCertSubj.commonName == self.hostname:
            return preverifyOK

        log.error('Peer certificate CN %r doesn\'t match the '
                  'expected CN %r', peerCertSubj.commonName,
                  self.hostname)
        return False

    @classmethod
    def _to_text(cls, value):
        '''Decode a byte string to text, leaving any other value unchanged

        On Python 2, str is both the bytes type and a text type so values are
        returned as they are.

        @param value: value to convert
        @return: decoded text for byte string input (undecodable bytes are
        replaced), otherwise the input value itself
        '''
        if isinstance(value, cls._STRING_TYPES) or not isinstance(value,
                                                                  bytes):
            return value

        return value.decode('utf-8', 'replace')

    @classmethod
    def _get_subj_alt_name(cls, peer_cert):
        '''Extract subjectAltName DNS name settings from certificate extensions

        @param peer_cert: peer certificate in SSL connection.  subjectAltName
        settings if any will be extracted from this
        @type peer_cert: OpenSSL.crypto.X509
        @rtype: list
        @return: string form of every entry found in the certificate's
        subjectAltName extension(s); empty if there are none
        '''
        dns_name = []
        general_names = SubjectAltName()

        # Search through extensions
        for i in range(peer_cert.get_extension_count()):
            ext = peer_cert.get_extension(i)

            # The short name may be a byte string - decode before comparing
            if cls._to_text(ext.get_short_name()) != cls.SUBJ_ALT_NAME_EXT_NAME:
                continue

            # PyOpenSSL returns extension data in ASN.1 encoded form
            decoded_dat = der_decoder.decode(ext.get_data(),
                                             asn1Spec=general_names)

            # Only the SubjectAltName items of the decoder output are of
            # interest; anything else is skipped
            for name in decoded_dat:
                if not isinstance(name, SubjectAltName):
                    continue

                for entry in range(len(name)):
                    component = name.getComponentByPosition(entry)
                    dns_name.append(str(component.getComponent()))

        return dns_name

    def _getCertDN(self):
        """@rtype: list or None
        @return: expected DN as a sorted list of (field, value) tuples, or
        None if not set
        """
        return self.__certDN

    def _setCertDN(self, val):
        """Set the expected server certificate Distinguished Name

        @type val: string, list or tuple
        @param val: either a DN string of the form "/O=org/CN=name",
        optionally enclosed in double quotes, or a sequence of two element
        (field, value) pairs
        @raise TypeError: if a DN string can't be parsed, if an item of a
        sequence doesn't have two elements or if val is of any other type
        """
        if isinstance(val, self._STRING_TYPES):
            # Allow for quoted DN
            certDN = val.strip('"')

            # Splitting on a pattern with one group gives
            # [leading text, field, value, field, value, ...]
            dnFields = self.__class__.PARSER_RE.split(certDN)
            if len(dnFields) < 2:
                raise TypeError('Error parsing DN string: "%s"' % certDN)

            # sorted() rather than zip(...).sort(): zip returns an iterator
            # on Python 3
            self.__certDN = sorted(zip(dnFields[1::2], dnFields[2::2]))

        elif isinstance(val, (list, tuple)):
            # The test here used to be inverted ("not isinstance(val, list)")
            # which made every list input fall through to the TypeError below
            for field in val:
                if len(field) != 2:
                    raise TypeError('Expecting list of two element DN field, '
                                    'DN field value pairs for "certDN" '
                                    'attribute')

            # Held in the same form as a parsed DN string - a sorted list of
            # tuples - so that the comparison in __call__ can succeed
            self.__certDN = sorted(tuple(field) for field in val)
        else:
            raise TypeError('Expecting list or string type for "certDN" '
                            'attribute')

    certDN = property(fget=_getCertDN,
                      fset=_setCertDN,
                      doc="Distinguished Name for Server Certificate")

    # Get/Set Property methods
    def _getHostname(self):
        """@rtype: string or None
        @return: expected hostname of the server, or None if not set
        """
        return self.__hostname

    def _setHostname(self, val):
        """Set the hostname to match the peer certificate against

        @type val: string
        @param val: expected hostname of the server
        @raise TypeError: if val is not a string
        """
        if not isinstance(val, self._STRING_TYPES):
            raise TypeError("Expecting string type for hostname "
                            "attribute")
        self.__hostname = val

    hostname = property(fget=_getHostname,
                        fset=_setHostname,
                        doc="hostname of server")
229