# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2016 CERN.
#
# INSPIRE is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Celery tasks for dealing with crawler."""

from __future__ import absolute_import, print_function

import json
import os

from six.moves.urllib.parse import urlparse

from celery import shared_task

from flask import current_app

from invenio_db import db

from invenio_workflows.proxies import workflow_object_class

from .errors import (
    CrawlerInvalidResultsPath,
    CrawlerJobError,
    CrawlerScheduleError,
)
from .models import CrawlerJob, JobStatus, CrawlerWorkflowObject


@shared_task(ignore_result=True)
def submit_results(job_id, errors, log_file, results_uri, results_data=None):
    """Receive the submission of the results of a crawl job.

    Spawns the appropriate workflow, according to whichever workflow the
    crawl job specifies.

    :param job_id: Id of the crawler job.
    :param errors: Errors reported by the crawl job, if any. A truthy value
        marks the job as errored and aborts the submission.
    :param log_file: Path to the log file of the crawler job.
    :param results_uri: URI to the file containing the results of the crawl
        job, namely the records extracted.
    :param results_data: Optional data payload with the results list, to skip
        retrieving them from the `results_uri`, useful for slow or unreliable
        storages.
    """
    def _extract_results_data(results_path):
        if not os.path.exists(results_path):
            raise CrawlerInvalidResultsPath(
                "Path specified in result does not exist: {0}".format(
                    results_path
                )
            )

        current_app.logger.info(
            'Parsing records from {}'.format(results_path)
        )

        # The results file is expected to contain one JSON record per line.
        results_data = []
        with open(results_path) as records:
            lines = (
                line.strip() for line in records if line.strip()
            )
            for line in lines:
                current_app.logger.debug(
                    'Reading record line: {}'.format(line)
                )
                record = json.loads(line)
                results_data.append(record)

        current_app.logger.debug(
            'Read {} records from {}'.format(len(results_data), results_path)
        )
        return results_data

    results_path = urlparse(results_uri).path
    job = CrawlerJob.get_by_job(job_id)
    job.logs = log_file
    job.results = results_uri

    if errors:
        # Record the failure before propagating it to the task runner.
        job.status = JobStatus.ERROR
        job.save()
        db.session.commit()
        raise CrawlerJobError(str(errors))

    if results_data is None:
        results_data = _extract_results_data(results_path)

    for record in results_data:
        current_app.logger.debug(
            'Parsing record: {}'.format(record)
        )
        # Wrap each harvested record in a workflow object, link it to the
        # crawl job, and start the workflow the job specifies.
        obj = workflow_object_class.create(data=record)
        obj.extra_data['crawler_job_id'] = job_id
        obj.extra_data['crawler_results_path'] = results_path
        obj.extra_data['record_extra'] = record.pop('extra_data', {})
        obj.data_type = current_app.config['CRAWLER_DATA_TYPE']
        obj.save()
        db.session.commit()

        crawler_object = CrawlerWorkflowObject(
            job_id=job_id, object_id=obj.id
        )
        db.session.add(crawler_object)
        obj.start_workflow(job.workflow, delayed=True)

    current_app.logger.info('Parsed {} records.'.format(len(results_data)))

    job.status = JobStatus.FINISHED
    job.save()
    db.session.commit()

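# Illustrative sketch, not part of the original module: a crawler bridge
# could report a finished crawl back by queueing `submit_results`. The job
# id, log path and results URI below are hypothetical; the task only
# requires that the URI path point at a JSON-lines file of records and that
# a matching CrawlerJob row exists for the given job id.
def _example_report_finished_crawl():
    """Queue `submit_results` for a hypothetical finished crawl."""
    submit_results.delay(
        job_id='0f2a6e1c-0000-0000-0000-000000000000',  # hypothetical job id
        errors=None,  # a truthy value would mark the job as ERROR instead
        log_file='/var/log/scrapyd/example.log',  # hypothetical log path
        results_uri='file:///tmp/example-results.jsonl',  # one JSON record per line
    )

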
@shared_task(ignore_result=True)
def schedule_crawl(spider, workflow, **kwargs):
    """Schedule a crawl using configuration from the workflow objects."""
    from inspire_crawler.utils import get_crawler_instance
    crawler = get_crawler_instance()

    # Copy the configured settings so that per-job overrides do not mutate
    # the application config in place.
    crawler_settings = dict(current_app.config.get('CRAWLER_SETTINGS', {}))
    crawler_settings.update(kwargs.pop("crawler_settings", {}))

    # All remaining keyword arguments are forwarded to the spider, merged
    # with any per-spider defaults from the configuration.
    crawler_arguments = kwargs
    crawler_arguments.update(
        current_app.config.get('CRAWLER_SPIDER_ARGUMENTS', {}).get(spider, {})
    )

    job_id = crawler.schedule(
        project=current_app.config.get('CRAWLER_PROJECT'),
        spider=spider,
        settings=crawler_settings,
        **crawler_arguments
    )
    if job_id:
        CrawlerJob.create(
            job_id=job_id,
            spider=spider,
            workflow=workflow,
        )
        db.session.commit()
        current_app.logger.info("Scheduled job {0}".format(job_id))
    else:
        raise CrawlerScheduleError(
            "Could not schedule '{0}' spider for project '{1}'".format(
                spider, current_app.config.get('CRAWLER_PROJECT')
            )
        )

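# Illustrative sketch, not part of the original module: scheduling a crawl
# from application code. The spider name, workflow name and spider argument
# below are hypothetical; extra keyword arguments are forwarded to the
# spider, while a `crawler_settings` keyword overrides CRAWLER_SETTINGS for
# this job only.
def _example_schedule_crawl():
    """Queue `schedule_crawl` for a hypothetical spider."""
    schedule_crawl.delay(
        spider='example_spider',  # hypothetical spider in CRAWLER_PROJECT
        workflow='example_workflow',  # workflow started per harvested record
        from_date='2016-01-01',  # hypothetical spider argument
        crawler_settings={'LOG_LEVEL': 'INFO'},  # per-job Scrapy setting override
    )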