From 43bfca2507fab31065d4166baae2579b1cdd4b65 Mon Sep 17 00:00:00 2001 From: Anna Trzcinska Date: Mon, 18 Nov 2019 16:44:49 +0100 Subject: [PATCH] schemas: trigger update on ES and reindexing when schema mapping changed * signals on mappings update * recreate indices on mappings update (del -> create) * reindex all records belonging to updated schema * change ```cap fixtures schemas``` to update schemas in the db (before was just skipping if schema already existed in the db), triggering ES changes in reindexing Signed-off-by: Anna Trzcinska --- cap/modules/deposit/ext.py | 7 +- cap/modules/deposit/receivers.py | 35 +++++ cap/modules/records/__init__.py | 3 +- cap/modules/records/receivers.py | 37 +++++ cap/modules/records/utils.py | 34 +++-- cap/modules/schemas/cli.py | 13 +- cap/modules/schemas/models.py | 126 +++++++++++++---- .../modules/schemas/signals.py | 18 +-- setup.py | 3 +- tests/integration/test_schemas_views.py | 133 ++++++++++++++---- 10 files changed, 315 insertions(+), 94 deletions(-) create mode 100644 cap/modules/deposit/receivers.py create mode 100644 cap/modules/records/receivers.py rename tests/unit/test_views.py => cap/modules/schemas/signals.py (74%) diff --git a/cap/modules/deposit/ext.py b/cap/modules/deposit/ext.py index a91b0fe9d3..a1ded4c40c 100644 --- a/cap/modules/deposit/ext.py +++ b/cap/modules/deposit/ext.py @@ -1,13 +1,16 @@ """Initialize extension.""" from __future__ import absolute_import, print_function -from cap.modules.schemas.models import Schema + from invenio_search import current_search +from cap.modules.schemas.models import Schema + +from .receivers import handle_deposit_mapping_updated + class CAPDeposit(object): """CAPDeposit extension.""" - def __init__(self, app=None): """Extension initialization.""" if app: diff --git a/cap/modules/deposit/receivers.py b/cap/modules/deposit/receivers.py new file mode 100644 index 0000000000..162d9d4f2d --- /dev/null +++ b/cap/modules/deposit/receivers.py @@ -0,0 +1,35 @@ +# -*- 
coding: utf-8 -*- +# +# This file is part of CERN Analysis Preservation Framework. +# Copyright (C) 2016 CERN. +# +# CERN Analysis Preservation Framework is free software; you can redistribute +# it and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# CERN Analysis Preservation Framework is distributed in the hope that it will +# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with CERN Analysis Preservation Framework; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. 
+"""Registered signal handlers for records module.""" +from invenio_jsonschemas.proxies import current_jsonschemas + +from cap.modules.records.utils import reindex_by_schema_url +from cap.modules.schemas.signals import deposit_mapping_updated + + +@deposit_mapping_updated.connect +def handle_deposit_mapping_updated(schema): + """Reindex all the deposits when mapping in ES got updated.""" + schema_url = current_jsonschemas.path_to_url(schema.deposit_path) + reindex_by_schema_url(schema_url, 'depid') diff --git a/cap/modules/records/__init__.py b/cap/modules/records/__init__.py index 92592b8d1a..6268f81c78 100644 --- a/cap/modules/records/__init__.py +++ b/cap/modules/records/__init__.py @@ -21,5 +21,6 @@ # In applying this license, CERN does not # waive the privileges and immunities granted to it by virtue of its status # as an Intergovernmental Organization or submit itself to any jurisdiction. - """Data model package.""" + +from .receivers import handle_record_mapping_updated diff --git a/cap/modules/records/receivers.py b/cap/modules/records/receivers.py new file mode 100644 index 0000000000..8650cd70e7 --- /dev/null +++ b/cap/modules/records/receivers.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# +# This file is part of CERN Analysis Preservation Framework. +# Copyright (C) 2016 CERN. +# +# CERN Analysis Preservation Framework is free software; you can redistribute +# it and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# CERN Analysis Preservation Framework is distributed in the hope that it will +# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with CERN Analysis Preservation Framework; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. +"""Registered signal handlers for records module.""" +from invenio_jsonschemas.proxies import current_jsonschemas + +from cap.modules.schemas.signals import record_mapping_updated + +from .utils import reindex_by_schema_url + + +@record_mapping_updated.connect +def handle_record_mapping_updated(schema): + """Reindex all the records when mapping in ES got updated.""" + schema_url = current_jsonschemas.path_to_url(schema.record_path) + + reindex_by_schema_url(schema_url, 'recid') diff --git a/cap/modules/records/utils.py b/cap/modules/records/utils.py index 54ace6dc88..9c144fc560 100644 --- a/cap/modules/records/utils.py +++ b/cap/modules/records/utils.py @@ -27,9 +27,15 @@ import string from flask import url_for +from invenio_db import db +from invenio_indexer.api import RecordIndexer from invenio_pidstore.errors import PIDDoesNotExistError -from invenio_pidstore.models import PersistentIdentifier +from invenio_pidstore.models import PersistentIdentifier, PIDStatus +from invenio_records.models import RecordMetadata from six.moves.urllib import parse +from sqlalchemy import cast +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.sqlite import JSON def generate_recid(experiment): @@ -81,17 +87,21 @@ def _get_json_type(): indexer = RecordIndexer() - ids = (x[0] for x in RecordMetadata.query.filter( - RecordMetadata.json['$schema'] == cast( - schema_url, _get_json_type())).values(RecordMetadata.id)) + ids = [ + x[0] for x in RecordMetadata.query.filter( + RecordMetadata.json['$schema'] == cast( + schema_url, 
_get_json_type())).values(RecordMetadata.id) + ] - filtered_by_pid_type = (x[0] for x in PersistentIdentifier.query.filter( - PersistentIdentifier.status == PIDStatus.REGISTERED, - PersistentIdentifier.object_type == 'rec', PersistentIdentifier. - pid_type == pid_type, PersistentIdentifier.object_uuid.in_( - ids)).values(PersistentIdentifier.object_uuid)) + if ids: + filtered_by_pid_type = ( + x[0] for x in PersistentIdentifier.query.filter( + PersistentIdentifier.object_type == 'rec', PersistentIdentifier + .pid_type == pid_type, PersistentIdentifier.status == + PIDStatus.REGISTERED, PersistentIdentifier.object_uuid.in_( + ids)).values(PersistentIdentifier.object_uuid)) - print('{} records will be reindexed...'.format(schema_url)) + print('{} records will be reindexed...'.format(schema_url)) - indexer.bulk_index(filtered_by_pid_type) - indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True}) + indexer.bulk_index(filtered_by_pid_type) + indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True}) diff --git a/cap/modules/schemas/cli.py b/cap/modules/schemas/cli.py index a730217455..63304c72f6 100644 --- a/cap/modules/schemas/cli.py +++ b/cap/modules/schemas/cli.py @@ -69,18 +69,19 @@ def add_schema_from_fixture(data=None): with db.session.begin_nested(): with db.session.begin_nested(): try: - schema = Schema.get(name=data['name'], - version=data['version']) - click.secho('{} already exist in the db.'.format( - str(name))) - return + schema = Schema.get(name=name, version=data['version']) + schema.update(**data) + msg, fg = '{} updated.'.format(str(name)), 'green' except JSONSchemaNotFound: schema = Schema(**data) db.session.add(schema) + msg, fg = '{} added.'.format(str(name)), 'green' if allow_all: schema.add_read_access_for_all_users() + else: + schema.revoke_access_for_all_users() except IntegrityError: click.secho('Error occured during adding {} to the db. 
\n'.format( @@ -89,4 +90,4 @@ def add_schema_from_fixture(data=None): return db.session.commit() - click.secho('{} has been added.'.format(str(name)), fg='green') + click.secho(msg, fg=fg) diff --git a/cap/modules/schemas/models.py b/cap/modules/schemas/models.py index f7bdb8b2ac..cc8d6cb6df 100644 --- a/cap/modules/schemas/models.py +++ b/cap/modules/schemas/models.py @@ -37,6 +37,7 @@ from six.moves.urllib.parse import urljoin from sqlalchemy import UniqueConstraint, event from sqlalchemy.orm import validates +from sqlalchemy.orm.base import NO_VALUE from sqlalchemy.orm.exc import NoResultFound from werkzeug.utils import import_string @@ -44,6 +45,7 @@ from .permissions import SchemaAdminAction, SchemaReadAction from .serializers import resolved_schemas_serializer, schema_serializer +from .signals import deposit_mapping_updated, record_mapping_updated ES_FORBIDDEN = r' ,"\<*>|?' @@ -208,11 +210,30 @@ def add_read_access_for_all_users(self): """Give read access to all authenticated users.""" assert self.id - db.session.add( - ActionSystemRoles.allow(SchemaReadAction(self.id), - role=authenticated_user)) + try: + ActionSystemRoles.query.filter( + ActionSystemRoles.action == 'schema-object-read', + ActionSystemRoles.argument == str(self.id), + ActionSystemRoles.role_name == 'authenticated_user').one() + except NoResultFound: + db.session.add( + ActionSystemRoles.allow(SchemaReadAction(self.id), + role=authenticated_user)) db.session.flush() + def revoke_access_for_all_users(self): + """Revoke read access to all authenticated users.""" + assert self.id + + try: + db.session.delete( + ActionSystemRoles.query.filter( + ActionSystemRoles.action == 'schema-object-read', + ActionSystemRoles.argument == str(self.id), + ActionSystemRoles.role_name == 'authenticated_user').one()) + except NoResultFound: + pass + def give_admin_access_for_user(self, user): """Give admin access for users.""" assert self.id @@ -270,39 +291,55 @@ def name_to_es_name(name): return 
name.replace('/', '-') -def create_index(index_name, mapping_body, aliases): - """Create index in elasticsearch, add under given aliases.""" - if not es.indices.exists(index_name): - current_search.mappings[index_name] = {} # invenio search needs it - - es.indices.create(index=index_name, - body={'mappings': mapping_body}, - ignore=False) - - for alias in aliases: - es.indices.update_aliases( - {'actions': [{ - 'add': { - 'index': index_name, - 'alias': alias - } - }]}) - - @event.listens_for(Schema, 'after_insert') def after_insert_schema(target, value, schema): """On schema insert, create corresponding indexes and aliases in ES.""" if schema.is_indexed: - create_index(schema.deposit_index, schema.deposit_mapping, - schema.deposit_aliases) - create_index(schema.record_index, schema.record_mapping, - schema.record_aliases) + _recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping) + _recreate_record_mapping_in_ES(schema, schema.record_mapping) # invenio search needs it mappings_imp = current_app.config.get('SEARCH_GET_MAPPINGS_IMP') current_cache.delete_memoized(import_string(mappings_imp)) +@event.listens_for(Schema.deposit_mapping, 'set') +def after_deposit_mapping_updated(target, value, oldvalue, initiator): + """If deposit mapping field was updated: + * trigger mapping update in ES + * send signal + + Skip if: + * triggered on creation of schema (not update) + * schema not indexed in ES + """ + if oldvalue == NO_VALUE or not target.is_indexed: + return + + _recreate_deposit_mapping_in_ES(target, value) + + if target.use_deposit_as_record: + _recreate_record_mapping_in_ES(target, value) + + +@event.listens_for(Schema.record_mapping, 'set') +def after_record_mapping_updated(target, value, oldvalue, initiator): + """If record mapping field was updated: + * trigger mapping update in ES + * send signal + + Skip if: + * triggered on creation of schema (not update) + * schema not indexed in ES + * flag use_deposit_as_record, so record mapping changes can be 
ignored + """ + if oldvalue == NO_VALUE or not target.is_indexed or \ + target.use_deposit_as_record: + return + + _recreate_record_mapping_in_ES(target, value) + + @event.listens_for(Schema, 'after_delete') def before_delete_schema(mapper, connect, schema): """On schema delete, delete corresponding indexes and aliases in ES.""" @@ -316,7 +353,36 @@ def before_delete_schema(mapper, connect, schema): current_cache.delete_memoized(import_string(mappings_imp)) -@db.event.listens_for(Schema, 'before_update', propagate=True) -def timestamp_before_update(mapper, connection, target): - """Update `updated` property with current time on `before_update` event.""" - target.updated = datetime.utcnow() +def _create_index(index_name, mapping_body, aliases): + """Create index in elasticsearch, add under given aliases.""" + if not es.indices.exists(index_name): + current_search.mappings[index_name] = {} # invenio search needs it + + es.indices.create(index=index_name, + body={'mappings': mapping_body}, + ignore=False) + + for alias in aliases: + es.indices.update_aliases( + {'actions': [{ + 'add': { + 'index': index_name, + 'alias': alias + } + }]}) + + +def _recreate_deposit_mapping_in_ES(schema, mapping): + if es.indices.exists(schema.deposit_index): + es.indices.delete(index=schema.deposit_index) + + _create_index(schema.deposit_index, mapping, schema.deposit_aliases) + deposit_mapping_updated.send(schema) + + +def _recreate_record_mapping_in_ES(schema, mapping): + if es.indices.exists(schema.record_index): + es.indices.delete(index=schema.record_index) + + _create_index(schema.record_index, mapping, schema.record_aliases) + record_mapping_updated.send(schema) diff --git a/tests/unit/test_views.py b/cap/modules/schemas/signals.py similarity index 74% rename from tests/unit/test_views.py rename to cap/modules/schemas/signals.py index af373f7617..6e41ebe374 100644 --- a/tests/unit/test_views.py +++ b/cap/modules/schemas/signals.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This 
file is part of CERN Analysis Preservation Framework. -# Copyright (C) 2018 CERN. +# Copyright (C) 2016, 2017 CERN. # # CERN Analysis Preservation Framework is free software; you can redistribute # it and/or modify it under the terms of the GNU General Public License as @@ -21,18 +21,14 @@ # In applying this license, CERN does not # waive the privileges and immunities granted to it by virtue of its status # as an Intergovernmental Organization or submit itself to any jurisdiction. -# or submit itself to any jurisdiction. -"""Unit tests for Cap general views.""" +"""Signals for schemas module.""" -from flask import url_for +from __future__ import absolute_import, print_function +from blinker import Namespace -def test_view_ping(app): - with app.test_request_context(): - url = url_for('cap.ping') +_signals = Namespace() - with app.test_client() as client: - resp = client.get(url) +deposit_mapping_updated = _signals.signal('deposit_mapping-updated') - assert resp.status_code == 200 - assert resp.data == 'Pong!' +record_mapping_updated = _signals.signal('record_mapping-updated') diff --git a/setup.py b/setup.py index 1c82db6f1e..0cc7872691 100644 --- a/setup.py +++ b/setup.py @@ -103,11 +103,10 @@ packages = find_packages() - # Get the version string. Cannot be done with import! 
g = {} with open(os.path.join('cap', 'version.py'), 'rt') as fp: - exec(fp.read(), g) + exec (fp.read(), g) version = g['__version__'] setup( diff --git a/tests/integration/test_schemas_views.py b/tests/integration/test_schemas_views.py index 62eb52a910..8642173c8c 100644 --- a/tests/integration/test_schemas_views.py +++ b/tests/integration/test_schemas_views.py @@ -25,6 +25,8 @@ """Unit tests for schemas views.""" import json +from pytest import mark + from cap.modules.schemas.models import Schema ######################## @@ -336,12 +338,9 @@ def test_get_resolved_schemas(client, db, users, create_schema, 'record_mapping': {}, 'record_options': {}, 'links': { - 'self': - 'http://analysispreservation.cern.ch/api/jsonschemas/test-analysis/1.0.0', - 'deposit': - 'http://analysispreservation.cern.ch/api/schemas/deposits/records/test-analysis-v1.0.0.json', - 'record': - 'http://analysispreservation.cern.ch/api/schemas/records/test-analysis-v1.0.0.json', + 'self': 'http://analysispreservation.cern.ch/api/jsonschemas/test-analysis/1.0.0', + 'deposit': 'http://analysispreservation.cern.ch/api/schemas/deposits/records/test-analysis-v1.0.0.json', + 'record': 'http://analysispreservation.cern.ch/api/schemas/records/test-analysis-v1.0.0.json', } } @@ -375,21 +374,15 @@ def test_get_only_latest_version_of_schemas(client, db, users, 'deposit_schema': {}, 'record_mapping': {}, 'links': { - 'record': - 'http://analysispreservation.cern.ch/api/schemas/records/schema1-v1.2.3.json', - 'self': - 'http://analysispreservation.cern.ch/api/jsonschemas/schema1/1.2.3', - 'deposit': - 'http://analysispreservation.cern.ch/api/schemas/deposits/records/schema1-v1.2.3.json' + 'record': 'http://analysispreservation.cern.ch/api/schemas/records/schema1-v1.2.3.json', + 'self': 'http://analysispreservation.cern.ch/api/jsonschemas/schema1/1.2.3', + 'deposit': 'http://analysispreservation.cern.ch/api/schemas/deposits/records/schema1-v1.2.3.json' }, }, { 'links': { - 'record': - 
'http://analysispreservation.cern.ch/api/schemas/records/schema2-v3.0.0.json', - 'self': - 'http://analysispreservation.cern.ch/api/jsonschemas/schema2/3.0.0', - 'deposit': - 'http://analysispreservation.cern.ch/api/schemas/deposits/records/schema2-v3.0.0.json' + 'record': 'http://analysispreservation.cern.ch/api/schemas/records/schema2-v3.0.0.json', + 'self': 'http://analysispreservation.cern.ch/api/jsonschemas/schema2/3.0.0', + 'deposit': 'http://analysispreservation.cern.ch/api/schemas/deposits/records/schema2-v3.0.0.json' }, 'deposit_options': {}, 'record_schema': {}, @@ -622,20 +615,62 @@ def test_put(client, db, auth_headers_for_user, users, json_headers): 'title': 'deposit_options' }, 'record_schema': {}, - 'record_options': - { # same as deposit_options because use_deposit_as_record == True - 'title': 'deposit_options' - }, - 'record_mapping': - { # same as deposit_mapping because use_deposit_as_record == True - 'doc': { - 'properties': { - "keyword": { - "type": "keyword" - } + 'record_options': { # same as deposit_options because use_deposit_as_record == True + 'title': 'deposit_options' + }, + 'record_mapping': { # same as deposit_mapping because use_deposit_as_record == True + 'doc': { + 'properties': { + "keyword": { + "type": "keyword" + } + } + } + }, + 'deposit_mapping': { + 'doc': { + 'properties': { + "keyword": { + "type": "keyword" + } + } + } + }, + 'links': { + 'self': u'http://analysispreservation.cern.ch/api/jsonschemas/cms-schema/1.2.3', + 'deposit': 'http://analysispreservation.cern.ch/api/schemas/deposits/records/cms-schema-v1.2.3.json', + 'record': u'http://analysispreservation.cern.ch/api/schemas/records/cms-schema-v1.2.3.json', + # 'versions': u'http://analysispreservation.cern.ch/api/jsonschemas/cms-schema/versions' + } + } + + resp = client.get('/jsonschemas/cms-schema/1.2.3', + headers=json_headers + auth_headers_for_user(owner)) + + assert resp.status_code == 200 + assert resp.json == { + 'name': 'cms-schema', + 'version': '1.2.3', 
+ 'fullname': 'New fullname', + 'is_indexed': False, + 'use_deposit_as_record': True, + 'deposit_schema': {}, + 'deposit_options': { + 'title': 'deposit_options' + }, + 'record_schema': {}, + 'record_options': { # same as deposit_options because use_deposit_as_record == True + 'title': 'deposit_options' + }, + 'record_mapping': { # same as deposit_mapping because use_deposit_as_record == True + 'doc': { + 'properties': { + "keyword": { + "type": "keyword" } } - }, + } + }, 'deposit_mapping': { 'doc': { 'properties': { @@ -695,6 +730,44 @@ def test_put_when_not_an_schema_owner_returns_403(client, db, assert resp.status_code == 403 +@mark.skip +def test_put_on_mappings_change_triggers_change_in_ES_and_reindexing( + client, db, auth_headers_for_superuser, json_headers, superuser): + schema = json.dumps(dict(name='cms-schema', version='1.0.0')) + new_schema = dict( + name='cms-schema', + version='1.0.0', + record_mapping={'doc': { + 'properties': { + "title": { + "type": "text" + } + } + }}, + deposit_mapping={ + 'doc': { + 'properties': { + "keyword": { + "type": "keyword" + } + } + } + }, + is_indexed=True) + + resp = client.post('/jsonschemas/', + data=schema, + headers=json_headers + auth_headers_for_superuser) + + assert resp.status_code == 200 + + resp = client.put('/jsonschemas/cms-schema/1.0.0', + data=json.dumps(new_schema), + headers=json_headers + auth_headers_for_superuser) + + assert resp.status_code == 200 + + ##################################### # api/jsonschemas/{id}/{version} [DELETE] #####################################