Source code for saml2.response

# -*- coding: utf-8 -*-

# Copyright (c) 2014, OneLogin, Inc.
# All rights reserved.

from base64 import b64decode
from copy import deepcopy
from lxml import etree
from os.path import basename
from time import time
import sys
from xml.dom.minidom import Document

import dm.xmlsec.binding as xmlsec

from saml2.constants import OneLogin_Saml2_Constants
from saml2.utils import OneLogin_Saml2_Utils


class OneLogin_Saml2_Response(object):
    """
    Parses a base64 encoded samlp:Response and offers validation plus
    accessors for the data carried by its Assertion (NameID, attributes,
    audiences, issuers, session data).
    """

    def __init__(self, settings, response):
        """
        Constructs the response object.

        :param settings: The setting info
        :type settings: OneLogin_Saml2_Setting object

        :param response: The base64 encoded, XML string containing the samlp:Response
        :type response: string
        """
        self.__settings = settings
        self.response = b64decode(response)
        self.document = etree.fromstring(self.response)
        self.decrypted_document = None
        self.encrypted = None

        # Quick check for the presence of EncryptedAssertion. self.encrypted
        # is still None here, so __query reads the original document.
        encrypted_assertion_nodes = self.__query('//saml:EncryptedAssertion')
        if encrypted_assertion_nodes:
            decrypted_document = deepcopy(self.document)
            self.encrypted = True
            # NOTE(review): __decrypt_assertion currently has no return
            # statement, so decrypted_document ends up being None.
            self.decrypted_document = self.__decrypt_assertion(decrypted_document)

    def is_valid(self, request_data, request_id=None):
        """
        Validates the SAML Response.

        Any exception raised by a check is swallowed: it is printed when the
        debug setting is active, and the method returns False.

        :param request_data: The request info, used to compute the current URL
        :type request_data: dict

        :param request_id: Optional argument. The ID of the AuthNRequest sent by this SP to the IdP
        :type request_id: string

        :returns: True if the SAML Response is valid, False if not
        :rtype: bool
        """
        # NOTE(review): the nesting of the checks under the strict-mode branch
        # was reconstructed from a listing that had lost its indentation --
        # confirm against the canonical file.
        try:
            # Checks SAML version
            if self.document.get('Version', None) != '2.0':
                raise Exception('Unsupported SAML version')

            # Checks that ID exists
            if self.document.get('ID', None) is None:
                raise Exception('Missing ID attribute on SAML Response')

            # Checks that the response only has one assertion
            if not self.validate_num_assertions():
                raise Exception('Multiple assertions are not supported')

            # Checks that the response has the SUCCESS status
            self.check_status()

            idp_data = self.__settings.get_idp_data()
            idp_entityid = idp_data.get('entityId', '')
            sp_data = self.__settings.get_sp_data()
            sp_entityid = sp_data.get('entityId', '')

            sign_nodes = self.__query('//ds:Signature')

            # NOTE(review): lxml exposes element tags in Clark notation
            # ('{namespace}local'), so the prefixed names compared against
            # signed_elements below ('saml:Assertion', 'samlp:Response') may
            # never match -- confirm and compare against namespace-qualified
            # names if so.
            signed_elements = [sign_node.getparent().tag for sign_node in sign_nodes]

            if self.__settings.is_strict():
                res = OneLogin_Saml2_Utils.validate_xml(
                    etree.tostring(self.document),
                    'saml-schema-protocol-2.0.xsd',
                    self.__settings.is_debug_active()
                )
                if not isinstance(res, Document):
                    raise Exception('Invalid SAML Response. Not match the saml-schema-protocol-2.0.xsd')

                security = self.__settings.get_security_data()
                current_url = OneLogin_Saml2_Utils.get_self_url_no_query(request_data)

                # Check if the InResponseTo of the Response matchs the ID of the AuthNRequest (requestId) if provided
                in_response_to = self.document.get('InResponseTo', None)
                if in_response_to and request_id:
                    if in_response_to != request_id:
                        raise Exception('The InResponseTo of the Response: %s, does not match the ID of the AuthNRequest sent by the SP: %s' % (in_response_to, request_id))

                if not self.encrypted and security.get('wantAssertionsEncrypted', False):
                    raise Exception('The assertion of the Response is not encrypted and the SP require it')

                if security.get('wantNameIdEncrypted', False):
                    encrypted_nameid_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData')
                    if not encrypted_nameid_nodes:
                        raise Exception('The NameID of the Response is not encrypted and the SP require it')

                # Checks that there is at least one AttributeStatement
                attribute_statement_nodes = self.__query_assertion('/saml:AttributeStatement')
                if not attribute_statement_nodes:
                    raise Exception('There is no AttributeStatement on the Response')

                # Validates Assertion timestamps
                if not self.validate_timestamps():
                    raise Exception('Timing issues (please check your clock settings)')

                encrypted_attributes_nodes = self.__query_assertion('/saml:AttributeStatement/saml:EncryptedAttribute')
                if encrypted_attributes_nodes:
                    raise Exception('There is an EncryptedAttribute in the Response and this SP not support them')

                # Checks destination
                destination = self.document.get('Destination', None)
                if destination:
                    if destination not in current_url:
                        raise Exception('The response was received at %s instead of %s' % (current_url, destination))

                # Checks audience
                valid_audiences = self.get_audiences()
                if valid_audiences and sp_entityid not in valid_audiences:
                    raise Exception('%s is not a valid audience for this Response' % sp_entityid)

                # Checks the issuers
                issuers = self.get_issuers()
                for issuer in issuers:
                    if not issuer or issuer != idp_entityid:
                        raise Exception('Invalid issuer in the Assertion/Response')

                # Checks the session Expiration: only a SessionNotOnOrAfter
                # that is present AND already in the past is a failure.
                session_expiration = self.get_session_not_on_or_after()
                if session_expiration and session_expiration <= time():
                    raise Exception('The attributes have expired, based on the SessionNotOnOrAfter of the AttributeStatement of this Response')

                # Checks the SubjectConfirmation, at least one SubjectConfirmation must be valid
                any_subject_confirmation = False
                subject_confirmation_nodes = self.__query_assertion('/saml:Subject/saml:SubjectConfirmation')

                for scn in subject_confirmation_nodes:
                    method = scn.get('Method', None)
                    if method and method != OneLogin_Saml2_Constants.CM_BEARER:
                        continue
                    scData = scn.find('saml:SubjectConfirmationData', namespaces=OneLogin_Saml2_Constants.NSMAP)
                    if scData is None:
                        continue

                    irt = scData.get('InResponseTo', None)
                    if irt != in_response_to:
                        continue
                    recipient = scData.get('Recipient', None)
                    # A missing Recipient invalidates this confirmation only;
                    # without the None guard the 'in' test raises TypeError
                    # and aborts the whole validation.
                    if recipient is None or recipient not in current_url:
                        continue
                    nooa = scData.get('NotOnOrAfter', None)
                    if nooa:
                        parsed_nooa = OneLogin_Saml2_Utils.parse_SAML_to_time(nooa)
                        if parsed_nooa <= time():
                            continue
                    nb = scData.get('NotBefore', None)
                    if nb:
                        parsed_nb = OneLogin_Saml2_Utils.parse_SAML_to_time(nb)
                        if parsed_nb > time():
                            continue
                    any_subject_confirmation = True
                    break

                if not any_subject_confirmation:
                    raise Exception('A valid SubjectConfirmation was not found on this Response')

                if security.get('wantAssertionsSigned', False) and 'saml:Assertion' not in signed_elements:
                    raise Exception('The Assertion of the Response is not signed and the SP require it')

                if security.get('wantMessagesSigned', False) and 'samlp:Response' not in signed_elements:
                    raise Exception('The Message of the Response is not signed and the SP require it')

            document_to_validate = None
            if len(signed_elements) > 0:
                cert = idp_data.get('x509cert', None)
                fingerprint = idp_data.get('certFingerprint', None)

                # Only validates the first sign found
                if 'samlp:Response' in signed_elements:
                    document_to_validate = self.document
                else:
                    if self.encrypted:
                        document_to_validate = self.decrypted_document
                    else:
                        document_to_validate = self.document
                if document_to_validate is not None:
                    if not OneLogin_Saml2_Utils.validate_sign(document_to_validate, cert, fingerprint):
                        raise Exception('Signature validation failed. SAML Response rejected')
            return True
        except Exception:
            # Validation failures are reported through the return value;
            # the exception type is only printed when debugging is active.
            debug = self.__settings.is_debug_active()
            if debug:
                print(sys.exc_info()[0])
            return False

    def check_status(self):
        """
        Check if the status of the response is success or not

        :raises: Exception. If the status is not success
        """
        status = OneLogin_Saml2_Utils.get_status(self.document)
        code = status.get('code', None)
        if code and code != OneLogin_Saml2_Constants.STATUS_SUCCESS:
            # Only the last ':'-separated segment of the code is reported
            splited_code = code.split(':')
            printable_code = splited_code.pop()
            status_exception_msg = 'The status code of the Response was not Success, was %s' % printable_code
            status_msg = status.get('msg', None)
            if status_msg:
                status_exception_msg += ' -> ' + status_msg
            raise Exception(status_exception_msg)

    def get_audiences(self):
        """
        Gets the audiences

        :returns: The valid audiences for the SAML Response
        :rtype: list
        """
        audience_nodes = self.__query_assertion('/saml:Conditions/saml:AudienceRestriction/saml:Audience')
        return [audience_node.text for audience_node in audience_nodes]

    def get_issuers(self):
        """
        Gets the issuers (from message and from assertion)

        :returns: The issuers, without duplicates (order is not guaranteed)
        :rtype: list
        """
        issuers = []

        message_issuer_nodes = self.__query('/samlp:Response/saml:Issuer')
        if message_issuer_nodes:
            issuers.append(message_issuer_nodes[0].text)

        assertion_issuer_nodes = self.__query_assertion('/saml:Issuer')
        if assertion_issuer_nodes:
            issuers.append(assertion_issuer_nodes[0].text)

        return list(set(issuers))

    def get_nameid_data(self):
        """
        Gets the NameID Data provided by the SAML Response from the IdP

        :raises: Exception. If no NameID is found in the assertion

        :returns: Name ID Data (Value, Format, NameQualifier, SPNameQualifier)
        :rtype: dict
        """
        nameid = None

        encrypted_id_data_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData')
        if encrypted_id_data_nodes:
            encrypted_data = encrypted_id_data_nodes[0]

            xmlsec.initialize()

            # Load the key into the xmlsec context
            key = self.__settings.get_sp_key()
            file_key = OneLogin_Saml2_Utils.write_temp_file(key)  # FIXME avoid writing a file
            try:
                enc_key = xmlsec.Key.load(file_key.name, xmlsec.KeyDataFormatPem, None)
                enc_key.name = basename(file_key.name)
            finally:
                # Always release the temporary key file, even if loading fails
                file_key.close()
            enc_ctx = xmlsec.EncCtx()
            enc_ctx.encKey = enc_key

            nameid = OneLogin_Saml2_Utils.decrypt_element(encrypted_data, enc_ctx)
        else:
            nameid_nodes = self.__query_assertion('/saml:Subject/saml:NameID')
            if nameid_nodes:
                nameid = nameid_nodes[0]

        if nameid is None:
            raise Exception('Not NameID found in the assertion of the Response')

        nameid_data = {'Value': nameid.text}
        # Optional attributes are only reported when present and non-empty
        for attr in ['Format', 'SPNameQualifier', 'NameQualifier']:
            value = nameid.get(attr, None)
            if value:
                nameid_data[attr] = value
        return nameid_data

    def get_nameid(self):
        """
        Gets the NameID provided by the SAML Response from the IdP

        :returns: NameID (value)
        :rtype: string
        """
        nameid_data = self.get_nameid_data()
        return nameid_data['Value']

    def get_session_not_on_or_after(self):
        """
        Gets the SessionNotOnOrAfter from the AuthnStatement
        Could be used to set the local session expiration

        :returns: The SessionNotOnOrAfter value
        :rtype: time|None
        """
        not_on_or_after = None
        authn_statement_nodes = self.__query_assertion('/saml:AuthnStatement[@SessionNotOnOrAfter]')
        if authn_statement_nodes:
            not_on_or_after = OneLogin_Saml2_Utils.parse_SAML_to_time(authn_statement_nodes[0].get('SessionNotOnOrAfter'))
        return not_on_or_after

    def get_session_index(self):
        """
        Gets the SessionIndex from the AuthnStatement
        Could be used to be stored in the local session in order
        to be used in a future Logout Request that the SP could
        send to the IdP, to set what specific session must be deleted

        :returns: The SessionIndex value
        :rtype: string|None
        """
        session_index = None
        authn_statement_nodes = self.__query_assertion('/saml:AuthnStatement[@SessionIndex]')
        if authn_statement_nodes:
            session_index = authn_statement_nodes[0].get('SessionIndex')
        return session_index

    def get_attributes(self):
        """
        Gets the Attributes from the AttributeStatement element.
        EncryptedAttributes are not supported

        :returns: Attribute name mapped to the list of its AttributeValue texts
        :rtype: dict
        """
        attributes = {}
        attribute_nodes = self.__query_assertion('/saml:AttributeStatement/saml:Attribute')
        value_tag = '{%s}AttributeValue' % OneLogin_Saml2_Constants.NSMAP['saml']
        for attribute_node in attribute_nodes:
            attr_name = attribute_node.get('Name')
            attributes[attr_name] = [attr.text for attr in attribute_node.iterchildren(value_tag)]
        return attributes

    def validate_num_assertions(self):
        """
        Verifies that the document only contains a single Assertion (encrypted or not)

        :returns: True if only 1 assertion encrypted or not
        :rtype: bool
        """
        encrypted_assertion_nodes = self.__query('//saml:EncryptedAssertion')
        assertion_nodes = self.__query('//saml:Assertion')
        return (len(encrypted_assertion_nodes) + len(assertion_nodes)) == 1

    def validate_timestamps(self):
        """
        Verifies that the document is valid according to Conditions Element

        :returns: True if the condition is valid, False otherwise
        :rtype: bool
        """
        conditions_nodes = self.__query('//saml:Conditions')
        for conditions_node in conditions_nodes:
            nb_attr = conditions_node.get('NotBefore')
            nooa_attr = conditions_node.get('NotOnOrAfter')
            # Both bounds are relaxed by the allowed clock drift
            if nb_attr and OneLogin_Saml2_Utils.parse_SAML_to_time(nb_attr) > time() + OneLogin_Saml2_Constants.ALOWED_CLOCK_DRIFT:
                return False
            if nooa_attr and OneLogin_Saml2_Utils.parse_SAML_to_time(nooa_attr) + OneLogin_Saml2_Constants.ALOWED_CLOCK_DRIFT <= time():
                return False
        return True

    def __query_assertion(self, xpath_expr):
        """
        Extracts nodes that match the query from the Assertion

        The query is anchored on the signed element when a signature
        reference is found: the Assertion whose ID matches the reference URI,
        or else the Response whose ID matches it.

        :param xpath_expr: Xpath Expression, relative to the Assertion
        :type xpath_expr: String

        :returns: The queried nodes
        :rtype: list
        """
        if self.encrypted:
            assertion_expr = '/saml:EncryptedAssertion/saml:Assertion'
        else:
            assertion_expr = '/saml:Assertion'
        signature_expr = '/ds:Signature/ds:SignedInfo/ds:Reference'
        signed_assertion_query = '/samlp:Response' + assertion_expr + signature_expr
        assertion_reference_nodes = self.__query(signed_assertion_query)

        # NOTE(review): the reference URI is read from the received document
        # and interpolated into the XPath unescaped (its leading character,
        # presumably '#', is dropped) -- confirm this is acceptable.
        if not assertion_reference_nodes:
            # Check if the message is signed
            signed_message_query = '/samlp:Response' + signature_expr
            message_reference_nodes = self.__query(signed_message_query)
            if message_reference_nodes:
                reference_uri = message_reference_nodes[0].get('URI')
                final_query = "/samlp:Response[@ID='%s']/" % reference_uri[1:]
            else:
                final_query = "/samlp:Response/"
            final_query += assertion_expr
        else:
            reference_uri = assertion_reference_nodes[0].get('URI')
            final_query = '/samlp:Response' + assertion_expr + "[@ID='%s']" % reference_uri[1:]
        final_query += xpath_expr
        return self.__query(final_query)

    def __query(self, query):
        """
        Extracts nodes that match the query from the Response

        The decrypted document is used when the response carried an
        EncryptedAssertion, the original document otherwise.

        :param query: Xpath Expression
        :type query: String

        :returns: The queried nodes
        :rtype: list
        """
        if self.encrypted:
            document = self.decrypted_document
        else:
            document = self.document
        return OneLogin_Saml2_Utils.query(document, query)

    def __decrypt_assertion(self, dom):
        """
        Decrypts the Assertion

        Not implemented yet: only the presence of the SP private key is
        checked, and nothing is returned.

        :raises: Exception if no private key available

        :param dom: Encrypted Assertion
        :type dom: Element

        :returns: Decrypted Assertion
        :rtype: Element
        """
        key = self.__settings.get_sp_key()

        if not key:
            raise Exception('No private key available, check settings')

        # TODO Study how decrypt assertion