Skip to content

Commit

Permalink
schemas: trigger update on ES and reindexing when schema mapping changed
Browse files Browse the repository at this point in the history
* signals on mappings update
* recreate indices on mappings update (del -> create)
* reindex all records belonging to updated schema
* change ```cap fixtures schemas``` to update schemas in the db (previously it
just skipped schemas that already existed in the db), triggering ES
changes and reindexing

Signed-off-by: Anna Trzcinska <[email protected]>
  • Loading branch information
annatrz committed Nov 18, 2019
1 parent d2b2a25 commit 43bfca2
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 94 deletions.
7 changes: 5 additions & 2 deletions cap/modules/deposit/ext.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Initialize extension."""

from __future__ import absolute_import, print_function
from cap.modules.schemas.models import Schema

from invenio_search import current_search

from cap.modules.schemas.models import Schema

from .receivers import handle_deposit_mapping_updated


class CAPDeposit(object):
"""CAPDeposit extension."""

def __init__(self, app=None):
"""Extension initialization."""
if app:
Expand Down
35 changes: 35 additions & 0 deletions cap/modules/deposit/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for records module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.records.utils import reindex_by_schema_url
from cap.modules.schemas.signals import deposit_mapping_updated


@deposit_mapping_updated.connect
def handle_deposit_mapping_updated(schema):
    """Reindex every deposit of *schema* after its ES mapping was updated."""
    # Resolve the schema's deposit path to its full JSON schema URL and
    # reindex all deposits (pid type 'depid') referencing that URL.
    reindex_by_schema_url(
        current_jsonschemas.path_to_url(schema.deposit_path), 'depid')
3 changes: 2 additions & 1 deletion cap/modules/records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Data model package."""

from .receivers import handle_record_mapping_updated
37 changes: 37 additions & 0 deletions cap/modules/records/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for deposit module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.schemas.signals import record_mapping_updated

from .utils import reindex_by_schema_url


@record_mapping_updated.connect
def handle_record_mapping_updated(schema):
    """Reindex all records of *schema* after its ES mapping was updated."""
    # Resolve the schema's record path to its full JSON schema URL and
    # reindex all records (pid type 'recid') referencing that URL.
    reindex_by_schema_url(
        current_jsonschemas.path_to_url(schema.record_path), 'recid')
34 changes: 22 additions & 12 deletions cap/modules/records/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
import string

from flask import url_for
from invenio_db import db
from invenio_indexer.api import RecordIndexer
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
from six.moves.urllib import parse
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.sqlite import JSON


def generate_recid(experiment):
Expand Down Expand Up @@ -81,17 +87,21 @@ def _get_json_type():

indexer = RecordIndexer()

ids = (x[0] for x in RecordMetadata.query.filter(
RecordMetadata.json['$schema'] == cast(
schema_url, _get_json_type())).values(RecordMetadata.id))
ids = [
x[0] for x in RecordMetadata.query.filter(
RecordMetadata.json['$schema'] == cast(
schema_url, _get_json_type())).values(RecordMetadata.id)
]

filtered_by_pid_type = (x[0] for x in PersistentIdentifier.query.filter(
PersistentIdentifier.status == PIDStatus.REGISTERED,
PersistentIdentifier.object_type == 'rec', PersistentIdentifier.
pid_type == pid_type, PersistentIdentifier.object_uuid.in_(
ids)).values(PersistentIdentifier.object_uuid))
if ids:
filtered_by_pid_type = (
x[0] for x in PersistentIdentifier.query.filter(
PersistentIdentifier.object_type == 'rec', PersistentIdentifier
.pid_type == pid_type, PersistentIdentifier.status ==
PIDStatus.REGISTERED, PersistentIdentifier.object_uuid.in_(
ids)).values(PersistentIdentifier.object_uuid))

print('{} records will be reindexed...'.format(schema_url))
print('{} records will be reindexed...'.format(schema_url))

