diff --git a/backend/dataall/base/utils/json_utils.py b/backend/dataall/base/utils/json_utils.py index 1a57af3df..9b7c9697b 100644 --- a/backend/dataall/base/utils/json_utils.py +++ b/backend/dataall/base/utils/json_utils.py @@ -37,7 +37,7 @@ def to_json(record): elif isinstance(record, type({'a': 'dict'})): return json.loads(json.dumps(record, default=json_decoder)) elif type(record) in [str, 'unicode']: - return record + return json.dumps(record) elif type(record) in [int, float]: return json.dumps(record) elif isinstance(record, bool): diff --git a/backend/dataall/modules/s3_datasets_shares/api/mutations.py b/backend/dataall/modules/s3_datasets_shares/api/mutations.py index 0af2eb548..3f95f0ca3 100644 --- a/backend/dataall/modules/s3_datasets_shares/api/mutations.py +++ b/backend/dataall/modules/s3_datasets_shares/api/mutations.py @@ -1,6 +1,7 @@ from dataall.base.api import gql from dataall.modules.s3_datasets_shares.api.resolvers import ( verify_dataset_share_objects, + reapply_share_items_share_object_for_dataset, ) @@ -10,3 +11,10 @@ type=gql.Boolean, resolver=verify_dataset_share_objects, ) + +reApplyShareObjectItemsOnDataset = gql.MutationField( + name='reApplyShareObjectItemsOnDataset', + args=[gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))], + type=gql.Boolean, + resolver=reapply_share_items_share_object_for_dataset, +) diff --git a/backend/dataall/modules/s3_datasets_shares/api/resolvers.py b/backend/dataall/modules/s3_datasets_shares/api/resolvers.py index c8e73c022..f77076a7a 100644 --- a/backend/dataall/modules/s3_datasets_shares/api/resolvers.py +++ b/backend/dataall/modules/s3_datasets_shares/api/resolvers.py @@ -56,6 +56,10 @@ def verify_dataset_share_objects(context: Context, source, input): return S3ShareService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris) +def reapply_share_items_share_object_for_dataset(context: Context, source, datasetUri: str): + return S3ShareService.reapply_share_items_for_dataset(uri=datasetUri) + + def get_s3_consumption_data(context: Context, source, shareUri: str): return S3ShareService.get_s3_consumption_data(uri=shareUri) diff --git a/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py b/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py index de9906493..255544bc7 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py +++ b/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py @@ -7,6 +7,8 @@ from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService from dataall.core.environment.services.environment_service import EnvironmentService +from dataall.core.tasks.db.task_models import Task +from dataall.core.tasks.service_handlers import Worker from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository from dataall.modules.shares_base.services.share_item_service import ShareItemService @@ -150,6 +152,17 @@ def verify_dataset_share_objects(uri: str, share_uris: list): ShareItemService.verify_items_share_object(uri=share_uri, item_uris=item_uris) return True + @staticmethod + @TenantPolicyService.has_tenant_permission(MANAGE_DATASETS) + @ResourcePolicyService.has_resource_permission(UPDATE_DATASET) + def reapply_share_items_for_dataset(uri: str): + context = get_context() + with context.db_engine.scoped_session() as session: + reapply_share_items_task: Task = Task(action='ecs.dataset.share.reapply', targetUri=uri) + session.add(reapply_share_items_task) + Worker.queue(engine=context.db_engine, task_ids=[reapply_share_items_task.taskUri]) + return True + @staticmethod def list_shared_tables_by_env_dataset(dataset_uri: str, env_uri: str): context = get_context() diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py index 85c6afce7..596a5771a 100644 --- a/backend/dataall/modules/shares_base/db/share_object_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py @@ -383,6 +383,15 @@ def paginated_list_shareable_items(session, subqueries: List[Query], data: dict query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10) ).to_dict() + @staticmethod + def list_active_share_object_for_dataset(session, dataset_uri): + share_objects = ( + session.query(ShareObject) + .filter(and_(ShareObject.datasetUri == dataset_uri, ShareObject.deleted.is_(None))) + .all() + ) + return share_objects + @staticmethod def fetch_submitted_shares_with_notifications(session): """ diff --git a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py index 0bf633a66..b0308dd47 100644 --- a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py +++ b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py @@ -1,3 +1,4 @@ +import json import logging import os @@ -5,6 +6,7 @@ from dataall.core.stacks.aws.ecs import Ecs from dataall.core.tasks.db.task_models import Task from dataall.modules.shares_base.services.sharing_service import SharingService +from dataall.modules.shares_base.tasks.share_reapplier_task import EcsBulkShareRepplyService log = logging.getLogger(__name__) @@ -30,21 +32,43 @@ def verify_share(engine, task: Task): def reapply_share(engine, task: Task): return EcsShareHandler._manage_share(engine, task, SharingService.reapply_share, 'reapply_share') + @staticmethod + @Worker.handler(path='ecs.dataset.share.reapply') + def reapply_shares_of_dataset(engine, task: Task): + envname = os.environ.get('envname', 'local') + if envname in ['local', 'dkrcompose']: + EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, task.targetUri) + else: + context = [ + {'name': 'datasetUri', 'value': task.targetUri}, + ] + return EcsShareHandler._run_share_management_ecs_task( + task_definition_param_str='ecs/task_def_arn/share_reapplier', + container_name_param_str='ecs/container/share_reapplier', + context=context, + ) + @staticmethod def _manage_share(engine, task: Task, local_handler, ecs_handler: str): envname = os.environ.get('envname', 'local') if envname in ['local', 'dkrcompose']: return local_handler(engine, task.targetUri) else: - return EcsShareHandler._run_share_management_ecs_task(share_uri=task.targetUri, handler=ecs_handler) + share_management_context = [ + {'name': 'shareUri', 'value': task.targetUri}, + {'name': 'handler', 'value': ecs_handler}, + ] + return EcsShareHandler._run_share_management_ecs_task( + task_definition_param_str='ecs/task_def_arn/share_management', + container_name_param_str='ecs/container/share_management', + context=share_management_context, + ) @staticmethod - def _run_share_management_ecs_task(share_uri, handler): - return Ecs.run_ecs_task( - task_definition_param='ecs/task_def_arn/share_management', - container_name_param='ecs/container/share_management', - context=[ - {'name': 'shareUri', 'value': share_uri}, - {'name': 'handler', 'value': handler}, - ], + def _run_share_management_ecs_task(task_definition_param_str, container_name_param_str, context): + ecs_task_arn = Ecs.run_ecs_task( + task_definition_param=task_definition_param_str, + container_name_param=container_name_param_str, + context=context, ) + return {'task_arn': ecs_task_arn} diff --git a/backend/dataall/modules/shares_base/services/share_item_service.py b/backend/dataall/modules/shares_base/services/share_item_service.py index 5ad7c1472..7237e4f4d 100644 --- a/backend/dataall/modules/shares_base/services/share_item_service.py +++ b/backend/dataall/modules/shares_base/services/share_item_service.py @@ -3,12 +3,10 @@ from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService from dataall.core.tasks.service_handlers import Worker from dataall.base.context import get_context -from dataall.core.environment.services.environment_service import EnvironmentService from dataall.core.tasks.db.task_models import Task from dataall.base.db.exceptions import ObjectNotFound, UnauthorizedOperation from dataall.modules.shares_base.services.shares_enums import ( ShareObjectActions, - ShareableType, ShareItemStatus, ShareItemActions, ShareItemHealthStatus, diff --git a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py index 872095bce..053d53dd2 100644 --- a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py @@ -18,32 +18,68 @@ log = logging.getLogger(__name__) -def reapply_shares(engine): +class EcsBulkShareRepplyService: + @classmethod + def process_reapply_shares_for_dataset(cls, engine, dataset_uri): + with engine.scoped_session() as session: + processed_share_objects = [] + share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset( + session=session, dataset_uri=dataset_uri + ) + log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}') + share_object: ShareObject + for share_object in share_objects_for_dataset: + log.info( + f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_share_objects.append(share_object.shareUri) + ShareStatusRepository.update_share_item_health_status_batch( + session=session, + share_uri=share_object.shareUri, + old_status=ShareItemHealthStatus.Unhealthy.value, + new_status=ShareItemHealthStatus.PendingReApply.value, + ) + SharingService.reapply_share(engine, share_uri=share_object.shareUri) + return processed_share_objects + + @classmethod + def process_reapply_shares(cls, engine): + with engine.scoped_session() as session: + processed_share_objects = [] + all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) + log.info(f'Found {len(all_share_objects)} share objects ') + share_object: ShareObject + for share_object in all_share_objects: + log.info( + f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_share_objects.append(share_object.shareUri) + ShareStatusRepository.update_share_item_health_status_batch( + session=session, + share_uri=share_object.shareUri, + old_status=ShareItemHealthStatus.Unhealthy.value, + new_status=ShareItemHealthStatus.PendingReApply.value, + ) + SharingService.reapply_share(engine, share_uri=share_object.shareUri) + return processed_share_objects + + +def reapply_shares(engine, dataset_uri): """ A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares + If dataset_uri is provided this ECS will reapply on all unhealthy shares belonging to a dataset + else it will reapply on all data.all active unhealthy shares. """ - with engine.scoped_session() as session: - processed_share_objects = [] - all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) - log.info(f'Found {len(all_share_objects)} share objects ') - share_object: ShareObject - for share_object in all_share_objects: - log.info( - f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' - ) - processed_share_objects.append(share_object.shareUri) - ShareStatusRepository.update_share_item_health_status_batch( - session=session, - share_uri=share_object.shareUri, - old_status=ShareItemHealthStatus.Unhealthy.value, - new_status=ShareItemHealthStatus.PendingReApply.value, - ) - SharingService.reapply_share(engine, share_uri=share_object.shareUri) - return processed_share_objects + if dataset_uri: + return EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uri) + else: + return EcsBulkShareRepplyService.process_reapply_shares(engine) if __name__ == '__main__': load_modules(modes={ImportMode.SHARES_TASK}) ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - reapply_shares(engine=ENGINE) + dataset_uri = os.environ.get('datasetUri', '') + processed_shares = reapply_shares(engine=ENGINE, dataset_uri=dataset_uri) + log.info(f'Finished processing {len(processed_shares)} shares') diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index 20ce3a1c5..6eab0bf88 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -298,6 +298,21 @@ def add_share_reapplier_task(self): ), readonly_root_filesystem=True, ) + + ssm.StringParameter( + self, + f'ShareReapplierTaskARNSSM{self._envname}', + parameter_name=f'/dataall/{self._envname}/ecs/task_def_arn/share_reapplier', + string_value=share_reapplier_task_definition.task_definition_arn, + ) + + ssm.StringParameter( + self, + f'ShareReapplierTaskContainerSSM{self._envname}', + parameter_name=f'/dataall/{self._envname}/ecs/container/share_reapplier', + string_value=share_reapplier_container.container_name, + ) + self.ecs_task_definitions_families.append(share_reapplier_task_definition.family) @run_if(['modules.dataset_base.features.share_notifications.email.persistent_reminders']) diff --git a/frontend/src/modules/Shares/components/ShareBoxList.js b/frontend/src/modules/Shares/components/ShareBoxList.js index adc48099c..a8611e43e 100644 --- a/frontend/src/modules/Shares/components/ShareBoxList.js +++ b/frontend/src/modules/Shares/components/ShareBoxList.js @@ -11,7 +11,7 @@ import CircularProgress from '@mui/material/CircularProgress'; import CheckBoxOutlineBlankIcon from '@mui/icons-material/CheckBoxOutlineBlank'; import CheckBoxIcon from '@mui/icons-material/CheckBox'; import PropTypes from 'prop-types'; -import { useCallback, useEffect, useState } from 'react'; +import React, { useCallback, useEffect, useState } from 'react'; import { Helmet } from 'react-helmet-async'; import { Defaults, Pager, ShareStatus, useSettings } from 'design'; import { SET_ERROR, useDispatch } from 'globalErrors'; @@ -29,6 +29,9 @@ import { ShareBoxListItem } from './ShareBoxListItem'; import { ShareObjectSelectorModal } from './ShareObjectSelectorModal'; import { NavigateShareViewModal } from './NavigateShareViewModal'; import { ShareStatusList } from '../constants'; +import { RefreshRounded } from '@mui/icons-material'; +import { reApplyShareObjectItemsOnDataset } from '../services/reApplyShareObjectItemsOnDataset'; +import { useSnackbar } from 'notistack'; const icon = ; const checkedIcon = ; @@ -53,7 +56,10 @@ export const ShareBoxList = (props) => { useState(false); const [isNavigateShareViewModalOpen, setIsNavigateShareViewModalOpen] = useState(false); + const [reApplyButtonLoadingState, setreApplyButtonLoadingState] = + useState(false); const statusOptions = ShareStatusList; + const { enqueueSnackbar } = useSnackbar(); const handleVerifyObjectItemsModalOpen = () => { setIsVerifyObjectItemsModalOpen(true); @@ -256,6 +262,33 @@ export const ShareBoxList = (props) => { .finally(() => setLoading(false)); }, [client, dispatch]); + const reapplyShares = async (datasetUri) => { + try { + setreApplyButtonLoadingState(true); + const response = await client.mutate( + reApplyShareObjectItemsOnDataset({ datasetUri: datasetUri }) + ); + if (response && !response.errors) { + setreApplyButtonLoadingState(false); + enqueueSnackbar( + `Reapplying process for all unhealthy shares on dataset with uri: ${datasetUri} has started. Please check each individual share for share item health status`, + { + anchorOrigin: { + horizontal: 'right', + vertical: 'top' + }, + variant: 'success' + } + ); + } else { + dispatch({ type: SET_ERROR, error: response.errors[0].message }); + } + } catch (error) { + setreApplyButtonLoadingState(false); + dispatch({ type: SET_ERROR, error: error?.message }); + } + }; + useEffect(() => { setLoading(true); setFilter({ page: 1, pageSize: 10, term: '' }); @@ -337,6 +370,23 @@ export const ShareBoxList = (props) => { )} + {dataset && ( + } + sx={{ m: 1 }} + onClick={(event) => { + reapplyShares(dataset.datasetUri); + }} + type="button" + variant="outlined" + > + Re-apply Share Item(s) for Dataset + + )} + ({ + variables: { + datasetUri + }, + mutation: gql` + mutation reApplyShareObjectItemsOnDataset($datasetUri: String!) { + reApplyShareObjectItemsOnDataset(datasetUri: $datasetUri) + } + ` +});