Open Table Of Contents

Source code for bridgedb.crypto

# -*- coding: utf-8 -*-
#
# This file is part of BridgeDB, a Tor bridge distribution system.
#
# :authors: Isis Lovecruft 0xA3ADB67A2CDB8B35 <isis@torproject.org>
# :copyright: (c) 2007-2015, The Tor Project, Inc.
#             (c) 2013-2015, Isis Lovecruft
#             (c) 2007-2015, all entities within the AUTHORS file
# :license: 3-clause BSD, see included LICENSE for information

"""This module contains general utilities for working with external
cryptographic tools and libraries, including OpenSSL and GnuPG. It also
includes utilities for creating callable HMAC functions, generating HMACs for
data, and generating and/or storing key material.

.. py:module:: bridgedb.crypto
   :synopsis: BridgeDB general cryptographic utilities.

::

   bridgedb.crypto
     |_getGPGContext() - Get a pre-configured GPGME context.
     |_getHMAC() - Compute an HMAC with some key for some data.
     |_getHMACFunc() - Get a callable for producing HMACs with the given key.
     |_getKey() - Load the master HMAC key from a file, or create a new one.
     |_getRSAKey() - Load an RSA key from a file, or create a new one.
     |_gpgSignMessage() - Sign a message string according to a GPGME context.
     |_writeKeyToFile() - Write to a file readable only by the process owner.
     |
     \_SSLVerifyingContextFactory - OpenSSL.SSL.Context factory which verifies
        |                           certificate chains and matches hostnames.
        |_getContext() - Retrieve an SSL context configured for certificate
        |                verification.
        |_getHostnameFromURL() - Parses the hostname from the request URL.
        \_verifyHostname() - Check that the cert CN matches the request
                             hostname.
..
"""

from __future__ import absolute_import
from __future__ import unicode_literals

import gnupg
import hashlib
import hmac
import io
import logging
import os
import re
import urllib

import OpenSSL

from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA

from twisted.internet import ssl
from twisted.python.procutils import which


#: The hash digest to use for HMACs.
DIGESTMOD = hashlib.sha1

# Test to see if we have the old or new style buffer() interface. Trying
# to use an old-style buffer on Python2.7 prior to version 2.7.5 will produce:
#
#     TypeError: 'buffer' does not have the buffer interface
#
#: ``True`` if we have the new-style `buffer`_ interface; ``False`` otherwise.
#:
#: .. _buffer: https://docs.python.org/2/c-api/buffer.html
NEW_BUFFER_INTERFACE = False
try:
    io.BytesIO(buffer('test'))
except TypeError:  # pragma: no cover
    logging.warn(
        "This Python version is too old! "\
        "It doesn't support new-style buffer interfaces: "\
        "https://mail.python.org/pipermail/python-dev/2010-October/104917.html")
else:
    NEW_BUFFER_INTERFACE = True


