Skip to content

Commit

Permalink
Issue1248: Persistent Email Reminders (#1354)
Browse files Browse the repository at this point in the history
### Feature or Bugfix
- Feature


### Detail
- When a share request is initiated and remains pending for an extended
period, dataset producers will receive automated email reminders at
predefined intervals. These reminders will prompt producers to either
approve or extend the share request, thereby preventing delays in
accessing datasets.

Attaching screenshots for emails:

<img width="1336" alt="Screenshot 2024-06-20 at 5 34 31 PM"
src="https://github.com/data-dot-all/dataall/assets/26413731/d7be28c3-5c98-4146-92b1-295e136137a3">

<img width="1322" alt="Screenshot 2024-06-20 at 5 34 52 PM"
src="https://github.com/data-dot-all/dataall/assets/26413731/047556e8-59ee-4ebf-b8a7-c0a6684e2a63">


- Email will be sent every Monday at 9am UTC. Schedule can be changed in
cron expression in container.py

### Relates
- #1248

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use a standard proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Anushka Singh <[email protected]>
Co-authored-by: trajopadhye <[email protected]>
Co-authored-by: Mohit Arora <[email protected]>
Co-authored-by: rbernota <[email protected]>
Co-authored-by: Rick Bernotas <[email protected]>
Co-authored-by: Raj Chopde <[email protected]>
Co-authored-by: Noah Paige <[email protected]>
Co-authored-by: dlpzx <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jaidisido <[email protected]>
Co-authored-by: dlpzx <[email protected]>
Co-authored-by: mourya-33 <[email protected]>
Co-authored-by: nikpodsh <[email protected]>
Co-authored-by: MK <[email protected]>
Co-authored-by: Manjula <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Daniel Lorch <[email protected]>
Co-authored-by: Tejas Rajopadhye <[email protected]>
Co-authored-by: Zilvinas Saltys <[email protected]>
Co-authored-by: Sofia Sazonova <[email protected]>
Co-authored-by: Sofia Sazonova <[email protected]>
  • Loading branch information
23 people authored Jun 26, 2024
1 parent e477bdf commit 90835fb
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject

from dataall.modules.shares_base.services.shares_enums import (
Expand Down Expand Up @@ -381,3 +382,24 @@ def paginated_list_shareable_items(session, subqueries: List[Query], data: dict
return paginate(
query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
).to_dict()

@staticmethod
def fetch_submitted_shares_with_notifications(session):
"""
A method used by the scheduled ECS Task to run fetch_submitted_shares_with_notifications() process against ALL shared objects in ALL
active share objects within dataall
"""
with session() as session:
pending_shares = (
session.query(ShareObject)
.join(
Notification,
and_(
ShareObject.shareUri == func.split_part(Notification.target_uri, '|', 1),
ShareObject.datasetUri == func.split_part(Notification.target_uri, '|', 2),
),
)
.filter(and_(Notification.type == 'SHARE_OBJECT_SUBMITTED', ShareObject.status == 'Submitted'))
.all()
)
return pending_shares
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dataall.base.context import get_context
from dataall.modules.shares_base.services.shares_enums import ShareObjectStatus
from dataall.modules.notifications.db.notification_repositories import NotificationRepository
from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService
from dataall.modules.datasets_base.db.dataset_models import DatasetBase

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,11 +57,47 @@ def notify_share_object_submission(self, email_id: str):
self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_persistent_email_reminder(self, email_id: str):
share_link_text = ''
if os.environ.get('frontend_domain_url'):
share_link_text = (
f'<br><br>Please visit data.all <a href="{os.environ.get("frontend_domain_url")}'
f'/console/shares/{self.share.shareUri}">share link</a> '
f'to review and take appropriate action or view more details.'
)

msg_intro = f"""Dear User,
This is a reminder that a share request for the dataset "{self.dataset.label}" submitted by {email_id}
on behalf of principal "{self.share.principalId}" is still pending and has not been addressed.
"""

msg_end = """Your prompt attention to this matter is greatly appreciated.
Best regards,
The Data.all Team
"""

subject = f'URGENT REMINDER: Data.all | Action Required on Pending Share Request for {self.dataset.label}'
email_notification_msg = msg_intro + share_link_text + msg_end

notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg_intro
)

self._create_persistent_reminder_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_share_object_approval(self, email_id: str):
share_link_text = ''
if os.environ.get('frontend_domain_url'):
share_link_text = f'<br><br> Please visit data.all <a href="{os.environ.get("frontend_domain_url")}/console/shares/{self.share.shareUri}">share link </a> to take action or view more details'
msg = f'User {email_id} APPROVED share request for dataset {self.dataset.label} for principal {self.share.principalId}'
share_link_text = (
f'<br><br> Please visit data.all <a href="{os.environ.get("frontend_domain_url")}'
f'/console/shares/{self.share.shareUri}">share link </a> '
f'to take action or view more details'
)
msg = (
f'User {email_id} APPROVED share request for dataset {self.dataset.label} '
f'for principal {self.share.principalId}'
)
subject = f'Data.all | Share Request Approved for {self.dataset.label}'
email_notification_msg = msg + share_link_text

Expand Down Expand Up @@ -167,3 +204,42 @@ def _create_notification_task(self, subject, msg):
log.info(f'Notification type : {share_notification_config_type} is not active')
else:
log.info('Notifications are not active')

def _create_persistent_reminder_notification_task(self, subject, msg):
"""
At the moment just for notification_config_type = email, but designed for additional notification types
Emails sent to:
- dataset.SamlAdminGroupName
- dataset.stewards
"""
share_notification_config = config.get_property(
'modules.datasets_base.features.share_notifications', default=None
)
if share_notification_config:
for share_notification_config_type in share_notification_config.keys():
n_config = share_notification_config[share_notification_config_type]
if n_config.get('active', False) == True:
notification_recipient_groups_list = [self.dataset.SamlAdminGroupName, self.dataset.stewards]

if share_notification_config_type == 'email':
notification_task: Task = Task(
action='notification.service',
targetUri=self.share.shareUri,
payload={
'notificationType': share_notification_config_type,
'subject': subject,
'message': msg,
'recipientGroupsList': notification_recipient_groups_list,
'recipientEmailList': [],
},
)
self.session.add(notification_task)
self.session.commit()

SESEmailNotificationService.send_email_task(
subject, msg, notification_recipient_groups_list, []
)
else:
log.info(f'Notification type : {share_notification_config_type} is not active')
else:
log.info('Notifications are not active')
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging
import os
import sys
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.base.db import get_engine
from dataall.base.aws.sqs import SqsQueue
from dataall.core.tasks.service_handlers import Worker
from backend.dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository


root = logging.getLogger()
root.setLevel(logging.INFO)
if not root.hasHandlers():
root.addHandler(logging.StreamHandler(sys.stdout))
log = logging.getLogger(__name__)


def persistent_email_reminders(engine):
"""
A method used by the scheduled ECS Task to run persistent_email_reminder() process against ALL
active share objects within data.all and send emails to all pending shares.
"""
with engine.scoped_session() as session:
log.info('Running Persistent Email Reminders Task')
pending_shares = ShareObjectRepository.fetch_submitted_shares_with_notifications(session=session)
log.info(f'Found {len(pending_shares)} pending shares')
pending_share: ShareObject
for pending_share in pending_shares:
log.info(f'Sending Email Reminder for Share: {pending_share.shareUri}')
share = ShareObjectRepository.get_share_by_uri(session, pending_share.shareUri)
dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri)
ShareNotificationService(session=session, dataset=dataset, share=share).notify_persistent_email_reminder(
email_id=share.owner
)
log.info(f'Email reminder sent for share {share.shareUri}')
log.info('Completed Persistent Email Reminders Task')


if __name__ == '__main__':
ENVNAME = os.environ.get('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
persistent_email_reminders(engine=ENGINE)
1 change: 1 addition & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"share_notifications": {
"email": {
"active": false,
"persistent_reminders": false,
"parameters": {
"group_notifications": true
}
Expand Down
3 changes: 3 additions & 0 deletions deploy/stacks/backend_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ def __init__(
self.lambda_api_stack.api_handler,
self.lambda_api_stack.elasticsearch_proxy_handler,
],
email_custom_domain=ses_stack.ses_identity.email_identity_name if ses_stack is not None else None,
ses_configuration_set=ses_stack.configuration_set.configuration_set_name if ses_stack is not None else None,
custom_domain=custom_domain,
**kwargs,
)

Expand Down
59 changes: 57 additions & 2 deletions deploy/stacks/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def __init__(
tooling_account_id=None,
s3_prefix_list=None,
lambdas=None,
email_custom_domain=None,
ses_configuration_set=None,
custom_domain=None,
**kwargs,
):
super().__init__(scope, id, **kwargs)
Expand All @@ -49,6 +52,13 @@ def __init__(
envname, resource_prefix, vpc, vpce_connection, s3_prefix_list, lambdas
)
self.ecs_security_groups: [aws_ec2.SecurityGroup] = [self.scheduled_tasks_sg, self.share_manager_sg]
self.env_vars = self._create_env('INFO')

# Check if custom domain exists and if it exists email notifications could be enabled.
# Create an env variable which stores the domain URL.
# This is used for sending data.all share weblinks in the email notifications.
if custom_domain and custom_domain.get('hosted_zone_name'):
self.env_vars.update({'frontend_domain_url': f'https://{custom_domain["hosted_zone_name"]}'})

cluster = ecs.Cluster(
self,
Expand All @@ -58,7 +68,10 @@ def __init__(
container_insights=True,
)

self.task_role = self.create_task_role(envname, resource_prefix, pivot_role_name)
self.task_role = self.create_task_role(
envname, resource_prefix, pivot_role_name, email_custom_domain, ses_configuration_set
)

self.cicd_stacks_updater_role = self.create_cicd_stacks_updater_role(
envname, resource_prefix, tooling_account_id
)
Expand Down Expand Up @@ -178,6 +191,7 @@ def __init__(
self.add_share_verifier_task()
self.add_share_reapplier_task()
self.add_omics_fetch_workflows_task()
self.add_persistent_email_reminders_task()

@run_if(['modules.s3_datasets.active', 'modules.dashboards.active'])
def add_catalog_indexer_task(self):
Expand Down Expand Up @@ -286,6 +300,32 @@ def add_share_reapplier_task(self):
)
self.ecs_task_definitions_families.append(share_reapplier_task_definition.family)

@run_if(['modules.dataset_base.features.share_notifications.email.persistent_reminders'])
def add_persistent_email_reminders_task(self):
persistent_email_reminders_task, persistent_email_reminders_task_def = self.set_scheduled_task(
cluster=self.ecs_cluster,
command=[
'python3.9',
'-m',
'dataall.modules.shares_base.tasks.persistent_email_reminders_task',
],
container_id='container',
ecr_repository=self._ecr_repository,
environment=self.env_vars,
image_tag=self._cdkproxy_image_tag,
log_group=self.create_log_group(
self._envname, self._resource_prefix, log_group_name='persistent-email-reminders'
),
schedule_expression=Schedule.expression('cron(0 9 ? * 2 *)'), # Run at 9:00 AM UTC every Monday
scheduled_task_id=f'{self._resource_prefix}-{self._envname}-persistent-email-reminders-schedule',
task_id=f'{self._resource_prefix}-{self._envname}-persistent-email-reminders',
task_role=self.task_role,
vpc=self._vpc,
security_group=self.scheduled_tasks_sg,
prod_sizing=self._prod_sizing,
)
self.ecs_task_definitions_families.append(persistent_email_reminders_task.task_definition.family)

@run_if(['modules.s3_datasets.active'])
def add_subscription_task(self):
subscriptions_task, subscription_task_def = self.set_scheduled_task(
Expand Down Expand Up @@ -453,7 +493,9 @@ def create_cicd_stacks_updater_role(self, envname, resource_prefix, tooling_acco
)
return cicd_stacks_updater_role

def create_task_role(self, envname, resource_prefix, pivot_role_name):
def create_task_role(
self, envname, resource_prefix, pivot_role_name, email_custom_domain=None, ses_configuration_set=None
):
role_inline_policy = iam.Policy(
self,
f'ECSRolePolicy{envname}',
Expand Down Expand Up @@ -542,13 +584,26 @@ def create_task_role(self, envname, resource_prefix, pivot_role_name):
),
],
)

if email_custom_domain and ses_configuration_set:
role_inline_policy.document.add_statements(
iam.PolicyStatement(
actions=['ses:SendEmail'],
resources=[
f'arn:aws:ses:{self.region}:{self.account}:identity/{email_custom_domain}',
f'arn:aws:ses:{self.region}:{self.account}:configuration-set/{ses_configuration_set}',
],
)
)

task_role = iam.Role(
self,
f'ECSTaskRole{envname}',
role_name=f'{resource_prefix}-{envname}-ecs-tasks-role',
inline_policies={f'ECSRoleInlinePolicy{envname}': role_inline_policy.document},
assumed_by=iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
)

task_role.grant_pass_role(task_role)
return task_role

Expand Down

0 comments on commit 90835fb

Please sign in to comment.