indexer.bulk_index(filtered_by_pid_type)
indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True})
indexer.bulk_index(filtered_by_pid_type)
indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True})
13 changes: 7 additions & 6 deletions cap/modules/schemas/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ def add_schema_from_fixture(data=None):
with db.session.begin_nested():
with db.session.begin_nested():
try:
schema = Schema.get(name=data['name'],
version=data['version'])
click.secho('{} already exist in the db.'.format(
str(name)))
return
schema = Schema.get(name=name, version=data['version'])
schema.update(**data)
msg, fg = '{} updated.'.format(str(name)), 'green'

except JSONSchemaNotFound:
schema = Schema(**data)
db.session.add(schema)
msg, fg = '{} added.'.format(str(name)), 'green'

if allow_all:
schema.add_read_access_for_all_users()
else:
schema.revoke_access_for_all_users()

except IntegrityError:
click.secho('Error occured during adding {} to the db. \n'.format(
Expand All @@ -89,4 +90,4 @@ def add_schema_from_fixture(data=None):
return

db.session.commit()
click.secho('{} has been added.'.format(str(name)), fg='green')
click.secho(msg, fg=fg)
126 changes: 96 additions & 30 deletions cap/modules/schemas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
from six.moves.urllib.parse import urljoin
from sqlalchemy import UniqueConstraint, event
from sqlalchemy.orm import validates
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.utils import import_string

from cap.types import json_type

from .permissions import SchemaAdminAction, SchemaReadAction
from .serializers import resolved_schemas_serializer, schema_serializer
from .signals import deposit_mapping_updated, record_mapping_updated

ES_FORBIDDEN = r' ,"\<*>|?'

Expand Down Expand Up @@ -208,11 +210,30 @@ def add_read_access_for_all_users(self):
"""Give read access to all authenticated users."""
assert self.id

db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
try:
ActionSystemRoles.query.filter(
ActionSystemRoles.action == 'schema-object-read',
ActionSystemRoles.argument == str(self.id),
ActionSystemRoles.role_name == 'authenticated_user').one()
except NoResultFound:
db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
db.session.flush()

def revoke_access_for_all_users(self):
    """Revoke read access to all authenticated users."""
    assert self.id

    # Look up the single read grant given to the authenticated-user
    # system role for this schema; nothing to do if it was never granted.
    try:
        grant = ActionSystemRoles.query.filter(
            ActionSystemRoles.action == 'schema-object-read',
            ActionSystemRoles.argument == str(self.id),
            ActionSystemRoles.role_name == 'authenticated_user').one()
    except NoResultFound:
        return

    db.session.delete(grant)

def give_admin_access_for_user(self, user):
"""Give admin access for users."""
assert self.id
Expand Down Expand Up @@ -270,39 +291,55 @@ def name_to_es_name(name):
return name.replace('/', '-')


def create_index(index_name, mapping_body, aliases):
"""Create index in elasticsearch, add under given aliases."""
if not es.indices.exists(index_name):
current_search.mappings[index_name] = {} # invenio search needs it

es.indices.create(index=index_name,
body={'mappings': mapping_body},
ignore=False)

for alias in aliases:
es.indices.update_aliases(
{'actions': [{
'add': {
'index': index_name,
'alias': alias
}
}]})


@event.listens_for(Schema, 'after_insert')
def after_insert_schema(target, value, schema):
"""On schema insert, create corresponding indexes and aliases in ES."""
if schema.is_indexed:
create_index(schema.deposit_index, schema.deposit_mapping,
schema.deposit_aliases)
create_index(schema.record_index, schema.record_mapping,
schema.record_aliases)
_recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping)
_recreate_record_mapping_in_ES(schema, schema.record_mapping)

# invenio search needs it
mappings_imp = current_app.config.get('SEARCH_GET_MAPPINGS_IMP')
current_cache.delete_memoized(import_string(mappings_imp))


@event.listens_for(Schema.deposit_mapping, 'set')
def after_deposit_mapping_updated(target, value, oldvalue, initiator):
    """Recreate the ES deposit mapping when the field is updated.

    Fired on every ``set`` of ``Schema.deposit_mapping``:
    * recreate the deposit index in ES with the new mapping
    * the recreate helper sends the ``deposit_mapping_updated`` signal

    Skipped when:
    * triggered on creation of the schema (old value never loaded)
    * the schema is not indexed in ES
    """
    # NO_VALUE is a sentinel singleton -- compare by identity, not equality.
    if oldvalue is NO_VALUE or not target.is_indexed:
        return

    _recreate_deposit_mapping_in_ES(target, value)

    # When the record index mirrors the deposit mapping, rebuild it from
    # the same new mapping as well.
    if target.use_deposit_as_record:
        _recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema.record_mapping, 'set')
