Source code for pyramid_storage.local

# -*- coding: utf-8 -*-

import os
import shutil

from pyramid import compat
from zope.interface import implementer

from . import utils
from .extensions import resolve_extensions
from .exceptions import FileNotAllowed
from .interfaces import IFileStorage
from .registry import register_file_storage_impl


def includeme(config):
    """Pyramid include hook: register a local file storage implementation.

    Builds a :class:`LocalFileStorage` from the registry settings whose keys
    start with ``storage.`` and hands it to ``register_file_storage_impl``.

    :param config: Pyramid configurator exposing ``registry.settings``
    """
    settings = config.registry.settings
    storage = LocalFileStorage.from_settings(settings, prefix='storage.')
    register_file_storage_impl(config, storage)


@implementer(IFileStorage)
class LocalFileStorage(object):
    """Manages storage and retrieval of file uploads to local filesystem on
    server.

    :param base_path: the absolute base path where uploads are stored
    :param base_url: absolute or relative base URL for uploads
    :param extensions: extensions string (passed to ``resolve_extensions``)
    """

    @classmethod
    def from_settings(cls, settings, prefix):
        """Returns a new instance from config settings.

        Looks up ``base_path`` (required), ``base_url`` (defaults to ``''``)
        and ``extensions`` (defaults to ``'default'``), each prefixed with
        ``prefix``.

        :param settings: dict(-like) of settings
        :param prefix: prefix separating these settings
        :raises ValueError: if a required setting is missing
        """
        # (setting name, required?, default when absent)
        options = (
            ('base_path', True, None),
            ('base_url', False, ''),
            ('extensions', False, 'default'),
        )
        kwargs = {}
        for name, required, default in options:
            try:
                kwargs[name] = settings[prefix + name]
            except KeyError:
                if required:
                    raise ValueError("%s%s is required" % (prefix, name))
                kwargs[name] = default
        return cls(**kwargs)

    def __init__(self, base_path, base_url='', extensions='default'):
        self.base_path = base_path
        self.base_url = base_url
        self.extensions = resolve_extensions(extensions)

    def url(self, filename):
        """Returns entire URL of the filename, joined to the base_url

        :param filename: base name of file
        """
        return compat.urlparse.urljoin(self.base_url, filename)

    def path(self, filename):
        """Returns absolute file path of the filename, joined to the
        base_path.

        :param filename: base name of file
        """
        return os.path.join(self.base_path, filename)

    def delete(self, filename):
        """Deletes the filename. Filename is resolved with the
        absolute path based on base_path. If file does not exist,
        returns **False**, otherwise **True**

        :param filename: base name of file
        """
        if self.exists(filename):
            os.remove(self.path(filename))
            return True
        return False

    def exists(self, filename):
        """Checks if file exists. Resolves filename's absolute
        path based on base_path.

        :param filename: base name of file
        """
        return os.path.exists(self.path(filename))

    def filename_allowed(self, filename, extensions=None):
        """Checks if a filename has an allowed extension

        :param filename: base name of file
        :param extensions: iterable of extensions (or self.extensions)
        """
        _, ext = os.path.splitext(filename)
        return self.extension_allowed(ext, extensions)

    def file_allowed(self, fs, extensions=None):
        """Checks if a file can be saved, based on extensions

        :param fs: **cgi.FieldStorage** object or similar
        :param extensions: iterable of extensions (or self.extensions)
        """
        return self.filename_allowed(fs.filename, extensions)

    def extension_allowed(self, ext, extensions=None):
        """Checks if an extension is permitted. Both e.g. ".jpg" and
        "jpg" can be passed in. The extension is lower-cased before the
        lookup. If neither ``extensions`` nor ``self.extensions`` is
        non-empty, every extension is allowed.

        :param ext: extension, with or without the leading dot
        :param extensions: iterable of extensions (or self.extensions)
        """
        extensions = extensions or self.extensions
        if not extensions:
            return True
        if ext.startswith('.'):
            ext = ext[1:]
        return ext.lower() in extensions

    def save(self, fs, *args, **kwargs):
        """Saves contents of a **cgi.FieldStorage** object to the file system.
        Returns modified filename(including folder).

        If there is a clash with an existing filename then filename will be
        resolved accordingly. If path directories do not exist they will be
        created.

        Returns the resolved filename, i.e. the folder +
        the (randomized/incremented) base name.

        :param fs: **cgi.FieldStorage** object (or similar)
        :param folder: relative path of sub-folder
        :param randomize: randomize the filename
        :param extensions: iterable of allowed extensions, if not default
        :returns: modified filename
        """
        return self.save_file(fs.file, fs.filename, *args, **kwargs)

    def save_filename(self, filename, *args, **kwargs):
        """Saves a filename in local filesystem to the uploads location.

        Returns the resolved filename, i.e. the folder +
        the (randomized/incremented) base name.

        :param filename: local filename
        :param folder: relative path of sub-folder
        :param randomize: randomize the filename
        :param extensions: iterable of allowed extensions, if not default
        :returns: modified filename
        """
        # Close the source handle once the copy is done instead of leaking it.
        with open(filename, "rb") as source:
            return self.save_file(source, filename, *args, **kwargs)

    def save_file(self, file, filename, folder=None, randomize=False,
                  extensions=None):
        """Saves a file object to the uploads location.
        Returns the resolved filename, i.e. the folder +
        the (randomized/incremented) base name.

        :param file: readable, seekable binary file object to copy from
        :param filename: original filename
        :param folder: relative path of sub-folder
        :param randomize: randomize the filename
        :param extensions: iterable of allowed extensions, if not default
        :returns: modified filename
        :raises FileNotAllowed: if the filename's extension is not permitted
        """
        extensions = extensions or self.extensions

        if not self.filename_allowed(filename, extensions):
            raise FileNotAllowed()

        # Drop any directory part before sanitising the name.
        filename = utils.secure_filename(
            os.path.basename(filename)
        )

        if folder:
            dest_folder = os.path.join(self.base_path, folder)
        else:
            dest_folder = self.base_path

        if not os.path.exists(dest_folder):
            os.makedirs(dest_folder)

        if randomize:
            filename = utils.random_filename(filename)

        filename, path = self.resolve_name(filename, dest_folder)

        # Rewind so the whole content is copied even if it was already read.
        file.seek(0)

        with open(path, "wb") as dest:
            shutil.copyfileobj(file, dest)

        if folder:
            filename = os.path.join(folder, filename)

        return filename

    def resolve_name(self, name, folder):
        """Resolves a unique name and the correct path. If a filename
        for that path already exists then a numeric suffix will be
        added before the extension, for example test.jpg -> test-1.jpg etc.

        :param name: base name of file
        :param folder: absolute folder path
        :returns: tuple of (resolved name, full path)
        """
        basename, ext = os.path.splitext(name)
        counter = 0
        while True:
            path = os.path.join(folder, name)
            if not os.path.exists(path):
                return name, path
            counter += 1
            name = '%s-%d%s' % (basename, counter, ext)