Source code for velruse.providers.facebook

"""Facebook Authentication Views"""
import datetime
import uuid

from pyramid.httpexceptions import HTTPFound
from pyramid.security import NO_PERMISSION_REQUIRED
import requests

from ..api import (
    AuthenticationComplete,
    AuthenticationDenied,
    register_provider,
)
from ..compat import parse_qsl
from ..exceptions import CSRFError
from ..exceptions import ThirdPartyFailure
from ..settings import ProviderSettings
from ..utils import flat_url

class FacebookAuthenticationComplete(AuthenticationComplete):
    """Facebook auth complete

    Marker subclass with no behaviour of its own; it only gives the
    Facebook provider a distinct result type.
    """
def includeme(config):
    """Register the Facebook login directives on ``config``.

    Adds two configurator directives:

    * ``add_facebook_login`` -> :func:`add_facebook_login`
    * ``add_facebook_login_from_settings`` ->
      :func:`add_facebook_login_from_settings`
    """
    config.add_directive('add_facebook_login', add_facebook_login)
    config.add_directive('add_facebook_login_from_settings',
                         add_facebook_login_from_settings)
def add_facebook_login_from_settings(config, prefix='velruse.facebook.'):
    """Add a Facebook login provider configured from the registry settings.

    Collects the ``consumer_key``, ``consumer_secret``, ``scope``,
    ``login_path`` and ``callback_path`` options through
    ``ProviderSettings(settings, prefix)`` and forwards whatever was
    collected to ``config.add_facebook_login``.

    :param config: configurator exposing ``registry.settings`` and the
        ``add_facebook_login`` directive.
    :param prefix: prefix handed to ``ProviderSettings`` to locate the
        options in the settings mapping.
    """
    settings = config.registry.settings
    p = ProviderSettings(settings, prefix)
    # The two credentials are flagged as required; the rest are optional.
    # NOTE(review): presumably a missing required key makes
    # ProviderSettings.update raise -- confirm in ..settings.
    p.update('consumer_key', required=True)
    p.update('consumer_secret', required=True)
    p.update('scope')
    p.update('login_path')
    p.update('callback_path')
    config.add_facebook_login(**p.kwargs)
def add_facebook_login(config,
                       consumer_key,
                       consumer_secret,
                       scope=None,
                       login_path='/login/facebook',
                       callback_path='/login/facebook/callback',
                       name='facebook'):
    """
    Add a Facebook login provider to the application.

    Builds a :class:`FacebookProvider` and wires it into ``config``:

    * a login route at ``login_path`` served by the provider's ``login``
      method, with ``NO_PERMISSION_REQUIRED`` as the view permission;
    * a callback route at ``callback_path`` whose route factory is the
      provider's ``callback`` method;
    * the provider itself, registered under ``name`` via
      ``register_provider``.

    :param config: the configurator to add routes and views to.
    :param consumer_key: application id, sent as ``client_id``.
    :param consumer_secret: application secret, sent as ``client_secret``.
    :param scope: default permission scope requested at login.
    :param login_path: URL path of the login route.
    :param callback_path: URL path of the callback route.
    :param name: provider name; also used to build the route names.
    """
    provider = FacebookProvider(name, consumer_key, consumer_secret, scope)

    config.add_route(provider.login_route, login_path)
    config.add_view(provider, attr='login', route_name=provider.login_route,
                    permission=NO_PERMISSION_REQUIRED)

    # The callback is attached as the route *factory*, not as a view, and
    # the route opts into global views.
    config.add_route(provider.callback_route, callback_path,
                     use_global_views=True,
                     factory=provider.callback)

    register_provider(config, name, provider)
