# coding: utf-8
# truepy
# Copyright (C) 2014-2015 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import gzip
import hashlib
import io
import sys
import cryptography.x509
from Crypto.Cipher import DES
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import dsa, padding, rsa
from . import LicenseData, fromstring
from ._bean import deserialize, serialize, to_document
from ._bean_serializers import bean_class
from ._name import Name
@bean_class('de.schlichtherle.xml.GenericCertificate')
class License(object):
    """A signed license.

    An instance keeps the encoded (XML) license data together with a base 64
    encoded signature of that data. Instances can be issued from a certificate
    and private key, verified against a certificate, and loaded from or stored
    to a password protected stream.
    """

    #: The only supported signature encoding
    SIGNATURE_ENCODING = 'US-ASCII/Base64'

    # Default parameters for the password based key derivation in _key_iv
    _SALT = b'\xCE\xFB\xDE\xAC\x05\x02\x19\x71'
    _ITERATIONS = 2005
    _DIGEST = hashlib.md5
    _KEY_SIZE = 8

    #: The cipher block size used for padding; the value is the DES block size
    BLOCK_SIZE = 8

    class InvalidSignatureException(Exception):
        """Raised when the signature does not match"""

    class InvalidPasswordException(Exception):
        """Raised when the license password is invalid"""

    @property
    def encoded(self):
        """The encoded license data"""
        return self._encoded

    @property
    def signature(self):
        """The signature of the license data as base 64 encoded data"""
        return self._signature

    @property
    def signature_algorithm(self):
        """The signature algorithm used to sign"""
        return '%swith%s' % (
            self._signature_digest,
            self._signature_encryption)

    @property
    def signature_encoding(self):
        """The encoding of the signature; this is always *US-ASCII/Base64*"""
        return self.SIGNATURE_ENCODING

    @signature_encoding.setter
    def signature_encoding(self, value):
        # Nothing is stored; the setter only validates the value
        if value != self.SIGNATURE_ENCODING:
            raise ValueError('invalid signature encoding: %s' % (value,))

    def __init__(self, encoded, signature, signature_algorithm='SHA1withRSA',
                 signature_encoding=SIGNATURE_ENCODING):
        """Creates a license from encoded license data and its signature.

        :param str encoded: The encoded license data.

        :param str signature: The license signature.

        :param str signature_algorithm: The algorithm used to sign the license.
            This must be on the form `<digest>with<encryption>`.

        :param str signature_encoding: The encoding of the signature. This must
            be `US-ASCII/Base64`.

        :raises ValueError: if encoded is not an encoded
            :class:`~truepy.LicenseData` object, if signature_algorithm is
            invalid or if signature_encoding is not US-ASCII/Base64
        """
        license_data_xml = fromstring(encoded)
        if license_data_xml.tag != 'java' or len(license_data_xml) != 1:
            raise ValueError('invalid encoded license data: %s' % (encoded,))

        #: The decoded license data as an instance of
        #: :class:`~truepy.LicenseData`
        self.data = deserialize(license_data_xml[0])

        self._encoded = encoded
        self._signature = signature

        try:
            # Unpacking fails with ValueError unless there are exactly two
            # parts around a single 'with'
            self._signature_digest, self._signature_encryption = \
                signature_algorithm.split('with')
        except ValueError:
            raise ValueError(
                'invalid signature algorithm: %s' % (signature_algorithm,))

        self.signature_encoding = signature_encoding

    @staticmethod
    def _signature_args(key, digest):
        """Builds the key type specific arguments for signing or verifying.

        DSA keys take only the hash algorithm, whereas all other keys are
        treated as RSA keys and take a *PKCS#1 v1.5* padding followed by the
        hash algorithm.

        :param key: The private or public key that will be used.

        :param str digest: The name of a hash class in
            ``cryptography.hazmat.primitives.hashes``.

        :return: the positional arguments following the data or signature
        :rtype: tuple
        """
        algorithm = getattr(hashes, digest)()
        if isinstance(key, (dsa.DSAPrivateKey, dsa.DSAPublicKey)):
            return (algorithm,)
        return (padding.PKCS1v15(), algorithm)

    @classmethod
    def issue(cls, certificate, key, digest='SHA1', **license_data):
        """Issues a new License.

        :param certificate: The issuer certificate.
        :type certificate: bytes or cryptography.x509.Certificate

        :param key: The private key of the certificate.

        :param str digest: The digest algorithm to use.

        :param license_data: Parameters to pass on to truepy.LicenseData. Do
            not pass issuer; this value will be read from the certificate
            subject. You may also specify the single value license_data; this
            must in that case be an instance of :class:`~truepy.LicenseData`.

        :raises ValueError: if license data cannot be created from the keyword
            arguments, if the issuer name is passed or if the key is neither
            an RSA nor a DSA private key

        :return: a new license
        :rtype: truepy.License
        """
        certificate = cls._certificate(certificate)

        if 'license_data' in license_data:
            # A ready-made license data object must be the only keyword
            if len(license_data) != 1:
                raise ValueError('invalid keyword arguments')
            license_data = license_data['license_data']
        else:
            if 'issuer' in license_data:
                raise ValueError('issuer must not be passed')
            issuer = Name.from_x509_name(certificate.subject)
            license_data['issuer'] = str(issuer)
            try:
                license_data = LicenseData(**license_data)
            except TypeError:
                raise ValueError('invalid keyword arguments')

        if not isinstance(license_data, LicenseData):
            raise ValueError('invalid license_data: %s' % (license_data,))

        if isinstance(key, rsa.RSAPrivateKey):
            encryption = 'RSA'
        elif isinstance(key, dsa.DSAPrivateKey):
            encryption = 'DSA'
        else:
            raise ValueError('unknown key type')

        encoded = to_document(serialize(license_data))
        data = encoded.encode('ascii')
        args = cls._signature_args(key, digest)
        if hasattr(key, 'sign'):
            # One-shot API; the signer context API is absent from recent
            # releases of cryptography
            raw_signature = key.sign(data, *args)
        else:
            signer = key.signer(*args)
            signer.update(data)
            raw_signature = signer.finalize()
        signature = base64.b64encode(raw_signature).decode('ascii')

        return License(encoded, signature, 'with'.join((digest, encryption)))

    def verify(self, certificate):
        """Verifies the signature of this license against a certificate.

        :param certificate: The issuer certificate.
        :type certificate: bytes or cryptography.x509.Certificate

        :raises truepy.License.InvalidSignatureException: if the signature does
            not match
        """
        # Imported here so that the lookup does not depend on another module
        # having imported cryptography.exceptions as a side effect
        from cryptography.exceptions import InvalidSignature

        certificate = self._certificate(certificate)
        public_key = certificate.public_key()
        args = self._signature_args(public_key, self._signature_digest)
        signature = base64.b64decode(self.signature)
        data = self.encoded.encode('ascii')

        try:
            if hasattr(public_key, 'verify'):
                # One-shot API; the verifier context API is absent from
                # recent releases of cryptography
                public_key.verify(signature, data, *args)
            else:
                verifier = public_key.verifier(signature, *args)
                verifier.update(data)
                verifier.verify()
        except InvalidSignature as e:
            raise self.InvalidSignatureException(e)

    @classmethod
    def _certificate(cls, certificate):
        """Ensures that a variable is a certificate.

        If ``certificate`` is a parsed certificate, it will be returned
        unmodified, otherwise it will be treated as a *PEM* blob.

        :param certificate: The certificate to parse.

        :return: a parsed certificate
        """
        if isinstance(certificate, cryptography.x509.Certificate):
            return certificate
        return cryptography.x509.load_pem_x509_certificate(
            certificate,
            backends.default_backend())

    @classmethod
    def _key_iv(cls, password, salt=_SALT, iterations=_ITERATIONS,
                digest=_DIGEST, key_size=_KEY_SIZE):
        """Derives a key from a password.

        The default values will generate a key and IV for DES encryption
        compatible with PKCS#5 1.5.

        :param bytes password: The password from which to derive the key.

        :param bytes salt: The password salt. This parameter is not validated.

        :param int iterations: The number of hashing iterations. This parameter
            is not validated.

        :param digest: The digest method to use.

        :param int key_size: The key size to generate.

        :return: the key and IV
        :rtype: (bytes, bytes)
        """
        # Hash the salted password, then repeatedly hash the previous digest
        keyiv = password + salt
        for _ in range(iterations):
            keyiv = digest(keyiv).digest()

        # The first key_size bytes are the key and the remainder is the IV
        return (keyiv[:key_size], keyiv[key_size:])

    @classmethod
    def _cipher(cls, password):
        """Creates a DES cipher in CBC mode from a password.

        The key and IV are derived with :meth:`_key_iv` using its default
        parameters.

        :param bytes password: The password from which to derive the key.

        :return: a new cipher object
        """
        key, iv = cls._key_iv(password)
        return DES.new(
            key=key,
            IV=iv,
            mode=DES.MODE_CBC)

    @classmethod
    def _unpad(cls, data, block_size=BLOCK_SIZE):
        """Removes PKCS#5 1.5 padding from ``data``.

        :param bytes data: The data to unpad.

        :param int block_size: The encryption block size; this is the greatest
            valid padding length. The default value is compatible with DES.

        :return: unpadded data
        :rtype: bytes

        :raises truepy.License.InvalidPasswordException: if the padding is
            invalid, which includes empty data
        """
        # A bytearray yields integers on both Python 2 and Python 3; only the
        # last block can contain padding
        tail = bytearray(data[-block_size:])
        padding_length = tail[-1] if tail else 0

        # The padding must be between one byte and one full block, and every
        # padding byte must equal the padding length
        is_valid = (
            1 <= padding_length <= len(tail)
            and all(
                octet == padding_length
                for octet in tail[-padding_length:]))
        if not is_valid:
            raise cls.InvalidPasswordException('invalid PKCS#5 padding')

        return data[:-padding_length]

    @classmethod
    def _pad(cls, data, block_size=BLOCK_SIZE):
        """Adds PKCS#5 1.5 padding to ``data``.

        A full block of padding is added when the length of ``data`` is
        already a multiple of ``block_size``.

        :param bytes data: The data to pad.

        :param int block_size: The encryption block size. The default value is
            compatible with DES.

        :return: padded data
        :rtype: bytes
        """
        padding_length = block_size - len(data) % block_size

        # Going through a bytearray gives a byte string on both Python 2 and
        # Python 3
        return data + bytes(bytearray([padding_length] * padding_length))

    @classmethod
    def load(cls, f, password):
        """Loads a license from a stream.

        :param f: The data stream.
        :type f: file or stream

        :param bytes password: The password used by the licensed application.

        :return: a license object
        :rtype: truepy.License

        :raises ValueError: if the input data is invalid

        :raises truepy.License.InvalidPasswordException: if the password is
            invalid
        """
        # Initialise cryptography
        des = cls._cipher(password)

        # Decrypt the input stream
        encrypted_data = f.read()
        decrypted_data = cls._unpad(des.decrypt(encrypted_data))

        # Decompress and parse the XML
        decrypted_stream = io.BytesIO(decrypted_data)
        with gzip.GzipFile(fileobj=decrypted_stream, mode='r') as gz:
            xml_data = gz.read()

        # Use the first child of the top-level java element
        element = fromstring(xml_data)[0]

        return deserialize(element)

    def store(self, f, password):
        """Stores this license to a stream.

        The license is serialized to XML, compressed with gzip, padded and
        encrypted with DES before being written.

        :param f: The data stream.
        :type f: file or stream

        :param bytes password: The password used by the licensed application.
        """
        # Initialise cryptography
        des = self._cipher(password)

        # Serialize the license; on Python 3 the document is converted to bytes
        document = to_document(serialize(self))
        if sys.version_info.major < 3:
            xml_data = document
        else:
            xml_data = bytes(document, 'ascii')

        # Compress the XML
        compressed_stream = io.BytesIO()
        with gzip.GzipFile(fileobj=compressed_stream, mode='w') as gz:
            gz.write(xml_data)
        compressed_data = compressed_stream.getvalue()

        # Encrypt the data and write it to the output stream
        encrypted_data = des.encrypt(self._pad(compressed_data))
        f.write(encrypted_data)