# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Implementention of various utility functions."""
from functools import partial
import six
from flask import abort, current_app, jsonify, make_response, request, url_for
from invenio_pidstore.errors import PIDDeletedError, PIDDoesNotExistError, \
PIDMissingObjectError, PIDRedirectedError, PIDUnregistered
from invenio_pidstore.resolver import Resolver
from invenio_records.api import Record
from werkzeug.routing import BaseConverter, BuildError, PathConverter
from werkzeug.utils import cached_property, import_string
from .errors import PIDDeletedRESTError, PIDDoesNotExistRESTError, \
PIDMissingObjectRESTError, PIDRedirectedRESTError, \
PIDUnregisteredRESTError
from .proxies import current_records_rest
def build_default_endpoint_prefixes(records_rest_endpoints):
    """Build the default_endpoint_prefixes map.

    For every ``pid_type`` found in the endpoint configuration exactly one
    endpoint key is selected:

    * an endpoint whose ``default_endpoint_prefix`` value is truthy is
      selected explicitly;
    * otherwise an endpoint whose key equals its ``pid_type`` is selected
      as a guess, which a later (or earlier) explicit default overrides.

    :param records_rest_endpoints: Mapping of endpoint key to endpoint
        configuration; each configuration must contain ``'pid_type'`` and
        may contain ``'default_endpoint_prefix'``.
    :returns: A dict mapping each ``pid_type`` to the selected endpoint key.
    :raises ValueError: If two endpoints are explicitly marked as default
        for the same ``pid_type``, or if no endpoint could be selected for
        some ``pid_type``.
    """
    pid_types = set()
    # pid types whose current entry was only guessed from key == pid_type
    guessed = set()
    endpoint_prefixes = {}

    for key, endpoint in records_rest_endpoints.items():
        pid_type = endpoint['pid_type']
        pid_types.add(pid_type)
        is_guessed = key == pid_type
        is_default = endpoint.get('default_endpoint_prefix', False)

        if is_default:
            # A second *explicit* default is an error; replacing a guessed
            # entry with an explicit one is allowed.
            if pid_type in endpoint_prefixes and pid_type not in guessed:
                raise ValueError('More than one "{0}" defined.'.format(
                    pid_type
                ))
            endpoint_prefixes[pid_type] = key
            guessed.discard(pid_type)
        elif is_guessed and pid_type not in endpoint_prefixes:
            endpoint_prefixes[pid_type] = key
            guessed.add(pid_type)

    not_found = pid_types - set(endpoint_prefixes)
    if not_found:
        # Sort so the message does not depend on set iteration order.
        raise ValueError('No endpoint-prefix for {0}.'.format(
            ', '.join(sorted(not_found))
        ))

    return endpoint_prefixes
def obj_or_import_string(value, default=None):
    """Resolve an import path to an object, or pass an object through.

    :param value: Either an import path (string), which is imported with
        ``import_string``, or an arbitrary object.
    :param default: Object returned when ``value`` is not a string and is
        falsy (for example ``None``).
    :returns: The imported object, ``value`` itself, or ``default``.
    """
    if isinstance(value, six.string_types):
        return import_string(value)
    # Non-string values are used as-is unless they are falsy.
    return value if value else default
def load_or_import_from_config(key, app=None, default=None):
    """Look up ``key`` in the application config and resolve the value.

    :param key: Name of the configuration entry.
    :param app: Application whose ``config`` is read; ``current_app`` is
        used when this is falsy.
    :param default: Passed through to ``obj_or_import_string`` as its
        ``default``.
    :returns: The result of ``obj_or_import_string`` for the config value.
    """
    application = app or current_app
    configured = application.config.get(key)
    return obj_or_import_string(configured, default=default)
def allow_all(*args, **kwargs):
    """Build a permission object that grants access unconditionally.

    All positional and keyword arguments are accepted and ignored.

    :returns: An instance of a freshly created ``Allow`` class whose
        ``can()`` method returns ``True``.
    """
    def can(self):
        return True

    permission_class = type('Allow', (), {'can': can})
    return permission_class()
def deny_all(*args, **kwargs):
    """Build a permission object that refuses access unconditionally.

    All positional and keyword arguments are accepted and ignored.

    :returns: An instance of a freshly created ``Deny`` class whose
        ``can()`` method returns ``False``.
    """
    def can(self):
        return False

    permission_class = type('Deny', (), {'can': can})
    return permission_class()
