"""
Flask-GoogleLogin
"""
from base64 import (urlsafe_b64encode as b64encode,
urlsafe_b64decode as b64decode)
from urllib import urlencode
from urlparse import parse_qsl
from functools import wraps
from flask import request, redirect, abort, current_app
from flask_login import LoginManager, make_secure_token
import requests
# Endpoint the user's browser is redirected to; `GoogleLogin.login_url`
# appends the query string to this URL.
GOOGLE_OAUTH2_AUTH_URL = 'https://accounts.google.com/o/oauth2/auth'
# Endpoint POSTed to by `GoogleLogin.login` (authorization-code exchange)
# and `GoogleLogin.get_access_token` (refresh-token exchange).
GOOGLE_OAUTH2_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
# Endpoint `GoogleLogin.login` GETs with the access token to retrieve the
# `userinfo` payload.
GOOGLE_OAUTH2_USERINFO_URL = 'https://www.googleapis.com/oauth2/v1/userinfo'
# Scope that `GoogleLogin.login_url` always adds to the requested scopes.
USERINFO_PROFILE_SCOPE = 'https://www.googleapis.com/auth/userinfo.profile'
class GoogleLogin(object):
    """Flask extension that signs users in through Google's OAuth2 flow.

    The extension wraps a Flask-Login ``LoginManager``: its unauthorized
    handler redirects the browser to Google's consent page, and the view
    decorated with :meth:`oauth2callback` receives the resulting ``token``
    and ``userinfo``.

    Configuration read from ``app.config``:

    * ``GOOGLE_LOGIN_CLIENT_ID`` (required)
    * ``GOOGLE_LOGIN_CLIENT_SECRET`` (required)
    * ``GOOGLE_LOGIN_REDIRECT_URI`` (optional, may be given per call)
    * ``GOOGLE_LOGIN_SCOPES`` (optional, comma-separated scope URLs)
    """

    def __init__(self, app=None, login_manager=None):
        """Create the extension, optionally binding it to ``app``.

        :param app: Flask application; when given, :meth:`init_app` is
            called immediately and the app is remembered on the instance.
        :param login_manager: existing ``LoginManager`` to reuse; a new one
            is created when omitted.
        """
        if login_manager is not None:
            self.login_manager = login_manager
        else:
            self.login_manager = LoginManager()

        if app is not None:
            self._app = app
            self.init_app(app)

    def init_app(self, app, add_context_processor=True, login_manager=None):
        """Initialize with app configuration.

        :param app: Flask application to configure.
        :param add_context_processor: forwarded to
            ``LoginManager.init_app``.
        :param login_manager: existing ``LoginManager`` instance to use
            instead of the one chosen in ``__init__``.
        """
        # Only replace the manager when one is passed explicitly; otherwise
        # keep the instance selected in __init__ (previously it was always
        # overwritten, discarding a manager handed to the constructor).
        if login_manager is not None:
            self.login_manager = login_manager
        elif getattr(self, 'login_manager', None) is None:
            self.login_manager = LoginManager()

        # Check if login manager has been init
        if not hasattr(app, 'login_manager'):
            self.login_manager.init_app(
                app,
                add_context_processor=add_context_processor)

        # Clear flashed messages since we redirect to auth immediately
        self.login_manager.login_message = None
        self.login_manager.needs_refresh_message = None

        # Set default unauthorized callback
        self.login_manager.unauthorized_handler(self.unauthorized_callback)

    @property
    def app(self):
        """Application bound in ``__init__``, else Flask's ``current_app``."""
        try:
            return self._app
        except AttributeError:
            return current_app

    @property
    def scopes(self):
        """Value of ``GOOGLE_LOGIN_SCOPES`` (``''`` when unset)."""
        return self.app.config.get('GOOGLE_LOGIN_SCOPES', '')

    @property
    def client_id(self):
        """Value of ``GOOGLE_LOGIN_CLIENT_ID``; ``KeyError`` when unset."""
        return self.app.config['GOOGLE_LOGIN_CLIENT_ID']

    @property
    def client_secret(self):
        """Value of ``GOOGLE_LOGIN_CLIENT_SECRET``; ``KeyError`` when unset."""
        return self.app.config['GOOGLE_LOGIN_CLIENT_SECRET']

    @property
    def redirect_uri(self):
        """Value of ``GOOGLE_LOGIN_REDIRECT_URI`` (``None`` when unset)."""
        return self.app.config.get('GOOGLE_LOGIN_REDIRECT_URI')

    @staticmethod
    def _sign_state(params):
        """Return the signature for the state parameters ``params``.

        The parameters are serialized in sorted-key order and passed to
        ``make_secure_token`` as a *positional* argument. Flask-Login only
        signs positional arguments; keyword arguments are treated as token
        options, so passing the params as ``**params`` would leave them
        outside the signature.
        """
        return make_secure_token(urlencode(sorted(params.items())))

    @staticmethod
    def _response_json(response):
        """Return the decoded JSON body of ``response``, or ``None``.

        ``Response.json`` is a method in current ``requests`` releases and
        was a property in very old ones; both forms are supported. A body
        that is not valid JSON yields ``None``.
        """
        payload = response.json
        if callable(payload):
            try:
                payload = payload()
            except ValueError:
                payload = None
        return payload

    def login_url(self, params=None, **kwargs):
        """Return login url with params encoded in state.

        :param params: mapping of values to round-trip through the OAuth2
            ``state`` parameter; they are later passed to the callback view
            as keyword arguments. The mapping is not modified.
        :param kwargs: extra query parameters for the auth URL. ``scopes``
            (iterable or comma-separated string) and ``redirect_uri``
            override the configured values; ``access_type`` defaults to
            ``'online'`` and ``approval_prompt`` to ``'auto'``.
        :returns: the absolute authorization URL.
        """
        # Work on a copy so the caller's dict is left untouched.
        state_params = dict(params or {})

        kwargs.setdefault('access_type', 'online')
        kwargs.setdefault('approval_prompt', 'auto')

        scopes = kwargs.pop('scopes', None)
        if scopes is None:
            scopes = self.scopes
        if hasattr(scopes, 'split'):
            scopes = scopes.split(',')
        # Build a fresh list (never mutate the caller's) and drop empty
        # entries such as the [''] produced by splitting an unset config.
        scopes = [scope.strip() for scope in scopes
                  if scope and scope.strip()]
        if USERINFO_PROFILE_SCOPE not in scopes:
            scopes.append(USERINFO_PROFILE_SCOPE)

        # NOTE: redirect_uri is stored in state for use later in getting token
        state_params['redirect_uri'] = kwargs.pop('redirect_uri',
                                                  self.redirect_uri)

        state = b64encode(urlencode(dict(sig=self._sign_state(state_params),
                                         **state_params)))

        return GOOGLE_OAUTH2_AUTH_URL + '?' + urlencode(
            dict(response_type='code',
                 client_id=self.client_id,
                 scope=' '.join(scopes),
                 redirect_uri=state_params['redirect_uri'],
                 state=state,
                 **kwargs))

    def unauthorized_callback(self):
        """Redirect to login url with next param set as request.url"""
        return redirect(self.login_url(params=dict(next=request.url)))

    def login(self, code, redirect_uri):
        """Exchange ``code`` for tokens and fetch the user's profile.

        :param code: authorization code received on the callback.
        :param redirect_uri: redirect URI that was sent with the
            authorization request.
        :returns: tuple ``(token, userinfo)`` of decoded JSON mappings.

        Aborts with HTTP 400 when either response is missing, is not a
        JSON object, reports an ``error``, or (for the token response)
        carries no ``access_token``.
        """
        token = self._response_json(
            requests.post(GOOGLE_OAUTH2_TOKEN_URL, data=dict(
                code=code,
                redirect_uri=redirect_uri,
                grant_type='authorization_code',
                client_id=self.client_id,
                client_secret=self.client_secret,
            )))
        if (not isinstance(token, dict) or token.get('error')
                or not token.get('access_token')):
            abort(400)

        userinfo = self._response_json(
            requests.get(GOOGLE_OAUTH2_USERINFO_URL, params=dict(
                access_token=token['access_token'],
            )))
        if (not isinstance(userinfo, dict) or not userinfo
                or userinfo.get('error')):
            abort(400)

        return token, userinfo

    def get_access_token(self, refresh_token):
        """Use a refresh token to obtain a new access token.

        :returns: the decoded token mapping, or ``None`` when the response
            is empty, is not a JSON object, or reports an ``error``.
        """
        token = self._response_json(
            requests.post(GOOGLE_OAUTH2_TOKEN_URL, data=dict(
                refresh_token=refresh_token,
                grant_type='refresh_token',
                client_id=self.client_id,
                client_secret=self.client_secret,
            )))
        if not isinstance(token, dict) or not token or token.get('error'):
            return None
        return token

    def oauth2callback(self, view_func):
        """Decorator for OAuth2 callback. Calls `GoogleLogin.login` then
        passes results to `view_func`.

        ``view_func`` is called with the parameters recovered from
        ``state`` plus ``token`` and ``userinfo``, all as keyword
        arguments. A request without ``code``, or with a missing or
        undecodable ``state``, is answered with HTTP 400; a state whose
        signature does not verify goes to ``login_manager.unauthorized()``.
        """
        @wraps(view_func)
        def decorated(*args, **kwargs):
            # NOTE: positional/keyword route arguments are accepted but not
            # forwarded to view_func; only the state params are.
            code = request.args.get('code')
            if not code:
                abort(400)

            state = request.args.get('state')
            if not state:
                abort(400)
            try:
                decoded = b64decode(str(state))
            except (TypeError, ValueError):
                # Bad base64 (TypeError on Python 2) or non-ASCII input.
                abort(400)

            # Keep blank values so the params round-trip exactly and the
            # signature is recomputed over what was originally signed.
            params = dict(parse_qsl(decoded, keep_blank_values=True))

            # Check sig
            sig = params.pop('sig', None)
            if sig is None or sig != self._sign_state(params):
                return self.login_manager.unauthorized()

            # Get userinfo and token; a verified state always carries the
            # redirect_uri that login_url stored in it.
            token, userinfo = self.login(code, params.pop('redirect_uri'))
            params.update(token=token, userinfo=userinfo)
            return view_func(**params)

        return decorated

    def user_loader(self, func):
        """Shortcut to `login_manager`'s
        `flaskext.login.LoginManager.user_loader`.

        Returns ``func`` so the method can be used as a decorator without
        rebinding the decorated name to ``None``.
        """
        self.login_manager.user_loader(func)
        return func