From 3ccacfc0e57e9d369b1d0c266f884e004d92cd7e Mon Sep 17 00:00:00 2001 From: Noah Paige <69586985+noah-paige@users.noreply.github.com> Date: Mon, 1 Jul 2024 13:56:58 -0400 Subject: [PATCH] Add delete docs not found when re indexing in catalog task (#1365) ### Feature or Bugfix - Feature ### Detail - Add logic to Catalog Indexer Task to Delete Docs No Longer in RDS - TODO: Add Ability to Re-index Catalog Items via Dataall Admin UI ### Relates - https://github.com/data-dot-all/dataall/issues/1078 ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? - Do you use a standard proven implementations? - Are the used keys controlled by the customer? Where are they stored? - Are you introducing any new policies/roles/users? - Have you used the least-privilege principle? How? By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- backend/dataall/modules/catalog/__init__.py | 11 ++ .../dataall/modules/catalog/api/mutations.py | 8 + .../dataall/modules/catalog/api/resolvers.py | 5 + .../modules/catalog/handlers/__init__.py | 3 + .../catalog/handlers/ecs_catalog_handlers.py | 27 +++ .../modules/catalog/indexers/base_indexer.py | 11 ++ .../catalog/indexers/catalog_indexer.py | 2 +- .../catalog/services/catalog_service.py | 28 +++ .../catalog/tasks/catalog_indexer_task.py | 51 +++-- .../dataall/modules/dashboards/__init__.py | 18 ++ .../indexers/dashboard_catalog_indexer.py | 11 +- .../dataall/modules/s3_datasets/__init__.py | 3 +- .../indexers/dataset_catalog_indexer.py | 16 +- .../s3_datasets/indexers/dataset_indexer.py | 13 +- .../s3_datasets/indexers/location_indexer.py | 17 +- .../s3_datasets/indexers/table_indexer.py | 14 +- .../services/dataset_location_service.py | 3 + .../services/dataset_table_service.py | 3 + .../s3_datasets/tasks/tables_syncer.py | 2 + backend/local_graphql_server.py | 2 +- deploy/stacks/container.py | 17 +- .../components/MaintenanceViewer.js | 178 +++++++++++++++++- .../src/modules/Maintenance/services/index.js | 1 + .../services/startReindexCatalog.js | 10 + .../tasks/test_dataset_catalog_indexer.py | 28 ++- 25 files changed, 432 insertions(+), 50 deletions(-) create mode 100644 backend/dataall/modules/catalog/handlers/__init__.py create mode 100644 backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py create mode 100644 backend/dataall/modules/catalog/services/catalog_service.py create mode 100644 frontend/src/modules/Maintenance/services/startReindexCatalog.js diff --git a/backend/dataall/modules/catalog/__init__.py b/backend/dataall/modules/catalog/__init__.py index c02f06803..95e76f0a6 100644 --- a/backend/dataall/modules/catalog/__init__.py +++ b/backend/dataall/modules/catalog/__init__.py @@ -16,6 +16,17 @@ def __init__(self): from dataall.modules.catalog import tasks +class 
CatalogAsyncHandlersModuleInterface(ModuleInterface): + """Implements ModuleInterface for catalog async lambda""" + + @staticmethod + def is_supported(modes: Set[ImportMode]): + return ImportMode.HANDLERS in modes + + def __init__(self): + import dataall.modules.catalog.handlers + + class CatalogApiModuleInterface(ModuleInterface): """ Implements ModuleInterface for catalog code in GraphQl lambda. diff --git a/backend/dataall/modules/catalog/api/mutations.py b/backend/dataall/modules/catalog/api/mutations.py index f16188f53..4df0f2fdf 100644 --- a/backend/dataall/modules/catalog/api/mutations.py +++ b/backend/dataall/modules/catalog/api/mutations.py @@ -7,6 +7,7 @@ create_term, approve_term_association, dismiss_term_association, + start_reindex_catalog, ) @@ -107,3 +108,10 @@ resolver=dismiss_term_association, args=[gql.Argument(name='linkUri', type=gql.NonNullableType(gql.String))], ) + +startReindexCatalog = gql.MutationField( + name='startReindexCatalog', + args=[gql.Argument(name='handleDeletes', type=gql.NonNullableType(gql.Boolean))], + type=gql.Boolean, + resolver=start_reindex_catalog, +) diff --git a/backend/dataall/modules/catalog/api/resolvers.py b/backend/dataall/modules/catalog/api/resolvers.py index 3d8052786..40ccec24d 100644 --- a/backend/dataall/modules/catalog/api/resolvers.py +++ b/backend/dataall/modules/catalog/api/resolvers.py @@ -1,5 +1,6 @@ from dataall.modules.catalog.api.enums import GlossaryRole from dataall.modules.catalog.services.glossaries_service import GlossariesService +from dataall.modules.catalog.services.catalog_service import CatalogService from dataall.base.api.context import Context from dataall.modules.catalog.db.glossary_models import TermLink, GlossaryNode from dataall.base.db import exceptions @@ -157,3 +158,7 @@ def search_glossary(context: Context, source, filter: dict = None): if not filter: filter = {} return GlossariesService.search_glossary_terms(data=filter) + + +def start_reindex_catalog(context: Context, source, handleDeletes: bool): + return CatalogService.start_reindex_catalog(with_deletes=handleDeletes) diff --git a/backend/dataall/modules/catalog/handlers/__init__.py b/backend/dataall/modules/catalog/handlers/__init__.py new file mode 100644 index 000000000..a1f6ee221 --- /dev/null +++ b/backend/dataall/modules/catalog/handlers/__init__.py @@ -0,0 +1,3 @@ +from dataall.modules.catalog.handlers import ecs_catalog_handlers + +__all__ = ['ecs_catalog_handlers'] diff --git a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py new file mode 100644 index 000000000..430a2d409 --- /dev/null +++ b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py @@ -0,0 +1,27 @@ +import logging +import os + +from dataall.core.tasks.service_handlers import Worker +from dataall.core.stacks.aws.ecs import Ecs +from dataall.core.tasks.db.task_models import Task +from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask + +log = logging.getLogger(__name__) + + +class EcsCatalogIndexHandler: + @staticmethod + @Worker.handler(path='ecs.reindex.catalog') + def run_ecs_reindex_catalog_task(engine, task: Task): + envname = os.environ.get('envname', 'local') + if envname in ['local', 'dkrcompose']: + CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False))) + else: + ecs_task_arn = Ecs.run_ecs_task( + task_definition_param='ecs/task_def_arn/catalog_indexer', + container_name_param='ecs/container/catalog_indexer', + 
context=[ + {'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))}, + ], + ) + return {'task_arn': ecs_task_arn} diff --git a/backend/dataall/modules/catalog/indexers/base_indexer.py b/backend/dataall/modules/catalog/indexers/base_indexer.py index 05a51d9ad..98ce693e7 100644 --- a/backend/dataall/modules/catalog/indexers/base_indexer.py +++ b/backend/dataall/modules/catalog/indexers/base_indexer.py @@ -52,6 +52,17 @@ def _index(cls, doc_id, doc): log.error(f'ES config is missing doc {doc} for id {doc_id} was not indexed') return False + @classmethod + def search(cls, query): + es = cls.es() + if es: + res = es.search(index=cls._INDEX, body=query) + log.info(f'Search query {query} returned {res["hits"]["total"]["value"]} records') + return res + else: + log.error(f'ES config is missing, search query {query} failed') + return {} + @staticmethod def _get_target_glossary_terms(session, target_uri): q = ( diff --git a/backend/dataall/modules/catalog/indexers/catalog_indexer.py b/backend/dataall/modules/catalog/indexers/catalog_indexer.py index 2f27c7df8..035d46b6f 100644 --- a/backend/dataall/modules/catalog/indexers/catalog_indexer.py +++ b/backend/dataall/modules/catalog/indexers/catalog_indexer.py @@ -12,5 +12,5 @@ def __init__(self): def all(): return CatalogIndexer._INDEXERS - def index(self, session) -> int: + def index(self, session) -> List[str]: raise NotImplementedError('index is not implemented') diff --git a/backend/dataall/modules/catalog/services/catalog_service.py b/backend/dataall/modules/catalog/services/catalog_service.py new file mode 100644 index 000000000..60a94323d --- /dev/null +++ b/backend/dataall/modules/catalog/services/catalog_service.py @@ -0,0 +1,28 @@ +import logging + +from dataall.base.context import get_context + +from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService +from dataall.core.tasks.db.task_models import Task +from dataall.core.tasks.service_handlers import Worker + + +logger = logging.getLogger(__name__) + + +class CatalogService: + @staticmethod + def start_reindex_catalog(with_deletes: bool) -> bool: + context = get_context() + groups = context.groups if context.groups is not None else [] + if not TenantPolicyValidationService.is_tenant_admin(groups): + raise Exception('Only data.all admin group members can start re-index catalog task') + + with context.db_engine.scoped_session() as session: + reindex_catalog_task: Task = Task( + action='ecs.reindex.catalog', targetUri='ALL', payload={'with_deletes': with_deletes} + ) + session.add(reindex_catalog_task) + + Worker.queue(engine=context.db_engine, task_ids=[reindex_catalog_task.taskUri]) + return True diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py index 570c8b6c3..7fd628465 100644 --- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py +++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py @@ -1,8 +1,10 @@ import logging import os import sys +from typing import List from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer +from dataall.modules.catalog.indexers.base_indexer import BaseIndexer from dataall.base.db import get_engine from dataall.base.loader import load_modules, ImportMode from dataall.base.utils.alarm_service import AlarmService @@ -14,23 +16,44 @@ log = logging.getLogger(__name__) -def index_objects(engine): - try: - indexed_objects_counter = 0 - with engine.scoped_session() as 
session: - for indexer in CatalogIndexer.all(): - indexed_objects_counter += indexer.index(session) +class CatalogIndexerTask: + """ + This class is responsible for indexing objects in the catalog. + """ - log.info(f'Successfully indexed {indexed_objects_counter} objects') - return indexed_objects_counter - except Exception as e: - AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e)) - raise e + @classmethod + def index_objects(cls, engine, with_deletes='False'): + try: + indexed_object_uris = [] + with engine.scoped_session() as session: + for indexer in CatalogIndexer.all(): + indexed_object_uris += indexer.index(session) + + log.info(f'Successfully indexed {len(indexed_object_uris)} objects') + + if with_deletes == 'True': + CatalogIndexerTask._delete_old_objects(indexed_object_uris) + return len(indexed_object_uris) + except Exception as e: + AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e)) + raise e + + @classmethod + def _delete_old_objects(cls, indexed_object_uris: List[str]) -> None: + # Search for documents in opensearch without an ID in the indexed_object_uris list + query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}} + # Delete All "Outdated" Objects from Index + docs = BaseIndexer.search(query) + for doc in docs.get('hits', {}).get('hits', []): + log.info(f'Deleting document {doc["_id"]}...') + BaseIndexer.delete_doc(doc_id=doc['_id']) + + log.info(f'Deleted {len(docs.get("hits", {}).get("hits", []))} records') if __name__ == '__main__': + load_modules({ImportMode.CATALOG_INDEXER_TASK}) ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - - load_modules({ImportMode.CATALOG_INDEXER_TASK}) - index_objects(engine=ENGINE) + with_deletes = os.environ.get('with_deletes', 'False') + CatalogIndexerTask.index_objects(engine=ENGINE, with_deletes=with_deletes) diff --git a/backend/dataall/modules/dashboards/__init__.py b/backend/dataall/modules/dashboards/__init__.py index a47dd7f26..ffbc8e92d 100644 --- a/backend/dataall/modules/dashboards/__init__.py +++ b/backend/dataall/modules/dashboards/__init__.py @@ -75,3 +75,21 @@ def __init__(self): DashboardCatalogIndexer() log.info('Dashboard catalog indexer task has been loaded') + + +class DashboardAsyncHandlersModuleInterface(ModuleInterface): + """Implements ModuleInterface for dashboard async lambda""" + + @staticmethod + def is_supported(modes: Set[ImportMode]): + return ImportMode.HANDLERS in modes + + @staticmethod + def depends_on() -> List[Type['ModuleInterface']]: + from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface + + return [CatalogAsyncHandlersModuleInterface] + + def __init__(self): + pass + log.info('S3 Dataset handlers have been imported') diff --git a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py index 7cc0884fc..474d6dc5d 100644 --- a/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py +++ b/backend/dataall/modules/dashboards/indexers/dashboard_catalog_indexer.py @@ -1,5 +1,7 @@ import logging +from typing import List + from dataall.modules.catalog.indexers.catalog_indexer import CatalogIndexer from dataall.modules.dashboards.db.dashboard_models import Dashboard from dataall.modules.dashboards.indexers.dashboard_indexer import DashboardIndexer @@ -8,11 +10,14 @@ class DashboardCatalogIndexer(CatalogIndexer): - def index(self, session) -> int: - all_dashboards: [Dashboard] = 
session.query(Dashboard).all() + def index(self, session) -> List[str]: + all_dashboards: List[Dashboard] = session.query(Dashboard).all() + all_dashboard_uris = [] + log.info(f'Found {len(all_dashboards)} dashboards') dashboard: Dashboard for dashboard in all_dashboards: + all_dashboard_uris.append(dashboard.dashboardUri) DashboardIndexer.upsert(session=session, dashboard_uri=dashboard.dashboardUri) - return len(all_dashboards) + return all_dashboard_uris diff --git a/backend/dataall/modules/s3_datasets/__init__.py b/backend/dataall/modules/s3_datasets/__init__.py index c828e7993..f0b73a6d0 100644 --- a/backend/dataall/modules/s3_datasets/__init__.py +++ b/backend/dataall/modules/s3_datasets/__init__.py @@ -90,8 +90,9 @@ def is_supported(modes: Set[ImportMode]): @staticmethod def depends_on() -> List[Type['ModuleInterface']]: from dataall.modules.datasets_base import DatasetBaseModuleInterface + from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface - return [DatasetBaseModuleInterface] + return [DatasetBaseModuleInterface, CatalogAsyncHandlersModuleInterface] def __init__(self): import dataall.modules.s3_datasets.handlers diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py index 7140d64f1..3e4853bca 100644 --- a/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py +++ b/backend/dataall/modules/s3_datasets/indexers/dataset_catalog_indexer.py @@ -1,5 +1,6 @@ import logging +from typing import List from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer from dataall.modules.s3_datasets.indexers.location_indexer import DatasetLocationIndexer from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer @@ -16,13 +17,18 @@ class DatasetCatalogIndexer(CatalogIndexer): Register automatically itself when CatalogIndexer instance is created """ - def index(self, session) -> int: - all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session) + def index(self, session) -> List[str]: + all_datasets: List[S3Dataset] = DatasetRepository.list_all_active_datasets(session) + all_dataset_uris = [] log.info(f'Found {len(all_datasets)} datasets') - indexed = 0 for dataset in all_datasets: tables = DatasetTableIndexer.upsert_all(session, dataset.datasetUri) + all_dataset_uris += [table.tableUri for table in tables] + folders = DatasetLocationIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) + all_dataset_uris += [folder.locationUri for folder in folders] + DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri) - indexed += len(tables) + len(folders) + 1 - return indexed + all_dataset_uris.append(dataset.datasetUri) + + return all_dataset_uris diff --git a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py index e245bcea1..2d2a8c5d5 100644 --- a/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py +++ b/backend/dataall/modules/s3_datasets/indexers/dataset_indexer.py @@ -14,14 +14,15 @@ class DatasetIndexer(BaseIndexer): @classmethod def upsert(cls, session, dataset_uri: str): dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri) - env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) - org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) - - count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri) - 
count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri) - count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset') if dataset: + env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) + org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) + + count_tables = DatasetRepository.count_dataset_tables(session, dataset_uri) + count_folders = DatasetLocationRepository.count_dataset_locations(session, dataset_uri) + count_upvotes = VoteRepository.count_upvotes(session, dataset_uri, target_type='dataset') + glossary = BaseIndexer._get_target_glossary_terms(session, dataset_uri) BaseIndexer._index( doc_id=dataset_uri, diff --git a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py index b5216dacb..82170e8f9 100644 --- a/backend/dataall/modules/s3_datasets/indexers/location_indexer.py +++ b/backend/dataall/modules/s3_datasets/indexers/location_indexer.py @@ -6,19 +6,18 @@ from dataall.core.organizations.db.organization_repositories import OrganizationRepository from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository -from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer from dataall.modules.catalog.indexers.base_indexer import BaseIndexer class DatasetLocationIndexer(BaseIndexer): @classmethod - def upsert(cls, session, folder_uri: str): + def upsert(cls, session, folder_uri: str, dataset=None, env=None, org=None): folder = DatasetLocationRepository.get_location_by_uri(session, folder_uri) if folder: - dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) - env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) - org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) + dataset = DatasetRepository.get_dataset_by_uri(session, folder.datasetUri) if not dataset else dataset + env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env + org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org glossary = BaseIndexer._get_target_glossary_terms(session, folder_uri) BaseIndexer._index( @@ -46,12 +45,16 @@ def upsert(cls, session, folder_uri: str): 'glossary': glossary, }, ) - DatasetIndexer.upsert(session=session, dataset_uri=folder.datasetUri) return folder @classmethod def upsert_all(cls, session, dataset_uri: str): folders = DatasetLocationRepository.get_dataset_folders(session, dataset_uri) + dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri) + env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) + org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) for folder in folders: - DatasetLocationIndexer.upsert(session=session, folder_uri=folder.locationUri) + DatasetLocationIndexer.upsert( + session=session, folder_uri=folder.locationUri, dataset=dataset, env=env, org=org + ) return folders diff --git a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py index 15b6320c1..cf24c4852 100644 --- a/backend/dataall/modules/s3_datasets/indexers/table_indexer.py +++ b/backend/dataall/modules/s3_datasets/indexers/table_indexer.py @@ -12,13 +12,13 @@ 
class DatasetTableIndexer(BaseIndexer): @classmethod - def upsert(cls, session, table_uri: str): + def upsert(cls, session, table_uri: str, dataset=None, env=None, org=None): table = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri) if table: - dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) - env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) - org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) + dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri) if not dataset else dataset + env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) if not env else env + org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) if not org else org glossary = BaseIndexer._get_target_glossary_terms(session, table_uri) tags = table.tags if table.tags else [] @@ -48,14 +48,16 @@ def upsert(cls, session, table_uri: str): 'glossary': glossary, }, ) - DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri) return table @classmethod def upsert_all(cls, session, dataset_uri: str): tables = DatasetTableRepository.find_all_active_tables(session, dataset_uri) + dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri) + env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri) + org = OrganizationRepository.get_organization_by_uri(session, dataset.organizationUri) for table in tables: - DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri) + DatasetTableIndexer.upsert(session=session, table_uri=table.tableUri, dataset=dataset, env=env, org=org) return tables @classmethod diff --git a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py index 9da13bf61..a4ac2b33f 100644 --- a/backend/dataall/modules/s3_datasets/services/dataset_location_service.py +++ b/backend/dataall/modules/s3_datasets/services/dataset_location_service.py @@ -1,3 +1,4 @@ +from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer from dataall.base.context import get_context from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService @@ -48,6 +49,7 @@ def create_storage_location(uri: str, data: dict): S3LocationClient(location, dataset).create_bucket_prefix() DatasetLocationIndexer.upsert(session=session, folder_uri=location.locationUri) + DatasetIndexer.upsert(session, dataset.datasetUri) return location @staticmethod @@ -77,6 +79,7 @@ def update_storage_location(uri: str, data: dict): DatasetLocationService._create_glossary_links(session, location, data['terms']) DatasetLocationIndexer.upsert(session, folder_uri=location.locationUri) + DatasetIndexer.upsert(session, location.datasetUri) return location diff --git a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py index 50b63c61e..804156912 100644 --- a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py +++ b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py @@ -9,6 +9,7 @@ from dataall.modules.s3_datasets.aws.glue_dataset_client import DatasetCrawler from dataall.modules.s3_datasets.db.dataset_table_repositories import DatasetTableRepository from 
dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer +from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer from dataall.modules.s3_datasets.services.dataset_permissions import ( UPDATE_DATASET_TABLE, MANAGE_DATASETS, @@ -58,6 +59,7 @@ def update_table(uri: str, table_data: dict = None): ) DatasetTableIndexer.upsert(session, table_uri=table.tableUri) + DatasetIndexer.upsert(session=session, dataset_uri=table.datasetUri) return table @staticmethod @@ -115,6 +117,7 @@ def sync_tables_for_dataset(cls, uri): cls.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables) DatasetTableIndexer.upsert_all(session=session, dataset_uri=dataset.datasetUri) DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri) + DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri) return DatasetRepository.paginated_dataset_tables( session=session, uri=uri, diff --git a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py index 0fbf52709..e530a9e6a 100644 --- a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py +++ b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py @@ -13,6 +13,7 @@ from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset from dataall.modules.s3_datasets.indexers.table_indexer import DatasetTableIndexer +from dataall.modules.s3_datasets.indexers.dataset_indexer import DatasetIndexer from dataall.modules.s3_datasets.services.dataset_alarm_service import DatasetAlarmService root = logging.getLogger() @@ -68,6 +69,7 @@ def sync_tables(engine): processed_tables.extend(tables) DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) + DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri) except Exception as e: log.error( f'Failed to sync tables for dataset ' diff --git a/backend/local_graphql_server.py b/backend/local_graphql_server.py index 339016b42..1ea96a732 100644 --- a/backend/local_graphql_server.py +++ b/backend/local_graphql_server.py @@ -31,7 +31,7 @@ es = connect(envname=ENVNAME) logger.info('Connected') # create_schema_and_tables(engine, envname=ENVNAME) -load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK}) +load_modules(modes={ImportMode.API, ImportMode.HANDLERS, ImportMode.SHARES_TASK, ImportMode.CATALOG_INDEXER_TASK}) Base.metadata.create_all(engine.engine) CDKPROXY_URL = 'http://cdkproxy:2805' if ENVNAME == 'dkrcompose' else 'http://localhost:2805' config.set_property('cdk_proxy_url', CDKPROXY_URL) diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index 6eab0bf88..bbad9efa5 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -195,10 +195,11 @@ def __init__( @run_if(['modules.s3_datasets.active', 'modules.dashboards.active']) def add_catalog_indexer_task(self): + container_id = 'container' catalog_indexer_task, catalog_indexer_task_def = self.set_scheduled_task( cluster=self.ecs_cluster, command=['python3.9', '-m', 'dataall.modules.catalog.tasks.catalog_indexer_task'], - container_id='container', + container_id=container_id, ecr_repository=self._ecr_repository, environment=self._create_env('INFO'), image_tag=self._cdkproxy_image_tag, @@ -212,6 +213,20 @@ def add_catalog_indexer_task(self): prod_sizing=self._prod_sizing, ) + ssm.StringParameter( + self, + 
f'CatalogIndexerTaskARNSSM{self._envname}', + parameter_name=f'/dataall/{self._envname}/ecs/task_def_arn/catalog_indexer', + string_value=catalog_indexer_task_def.task_definition_arn, + ) + + ssm.StringParameter( + self, + f'CatalogIndexerTaskContainerSSM{self._envname}', + parameter_name=f'/dataall/{self._envname}/ecs/container/catalog_indexer', + string_value=container_id, + ) + self.ecs_task_definitions_families.append(catalog_indexer_task.task_definition.family) @run_if(['modules.s3_datasets.active']) diff --git a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js index 2feb3713e..8fcf252f0 100644 --- a/frontend/src/modules/Maintenance/components/MaintenanceViewer.js +++ b/frontend/src/modules/Maintenance/components/MaintenanceViewer.js @@ -3,6 +3,7 @@ import { Button, Card, CardHeader, + CardContent, CircularProgress, Dialog, Divider, @@ -10,20 +11,26 @@ import { IconButton, MenuItem, TextField, - Typography + Switch, + Typography, + FormControlLabel, + FormGroup, + Alert } from '@mui/material'; -import React, { useCallback, useEffect, useState } from 'react'; import { Article, CancelRounded, SystemUpdate } from '@mui/icons-material'; +import React, { useCallback, useEffect, useState } from 'react'; import { LoadingButton } from '@mui/lab'; import { Label } from 'design'; import { getMaintenanceStatus, stopMaintenanceWindow, - startMaintenanceWindow + startMaintenanceWindow, + startReindexCatalog } from '../services'; import { useClient } from 'services'; import { SET_ERROR, useDispatch } from 'globalErrors'; import { useSnackbar } from 'notistack'; +import { ModuleNames, isModuleEnabled } from 'utils'; const maintenanceModes = [ { value: 'READ-ONLY', label: 'Read-Only' }, @@ -162,12 +169,146 @@ export const MaintenanceConfirmationPopUp = (props) => { ); }; +export const ReIndexConfirmationPopUp = (props) => { + const { popUpReIndex, setPopUpReIndex, setUpdatingReIndex } = props; + const client = useClient(); + const dispatch = useDispatch(); + const { enqueueSnackbar } = useSnackbar(); + const [withDelete, setWithDelete] = useState(false); + + const handleReindexStart = async () => { + setUpdatingReIndex(true); + if (!client) { + dispatch({ + type: SET_ERROR, + error: 'Client not initialized for re-indexing catalog task' + }); + } + const response = await client.mutate( + startReindexCatalog({ handleDeletes: withDelete }) + ); + if (!response.errors && response.data.startReindexCatalog != null) { + const respData = response.data.startReindexCatalog; + if (respData === true) { + enqueueSnackbar('Re Index Task has Started. Please check the status', { + anchorOrigin: { + horizontal: 'right', + vertical: 'top' + }, + variant: 'success' + }); + } else { + enqueueSnackbar('Could not start re index task', { + anchorOrigin: { + horizontal: 'right', + vertical: 'top' + }, + variant: 'success' + }); + } + } else { + const error = response.errors + ? response.errors[0].message + : 'Something went wrong while starting re index task. Please check gql logs'; + dispatch({ type: SET_ERROR, error }); + } + setPopUpReIndex(false); + setUpdatingReIndex(false); + }; + + return ( + + + + Start Data.all Catalog Reindexing Task? + + + + + + Starting a reindexing job will update all catalog objects in + data.all with the latest information found in RDS. 
+ + + + + + { + setWithDelete(!withDelete); + }} + edge="start" + name="withDelete" + /> + } + label={ + + With Deletes + + Specifying withDeletes will identify catalog objects + no longer in data.all's DB (if any) and attempt to delete / + clean up the catalog + + + } + /> + + + + + + + Please confirm if you want to start the reindexing task: + + } + sx={{ m: 1 }} + variant="outlined" + onClick={handleReindexStart} + > + Start + + } + sx={{ m: 1 }} + variant="outlined" + onClick={() => { + setPopUpReIndex(false); + }} + > + Cancel + + + + + + ); +}; + export const MaintenanceViewer = () => { const client = useClient(); const [refreshing, setRefreshing] = useState(false); + const refreshingReIndex = false; + const [updatingReIndex, setUpdatingReIndex] = useState(false); const [updating, setUpdating] = useState(false); const [mode, setMode] = useState(''); const [popUp, setPopUp] = useState(false); + const [popUpReIndex, setPopUpReIndex] = useState(false); const [confirmedMode, setConfirmedMode] = useState(''); const [maintenanceButtonText, setMaintenanceButtonText] = useState(START_MAINTENANCE); @@ -339,6 +480,37 @@ export const MaintenanceViewer = () => { return ( + {refreshingReIndex ? ( + + ) : ( + + {isModuleEnabled(ModuleNames.CATALOG) && ( + + + Re-Index Data.all Catalog} /> + + + setPopUpReIndex(true)} + startIcon={} + sx={{ m: 1 }} + variant="contained" + > + Start Re-Index Catalog Task + + + + + + )} + + )} {refreshing ? ( ) : ( diff --git a/frontend/src/modules/Maintenance/services/index.js b/frontend/src/modules/Maintenance/services/index.js index ca55e15e2..0f057a7e6 100644 --- a/frontend/src/modules/Maintenance/services/index.js +++ b/frontend/src/modules/Maintenance/services/index.js @@ -1,3 +1,4 @@ export * from './getMaintenanceStatus'; export * from './stopMaintenanceWindow'; export * from './startMaintenanceWindow'; +export * from './startReindexCatalog'; diff --git a/frontend/src/modules/Maintenance/services/startReindexCatalog.js b/frontend/src/modules/Maintenance/services/startReindexCatalog.js new file mode 100644 index 000000000..cfde0a568 --- /dev/null +++ b/frontend/src/modules/Maintenance/services/startReindexCatalog.js @@ -0,0 +1,10 @@ +import { gql } from 'apollo-boost'; + +export const startReindexCatalog = ({ handleDeletes }) => ({ + variables: { handleDeletes }, + mutation: gql` + mutation startReindexCatalog($handleDeletes: Boolean!) 
{
+      startReindexCatalog(handleDeletes: $handleDeletes)
+    }
+  `
+});
diff --git a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
index 7b67c3335..8629f149b 100644
--- a/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
+++ b/tests/modules/s3_datasets/tasks/test_dataset_catalog_indexer.py
@@ -1,6 +1,6 @@
 import pytest
 
-from dataall.modules.catalog.tasks.catalog_indexer_task import index_objects
+from dataall.modules.catalog.tasks.catalog_indexer_task import CatalogIndexerTask
 from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
 
 
@@ -54,6 +54,30 @@ def test_catalog_indexer(db, org, env, sync_dataset, table, mocker):
     mocker.patch(
         'dataall.modules.s3_datasets.indexers.dataset_indexer.DatasetIndexer.upsert', return_value=sync_dataset
     )
-    indexed_objects_counter = index_objects(engine=db)
+    indexed_objects_counter = CatalogIndexerTask.index_objects(engine=db)
     # Count should be One table + One Dataset = 2
     assert indexed_objects_counter == 2
+
+
+def test_catalog_indexer_with_deletes(db, org, env, sync_dataset, table, mocker):
+    # When Table no longer exists
+    mocker.patch('dataall.modules.s3_datasets.indexers.table_indexer.DatasetTableIndexer.upsert_all', return_value=[])
+    mocker.patch(
+        'dataall.modules.s3_datasets.indexers.dataset_indexer.DatasetIndexer.upsert', return_value=sync_dataset
+    )
+    mocker.patch(
+        'dataall.modules.catalog.indexers.base_indexer.BaseIndexer.search',
+        return_value={'hits': {'hits': [{'_id': table.tableUri}]}},
+    )
+    delete_doc_path = mocker.patch(
+        'dataall.modules.catalog.indexers.base_indexer.BaseIndexer.delete_doc', return_value=True
+    )
+
+    # And with_deletes 'True' for index_objects
+    indexed_objects_counter = CatalogIndexerTask.index_objects(engine=db, with_deletes='True')
+
+    # Index Objects Should call Delete Doc 1 time for Table
+    assert delete_doc_path.call_count == 1
+
+    # Count should be One Dataset = 1
+    assert indexed_objects_counter == 1
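
For reviewers unfamiliar with the OpenSearch side of `_delete_old_objects`, the sketch below illustrates the same "delete every document not in the freshly indexed URI list" pattern using the opensearch-py client directly. This is a minimal sketch, not the task's actual wiring: the task goes through `BaseIndexer.search` / `BaseIndexer.delete_doc`, and the host, index name, and URI list here are placeholders.

```python
# Minimal sketch of the stale-document cleanup performed by
# CatalogIndexerTask._delete_old_objects, written against opensearch-py directly.
# Endpoint, index name, and the URI list are illustrative placeholders.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{'host': 'localhost', 'port': 9200}])  # placeholder endpoint
index_name = 'dataall-index'  # placeholder index name
indexed_object_uris = ['datasetUri1', 'tableUri1', 'locationUri1']  # URIs returned by the indexers this run

# Match every document whose _id is NOT one of the URIs indexed in this run
query = {'query': {'bool': {'must_not': {'terms': {'_id': indexed_object_uris}}}}}

response = client.search(index=index_name, body=query)
for hit in response.get('hits', {}).get('hits', []):
    # Each hit is a catalog document whose backing object no longer exists in RDS
    client.delete(index=index_name, id=hit['_id'])
```

One general OpenSearch caveat: a plain `search` returns only one page of hits (10 by default unless `size` is set), so very large cleanups would need pagination, scrolling, or `delete_by_query` rather than a single search-then-delete loop.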
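
The new `startReindexCatalog` mutation can also be driven outside the Maintenance UI, for example from an operations script against the data.all GraphQL API. A minimal sketch, assuming a deployed API endpoint and a valid access token for a member of the data.all tenant admin group (both placeholders below); the mutation signature `startReindexCatalog(handleDeletes: Boolean!): Boolean` comes from the schema change in this PR.

```python
# Hedged sketch: invoking the new startReindexCatalog mutation over HTTP.
# The endpoint URL and Authorization token are placeholders, not real values.
import requests

API_URL = 'https://<api-id>.execute-api.<region>.amazonaws.com/prod/graphql/api'  # placeholder
ACCESS_TOKEN = '<jwt-access-token>'  # placeholder; caller must be in the tenant admin group

mutation = """
mutation startReindexCatalog($handleDeletes: Boolean!) {
  startReindexCatalog(handleDeletes: $handleDeletes)
}
"""

response = requests.post(
    API_URL,
    json={'query': mutation, 'variables': {'handleDeletes': True}},
    headers={'Authorization': ACCESS_TOKEN, 'Content-Type': 'application/json'},
)
response.raise_for_status()
# True means the ecs.reindex.catalog task was queued successfully
print(response.json()['data']['startReindexCatalog'])
```

On success the resolver queues an `ecs.reindex.catalog` Task, which the handlers worker either runs inline (local / dkrcompose environments) or launches as the catalog indexer ECS task with `with_deletes` passed through as an environment variable.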