def check_elasticsearch(record, *args, **kwargs):
    """Build a permission object that checks the record through search.

    Extra positional and keyword arguments are accepted and ignored.

    :param record: A record object; its ``id`` attribute is read when
        ``can()`` is called.
    :returns: An instance of a freshly created ``CheckES`` class whose
        ``can()`` method returns ``True`` only when the search for the
        record reports a count of exactly one.
    """
    def can(self):
        """Query for the record and test that exactly one hit is counted."""
        # The search class is taken from the view handling the request.
        search_cls = request._methodview.search_class
        record_query = search_cls().get_record(str(record.id))
        return record_query.count() == 1

    permission_class = type('CheckES', (), {'can': can})
    return permission_class()
class LazyPIDValue(object):
    """Holder for a PID value that is resolved only when ``data`` is read."""

    def __init__(self, resolver, value):
        """Store the resolver and the raw PID value.

        :param resolver: Object whose ``resolve(value)`` method is called to
            resolve the PID value.
        :param value: PID value (taken from the URL).
        """
        self.resolver = resolver
        self.value = value

    @cached_property
    def data(self):
        """Resolve the PID value, translating PID errors to REST errors.

        :returns: Whatever ``self.resolver.resolve(self.value)`` returns
            (a tuple with the PID and the record).
        :raises PIDDoesNotExistRESTError: On ``PIDDoesNotExistError``.
        :raises PIDUnregisteredRESTError: On ``PIDUnregistered``.
        :raises PIDDeletedRESTError: On ``PIDDeletedError``.
        :raises PIDMissingObjectRESTError: On ``PIDMissingObjectError``.
        :raises PIDRedirectedRESTError: On ``PIDRedirectedError`` when no
            URL can be built for the destination PID type.

        On ``PIDRedirectedError`` with a resolvable destination endpoint the
        request is aborted with a 301 JSON response carrying a ``Location``
        header.
        """
        try:
            return self.resolver.resolve(self.value)
        except PIDDoesNotExistError:
            raise PIDDoesNotExistRESTError()
        except PIDUnregistered:
            raise PIDUnregisteredRESTError()
        except PIDDeletedError:
            raise PIDDeletedRESTError()
        except PIDMissingObjectError as missing_error:
            pid = missing_error.pid
            current_app.logger.exception(
                'No object assigned to {0}.'.format(pid),
                extra={'pid': pid})
            raise PIDMissingObjectRESTError(pid)
        except PIDRedirectedError as redirect_error:
            destination = redirect_error.destination_pid
            try:
                # A KeyError here means no default endpoint is registered
                # for the destination PID type.
                prefixes = current_records_rest.default_endpoint_prefixes
                endpoint = '.{0}_item'.format(prefixes[destination.pid_type])
                location = url_for(endpoint, pid_value=destination.pid_value)
                payload = {
                    'status': 301,
                    'message': 'Moved Permanently',
                    'location': location,
                }
                response = make_response(jsonify(payload), payload['status'])
                response.headers['Location'] = location
                abort(response)
            except (BuildError, KeyError):
                current_app.logger.exception(
                    'Invalid redirect - pid_type "{0}" '
                    'endpoint missing.'.format(destination.pid_type),
                    extra={
                        'pid': redirect_error.pid,
                        'destination_pid': destination,
                    })
                raise PIDRedirectedRESTError(destination.pid_type)
class PIDConverter(BaseConverter):
    """URL converter that wraps a PID value in a lazy resolver."""

    def __init__(self, url_map, pid_type, getter=None, record_class=None):
        """Set up the PID resolver used by this converter.

        :param url_map: URL map, forwarded to ``BaseConverter.__init__``.
        :param pid_type: PID type handed to the ``Resolver``.
        :param getter: Object getter, or an import path to one. When falsy,
            ``get_record`` of the record class is used with
            ``with_deleted=True``.
        :param record_class: Record class, or an import path to one. When
            falsy, ``Record`` is used.
        """
        super(PIDConverter, self).__init__(url_map)
        # The fallback getter is always built, even if ``getter`` is given.
        record_cls = obj_or_import_string(record_class, default=Record)
        fallback_getter = partial(record_cls.get_record, with_deleted=True)
        resolved_getter = obj_or_import_string(
            getter, default=fallback_getter)
        self.resolver = Resolver(
            pid_type=pid_type, object_type='rec', getter=resolved_getter)

    def to_python(self, value):
        """Wrap the URL value in a ``LazyPIDValue`` bound to the resolver.

        :param value: PID value extracted from the URL.
        :returns: A ``LazyPIDValue`` instance; nothing is resolved here.
        """
        return LazyPIDValue(self.resolver, value)
class PIDPathConverter(PIDConverter, PathConverter):
    """PID converter combined with ``PathConverter``.

    Adds no behaviour of its own; everything is inherited from
    ``PIDConverter`` and ``PathConverter``.
    """