From 8750f6323c67390cd6a19f23f9b2df6f8e057054 Mon Sep 17 00:00:00 2001 From: Tejas Rajopadhye Date: Thu, 26 Dec 2024 15:21:03 -0600 Subject: [PATCH] Notification Improvements Stage 1, 70% --- .../catalog/tasks/catalog_indexer_task.py | 6 + .../services/admin_notifications.py | 22 ++++ .../ses_email_notification_service.py | 29 +++++ .../omics/tasks/omics_workflows_fetcher.py | 92 +++++++------- .../redshift_table_share_processor.py | 7 +- .../s3_datasets/tasks/tables_syncer.py | 106 +++++++++------- .../glue_table_share_processor.py | 6 +- .../s3_access_point_share_processor.py | 6 +- .../s3_bucket_share_processor.py | 6 +- .../services/share_notification_service.py | 118 +++++++++++++----- .../services/share_processor_manager.py | 2 +- .../shares_base/services/sharing_service.py | 52 +++++++- .../tasks/persistent_email_reminders_task.py | 37 +++--- .../tasks/share_expiration_task.py | 106 +++++++++------- .../shares_base/tasks/share_manager_task.py | 6 + .../shares_base/tasks/share_reapplier_task.py | 96 ++++++++------ .../shares_base/tasks/share_verifier_task.py | 44 +++++-- deploy/stacks/container.py | 16 +-- .../test_redshift_table_processor.py | 20 +-- 19 files changed, 518 insertions(+), 259 deletions(-) create mode 100644 backend/dataall/modules/notifications/services/admin_notifications.py diff --git a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py index 807b712a3..4183c4bc5 100644 --- a/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py +++ b/backend/dataall/modules/catalog/tasks/catalog_indexer_task.py @@ -8,6 +8,7 @@ from dataall.base.db import get_engine from dataall.base.loader import load_modules, ImportMode from dataall.base.utils.alarm_service import AlarmService +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService log = logging.getLogger(__name__) @@ -32,6 +33,11 @@ def index_objects(cls, engine, with_deletes='False'): return len(indexed_object_uris) except Exception as e: AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e)) + AdminNotificationService().notify_admins_with_error_log( + process_error='Exception occurred during cataloging task', + error_logs=[str(e)], + process_name='Catalog Task' + ) raise e @classmethod diff --git a/backend/dataall/modules/notifications/services/admin_notifications.py b/backend/dataall/modules/notifications/services/admin_notifications.py new file mode 100644 index 000000000..02b15e179 --- /dev/null +++ b/backend/dataall/modules/notifications/services/admin_notifications.py @@ -0,0 +1,22 @@ +from typing import List + +from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService + + +class AdminNotificationService: + admin_group = 'DAAdministrators' + + def notify_admins_with_error_log(self, process_error: str, error_logs: List[str], process_name:str = ''): + + subject = f'Data.all alert | Attention Required | Failure in : {process_name}' + email_message = f""" + Following error occurred when , {process_error}

+ """ + for error_log in error_logs: + email_message += error_log + "