[docs]class PKCS1PaddingError(Exception): """Raised when there is a problem adding or removing PKCS#1 padding."""
[docs]class RSAKeyGenerationError(Exception): """Raised when there was an error creating an RSA keypair."""
[docs]def writeKeyToFile(key, filename): """Write **key** to **filename**, with ``0400`` permissions. If **filename** doesn't exist, it will be created. If it does exist already, and is writable by the owner of the current process, then it will be truncated to zero-length and overwritten. :param bytes key: A key (or some other private data) to write to **filename**. :param str filename: The path of the file to write to. :raises: Any exceptions which may occur. """ logging.info("Writing key to file: %r" % filename) flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BIN", 0) fd = os.open(filename, flags, 0400) os.write(fd, key) os.fsync(fd) os.close(fd)
[docs]def getRSAKey(filename, bits=2048): """Load the RSA key stored in **filename**, or create and save a new key. >>> from bridgedb import crypto >>> keyfile = 'doctest_getRSAKey' >>> message = "The secret words are Squeamish Ossifrage." >>> keypair = crypto.getRSAKey(keyfile, bits=2048) >>> (secretkey, publickey) = keypair >>> encrypted = publickey.encrypt(message) >>> assert encrypted != message >>> decrypted = secretkey.decrypt(encrypted) >>> assert message == decrypted If **filename** already exists, it is assumed to contain a PEM-encoded RSA private key, which will be read from the file. (The parameters of a private RSA key contain the public exponent and public modulus, which together comprise the public key ― ergo having two separate keyfiles is assumed unnecessary.) If **filename** doesn't exist, a new RSA keypair will be created, and the private key will be stored in **filename**, using :func:`writeKeyToFile`. Once the private key is either loaded or created, the public key is extracted from it. Both keys are then input into PKCS#1 RSAES-OAEP cipher schemes (see `RFC 3447 §7.1`__) in order to introduce padding, and then returned. .. __: https://tools.ietf.org/html/rfc3447#section-7.1 :param str filename: The filename to which the secret parameters of the RSA key are stored in. :param int bits: If no key is found within the file, create a new key with this bitlength and store it in **filename**. :rtype: tuple of ``Crypto.Cipher.PKCS1_OAEP.PKCS1OAEP_Cipher`` :returns: A 2-tuple of ``(privatekey, publickey)``, which are PKCS#1 RSAES-OAEP padded and encoded private and public keys, forming an RSA keypair. """ filename = os.path.extsep.join([filename, 'sec']) keyfile = os.path.join(os.getcwd(), filename) try: fh = open(keyfile, 'rb') except IOError: logging.info("Generating %d-bit RSA keypair..." % bits) secretKey = RSA.generate(bits, e=65537) # Store a PEM copy of the secret key (which contains the parameters # necessary to create the corresponding public key): secretKeyPEM = secretKey.exportKey("PEM") writeKeyToFile(secretKeyPEM, keyfile) else: logging.info("Secret RSA keyfile %r found. Loading..." % filename) secretKey = RSA.importKey(fh.read()) fh.close() publicKey = secretKey.publickey() # Add PKCS#1 OAEP padding to the secret and public keys: sk = PKCS1_OAEP.new(secretKey) pk = PKCS1_OAEP.new(publicKey) return (sk, pk)
[docs]def getKey(filename): """Load the master key stored in ``filename``, or create a new key. If ``filename`` does not exist, create a new 32-byte key and store it in ``filename``. >>> import os >>> from bridgedb import crypto >>> name = 'doctest_getKey' >>> os.path.exists(name) False >>> k1 = crypto.getKey(name) >>> os.path.exists(name) True >>> open(name).read() == k1 True >>> k2 = crypto.getKey(name) >>> k1 == k2 True :param string filename: The filename to store the secret key in. :rtype: bytes :returns: A byte string containing the secret key. """ try: fh = open(filename, 'rb') except IOError: logging.debug("getKey(): Creating new secret key.") key = OpenSSL.rand.bytes(32) writeKeyToFile(key, filename) else: logging.debug("getKey(): Secret key file found. Loading...") key = fh.read() fh.close() return key
[docs]def getHMAC(key, value): """Return the HMAC of **value** using the **key**.""" h = hmac.new(key, value, digestmod=DIGESTMOD) return h.digest()
[docs]def getHMACFunc(key, hex=True): """Return a function that computes the HMAC of its input using the **key**. :param bool hex: If True, the output of the function will be hex-encoded. :rtype: callable :returns: A function which can be uses to generate HMACs. """ h = hmac.new(key, digestmod=DIGESTMOD) def hmac_fn(value): h_tmp = h.copy() h_tmp.update(value) if hex: return h_tmp.hexdigest() else: return h_tmp.digest() return hmac_fn
[docs]def removePKCS1Padding(message): """Remove PKCS#1 padding from a **message**. (PKCS#1 v1.0? See :trac:`13042`.) Each block is 128 bytes total in size: * 2 bytes for the type info (``'\\x00\\x01'``) * 1 byte for the separator (``'\\x00'``) * variable length padding (``'\\xFF'``) * variable length for the **message** .. Note that the above strings are double escaped, due to the way that Sphinx renders escaped strings in docstrings. For more information on the structure of PKCS#1 padding, see :rfc:`2313`, particularly `the notes in §8.1`__. .. __: https://tools.ietf.org/html/rfc2313#section-8.1 :param str message: A message which is PKCS#1 padded. :raises PKCS1PaddingError: if there is an issue parsing the **message**. :rtype: bytes :returns: The message without the PKCS#1 padding. """ padding = b'\xFF' typeinfo = b'\x00\x01' separator = b'\x00' unpadded = None try: if message.index(typeinfo) != 0: raise PKCS1PaddingError("Couldn't find PKCS#1 identifier bytes!") start = message.index(separator, 2) + 1 # 2 bytes for the typeinfo, # and 1 byte for the separator. except ValueError: raise PKCS1PaddingError("Couldn't find PKCS#1 separator byte!") else: unpadded = message[start:] return unpadded
[docs]def initializeGnuPG(config): """Initialize a GnuPG interface and test our configured keys. .. note:: This function uses python-gnupg_. :type config: :class:`bridgedb.persistent.Conf` :param config: The loaded config file. :rtype: 2-tuple :returns: If ``EMAIL_GPG_SIGNING_ENABLED`` isn't ``True``, or we couldn't initialize GnuPG and make a successful test signature with the specified key, then a 2-tuple of ``None`` is returned. Otherwise, the first item in the tuple is a :class:`gnupg.GPG` interface_ with the GnuPG homedir set to the ``EMAIL_GPG_HOMEDIR`` option and the signing key specified by the ``EMAIL_GPG_SIGNING_KEY_FINGERPRINT`` option in bridgedb.conf set as the default key. The second item in the tuple is a signing function with the passphrase (as specified in either ``EMAIL_GPG_PASSPHRASE`` or ``EMAIL_GPG_PASSPHRASE_FILE``) already set. .. _python-gnupg: https://pypi.python.org/pypi/gnupg/ .. _interface: https://python-gnupg.readthedocs.org/en/latest/gnupg.html#gnupg-module """ ret = (None, None) if not config.EMAIL_GPG_SIGNING_ENABLED: return ret homedir = config.EMAIL_GPG_HOMEDIR primary = config.EMAIL_GPG_PRIMARY_KEY_FINGERPRINT passphrase = config.EMAIL_GPG_PASSPHRASE passFile = config.EMAIL_GPG_PASSPHRASE_FILE logging.info("Using %s as our GnuPG home directory..." % homedir) gpg = gnupg.GPG(homedir=homedir) logging.info("Initialized GnuPG interface using %s binary with version %s." % (gpg.binary, gpg.binary_version)) primarySK = None primaryPK = None secrets = gpg.list_keys(secret=True) publics = gpg.list_keys() if not secrets: logging.warn("No secret keys found in %s!" % gpg.secring) return ret primarySK = filter(lambda key: key['fingerprint'] == primary, secrets) primaryPK = filter(lambda key: key['fingerprint'] == primary, publics) if primarySK and primaryPK: logging.info("Found GnuPG primary key with fingerprint: %s" % primary) for sub in primaryPK[0]['subkeys']: logging.info(" Subkey: %s Usage: %s" % (sub[0], sub[1].upper())) else: logging.warn("GnuPG key %s could not be found in %s!" % (primary, gpg.secring)) return ret if passphrase: logging.info("Read GnuPG passphrase from config.") elif passFile: try: with open(passFile) as fh: passphrase = fh.read() except (IOError, OSError): logging.error("Could not open GnuPG passphrase file: %s!" % passFile) else: logging.info("Read GnuPG passphrase from file: %s" % passFile) def gpgSignMessage(message): """Sign **message** with the default key specified by ``EMAIL_GPG_PRIMARY_KEY_FINGERPRINT``. :param str message: A message to sign. :rtype: str or ``None``. :returns: A string containing the clearsigned message, or ``None`` if the signing failed. """ sig = gpg.sign(message, default_key=primary, passphrase=passphrase) if sig and sig.data: return sig.data logging.debug("Testing signature created with GnuPG key...") sig = gpgSignMessage("Testing 1 2 3") if sig: logging.info("Test signature with GnuPG key %s okay:\n%s" % (primary, sig)) return (gpg, gpgSignMessage) return ret
[docs]class SSLVerifyingContextFactory(ssl.CertificateOptions): """``OpenSSL.SSL.Context`` factory which does full certificate-chain and hostname verfication. """ isClient = True def __init__(self, url, **kwargs): """Create a client-side verifying SSL Context factory. To pass acceptable certificates for a server which does client-authentication checks: initialise with a ``caCerts=[]`` keyword argument, which should be a list of ``OpenSSL.crypto.X509`` instances (one for each peer certificate to add to the store), and set ``SSLVerifyingContextFactory.isClient=False``. :param str url: The URL being requested by an :api:`twisted.web.client.Agent`. :param bool isClient: True if we're being used in a client implementation; False if we're a server. """ self.hostname = self.getHostnameFromURL(url) # ``verify`` here refers to server-side verification of certificates # presented by a client: self.verify = False if self.isClient else True super(SSLVerifyingContextFactory, self).__init__(verify=self.verify, fixBrokenPeers=True, **kwargs)
[docs] def getContext(self, hostname=None, port=None): """Retrieve a configured ``OpenSSL.SSL.Context``. Any certificates in the ``caCerts`` list given during initialisation are added to the ``Context``'s certificate store. The **hostname** and **port** arguments seem unused, but they are required due to some Twisted and pyOpenSSL internals. See :api:`twisted.web.client.Agent._wrapContextFactory`. :rtype: ``OpenSSL.SSL.Context`` :returns: An SSL Context which verifies certificates. """ ctx = super(SSLVerifyingContextFactory, self).getContext() store = ctx.get_cert_store() verifyOptions = OpenSSL.SSL.VERIFY_PEER ctx.set_verify(verifyOptions, self.verifyHostname) return ctx
[docs] def getHostnameFromURL(self, url): """Parse the hostname from the originally requested URL. :param str url: The URL being requested by an :api:`twisted.web.client.Agent`. :rtype: str :returns: The full hostname (including any subdomains). """ hostname = urllib.splithost(urllib.splittype(url)[1])[0] logging.debug("Parsed hostname %r for cert CN matching." % hostname) return hostname
[docs] def verifyHostname(self, connection, x509, errnum, depth, okay): """Callback method for additional SSL certificate validation. If the certificate is signed by a valid CA, and the chain is valid, verify that the level 0 certificate has a subject common name which is valid for the hostname of the originally requested URL. :param connection: An ``OpenSSL.SSL.Connection``. :param x509: An ``OpenSSL.crypto.X509`` object. :param errnum: A pyOpenSSL error number. See that project's docs. :param depth: The depth which the current certificate is at in the certificate chain. :param bool okay: True if all the pyOpenSSL default checks on the certificate passed. False otherwise. """ commonName = x509.get_subject().commonName logging.debug("Received cert at level %d: '%s'" % (depth, commonName)) # We only want to verify that the hostname matches for the level 0 # certificate: if okay and (depth == 0): cn = commonName.replace('*', '.*') hostnamesMatch = re.search(cn, self.hostname) if not hostnamesMatch: logging.warn("Invalid certificate subject CN for '%s': '%s'" % (self.hostname, commonName)) return False logging.debug("Valid certificate subject CN for '%s': '%s'" % (self.hostname, commonName)) return True