Source code for invenio_records.api

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Record API."""

from __future__ import absolute_import, print_function

from copy import deepcopy

from flask import current_app
from invenio_db import db
from jsonpatch import apply_patch
from sqlalchemy.orm.attributes import flag_modified
from werkzeug.local import LocalProxy

from .errors import MissingModelError
from .models import RecordMetadata
from .signals import after_record_delete, after_record_insert, \
    after_record_revert, after_record_update, before_record_delete, \
    before_record_insert, before_record_revert, before_record_update

_records_state = LocalProxy(lambda: current_app.extensions['invenio-records'])


[docs]class RecordBase(dict): """Base class for Record and RecordBase.""" def __init__(self, data, model=None): """Initialize instance with dictionary data and SQLAlchemy model. :param data: Dict with record metadata. :param model: :class:`~invenio_records.models.RecordMetadata` instance. """ self.model = model super(RecordBase, self).__init__(data or {}) @property def id(self): """Get model identifier.""" return self.model.id if self.model else None @property def revision_id(self): """Get revision identifier.""" return self.model.version_id-1 if self.model else None @property def created(self): """Get creation timestamp.""" return self.model.created if self.model else None @property def updated(self): """Get last updated timestamp.""" return self.model.updated if self.model else None
[docs] def validate(self, **kwargs): r"""Validate record according to schema defined in ``$schema`` key. :Keyword Arguments: * **format_checker** -- A ``format_checker`` is an instance of class :class:`jsonschema.FormatChecker` containing business logic to validate arbitrary formats. For example: >>> from jsonschema import FormatChecker >>> from jsonschema.validators import validate >>> checker = FormatChecker() >>> checker.checks('foo')(lambda el: el.startswith('foo')) <function <lambda> at ...> >>> validate('foo', {'format': 'foo'}, format_checker=checker) returns ``None``, which means that the validation was successful, while >>> validate('bar', {'format': 'foo'}, format_checker=checker) Traceback (most recent call last): ... ValidationError: 'bar' is not a 'foo' ... raises a :class:`jsonschema.exceptions.ValidationError`. * **validator** -- A :class:`jsonschema.IValidator` class used for record validation. It will be used as `cls` argument when calling :func:`jsonschema.validate`. For example >>> from jsonschema.validators import extend, Draft4Validator >>> NoRequiredValidator = extend( ... Draft4Validator, ... validators={'required': lambda v, r, i, s: None}) >>> schema = { ... 'type': 'object', ... 'properties': { ... 'name': { 'type': 'string' }, ... 'email': { 'type': 'string' }, ... 'address': {'type': 'string' }, ... 'telephone': { 'type': 'string' } ... }, ... 'required': ['name', 'email'] ... } >>> from jsonschema.validators import validate >>> validate({}, schema, NoRequiredValidator) returns ``None``, which means that the validation was successful, while >>> validate({}, schema) Traceback (most recent call last): ... ValidationError: 'name' is a required property ... raises a :class:`jsonschema.exceptions.ValidationError`. """ if '$schema' in self and self['$schema'] is not None: kwargs['cls'] = kwargs.pop('validator', None) return _records_state.validate(self, self['$schema'], **kwargs) return True
[docs] def replace_refs(self): """Replace the ``$ref`` keys within the JSON.""" return _records_state.replace_refs(self)
[docs] def dumps(self, **kwargs): """Return pure Python dictionary with record metadata.""" return deepcopy(dict(self))
[docs]class Record(RecordBase): """Define API for metadata creation and manipulation.""" @classmethod
[docs] def create(cls, data, id_=None, **kwargs): r"""Create a record instance and store it in database. Procedure followed: #. The signal :data:`invenio_records.signals.before_record_insert` is called with the data as function parameter. #. The record data is validate. #. The record is added in the database. #. The signal :data:`invenio_records.signals.after_record_insert` is called with the data as function parameter. :param data: Dict with record metadata. :param id_: Force the UUID for the record. :param \**kwargs: See below. :returns: A new Record instance. :Keyword Arguments: * **format_checker** -- An instance of class :class:`jsonschema.FormatChecker`, which contains validation rules for formats. See :func:`~invenio_records.api.RecordBase.validate` for details. * **validator** -- A :class:`jsonschema.IValidator` class that will be used to validate. See :func:`~invenio_records.api.RecordBase.validate` for details. """ from .models import RecordMetadata with db.session.begin_nested(): record = cls(data) before_record_insert.send(record) record.validate(**kwargs) record.model = RecordMetadata(id=id_, json=record) db.session.add(record.model) after_record_insert.send(record) return record
@classmethod
[docs] def get_record(cls, id_, with_deleted=False): """Get record instance. Raises database exception if record does not exists. :param id_: Record ID. :param with_deleted: If `True` then it includes deleted records. :returns: The Record instance. """ with db.session.no_autoflush: query = RecordMetadata.query.filter_by(id=id_) if not with_deleted: query = query.filter(RecordMetadata.json != None) # noqa obj = query.one() return cls(obj.json, model=obj)
@classmethod
[docs] def get_records(cls, ids, with_deleted=False): """Get multiple record instances. :param ids: List of record ID. :param with_deleted: If `True` then it includes deleted records. :returns: A list od Record instance. """ with db.session.no_autoflush: query = RecordMetadata.query.filter(RecordMetadata.id.in_(ids)) if not with_deleted: query = query.filter(RecordMetadata.json != None) # noqa return [cls(obj.json, model=obj) for obj in query.all()]
[docs] def patch(self, patch): """Patch record metadata. :params patch: Dictionary of record metadata. :returns: A new Record instance. """ data = apply_patch(dict(self), patch) return self.__class__(data, model=self.model)
[docs] def commit(self): """Store changes on current instance in database. Procedure followed: #. The signal :data:`invenio_records.signals.before_record_insert` is called with the record as function parameter. #. The record data is validate. #. The record is committed to the database. #. The signal :data:`invenio_records.signals.after_record_insert` is called with the record as function parameter. :returns: The Record instance. """ if self.model is None or self.model.json is None: raise MissingModelError() with db.session.begin_nested(): before_record_update.send(self) self.validate() self.model.json = dict(self) flag_modified(self.model, 'json') db.session.merge(self.model) after_record_update.send(self) return self
[docs] def delete(self, force=False): """Delete a record. If `force` is ``False``, the record is soft-deleted, i.e. the record stays in the database. This ensures e.g. that the same record identifier cannot be used twice, and that you can still retrieve the history of an object. If `force` is True, the record is completely removed from the database. Procedure followed: #. The signal :data:`invenio_records.signals.before_record_insert` is called with the record as function parameter. #. The record is deleted or soft-deleted. #. The signal :data:`invenio_records.signals.after_record_insert` is called with the record as function parameter. :param force: Completely remove record from database. :returns: The Record instance. """ if self.model is None: raise MissingModelError() with db.session.begin_nested(): before_record_delete.send(self) if force: db.session.delete(self.model) else: self.model.json = None db.session.merge(self.model) after_record_delete.send(self) return self
[docs] def revert(self, revision_id): """Revert to a specific revision. Procedure followed: #. The signal :data:`invenio_records.signals.before_record_insert` is called with the record as function parameter. #. The record is reverted. #. The signal :data:`invenio_records.signals.after_record_insert` is called with the reverted record as function parameter. :param revision_id: Specify with revision the record should be reverted. :returns: The new Record instance. """ if self.model is None: raise MissingModelError() revision = self.revisions[revision_id] with db.session.begin_nested(): before_record_revert.send(self) self.model.json = dict(revision) db.session.merge(self.model) after_record_revert.send(self) return self.__class__(self.model.json, model=self.model)
@property def revisions(self): """Get revision iterator.""" if self.model is None: raise MissingModelError() return RevisionsIterator(self.model)
[docs]class RecordRevision(RecordBase): """API for record revisions.""" def __init__(self, model): """Initialize revision.""" super(RecordRevision, self).__init__(model.json, model=model)
[docs]class RevisionsIterator(object): """Iterator for record revisions.""" def __init__(self, model): """Initialize iterator.""" self._it = None self.model = model def __len__(self): """Get number of revisions.""" return self.model.versions.count() def __iter__(self): """Get iterator.""" self._it = iter(self.model.versions) return self
[docs] def next(self): """Python 2.7 compatibility.""" return self.__next__() # pragma: no cover
def __next__(self): """Get next revision item.""" return RecordRevision(next(self._it)) def __getitem__(self, revision_id): """Get a specific revision.""" return RecordRevision(self.model.versions[revision_id]) def __contains__(self, revision_id): """Test if revision exists.""" try: self[revision_id] return True except IndexError: return False