# Source code for mwoauth.flask

"""
.. autoclass:: mwoauth.flask.MWOAuth
  :members:
  :member-order: bysource

.. autofunction:: mwoauth.flask.authorized
"""

import logging
from functools import wraps

import flask
from requests_oauthlib import OAuth1

from six.moves.urllib.parse import urljoin

from .errors import OAuthException
from .handshaker import Handshaker
from .tokens import AccessToken, RequestToken

logger = logging.getLogger(__name__)


class MWOAuth:
    """
    Implements a basic MediaWiki OAuth pattern with a set of routes

    - /mwoauth/initiate -- Starts an OAuth handshake
    - /mwoauth/callback -- Completes an OAuth handshake
    - /mwoauth/identify -- Gets identity information about an authorized user
    - /mwoauth/logout -- Discards OAuth tokens and user identity

    There's also a convenient decorator provided
    :func:`~mwoauth.flask.authorized`.  When applied to a routing
    function, this decorator will redirect non-authorized users to
    /mwoauth/initiate with a "?next=" that will return them to the originating
    route once authorization is completed.

    :Example:
        .. code-block:: python

            from flask import Flask
            import mwoauth
            import mwoauth.flask

            app = Flask(__name__)

            @app.route("/")
            def index():
                return "Hello world"

            flask_mwoauth = mwoauth.flask.MWOAuth(
                "https://en.wikipedia.org",
                mwoauth.ConsumerToken("...", "..."))
            app.register_blueprint(flask_mwoauth.bp)

            @app.route("/my_settings/")
            @mwoauth.flask.authorized
            def my_settings():
                return flask_mwoauth.identify()

    :Parameters:
        host : str
            The host name (including protocol) of the MediaWiki wiki to use
            for the OAuth handshake.
        consumer_token : :class:`mwoauth.ConsumerToken`
            The consumer token information
        user_agent : str
            A User-Agent header to include with requests.  A warning will be
            logged if this is not set.
        default_next : str
            Where should the user be redirected after an OAuth handshake when
            no '?next=' param is provided.  This is an endpoint name that is
            passed to :func:`flask.url_for`.
        render_logout : func
            A method that renders the logout page seen at /mwoauth/logout
        render_indentify : func
            Deprecated, misspelled alias of `render_identify`, kept so that
            existing callers keep working.  Ignored when `render_identify`
            is given.
        render_error : func
            A method that renders an error.  Takes two arguments:

            * message : str (The error message)
            * status : int (The HTTP status number)
        render_identify : func
            A method that renders the identify page seen at
            /mwoauth/identify.  Takes one positional argument -- the
            `identity` dictionary returned by MediaWiki.
        **kwargs : dict
            Parameters to be passed to :class:`flask.Blueprint` during its
            construction.
    """

    def __init__(self, host, consumer_token, user_agent=None,
                 default_next="index", render_logout=None,
                 render_indentify=None, render_error=None,
                 render_identify=None, **kwargs):
        self.bp = flask.Blueprint('mwoauth', __name__, **kwargs)
        self.host = host
        self.user_agent = user_agent
        self.consumer_token = consumer_token
        # Built lazily by _handshaker(): the callback URL needs a request.
        self.handshaker = None
        self.default_next = default_next
        self.render_logout = render_logout or generic_logout
        # Accept the documented spelling as well as the historical typo.
        self.render_identify = (
            render_identify or render_indentify or generic_identify)
        self.render_error = render_error or generic_error

        @self.bp.route("/mwoauth/initiate/")
        def mwoauth_initiate():
            """Start an OAuth handshake."""
            mw_authorizer_url, request_token = self._handshaker().initiate()
            rt_session_key = _str(request_token.key) + "_request_token"
            next_session_key = _str(request_token.key) + "_next"

            # Stored as a plain dict so that Flask's default session storage
            # strategy will work.
            flask.session[rt_session_key] = encode_token(request_token)

            if 'next' in flask.request.args:
                flask.session[next_session_key] = \
                    flask.request.args.get('next')

            return flask.redirect(mw_authorizer_url)

        @self.bp.route("/mwoauth/callback/")
        def mwoauth_callback():
            """Complete the oauth handshake."""
            # Generate session keys
            request_token_key = _str(
                flask.request.args.get('oauth_token', 'None'))
            rt_session_key = request_token_key + "_request_token"
            next_session_key = request_token_key + "_next"

            # The handshake state is single-use: take it out of the session
            # whether the handshake succeeds or fails so it does not pile up.
            request_token_fields = flask.session.pop(rt_session_key, None)
            next_endpoint = flask.session.pop(next_session_key, None)

            # Make sure we're continuing an in-progress handshake
            if request_token_fields is None:
                return self.render_error(
                    "OAuth callback failed.  " +
                    "Couldn't find request_token in session.  " +
                    "Are cookies disabled?", 403)

            # Complete the handshake and identify the user.  Nothing is
            # written to the session unless both steps succeed.
            try:
                access_token = self._handshaker().complete(
                    RequestToken(**request_token_fields),
                    _str(flask.request.query_string))
                identity = self._handshaker().identify(access_token)
            except OAuthException as e:
                return self.render_error(
                    "OAuth callback failed. " + str(e), 403)

            # Store the access token and the identity
            flask.session['mwoauth_access_token'] = encode_token(access_token)
            flask.session['mwoauth_identity'] = identity

            # Redirect to wherever we're supposed to go
            return flask.redirect(
                flask.url_for(next_endpoint or self.default_next))

        @self.bp.route("/mwoauth/identify/")
        @authorized
        def mwoauth_identify():
            """Return user information if authenticated."""
            return self.render_identify(flask.session['mwoauth_identity'])

        @self.bp.route("/mwoauth/logout/")
        def mwoauth_logout():
            """Delete the local session."""
            flask.session.pop('mwoauth_access_token', None)
            flask.session.pop('mwoauth_identity', None)

            if 'next' in flask.request.args:
                return flask.redirect(
                    flask.url_for(flask.request.args.get('next')))
            else:
                return self.render_logout()

    def _handshaker(self):
        """
        Return the :class:`Handshaker`, constructing it on first use.

        The callback URL is built from the current request's URL root, so
        this must be called while handling a request.
        """
        if not self.handshaker:
            full_callback = urljoin(
                flask.request.url_root,
                flask.url_for("mwoauth.mwoauth_callback"))
            logger.debug("OAuth callback URL: %s", full_callback)
            self.handshaker = Handshaker(
                self.host, self.consumer_token, user_agent=self.user_agent,
                callback=full_callback)

        return self.handshaker

    @staticmethod
    def identify():
        """
        Return the identity stored in the session, or `None` if there is
        none.
        """
        return flask.session.get('mwoauth_identity')

    def mwapi_session(self, *args, **kwargs):
        """
        Create :class:`mwapi.Session` that is authorized for the current
        user.

        `args` and `kwargs` are passed directly to :class:`mwapi.Session`
        """
        import mwapi
        auth1 = self.generate_auth()
        return mwapi.Session(*args, user_agent=self.user_agent, auth=auth1,
                             **kwargs)

    def requests_session(self, *args, **kwargs):
        """
        Create :class:`requests.Session` that is authorized for the current
        user.

        `args` and `kwargs` are passed directly to :class:`requests.Session`
        """
        import requests
        auth1 = self.generate_auth()
        # requests.Session() does not take `auth` as a constructor argument;
        # it is configured through the session's `auth` attribute.
        session = requests.Session(*args, **kwargs)
        session.auth = auth1
        return session

    def generate_auth(self):
        """
        Build an :class:`requests_oauthlib.OAuth1` from the consumer token
        and the access token stored in the session.

        :Raises:
            :class:`OAuthException` if the session holds no access token.
        """
        if 'mwoauth_access_token' not in flask.session:
            raise OAuthException(
                "Cannot generate auth.  User has not authorized.")

        access_token = AccessToken(
            **flask.session['mwoauth_access_token'])
        return OAuth1(self.consumer_token.key,
                      client_secret=self.consumer_token.secret,
                      resource_owner_key=access_token.key,
                      resource_owner_secret=access_token.secret)