def after_record_mapping_updated(target, value, oldvalue, initiator):
    """Recreate the ES record mapping when the field is updated.

    Fired on every ``set`` of ``Schema.record_mapping``:
    * recreate the record index in ES with the new mapping
    * the recreate helper sends the ``record_mapping_updated`` signal

    Skipped when:
    * triggered on creation of the schema (old value never loaded)
    * the schema is not indexed in ES
    * ``use_deposit_as_record`` is set, so record mapping changes are
      driven by the deposit mapping instead
    """
    # NO_VALUE is a sentinel singleton -- compare by identity, not equality.
    if oldvalue is NO_VALUE or not target.is_indexed or \
            target.use_deposit_as_record:
        return

    _recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema, 'after_delete')
def before_delete_schema(mapper, connect, schema):
"""On schema delete, delete corresponding indexes and aliases in ES."""
Expand All @@ -316,7 +353,36 @@ def before_delete_schema(mapper, connect, schema):
current_cache.delete_memoized(import_string(mappings_imp))


@db.event.listens_for(Schema, 'before_update', propagate=True)
def timestamp_before_update(mapper, connection, target):
"""Update `updated` property with current time on `before_update` event."""
target.updated = datetime.utcnow()
def _create_index(index_name, mapping_body, aliases):
    """Create index in elasticsearch, add under given aliases."""
    # Guard clause: an already-existing index is left completely untouched.
    if es.indices.exists(index_name):
        return

    current_search.mappings[index_name] = {}  # invenio search needs it

    es.indices.create(index=index_name,
                      body={'mappings': mapping_body},
                      ignore=False)

    # Register the freshly created index under each requested alias.
    for alias in aliases:
        action = {'add': {'index': index_name, 'alias': alias}}
        es.indices.update_aliases({'actions': [action]})


def _recreate_deposit_mapping_in_ES(schema, mapping):
    """Recreate the deposit index (delete -> create) and notify listeners."""
    index = schema.deposit_index

    # ES cannot update a mapping in place, so drop the index first.
    if es.indices.exists(index):
        es.indices.delete(index=index)

    _create_index(index, mapping, schema.deposit_aliases)
    deposit_mapping_updated.send(schema)


def _recreate_record_mapping_in_ES(schema, mapping):
    """Recreate the record index (delete -> create) and notify listeners."""
    index = schema.record_index

    # ES cannot update a mapping in place, so drop the index first.
    if es.indices.exists(index):
        es.indices.delete(index=index)

    _create_index(index, mapping, schema.record_aliases)
    record_mapping_updated.send(schema)
18 changes: 7 additions & 11 deletions tests/unit/test_views.py → cap/modules/schemas/signals.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2018 CERN.
# Copyright (C) 2016, 2017 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
Expand All @@ -21,18 +21,14 @@
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
# or submit itself to any jurisdiction.
"""Unit tests for Cap general views."""
"""Signals for schemas module."""

from flask import url_for
from __future__ import absolute_import, print_function

from blinker import Namespace

def test_view_ping(app):
with app.test_request_context():
url = url_for('cap.ping')
_signals = Namespace()

with app.test_client() as client:
resp = client.get(url)
deposit_mapping_updated = _signals.signal('deposit_mapping-updated')

assert resp.status_code == 200
assert resp.data == 'Pong!'
record_mapping_updated = _signals.signal('record_mapping-updated')
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@

packages = find_packages()


# Get the version string. Cannot be done with import!
g = {}
with open(os.path.join('cap', 'version.py'), 'rt') as fp:
exec(fp.read(), g)
exec (fp.read(), g)
version = g['__version__']

setup(
Expand Down
Loading

0 comments on commit 43bfca2

Please sign in to comment.