From c82ef47ec6a450d1f5f587987cb2b6be5af5eb80 Mon Sep 17 00:00:00 2001 From: Peter Weber Date: Fri, 23 Aug 2019 08:45:15 +0200 Subject: [PATCH] oaiharvesting: bulk indexing of oai records * Improves indexing of harvested oai records. Co-Authored-by: Peter Weber --- rero_ils/config.py | 1 + rero_ils/modules/ebooks/tasks.py | 24 ++++++++++++++++--- rero_ils/modules/utils.py | 30 +++++++++++++++++++++++- tests/api/conftest.py | 4 ---- tests/ui/documents/test_documents_api.py | 16 +++++++++++-- 5 files changed, 65 insertions(+), 10 deletions(-) diff --git a/rero_ils/config.py b/rero_ils/config.py index a9997c2f18..b9bf007291 100644 --- a/rero_ils/config.py +++ b/rero_ils/config.py @@ -224,6 +224,7 @@ def _(x): } CELERY_BROKER_HEARTBEAT = 0 +INDEXER_BULK_REQUEST_TIMEOUT = 60 # Database # ======== diff --git a/rero_ils/modules/ebooks/tasks.py b/rero_ils/modules/ebooks/tasks.py index 14fa927c0a..574ea65b19 100644 --- a/rero_ils/modules/ebooks/tasks.py +++ b/rero_ils/modules/ebooks/tasks.py @@ -19,10 +19,14 @@ from __future__ import absolute_import, print_function +from time import sleep + from celery import shared_task +from celery.task.control import inspect from flask import current_app from ..documents.api import Document, DocumentsSearch +from ..utils import bulk_index @shared_task(ignore_result=True) @@ -30,6 +34,7 @@ def create_records(records): """Records creation and indexing.""" n_updated = 0 n_created = 0 + uuids = [] for record in records: record['$schema'] = \ 'https://ils.rero.ch/schema/documents/document-minimal-v0.0.1.json' @@ -55,15 +60,28 @@ def create_records(records): existing_record.update( record, dbcommit=True, - reindex=True) + reindex=False + ) n_updated += 1 + uuids.append(existing_record.id) else: # create a new record - Document.create( + new_record = Document.create( record, dbcommit=True, - reindex=True + reindex=False ) n_created += 1 + uuids.append(new_record.id) + bulk_index(uuids, process=True) + # wait for bulk index task to finish + inspector = inspect() + reserved = inspector.reserved() + if reserved: + while any(a != [] for a in reserved.values()): + reserved = inspector.reserved() + sleep(1) + current_app.logger.info('create_records: {} updated, {} new' .format(n_updated, n_created)) + return n_created, n_updated diff --git a/rero_ils/modules/utils.py b/rero_ils/modules/utils.py index c5513158ad..b4640ed67a 100644 --- a/rero_ils/modules/utils.py +++ b/rero_ils/modules/utils.py @@ -18,8 +18,10 @@ """Utilities for rero-ils editor.""" from datetime import time +from time import sleep -from flask import url_for +import click +from flask import current_app from invenio_indexer.api import RecordIndexer @@ -30,3 +32,29 @@ def strtotime(strtime): hour=int(splittime[0]), minute=int(splittime[1]) ) + + +def bulk_index(uuids, process=False, verbose=False): + """Bulk index records.""" + if verbose: + click.echo(' add to index: {count}'.format(count=len(uuids))) + indexer = RecordIndexer() + retry = True + minutes = 1 + while retry: + try: + indexer.bulk_index(uuids) + retry = False + except Exception as exc: + msg = 'Bulk Index Error: retry in {minutes} min {exc}'.format( + exc=exc, + minutes=minutes + ) + current_app.logger.error(msg) + if verbose: + click.secho(msg, fg='red') + sleep(minutes * 60) + retry = True + minutes *= 2 + if process: + indexer.process_bulk_queue() diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 4103654ae8..07f4ec4880 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -20,10 +20,6 @@ from __future__ import absolute_import, print_function import pytest -from utils import flush_index - -from rero_ils.modules.libraries.api import LibrariesSearch, Library -from rero_ils.modules.locations.api import Location, LocationsSearch @pytest.fixture(scope="module") diff --git a/tests/ui/documents/test_documents_api.py b/tests/ui/documents/test_documents_api.py index 590543f924..194a3923d5 100644 --- a/tests/ui/documents/test_documents_api.py +++ b/tests/ui/documents/test_documents_api.py @@ -21,10 +21,11 @@ import mock import pytest -from utils import get_mapping, mock_response +from utils import flush_index, mock_response -from rero_ils.modules.documents.api import Document, \ +from rero_ils.modules.documents.api import Document, DocumentsSearch, \ document_id_fetcher +from rero_ils.modules.ebooks.tasks import create_records from rero_ils.modules.mef_persons.api import MefPersonsSearch @@ -56,6 +57,17 @@ def test_document_can_delete(app, document_data_tmp): assert document.can_delete +def test_document_create_records(app, ebook_1_data, ebook_2_data): + """Test can create harvested records.""" + n_created, n_updated = create_records([ebook_1_data, ebook_2_data]) + flush_index(DocumentsSearch.Meta.index) + assert n_created == 2 + assert n_updated == 0 + n_created, n_updated = create_records([ebook_1_data]) + assert n_created == 0 + assert n_updated == 1 + + def test_document_can_delete_harvested(app, ebook_1_data): """Test can delete for harvested records.""" document = Document.create(ebook_1_data, delete_pid=True)