# -*- coding: utf-8 -*-
# Copyright (c) 2014, OneLogin, Inc.
# All rights reserved.
from base64 import b64decode
from copy import deepcopy
from lxml import etree
from os.path import basename
from time import time
import sys
from xml.dom.minidom import Document
import dm.xmlsec.binding as xmlsec
from saml2.constants import OneLogin_Saml2_Constants
from saml2.utils import OneLogin_Saml2_Utils
[docs]class OneLogin_Saml2_Response(object):
def __init__(self, settings, response):
    """
    Constructs the response object.

    The response is base64-decoded and parsed. If it contains an
    EncryptedAssertion, a copy of the parsed document is handed to the
    decryption routine and the result is kept in ``decrypted_document``.

    :param settings: The setting info
    :type settings: OneLogin_Saml2_Setting object

    :param response: The base64 encoded, XML string containing the samlp:Response
    :type response: string
    """
    self.__settings = settings
    self.response = b64decode(response)

    # The response is supplied by a remote party: parse it without
    # expanding entities and without network access (XXE hardening).
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    self.document = etree.fromstring(self.response, parser=parser)

    self.decrypted_document = None
    self.encrypted = None

    # Quick check for the presence of EncryptedAssertion. self.encrypted is
    # still None at this point, so the query runs on the original document.
    encrypted_assertion_nodes = self.__query('//saml:EncryptedAssertion')
    if encrypted_assertion_nodes:
        decrypted_document = deepcopy(self.document)
        # The flag must be set before decrypting: from now on every query
        # is routed to decrypted_document.
        self.encrypted = True
        self.decrypted_document = self.__decrypt_assertion(decrypted_document)
