Skip to content

Commit

Permalink
oaiharvesting: bulk indexing of oai records
Browse files Browse the repository at this point in the history
* Improves indexing of harvested oai records.

Co-Authored-by: Peter Weber <[email protected]>
  • Loading branch information
rerowep and rerowep committed Aug 28, 2019
1 parent aac70e6 commit c82ef47
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _(x):

}
CELERY_BROKER_HEARTBEAT = 0
INDEXER_BULK_REQUEST_TIMEOUT = 60

# Database
# ========
Expand Down
24 changes: 21 additions & 3 deletions rero_ils/modules/ebooks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@

from __future__ import absolute_import, print_function

from time import sleep

from celery import shared_task
from celery.task.control import inspect
from flask import current_app

from ..documents.api import Document, DocumentsSearch
from ..utils import bulk_index


@shared_task(ignore_result=True)
def create_records(records):
"""Records creation and indexing."""
n_updated = 0
n_created = 0
uuids = []
for record in records:
record['$schema'] = \
'https://ils.rero.ch/schema/documents/document-minimal-v0.0.1.json'
Expand All @@ -55,15 +60,28 @@ def create_records(records):
existing_record.update(
record,
dbcommit=True,
reindex=True)
reindex=False
)
n_updated += 1
uuids.append(existing_record.id)
else:
# create a new record
Document.create(
new_record = Document.create(
record,
dbcommit=True,
reindex=True
reindex=False
)
n_created += 1
uuids.append(new_record.id)
bulk_index(uuids, process=True)
# wait for bulk index task to finish
inspector = inspect()
reserved = inspector.reserved()
if reserved:
while any(a != [] for a in reserved.values()):
reserved = inspector.reserved()
sleep(1)

current_app.logger.info('create_records: {} updated, {} new'
.format(n_updated, n_created))
return n_created, n_updated
30 changes: 29 additions & 1 deletion rero_ils/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
"""Utilities for rero-ils editor."""

from datetime import time
from time import sleep

from flask import url_for
import click
from flask import current_app
from invenio_indexer.api import RecordIndexer


Expand All @@ -30,3 +32,29 @@ def strtotime(strtime):
hour=int(splittime[0]),
minute=int(splittime[1])
)


def bulk_index(uuids, process=False, verbose=False):
"""Bulk index records."""
if verbose:
click.echo(' add to index: {count}'.format(count=len(uuids)))
indexer = RecordIndexer()
retry = True
minutes = 1
while retry:
try:
indexer.bulk_index(uuids)
retry = False
except Exception as exc:
msg = 'Bulk Index Error: retry in {minutes} min {exc}'.format(
exc=exc,
minutes=minutes
)
current_app.logger.error(msg)
if verbose:
click.secho(msg, fg='red')
sleep(minutes * 60)
retry = True
minutes *= 2
if process:
indexer.process_bulk_queue()
4 changes: 0 additions & 4 deletions tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
from __future__ import absolute_import, print_function

import pytest
from utils import flush_index

from rero_ils.modules.libraries.api import LibrariesSearch, Library
from rero_ils.modules.locations.api import Location, LocationsSearch


@pytest.fixture(scope="module")
Expand Down
16 changes: 14 additions & 2 deletions tests/ui/documents/test_documents_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

import mock
import pytest
from utils import get_mapping, mock_response
from utils import flush_index, mock_response

from rero_ils.modules.documents.api import Document, \
from rero_ils.modules.documents.api import Document, DocumentsSearch, \
document_id_fetcher
from rero_ils.modules.ebooks.tasks import create_records
from rero_ils.modules.mef_persons.api import MefPersonsSearch


Expand Down Expand Up @@ -56,6 +57,17 @@ def test_document_can_delete(app, document_data_tmp):
assert document.can_delete


def test_document_create_records(app, ebook_1_data, ebook_2_data):
"""Test can create harvested records."""
n_created, n_updated = create_records([ebook_1_data, ebook_2_data])
flush_index(DocumentsSearch.Meta.index)
assert n_created == 2
assert n_updated == 0
n_created, n_updated = create_records([ebook_1_data])
assert n_created == 0
assert n_updated == 1


def test_document_can_delete_harvested(app, ebook_1_data):
"""Test can delete for harvested records."""
document = Document.create(ebook_1_data, delete_pid=True)
Expand Down

0 comments on commit c82ef47

Please sign in to comment.