Source code for otp_sns.models
from binascii import unhexlify
from functools import partial
import logging
from boto import sns
from django.db import models
from django_otp.models import Device
from django_otp.oath import totp
from django_otp.util import random_hex, hex_validator
from .conf import settings
logger = logging.getLogger(__name__)
[docs]class SNSDevice(Device):
"""
A device that delivers codes to an AWS SNS topic. The primary application
of this is expected to be SMS delivery, although there's no such limitation
in the code.
Creating SNS topics and adding subscriptions is an exercise left to the
client. This device only posts and verifies.
This uses TOTP to generate temporary tokens. We use the default 30 second
time step and allow a one step grace period.
.. attribute:: topic
The ARN of the SNS topic to post to.
.. attribute:: message
The message to show the user after posting a token. (Default:
settings.OTP_SNS_DEFAULT_MESSAGE)
.. attribute:: key
The secret key used to generate TOTP tokens.
"""
topic = models.CharField(max_length=256,
help_text="The ARN of the SNS topic to post to."
)
message = models.CharField(max_length=64,
default=settings.OTP_SNS_DEFAULT_MESSAGE,
help_text="The message to present the user after sending the token."
)
key = models.CharField(max_length=40,
validators=[hex_validator(20)],
default=lambda: random_hex(20),
help_text="A random key used to generate tokens (hex-encoded)."
)
class Meta(Device.Meta):
verbose_name = "SNS Device"
def __unicode__(self):
return self.topic
@property
def bin_key(self):
return unhexlify(self.key)
def generate_challenge(self):
"""
Generates a random token and publishes it to
:attr:`~otp_sns.models.SNSDevice.topic`. The token will only be valid
for a limited time. If there's an error, we'll log it and raise a
StandardError.
:returns: :attr:`~otp_sns.models.SNSDevice.message` on success.
"""
token = totp(self.bin_key)
# Special topic for test cases
if self.topic == 'test':
return token
try:
region = self.topic.split(':')[3]
connection = sns.connect_to_region(region,
aws_access_key_id=settings.OTP_SNS_AWS_ID,
aws_secret_access_key=settings.OTP_SNS_AWS_KEY
)
result = connection.publish(self.topic, '{0:06}'.format(token))
result['PublishResponse']['PublishResult']['MessageId']
except StandardError as e:
logger.error('Error posting SNS token: {0}'.format(e))
raise StandardError('Failed to send the token')
return self.message
def verify_token(self, token):
try:
token = int(token)
except ValueError:
return False
else:
return any(totp(self.bin_key, drift=drift) == token for drift in [0, -1])