"""
A set of stateless functions that can be used to complete various steps of an
OAuth handshake or to identify a MediaWiki user.
:Example:
.. code-block:: python
from mwoauth import ConsumerToken, initiate, complete, identify
from six.moves import input # For compatibility between python 2 and 3
# Consruct a "consumer" from the key/secret provided by MediaWiki
import config
consumer_token = ConsumerToken(
config.consumer_key, config.consumer_secret)
mw_uri = "https://en.wikipedia.org/w/index.php"
# Step 1: Initialize -- ask MediaWiki for a temporary key/secret for
# user
redirect, request_token = initiate(mw_uri, consumer_token)
# Step 2: Authorize -- send user to MediaWiki to confirm authorization
print("Point your browser to: %s" % redirect) #
response_qs = input("Response query string: ")
# Step 3: Complete -- obtain authorized key/secret for "resource owner"
access_token = complete(
mw_uri, consumer_token, request_token, response_qs)
print(str(access_token))
# Step 4: Identify -- (optional) get identifying information about the
# user
identity = identify(mw_uri, consumer_token, access_token)
print("Identified as {username}.".format(**identity))
"""
import re
import time
import jwt
import requests
from requests_oauthlib import OAuth1
from six import PY3, b, text_type
from six.moves.urllib.parse import parse_qs, urlencode, urlparse
from . import defaults
from .errors import OAuthException
from .tokens import AccessToken, RequestToken
def force_unicode(val):
if type(val) == text_type:
return val
else:
if PY3:
return str(val, "unicode-escape")
else:
return unicode(val, "unicode-escape")
[docs]def initiate(mw_uri, consumer_token, callback='oob',
user_agent=defaults.USER_AGENT):
"""
Initiate an oauth handshake with MediaWiki.
:Parameters:
mw_uri : `str`
The base URI of the MediaWiki installation. Note that the URI
should end in ``"index.php"``.
consumer_token : :class:`~mwoauth.ConsumerToken`
A token representing you, the consumer. Provided by MediaWiki via
``Special:OAuthConsumerRegistration``.
callback : `str`
Callback URL. Defaults to 'oob'.
:Returns:
A `tuple` of two values:
* a MediaWiki URL to direct the user to
* a :class:`~mwoauth.RequestToken` representing a request for access
"""
auth = OAuth1(consumer_token.key,
client_secret=consumer_token.secret,
callback_uri=callback)
r = requests.post(url=mw_uri,
params={'title': "Special:OAuth/initiate"},
auth=auth,
headers={'User-Agent': user_agent})
credentials = parse_qs(r.content)
if credentials is None or credentials == {}:
raise OAuthException(
"Expected x-www-form-urlencoded response from " +
"MediaWiki, but got something else: " +
"{0}".format(repr(r.content)))
elif b('oauth_token') not in credentials or \
b('oauth_token_secret') not in credentials:
raise OAuthException(
"MediaWiki response lacks token information: "
"{0}".format(repr(credentials)))
else:
request_token = RequestToken(
credentials.get(b('oauth_token'))[0],
credentials.get(b('oauth_token_secret'))[0]
)
params = {'title': "Special:OAuth/authenticate",
'oauth_token': request_token.key,
'oauth_consumer_key': consumer_token.key}
return (
mw_uri + "?" + urlencode(params),
request_token
)
[docs]def complete(mw_uri, consumer_token, request_token, response_qs,
user_agent=defaults.USER_AGENT):
"""
Complete an OAuth handshake with MediaWiki by exchanging an
:Parameters:
mw_uri : `str`
The base URI of the MediaWiki installation. Note that the URI
should end in ``"index.php"``.
consumer_token : :class:`~mwoauth.ConsumerToken`
A key/secret pair representing you, the consumer.
request_token : :class:`~mwoauth.RequestToken`
A temporary token representing the user. Returned by
`initiate()`.
response_qs : `bytes`
The query string of the URL that MediaWiki forwards the user back
after authorization.
:Returns:
An `AccessToken` containing an authorized key/secret pair that
can be stored and used by you.
"""
callback_data = parse_qs(_ensure_bytes(response_qs))
if callback_data is None or callback_data == {}:
raise OAuthException(
"Expected URL query string, but got " +
"something else instead: {0}".format(str(response_qs)))
elif b('oauth_token') not in callback_data or \
b('oauth_verifier') not in callback_data:
raise OAuthException(
"Query string lacks token information: "
"{0}".format(repr(callback_data)))
else:
# Check if the query string references the right temp resource owner
# key
request_token_key = callback_data.get(b("oauth_token"))[0]
# Get the verifier token
verifier = callback_data.get(b("oauth_verifier"))[0]
if not request_token.key == request_token_key:
raise OAuthException(
"Unexpect request token key " +
"{0}, expected {1}.".format(request_token_key, request_token.key))
# Construct a new auth with the verifier
auth = OAuth1(consumer_token.key,
client_secret=consumer_token.secret,
resource_owner_key=request_token.key,
resource_owner_secret=request_token.secret,
verifier=verifier)
# Send the verifier and ask for an authorized resource owner key/secret
r = requests.post(url=mw_uri,
params={'title': "Special:OAuth/token"},
auth=auth,
headers={'User-Agent': user_agent})
# Parse response and construct an authorized resource owner
credentials = parse_qs(r.content)
if credentials is None:
raise OAuthException(
"Expected x-www-form-urlencoded response, " +
"but got some else instead: {0}".format(r.content))
access_token = AccessToken(
credentials.get(b('oauth_token'))[0],
credentials.get(b('oauth_token_secret'))[0]
)
return access_token
def _ensure_bytes(val, encoding="ascii"):
if isinstance(val, bytes):
return val
elif str == bytes:
return val.encode(encoding)
else:
return bytes(val, encoding)
[docs]def identify(mw_uri, consumer_token, access_token, leeway=10.0,
user_agent=defaults.USER_AGENT):
"""
Gather identifying information about a user via an authorized token.
:Parameters:
mw_uri : `str`
The base URI of the MediaWiki installation. Note that the URI
should end in ``"index.php"``.
consumer_token : :class:`~mwoauth.ConsumerToken`
A token representing you, the consumer.
access_token : :class:`~mwoauth.AccessToken`
A token representing an authorized user. Obtained from
`complete()`
leeway : `int` | `float`
The number of seconds of leeway to account for when examining a
tokens "issued at" timestamp.
:Returns:
A dictionary containing identity information.
"""
# Construct an OAuth auth
auth = OAuth1(consumer_token.key,
client_secret=consumer_token.secret,
resource_owner_key=access_token.key,
resource_owner_secret=access_token.secret)
# Request the identity using auth
r = requests.post(url=mw_uri,
params={'title': "Special:OAuth/identify"},
auth=auth,
headers={'User-Agent': user_agent})
# Special:OAuth/identify unhelpfully returns 200 status even when there is
# an error in the API call. Check for error messages manually.
try:
resp = r.json()
if 'error' in resp:
raise OAuthException(
"A MediaWiki API error occurred: {0}".format(resp['message']))
except ValueError as e:
raise OAuthException(
"An error occurred while trying to read json " +
"content: {0}".format(e))
# Decode json & stuff
try:
identity = jwt.decode(r.content, consumer_token.secret,
audience=consumer_token.key,
algorithms=["HS256"],
leeway=leeway)
except jwt.InvalidTokenError as e:
raise OAuthException(
"An error occurred while trying to read json " +
"content: {0}".format(e))
# Verify the issuer is who we expect (server sends $wgCanonicalServer)
issuer = urlparse(identity['iss']).netloc
expected_domain = urlparse(mw_uri).netloc
if not issuer == expected_domain:
raise OAuthException(
"Unexpected issuer " +
"{0}, expected {1}".format(issuer, expected_domain))
# Check that the identity was issued in the past.
now = time.time()
issued_at = float(identity['iat'])
if not now >= (issued_at - leeway):
raise OAuthException(
"Identity issued {0} ".format(issued_at - now) +
"seconds in the future!")
# Verify that the nonce matches our request one,
# to avoid a replay attack
authorization_header = force_unicode(r.request.headers['Authorization'])
request_nonce = re.search(r'oauth_nonce="(.*?)"',
authorization_header).group(1)
if identity['nonce'] != request_nonce:
raise OAuthException(
'Replay attack detected: {0} != {1}'.format(
identity['nonce'], request_nonce))
return identity