# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""OAI-PMH 2.0 response generator."""
from datetime import MINYEAR, datetime, timedelta
from flask import current_app, url_for
from invenio_db import db
from invenio_records.api import Record
from invenio_records.models import RecordMetadata
from lxml import etree
from lxml.etree import Element, ElementTree, SubElement
from .fetchers import oaiid_fetcher
from .models import OAISet
from .provider import OAIIDProvider
from .query import get_records
from .resumption_token import serialize
from .utils import datetime_to_datestamp, serializer
NS_OAIPMH = 'http://www.openarchives.org/OAI/2.0/'
NS_OAIPMH_XSD = 'http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd'
NS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
NS_OAIDC = 'http://www.openarchives.org/OAI/2.0/oai_dc/'
NS_DC = "http://purl.org/dc/elements/1.1/"
NSMAP = {
None: NS_OAIPMH,
}
NSMAP_DESCRIPTION = {
'oai_dc': NS_OAIDC,
'dc': NS_DC,
'xsi': NS_XSI,
}
DATETIME_FORMATS = {
'YYYY-MM-DDThh:mm:ssZ': '%Y-%m-%dT%H:%M:%SZ',
'YYYY-MM-DD': '%Y-%m-%d',
}
[docs]def envelope(**kwargs):
"""Create OAI-PMH envelope for response."""
e_oaipmh = Element(etree.QName(NS_OAIPMH, 'OAI-PMH'), nsmap=NSMAP)
e_oaipmh.set(etree.QName(NS_XSI, 'schemaLocation'),
'{0} {1}'.format(NS_OAIPMH, NS_OAIPMH_XSD))
e_tree = ElementTree(element=e_oaipmh)
e_oaipmh.addprevious(etree.ProcessingInstruction(
'xml-stylesheet', 'type="text/xsl" href="{0}"'.format(url_for(
'invenio_oaiserver.static', filename='xsl/oai2.v1.0.xsl'))))
e_responseDate = SubElement(
e_oaipmh, etree.QName(
NS_OAIPMH, 'responseDate'))
# date should be first possible moment
e_responseDate.text = datetime_to_datestamp(datetime.utcnow())
e_request = SubElement(e_oaipmh, etree.QName(NS_OAIPMH, 'request'))
for key, value in kwargs.items():
if key == 'from_' or key == 'until':
value = datetime_to_datestamp(value)
elif key == 'resumptionToken':
value = value['token']
e_request.set(key, value)
e_request.text = url_for('invenio_oaiserver.response', _external=True)
return e_tree, e_oaipmh
[docs]def error(errors):
"""Create error element."""
e_tree, e_oaipmh = envelope()
for code, message in errors:
e_error = SubElement(e_oaipmh, etree.QName(NS_OAIPMH, 'error'))
e_error.set('code', code)
e_error.text = message
return e_tree
[docs]def verb(**kwargs):
"""Create OAI-PMH envelope for response with verb."""
e_tree, e_oaipmh = envelope(**kwargs)
e_element = SubElement(e_oaipmh, etree.QName(NS_OAIPMH, kwargs['verb']))
return e_tree, e_element
[docs]def identify(**kwargs):
"""Create OAI-PMH response for verb Identify."""
cfg = current_app.config
e_tree, e_identify = verb(**kwargs)
e_repositoryName = SubElement(
e_identify, etree.QName(NS_OAIPMH, 'repositoryName'))
e_repositoryName.text = cfg['OAISERVER_REPOSITORY_NAME']
e_baseURL = SubElement(e_identify, etree.QName(NS_OAIPMH, 'baseURL'))
e_baseURL.text = url_for('invenio_oaiserver.response', _external=True)
e_protocolVersion = SubElement(e_identify,
etree.QName(NS_OAIPMH, 'protocolVersion'))
e_protocolVersion.text = cfg['OAISERVER_PROTOCOL_VERSION']
for adminEmail in cfg['OAISERVER_ADMIN_EMAILS']:
e = SubElement(e_identify, etree.QName(NS_OAIPMH, 'adminEmail'))
e.text = adminEmail
e_earliestDatestamp = SubElement(
e_identify, etree.QName(
NS_OAIPMH, 'earliestDatestamp'))
e_earliestDatestamp.text = datetime_to_datestamp(
db.session.query(db.func.min(RecordMetadata.created)).scalar() or
datetime(MINYEAR, 1, 1)
)
e_deletedRecord = SubElement(e_identify,
etree.QName(NS_OAIPMH, 'deletedRecord'))
e_deletedRecord.text = 'no'
e_granularity = SubElement(e_identify,
etree.QName(NS_OAIPMH, 'granularity'))
assert cfg['OAISERVER_GRANULARITY'] in DATETIME_FORMATS
e_granularity.text = cfg['OAISERVER_GRANULARITY']
compressions = cfg['OAISERVER_COMPRESSIONS']
if compressions != ['identity']:
for compression in compressions:
e_compression = SubElement(e_identify,
etree.QName(NS_OAIPMH, 'compression'))
e_compression.text = compression
for description in cfg.get('OAISERVER_DESCRIPTIONS', []):
e_description = SubElement(e_identify,
etree.QName(NS_OAIPMH, 'description'))
e_description.append(etree.fromstring(description))
return e_tree
[docs]def resumption_token(parent, pagination, **kwargs):
"""Attach resumption token element to a parent."""
# Do not add resumptionToken if all results fit to the first page.
if pagination.page == 1 and not pagination.has_next:
return
token = serialize(pagination, **kwargs)
e_resumptionToken = SubElement(parent, etree.QName(NS_OAIPMH,
'resumptionToken'))
if pagination.total:
expiration_date = datetime.utcnow() + timedelta(
seconds=current_app.config[
'OAISERVER_RESUMPTION_TOKEN_EXPIRE_TIME'
]
)
e_resumptionToken.set('expirationDate', datetime_to_datestamp(
expiration_date
))
e_resumptionToken.set('cursor', str(
(pagination.page - 1) * pagination.per_page
))
e_resumptionToken.set('completeListSize', str(pagination.total))
if token:
e_resumptionToken.text = token
[docs]def listsets(**kwargs):
"""Create OAI-PMH response for ListSets verb."""
e_tree, e_listsets = verb(**kwargs)
page = kwargs.get('resumptionToken', {}).get('page', 1)
size = current_app.config['OAISERVER_PAGE_SIZE']
oai_sets = OAISet.query.paginate(page=page, per_page=size, error_out=False)
for oai_set in oai_sets.items:
e_set = SubElement(e_listsets, etree.QName(NS_OAIPMH, 'set'))
e_setSpec = SubElement(e_set, etree.QName(NS_OAIPMH, 'setSpec'))
e_setSpec.text = oai_set.spec
e_setName = SubElement(e_set, etree.QName(NS_OAIPMH, 'setName'))
e_setName.text = oai_set.name
if oai_set.description:
e_setDescription = SubElement(e_set, etree.QName(NS_OAIPMH,
'setDescription'))
e_dc = SubElement(
e_setDescription, etree.QName(NS_OAIDC, 'dc'),
nsmap=NSMAP_DESCRIPTION
)
e_dc.set(etree.QName(NS_XSI, 'schemaLocation'), NS_OAIDC)
e_description = SubElement(e_dc, etree.QName(NS_DC, 'description'))
e_description.text = oai_set.description
resumption_token(e_listsets, oai_sets, **kwargs)
return e_tree
[docs]def getrecord(**kwargs):
"""Create OAI-PMH response for verb Identify."""
record_dumper = serializer(kwargs['metadataPrefix'])
pid = OAIIDProvider.get(pid_value=kwargs['identifier']).pid
record = Record.get_record(pid.object_uuid)
e_tree, e_getrecord = verb(**kwargs)
e_record = SubElement(e_getrecord, etree.QName(NS_OAIPMH, 'record'))
header(
e_record,
identifier=pid.pid_value,
datestamp=record.updated,
sets=record.get('_oai', {}).get('sets', []),
)
e_metadata = SubElement(e_record,
etree.QName(NS_OAIPMH, 'metadata'))
e_metadata.append(record_dumper(pid, {'_source': record}))
return e_tree
[docs]def listidentifiers(**kwargs):
"""Create OAI-PMH response for verb ListIdentifiers."""
e_tree, e_listidentifiers = verb(**kwargs)
result = get_records(**kwargs)
for record in result.items:
pid = oaiid_fetcher(record['id'], record['json']['_source'])
header(
e_listidentifiers,
identifier=pid.pid_value,
datestamp=record['updated'],
sets=record['json']['_source'].get('_oai', {}).get('sets', []),
)
resumption_token(e_listidentifiers, result, **kwargs)
return e_tree
[docs]def listrecords(**kwargs):
"""Create OAI-PMH response for verb ListRecords."""
record_dumper = serializer(kwargs['metadataPrefix'])
e_tree, e_listrecords = verb(**kwargs)
result = get_records(**kwargs)
for record in result.items:
pid = oaiid_fetcher(record['id'], record['json']['_source'])
e_record = SubElement(e_listrecords,
etree.QName(NS_OAIPMH, 'record'))
header(
e_record,
identifier=pid.pid_value,
datestamp=record['updated'],
sets=record['json']['_source'].get('_oai', {}).get('sets', []),
)
e_metadata = SubElement(e_record, etree.QName(NS_OAIPMH, 'metadata'))
e_metadata.append(record_dumper(pid, record['json']))
resumption_token(e_listrecords, result, **kwargs)
return e_tree