class FacebookProvider(object):
    """Login and callback handlers for the Facebook OAuth flow."""

    # NOTE(review): these endpoints and the 'facebook.com' account domain in
    # extract_fb_data were blank in the copy under review; the values are
    # Facebook's documented OAuth dialog and Graph API URLs -- confirm
    # against the upstream file.
    dialog_url = 'https://www.facebook.com/dialog/oauth/'
    access_token_url = 'https://graph.facebook.com/oauth/access_token'
    profile_url = 'https://graph.facebook.com/me'

    def __init__(self, name, consumer_key, consumer_secret, scope):
        """Store the credentials and derive the route names from ``name``."""
        self.name = name
        self.type = 'facebook'
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.scope = scope
        self.display = 'page'
        self.login_route = 'velruse.%s-login' % name
        self.callback_route = 'velruse.%s-callback' % name

    def login(self, request):
        """Initiate a facebook login

        Stores a fresh random ``state`` value in the session and returns an
        ``HTTPFound`` redirect to the OAuth dialog. ``scope`` and
        ``display`` may be overridden per request through POST parameters.
        """
        scope = request.POST.get('scope', self.scope)
        display = request.POST.get('display', self.display)
        # Kept in the session so callback() can match it against the value
        # echoed back in the redirect.
        request.session['state'] = state = uuid.uuid4().hex
        fb_url = flat_url(
            self.dialog_url,
            scope=scope,
            display=display,
            client_id=self.consumer_key,
            redirect_uri=request.route_url(self.callback_route),
            state=state)
        return HTTPFound(location=fb_url)

    def callback(self, request):
        """Process the facebook redirect

        :returns: ``AuthenticationDenied`` when the redirect carries no
            ``code``; otherwise a ``FacebookAuthenticationComplete``
            holding the normalized profile and the access token.
        :raises CSRFError: if the session has no ``state`` or it differs
            from the ``state`` query parameter.
        :raises ThirdPartyFailure: if either HTTP call does not answer 200,
            or the token response carries no access token.
        """
        sess_state = request.session.get('state')
        req_state = request.GET.get('state')
        if not sess_state or sess_state != req_state:
            raise CSRFError(
                'CSRF Validation check failed. Request state {req_state} is '
                'not the same as session state {sess_state}'.format(
                    req_state=req_state,
                    sess_state=sess_state
                )
            )
        code = request.GET.get('code')
        if not code:
            reason = request.GET.get('error_reason', 'No reason provided.')
            return AuthenticationDenied(reason=reason,
                                        provider_name=self.name,
                                        provider_type=self.type)

        # Now retrieve the access token with the code
        access_url = flat_url(
            self.access_token_url,
            client_id=self.consumer_key,
            client_secret=self.consumer_secret,
            redirect_uri=request.route_url(self.callback_route),
            code=code)
        access_token = self._parse_access_token(self._get_ok(access_url))

        # Retrieve profile data
        graph_url = flat_url(self.profile_url, access_token=access_token)
        fb_profile = self._get_ok(graph_url).json()
        profile = extract_fb_data(fb_profile)

        cred = {'oauthAccessToken': access_token}
        return FacebookAuthenticationComplete(profile=profile,
                                              credentials=cred,
                                              provider_name=self.name,
                                              provider_type=self.type)

    @staticmethod
    def _get_ok(url):
        """GET ``url``; raise ``ThirdPartyFailure`` unless the status is 200."""
        r = requests.get(url)
        if r.status_code != 200:
            raise ThirdPartyFailure("Status %s: %s" % (
                r.status_code, r.content))
        return r

    @staticmethod
    def _parse_access_token(response):
        """Pull ``access_token`` out of a token-endpoint response.

        Accepts a JSON object body as well as the older URL-encoded
        ``access_token=...&expires=...`` body. The URL-encoded fallback
        parses ``response.text`` so keys are ``str`` on Python 3 too.
        """
        try:
            payload = response.json()
        except ValueError:
            # Not JSON: fall back to the form-encoded representation.
            payload = dict(parse_qsl(response.text))
        token = payload.get('access_token') if isinstance(payload, dict) \
            else None
        if not token:
            # Report through the module's failure type, not a bare KeyError.
            raise ThirdPartyFailure(
                "No access token in response: %s" % (response.content,))
        return token


def _format_utc_offset(hours):
    """Render a fractional hour offset as ``+HH:MM`` / ``-HH:MM``.

    The sign is taken from the whole offset, so ``-0.5`` yields
    ``-00:30`` rather than ``+00:30``.
    """
    offset = float(hours)
    sign = '-' if offset < 0 else '+'
    whole, minutes = divmod(int(round(abs(offset) * 60)), 60)
    return '{sign}{h:02d}:{m:02d}'.format(sign=sign, h=whole, m=minutes)


def extract_fb_data(data):
    """Extract and normalize facebook data as parsed from the graph JSON

    :param data: mapping holding at least ``id`` and ``name``; ``link``,
        ``gender``, ``email``, ``verified``, ``timezone``, ``birthday``,
        ``first_name`` and ``last_name`` are used when present.
    :returns: a new dict with ``accounts``, ``displayName``,
        ``preferredUsername`` and ``name``, plus ``gender``, ``emails``,
        ``verifiedEmail``, ``utcOffset`` and ``birthday`` when available.
        Top-level entries with empty values are left out.
    :raises KeyError: if ``id`` or ``name`` is missing from ``data``.
    """
    # Setup the normalized contact info
    nick = None

    # Setup the nick and preferred username to the last portion of the
    # FB link URL if its not their ID
    link = data.get('link')
    if link:
        last = link.split('/')[-1]
        if last != data['id']:
            nick = last

    profile = {
        'accounts': [{'domain': 'facebook.com', 'userid': data['id']}],
        'displayName': data['name'],
        'preferredUsername': nick or data['name'],
    }

    gender = data.get('gender')
    if gender:
        profile['gender'] = gender

    email = data.get('email')
    if email:
        profile['emails'] = [{'value': email, 'primary': True}]
        if data.get('verified'):
            profile['verifiedEmail'] = email

    tz = data.get('timezone')
    # 0 is a valid offset (UTC), so test for presence rather than truthiness.
    if tz is not None and tz != '':
        # -5.5 -> -05:30
        profile['utcOffset'] = _format_utc_offset(tz)

    bday = data.get('birthday')
    if bday:
        try:
            mth, day, yr = bday.split('/')
            date = datetime.date(int(yr), int(mth), int(day))
            profile['birthday'] = date.strftime('%Y-%m-%d')
        except ValueError:
            # Best effort: a birthday that is not a full, valid MM/DD/YYYY
            # date (e.g. one with no year) is left out.
            pass

    name = {}
    pcard_map = {'first_name': 'givenName', 'last_name': 'familyName'}
    for key, val in pcard_map.items():
        part = data.get(key)
        if part:
            name[val] = part
    name['formatted'] = data['name']
    profile['name'] = name

    # Now strip out empty values. A new dict is built because deleting keys
    # while iterating the original raises RuntimeError on Python 3.
    return {
        key: value for key, value in profile.items()
        if value and not (isinstance(value, list) and not value[0])
    }