def authorized(route):
    """
    Wrap a flask route.  Ensure that the user has authorized via OAuth or
    redirect the user to the authorization endpoint with a delayed redirect
    back to the originating endpoint.

    :Parameters:
        route : func
            The routing function to protect.

    :Returns:
        A function that calls `route` when the session holds an access
        token and otherwise redirects to the 'mwoauth.mwoauth_initiate'
        endpoint with the current endpoint name as the "next" query
        argument.
    """
    @wraps(route)
    def authorized_route(*args, **kwargs):
        if 'mwoauth_access_token' in flask.session:
            return route(*args, **kwargs)

        # Let url_for build the query string instead of concatenating it by
        # hand, so the value is URL-encoded.
        return flask.redirect(
            flask.url_for('mwoauth.mwoauth_initiate',
                          next=flask.request.endpoint))
    return authorized_route
def generic_logout():
    """Render the default logout page: a plain confirmation string."""
    return "Logged out"


def generic_identify(identity):
    """Render the default identify page: `identity` serialized as JSON."""
    return flask.jsonify(identity)


def generic_error(message, status):
    """
    Render the default error page.

    :Parameters:
        message : str
            The error message.  It is HTML-escaped before being embedded,
            because it can carry exception text from the OAuth callback.
        status : int
            The HTTP status number

    :Returns:
        A `(body, status)` pair.
    """
    try:
        from html import escape
    except ImportError:  # Python 2 has no html.escape
        from cgi import escape
    return '<span style="color: red;">' + escape(message) + '</span>', status


def encode_token(token):
    """Convert a token namedtuple into a plain dict of field name to value."""
    return dict(zip(token._fields, token))


def _str(val):
    """
    Ensure that the val is the default str() type for python2 or 3.

    Bytes-like values are decoded as ASCII on python3; any other non-str
    value is converted with str().
    """
    if isinstance(val, str):
        return val
    # On python2 `bytes` is `str`, which the check above already handled.
    if str is not bytes and isinstance(val, (bytes, bytearray, memoryview)):
        return str(val, 'ascii')
    return str(val)