Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

oaiharvesting: bulk indexing of oai records #456

Merged
merged 1 commit into from
Sep 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _(x):

}
CELERY_BROKER_HEARTBEAT = 0
INDEXER_BULK_REQUEST_TIMEOUT = 60

# Database
# ========
Expand Down
24 changes: 21 additions & 3 deletions rero_ils/modules/ebooks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@

from __future__ import absolute_import, print_function

from time import sleep

from celery import shared_task
from celery.task.control import inspect
from flask import current_app

from ..documents.api import Document, DocumentsSearch
from ..utils import bulk_index


@shared_task(ignore_result=True)
def create_records(records):
"""Records creation and indexing."""
n_updated = 0
n_created = 0
uuids = []
for record in records:
record['$schema'] = \
'https://ils.rero.ch/schema/documents/document-minimal-v0.0.1.json'
Expand All @@ -55,15 +60,28 @@ def create_records(records):
existing_record.update(
record,
dbcommit=True,
reindex=True)
reindex=False
)
n_updated += 1
uuids.append(existing_record.id)
else:
# create a new record
Document.create(
new_record = Document.create(
record,
dbcommit=True,
reindex=True
reindex=False
)
n_created += 1
uuids.append(new_record.id)
bulk_index(uuids, process=True)
# wait for bulk index task to finish
inspector = inspect()
reserved = inspector.reserved()
if reserved:
while any(a != [] for a in reserved.values()):
reserved = inspector.reserved()
sleep(1)

current_app.logger.info('create_records: {} updated, {} new'
.format(n_updated, n_created))
return n_created, n_updated
30 changes: 29 additions & 1 deletion rero_ils/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
"""Utilities for rero-ils editor."""

from datetime import time
from time import sleep

from flask import url_for
import click
from flask import current_app
from invenio_indexer.api import RecordIndexer


Expand All @@ -30,3 +32,29 @@ def strtotime(strtime):
hour=int(splittime[0]),
minute=int(splittime[1])
)


def bulk_index(uuids, process=False, verbose=False):
"""Bulk index records."""
if verbose:
click.echo(' add to index: {count}'.format(count=len(uuids)))
rerowep marked this conversation as resolved.
Show resolved Hide resolved
indexer = RecordIndexer()
retry = True
minutes = 1
while retry:
try:
indexer.bulk_index(uuids)
retry = False
except Exception as exc:
msg = 'Bulk Index Error: retry in {minutes} min {exc}'.format(
exc=exc,
minutes=minutes
)
current_app.logger.error(msg)
if verbose:
click.secho(msg, fg='red')
sleep(minutes * 60)
retry = True
minutes *= 2
if process:
indexer.process_bulk_queue()
4 changes: 0 additions & 4 deletions tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
from __future__ import absolute_import, print_function

import pytest
from utils import flush_index

from rero_ils.modules.libraries.api import LibrariesSearch, Library
from rero_ils.modules.locations.api import Location, LocationsSearch


@pytest.fixture(scope="module")
Expand Down
18 changes: 15 additions & 3 deletions tests/ui/documents/test_documents_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import mock
import pytest
from utils import get_mapping, mock_response
from utils import mock_response

from rero_ils.modules.documents.api import Document, \
document_id_fetcher
from rero_ils.modules.documents.api import Document, document_id_fetcher
from rero_ils.modules.ebooks.tasks import create_records
from rero_ils.modules.mef_persons.api import MefPersonsSearch


Expand Down Expand Up @@ -56,6 +56,18 @@ def test_document_can_delete(app, document_data_tmp):
assert document.can_delete


def test_document_create_records(app, ebook_1_data, ebook_2_data):
"""Test can create harvested records."""
n_created, n_updated = create_records([ebook_1_data, ebook_2_data])
assert n_created == 2
assert n_updated == 0

# TODO: find a way to execute celery worker tasks in travis tests
# n_created, n_updated = create_records([ebook_1_data])
# assert n_created == 0
# assert n_updated == 1


def test_document_can_delete_harvested(app, ebook_1_data):
"""Test can delete for harvested records."""
document = Document.create(ebook_1_data, delete_pid=True)
Expand Down