Notification Improvements Stage 1, 70%
TejasRGitHub authored and trajopadhye committed Dec 26, 2024
1 parent 959e2a0 commit 8750f63
Showing 19 changed files with 518 additions and 259 deletions.
6 changes: 6 additions & 0 deletions backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -8,6 +8,7 @@
from dataall.base.db import get_engine
from dataall.base.loader import load_modules, ImportMode
from dataall.base.utils.alarm_service import AlarmService
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService

log = logging.getLogger(__name__)

@@ -32,6 +33,11 @@ def index_objects(cls, engine, with_deletes='False'):
return len(indexed_object_uris)
except Exception as e:
AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
AdminNotificationService().notify_admins_with_error_log(
process_error='Exception occurred during cataloging task',
error_logs=[str(e)],
process_name='Catalog Task'
)
raise e

@classmethod
backend/dataall/modules/notifications/services/admin_notifications.py
@@ -0,0 +1,22 @@
from typing import List

from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService


class AdminNotificationService:
admin_group = 'DAAdministrators'

    def notify_admins_with_error_log(self, process_error: str, error_logs: List[str], process_name: str = ''):
        subject = f'Data.all alert | Attention Required | Failure in: {process_name}'
        email_message = f'The following error occurred: {process_error}<br><br>'
        for error_log in error_logs:
            email_message += error_log + '<br><br>'

        SESEmailNotificationService.create_and_send_email_notifications(
            subject=subject,
            msg=email_message,
            recipient_groups_list=[AdminNotificationService.admin_group]
        )
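For readers of this commit, a minimal sketch of how a background task is expected to use this new service, mirroring the try/except pattern added to the task modules below; the task body and process name here are illustrative assumptions, not part of the commit:

# Hypothetical task wrapper showing the admin-notification pattern (illustrative only).
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService


def run_example_task():
    try:
        raise RuntimeError('simulated task failure')  # stand-in for the task's real work
    except Exception as e:
        AdminNotificationService().notify_admins_with_error_log(
            process_error='Exception occurred during the example task',
            error_logs=[str(e)],
            process_name='Example Task',
        )
        raise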
backend/dataall/modules/notifications/services/ses_email_notification_service.py
@@ -3,6 +3,7 @@

from dataall.base.aws.cognito import Cognito
from dataall.base.aws.ses import Ses
from dataall.base.config import config
from dataall.base.services.service_provider_factory import ServiceProviderFactory
from dataall.modules.notifications.services.base_email_notification_service import BaseEmailNotificationService

@@ -60,3 +61,31 @@ def send_email_to_users(email_list, email_provider, message, subject):
# https://aws.amazon.com/blogs/messaging-and-targeting/how-to-send-messages-to-multiple-recipients-with-amazon-simple-email-service-ses/
for emailId in email_list:
email_provider.send_email([emailId], message, subject)

@staticmethod
def create_and_send_email_notifications(subject, msg, recipient_groups_list=None, recipient_email_ids=None):
"""
Method to directly send email notification instead of creating an SQS Task
This approach is used while sending email notifications in an ECS task ( e.g. persistent email reminder task, share expiration task, etc )
Emails send to groups mentioned in recipient_groups_list and / or emails mentioned in recipient_email_ids
"""
if recipient_groups_list is None:
recipient_groups_list = []
if recipient_email_ids is None:
recipient_email_ids = []

share_notification_config = config.get_property(
'modules.datasets_base.features.share_notifications', default=None
)
if share_notification_config:
for share_notification_config_type in share_notification_config.keys():
n_config = share_notification_config[share_notification_config_type]
if n_config.get('active', False) == True:
if share_notification_config_type == 'email':
SESEmailNotificationService.send_email_task(
subject, msg, recipient_groups_list, recipient_email_ids
)
else:
                    log.info(f'Notification type: {share_notification_config_type} is not active')
else:
log.info('Notifications are not active')
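As a usage note, a sketch of how this direct-send path could be invoked from an ECS task; the config shape in the comment is inferred from the lookup above, and the subject, message, group, and address are illustrative assumptions:

# Hypothetical direct call (no SQS task involved), assuming config.json enables email notifications,
# e.g. "modules.datasets_base.features.share_notifications": { "email": { "active": true } }
from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService

SESEmailNotificationService.create_and_send_email_notifications(
    subject='Data.all notification | Share expiration reminder',
    msg='Your share request is about to expire.<br><br>',
    recipient_groups_list=['DAAdministrators'],   # group names to notify
    recipient_email_ids=['user@example.com'],     # and/or individual email addresses
)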
92 changes: 50 additions & 42 deletions backend/dataall/modules/omics/tasks/omics_workflows_fetcher.py
@@ -5,6 +5,7 @@

from dataall.core.environment.db.environment_models import Environment
from dataall.base.db import get_engine
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService
from dataall.modules.omics.aws.omics_client import OmicsClient
from dataall.modules.omics.db.omics_models import OmicsWorkflow
from dataall.modules.omics.api.enums import OmicsWorkflowType
@@ -16,49 +17,56 @@

def fetch_omics_workflows(engine):
"""List Omics workflows."""
log.info('Starting omics workflows fetcher')
with engine.scoped_session() as session:
environments = OmicsRepository(session).list_environments_with_omics_enabled()
# designed for ready2run and private workflows; when private workflow support is
# introduced, we will need go over all environments
if len(environments) == 0:
log.info('No environments found. Nothing to do.')
return True
env = environments[0]
ready_workflows = OmicsClient(awsAccountId=env.AwsAccountId, region=env.region).list_workflows(
type=OmicsWorkflowType.READY2RUN.value
)
# Removing private workflows until fully supported after initial launch
# private_workflows = OmicsClient.list_workflows(awsAccountId=env.AwsAccountId, region=env.region, type=OmicsWorkflowType.PRIVATE.value)
workflows = ready_workflows # + private_workflows
log.info(f'Found workflows {str(workflows)} in environment {env.environmentUri}')
for workflow in workflows:
log.info(f"Processing workflow name={workflow['name']}, id={workflow['id']}...")
existing_workflow = OmicsRepository(session).get_workflow_by_id(workflow['id'])
if existing_workflow is not None:
log.info(
f"Workflow name={workflow['name']}, id={workflow['id']} has already been registered in database. Updating information..."
)
existing_workflow.name = workflow['name']
existing_workflow.label = workflow['name']
session.commit()

else:
log.info(
f"Workflow name={workflow['name']} , id={workflow['id']} in environment {env.environmentUri} is new. Registering..."
)
omicsWorkflow = OmicsWorkflow(
id=workflow['id'],
name=workflow['name'],
arn=workflow['arn'],
type=workflow['type'],
environmentUri=env.environmentUri,
label=workflow['name'],
owner=env.environmentUri,
)
OmicsRepository(session).save_omics_workflow(omicsWorkflow)
return True
try:
log.info('Starting omics workflows fetcher')
with engine.scoped_session() as session:
environments = OmicsRepository(session).list_environments_with_omics_enabled()
            # designed for ready2run and private workflows; when private workflow support is
            # introduced, we will need to go over all environments
if len(environments) == 0:
log.info('No environments found. Nothing to do.')
return True
env = environments[0]
ready_workflows = OmicsClient(awsAccountId=env.AwsAccountId, region=env.region).list_workflows(
type=OmicsWorkflowType.READY2RUN.value
)
# Removing private workflows until fully supported after initial launch
# private_workflows = OmicsClient.list_workflows(awsAccountId=env.AwsAccountId, region=env.region, type=OmicsWorkflowType.PRIVATE.value)
workflows = ready_workflows # + private_workflows
log.info(f'Found workflows {str(workflows)} in environment {env.environmentUri}')
for workflow in workflows:
log.info(f"Processing workflow name={workflow['name']}, id={workflow['id']}...")
existing_workflow = OmicsRepository(session).get_workflow_by_id(workflow['id'])
if existing_workflow is not None:
log.info(
f"Workflow name={workflow['name']}, id={workflow['id']} has already been registered in database. Updating information..."
)
existing_workflow.name = workflow['name']
existing_workflow.label = workflow['name']
session.commit()

else:
log.info(
f"Workflow name={workflow['name']} , id={workflow['id']} in environment {env.environmentUri} is new. Registering..."
)
omicsWorkflow = OmicsWorkflow(
id=workflow['id'],
name=workflow['name'],
arn=workflow['arn'],
type=workflow['type'],
environmentUri=env.environmentUri,
label=workflow['name'],
owner=env.environmentUri,
)
OmicsRepository(session).save_omics_workflow(omicsWorkflow)
return True
    except Exception as e:
        log.error(f'Error occurred while processing the omics workflow task due to: {e}')
        AdminNotificationService().notify_admins_with_error_log(
            process_error='Error occurred while processing the omics workflow task',
            error_logs=[str(e)],
            process_name='Omics Workflow'
        )

if __name__ == '__main__':
ENVNAME = os.environ.get('envname', 'local')
@@ -441,7 +441,7 @@ def process_revoked_shares(self) -> bool:
return False
return success

def verify_shares(self) -> bool:
def verify_shares_health_status(self) -> bool:
"""
1) (in source namespace) Check the datashare exists
2) (in source namespace) Check that schema is added to datashare
@@ -459,7 +459,7 @@ def verify_shares(self) -> bool:
9) (in target namespace) Check that the redshift role has select access to the requested table in the local db.
10) (in target namespace) Check that the redshift role has select access to the requested table in the external schema.
"""

share_object_item_health_status = True
log.info('##### Verifying Redshift tables #######')
if not self.tables:
log.info('No tables to verify. Skipping...')
@@ -599,11 +599,12 @@ def verify_shares(self) -> bool:
' | '.join(ds_level_errors) + ' | ' + ' | '.join(tbl_level_errors),
datetime.now(),
)
share_object_item_health_status = False
else:
ShareStatusRepository.update_share_item_health_status(
self.session, share_item, ShareItemHealthStatus.Healthy.value, None, datetime.now()
)
return True
return share_object_item_health_status

def cleanup_shares(self) -> bool:
"""
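The change to this verify method (and to the analogous one in the second share processor further down) is the same: instead of unconditionally returning True, the processor now tracks whether any share item was marked unhealthy and returns that aggregate result. A minimal, self-contained sketch of the pattern with hypothetical names, not the data.all API:

# Illustrative aggregation of per-item verification results (hypothetical helper, not data.all code).
from typing import Callable, List


def verify_items_health_status(items: List[str], check_item: Callable[[str], List[str]]) -> bool:
    all_healthy = True
    for item in items:
        errors = check_item(item)  # per-item checks, analogous to the numbered checks in the docstring above
        if errors:
            # the real processors record this via ShareStatusRepository.update_share_item_health_status(...)
            all_healthy = False
    return all_healthy  # False if any item failed verification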
106 changes: 61 additions & 45 deletions backend/dataall/modules/s3_datasets/tasks/tables_syncer.py
@@ -7,6 +7,7 @@
from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.base.db import get_engine
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService
from dataall.modules.s3_datasets.aws.glue_dataset_client import DatasetCrawler
from dataall.modules.s3_datasets.aws.lf_table_client import LakeFormationTableClient
from dataall.modules.s3_datasets.services.dataset_table_service import DatasetTableService
@@ -20,60 +21,75 @@


def sync_tables(engine):
with engine.scoped_session() as session:
processed_tables = []
all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
log.info(f'Found {len(all_datasets)} datasets for tables sync')
dataset: S3Dataset
for dataset in all_datasets:
log.info(f'Synchronizing dataset {dataset.name}|{dataset.datasetUri} tables')
env: Environment = (
session.query(Environment)
.filter(
and_(
Environment.environmentUri == dataset.environmentUri,
Environment.deleted.is_(None),
task_exceptions = []
try:
with engine.scoped_session() as session:
processed_tables = []
all_datasets: [S3Dataset] = DatasetRepository.list_all_active_datasets(session)
log.info(f'Found {len(all_datasets)} datasets for tables sync')
dataset: S3Dataset
for dataset in all_datasets:
log.info(f'Synchronizing dataset {dataset.name}|{dataset.datasetUri} tables')
env: Environment = (
session.query(Environment)
.filter(
and_(
Environment.environmentUri == dataset.environmentUri,
Environment.deleted.is_(None),
)
)
.first()
)
env_group: EnvironmentGroup = EnvironmentService.get_environment_group(
session, dataset.SamlAdminGroupName, env.environmentUri
)
.first()
)
env_group: EnvironmentGroup = EnvironmentService.get_environment_group(
session, dataset.SamlAdminGroupName, env.environmentUri
)
try:
if not env or not is_assumable_pivot_role(env):
log.info(f'Dataset {dataset.GlueDatabaseName} has an invalid environment')
else:
tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName)
try:
if not env or not is_assumable_pivot_role(env):
log.info(f'Dataset {dataset.GlueDatabaseName} has an invalid environment')
else:
tables = DatasetCrawler(dataset).list_glue_database_tables(dataset.S3BucketName)

log.info(f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}')
log.info(f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}')

DatasetTableService.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)
DatasetTableService.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)

tables = session.query(DatasetTable).filter(DatasetTable.datasetUri == dataset.datasetUri).all()
tables = session.query(DatasetTable).filter(DatasetTable.datasetUri == dataset.datasetUri).all()

log.info('Updating tables permissions on Lake Formation...')
log.info('Updating tables permissions on Lake Formation...')

for table in tables:
LakeFormationTableClient(table).grant_principals_all_table_permissions(
principals=[
SessionHelper.get_delegation_role_arn(env.AwsAccountId, env.region),
env_group.environmentIAMRoleArn,
],
)
for table in tables:
LakeFormationTableClient(table).grant_principals_all_table_permissions(
principals=[
SessionHelper.get_delegation_role_arn(env.AwsAccountId, env.region),
env_group.environmentIAMRoleArn,
],
)

processed_tables.extend(tables)
processed_tables.extend(tables)

DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
except Exception as e:
log.error(
f'Failed to sync tables for dataset '
f'{dataset.AwsAccountId}/{dataset.GlueDatabaseName} '
f'due to: {e}'
)
DatasetAlarmService().trigger_dataset_sync_failure_alarm(dataset, str(e))
return processed_tables
DatasetTableIndexer.upsert_all(session, dataset_uri=dataset.datasetUri)
DatasetIndexer.upsert(session=session, dataset_uri=dataset.datasetUri)
except Exception as e:
log.error(
f'Failed to sync tables for dataset '
f'{dataset.AwsAccountId}/{dataset.GlueDatabaseName} '
f'due to: {e}'
)
DatasetAlarmService().trigger_dataset_sync_failure_alarm(dataset, str(e))
task_exceptions.append(str(e))
return processed_tables
except Exception as e:
log.error(
f'Error while running table syncer task due to: {e}'
)
task_exceptions.append(str(e))
finally:
if len(task_exceptions) > 0:
AdminNotificationService().notify_admins_with_error_log(
process_name='Table Syncer',
error_logs=task_exceptions,
process_error='Error while running table syncer task'
)


def is_assumable_pivot_role(env: Environment):
@@ -353,8 +353,9 @@ def process_revoked_shares(self) -> bool:
success = False
return success

def verify_shares(self) -> bool:
def verify_shares_health_status(self) -> bool:
log.info('##### Verifying tables #######')
share_object_item_health_status = True
if not self.tables:
log.info('No tables to verify. Skipping...')
else:
@@ -430,11 +431,12 @@ def verify_shares(self) -> bool:
' | '.join(manager.db_level_errors) + ' | ' + ' | '.join(manager.tbl_level_errors),
datetime.now(),
)
share_object_item_health_status = False
else:
ShareStatusRepository.update_share_item_health_status(
self.session, share_item, ShareItemHealthStatus.Healthy.value, None, datetime.now()
)
return True
return share_object_item_health_status

def cleanup_shares(self) -> bool:
"""