def is_valid(self, request_data, request_id=None):
    """
    Validates the SAML Response.

    The SAML version, the ID attribute, the number of assertions and the
    status code are always checked, and when the document carries a
    signature it is verified. When the settings are in strict mode the
    response is additionally validated against the protocol schema and
    the InResponseTo, encryption requirements, AttributeStatement,
    timestamps, Destination, audiences, issuers, session expiration,
    SubjectConfirmation and signature requirements are enforced.

    :param request_data: The request data, handed to
                         OneLogin_Saml2_Utils.get_self_url_no_query to
                         obtain the URL at which the response was received
    :type request_data: presumably dict (not visible here; verify against caller)

    :param request_id: Optional argument. The ID of the AuthNRequest sent by this SP to the IdP
    :type request_id: string

    :returns: True if the SAML Response is valid, False if not. Any
              exception raised while validating also yields False.
    :rtype: bool
    """
    try:
        # Checks SAML version
        if self.document.get('Version', None) != '2.0':
            raise Exception('Unsupported SAML version')

        # Checks that ID exists
        if self.document.get('ID', None) is None:
            raise Exception('Missing ID attribute on SAML Response')

        # Checks that the response only has one assertion
        if not self.validate_num_assertions():
            raise Exception('Multiple assertions are not supported')

        # Checks that the response has the SUCCESS status
        self.check_status()

        idp_data = self.__settings.get_idp_data()
        idp_entityid = idp_data.get('entityId', '')
        sp_data = self.__settings.get_sp_data()
        sp_entityid = sp_data.get('entityId', '')

        # Collect the element that owns each signature. lxml reports tags
        # in Clark notation ('{namespace-uri}local-name'), so they are
        # translated back to the 'prefix:local-name' form that the checks
        # below compare against.
        signed_elements = []
        for sign_node in self.__query('//ds:Signature'):
            tag = sign_node.getparent().tag
            for prefix, uri in OneLogin_Saml2_Constants.NSMAP.items():
                clark_namespace = '{%s}' % uri
                if tag.startswith(clark_namespace):
                    tag = '%s:%s' % (prefix, tag[len(clark_namespace):])
                    break
            signed_elements.append(tag)

        if self.__settings.is_strict():
            res = OneLogin_Saml2_Utils.validate_xml(etree.tostring(self.document), 'saml-schema-protocol-2.0.xsd', self.__settings.is_debug_active())
            if not isinstance(res, Document):
                raise Exception('Invalid SAML Response. Not match the saml-schema-protocol-2.0.xsd')

            security = self.__settings.get_security_data()
            current_url = OneLogin_Saml2_Utils.get_self_url_no_query(request_data)

            # Check if the InResponseTo of the Response matches the ID of the AuthNRequest (requestId) if provided
            in_response_to = self.document.get('InResponseTo', None)
            if in_response_to and request_id and in_response_to != request_id:
                raise Exception('The InResponseTo of the Response: %s, does not match the ID of the AuthNRequest sent by the SP: %s' % (in_response_to, request_id))

            if not self.encrypted and security.get('wantAssertionsEncrypted', False):
                raise Exception('The assertion of the Response is not encrypted and the SP require it')

            if security.get('wantNameIdEncrypted', False):
                encrypted_nameid_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData')
                if not encrypted_nameid_nodes:
                    raise Exception('The NameID of the Response is not encrypted and the SP require it')

            # Checks that there is at least one AttributeStatement
            attribute_statement_nodes = self.__query_assertion('/saml:AttributeStatement')
            if not attribute_statement_nodes:
                raise Exception('There is no AttributeStatement on the Response')

            # Validates Assertion timestamps
            if not self.validate_timestamps():
                raise Exception('Timing issues (please check your clock settings)')

            encrypted_attributes_nodes = self.__query_assertion('/saml:AttributeStatement/saml:EncryptedAttribute')
            if encrypted_attributes_nodes:
                raise Exception('There is an EncryptedAttribute in the Response and this SP not support them')

            # Checks destination
            destination = self.document.get('Destination', None)
            if destination and destination not in current_url:
                raise Exception('The response was received at %s instead of %s' % (current_url, destination))

            # Checks audience
            valid_audiences = self.get_audiences()
            if valid_audiences and sp_entityid not in valid_audiences:
                raise Exception('%s is not a valid audience for this Response' % sp_entityid)

            # Checks the issuers
            for issuer in self.get_issuers():
                if not issuer or issuer != idp_entityid:
                    raise Exception('Invalid issuer in the Assertion/Response')

            # Checks the session Expiration: only a session limit that is
            # actually present and already in the past is an error.
            session_expiration = self.get_session_not_on_or_after()
            if session_expiration and session_expiration <= time():
                raise Exception('The attributes have expired, based on the SessionNotOnOrAfter of the AttributeStatement of this Response')

            # Checks the SubjectConfirmation, at least one SubjectConfirmation must be valid
            any_subject_confirmation = False
            subject_confirmation_nodes = self.__query_assertion('/saml:Subject/saml:SubjectConfirmation')
            for scn in subject_confirmation_nodes:
                method = scn.get('Method', None)
                if method and method != OneLogin_Saml2_Constants.CM_BEARER:
                    continue
                sc_data = scn.find('saml:SubjectConfirmationData', namespaces=OneLogin_Saml2_Constants.NSMAP)
                if sc_data is None:
                    continue
                if sc_data.get('InResponseTo', None) != in_response_to:
                    continue
                # A missing or empty Recipient cannot match the current URL;
                # skip this confirmation and try the next one instead of
                # failing the membership test with a TypeError.
                recipient = sc_data.get('Recipient', None)
                if not recipient or recipient not in current_url:
                    continue
                nooa = sc_data.get('NotOnOrAfter', None)
                if nooa and OneLogin_Saml2_Utils.parse_SAML_to_time(nooa) <= time():
                    continue
                nb = sc_data.get('NotBefore', None)
                if nb and OneLogin_Saml2_Utils.parse_SAML_to_time(nb) > time():
                    continue
                any_subject_confirmation = True
                break

            if not any_subject_confirmation:
                raise Exception('A valid SubjectConfirmation was not found on this Response')

            if security.get('wantAssertionsSigned', False) and 'saml:Assertion' not in signed_elements:
                raise Exception('The Assertion of the Response is not signed and the SP require it')

            if security.get('wantMessagesSigned', False) and 'samlp:Response' not in signed_elements:
                raise Exception('The Message of the Response is not signed and the SP require it')

        document_to_validate = None
        if len(signed_elements) > 0:
            cert = idp_data.get('x509cert', None)
            fingerprint = idp_data.get('certFingerprint', None)

            # Only validates the first sign found
            if 'samlp:Response' in signed_elements:
                document_to_validate = self.document
            elif self.encrypted:
                document_to_validate = self.decrypted_document
            else:
                document_to_validate = self.document

        if document_to_validate is not None:
            if not OneLogin_Saml2_Utils.validate_sign(document_to_validate, cert, fingerprint):
                raise Exception('Signature validation failed. SAML Response rejected')

        return True
    except Exception:
        # Every validation failure is reported as "not valid". Interpreter
        # level signals (KeyboardInterrupt, SystemExit) are not swallowed.
        if self.__settings.is_debug_active():
            print(sys.exc_info()[0])
        return False
