Source code for saml2.auth

# -*- coding: utf-8 -*-

# Copyright (c) 2014, OneLogin, Inc.
# All rights reserved.

from base64 import b64encode
from urllib import urlencode, quote
from xml.etree.ElementTree import tostring

import dm.xmlsec.binding as xmlsec

from saml2.settings import OneLogin_Saml2_Settings
from saml2.response import OneLogin_Saml2_Response
from saml2.errors import OneLogin_Saml2_Error
from saml2.logout_response import OneLogin_Saml2_Logout_Response
from saml2.constants import OneLogin_Saml2_Constants
from saml2.utils import OneLogin_Saml2_Utils
from saml2.logout_request import OneLogin_Saml2_Logout_Request
from saml2.authn_request import OneLogin_Saml2_Authn_Request


[docs]class OneLogin_Saml2_Auth(object): def __init__(self, request_data, old_settings=None): """ Initializes the SP SAML instance. Arguments are: * (dict) old_settings. Setting data """ self.__request_data = request_data self.__settings = OneLogin_Saml2_Settings(old_settings) self.__attributes = [] self.__nameid = '' self.__authenticated = False self.__errors = []
[docs] def get_settings(self): """ Returns the settings info :return: Setting info :rtype: OneLogin_Saml2_Setting object """ return self.__settings
[docs] def set_strict(self, value): """ Set the strict mode active/disable :param value: :type value: bool """ assert isinstance(value, bool) self.__settings.set_strict(value)
[docs] def process_response(self, request_id=None): """ Process the SAML Response sent by the IdP. :param request_id: Is an optional argumen. Is the ID of the AuthNRequest sent by this SP to the IdP. :type request_id: string :raises: OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND, when a POST with a SAMLResponse is not found """ self.__errors = [] if 'post_data' in self.__request_data and 'SAMLResponse' in self.__request_data['post_data']: # AuthnResponse -- HTTP_POST Binding response = OneLogin_Saml2_Response(self.__settings, self.__request_data['post_data']['SAMLResponse']) if response.is_valid(request_id): self.__attributes = response.get_attributes() self.__nameid = response.get_nameid() self.__authenticated = True else: self.__errors.append('invalid_response') else: self.__errors.append('invalid_binding') raise OneLogin_Saml2_Error( 'SAML Response not found, Only supported HTTP_POST Binding', OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND )
[docs] def process_slo(self, keep_local_session=False, request_id=None, delete_session_cb=None): """ Process the SAML Logout Response / Logout Request sent by the IdP. :param keep_local_session: When false will destroy the local session, otherwise will destroy it :type keep_local_session: bool :param request_id: The ID of the LogoutRequest sent by this SP to the IdP :type request_id: string :returns: Redirection url """ self.__errors = [] if 'get_data' in self.__request_data and 'SAMLResponse' in self.__request_data['get_data']: logout_response = OneLogin_Saml2_Logout_Response(self.__settings, self.__request_data['get_data']['SAMLResponse']) if not logout_response.is_valid(self.__request_data, request_id): self.__errors.append('invalid_logout_response') elif logout_response.get_status() != OneLogin_Saml2_Constants.STATUS_SUCCESS: self.__errors.append('logout_not_success') elif not keep_local_session: OneLogin_Saml2_Utils.delete_local_session(delete_session_cb) elif 'get_data' in self.__request_data and 'SAMLRequest' in self.__request_data['get_data']: request = OneLogin_Saml2_Utils.decode_base64_and_inflate(self.__request_data['get_data']['SAMLRequest']) if not OneLogin_Saml2_Logout_Request.is_valid(self.__settings, request, self.__request_data): self.__errors.append('invalid_logout_request') else: if not keep_local_session: OneLogin_Saml2_Utils.delete_local_session(delete_session_cb) in_response_to = OneLogin_Saml2_Logout_Request.get_id(request) response_builder = OneLogin_Saml2_Logout_Response(self.__settings) response_builder.build(in_response_to) logout_response = response_builder.get_response() parameters = {'SAMLResponse': logout_response} if 'RelayState' in self.__request_data['get_data']: parameters['RelayState'] = self.__request_data['get_data']['RelayState'] security = self.__settings.get_security_data() if 'logoutResponseSigned' in security and security['logoutResponseSigned']: signature = self.build_response_signature(logout_response, parameters.get('RelayState', None)) parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1 parameters['Signature'] = signature return self.redirect_to(self.get_slo_url(), parameters) else: self.__errors.append('invalid_binding') raise OneLogin_Saml2_Error( 'SAML LogoutRequest/LogoutResponse not found. Only supported HTTP_REDIRECT Binding', OneLogin_Saml2_Error.SAML_LOGOUTMESSAGE_NOT_FOUND )
[docs] def redirect_to(self, url=None, parameters={}): """ Redirects the user to the url past by parameter or to the url that we defined in our SSO Request. :param url: The target URL to redirect the user :type url: string :param parameters: Extra parameters to be passed as part of the url :type parameters: dict :returns: Redirection url """ if url is None and 'RelayState' in self.__request_data['get_data']: url = self.__request_data['get_data']['RelayState'] return OneLogin_Saml2_Utils.redirect(url, parameters, request_data=self.__request_data)
[docs] def is_authenticated(self): """ Checks if the user is authenticated or not. :returns: True if is authenticated, False if not :rtype: bool """ return self.__authenticated
[docs] def get_attributes(self): """ Returns the set of SAML attributes. :returns: SAML attributes :rtype: dict """ return self.__attributes
[docs] def get_nameid(self): """ Returns the nameID. :returns: NameID :rtype: string """ return self.__nameid
[docs] def get_errors(self): """ Returns a list with code errors if something went wrong :returns: List of errors :rtype: list """ return self.__errors
[docs] def get_attribute(self, name): """ Returns the requested SAML attribute. :param name: Name of the attribute :type name: string :returns: Attribute value if exists or None :rtype: string """ assert isinstance(name, basestring) value = None if name in self.__attributes.keys(): value = self.__attributes[name] return value
[docs] def login(self, return_to=None): """ Initiates the SSO process. :param return_to: Optional argument. The target URL the user should be redirected to after login. :type return_to: string :returns: Redirection url """ authn_request = OneLogin_Saml2_Authn_Request(self.__settings) saml_request = authn_request.get_request() parameters = {'SAMLRequest': saml_request} if return_to is not None: parameters['RelayState'] = return_to else: parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data) security = self.__settings.get_security_data() if security.get('authnRequestsSigned', False): parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1 parameters['Signature'] = self.build_request_signature(saml_request, parameters['RelayState']) return self.redirect_to(self.get_sso_url(), parameters)
[docs] def logout(self, return_to=None): """ Initiates the SLO process. :param return_to: Optional argument. The target URL the user should be redirected to after logout. :type return_to: string :returns: Redirection url """ slo_url = self.get_slo_url() if slo_url is None: raise OneLogin_Saml2_Error( 'The IdP does not support Single Log Out', OneLogin_Saml2_Error.SAML_SINGLE_LOGOUT_NOT_SUPPORTED ) logout_request = OneLogin_Saml2_Logout_Request(self.__settings) saml_request = logout_request.get_request() parameters = {'SAMLRequest': logout_request.get_request()} if return_to is not None: parameters['RelayState'] = return_to else: parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data) security = self.__settings.get_security_data() if security.get('logoutRequestSigned', False): parameters['SigAlg'] = OneLogin_Saml2_Constants.RSA_SHA1 parameters['Signature'] = self.build_request_signature(saml_request, parameters['RelayState']) return self.redirect_to(slo_url, parameters)
[docs] def get_sso_url(self): """ Gets the SSO url. :returns: An URL, the SSO endpoint of the IdP :rtype: string """ idp_data = self.__settings.get_idp_data() return idp_data['singleSignOnService']['url']
[docs] def get_slo_url(self): """ Gets the SLO url. :returns: An URL, the SLO endpoint of the IdP :rtype: string """ url = None idp_data = self.__settings.get_idp_data() if 'singleLogoutService' in idp_data.keys() and 'url' in idp_data['singleLogoutService']: url = idp_data['singleLogoutService']['url'] return url
[docs] def build_request_signature(self, saml_request, relay_state): """ Builds the Signature of the SAML Request. :param saml_request: The SAML Request :type saml_request: string :param relay_state: The target URL the user should be redirected to :type relay_state: string """ if not self.__settings.check_sp_certs(): raise OneLogin_Saml2_Error( "Trying to sign the SAML Request but can't load the SP certs", OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND ) xmlsec.initialize() # Load the key into the xmlsec context key = self.__settings.get_sp_key() file_key = OneLogin_Saml2_Utils.write_temp_file(key) # FIXME avoid writing a file dsig_ctx = xmlsec.DSigCtx() dsig_ctx.signKey = xmlsec.Key.load(file_key.name, xmlsec.KeyDataFormatPem, None) file_key.close() data = { 'SAMLRequest': quote(saml_request), 'RelayState': quote(relay_state), 'SignAlg': quote(OneLogin_Saml2_Constants.RSA_SHA1), } msg = urlencode(data) signature = dsig_ctx.signBinary(msg, xmlsec.TransformRsaSha1) return b64encode(signature)
[docs] def build_response_signature(self, saml_response, relay_state): """ Builds the Signature of the SAML Response. :param saml_request: The SAML Response :type saml_request: string :param relay_state: The target URL the user should be redirected to :type relay_state: string """ if not self.__settings.check_sp_certs(): raise OneLogin_Saml2_Error( "Trying to sign the SAML Response but can't load the SP certs", OneLogin_Saml2_Error.SP_CERTS_NOT_FOUND ) xmlsec.initialize() # Load the key into the xmlsec context key = self.__settings.get_sp_key() file_key = OneLogin_Saml2_Utils.write_temp_file(key) # FIXME avoid writing a file dsig_ctx = xmlsec.DSigCtx() dsig_ctx.signKey = xmlsec.Key.load(file_key.name, xmlsec.KeyDataFormatPem, None) file_key.close() data = { 'SAMLResponse': quote(saml_response), 'RelayState': quote(relay_state), 'SignAlg': quote(OneLogin_Saml2_Constants.RSA_SHA1), } msg = urlencode(data) import pdb; dbp.set_trace() print msg data2 = { 'SAMLResponse': saml_response, 'RelayState': relay_state, 'SignAlg': OneLogin_Saml2_Constants.RSA_SHA1, } msg2 = urlencode(data2) print msg2 signature = dsig_ctx.signBinary(msg, xmlsec.TransformRsaSha1) return b64encode(signature)