" + + SESEmailNotificationService.create_and_send_email_notifications( + subject=subject, + msg=email_message, + recipient_groups_list=[AdminNotificationService.admin_group] + ) \ No newline at end of file diff --git a/backend/dataall/modules/notifications/services/ses_email_notification_service.py b/backend/dataall/modules/notifications/services/ses_email_notification_service.py index 29edfce64..000c0d2da 100644 --- a/backend/dataall/modules/notifications/services/ses_email_notification_service.py +++ b/backend/dataall/modules/notifications/services/ses_email_notification_service.py @@ -3,6 +3,7 @@ from dataall.base.aws.cognito import Cognito from dataall.base.aws.ses import Ses +from dataall.base.config import config from dataall.base.services.service_provider_factory import ServiceProviderFactory from dataall.modules.notifications.services.base_email_notification_service import BaseEmailNotificationService @@ -60,3 +61,31 @@ def send_email_to_users(email_list, email_provider, message, subject): # https://aws.amazon.com/blogs/messaging-and-targeting/how-to-send-messages-to-multiple-recipients-with-amazon-simple-email-service-ses/ for emailId in email_list: email_provider.send_email([emailId], message, subject) + + @staticmethod + def create_and_send_email_notifications(subject, msg, recipient_groups_list=None, recipient_email_ids=None): + """ + Method to directly send email notification instead of creating an SQS Task + This approach is used while sending email notifications in an ECS task ( e.g. persistent email reminder task, share expiration task, etc ) + Emails send to groups mentioned in recipient_groups_list and / or emails mentioned in recipient_email_ids + """ + if recipient_groups_list is None: + recipient_groups_list = [] + if recipient_email_ids is None: + recipient_email_ids = [] + + share_notification_config = config.get_property( + 'modules.datasets_base.features.share_notifications', default=None + ) + if share_notification_config: + for share_notification_config_type in share_notification_config.keys(): + n_config = share_notification_config[share_notification_config_type] + if n_config.get('active', False) == True: + if share_notification_config_type == 'email': + SESEmailNotificationService.send_email_task( + subject, msg, recipient_groups_list, recipient_email_ids + ) + else: + log.info(f'Notification type : {share_notification_config_type} is not active') + else: + log.info('Notifications are not active') \ No newline at end of file diff --git a/backend/dataall/modules/omics/tasks/omics_workflows_fetcher.py b/backend/dataall/modules/omics/tasks/omics_workflows_fetcher.py index d20d69cfc..40c52acd4 100644 --- a/backend/dataall/modules/omics/tasks/omics_workflows_fetcher.py +++ b/backend/dataall/modules/omics/tasks/omics_workflows_fetcher.py @@ -5,6 +5,7 @@ from dataall.core.environment.db.environment_models import Environment from dataall.base.db import get_engine +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.omics.aws.omics_client import OmicsClient from dataall.modules.omics.db.omics_models import OmicsWorkflow from dataall.modules.omics.api.enums import OmicsWorkflowType @@ -16,49 +17,56 @@ def fetch_omics_workflows(engine): """List Omics workflows.""" - log.info('Starting omics workflows fetcher') - with engine.scoped_session() as session: - environments = OmicsRepository(session).list_environments_with_omics_enabled() - # designed for ready2run and private workflows; when private workflow support is - # introduced, we will need go over all environments - if len(environments) == 0: - log.info('No environments found. Nothing to do.') - return True - env = environments[0] - ready_workflows = OmicsClient(awsAccountId=env.AwsAccountId, region=env.region).list_workflows( - type=OmicsWorkflowType.READY2RUN.value - ) - # Removing private workflows until fully supported after initial launch - # private_workflows = OmicsClient.list_workflows(awsAccountId=env.AwsAccountId, region=env.region, type=OmicsWorkflowType.PRIVATE.value) - workflows = ready_workflows # + private_workflows - log.info(f'Found workflows {str(workflows)} in environment {env.environmentUri}') - for workflow in workflows: - log.info(f"Processing workflow name={workflow['name']}, id={workflow['id']}...") - existing_workflow = OmicsRepository(session).get_workflow_by_id(workflow['id']) - if existing_workflow is not None: - log.info( - f"Workflow name={workflow['name']}, id={workflow['id']} has already been registered in database. Updating information..." - ) - existing_workflow.name = workflow['name'] - existing_workflow.label = workflow['name'] - session.commit() - - else: - log.info( - f"Workflow name={workflow['name']} , id={workflow['id']} in environment {env.environmentUri} is new. Registering..." - ) - omicsWorkflow = OmicsWorkflow( - id=workflow['id'], - name=workflow['name'], - arn=workflow['arn'], - type=workflow['type'], - environmentUri=env.environmentUri, - label=workflow['name'], - owner=env.environmentUri, - ) - OmicsRepository(session).save_omics_workflow(omicsWorkflow) - return True + try: + log.info('Starting omics workflows fetcher') + with engine.scoped_session() as session: + environments = OmicsRepository(session).list_environments_with_omics_enabled() + # designed for ready2run and private workflows; when private workflow support is + # introduced, we will need go over all environments + if len(environments) == 0: + log.info('No environments found. Nothing to do.') + return True + env = environments[0] + ready_workflows = OmicsClient(awsAccountId=env.AwsAccountId, region=env.region).list_workflows( + type=OmicsWorkflowType.READY2RUN.value + ) + # Removing private workflows until fully supported after initial launch + # private_workflows = OmicsClient.list_workflows(awsAccountId=env.AwsAccountId, region=env.region, type=OmicsWorkflowType.PRIVATE.value) + workflows = ready_workflows # + private_workflows + log.info(f'Found workflows {str(workflows)} in environment {env.environmentUri}') + for workflow in workflows: + log.info(f"Processing workflow name={workflow['name']}, id={workflow['id']}...") + existing_workflow = OmicsRepository(session).get_workflow_by_id(workflow['id']) + if existing_workflow is not None: + log.info( + f"Workflow name={workflow['name']}, id={workflow['id']} has already been registered in database. Updating information..." + ) + existing_workflow.name = workflow['name'] + existing_workflow.label = workflow['name'] + session.commit() + else: + log.info( + f"Workflow name={workflow['name']} , id={workflow['id']} in environment {env.environmentUri} is new. Registering..." + ) + omicsWorkflow = OmicsWorkflow( + id=workflow['id'], + name=workflow['name'], + arn=workflow['arn'], + type=workflow['type'], + environmentUri=env.environmentUri, + label=workflow['name'], + owner=env.environmentUri, + ) + OmicsRepository(session).save_omics_workflow(omicsWorkflow) + return True + except Exception as e: + log.error(f'Error occured while processing omics workflow task due to: {e}') + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occured while processing omics workflow task', + error_logs=[str(e)], + process_name='Omics Workflow' + ) if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') diff --git a/backend/dataall/modules/redshift_datasets_shares/services/redshift_table_share_processor.py b/backend/dataall/modules/redshift_datasets_shares/services/redshift_table_share_processor.py index ec9a05932..6c49c3dee 100644 --- a/backend/dataall/modules/redshift_datasets_shares/services/redshift_table_share_processor.py +++ b/backend/dataall/modules/redshift_datasets_shares/services/redshift_table_share_processor.py @@ -441,7 +441,7 @@ def process_revoked_shares(self) -> bool: return False return success - def verify_shares(self) -> bool: + def verify_shares_health_status(self) -> bool: """ 1) (in source namespace) Check the datashare exists 2) (in source namespace) Check that schema is added to datashare @@ -459,7 +459,7 @@ def verify_shares(self) -> bool: 9) (in target namespace) Check that the redshift role has select access to the requested table in the local db. 10) (in target namespace) Check that the redshift role has select access to the requested table in the external schema. """ - + share_object_item_health_status = True log.info('##### Verifying Redshift tables #######') if not self.tables: log.info('No tables to verify. Skipping...') @@ -599,11 +599,12 @@ def verify_shares(self) -> bool: ' | '.join(ds_level_errors) + ' | ' + ' | '.join(tbl_level_errors), datetime.now(), ) + share_object_item_health_status = False else: ShareStatusRepository.update_share_item_health_status( self.session, share_item, ShareItemHealthStatus.Healthy.value, None, datetime.now() ) - return True + return share_object_item_health_status def cleanup_shares(self) -> bool: """ diff --git a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py index 9bd47dd3c..fea3edcde 100644 --- a/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py +++ b/backend/dataall/modules/s3_datasets/tasks/tables_syncer.py @@ -7,6 +7,7 @@ from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup from dataall.core.environment.services.environment_service import EnvironmentService from dataall.base.db import get_engine +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.s3_datasets.aws.glue_dataset_client import DatasetCrawler from dataall.modules.s3_datasets.aws.lf_table_client import LakeFormationTableClient from dataall.modules.s3_datasets.services.dataset_table_service import DatasetTableService @@ -20,60 +21,75 @@ def sync_tables(engine): - with engine.scoped_session() as session: - processed_tables = [] - all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session) - log.info(f'Found {len(all_datasets)} datasets for tables sync') - dataset: S3Dataset - for dataset in all_datasets: - log.info(f'Synchronizing dataset {dataset.name}|{dataset.datasetUri} tables') - env: Environment = ( - session.query(Environment) - .filter( - and_( - Environment.environmentUri == dataset.environmentUri, - Environment.deleted.is_(None), + task_exceptions = [] + try: + with engine.scoped_session() as session: + processed_tables = [] + all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session) + log.info(f'Found {len(all_datasets)} datasets for tables sync') + dataset: S3Dataset + for dataset in all_datasets: + log.info(f'Synchronizing dataset {dataset.name}|{dataset.datasetUri} tables') + env: Environment = ( + session.query(Environment) + .filter( + and_( + Environment.environmentUri == dataset.environmentUri, + Environment.deleted.is_(None), + ) ) + .first() + ) + env_group: EnvironmentGroup = EnvironmentService.get_environment_group( + session, dataset.SamlAdminGroupName, env.environmentUri ) - .first() - ) - env_group: EnvironmentGroup = EnvironmentService.get_environment_group( - session, dataset.SamlAdminGroupName, env.environmentUri - ) - try: - if not env or not is_assumable_pivot_role(env): - log.info(f'Dataset {dataset.GlueDatabaseName} has an invalid environment') - else: - tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName) + try: + if not env or not is_assumable_pivot_role(env): + log.info(f'Dataset {dataset.GlueDatabaseName} has an invalid environment') + else: + tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName) - log.info(f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}') + log.info(f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}') - DatasetTableService.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables) + DatasetTableService.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables) - tables = session.query(DatasetTable).filter(DatasetTable.datasetUri == dataset.datasetUri).all() + tables = session.query(DatasetTable).filter(DatasetTable.datasetUri == dataset.datasetUri).all() - log.info('Updating tables permissions on Lake Formation...') + log.info('Updating tables permissions on Lake Formation...') - for table in tables: - LakeFormationTableClient(table).grant_principals_all_table_permissions( - principals=[ - SessionHelper.get_delegation_role_arn(env.AwsAccountId, env.region), - env_group.environmentIAMRoleArn, - ], - ) + for table in tables: + LakeFormationTableClient(table).grant_principals_all_table_permissions( + principals=[ + SessionHelper.get_delegation_role_arn(env.AwsAccountId, env.region), + env_group.environmentIAMRoleArn, + ], + ) - processed_tables.extend(tables) + processed_tables.extend(tables) - DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) - DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri) - except Exception as e: - log.error( - f'Failed to sync tables for dataset ' - f'{dataset.AwsAccountId}/{dataset.GlueDatabaseName} ' - f'due to: {e}' - ) - DatasetAlarmService().trigger_dataset_sync_failure_alarm(dataset, str(e)) - return processed_tables + DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri) + DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri) + except Exception as e: + log.error( + f'Failed to sync tables for dataset ' + f'{dataset.AwsAccountId}/{dataset.GlueDatabaseName} ' + f'due to: {e}' + ) + DatasetAlarmService().trigger_dataset_sync_failure_alarm(dataset, str(e)) + task_exceptions.append(str(e)) + return processed_tables + except Exception as e: + log.error( + f'Error while running table syncer task due to: {e}' + ) + task_exceptions.append(str(e)) + finally: + if len(task_exceptions) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_name='Table Syncer', + error_logs=task_exceptions, + process_error='Error while running table syncer task' + ) def is_assumable_pivot_role(env: Environment): diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py index 97282b574..359666d82 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py +++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py @@ -353,8 +353,9 @@ def process_revoked_shares(self) -> bool: success = False return success - def verify_shares(self) -> bool: + def verify_shares_health_status(self) -> bool: log.info('##### Verifying tables #######') + share_object_item_health_status = True if not self.tables: log.info('No tables to verify. Skipping...') else: @@ -430,11 +431,12 @@ def verify_shares(self) -> bool: ' | '.join(manager.db_level_errors) + ' | ' + ' | '.join(manager.tbl_level_errors), datetime.now(), ) + share_object_item_health_status = False else: ShareStatusRepository.update_share_item_health_status( self.session, share_item, ShareItemHealthStatus.Healthy.value, None, datetime.now() ) - return True + return share_object_item_health_status def cleanup_shares(self) -> bool: """ diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py index d522bb79d..41b214490 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py +++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py @@ -172,7 +172,8 @@ def process_revoked_shares(self) -> bool: return success - def verify_shares(self) -> bool: + def verify_shares_health_status(self) -> bool: + share_object_item_health_status = True log.info('##### Verifying folders shares #######') if not self.folders: log.info('No Folders to verify. Skipping...') @@ -208,11 +209,12 @@ def verify_shares(self) -> bool: ' | '.join(manager.folder_errors), datetime.now(), ) + share_object_item_health_status = False else: ShareStatusRepository.update_share_item_health_status( self.session, sharing_item, ShareItemHealthStatus.Healthy.value, None, datetime.now() ) - return True + return share_object_item_health_status def cleanup_shares(self) -> bool: """ diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py index 447988156..ef968ed5d 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py +++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py @@ -158,7 +158,8 @@ def process_revoked_shares(self) -> bool: return success - def verify_shares(self) -> bool: + def verify_shares_health_status(self) -> bool: + share_object_item_health_status = True log.info('##### Verifying S3 bucket share #######') if not self.buckets: log.info('No Buckets to verify. Skipping...') @@ -192,11 +193,12 @@ def verify_shares(self) -> bool: ' | '.join(manager.bucket_errors), datetime.now(), ) + share_object_item_health_status = False else: ShareStatusRepository.update_share_item_health_status( self.session, sharing_item, ShareItemHealthStatus.Healthy.value, None, datetime.now() ) - return True + return share_object_item_health_status def cleanup_shares(self) -> bool: """ diff --git a/backend/dataall/modules/shares_base/services/share_notification_service.py b/backend/dataall/modules/shares_base/services/share_notification_service.py index f8e664883..f03317bb9 100644 --- a/backend/dataall/modules/shares_base/services/share_notification_service.py +++ b/backend/dataall/modules/shares_base/services/share_notification_service.py @@ -1,6 +1,7 @@ import logging import enum import os +from typing import List from dataall.base.config import config from dataall.core.tasks.db.task_models import Task @@ -24,6 +25,9 @@ class DataSharingNotificationType(enum.Enum): SHARE_OBJECT_EXTENSION_REJECTED = 'SHARE_OBJECT_EXTENSION_REJECTED' SHARE_OBJECT_REJECTED = 'SHARE_OBJECT_REJECTED' SHARE_OBJECT_PENDING_APPROVAL = 'SHARE_OBJECT_PENDING_APPROVAL' + SHARE_OBJECT_FAILED = 'SHARE_OBJECT_FAILED' + SHARE_OBJECT_UNHEALTHY = 'SHARE_OBJECT_UNHEALTHY' + SHARE_OBJECT_HEALTHY = 'SHARE_OBJECT_HEALTHY' DATASET_VERSION = 'DATASET_VERSION' @@ -102,7 +106,7 @@ def notify_persistent_email_reminder(self, email_id: str): msg=msg_intro.replace('
', '').replace('', '').replace('', ''), ) - self._create_and_send_email_notifications( + SESEmailNotificationService.create_and_send_email_notifications( subject=subject, msg=email_notification_msg, recipient_groups_list=[self.dataset.SamlAdminGroupName, self.dataset.stewards], @@ -219,7 +223,7 @@ def notify_share_expiration_to_owners(self): msg=msg_intro.replace('
', '').replace('', '').replace('', ''), ) - self._create_and_send_email_notifications( + SESEmailNotificationService.create_and_send_email_notifications( subject=subject, msg=email_notification_msg, recipient_groups_list=[self.dataset.SamlAdminGroupName, self.dataset.stewards], @@ -253,11 +257,84 @@ def notify_share_expiration_to_requesters(self): msg=msg_intro.replace('
', '').replace('', '').replace('', ''), ) - self._create_and_send_email_notifications( + SESEmailNotificationService.create_and_send_email_notifications( subject=subject, msg=email_notification_msg, recipient_groups_list=[self.share.groupUri] ) return notifications + def notify_share_object_failed(self): + share_link_text = '' + if os.environ.get('frontend_domain_url'): + share_link_text = ( + f'

Please visit data.all share link ' + f'to take action or view more details' + ) + msg = ( + f'Share request made for dataset: {self.dataset.label} with requestor principal: {self.share.principalRoleName} failed.

' + f'You can delete and resubmit the failed items in the share. If your share item still remains in the Share_Failed state then please get in touch with data.all admins.' + ) + subject = f'Data.all | Attention Required | Share failed for {self.dataset.label}' + email_notification_msg = msg + share_link_text + + notifications = self.register_notifications( + notification_type=DataSharingNotificationType.SHARE_OBJECT_FAILED.value, msg=msg + ) + + SESEmailNotificationService.create_and_send_email_notifications(subject=subject, msg=email_notification_msg, recipient_groups_list=[self.share.groupUri, self.dataset.SamlAdminGroupName, self.dataset.stewards]) + + return notifications + + # Send notification when any of the share item gets into unhealthy state + def notify_share_object_items_unhealthy(self): + share_link_text = '' + if os.environ.get('frontend_domain_url'): + share_link_text = ( + f'

Please visit data.all share link ' + f'to take action or view more details' + ) + msg = ( + f'Hello Team,
' + f'Your share with share uri: {self.share.shareUri} has one or more unhealthy share items.

' + f'Once you visit your share link you can click on the Reapply button and this should correct your share and get it into an healthy state. If this doesn\'t get your share in healthy state then please get in touch with data.all admins for your share.' + f'
If you are using any terraform / cloudformation or any other IaC to also manage your bucket policy, kms policy and requestor IAM role, please make them aware of the data.all changes so that they don\'t wipe off data.all related policies' + ) + subject = f'Data.all | Attention Required | Share for {self.dataset.label} dataset in unhealthy state' + email_notification_msg = msg + share_link_text + + notifications = self.register_notifications( + notification_type=DataSharingNotificationType.SHARE_OBJECT_UNHEALTHY.value, msg=msg, to_recipients=[self.share.groupUri] + ) + + SESEmailNotificationService.create_and_send_email_notifications(subject=subject, msg=email_notification_msg, recipient_groups_list=[self.share.groupUri]) + + return notifications + + # Send notifications when a share gets into healthy state. + # These notifications are sent when a share which was initially unhealthy goes into healthy state + def notify_share_object_items_healthy(self): + share_link_text = '' + if os.environ.get('frontend_domain_url'): + share_link_text = ( + f'

Please visit data.all share link ' + f'to take action or view more details' + ) + msg = ( + f'Hello Team,
' + f'Your share with share uri: {self.share.shareUri} is in healthy state
' + ) + subject = f'Data.all | Share for {self.dataset.label} dataset now in healthy state' + email_notification_msg = msg + share_link_text + + notifications = self.register_notifications( + notification_type=DataSharingNotificationType.SHARE_OBJECT_HEALTHY.value, msg=msg, to_recipients=[self.share.groupUri] + ) + + SESEmailNotificationService.create_and_send_email_notifications(subject=subject, msg=email_notification_msg, recipient_groups_list=[self.share.groupUri]) + return notifications + def _get_share_object_targeted_users(self): targeted_users = list() targeted_users.append(self.dataset.SamlAdminGroupName) @@ -266,15 +343,20 @@ def _get_share_object_targeted_users(self): targeted_users.append(self.share.groupUri) return targeted_users - def register_notifications(self, notification_type, msg): + def register_notifications(self, notification_type, msg, to_recipients: List[any] = None): """ Notifications sent to: + if to_recipients is None - dataset.SamlAdminGroupName - dataset.stewards - share.groupUri + else + - to_recipients """ + if not to_recipients: + to_recipients = self.notification_target_users notifications = [] - for recipient in self.notification_target_users: + for recipient in to_recipients: log.info(f'Creating notification for {recipient}, msg {msg}') notifications.append( NotificationRepository.create_notification( @@ -333,29 +415,3 @@ def _create_notification_task(self, subject, msg): else: log.info('Notifications are not active') - def _create_and_send_email_notifications(self, subject, msg, recipient_groups_list=None, recipient_email_ids=None): - """ - Method to directly send email notification instead of creating an SQS Task - This approach is used while sending email notifications in an ECS task ( e.g. persistent email reminder task, share expiration task, etc ) - Emails send to groups mentioned in recipient_groups_list and / or emails mentioned in recipient_email_ids - """ - if recipient_groups_list is None: - recipient_groups_list = [] - if recipient_email_ids is None: - recipient_email_ids = [] - - share_notification_config = config.get_property( - 'modules.datasets_base.features.share_notifications', default=None - ) - if share_notification_config: - for share_notification_config_type in share_notification_config.keys(): - n_config = share_notification_config[share_notification_config_type] - if n_config.get('active', False) == True: - if share_notification_config_type == 'email': - SESEmailNotificationService.send_email_task( - subject, msg, recipient_groups_list, recipient_email_ids - ) - else: - log.info(f'Notification type : {share_notification_config_type} is not active') - else: - log.info('Notifications are not active') diff --git a/backend/dataall/modules/shares_base/services/share_processor_manager.py b/backend/dataall/modules/shares_base/services/share_processor_manager.py index e16637e56..4c989c9aa 100644 --- a/backend/dataall/modules/shares_base/services/share_processor_manager.py +++ b/backend/dataall/modules/shares_base/services/share_processor_manager.py @@ -19,7 +19,7 @@ def process_revoked_shares(self) -> bool: ... @abstractmethod - def verify_shares(self) -> bool: + def verify_shares_health_status(self) -> bool: """Executes a series of actions to verify share items using the share manager. Returns True if the verifying was successful""" ... diff --git a/backend/dataall/modules/shares_base/services/sharing_service.py b/backend/dataall/modules/shares_base/services/sharing_service.py index dd8749bb5..dc1711a21 100644 --- a/backend/dataall/modules/shares_base/services/sharing_service.py +++ b/backend/dataall/modules/shares_base/services/sharing_service.py @@ -1,14 +1,17 @@ import logging from dataclasses import dataclass -from typing import Any +from typing import Any, List + from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository from dataall.base.db import Engine from dataall.core.environment.db.environment_models import ConsumptionRole, Environment, EnvironmentGroup +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.db.share_object_state_machines import ( ShareObjectSM, ShareItemSM, ) +from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService from dataall.modules.shares_base.services.shares_enums import ( ShareItemHealthStatus, ShareObjectActions, @@ -127,11 +130,16 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool: log.exception('Error occurred during share approval') new_share_item_state = share_item_sm.run_transition(ShareItemActions.Failure.value) share_item_sm.update_state(session, share_data.share.shareUri, new_share_item_state) - return False + share_successful = False finally: new_share_state = share_object_sm.run_transition(ShareObjectActions.Finish.value) share_object_sm.update_state(session, share_data.share, new_share_state) + if not share_successful: + # Create UI and email notifications + ShareNotificationService(session=session, dataset=share_data.dataset, + share=share_data.share).notify_share_object_failed() + return share_successful @classmethod def revoke_share(cls, engine: Engine, share_uri: str) -> bool: @@ -224,7 +232,7 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool: log.error(f'Error occurred during share revoking: {e}') new_share_item_state = share_item_sm.run_transition(ShareItemActions.Failure.value) share_item_sm.update_state(session, share_data.share.shareUri, new_share_item_state) - return False + revoke_successful = False finally: existing_pending_items = ShareStatusRepository.check_pending_share_items(session, share_uri) @@ -233,6 +241,11 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool: else: new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) share_sm.update_state(session, share_data.share, new_share_state) + if not revoke_successful: + # Create UI and email notifications + ShareNotificationService(session=session, dataset=share_data.dataset, + share=share_data.share).notify_share_object_failed() + return revoke_successful @classmethod def verify_share( @@ -255,6 +268,7 @@ def verify_share( ------- """ with engine.scoped_session() as session: + share_object_item_health_status: List = [] share_data, share_items = cls._get_share_data_and_items(session, share_uri, status, healthStatus) for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): try: @@ -268,12 +282,19 @@ def verify_share( healthStatus=healthStatus, ) if shareable_items: - processor.Processor(session, share_data, shareable_items).verify_shares() + health_status = processor.Processor(session, share_data, shareable_items).verify_shares_health_status() + share_object_item_health_status.append(health_status) else: log.info(f'There are no items to verify of type {type.value}') except Exception as e: log.error(f'Error occurred during share verifying of {type.value}: {e}') + AdminNotificationService().notify_admins_with_error_log( + process_error=f'Error occurred during verification of share with uri: {share_data.share.shareUri} for processor type: {type.value} due to an unknown exception', + error_logs=[str(e)], process_name='Sharing Service') + if False in share_object_item_health_status: + log.info(f'Sending notifications since share object item(s) for share: {share_data.share.shareUri} are in unhealthy state after verifying shares') + ShareNotificationService(session=session, dataset=share_data.dataset, share=share_data.share).notify_share_object_items_unhealthy() return True @classmethod @@ -295,6 +316,7 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool: False if any re-apply of share item(s) failed """ reapply_successful = True + code_exception_list = [] with engine.scoped_session() as session: share_data, share_items = cls._get_share_data_and_items( session, share_uri, None, ShareItemHealthStatus.PendingReApply.value @@ -334,11 +356,23 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool: log.info(f'Reapplying {type.value} succeeded = {success}') if not success: reapply_successful = False + if success: + log.info(f'Sending notifications to the share owner to inform that the share with uri: {share_data.share.shareUri} is now in healthy state') + ShareNotificationService(session=session, dataset=share_data.dataset, + share=share_data.share).notify_share_object_items_healthy() else: log.info(f'There are no items to reapply of type {type.value}') except Exception as e: log.error(f'Error occurred during share reapplying of {type.value}: {e}') - + AdminNotificationService().notify_admins_with_error_log( + process_error=f'Error occurred during reapplying of share with uri: {share_data.share.shareUri} for processor type: {type.value} due to an unknown exception', + error_logs=[str(e)], process_name='Sharing Service') + + if not reapply_successful: + log.info( + f'Sending notifications since share object item(s) for share: {share_data.share.shareUri} are in unhealthy state after reapplying shares') + ShareNotificationService(session=session, dataset=share_data.dataset, + share=share_data.share).notify_share_object_items_unhealthy() return reapply_successful except ResourceLockTimeout as e: @@ -349,11 +383,19 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool: new_status=ShareItemHealthStatus.Unhealthy.value, message=str(e), ) + code_exception_list.append(str(e)) except Exception as e: log.exception('Error occurred during share approval') + code_exception_list.append(str(e)) return False + finally: + if len(code_exception_list) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_error=f'Error occurred during reapplying of share with uri: {share_data.share.shareUri}', + error_logs=[str(e)], process_name='Sharing Service') + @classmethod def cleanup_share( cls, diff --git a/backend/dataall/modules/shares_base/tasks/persistent_email_reminders_task.py b/backend/dataall/modules/shares_base/tasks/persistent_email_reminders_task.py index 42635f6f9..54d2574b8 100644 --- a/backend/dataall/modules/shares_base/tasks/persistent_email_reminders_task.py +++ b/backend/dataall/modules/shares_base/tasks/persistent_email_reminders_task.py @@ -2,6 +2,7 @@ import os import sys from dataall.base.loader import load_modules, ImportMode +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.db.share_object_models import ShareObject from dataall.base.db import get_engine from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository @@ -17,20 +18,28 @@ def persistent_email_reminders(engine): A method used by the scheduled ECS Task to run persistent_email_reminder() process against ALL active share objects within data.all and send emails to all pending shares. """ - with engine.scoped_session() as session: - log.info('Running Persistent Email Reminders Task') - pending_shares = ShareObjectRepository.fetch_submitted_shares_with_notifications(session=session) - log.info(f'Found {len(pending_shares)} pending shares') - pending_share: ShareObject - for pending_share in pending_shares: - log.info(f'Sending Email Reminder for Share: {pending_share.shareUri}') - share = ShareObjectRepository.get_share_by_uri(session, pending_share.shareUri) - dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) - ShareNotificationService(session=session, dataset=dataset, share=share).notify_persistent_email_reminder( - email_id=share.owner - ) - log.info(f'Email reminder sent for share {share.shareUri}') - log.info('Completed Persistent Email Reminders Task') + try: + with engine.scoped_session() as session: + log.info('Running Persistent Email Reminders Task') + pending_shares = ShareObjectRepository.fetch_submitted_shares_with_notifications(session=session) + log.info(f'Found {len(pending_shares)} pending shares') + pending_share: ShareObject + for pending_share in pending_shares: + log.info(f'Sending Email Reminder for Share: {pending_share.shareUri}') + share = ShareObjectRepository.get_share_by_uri(session, pending_share.shareUri) + dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) + ShareNotificationService(session=session, dataset=dataset, share=share).notify_persistent_email_reminder( + email_id=share.owner + ) + log.info(f'Email reminder sent for share {share.shareUri}') + log.info('Completed Persistent Email Reminders Task') + except Exception as e: + log.error(f'Error while running persistent email reminder task: {e}') + AdminNotificationService().notify_admins_with_error_log( + process_name='Persistent Email Service', + error_logs=[str(e)], + process_error='Error while running persistent email reminder task' + ) if __name__ == '__main__': diff --git a/backend/dataall/modules/shares_base/tasks/share_expiration_task.py b/backend/dataall/modules/shares_base/tasks/share_expiration_task.py index 51328e55a..b2ddde75d 100644 --- a/backend/dataall/modules/shares_base/tasks/share_expiration_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_expiration_task.py @@ -4,6 +4,7 @@ from datetime import datetime from dataall.base.loader import load_modules, ImportMode from dataall.base.db import get_engine +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_state_machines import ShareObjectSM, ShareItemSM from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository @@ -19,56 +20,69 @@ def share_expiration_checker(engine): """ Checks all the share objects which have expiryDate on them and then revokes or notifies users based on if its expired or not """ - with engine.scoped_session() as session: - log.info('Starting share expiration task') - shares = ShareObjectRepository.get_all_active_shares_with_expiration(session) - log.info(f'Fetched {len(shares)} active shares with expiration') - for share in shares: - try: - if share.expiryDate.date() < datetime.today().date(): - log.info(f'Revoking share with uri: {share.shareUri} as it is expired') - # Put all share items in revoke state and then revoke - share_items_to_revoke = ShareObjectRepository.get_all_share_items_in_share( - session, share.shareUri, ['Share_Succeeded'] - ) - item_uris = [share_item.shareItemUri for share_item in share_items_to_revoke] - revoked_items_states = ShareStatusRepository.get_share_items_states( - session, share.shareUri, item_uris - ) + task_exceptions = [] + try: + with engine.scoped_session() as session: + log.info('Starting share expiration task') + shares = ShareObjectRepository.get_all_active_shares_with_expiration(session) + log.info(f'Fetched {len(shares)} active shares with expiration') + for share in shares: + try: + if share.expiryDate.date() < datetime.today().date(): + log.info(f'Revoking share with uri: {share.shareUri} as it is expired') + # Put all share items in revoke state and then revoke + share_items_to_revoke = ShareObjectRepository.get_all_share_items_in_share( + session, share.shareUri, ['Share_Succeeded'] + ) + item_uris = [share_item.shareItemUri for share_item in share_items_to_revoke] + revoked_items_states = ShareStatusRepository.get_share_items_states( + session, share.shareUri, item_uris + ) - share_sm = ShareObjectSM(share.status) - new_share_state = share_sm.run_transition(ShareObjectActions.RevokeItems.value) + share_sm = ShareObjectSM(share.status) + new_share_state = share_sm.run_transition(ShareObjectActions.RevokeItems.value) - for item_state in revoked_items_states: - item_sm = ShareItemSM(item_state) - new_state = item_sm.run_transition(ShareObjectActions.RevokeItems.value) - for item in share_items_to_revoke: - if item.status == item_state: - item_sm.update_state_single_item(session, item, new_state) + for item_state in revoked_items_states: + item_sm = ShareItemSM(item_state) + new_state = item_sm.run_transition(ShareObjectActions.RevokeItems.value) + for item in share_items_to_revoke: + if item.status == item_state: + item_sm.update_state_single_item(session, item, new_state) - share_sm.update_state(session, share, new_share_state) - SharingService.revoke_share(engine=engine, share_uri=share.shareUri) - else: - log.info(f'Share with share uri: {share.shareUri} has not yet expired') - dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) - if share.submittedForExtension: - log.info( - f'Sending notifications to the owners: {dataset.SamlAdminGroupName}, {dataset.stewards} as share extension requested for share with uri: {share.shareUri}' - ) - ShareNotificationService( - session=session, dataset=dataset, share=share - ).notify_share_expiration_to_owners() + share_sm.update_state(session, share, new_share_state) + SharingService.revoke_share(engine=engine, share_uri=share.shareUri) else: - log.info( - f'Sending notifications to the requesters with group: {share.groupUri} as share extension is not requested for share with uri: {share.shareUri}' - ) - ShareNotificationService( - session=session, dataset=dataset, share=share - ).notify_share_expiration_to_requesters() - except Exception as e: - log.error( - f'Error occured while processing share expiration processing for share with URI: {share.shareUri} due to: {e}' - ) + log.info(f'Share with share uri: {share.shareUri} has not yet expired') + dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) + if share.submittedForExtension: + log.info( + f'Sending notifications to the owners: {dataset.SamlAdminGroupName}, {dataset.stewards} as share extension requested for share with uri: {share.shareUri}' + ) + ShareNotificationService( + session=session, dataset=dataset, share=share + ).notify_share_expiration_to_owners() + else: + log.info( + f'Sending notifications to the requesters with group: {share.groupUri} as share extension is not requested for share with uri: {share.shareUri}' + ) + ShareNotificationService( + session=session, dataset=dataset, share=share + ).notify_share_expiration_to_requesters() + except Exception as e: + log.error( + f'Error occurred while processing share expiration processing for share with URI: {share.shareUri} due to: {e}' + ) + task_exceptions.append(f'Error occurred while processing share expiration processing for share with URI: {share.shareUri} due to: {e}') + except Exception as e: + log.error(f'Error occurred while processing share expiration due to : {e}') + task_exceptions.append(f'Error occurred while processing share expiration due to: {e}') + finally: + if len(task_exceptions) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occurred while processing share expiration task', + error_logs=task_exceptions, + process_name='Share Expiration Task' + ) if __name__ == '__main__': diff --git a/backend/dataall/modules/shares_base/tasks/share_manager_task.py b/backend/dataall/modules/shares_base/tasks/share_manager_task.py index d120251c9..585b76397 100644 --- a/backend/dataall/modules/shares_base/tasks/share_manager_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_manager_task.py @@ -2,6 +2,7 @@ import os import sys +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.services.sharing_service import SharingService from dataall.base.db import get_engine from dataall.base.loader import load_modules, ImportMode @@ -25,4 +26,9 @@ except Exception as e: log.error(f'Sharing task failed due to: {e}') + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occurred while running Sharing task', + error_logs=[str(e)], + process_name='Sharing Service' + ) raise e diff --git a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py index 225f069bd..3c337e04d 100644 --- a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py @@ -1,7 +1,9 @@ import logging import os import sys +from typing import List +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_models import ShareObject from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository @@ -17,47 +19,71 @@ class EcsBulkShareRepplyService: @classmethod def process_reapply_shares_for_dataset(cls, engine, dataset_uri): - with engine.scoped_session() as session: - processed_share_objects = [] - share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset( - session=session, dataset_uri=dataset_uri - ) - log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}') - share_object: ShareObject - for share_object in share_objects_for_dataset: - log.info( - f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' - ) - processed_share_objects.append(share_object.shareUri) - ShareStatusRepository.update_share_item_health_status_batch( - session=session, - share_uri=share_object.shareUri, - old_status=ShareItemHealthStatus.Unhealthy.value, - new_status=ShareItemHealthStatus.PendingReApply.value, + task_exceptions = [] + try: + with engine.scoped_session() as session: + share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset( + session=session, dataset_uri=dataset_uri ) - SharingService.reapply_share(engine, share_uri=share_object.shareUri) + log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}') + processed_share_objects, task_exceptions = cls._reapply_share_objects(engine=engine, session=session, share_objects=share_objects_for_dataset) return processed_share_objects + except Exception as e: + log.error(f'Error occurred while reapplying share task due to: {e}') + task_exceptions.append(f'Error occurred while reapplying share task due to: {e}') + finally: + if len(task_exceptions) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occurred while processing share during reapplying task', + error_logs=task_exceptions, + process_name='Share Reapplier Task' + ) @classmethod - def process_reapply_shares(cls, engine): - with engine.scoped_session() as session: - processed_share_objects = [] - all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) - log.info(f'Found {len(all_share_objects)} share objects ') - share_object: ShareObject - for share_object in all_share_objects: - log.info( - f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' - ) - processed_share_objects.append(share_object.shareUri) - ShareStatusRepository.update_share_item_health_status_batch( - session=session, - share_uri=share_object.shareUri, - old_status=ShareItemHealthStatus.Unhealthy.value, - new_status=ShareItemHealthStatus.PendingReApply.value, - ) + def _reapply_share_objects(cls, engine, session, share_objects: List[ShareObject]): + share_object: ShareObject + processed_share_objects = [] + task_exceptions = [] + for share_object in share_objects: + log.info( + f'Re-applying Share Items for Share Object, Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_share_objects.append(share_object.shareUri) + ShareStatusRepository.update_share_item_health_status_batch( + session=session, + share_uri=share_object.shareUri, + old_status=ShareItemHealthStatus.Unhealthy.value, + new_status=ShareItemHealthStatus.PendingReApply.value, + ) + try: SharingService.reapply_share(engine, share_uri=share_object.shareUri) + except Exception as e: + log.error( + f'Error occurred while reapplying share for share with uri:{share_object.shareUri} due to: {e}') + task_exceptions.append( + f'Error occurred while reapplying share for share with uri:{share_object.shareUri} due to: {e}') + return (processed_share_objects, task_exceptions) + @classmethod + def process_reapply_shares(cls, engine): + task_exceptions = [] + try: + with engine.scoped_session() as session: + all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) + log.info(f'Found {len(all_share_objects)} share objects ') + share_object: ShareObject + processed_share_objects, task_exceptions = cls._reapply_share_objects(engine=engine, session=session, + share_objects=all_share_objects) return processed_share_objects + except Exception as e: + log.error(f'Error occurred while reapplying share task due to: {e}') + task_exceptions.append(f'Error occurred while reapplying share task due to: {e}') + finally: + if len(task_exceptions) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occurred while processing share during reapplying task', + error_logs=task_exceptions, + process_name='Share Reapplier Task' + ) def reapply_shares(engine, dataset_uri): diff --git a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py index a731a5756..1114844d1 100644 --- a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py @@ -1,6 +1,8 @@ import logging import os import sys + +from dataall.modules.notifications.services.admin_notifications import AdminNotificationService from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_models import ShareObject from dataall.modules.shares_base.services.shares_enums import ShareItemStatus @@ -19,20 +21,36 @@ def verify_shares(engine): A method used by the scheduled ECS Task to run verify_shares() process against ALL shared items in ALL active share objects within data.all and update the health status of those shared items. """ - with engine.scoped_session() as session: - processed_share_objects = [] - all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) - log.info(f'Found {len(all_share_objects)} share objects verify ') - share_object: ShareObject - for share_object in all_share_objects: - log.info( - f'Verifying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' - ) - processed_share_objects.append(share_object.shareUri) - SharingService.verify_share( - engine, share_uri=share_object.shareUri, status=ShareItemStatus.Share_Succeeded.value, healthStatus=None + task_exceptions = [] + try: + with engine.scoped_session() as session: + processed_share_objects = [] + all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) + log.info(f'Found {len(all_share_objects)} share objects verify ') + share_object: ShareObject + for share_object in all_share_objects: + log.info( + f'Verifying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_share_objects.append(share_object.shareUri) + try: + SharingService.verify_share( + engine, share_uri=share_object.shareUri, status=ShareItemStatus.Share_Succeeded.value, healthStatus=None + ) + except Exception as e: + log.error(f'Error occurred while verifying share with uri: {share_object.shareUri} due to: {e}') + task_exceptions.append(f'Error occurred while verifying share with uri: {share_object.shareUri} due to: {e}') + return processed_share_objects + except Exception as e: + log.error(f'Error occurred while verifying shares task due to: {e}') + task_exceptions.append(f'Error occurred while verifying shares task due to: {e}') + finally: + if len(task_exceptions) > 0: + AdminNotificationService().notify_admins_with_error_log( + process_error='Error occurred while verifying shares task', + error_logs=task_exceptions, + process_name='Share Verifier' ) - return processed_share_objects def trigger_reapply_task(): diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index a65a609e6..e555e8c7c 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -161,7 +161,7 @@ def __init__( command=['python3.9', '-m', 'dataall.core.environment.tasks.env_stacks_updater'], container_id='container', ecr_repository=ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group(envname, resource_prefix, log_group_name='stacks-updater'), schedule_expression=Schedule.expression('cron(0 1 * * ? *)'), @@ -217,7 +217,7 @@ def add_catalog_indexer_task(self): command=['python3.9', '-m', 'dataall.modules.catalog.tasks.catalog_indexer_task'], container_id=container_id, ecr_repository=self._ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='catalog-indexer'), schedule_expression=Schedule.expression('rate(6 hours)'), @@ -261,7 +261,7 @@ def add_share_management_task(self): f'ShareManagementTaskContainer{self._envname}', container_name='container', image=ecs.ContainerImage.from_ecr_repository(repository=self._ecr_repository, tag=self._cdkproxy_image_tag), - environment=self._create_env(), + environment=self.env_vars, command=['python3.9', '-m', 'dataall.modules.shares_base.tasks.share_manager_task'], logging=ecs.LogDriver.aws_logs( stream_prefix='task', @@ -292,7 +292,7 @@ def add_share_verifier_task(self): command=['python3.9', '-m', 'dataall.modules.shares_base.tasks.share_verifier_task'], container_id='container', ecr_repository=self._ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='share-verifier'), schedule_expression=Schedule.expression('rate(7 days)'), @@ -321,7 +321,7 @@ def add_share_reapplier_task(self): f'ShareReapplierTaskContainer{self._envname}', container_name='container', image=ecs.ContainerImage.from_ecr_repository(repository=self._ecr_repository, tag=self._cdkproxy_image_tag), - environment=self._create_env(), + environment=self.env_vars, command=['python3.9', '-m', 'dataall.modules.shares_base.tasks.share_reapplier_task'], logging=ecs.LogDriver.aws_logs( stream_prefix='task', @@ -383,7 +383,7 @@ def add_subscription_task(self): ], container_id='container', ecr_repository=self._ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='subscriptions'), schedule_expression=Schedule.expression('rate(15 minutes)'), @@ -403,7 +403,7 @@ def add_sync_dataset_table_task(self): command=['python3.9', '-m', 'dataall.modules.s3_datasets.tasks.tables_syncer'], container_id='container', ecr_repository=self._ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='tables-syncer'), schedule_expression=Schedule.expression('rate(15 minutes)'), @@ -423,7 +423,7 @@ def add_omics_fetch_workflows_task(self): command=['python3.9', '-m', 'dataall.modules.omics.tasks.omics_workflows_fetcher'], container_id='container', ecr_repository=self._ecr_repository, - environment=self._create_env(), + environment=self.env_vars, image_tag=self._cdkproxy_image_tag, log_group=self.create_log_group( self._envname, self._resource_prefix, log_group_name='omics-workflows-fetcher' diff --git a/tests/modules/redshift_datasets_shares/test_redshift_table_processor.py b/tests/modules/redshift_datasets_shares/test_redshift_table_processor.py index d49009ec8..96dac2f96 100644 --- a/tests/modules/redshift_datasets_shares/test_redshift_table_processor.py +++ b/tests/modules/redshift_datasets_shares/test_redshift_table_processor.py @@ -226,7 +226,7 @@ def test_verify_redshift_cross_account_share_all_successful( mock_redshift_shares, ): # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then assert_that(response).is_true() mock_redshift_data_shares.return_value.check_datashare_exists.assert_called_with( @@ -276,7 +276,7 @@ def test_verify_redshift_share_datashare_does_not_exist( # Given mock_redshift_data_shares.return_value.check_datashare_exists.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -290,7 +290,7 @@ def test_verify_redshift_schema_not_added_to_datashare( # Given mock_redshift_data_shares.return_value.check_schema_in_datashare.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -304,7 +304,7 @@ def test_verify_consumer_permissions_to_datashare_wrong_status_cross_account( # Given mock_redshift_shares.return_value.get_datashare_status.return_value = 'UNAUTHORIZED' # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) assert_that(item.healthStatus).is_equal_to(ShareItemHealthStatus.Unhealthy.value) @@ -317,7 +317,7 @@ def test_verify_consumer_permissions_to_datashare_same_account( # Given mock_redshift_data_shares.return_value.check_consumer_permissions_to_datashare.return_value = False # When - response = redshift_processor_same_account.verify_shares() + response = redshift_processor_same_account.verify_shares_health_status() with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table_2.shareItemUri) assert_that(item.healthStatus).is_equal_to(ShareItemHealthStatus.Unhealthy.value) @@ -331,7 +331,7 @@ def test_verify_redshift_share_database_does_not_exist( # Given mock_redshift_data_shares.return_value.check_database_exists.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -345,7 +345,7 @@ def test_verify_role_permissions_to_database( # Given mock_redshift_data_shares.return_value.check_role_permissions_in_database.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -361,7 +361,7 @@ def test_verify_external_schema_exists( # Given mock_redshift_data_shares.return_value.check_schema_exists.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -375,7 +375,7 @@ def test_verify_role_permissions_to_schema( # Given mock_redshift_data_shares.return_value.check_role_permissions_in_schema.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri) @@ -389,7 +389,7 @@ def test_verify_table_not_added_to_datashare( # Given mock_redshift_data_shares.return_value.check_table_in_datashare.return_value = False # When - response = redshift_processor_cross_account.verify_shares() + response = redshift_processor_cross_account.verify_shares_health_status() # Then with db.scoped_session() as session: item = ShareObjectRepository.get_share_item_by_uri(session, redshift_requested_table.shareItemUri)