# -*- coding: utf-8 -*-
# Copyright (c) 2014, OneLogin, Inc.
# All rights reserved.
from base64 import b64encode
from urllib import urlencode, quote
from xml.etree.ElementTree import tostring
import dm.xmlsec.binding as xmlsec
from saml2.settings import OneLogin_Saml2_Settings
from saml2.response import OneLogin_Saml2_Response
from saml2.errors import OneLogin_Saml2_Error
from saml2.logout_response import OneLogin_Saml2_Logout_Response
from saml2.constants import OneLogin_Saml2_Constants
from saml2.utils import OneLogin_Saml2_Utils
from saml2.logout_request import OneLogin_Saml2_Logout_Request
from saml2.authn_request import OneLogin_Saml2_Authn_Request
class OneLogin_Saml2_Auth(object):
    """
    Service Provider (SP) entry point for SAML Single Sign On / Single Log Out.

    Holds the toolkit settings and the data of the current HTTP request, builds
    the redirections that start a login or a logout, processes the messages
    sent back by the IdP and exposes the outcome (attributes, NameID, errors).
    """

    def __init__(self, request_data, old_settings=None):
        """
        Initializes the SP SAML instance.

        :param request_data: Data of the current HTTP request. This class reads
                             its optional 'get_data' and 'post_data' entries.
        :type request_data: dict
        :param old_settings: Setting data, handed to OneLogin_Saml2_Settings
        :type old_settings: dict
        """
        self.__request_data = request_data
        self.__settings = OneLogin_Saml2_Settings(old_settings)
        # A dict, not a list: get_attribute() looks attributes up by name and
        # must also work before any response has been processed.
        self.__attributes = {}
        self.__nameid = ''
        self.__authenticated = False
        self.__errors = []

    def get_settings(self):
        """
        Returns the settings info

        :return: Setting info
        :rtype: OneLogin_Saml2_Setting object
        """
        return self.__settings

    def set_strict(self, value):
        """
        Enables or disables the strict mode of the settings.

        :param value: True to enable the strict mode, False to disable it
        :type value: bool
        """
        assert isinstance(value, bool)
        self.__settings.set_strict(value)

    def process_response(self, request_id=None):
        """
        Process the SAML Response sent by the IdP.

        On a valid response the attributes and the NameID are stored and the
        instance is flagged as authenticated; on an invalid one
        'invalid_response' is added to the errors.

        :param request_id: Optional argument. The ID of the AuthNRequest sent by this SP to the IdP.
        :type request_id: string
        :raises: OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND, when a POST with a SAMLResponse is not found
        """
        self.__errors = []

        if 'post_data' in self.__request_data and 'SAMLResponse' in self.__request_data['post_data']:
            # AuthnResponse -- HTTP_POST Binding
            response = OneLogin_Saml2_Response(self.__settings, self.__request_data['post_data']['SAMLResponse'])

            if response.is_valid(request_id):
                self.__attributes = response.get_attributes()
                self.__nameid = response.get_nameid()
                self.__authenticated = True
            else:
                self.__errors.append('invalid_response')
        else:
            self.__errors.append('invalid_binding')
            raise OneLogin_Saml2_Error(
                'SAML Response not found, Only supported HTTP_POST Binding',
                OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND
            )

    def process_slo(self, keep_local_session=False, request_id=None, delete_session_cb=None):
        """
        Process the SAML Logout Response / Logout Request sent by the IdP.

        :param keep_local_session: When False the local session is destroyed, otherwise it is kept
        :type keep_local_session: bool
        :param request_id: The ID of the LogoutRequest sent by this SP to the IdP
        :type request_id: string
        :param delete_session_cb: Callback handed to OneLogin_Saml2_Utils.delete_local_session
        :type delete_session_cb: callable
        :returns: Redirection url when a valid LogoutRequest was received
                  (the LogoutResponse is sent to the IdP), otherwise None
        :raises: OneLogin_Saml2_Error.SAML_LOGOUTMESSAGE_NOT_FOUND, when the GET data
                 carries neither a SAMLResponse nor a SAMLRequest
        """
        self.__errors = []

        if 'get_data' in self.__request_data and 'SAMLResponse' in self.__request_data['get_data']:
            # LogoutResponse: answer to a logout started by this SP
            logout_response = OneLogin_Saml2_Logout_Response(self.__settings, self.__request_data['get_data']['SAMLResponse'])
            if not logout_response.is_valid(self.__request_data, request_id):
                self.__errors.append('invalid_logout_response')
            elif logout_response.get_status() != OneLogin_Saml2_Constants.STATUS_SUCCESS:
                self.__errors.append('logout_not_success')
            elif not keep_local_session:
                OneLogin_Saml2_Utils.delete_local_session(delete_session_cb)

        elif 'get_data' in self.__request_data and 'SAMLRequest' in self.__request_data['get_data']:
            # LogoutRequest: logout started by the IdP, it must be answered
            request = OneLogin_Saml2_Utils.decode_base64_and_inflate(self.__request_data['get_data']['SAMLRequest'])
            if not OneLogin_Saml2_Logout_Request.is_valid(self.__settings, request, self.__request_data):
                self.__errors.append('invalid_logout_request')
            else:
                if not keep_local_session:
                    OneLogin_Saml2_Utils.delete_local_session(delete_session_cb)

                in_response_to = OneLogin_Saml2_Logout_Request.get_id(request)
                response_builder = OneLogin_Saml2_Logout_Response(self.__settings)
                response_builder.build(in_response_to)
                saml_response = response_builder.get_response()

                parameters = {'SAMLResponse': saml_response}
                if 'RelayState' in self.__request_data['get_data']:
                    parameters['RelayState'] = self.__request_data['get_data']['RelayState']

                security = self.__settings.get_security_data()
                if security.get('logoutResponseSigned', False):
                    # RelayState may be absent here; the signature builder copes with None
                    signature = self.build_response_signature(saml_response, parameters.get('RelayState', None))
                    parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1
                    parameters['Signature'] = signature

                return self.redirect_to(self.get_slo_url(), parameters)
        else:
            self.__errors.append('invalid_binding')
            raise OneLogin_Saml2_Error(
                'SAML LogoutRequest/LogoutResponse not found. Only supported HTTP_REDIRECT Binding',
                OneLogin_Saml2_Error.SAML_LOGOUTMESSAGE_NOT_FOUND
            )

    def redirect_to(self, url=None, parameters=None):
        """
        Redirects the user to the url passed by parameter or, when none is
        given, to the RelayState received in the GET data of the request.

        :param url: The target URL to redirect the user
        :type url: string
        :param parameters: Extra parameters to be passed as part of the url
        :type parameters: dict
        :returns: Redirection url
        """
        # None instead of a shared mutable {} default
        if parameters is None:
            parameters = {}

        # The request may carry no GET data at all (e.g. a POST)
        if url is None and 'get_data' in self.__request_data and 'RelayState' in self.__request_data['get_data']:
            url = self.__request_data['get_data']['RelayState']

        return OneLogin_Saml2_Utils.redirect(url, parameters, request_data=self.__request_data)

    def is_authenticated(self):
        """
        Checks if the user is authenticated or not.

        :returns: True if is authenticated, False if not
        :rtype: bool
        """
        return self.__authenticated

    def get_attributes(self):
        """
        Returns the set of SAML attributes.

        :returns: SAML attributes (empty until a valid response is processed)
        :rtype: dict
        """
        return self.__attributes

    def get_nameid(self):
        """
        Returns the nameID.

        :returns: NameID
        :rtype: string
        """
        return self.__nameid

    def get_errors(self):
        """
        Returns a list with code errors if something went wrong

        :returns: List of errors
        :rtype: list
        """
        return self.__errors

    def get_attribute(self, name):
        """
        Returns the requested SAML attribute.

        :param name: Name of the attribute
        :type name: string
        :returns: Attribute value if exists or None
        """
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str  # interpreters without basestring
        assert isinstance(name, string_types)
        return self.__attributes.get(name)

    def login(self, return_to=None):
        """
        Initiates the SSO process.

        :param return_to: Optional argument. The target URL the user should be redirected to after login.
                          Defaults to the current URL without its query string.
        :type return_to: string
        :returns: Redirection url
        """
        authn_request = OneLogin_Saml2_Authn_Request(self.__settings)
        saml_request = authn_request.get_request()

        parameters = {'SAMLRequest': saml_request}
        if return_to is not None:
            parameters['RelayState'] = return_to
        else:
            parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data)

        security = self.__settings.get_security_data()
        if security.get('authnRequestsSigned', False):
            parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1
            parameters['Signature'] = self.build_request_signature(saml_request, parameters['RelayState'])

        return self.redirect_to(self.get_sso_url(), parameters)

    def logout(self, return_to=None):
        """
        Initiates the SLO process.

        :param return_to: Optional argument. The target URL the user should be redirected to after logout.
                          Defaults to the current URL without its query string.
        :type return_to: string
        :returns: Redirection url
        :raises: OneLogin_Saml2_Error.SAML_SINGLE_LOGOUT_NOT_SUPPORTED, when the IdP
                 settings define no SLO url
        """
        slo_url = self.get_slo_url()
        if slo_url is None:
            raise OneLogin_Saml2_Error(
                'The IdP does not support Single Log Out',
                OneLogin_Saml2_Error.SAML_SINGLE_LOGOUT_NOT_SUPPORTED
            )

        logout_request = OneLogin_Saml2_Logout_Request(self.__settings)
        # Fetch the request once so the value sent and the value signed are the same
        saml_request = logout_request.get_request()

        parameters = {'SAMLRequest': saml_request}
        if return_to is not None:
            parameters['RelayState'] = return_to
        else:
            parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data)

        security = self.__settings.get_security_data()
        if security.get('logoutRequestSigned', False):
            parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1
            parameters['Signature'] = self.build_request_signature(saml_request, parameters['RelayState'])

        return self.redirect_to(slo_url, parameters)

    def get_sso_url(self):
        """
        Gets the SSO url.

        :returns: An URL, the SSO endpoint of the IdP
        :rtype: string
        """
        idp_data = self.__settings.get_idp_data()
        return idp_data['singleSignOnService']['url']

    def get_slo_url(self):
        """
        Gets the SLO url.

        :returns: An URL, the SLO endpoint of the IdP, or None when the IdP
                  settings define none
        :rtype: string
        """
        idp_data = self.__settings.get_idp_data()
        slo_service = idp_data.get('singleLogoutService') or {}
        return slo_service.get('url')

    def build_request_signature(self, saml_request, relay_state):
        """
        Builds the Signature of the SAML Request.

        :param saml_request: The SAML Request
        :type saml_request: string
        :param relay_state: The target URL the user should be redirected to
        :type relay_state: string
        :returns: Base64 encoded signature
        :rtype: string
        :raises: OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND, when the SP certs can't be loaded
        """
        if not self.__settings.check_sp_certs():
            raise OneLogin_Saml2_Error(
                "Trying to sign the SAML Request but can't load the SP certs",
                OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND
            )
        return self.__sign_message('SAMLRequest', saml_request, relay_state)

    def build_response_signature(self, saml_response, relay_state):
        """
        Builds the Signature of the SAML Response.

        :param saml_response: The SAML Response
        :type saml_response: string
        :param relay_state: The target URL the user should be redirected to.
                            May be None, in which case it is left out of the signed data.
        :type relay_state: string
        :returns: Base64 encoded signature
        :rtype: string
        :raises: OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND, when the SP certs can't be loaded
        """
        if not self.__settings.check_sp_certs():
            raise OneLogin_Saml2_Error(
                "Trying to sign the SAML Response but can't load the SP certs",
                OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND
            )
        return self.__sign_message('SAMLResponse', saml_response, relay_state)

    @staticmethod
    def __build_sign_query(saml_type, saml_data, relay_state, sign_algorithm):
        """
        Builds the query string that gets signed in the HTTP-Redirect binding.

        The pairs are emitted in a fixed order (message, RelayState, SigAlg)
        and every value is url-encoded exactly once.

        :param saml_type: 'SAMLRequest' or 'SAMLResponse'
        :type saml_type: string
        :param saml_data: The SAML message
        :type saml_data: string
        :param relay_state: The RelayState, or None to leave it out
        :type relay_state: string
        :param sign_algorithm: Identifier of the signature algorithm
        :type sign_algorithm: string
        :returns: The string to sign
        :rtype: string
        """
        # A list of pairs (not a dict) so urlencode keeps the order
        sign_data = [(saml_type, saml_data)]
        if relay_state is not None:
            sign_data.append(('RelayState', relay_state))
        sign_data.append(('SigAlg', sign_algorithm))
        # NOTE(review): assumes OneLogin_Saml2_Utils.redirect url-encodes the
        # parameters the same way urlencode does -- confirm.
        return urlencode(sign_data)

    def __sign_message(self, saml_type, saml_data, relay_state):
        """
        Signs a SAML message with the SP private key using RSA-SHA1.

        :param saml_type: 'SAMLRequest' or 'SAMLResponse'
        :type saml_type: string
        :param saml_data: The SAML message
        :type saml_data: string
        :param relay_state: The RelayState, or None to leave it out
        :type relay_state: string
        :returns: Base64 encoded signature
        :rtype: string
        """
        xmlsec.initialize()

        # Load the key into the xmlsec context
        key = self.__settings.get_sp_key()
        file_key = OneLogin_Saml2_Utils.write_temp_file(key)  # FIXME avoid writing a file
        try:
            dsig_ctx = xmlsec.DSigCtx()
            dsig_ctx.signKey = xmlsec.Key.load(file_key.name, xmlsec.KeyDataFormatPem, None)
        finally:
            # Close the temp file even when the key can't be loaded
            file_key.close()

        msg = self.__build_sign_query(saml_type, saml_data, relay_state, OneLogin_Saml2_Constants.RSA_SHA1)
        signature = dsig_ctx.signBinary(msg, xmlsec.TransformRsaSha1)
        return b64encode(signature)