Linting update
TejasRGitHub authored and trajopadhye committed Jan 15, 2025
1 parent 5b8ec01 commit 17158e2
Showing 22 changed files with 249 additions and 177 deletions.
@@ -38,7 +38,7 @@ def index_objects(cls, engine, with_deletes='False'):
             AdminNotificationService().notify_admins_with_error_log(
                 process_error='Exception occurred during cataloging task',
                 error_logs=[error_log],
-                process_name='Catalog Task'
+                process_name='Catalog Task',
             )
             raise e

@@ -7,33 +7,36 @@

 log = logging.getLogger(__name__)
 
+
 class AdminNotificationService:
     """
     Send email notifications to Admin Group i.e. DAAdministrators in data.all
     Args -
     1. process_error - string describing in short the error / exception details
-    2. error_logs - List of all the exception error logs
+    2. error_logs - List of all the exception error logs
     3. process_name - Code where the exception occurred. Example, inside an ECS task like cataloging task, etc or inside a graphql service
     """
 
     @classmethod
-    def notify_admins_with_error_log(cls, process_error: str, error_logs: List[str], process_name:str = ''):
-        if config.get_property(
-            'modules.datasets_base.features.share_notifications.email.parameters.admin_notifications', default=False
-        ) is False:
-            log.info("Admin notifications are switched off")
+    def notify_admins_with_error_log(cls, process_error: str, error_logs: List[str], process_name: str = ''):
+        if (
+            config.get_property(
+                'modules.datasets_base.features.share_notifications.email.parameters.admin_notifications', default=False
+            )
+            is False
+        ):
+            log.info('Admin notifications are switched off')
             return
 
         subject = f'Data.all alert | Attention Required | Failure in : {process_name}'
         email_message = f"""
         Following error occurred - <br><br> {process_error} <br><br>
         """
         for error_log in error_logs:
-            email_message += error_log + "<br><br>"
+            email_message += error_log + '<br><br>'
 
-        email_message += "Please check the logs in cloudwatch for more details"
+        email_message += 'Please check the logs in cloudwatch for more details'
 
         SESEmailNotificationService.create_and_send_email_notifications(
-            subject=subject,
-            msg=email_message,
-            recipient_groups_list=[DataallGroups.admin]
+            subject=subject, msg=email_message, recipient_groups_list=[DataallGroups.admin]
         )
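For orientation, the call sites updated throughout this commit use the service as in the sketch below. This is a minimal sketch assembled from the hunks on this page; the import path and the surrounding task body are assumptions for illustration, not code from this commit.

# Minimal usage sketch; import path and task body are assumed, the call itself follows this diff.
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService  # assumed path

def run_catalog_task():
    try:
        ...  # cataloging work goes here (illustrative)
    except Exception as e:
        # Emails the DAAdministrators group, unless admin notifications are disabled in config
        AdminNotificationService().notify_admins_with_error_log(
            process_error='Exception occurred during cataloging task',
            error_logs=[str(e)],
            process_name='Catalog Task',
        )
        raise e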
@@ -49,8 +49,15 @@ def send_email_task(subject, message, recipient_groups_list, recipient_email_lis
         SESEmailNotificationService.send_email_to_users(email_ids_to_send_emails, email_provider, message, subject)
 
     except Exception as e:
-        email_ids_to_send_emails = email_provider.get_email_ids_from_groupList(['DAAdministrators'] , identityProvider)
-        SESEmailNotificationService.send_email_to_users(email_ids_to_send_emails, email_provider, f'Error sending email due to: {e}', 'Data.all alert | Attention Required | Failure in: Email Notification Service')
+        email_ids_to_send_emails = email_provider.get_email_ids_from_groupList(
+            ['DAAdministrators'], identityProvider
+        )
+        SESEmailNotificationService.send_email_to_users(
+            email_ids_to_send_emails,
+            email_provider,
+            f'Error sending email due to: {e}',
+            'Data.all alert | Attention Required | Failure in: Email Notification Service',
+        )
         raise e
     else:
         return True
@@ -87,4 +94,4 @@ def create_and_send_email_notifications(subject, msg, recipient_groups_list=None
         else:
             log.info(f'Notification type : {share_notification_config_type} is not active')
     else:
-        log.info('Notifications are not active')
+        log.info('Notifications are not active')
125 changes: 74 additions & 51 deletions backend/dataall/modules/notifications/tasks/weekly_digest_reminder.py
@@ -44,27 +44,33 @@ def __init__(self):

 def _get_pending_share_notifications(session):
     pending_shares = ShareObjectRepository.get_shares_with_statuses(session=session, status_list=['Submitted'])
-    log.info(f"Found {len(pending_shares)} pending shares with share object status - Submitted")
+    log.info(f'Found {len(pending_shares)} pending shares with share object status - Submitted')
     share_dataset_map: Dict[ShareObject, DatasetBase] = {
-        share: DatasetBaseRepository.get_dataset_by_uri(session=session, dataset_uri=share.datasetUri) for share in
-        pending_shares}
+        share: DatasetBaseRepository.get_dataset_by_uri(session=session, dataset_uri=share.datasetUri)
+        for share in pending_shares
+    }
     return [
         NotificationResource(
             resource=share,
             resource_type='Share Object',
             resource_status=f'{share.status} - Pending Approval',
-            receivers=[share_dataset_map[share].SamlAdminGroupName, share_dataset_map[share].stewards])
+            receivers=[share_dataset_map[share].SamlAdminGroupName, share_dataset_map[share].stewards],
+        )
         for share in share_dataset_map
     ]
 
 
 def _get_unhealthy_share_notification(session):
     unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status(
-        session=session, health_status_list=[ShareItemHealthStatus.Unhealthy.value])
-    log.info(f"Found {len(unhealthy_share_objects)} unhealthy share objects")
+        session=session, health_status_list=[ShareItemHealthStatus.Unhealthy.value]
+    )
+    log.info(f'Found {len(unhealthy_share_objects)} unhealthy share objects')
     return [
-        NotificationResource(resource=share, resource_type='Share Object', resource_status='Unhealthy',
-                             receivers=[share.groupUri]) for share in unhealthy_share_objects]
+        NotificationResource(
+            resource=share, resource_type='Share Object', resource_status='Unhealthy', receivers=[share.groupUri]
+        )
+        for share in unhealthy_share_objects
+    ]
 
 
 def _get_unhealthy_stack_by_type(session, target_uri: str, target_type: Any):
@@ -73,23 +79,25 @@ def _get_unhealthy_stack_by_type(session, target_uri: str, target_type: Any):
         StackStatus.DELETE_FAILED.value,
         StackStatus.UPDATE_FAILED.value,
         StackStatus.UPDATE_ROLLBACK_FAILED.value,
-        StackStatus.ROLLBACK_FAILED.value
+        StackStatus.ROLLBACK_FAILED.value,
     ]
     resource_objects = session.query(target_type).all()
     unhealthy_stack_notification_resources: List[NotificationResource] = []
-    log.info(f"Found {len(unhealthy_stack_notification_resources)} unhealthy {target_type}")
+    log.info(f'Found {len(unhealthy_stack_notification_resources)} unhealthy {target_type}')
 
     # Check if stack associated with these datasets / environment exists
     # If yes, create a notification resource
     for resource in resource_objects:
-        stack = StackRepository.find_stack_by_target_uri(session=session,
-                                                         target_uri=resource.__getattribute__(target_uri),
-                                                         statuses=unhealthy_stack_status)
+        stack = StackRepository.find_stack_by_target_uri(
+            session=session, target_uri=resource.__getattribute__(target_uri), statuses=unhealthy_stack_status
+        )
         if stack is not None:
-            notification_resource = NotificationResource(resource=resource, resource_type=target_type.__name__,
-                                                         resource_status=stack.status,
-                                                         receivers=_get_receivers_for_stack(resource=resource,
-                                                                                            target_type=target_type))
+            notification_resource = NotificationResource(
+                resource=resource,
+                resource_type=target_type.__name__,
+                resource_status=stack.status,
+                receivers=_get_receivers_for_stack(resource=resource, target_type=target_type),
+            )
             unhealthy_stack_notification_resources.append(notification_resource)
 
     return unhealthy_stack_notification_resources
@@ -101,10 +109,13 @@ def _get_receivers_for_stack(resource, target_type):
     if target_type.__name__ == 'Environment':
         return [resource.SamlGroupName]
 
+
 """
 Function to create a map of {group name : resource bundle}, where each resource bundle contains dataset, share and environment notification lists.
 Iterated over all the notification ( NotificationResources ) and then segregate based on the dataset, shares & environment notifications and map the bundle to a team.
 """
+
+
 def _map_groups_to_resource_bundles(list_of_notifications: List[NotificationResource], resource_bundle_type: str):
     for notification in list_of_notifications:
         # Get all the receivers groups
@@ -126,48 +137,52 @@ def send_reminder_email(engine):
         with engine.scoped_session() as session:
             # Get all shares in submitted state
             pending_share_notification_resources = _get_pending_share_notifications(session=session)
-            resources_type_tuple.append((pending_share_notification_resources, "share_object_notifications"))
+            resources_type_tuple.append((pending_share_notification_resources, 'share_object_notifications'))
             # Todo : Check if distinct needed for the share object repository
             # Get all shares in unhealthy state
             unhealthy_share_objects_notification_resources = _get_unhealthy_share_notification(session=session)
-            resources_type_tuple.append((unhealthy_share_objects_notification_resources, "share_object_notifications"))
+            resources_type_tuple.append((unhealthy_share_objects_notification_resources, 'share_object_notifications'))
             # Get all the dataset which are in unhealthy state
-            unhealthy_datasets_notification_resources = _get_unhealthy_stack_by_type(session=session,
-                                                                                     target_uri='datasetUri',
-                                                                                     target_type=DatasetBase)
-            resources_type_tuple.append((unhealthy_datasets_notification_resources, "dataset_object_notifications"))
+            unhealthy_datasets_notification_resources = _get_unhealthy_stack_by_type(
+                session=session, target_uri='datasetUri', target_type=DatasetBase
+            )
+            resources_type_tuple.append((unhealthy_datasets_notification_resources, 'dataset_object_notifications'))
             # Get all the environments which are in unhealthy state
-            unhealthy_environment_notification_resources = _get_unhealthy_stack_by_type(session=session,
-                                                                                        target_uri='environmentUri',
-                                                                                        target_type=Environment)
+            unhealthy_environment_notification_resources = _get_unhealthy_stack_by_type(
+                session=session, target_uri='environmentUri', target_type=Environment
+            )
             resources_type_tuple.append(
-                (unhealthy_environment_notification_resources, "environment_object_notifications"))
+                (unhealthy_environment_notification_resources, 'environment_object_notifications')
+            )
 
             # For each notification resource ( i.e. share notification, dataset notification, etc ),
             # function _map_groups_to_resource_bundles maps each team name : resource bundle
             for notification_resources, resource_bundle_type in resources_type_tuple:
-                _map_groups_to_resource_bundles(list_of_notifications=notification_resources, resource_bundle_type=resource_bundle_type)
+                _map_groups_to_resource_bundles(
+                    list_of_notifications=notification_resources, resource_bundle_type=resource_bundle_type
+                )
 
             for group, resource_bundle in group_name_to_resource_bundle_map.items():
                 email_body = _construct_email_body(resource_bundle)
                 log.debug(email_body)
                 subject = 'Attention Required | Data.all weekly digest'
                 try:
-                    SESEmailNotificationService.create_and_send_email_notifications(subject=subject, msg=email_body,
-                                                                                    recipient_groups_list=[group])
+                    SESEmailNotificationService.create_and_send_email_notifications(
+                        subject=subject, msg=email_body, recipient_groups_list=[group]
+                    )
                 except Exception as e:
-                    log.error(f"Error occurred in sending email while weekly reminder task due to: {e}")
-                    task_exceptions.append(f"Error occurred in sending email while weekly reminder task due to: {e}")
+                    log.error(f'Error occurred in sending email while weekly reminder task due to: {e}')
+                    task_exceptions.append(f'Error occurred in sending email while weekly reminder task due to: {e}')
     except Exception as e:
-        log.error(f"Error occurred while running the weekly reminder task: {e}")
-        task_exceptions.append(f"Error occurred while running the weekly reminder task: {e}")
+        log.error(f'Error occurred while running the weekly reminder task: {e}')
+        task_exceptions.append(f'Error occurred while running the weekly reminder task: {e}')
     finally:
         if len(task_exceptions) > 0:
-            log.info("Sending email notifications to the admin team")
+            log.info('Sending email notifications to the admin team')
             AdminNotificationService().notify_admins_with_error_log(
-                process_error="Error occurred while running the weekly reminder task",
+                process_error='Error occurred while running the weekly reminder task',
                 error_logs=task_exceptions,
-                process_name="Weekly reminder task"
+                process_name='Weekly reminder task',
             )
 
 
@@ -182,19 +197,27 @@ def _construct_email_body(resource_bundle: NotificationResourceBundle):
     For environments and datasets which are in unhealthy state, you can go to the AWS account and check the stack associated with that environment/dataset and check the root cause of the stack. Once you address the root cause issue, you can click on "Update Stack" on the stack page of the data.all resource in the data.all UI <br><br><br>
     """
     msg_content = """"""
-    share_object_table_content = _create_table_for_resource(resource_bundle.share_object_notifications, "shareUri",
-                                                            "/console/shares/") if len(
-        resource_bundle.share_object_notifications) > 0 else ""
-    dataset_object_table_content = _create_table_for_resource(resource_bundle.dataset_object_notifications,
-                                                              "datasetUri",
-                                                              "/console/s3-datasets/") if len(
-        resource_bundle.dataset_object_notifications) > 0 else ""
-    environment_object_table_content = _create_table_for_resource(resource_bundle.environment_object_notifications,
-                                                                  "environmentUri",
-                                                                  "/console/environments/") if len(
-        resource_bundle.environment_object_notifications) > 0 else ""
+    share_object_table_content = (
+        _create_table_for_resource(resource_bundle.share_object_notifications, 'shareUri', '/console/shares/')
+        if len(resource_bundle.share_object_notifications) > 0
+        else ''
+    )
+    dataset_object_table_content = (
+        _create_table_for_resource(resource_bundle.dataset_object_notifications, 'datasetUri', '/console/s3-datasets/')
+        if len(resource_bundle.dataset_object_notifications) > 0
+        else ''
+    )
+    environment_object_table_content = (
+        _create_table_for_resource(
+            resource_bundle.environment_object_notifications, 'environmentUri', '/console/environments/'
+        )
+        if len(resource_bundle.environment_object_notifications) > 0
+        else ''
+    )
 
-    msg_content += share_object_table_content + dataset_object_table_content + environment_object_table_content + "<br><br>"
+    msg_content += (
+        share_object_table_content + dataset_object_table_content + environment_object_table_content + '<br><br>'
+    )
 
     msg_footer = """
     In case your stack(s) or share object(s) are still in unhealthy state after applying remedial measures, please contact data.all team. <br><br>
@@ -247,7 +270,7 @@ def _create_table_for_resource(list_of_resources, uri_attr, link_uri):


 if __name__ == '__main__':
-    log.info("Starting weekly reminders task")
+    log.info('Starting weekly reminders task')
     load_modules(modes={ImportMode.SHARES_TASK})
     ENVNAME = os.environ.get('envname', 'dkrcompose')
     ENGINE = get_engine(envname=ENVNAME)
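The module docstring above describes the grouping step performed by _map_groups_to_resource_bundles. Conceptually it behaves like the simplified sketch below; the map name and the bundle field names follow this diff, while the NotificationResourceBundle constructor and the stand-in Note class are illustrative assumptions, not the module's actual code.

# Simplified sketch of the group -> resource-bundle mapping described in the docstring above.
from collections import defaultdict

class NotificationResourceBundle:
    # Field names follow _construct_email_body in the diff above
    def __init__(self):
        self.share_object_notifications = []
        self.dataset_object_notifications = []
        self.environment_object_notifications = []

group_name_to_resource_bundle_map = defaultdict(NotificationResourceBundle)

def map_groups_to_resource_bundles(list_of_notifications, resource_bundle_type):
    # resource_bundle_type is one of the strings appended in send_reminder_email,
    # e.g. 'share_object_notifications' or 'dataset_object_notifications'
    for notification in list_of_notifications:
        for group in notification.receivers:
            getattr(group_name_to_resource_bundle_map[group], resource_bundle_type).append(notification)

# Tiny demo with a stand-in notification object (illustrative only)
class Note:
    def __init__(self, receivers):
        self.receivers = receivers

map_groups_to_resource_bundles([Note(receivers=['TeamA'])], 'share_object_notifications')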
@@ -65,9 +65,10 @@ def fetch_omics_workflows(engine):
         AdminNotificationService().notify_admins_with_error_log(
             process_error='Error occurred while processing omics workflow task',
             error_logs=[str(e)],
-            process_name='Omics Workflow'
+            process_name='Omics Workflow',
         )
 
+
 if __name__ == '__main__':
     ENVNAME = os.environ.get('envname', 'local')
     ENGINE = get_engine(envname=ENVNAME)
@@ -264,7 +264,7 @@ def process_approved_shares(self) -> bool:
                 AdminNotificationService().notify_admins_with_error_log(
                     process_error='Error occurred while processing redshift table share request',
                     process_name='redshift table share processor',
-                    error_logs=[str(e)]
+                    error_logs=[str(e)],
                 )
                 return False
         return success
@@ -373,7 +373,7 @@ def process_revoked_shares(self) -> bool:
                 AdminNotificationService().notify_admins_with_error_log(
                     process_error='Error occurred while revoking redshift table share request',
                     process_name='redshift tables share processor',
-                    error_logs=[str(e)]
+                    error_logs=[str(e)],
                 )
             self.session.commit()
             try:
@@ -452,7 +452,7 @@ def process_revoked_shares(self) -> bool:
                 AdminNotificationService().notify_admins_with_error_log(
                     process_error='Error occurred while revoking redshift table share request',
                     process_name='redshift tables share processor',
-                    error_logs=[str(e)]
+                    error_logs=[str(e)],
                 )
                 return False
         return success
@@ -63,7 +63,7 @@ def get_dataset_table_by_uri(session, table_uri):

     @staticmethod
     def update_existing_tables_status(existing_tables, glue_tables):
-        updated_tables_status_map: Dict[str: str] = {}
+        updated_tables_status_map: Dict[str:str] = {}
         for existing_table in existing_tables:
             if existing_table.GlueTableName not in [t['Name'] for t in glue_tables]:
                 if existing_table.LastGlueTableStatus != 'Deleted':