# -*- coding: utf-8 -*-
import os
import mimetypes
from pyramid import compat
from zope.interface import implementer
from . import utils
from .exceptions import FileNotAllowed
from .extensions import resolve_extensions
from .interfaces import IFileStorage
from .registry import register_file_storage_impl
def includeme(config):
impl = S3FileStorage.from_settings(
config.registry.settings, prefix='storage.'
)
register_file_storage_impl(config, impl)
@implementer(IFileStorage)
[docs]class S3FileStorage(object):
@classmethod
def from_settings(cls, settings, prefix):
return cls(access_key=settings[prefix + 'aws.access_key'],
secret_key=settings[prefix + 'aws.secret_key'],
bucket_name=settings[prefix + 'aws.bucket'],
acl=settings.get(prefix + 'aws.default_acl', 'public-read'),
base_url=settings.get(prefix + 'base_url', ''),
extensions=settings.get(prefix + 'extensions', 'default'))
def __init__(self, access_key, secret_key, bucket_name,
acl=None, base_url='', extensions='default'):
self.access_key = access_key
self.secret_key = secret_key
self.bucket_name = bucket_name
self.acl = acl
self.base_url = base_url
self.extensions = resolve_extensions(extensions)
def get_connection(self):
try:
from boto.s3.connection import S3Connection
except ImportError:
raise RuntimeError("You must have boto installed to use s3")
return S3Connection(self.access_key, self.secret_key)
def get_bucket(self):
return self.get_connection().get_bucket(self.bucket_name)
[docs] def url(self, filename):
"""Returns entire URL of the filename, joined to the base_url
:param filename: base name of file
"""
return compat.urlparse.urljoin(self.base_url, filename)
def exists(self, filename):
return self.get_bucket().new_key(filename).exists()
[docs] def delete(self, filename):
"""Deletes the filename. Filename is resolved with the
absolute path based on base_path. If file does not exist,
returns **False**, otherwise **True**
:param filename: base name of file
"""
self.get_bucket().delete_key(filename)
[docs] def filename_allowed(self, filename, extensions=None):
"""Checks if a filename has an allowed extension
:param filename: base name of file
:param extensions: iterable of extensions (or self.extensions)
"""
_, ext = os.path.splitext(filename)
return self.extension_allowed(ext, extensions)
[docs] def file_allowed(self, fs, extensions=None):
"""Checks if a file can be saved, based on extensions
:param fs: **cgi.FieldStorage** object or similar
:param extensions: iterable of extensions (or self.extensions)
"""
return self.filename_allowed(fs.filename, extensions)
[docs] def extension_allowed(self, ext, extensions=None):
"""Checks if an extension is permitted. Both e.g. ".jpg" and
"jpg" can be passed in. Extension lookup is case-insensitive.
:param extensions: iterable of extensions (or self.extensions)
"""
extensions = extensions or self.extensions
if not extensions:
return True
if ext.startswith('.'):
ext = ext[1:]
return ext.lower() in extensions
[docs] def save(self, fs, *args, **kwargs):
"""Saves contents of a **cgi.FieldStorage** object to the file system.
Returns modified filename(including folder).
Returns the resolved filename, i.e. the folder + (modified/randomized)
filename.
:param fs: **cgi.FieldStorage** object (or similar)
:param folder: relative path of sub-folder
:param randomize: randomize the filename
:param extensions: iterable of allowed extensions, if not default
:param acl: ACL policy (if None then uses default)
:param replace: replace existing key
:param headers: dict of s3 request headers
:returns: modified filename
"""
return self.save_file(fs.file, fs.filename, *args, **kwargs)
[docs] def save_filename(self, filename, *args, **kwargs):
"""Saves a filename in local filesystem to the uploads location.
Returns the resolved filename, i.e. the folder +
the (randomized/incremented) base name.
:param filename: local filename
:param folder: relative path of sub-folder
:param randomize: randomize the filename
:param extensions: iterable of allowed extensions, if not default
:param acl: ACL policy (if None then uses default)
:param replace: replace existing key
:param headers: dict of s3 request headers
:returns: modified filename
"""
return self.save_file(open(filename, "rb"), filename, *args, **kwargs)
[docs] def save_file(self, file, filename, folder=None, randomize=False,
extensions=None, acl=None, replace=False, headers=None):
"""
:param filename: local filename
:param folder: relative path of sub-folder
:param randomize: randomize the filename
:param extensions: iterable of allowed extensions, if not default
:param acl: ACL policy (if None then uses default)
:param replace: replace existing key
:param headers: dict of s3 request headers
:returns: modified filename
"""
acl = acl or self.acl
headers = headers or {}
extensions = extensions or self.extensions
if not self.filename_allowed(filename, extensions):
raise FileNotAllowed()
filename = utils.secure_filename(
os.path.basename(filename)
)
if randomize:
filename = utils.random_filename(filename)
if folder:
filename = folder + "/" + filename
content_type, _ = mimetypes.guess_type(filename)
content_type = content_type or 'application/octet-stream'
headers.update({
'Content-Type': content_type,
})
bucket = self.get_bucket()
key = bucket.get_key(filename) or bucket.new_key(filename)
key.set_metadata('Content-Type', content_type)
file.seek(0)
key.set_contents_from_file(file,
headers=headers,
policy=acl,
replace=replace,
rewind=True)
return filename