def check_status(self):
    """
    Checks that the status of the response is Success.

    Nothing happens when the status carries no code or the code is the
    Success one.

    :raises: Exception. If the status code is present and is not Success.
             The message contains the last colon-separated segment of the
             code and, when available, the status message.
    """
    status = OneLogin_Saml2_Utils.get_status(self.document)
    code = status.get('code', None)
    if not code or code == OneLogin_Saml2_Constants.STATUS_SUCCESS:
        return

    # Only the trailing segment of the status code is reported
    message = 'The status code of the Response was not Success, was %s' % code.split(':')[-1]
    detail = status.get('msg', None)
    if detail:
        message = message + ' -> ' + detail
    raise Exception(message)
def get_audiences(self):
    """
    Gets the audiences.

    :returns: The valid audiences for the SAML Response: the text of every
              saml:Audience found under the AudienceRestriction of the
              assertion's Conditions (empty list when there is none)
    :rtype: list
    """
    audience_nodes = self.__query_assertion('/saml:Conditions/saml:AudienceRestriction/saml:Audience')
    # The collected values must be handed back to the caller; without a
    # return statement the method would always yield None.
    return [audience_node.text for audience_node in audience_nodes]
def get_issuers(self):
    """
    Gets the issuers (from message and from assertion).

    Only the first Issuer node of the message and the first Issuer node of
    the assertion are considered; duplicated values are reported once.

    :returns: The issuers
    :rtype: list
    """
    unique_issuers = set()

    from_message = self.__query('/samlp:Response/saml:Issuer')
    if from_message:
        unique_issuers.add(from_message[0].text)

    from_assertion = self.__query_assertion('/saml:Issuer')
    if from_assertion:
        unique_issuers.add(from_assertion[0].text)

    return list(unique_issuers)
def get_nameid_data(self):
    """
    Gets the NameID Data provided by the SAML Response from the IdP.

    When the Subject carries an EncryptedID, its EncryptedData is decrypted
    with the SP key taken from the settings; otherwise the plain NameID
    node is used.

    :returns: Name ID Data ('Value' plus, when present and non-empty,
              'Format', 'SPNameQualifier' and 'NameQualifier')
    :rtype: dict

    :raises: Exception. If no NameID is found in the assertion
    """
    nameid = None

    encrypted_id_data_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData')
    if encrypted_id_data_nodes:
        encrypted_data = encrypted_id_data_nodes[0]

        xmlsec.initialize()

        # Load the key into the xmlsec context
        key = self.__settings.get_sp_key()
        file_key = OneLogin_Saml2_Utils.write_temp_file(key)  # FIXME avoid writing a file
        try:
            enc_key = xmlsec.Key.load(file_key.name, xmlsec.KeyDataFormatPem, None)
            enc_key.name = basename(file_key.name)
        finally:
            # The temporary file holds the SP key: close it even when
            # loading the key fails.
            file_key.close()

        enc_ctx = xmlsec.EncCtx()
        enc_ctx.encKey = enc_key

        nameid = OneLogin_Saml2_Utils.decrypt_element(encrypted_data, enc_ctx)
    else:
        nameid_nodes = self.__query_assertion('/saml:Subject/saml:NameID')
        if nameid_nodes:
            nameid = nameid_nodes[0]

    if nameid is None:
        raise Exception('Not NameID found in the assertion of the Response')

    nameid_data = {'Value': nameid.text}
    for attr in ['Format', 'SPNameQualifier', 'NameQualifier']:
        value = nameid.get(attr, None)
        if value:
            nameid_data[attr] = value
    return nameid_data
def get_nameid(self):
    """
    Gets the NameID provided by the SAML Response from the IdP.

    Shortcut for the 'Value' entry of get_nameid_data().

    :returns: NameID (value)
    :rtype: string
    """
    return self.get_nameid_data()['Value']
def get_session_not_on_or_after(self):
    """
    Gets the SessionNotOnOrAfter from the AuthnStatement.
    Could be used to set the local session expiration.

    Only the first AuthnStatement carrying the attribute is considered.

    :returns: The SessionNotOnOrAfter value as converted by
              OneLogin_Saml2_Utils.parse_SAML_to_time, or None when no
              AuthnStatement has the attribute
    :rtype: time|None
    """
    statements = self.__query_assertion('/saml:AuthnStatement[@SessionNotOnOrAfter]')
    if not statements:
        return None
    raw_value = statements[0].get('SessionNotOnOrAfter')
    return OneLogin_Saml2_Utils.parse_SAML_to_time(raw_value)
def get_session_index(self):
    """
    Gets the SessionIndex from the AuthnStatement.
    Could be stored in the local session in order to be used in a future
    Logout Request that the SP could send to the IdP, to set what specific
    session must be deleted.

    Only the first AuthnStatement carrying the attribute is considered.

    :returns: The SessionIndex value
    :rtype: string|None
    """
    statements = self.__query_assertion('/saml:AuthnStatement[@SessionIndex]')
    if not statements:
        return None
    return statements[0].get('SessionIndex')
