Source code for invenio_indexer.api

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""API for indexing of records."""

from __future__ import absolute_import, print_function

import copy
from contextlib import contextmanager

import pytz
from celery import current_app as current_celery_app
from elasticsearch.helpers import bulk
from flask import current_app
from invenio_records.api import Record
from invenio_search import current_search_client
from kombu import Producer as KombuProducer
from kombu.compat import Consumer
from sqlalchemy.orm.exc import NoResultFound

from .proxies import current_record_to_index
from .signals import before_record_index


class Producer(KombuProducer):
    """Producer validating published messages.

    For more information visit :class:`kombu:kombu.Producer`.
    """

    #: Operation types accepted by the bulk indexer queue.
    _ALLOWED_OPS = frozenset(('index', 'create', 'delete', 'update'))

    def publish(self, data, **kwargs):
        """Validate operation type."""
        # Reject messages whose 'op' field is not a known bulk operation
        # before they ever reach the queue.
        assert data.get('op') in self._ALLOWED_OPS
        return super(Producer, self).publish(data, **kwargs)
class RecordIndexer(object):
    r"""Provide an interface for indexing records in Elasticsearch.

    Bulk indexing works by queuing requests for indexing records and
    processing these requests in bulk.
    """

    def __init__(self, search_client=None, exchange=None, queue=None,
                 routing_key=None, version_type=None, record_to_index=None):
        """Initialize indexer.

        :param search_client: Elasticsearch client.
            (Default: ``current_search_client``)
        :param exchange: A :class:`kombu.Exchange` instance for message queue.
        :param queue: A :class:`kombu.Queue` instance for message queue.
        :param routing_key: Routing key for message queue.
        :param version_type: Elasticsearch version type.
            (Default: ``external_gte``)
        :param record_to_index: Function to extract the index and doc_type
            from the record.
        """
        self.client = search_client or current_search_client
        # Store the caller-provided messaging configuration.  The ``mq_*``
        # properties below fall back to the application config whenever
        # these remain ``None``, so defaults behave exactly as before.
        # (Previously these parameters were accepted but silently ignored.)
        self._exchange = exchange
        self._queue = queue
        self._record_to_index = record_to_index or current_record_to_index
        self._routing_key = routing_key
        self._version_type = version_type or 'external_gte'

    def record_to_index(self, record):
        """Get index/doc_type given a record.

        :param record: The record where to look for the information.
        :returns: A tuple (index, doc_type).
        """
        return self._record_to_index(record)

    @property
    def mq_queue(self):
        """Message Queue queue.

        :returns: The Message Queue queue.
        """
        return self._queue or current_app.config['INDEXER_MQ_QUEUE']

    @property
    def mq_exchange(self):
        """Message Queue exchange.

        :returns: The Message Queue exchange.
        """
        return self._exchange or current_app.config['INDEXER_MQ_EXCHANGE']

    @property
    def mq_routing_key(self):
        """Message Queue routing key.

        :returns: The Message Queue routing key.
        """
        return (self._routing_key or
                current_app.config['INDEXER_MQ_ROUTING_KEY'])

    #
    # High-level API
    #
    def index(self, record):
        """Index a record.

        The caller is responsible for ensuring that the record has already
        been committed to the database. If a newer version of a record has
        already been indexed then the provided record will not be indexed.
        This behavior can be controlled by providing a different
        ``version_type`` when initializing ``RecordIndexer``.

        :param record: Record instance.
        """
        index, doc_type = self.record_to_index(record)

        return self.client.index(
            id=str(record.id),
            # Using the record revision as the document version lets
            # Elasticsearch discard out-of-date index requests.
            version=record.revision_id,
            version_type=self._version_type,
            index=index,
            doc_type=doc_type,
            body=self._prepare_record(record, index, doc_type),
        )

    def index_by_id(self, record_uuid):
        """Index a record by record identifier.

        :param record_uuid: Record identifier.
        """
        return self.index(Record.get_record(record_uuid))

    def delete(self, record):
        """Delete a record.

        :param record: Record instance.
        """
        index, doc_type = self.record_to_index(record)

        return self.client.delete(
            id=str(record.id),
            index=index,
            doc_type=doc_type,
        )

    def delete_by_id(self, record_uuid):
        """Delete record from index by record identifier.

        :param record_uuid: Record identifier.
        """
        self.delete(Record.get_record(record_uuid))

    def bulk_index(self, record_id_iterator):
        """Bulk index records.

        :param record_id_iterator: Iterator yielding record UUIDs.
        """
        self._bulk_op(record_id_iterator, 'index')

    def bulk_delete(self, record_id_iterator):
        """Bulk delete records from index.

        :param record_id_iterator: Iterator yielding record UUIDs.
        """
        self._bulk_op(record_id_iterator, 'delete')

    def process_bulk_queue(self):
        """Process bulk indexing queue.

        Consumes queued operations and sends them to Elasticsearch in a
        single bulk request.

        :returns: Count of successfully processed actions
            (``stats_only=True``).
        """
        with current_celery_app.pool.acquire(block=True) as conn:
            consumer = Consumer(
                connection=conn,
                queue=self.mq_queue.name,
                exchange=self.mq_exchange.name,
                routing_key=self.mq_routing_key,
            )

            req_timeout = current_app.config['INDEXER_BULK_REQUEST_TIMEOUT']

            count = bulk(
                self.client,
                self._actionsiter(consumer.iterqueue()),
                stats_only=True,
                request_timeout=req_timeout,
            )

            consumer.close()

        return count

    @contextmanager
    def create_producer(self):
        """Context manager that yields an instance of ``Producer``."""
        with current_celery_app.pool.acquire(block=True) as conn:
            yield Producer(
                conn,
                exchange=self.mq_exchange,
                routing_key=self.mq_routing_key,
                auto_declare=True,
            )

    #
    # Low-level implementation
    #
    def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
        """Index record in Elasticsearch asynchronously.

        :param record_id_iterator: Iterator that yields record UUIDs.
        :param op_type: Indexing operation (one of ``index``, ``create``,
            ``delete`` or ``update``).
        :param index: The Elasticsearch index. (Default: ``None``)
        :param doc_type: The Elasticsearch doc_type. (Default: ``None``)
        """
        with self.create_producer() as producer:
            for rec in record_id_iterator:
                producer.publish(dict(
                    id=str(rec),
                    op=op_type,
                    index=index,
                    doc_type=doc_type,
                ))

    def _actionsiter(self, message_iterator):
        """Iterate bulk actions.

        :param message_iterator: Iterator yielding messages from a queue.
        """
        for message in message_iterator:
            payload = message.decode()
            try:
                if payload['op'] == 'delete':
                    yield self._delete_action(payload)
                else:
                    yield self._index_action(payload)
                message.ack()
            except NoResultFound:
                # Record was removed from the database before this request
                # was processed; drop the message.
                message.reject()
            except Exception:
                # Best-effort processing: reject the message and log the
                # failure instead of aborting the whole bulk run.
                message.reject()
                current_app.logger.error(
                    "Failed to index record {0}".format(payload.get('id')),
                    exc_info=True)

    def _delete_action(self, payload):
        """Bulk delete action.

        :param payload: Decoded message body.
        :returns: Dictionary defining an Elasticsearch bulk 'delete' action.
        """
        index, doc_type = payload.get('index'), payload.get('doc_type')
        # Only load the record when the message did not carry the target
        # index/doc_type itself.
        if not (index and doc_type):
            record = Record.get_record(payload['id'])
            index, doc_type = self.record_to_index(record)

        return {
            '_op_type': 'delete',
            '_index': index,
            '_type': doc_type,
            '_id': payload['id'],
        }

    def _index_action(self, payload):
        """Bulk index action.

        :param payload: Decoded message body.
        :returns: Dictionary defining an Elasticsearch bulk 'index' action.
        """
        record = Record.get_record(payload['id'])
        index, doc_type = self.record_to_index(record)

        return {
            '_op_type': 'index',
            '_index': index,
            '_type': doc_type,
            '_id': str(record.id),
            '_version': record.revision_id,
            '_version_type': self._version_type,
            '_source': self._prepare_record(record, index, doc_type),
        }

    @staticmethod
    def _prepare_record(record, index, doc_type):
        """Prepare record data for indexing.

        :param record: The record to prepare.
        :param index: The Elasticsearch index.
        :param doc_type: The Elasticsearch document type.
        :returns: The record metadata.
        """
        if current_app.config['INDEXER_REPLACE_REFS']:
            # Deep copy so resolving JSON references never mutates the
            # record passed in by the caller.
            data = copy.deepcopy(record.replace_refs())
        else:
            data = record.dumps()

        # Timestamps are stored naive in the DB; localize to UTC so the
        # indexed ISO strings are timezone-aware.
        data['_created'] = pytz.utc.localize(record.created).isoformat() \
            if record.created else None
        data['_updated'] = pytz.utc.localize(record.updated).isoformat() \
            if record.updated else None

        # Allow modification of data prior to sending to Elasticsearch.
        before_record_index.send(
            current_app._get_current_object(),
            json=data,
            record=record,
            index=index,
            doc_type=doc_type,
        )
        return data