# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""API for indexing of records."""
from __future__ import absolute_import, print_function
import copy
from contextlib import contextmanager
import pytz
from celery import current_app as current_celery_app
from elasticsearch.helpers import bulk
from flask import current_app
from invenio_records.api import Record
from invenio_search import current_search_client
from kombu import Producer as KombuProducer
from kombu.compat import Consumer
from sqlalchemy.orm.exc import NoResultFound
from .proxies import current_record_to_index
from .signals import before_record_index


class Producer(KombuProducer):
    """Producer validating published messages.

    For more information visit :class:`kombu:kombu.Producer`.
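
    A publishing sketch (``conn``, ``exchange`` and ``record_uuid`` are
    illustrative names, not part of this module)::

        producer = Producer(conn, exchange=exchange, routing_key='indexer')
        producer.publish(dict(id=str(record_uuid), op='index'))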
"""
[docs] def publish(self, data, **kwargs):
"""Validate operation type."""
assert data.get('op') in {'index', 'create', 'delete', 'update'}
return super(Producer, self).publish(data, **kwargs)


class RecordIndexer(object):
    r"""Provide an interface for indexing records in Elasticsearch.

    Bulk indexing works by queuing requests for indexing records and
    processing these requests in bulk.
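
    A minimal usage sketch (assuming ``record`` is a committed
    :class:`invenio_records.api.Record` and ``uuids`` an iterable of
    record UUIDs)::

        indexer = RecordIndexer()
        indexer.index(record)          # index a single record synchronously
        indexer.bulk_index(uuids)      # queue records for bulk indexing
        indexer.process_bulk_queue()   # process the queued requests in bulk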
"""
def __init__(self, search_client=None, exchange=None, queue=None,
routing_key=None, version_type=None, record_to_index=None):
"""Initialize indexer.
:param search_client: Elasticsearch client.
(Default: ``current_search_client``)
:param exchange: A :class:`kombu.Exchange` instance for message queue.
:param queue: A :class:`kombu.Queue` instance for message queue.
:param routing_key: Routing key for message queue.
:param version_type: Elasticsearch version type.
(Default: ``external_gte``)
:param record_to_index: Function to extract the index and doc_type
from the record.
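
        An illustrative ``record_to_index`` callable (the index and doc
        type names are hypothetical)::

            def record_to_index(record):
                return 'records-record-v1.0.0', 'record-v1.0.0'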
"""
self.client = search_client or current_search_client
self._exchange = None
self._queue = None
self._record_to_index = record_to_index or current_record_to_index
self._routing_key = None
self._version_type = version_type or 'external_gte'

    def record_to_index(self, record):
        """Get index/doc_type given a record.

        :param record: The record from which to extract the index and
            doc_type.
        :returns: A tuple (index, doc_type).
        """
        return self._record_to_index(record)

    @property
    def mq_queue(self):
        """Message Queue queue.

        :returns: The Message Queue queue.
        """
        return self._queue or current_app.config['INDEXER_MQ_QUEUE']

    @property
    def mq_exchange(self):
        """Message Queue exchange.

        :returns: The Message Queue exchange.
        """
        return self._exchange or current_app.config['INDEXER_MQ_EXCHANGE']

    @property
    def mq_routing_key(self):
        """Message Queue routing key.

        :returns: The Message Queue routing key.
        """
        return (self._routing_key or
                current_app.config['INDEXER_MQ_ROUTING_KEY'])

    #
    # High-level API
    #
    def index(self, record):
        """Index a record.

        The caller is responsible for ensuring that the record has already
        been committed to the database. If a newer version of a record has
        already been indexed then the provided record will not be indexed.
        This behavior can be controlled by providing a different
        ``version_type`` when initializing ``RecordIndexer``.

        :param record: Record instance.
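
        A minimal sketch (assuming ``record`` is a committed
        :class:`invenio_records.api.Record`)::

            RecordIndexer().index(record)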
"""
index, doc_type = self.record_to_index(record)
return self.client.index(
id=str(record.id),
version=record.revision_id,
version_type=self._version_type,
index=index,
doc_type=doc_type,
body=self._prepare_record(record, index, doc_type),
)

    def index_by_id(self, record_uuid):
        """Index a record by record identifier.

        :param record_uuid: Record identifier.
        """
        return self.index(Record.get_record(record_uuid))

    def delete(self, record):
        """Delete a record.

        :param record: Record instance.
        """
        index, doc_type = self.record_to_index(record)

        return self.client.delete(
            id=str(record.id),
            index=index,
            doc_type=doc_type,
        )

    def delete_by_id(self, record_uuid):
        """Delete record from index by record identifier.

        :param record_uuid: Record identifier.
        """
        self.delete(Record.get_record(record_uuid))

    def bulk_index(self, record_id_iterator):
        """Bulk index records.

        :param record_id_iterator: Iterator yielding record UUIDs.
        """
        self._bulk_op(record_id_iterator, 'index')

    def bulk_delete(self, record_id_iterator):
        """Bulk delete records from index.

        :param record_id_iterator: Iterator yielding record UUIDs.
        """
        self._bulk_op(record_id_iterator, 'delete')

    def process_bulk_queue(self):
        """Process bulk indexing queue.
        with current_celery_app.pool.acquire(block=True) as conn:
            consumer = Consumer(
                connection=conn,
                queue=self.mq_queue.name,
                exchange=self.mq_exchange.name,
                routing_key=self.mq_routing_key,
            )

            req_timeout = current_app.config['INDEXER_BULK_REQUEST_TIMEOUT']

            count = bulk(
                self.client,
                self._actionsiter(consumer.iterqueue()),
                stats_only=True,
                request_timeout=req_timeout,
            )

            consumer.close()

        return count

    @contextmanager
    def create_producer(self):
        """Context manager that yields an instance of ``Producer``.
        with current_celery_app.pool.acquire(block=True) as conn:
            yield Producer(
                conn,
                exchange=self.mq_exchange,
                routing_key=self.mq_routing_key,
                auto_declare=True,
            )

    #
    # Low-level implementation
    #
    def _bulk_op(self, record_id_iterator, op_type, index=None,
                 doc_type=None):
        """Publish a bulk operation for each record to the message queue.

        :param record_id_iterator: Iterator that yields record UUIDs.
        :param op_type: Indexing operation (one of ``index``, ``create``,
            ``delete`` or ``update``).
        :param index: The Elasticsearch index. (Default: ``None``)
        :param doc_type: The Elasticsearch doc_type. (Default: ``None``)
        """
        with self.create_producer() as producer:
            for rec in record_id_iterator:
                producer.publish(dict(
                    id=str(rec),
                    op=op_type,
                    index=index,
                    doc_type=doc_type,
                ))

    def _actionsiter(self, message_iterator):
        """Iterate bulk actions.

        :param message_iterator: Iterator yielding messages from a queue.
        """
        for message in message_iterator:
            payload = message.decode()
            try:
                if payload['op'] == 'delete':
                    yield self._delete_action(payload)
                else:
                    yield self._index_action(payload)
                message.ack()
            except NoResultFound:
                message.reject()
            except Exception:
                message.reject()
                current_app.logger.error(
                    "Failed to index record {0}".format(payload.get('id')),
                    exc_info=True)

    def _delete_action(self, payload):
        """Bulk delete action.

        :param payload: Decoded message body.
        :returns: Dictionary defining an Elasticsearch bulk 'delete' action.
        """
        index, doc_type = payload.get('index'), payload.get('doc_type')
        if not (index and doc_type):
            record = Record.get_record(payload['id'])
            index, doc_type = self.record_to_index(record)

        return {
            '_op_type': 'delete',
            '_index': index,
            '_type': doc_type,
            '_id': payload['id'],
        }

    def _index_action(self, payload):
        """Bulk index action.

        :param payload: Decoded message body.
        :returns: Dictionary defining an Elasticsearch bulk 'index' action.
        """
        record = Record.get_record(payload['id'])
        index, doc_type = self.record_to_index(record)

        return {
            '_op_type': 'index',
            '_index': index,
            '_type': doc_type,
            '_id': str(record.id),
            '_version': record.revision_id,
            '_version_type': self._version_type,
            '_source': self._prepare_record(record, index, doc_type),
        }

    @staticmethod
    def _prepare_record(record, index, doc_type):
        """Prepare record data for indexing.

        :param record: The record to prepare.
        :param index: The Elasticsearch index.
        :param doc_type: The Elasticsearch document type.
        :returns: The record metadata.
        """
        if current_app.config['INDEXER_REPLACE_REFS']:
            data = copy.deepcopy(record.replace_refs())
        else:
            data = record.dumps()

        data['_created'] = pytz.utc.localize(record.created).isoformat() \
            if record.created else None
        data['_updated'] = pytz.utc.localize(record.updated).isoformat() \
            if record.updated else None

        # Allow modification of data prior to sending to Elasticsearch.
        before_record_index.send(
            current_app._get_current_object(),
            json=data,
            record=record,
            index=index,
            doc_type=doc_type,
        )

        return data