def get_attributes(self):
    """
    Gets the Attributes from the AttributeStatement element.
    EncryptedAttributes are not supported.

    :returns: The attributes, keyed by the Name of each saml:Attribute;
              every value is the list of texts of its AttributeValue
              children. A repeated Name keeps the values of the last
              Attribute node.
    :rtype: dict
    """
    collected = {}
    for node in self.__query_assertion('/saml:AttributeStatement/saml:Attribute'):
        value_tag = '{%s}AttributeValue' % OneLogin_Saml2_Constants.NSMAP['saml']
        collected[node.get('Name')] = [child.text for child in node.iterchildren(value_tag)]
    return collected
def validate_num_assertions(self):
    """
    Verifies that the document only contains a single Assertion (encrypted or not).

    :returns: True if EncryptedAssertion and Assertion nodes add up to
              exactly one, False otherwise
    :rtype: bool
    """
    total = 0
    for expression in ('//saml:EncryptedAssertion', '//saml:Assertion'):
        total += len(self.__query(expression))
    return total == 1
def validate_timestamps(self):
    """
    Verifies that the document is valid according to the Conditions elements.

    Every saml:Conditions node is checked: its NotBefore must not lie
    further in the future than the allowed clock drift, and its
    NotOnOrAfter plus the allowed clock drift must still be in the future.
    Missing attributes are not checked.

    :returns: True if the condition is valid, False otherwise
    :rtype: bool
    """
    for conditions in self.__query('//saml:Conditions'):
        not_before = conditions.get('NotBefore')
        not_on_or_after = conditions.get('NotOnOrAfter')

        if not_before:
            starts_at = OneLogin_Saml2_Utils.parse_SAML_to_time(not_before)
            if starts_at > time() + OneLogin_Saml2_Constants.ALOWED_CLOCK_DRIFT:
                return False

        if not_on_or_after:
            ends_at = OneLogin_Saml2_Utils.parse_SAML_to_time(not_on_or_after)
            if ends_at + OneLogin_Saml2_Constants.ALOWED_CLOCK_DRIFT <= time():
                return False

    return True
def __query_assertion(self, xpath_expr):
    """
    Extracts nodes that match the query from the Assertion.

    The Assertion is located through the signature Reference when there is
    one: the Reference URI (minus its first character) of the assertion
    signature selects the Assertion by ID; otherwise the Reference URI of
    the message signature, if any, selects the Response by ID.

    :param xpath_expr: Xpath Expression, relative to the Assertion
    :type xpath_expr: String

    :returns: The queried nodes
    :rtype: list
    """
    response_expr = '/samlp:Response'
    reference_expr = '/ds:Signature/ds:SignedInfo/ds:Reference'
    assertion_expr = '/saml:EncryptedAssertion/saml:Assertion' if self.encrypted else '/saml:Assertion'

    assertion_references = self.__query(response_expr + assertion_expr + reference_expr)
    if assertion_references:
        reference_uri = assertion_references[0].get('URI')
        base_query = response_expr + assertion_expr + "[@ID='%s']" % reference_uri[1:]
    else:
        # Check if the message is signed
        message_references = self.__query(response_expr + reference_expr)
        if message_references:
            reference_uri = message_references[0].get('URI')
            base_query = "/samlp:Response[@ID='%s']/" % reference_uri[1:]
        else:
            base_query = "/samlp:Response/"
        # NOTE(review): base_query ends with '/' and assertion_expr starts
        # with '/', so the joined expression contains '//' (a descendant
        # step) - confirm this is intended.
        base_query += assertion_expr

    return self.__query(base_query + xpath_expr)
def __query(self, query):
    """
    Extracts nodes that match the query from the Response.

    The decrypted document is searched when the response has been flagged
    as encrypted, the original document otherwise.

    :param query: Xpath Expression
    :type query: String

    :returns: The queried nodes
    :rtype: list
    """
    target = self.decrypted_document if self.encrypted else self.document
    return OneLogin_Saml2_Utils.query(target, query)
def __decrypt_assertion(self, dom):
    """
    Decrypts the Assertion

    NOTE(review): as far as this excerpt shows, no decryption is
    implemented yet. After checking that the SP private key is available
    the body ends on a TODO, so no explicit value is returned - confirm
    against the full file.

    :raises: Exception if no private key available

    :param dom: Encrypted Assertion
    :type dom: Element

    :returns: Decrypted Assertion
    :rtype: Element
    """
    # The SP private key comes from the settings; without it there is
    # nothing to decrypt with.
    key = self.__settings.get_sp_key()
    if not key:
        raise Exception('No private key available, check settings')
    # TODO Study how decrypt assertion