diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index c85eaa3c7..b55f5b0af 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -3,3 +3,5 @@
# Enforce frontend styling and remove dead code
fefc45c2cdc4d3107369c4d70210894d098a775c
+# Ignore backend ruff format/lint
+a35a4c7c4de78f0b38502882382f07aeac815b3c
diff --git a/backend/dataall/base/utils/json_utils.py b/backend/dataall/base/utils/json_utils.py
index 1a57af3df..9b7c9697b 100644
--- a/backend/dataall/base/utils/json_utils.py
+++ b/backend/dataall/base/utils/json_utils.py
@@ -37,7 +37,7 @@ def to_json(record):
elif isinstance(record, type({'a': 'dict'})):
return json.loads(json.dumps(record, default=json_decoder))
elif type(record) in [str, 'unicode']:
- return record
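+        # Encode strings with json.dumps so they are quoted consistently with the other scalar branches below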
+ return json.dumps(record)
elif type(record) in [int, float]:
return json.dumps(record)
elif isinstance(record, bool):
diff --git a/backend/dataall/core/organizations/api/resolvers.py b/backend/dataall/core/organizations/api/resolvers.py
index 01d87e61e..a43933b3d 100644
--- a/backend/dataall/core/organizations/api/resolvers.py
+++ b/backend/dataall/core/organizations/api/resolvers.py
@@ -113,7 +113,7 @@ def send_query_chatbot(context, source, queryString):
def list_group_organization_permissions(context, source, organizationUri, groupUri):
- return OrganizationService.list_group_organization_permissions(organizationUri, groupUri)
+ return OrganizationService.list_group_organization_permissions(uri=organizationUri, groupUri=groupUri)
def list_invited_organization_permissions_with_descriptions(context, source):
diff --git a/backend/dataall/core/resource_lock/db/resource_lock_models.py b/backend/dataall/core/resource_lock/db/resource_lock_models.py
index d98f30ffe..7e478758e 100644
--- a/backend/dataall/core/resource_lock/db/resource_lock_models.py
+++ b/backend/dataall/core/resource_lock/db/resource_lock_models.py
@@ -9,7 +9,6 @@ class ResourceLock(Base):
resourceUri = Column(String, nullable=False, primary_key=True)
resourceType = Column(String, nullable=False, primary_key=True)
- isLocked = Column(Boolean, default=False)
acquiredByUri = Column(String, nullable=True)
acquiredByType = Column(String, nullable=True)
@@ -17,12 +16,10 @@ def __init__(
self,
resourceUri: str,
resourceType: str,
- isLocked: bool = False,
acquiredByUri: Optional[str] = None,
acquiredByType: Optional[str] = None,
):
self.resourceUri = resourceUri
self.resourceType = resourceType
- self.isLocked = isLocked
self.acquiredByUri = acquiredByUri
self.acquiredByType = acquiredByType
diff --git a/backend/dataall/core/resource_lock/db/resource_lock_repositories.py b/backend/dataall/core/resource_lock/db/resource_lock_repositories.py
index e242853a1..25d4e24c8 100644
--- a/backend/dataall/core/resource_lock/db/resource_lock_repositories.py
+++ b/backend/dataall/core/resource_lock/db/resource_lock_repositories.py
@@ -2,35 +2,23 @@
from dataall.core.resource_lock.db.resource_lock_models import ResourceLock
from sqlalchemy import and_, or_
+from sqlalchemy.orm import Session
+from time import sleep
+from typing import List, Tuple
+from contextlib import contextmanager
+from dataall.base.db.exceptions import ResourceLockTimeout
log = logging.getLogger(__name__)
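+# Retry policy for lock acquisition: up to MAX_RETRIES attempts, RETRY_INTERVAL seconds apart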
+MAX_RETRIES = 10
+RETRY_INTERVAL = 60
-class ResourceLockRepository:
- @staticmethod
- def create_resource_lock(
- session, resource_uri, resource_type, is_locked=False, acquired_by_uri=None, acquired_by_type=None
- ):
- resource_lock = ResourceLock(
- resourceUri=resource_uri,
- resourceType=resource_type,
- isLocked=is_locked,
- acquiredByUri=acquired_by_uri,
- acquiredByType=acquired_by_type,
- )
- session.add(resource_lock)
- session.commit()
-
- @staticmethod
- def delete_resource_lock(session, resource_uri):
- resource_lock = session.query(ResourceLock).filter(ResourceLock.resourceUri == resource_uri).first()
- session.delete(resource_lock)
- session.commit()
+class ResourceLockRepository:
@staticmethod
- def acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
+ def _acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
"""
- Attempts to acquire one or more locks on the resources identified by resourceUri and resourceType.
+ Attempts to acquire/create one or more locks on the resources identified by resourceUri and resourceType.
Args:
resources: List of resource tuples (resourceUri, resourceType) to acquire locks for.
@@ -47,19 +35,22 @@ def acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
and_(
ResourceLock.resourceUri == resource[0],
ResourceLock.resourceType == resource[1],
- ~ResourceLock.isLocked,
)
for resource in resources
]
- resource_locks = session.query(ResourceLock).filter(or_(*filter_conditions)).with_for_update().all()
- # Ensure lock record found for each resource
- if len(resource_locks) == len(resources):
- # Update the attributes of the ResourceLock object
- for resource_lock in resource_locks:
- resource_lock.isLocked = True
- resource_lock.acquiredByUri = acquired_by_uri
- resource_lock.acquiredByType = acquired_by_type
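+            # Locks are now acquired by creating rows: proceed only if no lock row exists yet for any requested resource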
+ if not session.query(ResourceLock).filter(or_(*filter_conditions)).first():
+ records = []
+ for resource in resources:
+ records.append(
+ ResourceLock(
+ resourceUri=resource[0],
+ resourceType=resource[1],
+ acquiredByUri=acquired_by_uri,
+ acquiredByType=acquired_by_type,
+ )
+ )
+ session.add_all(records)
session.commit()
return True
else:
@@ -74,9 +65,9 @@ def acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
return False
@staticmethod
- def release_lock(session, resource_uri, resource_type, share_uri):
+ def _release_lock(session, resource_uri, resource_type, share_uri):
"""
- Releases the lock on the resource identified by resource_uri, resource_type.
+        Releases/deletes the lock on the resource identified by resource_uri, resource_type.
Args:
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
@@ -96,7 +87,6 @@ def release_lock(session, resource_uri, resource_type, share_uri):
and_(
ResourceLock.resourceUri == resource_uri,
ResourceLock.resourceType == resource_type,
- ResourceLock.isLocked,
ResourceLock.acquiredByUri == share_uri,
)
)
@@ -105,10 +95,7 @@ def release_lock(session, resource_uri, resource_type, share_uri):
)
if resource_lock:
- resource_lock.isLocked = False
- resource_lock.acquiredByUri = ''
- resource_lock.acquiredByType = ''
-
+ session.delete(resource_lock)
session.commit()
return True
else:
@@ -120,3 +107,31 @@ def release_lock(session, resource_uri, resource_type, share_uri):
session.rollback()
log.error('Error occurred while releasing lock:', e)
return False
+
+ @staticmethod
+ @contextmanager
+ def acquire_lock_with_retry(
+ resources: List[Tuple[str, str]], session: Session, acquired_by_uri: str, acquired_by_type: str
+ ):
+ retries_remaining = MAX_RETRIES
+ log.info(f'Attempting to acquire lock for resources {resources} by share {acquired_by_uri}...')
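+        # Keep retrying until the lock rows are created, sleeping RETRY_INTERVAL seconds between attempts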
+ while not (
+ lock_acquired := ResourceLockRepository._acquire_locks(
+ resources, session, acquired_by_uri, acquired_by_type
+ )
+ ):
+ log.info(
+ f'Lock for one or more resources {resources} already acquired. Retrying in {RETRY_INTERVAL} seconds...'
+ )
+ sleep(RETRY_INTERVAL)
+ retries_remaining -= 1
+ if retries_remaining <= 0:
+ raise ResourceLockTimeout(
+ 'process shares',
+ f'Failed to acquire lock for one or more of {resources=}',
+ )
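+        # Yield to the caller once acquired; the finally block guarantees the locks are released on exit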
+ try:
+ yield lock_acquired
+ finally:
+ for resource in resources:
+ ResourceLockRepository._release_lock(session, resource[0], resource[1], acquired_by_uri)
diff --git a/backend/dataall/modules/catalog/__init__.py b/backend/dataall/modules/catalog/__init__.py
index 1444359ed..95e76f0a6 100644
--- a/backend/dataall/modules/catalog/__init__.py
+++ b/backend/dataall/modules/catalog/__init__.py
@@ -17,7 +17,7 @@ def __init__(self):
class CatalogAsyncHandlersModuleInterface(ModuleInterface):
- """Implements ModuleInterface for datapipelines async lambda"""
+ """Implements ModuleInterface for catalog async lambda"""
@staticmethod
def is_supported(modes: Set[ImportMode]):
diff --git a/backend/dataall/modules/catalog/api/types.py b/backend/dataall/modules/catalog/api/types.py
index 78703dee0..f8aa049e0 100644
--- a/backend/dataall/modules/catalog/api/types.py
+++ b/backend/dataall/modules/catalog/api/types.py
@@ -42,6 +42,7 @@
fields=[
gql.Field(name='nodeUri', type=gql.ID),
gql.Field(name='parentUri', type=gql.NonNullableType(gql.String)),
+ gql.Field(name='status', type=gql.NonNullableType(gql.String)),
gql.Field(name='owner', type=gql.NonNullableType(gql.String)),
gql.Field(name='path', type=gql.NonNullableType(gql.String)),
gql.Field(name='label', type=gql.NonNullableType(gql.String)),
diff --git a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
index b215a933f..430a2d409 100644
--- a/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
+++ b/backend/dataall/modules/catalog/handlers/ecs_catalog_handlers.py
@@ -18,8 +18,8 @@ def run_ecs_reindex_catalog_task(engine, task: Task):
CatalogIndexerTask.index_objects(engine, str(task.payload.get('with_deletes', False)))
else:
ecs_task_arn = Ecs.run_ecs_task(
- task_definition_param='ecs/task_def_arn/share_management',
- container_name_param='ecs/container/share_management',
+ task_definition_param='ecs/task_def_arn/catalog_indexer',
+ container_name_param='ecs/container/catalog_indexer',
context=[
{'name': 'with_deletes', 'value': str(task.payload.get('with_deletes', False))},
],
diff --git a/backend/dataall/modules/catalog/services/catalog_service.py b/backend/dataall/modules/catalog/services/catalog_service.py
index 92ae33d67..d750cde3b 100644
--- a/backend/dataall/modules/catalog/services/catalog_service.py
+++ b/backend/dataall/modules/catalog/services/catalog_service.py
@@ -1,12 +1,6 @@
import logging
from dataall.base.context import get_context
-from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
-
-from dataall.modules.catalog.db.glossary_repositories import GlossaryRepository
-from dataall.modules.catalog.db.glossary_models import GlossaryNode
-from dataall.modules.catalog.services.glossaries_permissions import MANAGE_GLOSSARIES
-from dataall.modules.catalog.indexers.registry import GlossaryRegistry
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyValidationService
from dataall.core.tasks.db.task_models import Task
from dataall.core.tasks.service_handlers import Worker
@@ -15,10 +9,6 @@
logger = logging.getLogger(__name__)
-def _session():
- return get_context().db_engine.scoped_session()
-
-
class CatalogService:
@staticmethod
def start_reindex_catalog(with_deletes: bool) -> bool:
diff --git a/backend/dataall/modules/dashboards/__init__.py b/backend/dataall/modules/dashboards/__init__.py
index a47dd7f26..ffbc8e92d 100644
--- a/backend/dataall/modules/dashboards/__init__.py
+++ b/backend/dataall/modules/dashboards/__init__.py
@@ -75,3 +75,21 @@ def __init__(self):
DashboardCatalogIndexer()
log.info('Dashboard catalog indexer task has been loaded')
+
+
+class DashboardAsyncHandlersModuleInterface(ModuleInterface):
+ """Implements ModuleInterface for dashboard async lambda"""
+
+ @staticmethod
+ def is_supported(modes: Set[ImportMode]):
+ return ImportMode.HANDLERS in modes
+
+ @staticmethod
+ def depends_on() -> List[Type['ModuleInterface']]:
+ from dataall.modules.catalog import CatalogAsyncHandlersModuleInterface
+
+ return [CatalogAsyncHandlersModuleInterface]
+
+ def __init__(self):
+        log.info('Dashboard handlers have been imported')
diff --git a/backend/dataall/modules/s3_datasets/services/dataset_service.py b/backend/dataall/modules/s3_datasets/services/dataset_service.py
index 5f4dbf175..58561112a 100644
--- a/backend/dataall/modules/s3_datasets/services/dataset_service.py
+++ b/backend/dataall/modules/s3_datasets/services/dataset_service.py
@@ -165,9 +165,6 @@ def create_dataset(uri, admin_group, data: dict):
DatasetService.check_imported_resources(dataset)
dataset = DatasetRepository.create_dataset(session=session, env=environment, dataset=dataset, data=data)
- ResourceLockRepository.create_resource_lock(
- session=session, resource_uri=dataset.datasetUri, resource_type=dataset.__tablename__
- )
DatasetBucketRepository.create_dataset_bucket(session, dataset, data)
ResourcePolicyService.attach_resource_policy(
@@ -413,7 +410,6 @@ def delete_dataset(uri: str, delete_from_aws: bool = False):
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=env.SamlGroupName)
if dataset.stewards:
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=dataset.stewards)
- ResourceLockRepository.delete_resource_lock(session=session, resource_uri=dataset.datasetUri)
DatasetRepository.delete_dataset(session, dataset)
if delete_from_aws:
diff --git a/backend/dataall/modules/s3_datasets_shares/__init__.py b/backend/dataall/modules/s3_datasets_shares/__init__.py
index 779f4a4ff..fcf520992 100644
--- a/backend/dataall/modules/s3_datasets_shares/__init__.py
+++ b/backend/dataall/modules/s3_datasets_shares/__init__.py
@@ -23,15 +23,37 @@ def depends_on() -> List[Type['ModuleInterface']]:
def __init__(self):
from dataall.core.environment.services.environment_resource_manager import EnvironmentResourceManager
from dataall.modules.s3_datasets_shares import api
- from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
+ from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService
from dataall.modules.s3_datasets.services.dataset_service import DatasetService
from dataall.modules.datasets_base.services.dataset_list_service import DatasetListService
- from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
- from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareEnvironmentResource
+ from dataall.modules.s3_datasets_shares.services.s3_share_dataset_service import S3ShareDatasetService
+ from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareEnvironmentResource
+ from dataall.modules.shares_base.services.share_processor_manager import (
+ ShareProcessorManager,
+ ShareProcessorDefinition,
+ )
+ from dataall.modules.shares_base.services.shares_enums import ShareableType
+ from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetBucket, DatasetStorageLocation
+
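+        # Register S3 share hooks with the environment resource manager and the dataset services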
+ EnvironmentResourceManager.register(S3ShareEnvironmentResource())
+ DatasetService.register(S3ShareDatasetService())
+ DatasetListService.register(S3ShareDatasetService())
+
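+        # Register the S3 shareable item types (tables, S3 buckets and folders) with the share processor manager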
+ ShareProcessorManager.register_processor(
+ ShareProcessorDefinition(ShareableType.Table, None, DatasetTable, DatasetTable.tableUri)
+ )
+ ShareProcessorManager.register_processor(
+ ShareProcessorDefinition(ShareableType.S3Bucket, None, DatasetBucket, DatasetBucket.bucketUri)
+ )
+ ShareProcessorManager.register_processor(
+ ShareProcessorDefinition(
+ ShareableType.StorageLocation,
+ None,
+ DatasetStorageLocation,
+ DatasetStorageLocation.locationUri,
+ )
+ )
- EnvironmentResourceManager.register(ShareEnvironmentResource())
- DatasetService.register(DatasetSharingService())
- DatasetListService.register(DatasetSharingService())
log.info('API of dataset sharing has been imported')
@@ -55,7 +77,7 @@ def depends_on() -> List[Type['ModuleInterface']]:
]
def __init__(self):
- log.info('S3 Sharing handlers have been imported')
+ log.info('s3_datasets_shares handlers have been imported')
class S3DatasetsSharesCdkModuleInterface(ModuleInterface):
@@ -67,9 +89,9 @@ def is_supported(modes):
def __init__(self):
import dataall.modules.s3_datasets_shares.cdk
- from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
+ from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService
- log.info('CDK module data_sharing has been imported')
+ log.info('CDK module s3_datasets_shares has been imported')
class S3DatasetsSharesECSShareModuleInterface(ModuleInterface):
@@ -122,4 +144,4 @@ def __init__(self):
)
)
- log.info('ECS Share module s3_data_sharing has been imported')
+ log.info('ECS Share module s3_datasets_shares has been imported')
diff --git a/backend/dataall/modules/s3_datasets_shares/api/mutations.py b/backend/dataall/modules/s3_datasets_shares/api/mutations.py
index 0af2eb548..3f95f0ca3 100644
--- a/backend/dataall/modules/s3_datasets_shares/api/mutations.py
+++ b/backend/dataall/modules/s3_datasets_shares/api/mutations.py
@@ -1,6 +1,7 @@
from dataall.base.api import gql
from dataall.modules.s3_datasets_shares.api.resolvers import (
verify_dataset_share_objects,
+ reapply_share_items_share_object_for_dataset,
)
@@ -10,3 +11,10 @@
type=gql.Boolean,
resolver=verify_dataset_share_objects,
)
+
+reApplyShareObjectItemsOnDataset = gql.MutationField(
+ name='reApplyShareObjectItemsOnDataset',
+ args=[gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))],
+ type=gql.Boolean,
+ resolver=reapply_share_items_share_object_for_dataset,
+)
diff --git a/backend/dataall/modules/s3_datasets_shares/api/resolvers.py b/backend/dataall/modules/s3_datasets_shares/api/resolvers.py
index 5b07c159b..f77076a7a 100644
--- a/backend/dataall/modules/s3_datasets_shares/api/resolvers.py
+++ b/backend/dataall/modules/s3_datasets_shares/api/resolvers.py
@@ -3,7 +3,7 @@
from dataall.base.api.context import Context
from dataall.base.db.exceptions import RequiredParameter
from dataall.base.feature_toggle_checker import is_feature_enabled
-from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
+from dataall.modules.s3_datasets_shares.services.s3_share_service import S3ShareService
log = logging.getLogger(__name__)
@@ -41,32 +41,34 @@ def validate_dataset_share_selector_input(data):
def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str):
- return DatasetSharingService.list_shared_tables_by_env_dataset(datasetUri, envUri)
+ return S3ShareService.list_shared_tables_by_env_dataset(datasetUri, envUri)
@is_feature_enabled('modules.s3_datasets.features.aws_actions')
def get_dataset_shared_assume_role_url(context: Context, source, datasetUri: str = None):
- return DatasetSharingService.get_dataset_shared_assume_role_url(uri=datasetUri)
+ return S3ShareService.get_dataset_shared_assume_role_url(uri=datasetUri)
def verify_dataset_share_objects(context: Context, source, input):
RequestValidator.validate_dataset_share_selector_input(input)
dataset_uri = input.get('datasetUri')
verify_share_uris = input.get('shareUris')
- return DatasetSharingService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)
+ return S3ShareService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)
+
+
+def reapply_share_items_share_object_for_dataset(context: Context, source, datasetUri: str):
+ return S3ShareService.reapply_share_items_for_dataset(uri=datasetUri)
def get_s3_consumption_data(context: Context, source, shareUri: str):
- return DatasetSharingService.get_s3_consumption_data(uri=shareUri)
+ return S3ShareService.get_s3_consumption_data(uri=shareUri)
def list_shared_databases_tables_with_env_group(context: Context, source, environmentUri: str, groupUri: str):
- return DatasetSharingService.list_shared_databases_tables_with_env_group(
- environmentUri=environmentUri, groupUri=groupUri
- )
+ return S3ShareService.list_shared_databases_tables_with_env_group(environmentUri=environmentUri, groupUri=groupUri)
def resolve_shared_db_name(context: Context, source, **kwargs):
- return DatasetSharingService.resolve_shared_db_name(
+ return S3ShareService.resolve_shared_db_name(
source.GlueDatabaseName, source.shareUri, source.targetEnvAwsAccountId, source.targetEnvRegion
)
diff --git a/backend/dataall/modules/s3_datasets_shares/aws/glue_client.py b/backend/dataall/modules/s3_datasets_shares/aws/glue_client.py
index 61795b199..ef694d8c0 100644
--- a/backend/dataall/modules/s3_datasets_shares/aws/glue_client.py
+++ b/backend/dataall/modules/s3_datasets_shares/aws/glue_client.py
@@ -175,6 +175,18 @@ def get_source_catalog(self):
raise e
return None
+ def get_glue_database_from_catalog(self):
+ # Check if a catalog account exists and return database accordingly
+ try:
+ catalog_dict = self.get_source_catalog()
+
+ if catalog_dict is not None:
+ return catalog_dict.get('database_name')
+ else:
+ return self._database
+ except Exception as e:
+ raise e
+
def get_database_tags(self):
# Get tags from the glue database
account_id = self._account_id
diff --git a/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py b/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
similarity index 74%
rename from backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py
rename to backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
index c21e04a3d..dac3ae8f0 100644
--- a/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py
+++ b/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
@@ -2,16 +2,13 @@
from warnings import warn
from typing import List
-from sqlalchemy import and_, or_, func, case
+from sqlalchemy import and_, or_
from sqlalchemy.orm import Query
-from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
+from dataall.core.environment.db.environment_models import Environment
from dataall.core.environment.services.environment_resource_manager import EnvironmentResource
-from dataall.base.db import exceptions, paginate
from dataall.modules.shares_base.services.shares_enums import (
- ShareItemHealthStatus,
ShareObjectStatus,
- ShareItemStatus,
ShareableType,
PrincipalType,
)
@@ -19,13 +16,13 @@
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
-from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset, DatasetBucket
+from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
logger = logging.getLogger(__name__)
-class ShareEnvironmentResource(EnvironmentResource):
+class S3ShareEnvironmentResource(EnvironmentResource):
@staticmethod
def count_resources(session, environment, group_uri) -> int:
return S3ShareObjectRepository.count_S3_principal_shares(
@@ -395,124 +392,6 @@ def list_s3_dataset_shares_with_existing_shared_items(
query = query.filter(ShareObjectItem.itemType == item_type)
return query.all()
- @staticmethod # TODO!!!
- def list_shareable_items(session, share, states, data): # TODO
- # All tables from dataset with a column isShared
- # marking the table as part of the shareObject
- tables = (
- session.query(
- DatasetTable.tableUri.label('itemUri'),
- func.coalesce('DatasetTable').label('itemType'),
- DatasetTable.GlueTableName.label('itemName'),
- DatasetTable.description.label('description'),
- ShareObjectItem.shareItemUri.label('shareItemUri'),
- ShareObjectItem.status.label('status'),
- ShareObjectItem.healthStatus.label('healthStatus'),
- ShareObjectItem.healthMessage.label('healthMessage'),
- ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
- case(
- [(ShareObjectItem.shareItemUri.isnot(None), True)],
- else_=False,
- ).label('isShared'),
- )
- .outerjoin(
- ShareObjectItem,
- and_(
- ShareObjectItem.shareUri == share.shareUri,
- DatasetTable.tableUri == ShareObjectItem.itemUri,
- ),
- )
- .filter(DatasetTable.datasetUri == share.datasetUri)
- )
- if states:
- tables = tables.filter(ShareObjectItem.status.in_(states))
-
- # All folders from the dataset with a column isShared
- # marking the folder as part of the shareObject
- locations = (
- session.query(
- DatasetStorageLocation.locationUri.label('itemUri'),
- func.coalesce('DatasetStorageLocation').label('itemType'),
- DatasetStorageLocation.S3Prefix.label('itemName'),
- DatasetStorageLocation.description.label('description'),
- ShareObjectItem.shareItemUri.label('shareItemUri'),
- ShareObjectItem.status.label('status'),
- ShareObjectItem.healthStatus.label('healthStatus'),
- ShareObjectItem.healthMessage.label('healthMessage'),
- ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
- case(
- [(ShareObjectItem.shareItemUri.isnot(None), True)],
- else_=False,
- ).label('isShared'),
- )
- .outerjoin(
- ShareObjectItem,
- and_(
- ShareObjectItem.shareUri == share.shareUri,
- DatasetStorageLocation.locationUri == ShareObjectItem.itemUri,
- ),
- )
- .filter(DatasetStorageLocation.datasetUri == share.datasetUri)
- )
- if states:
- locations = locations.filter(ShareObjectItem.status.in_(states))
-
- s3_buckets = (
- session.query(
- DatasetBucket.bucketUri.label('itemUri'),
- func.coalesce('S3Bucket').label('itemType'),
- DatasetBucket.S3BucketName.label('itemName'),
- DatasetBucket.description.label('description'),
- ShareObjectItem.shareItemUri.label('shareItemUri'),
- ShareObjectItem.status.label('status'),
- ShareObjectItem.healthStatus.label('healthStatus'),
- ShareObjectItem.healthMessage.label('healthMessage'),
- ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
- case(
- [(ShareObjectItem.shareItemUri.isnot(None), True)],
- else_=False,
- ).label('isShared'),
- )
- .outerjoin(
- ShareObjectItem,
- and_(
- ShareObjectItem.shareUri == share.shareUri,
- DatasetBucket.bucketUri == ShareObjectItem.itemUri,
- ),
- )
- .filter(DatasetBucket.datasetUri == share.datasetUri)
- )
- if states:
- s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))
-
- shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
- query = session.query(shareable_objects)
-
- if data:
- if data.get('term'):
- term = data.get('term')
- query = query.filter(
- or_(
- shareable_objects.c.itemName.ilike(term + '%'),
- shareable_objects.c.description.ilike(term + '%'),
- )
- )
- if 'isShared' in data:
- is_shared = data.get('isShared')
- query = query.filter(shareable_objects.c.isShared == is_shared)
-
- if 'isHealthy' in data:
- # healthy_status = ShareItemHealthStatus.Healthy.value
- query = (
- query.filter(shareable_objects.c.healthStatus == ShareItemHealthStatus.Healthy.value)
- if data.get('isHealthy')
- else query.filter(shareable_objects.c.healthStatus != ShareItemHealthStatus.Healthy.value)
- )
-
- return paginate(
- query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
- ).to_dict()
-
# the next 2 methods are used in subscription task
@staticmethod
def find_share_items_by_item_uri(session, item_uri):
diff --git a/backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_alarm_service.py b/backend/dataall/modules/s3_datasets_shares/services/s3_share_alarm_service.py
similarity index 99%
rename from backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_alarm_service.py
rename to backend/dataall/modules/s3_datasets_shares/services/s3_share_alarm_service.py
index 5a46fea2e..e9c6a5e2f 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_alarm_service.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/s3_share_alarm_service.py
@@ -9,7 +9,7 @@
log = logging.getLogger(__name__)
-class DatasetSharingAlarmService(AlarmService):
+class S3ShareAlarmService(AlarmService):
"""Contains set of alarms for datasets"""
def trigger_table_sharing_failure_alarm(
diff --git a/backend/dataall/modules/s3_datasets_shares/services/s3_share_dataset_service.py b/backend/dataall/modules/s3_datasets_shares/services/s3_share_dataset_service.py
new file mode 100644
index 000000000..435ed4e8e
--- /dev/null
+++ b/backend/dataall/modules/s3_datasets_shares/services/s3_share_dataset_service.py
@@ -0,0 +1,105 @@
+from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
+from dataall.base.db import exceptions
+from dataall.modules.shares_base.db.share_object_models import ShareObject
+from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
+from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER
+from dataall.modules.s3_datasets.services.dataset_permissions import (
+ DELETE_DATASET,
+ DELETE_DATASET_TABLE,
+ DELETE_DATASET_FOLDER,
+)
+from dataall.modules.datasets_base.services.datasets_enums import DatasetRole, DatasetTypes
+from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface
+
+
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class S3ShareDatasetService(DatasetServiceInterface):
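+    """DatasetServiceInterface implementation that accounts for existing S3 shares in dataset deletion checks, user role resolution, dataset listing and steward permission updates"""
+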
+ @property
+ def dataset_type(self):
+ return DatasetTypes.S3
+
+ @staticmethod
+ def resolve_additional_dataset_user_role(session, uri, username, groups):
+ """Implemented as part of the DatasetServiceInterface"""
+ share = S3ShareObjectRepository.get_share_by_dataset_attributes(session, uri, username, groups)
+ if share is not None:
+ return DatasetRole.Shared.value
+ return None
+
+ @staticmethod
+ def check_before_delete(session, uri, **kwargs):
+ """Implemented as part of the DatasetServiceInterface"""
+ action = kwargs.get('action')
+ if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
+ existing_s3_shared_items = S3ShareObjectRepository.check_existing_s3_shared_items(session, uri)
+ if existing_s3_shared_items:
+ raise exceptions.ResourceShared(
+ action=action,
+ message='Revoke all shares for this item before deletion',
+ )
+ elif action in [DELETE_DATASET]:
+ shares = S3ShareObjectRepository.list_s3_dataset_shares_with_existing_shared_items(
+ session=session, dataset_uri=uri
+ )
+ if shares:
+ raise exceptions.ResourceShared(
+ action=DELETE_DATASET,
+ message='Revoke all dataset shares before deletion.',
+ )
+ else:
+ raise exceptions.RequiredParameter('Delete action')
+ return True
+
+ @staticmethod
+ def execute_on_delete(session, uri, **kwargs):
+ """Implemented as part of the DatasetServiceInterface"""
+ action = kwargs.get('action')
+ if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
+ S3ShareObjectRepository.delete_s3_share_item(session, uri)
+ elif action in [DELETE_DATASET]:
+ S3ShareObjectRepository.delete_s3_shares_with_no_shared_items(session, uri)
+ else:
+ raise exceptions.RequiredParameter('Delete action')
+ return True
+
+ @staticmethod
+ def append_to_list_user_datasets(session, username, groups):
+ """Implemented as part of the DatasetServiceInterface"""
+ return S3ShareObjectRepository.list_user_s3_shared_datasets(session, username, groups)
+
+ @staticmethod
+ def extend_attach_steward_permissions(session, dataset, new_stewards, **kwargs):
+ """Implemented as part of the DatasetServiceInterface"""
+ dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
+ if dataset_shares:
+ for share in dataset_shares:
+ ResourcePolicyService.attach_resource_policy(
+ session=session,
+ group=new_stewards,
+ permissions=SHARE_OBJECT_APPROVER,
+ resource_uri=share.shareUri,
+ resource_type=ShareObject.__name__,
+ )
+ if dataset.stewards != dataset.SamlAdminGroupName:
+ ResourcePolicyService.delete_resource_policy(
+ session=session,
+ group=dataset.stewards,
+ resource_uri=share.shareUri,
+ )
+
+ @staticmethod
+ def extend_delete_steward_permissions(session, dataset, **kwargs):
+ """Implemented as part of the DatasetServiceInterface"""
+ dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
+ if dataset_shares:
+ for share in dataset_shares:
+ if dataset.stewards != dataset.SamlAdminGroupName:
+ ResourcePolicyService.delete_resource_policy(
+ session=session,
+ group=dataset.stewards,
+ resource_uri=share.shareUri,
+ )
diff --git a/backend/dataall/modules/s3_datasets_shares/services/managed_share_policy_service.py b/backend/dataall/modules/s3_datasets_shares/services/s3_share_managed_policy_service.py
similarity index 98%
rename from backend/dataall/modules/s3_datasets_shares/services/managed_share_policy_service.py
rename to backend/dataall/modules/s3_datasets_shares/services/s3_share_managed_policy_service.py
index e53a94944..2738e8b38 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/managed_share_policy_service.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/s3_share_managed_policy_service.py
@@ -18,7 +18,7 @@
EMPTY_STATEMENT_SID = 'EmptyStatement'
-class SharePolicyService(ManagedPolicy):
+class S3SharePolicyService(ManagedPolicy):
def __init__(self, role_name, account, region, environmentUri, resource_prefix):
self.role_name = role_name
self.account = account
@@ -48,7 +48,7 @@ def generate_empty_policy(self) -> dict:
@staticmethod
def remove_empty_statement(policy_doc: dict, statement_sid: str) -> dict:
- statement_index = SharePolicyService._get_statement_by_sid(policy_doc, statement_sid)
+ statement_index = S3SharePolicyService._get_statement_by_sid(policy_doc, statement_sid)
if statement_index is not None:
policy_doc['Statement'].pop(statement_index)
return policy_doc
diff --git a/backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_service.py b/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py
similarity index 55%
rename from backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_service.py
rename to backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py
index 207ab1c65..255544bc7 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_service.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/s3_share_service.py
@@ -1,123 +1,141 @@
+import logging
from warnings import warn
+
from dataall.base.db import utils
+from dataall.base.context import get_context
+from dataall.base.aws.sts import SessionHelper
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.core.environment.services.environment_service import EnvironmentService
-from dataall.base.context import get_context
-from dataall.base.db import exceptions
-from dataall.base.aws.sts import SessionHelper
-from dataall.modules.shares_base.db.share_object_models import ShareObject
-from dataall.modules.s3_datasets_shares.db.share_object_repositories import S3ShareObjectRepository
+from dataall.core.tasks.db.task_models import Task
+from dataall.core.tasks.service_handlers import Worker
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
-from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER, GET_SHARE_OBJECT
-from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
from dataall.modules.shares_base.services.share_item_service import ShareItemService
+from dataall.modules.shares_base.services.share_permissions import GET_SHARE_OBJECT
+from dataall.modules.shares_base.services.shares_enums import (
+ ShareableType,
+ ShareItemStatus,
+)
+from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetStorageLocation
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.services.dataset_permissions import (
MANAGE_DATASETS,
UPDATE_DATASET,
- DELETE_DATASET,
- DELETE_DATASET_TABLE,
- DELETE_DATASET_FOLDER,
CREDENTIALS_DATASET,
+ DATASET_TABLE_READ,
+ DATASET_FOLDER_READ,
)
-from dataall.modules.datasets_base.services.datasets_enums import DatasetRole, DatasetTypes
-from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface
+from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient
-import logging
-
log = logging.getLogger(__name__)
-class DatasetSharingService(DatasetServiceInterface):
- @property
- def dataset_type(self):
- return DatasetTypes.S3
-
+class S3ShareService:
@staticmethod
- def resolve_additional_dataset_user_role(session, uri, username, groups):
- """Implemented as part of the DatasetServiceInterface"""
- share = S3ShareObjectRepository.get_share_by_dataset_attributes(session, uri, username, groups)
- if share is not None:
- return DatasetRole.Shared.value
- return None
+ def delete_dataset_table_read_permission(session, share, tableUri):
+ """
+        Delete Table permissions granted to share groups
+ """
+ other_shares = S3ShareObjectRepository.find_all_other_share_items(
+ session,
+ not_this_share_uri=share.shareUri,
+ item_uri=tableUri,
+ share_type=ShareableType.Table.value,
+ principal_type='GROUP',
+ principal_uri=share.groupUri,
+ item_status=[ShareItemStatus.Share_Succeeded.value],
+ )
+ log.info(f'Table {tableUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares')
+ if len(other_shares) == 0:
+ log.info('Delete permissions...')
+ ResourcePolicyService.delete_resource_policy(session=session, group=share.groupUri, resource_uri=tableUri)
@staticmethod
- def check_before_delete(session, uri, **kwargs):
- """Implemented as part of the DatasetServiceInterface"""
- action = kwargs.get('action')
- if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
- existing_s3_shared_items = S3ShareObjectRepository.check_existing_s3_shared_items(session, uri)
- if existing_s3_shared_items:
- raise exceptions.ResourceShared(
- action=action,
- message='Revoke all shares for this item before deletion',
- )
- elif action in [DELETE_DATASET]:
- shares = S3ShareObjectRepository.list_s3_dataset_shares_with_existing_shared_items(
- session=session, dataset_uri=uri
+ def delete_dataset_folder_read_permission(session, share, locationUri):
+ """
+        Delete Folder permissions granted to share groups
+ """
+ other_shares = S3ShareObjectRepository.find_all_other_share_items(
+ session,
+ not_this_share_uri=share.shareUri,
+ item_uri=locationUri,
+ share_type=ShareableType.StorageLocation.value,
+ principal_type='GROUP',
+ principal_uri=share.groupUri,
+ item_status=[ShareItemStatus.Share_Succeeded.value],
+ )
+ log.info(
+ f'Location {locationUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares'
+ )
+ if len(other_shares) == 0:
+ log.info('Delete permissions...')
+ ResourcePolicyService.delete_resource_policy(
+ session=session,
+ group=share.groupUri,
+ resource_uri=locationUri,
)
- if shares:
- raise exceptions.ResourceShared(
- action=DELETE_DATASET,
- message='Revoke all dataset shares before deletion.',
- )
- else:
- raise exceptions.RequiredParameter('Delete action')
- return True
@staticmethod
- def execute_on_delete(session, uri, **kwargs):
- """Implemented as part of the DatasetServiceInterface"""
- action = kwargs.get('action')
- if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
- S3ShareObjectRepository.delete_s3_share_item(session, uri)
- elif action in [DELETE_DATASET]:
- S3ShareObjectRepository.delete_s3_shares_with_no_shared_items(session, uri)
+ def attach_dataset_table_read_permission(session, share, tableUri):
+ """
+ Attach Table permissions to share groups
+ """
+ existing_policy = ResourcePolicyService.find_resource_policies(
+ session,
+ group=share.groupUri,
+ resource_uri=tableUri,
+ resource_type=DatasetTable.__name__,
+ permissions=DATASET_TABLE_READ,
+ )
+ # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together)
+ if len(existing_policy) == 0:
+ log.info(
+ f'Attaching new resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri}'
+ )
+ ResourcePolicyService.attach_resource_policy(
+ session=session,
+ group=share.groupUri,
+ permissions=DATASET_TABLE_READ,
+ resource_uri=tableUri,
+ resource_type=DatasetTable.__name__,
+ )
else:
- raise exceptions.RequiredParameter('Delete action')
- return True
-
- @staticmethod
- def append_to_list_user_datasets(session, username, groups):
- """Implemented as part of the DatasetServiceInterface"""
- return S3ShareObjectRepository.list_user_s3_shared_datasets(session, username, groups)
+ log.info(
+ f'Resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri} already exists. Skip... '
+ )
@staticmethod
- def extend_attach_steward_permissions(session, dataset, new_stewards, **kwargs):
- """Implemented as part of the DatasetServiceInterface"""
- dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
- if dataset_shares:
- for share in dataset_shares:
- ResourcePolicyService.attach_resource_policy(
- session=session,
- group=new_stewards,
- permissions=SHARE_OBJECT_APPROVER,
- resource_uri=share.shareUri,
- resource_type=ShareObject.__name__,
- )
- if dataset.stewards != dataset.SamlAdminGroupName:
- ResourcePolicyService.delete_resource_policy(
- session=session,
- group=dataset.stewards,
- resource_uri=share.shareUri,
- )
+ def attach_dataset_folder_read_permission(session, share, locationUri):
+ """
+ Attach Folder permissions to share groups
+ """
+ existing_policy = ResourcePolicyService.find_resource_policies(
+ session,
+ group=share.groupUri,
+ resource_uri=locationUri,
+ resource_type=DatasetStorageLocation.__name__,
+ permissions=DATASET_FOLDER_READ,
+ )
+ # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together)
+ if len(existing_policy) == 0:
+ log.info(
+ f'Attaching new resource permission policy {DATASET_FOLDER_READ} to folder {locationUri} for group {share.groupUri}'
+ )
- @staticmethod
- def extend_delete_steward_permissions(session, dataset, **kwargs):
- """Implemented as part of the DatasetServiceInterface"""
- dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
- if dataset_shares:
- for share in dataset_shares:
- if dataset.stewards != dataset.SamlAdminGroupName:
- ResourcePolicyService.delete_resource_policy(
- session=session,
- group=dataset.stewards,
- resource_uri=share.shareUri,
- )
+ ResourcePolicyService.attach_resource_policy(
+ session=session,
+ group=share.groupUri,
+ permissions=DATASET_FOLDER_READ,
+ resource_uri=locationUri,
+ resource_type=DatasetStorageLocation.__name__,
+ )
+ else:
+ log.info(
+            f'Resource permission policy {DATASET_FOLDER_READ} to folder {locationUri} for group {share.groupUri} already exists. Skip... '
+ )
@staticmethod
@TenantPolicyService.has_tenant_permission(MANAGE_DATASETS)
@@ -134,6 +152,17 @@ def verify_dataset_share_objects(uri: str, share_uris: list):
ShareItemService.verify_items_share_object(uri=share_uri, item_uris=item_uris)
return True
+ @staticmethod
+ @TenantPolicyService.has_tenant_permission(MANAGE_DATASETS)
+ @ResourcePolicyService.has_resource_permission(UPDATE_DATASET)
+ def reapply_share_items_for_dataset(uri: str):
+ context = get_context()
+ with context.db_engine.scoped_session() as session:
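+            # Queue an async task; the 'ecs.dataset.share.reapply' handler re-applies the dataset's share items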
+ reapply_share_items_task: Task = Task(action='ecs.dataset.share.reapply', targetUri=uri)
+ session.add(reapply_share_items_task)
+ Worker.queue(engine=context.db_engine, task_ids=[reapply_share_items_task.taskUri])
+ return True
+
@staticmethod
def list_shared_tables_by_env_dataset(dataset_uri: str, env_uri: str):
context = get_context()
@@ -195,9 +224,10 @@ def get_s3_consumption_data(uri):
separator='-',
)
# Check if the share was made with a Glue Database
- datasetGlueDatabase = S3ShareItemService.get_glue_database_for_share(
- dataset.GlueDatabaseName, dataset.AwsAccountId, dataset.region
- )
+ datasetGlueDatabase = GlueClient(
+ account_id=dataset.AwsAccountId, region=dataset.region, database=dataset.GlueDatabaseName
+ ).get_glue_database_from_catalog()
+
old_shared_db_name = f'{datasetGlueDatabase}_shared_{uri}'[:254]
database = GlueClient(
account_id=environment.AwsAccountId, region=environment.region, database=old_shared_db_name
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_item_service.py b/backend/dataall/modules/s3_datasets_shares/services/share_item_service.py
deleted file mode 100644
index 695b9ccd4..000000000
--- a/backend/dataall/modules/s3_datasets_shares/services/share_item_service.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import logging
-
-from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
-from dataall.modules.shares_base.services.shares_enums import (
- ShareableType,
- ShareItemStatus,
-)
-from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient
-from dataall.modules.s3_datasets_shares.db.share_object_repositories import S3ShareObjectRepository
-from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetStorageLocation
-from dataall.modules.s3_datasets.services.dataset_permissions import DATASET_TABLE_READ, DATASET_FOLDER_READ
-
-log = logging.getLogger(__name__)
-
-
-class S3ShareItemService:
- @staticmethod
- def get_glue_database_for_share(glueDatabase, account_id, region): # TODO: IN S3_DATASETS_SHARES
- # Check if a catalog account exists and return database accordingly
- try:
- catalog_dict = GlueClient(
- account_id=account_id,
- region=region,
- database=glueDatabase,
- ).get_source_catalog()
-
- if catalog_dict is not None:
- return catalog_dict.get('database_name')
- else:
- return glueDatabase
- except Exception as e:
- raise e
-
- @staticmethod
- def delete_dataset_table_read_permission(session, share, tableUri):
- """
- Delete Table permissions to share groups
- """
- other_shares = S3ShareObjectRepository.find_all_other_share_items(
- session,
- not_this_share_uri=share.shareUri,
- item_uri=tableUri,
- share_type=ShareableType.Table.value,
- principal_type='GROUP',
- principal_uri=share.groupUri,
- item_status=[ShareItemStatus.Share_Succeeded.value],
- )
- log.info(f'Table {tableUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares')
- if len(other_shares) == 0:
- log.info('Delete permissions...')
- ResourcePolicyService.delete_resource_policy(session=session, group=share.groupUri, resource_uri=tableUri)
-
- @staticmethod
- def delete_dataset_folder_read_permission(session, share, locationUri):
- """
- Delete Folder permissions to share groups
- """
- other_shares = S3ShareObjectRepository.find_all_other_share_items(
- session,
- not_this_share_uri=share.shareUri,
- item_uri=locationUri,
- share_type=ShareableType.StorageLocation.value,
- principal_type='GROUP',
- principal_uri=share.groupUri,
- item_status=[ShareItemStatus.Share_Succeeded.value],
- )
- log.info(
- f'Location {locationUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares'
- )
- if len(other_shares) == 0:
- log.info('Delete permissions...')
- ResourcePolicyService.delete_resource_policy(
- session=session,
- group=share.groupUri,
- resource_uri=locationUri,
- )
-
- @staticmethod
- def attach_dataset_table_read_permission(session, share, tableUri):
- """
- Attach Table permissions to share groups
- """
- existing_policy = ResourcePolicyService.find_resource_policies(
- session,
- group=share.groupUri,
- resource_uri=tableUri,
- resource_type=DatasetTable.__name__,
- permissions=DATASET_TABLE_READ,
- )
- # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together)
- if len(existing_policy) == 0:
- log.info(
- f'Attaching new resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri}'
- )
- ResourcePolicyService.attach_resource_policy(
- session=session,
- group=share.groupUri,
- permissions=DATASET_TABLE_READ,
- resource_uri=tableUri,
- resource_type=DatasetTable.__name__,
- )
- else:
- log.info(
- f'Resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri} already exists. Skip... '
- )
-
- @staticmethod
- def attach_dataset_folder_read_permission(session, share, locationUri):
- """
- Attach Folder permissions to share groups
- """
- existing_policy = ResourcePolicyService.find_resource_policies(
- session,
- group=share.groupUri,
- resource_uri=locationUri,
- resource_type=DatasetStorageLocation.__name__,
- permissions=DATASET_FOLDER_READ,
- )
- # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together)
- if len(existing_policy) == 0:
- log.info(
- f'Attaching new resource permission policy {DATASET_FOLDER_READ} to folder {locationUri} for group {share.groupUri}'
- )
-
- ResourcePolicyService.attach_resource_policy(
- session=session,
- group=share.groupUri,
- permissions=DATASET_FOLDER_READ,
- resource_uri=locationUri,
- resource_type=DatasetStorageLocation.__name__,
- )
- else:
- log.info(
- f'Resource permission policy {DATASET_FOLDER_READ} to table {locationUri} for group {share.groupUri} already exists. Skip... '
- )
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_managers/lf_share_manager.py b/backend/dataall/modules/s3_datasets_shares/services/share_managers/lf_share_manager.py
index 8d12fb40f..b2ddef07a 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_managers/lf_share_manager.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_managers/lf_share_manager.py
@@ -19,7 +19,7 @@
ShareItemHealthStatus,
)
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable
-from dataall.modules.s3_datasets_shares.services.dataset_sharing_alarm_service import DatasetSharingAlarmService
+from dataall.modules.s3_datasets_shares.services.s3_share_alarm_service import S3ShareAlarmService
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
from dataall.modules.s3_datasets_shares.services.share_managers.share_manager_utils import ShareErrorFormatter
from dataall.modules.shares_base.services.sharing_service import ShareData
@@ -569,7 +569,7 @@ def handle_share_failure(
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_table_sharing_failure_alarm(table, self.share, self.target_environment)
+ S3ShareAlarmService().trigger_table_sharing_failure_alarm(table, self.share, self.target_environment)
return True
def handle_revoke_failure(
@@ -589,9 +589,7 @@ def handle_revoke_failure(
f'with target account {self.target_environment.AwsAccountId}/{self.target_environment.region} '
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_revoke_table_sharing_failure_alarm(
- table, self.share, self.target_environment
- )
+ S3ShareAlarmService().trigger_revoke_table_sharing_failure_alarm(table, self.share, self.target_environment)
return True
def handle_share_failure_for_all_tables(self, tables, error, share_item_status, reapply=False):
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_access_point_share_manager.py b/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_access_point_share_manager.py
index e3663e3ee..8aaa9a7ea 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_access_point_share_manager.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_access_point_share_manager.py
@@ -18,12 +18,12 @@
DATAALL_KMS_PIVOT_ROLE_PERMISSIONS_SID,
)
from dataall.base.aws.iam import IAM
-from dataall.modules.s3_datasets_shares.services.dataset_sharing_alarm_service import DatasetSharingAlarmService
+from dataall.modules.s3_datasets_shares.services.s3_share_alarm_service import S3ShareAlarmService
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
from dataall.modules.s3_datasets_shares.services.share_managers.share_manager_utils import ShareErrorFormatter
-from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import (
- SharePolicyService,
+from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import (
+ S3SharePolicyService,
IAM_S3_ACCESS_POINTS_STATEMENT_SID,
EMPTY_STATEMENT_SID,
)
@@ -159,7 +159,7 @@ def check_target_role_access_policy(self) -> None:
key_alias = f'alias/{self.dataset.KmsAlias}'
kms_client = KmsClient(self.dataset_account_id, self.source_environment.region)
kms_key_id = kms_client.get_key_id(key_alias)
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
environmentUri=self.target_environment.environmentUri,
account=self.target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -194,7 +194,7 @@ def check_target_role_access_policy(self) -> None:
)
logger.info(f'Policy... {policy_document}')
- s3_statement_index = SharePolicyService._get_statement_by_sid(
+ s3_statement_index = S3SharePolicyService._get_statement_by_sid(
policy_document, f'{IAM_S3_ACCESS_POINTS_STATEMENT_SID}S3'
)
@@ -228,7 +228,7 @@ def check_target_role_access_policy(self) -> None:
)
if kms_key_id:
- kms_statement_index = SharePolicyService._get_statement_by_sid(
+ kms_statement_index = S3SharePolicyService._get_statement_by_sid(
policy_document, f'{IAM_S3_ACCESS_POINTS_STATEMENT_SID}KMS'
)
kms_target_resources = [f'arn:aws:kms:{self.dataset_region}:{self.dataset_account_id}:key/{kms_key_id}']
@@ -268,7 +268,7 @@ def grant_target_role_access_policy(self):
"""
logger.info(f'Grant target role {self.target_requester_IAMRoleName} access policy')
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
environmentUri=self.target_environment.environmentUri,
account=self.target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -619,7 +619,7 @@ def delete_access_point(self):
def revoke_target_role_access_policy(self):
logger.info('Deleting target role IAM statements...')
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
environmentUri=self.target_environment.environmentUri,
account=self.target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -718,7 +718,7 @@ def handle_share_failure(self, error: Exception) -> None:
f'with target account {self.target_environment.AwsAccountId}/{self.target_environment.region} '
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_folder_sharing_failure_alarm(
+ S3ShareAlarmService().trigger_folder_sharing_failure_alarm(
self.target_folder, self.share, self.target_environment
)
@@ -735,7 +735,7 @@ def handle_revoke_failure(self, error: Exception) -> bool:
f'with target account {self.target_environment.AwsAccountId}/{self.target_environment.region} '
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_revoke_folder_sharing_failure_alarm(
+ S3ShareAlarmService().trigger_revoke_folder_sharing_failure_alarm(
self.target_folder, self.share, self.target_environment
)
return True
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_bucket_share_manager.py b/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_bucket_share_manager.py
index 609b50a7b..aafc932e7 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_bucket_share_manager.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_managers/s3_bucket_share_manager.py
@@ -15,9 +15,9 @@
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
from dataall.modules.s3_datasets_shares.services.share_managers.share_manager_utils import ShareErrorFormatter
-from dataall.modules.s3_datasets_shares.services.dataset_sharing_alarm_service import DatasetSharingAlarmService
-from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import (
- SharePolicyService,
+from dataall.modules.s3_datasets_shares.services.s3_share_alarm_service import S3ShareAlarmService
+from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import (
+ S3SharePolicyService,
IAM_S3_BUCKETS_STATEMENT_SID,
EMPTY_STATEMENT_SID,
)
@@ -70,7 +70,7 @@ def check_s3_iam_access(self) -> None:
kms_client = KmsClient(self.source_account_id, self.source_environment.region)
kms_key_id = kms_client.get_key_id(key_alias)
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
environmentUri=self.target_environment.environmentUri,
account=self.target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -98,7 +98,7 @@ def check_s3_iam_access(self) -> None:
version_id, policy_document = IAM.get_managed_policy_default_version(
self.target_environment.AwsAccountId, self.target_environment.region, share_resource_policy_name
)
- s3_statement_index = SharePolicyService._get_statement_by_sid(
+ s3_statement_index = S3SharePolicyService._get_statement_by_sid(
policy_document, f'{IAM_S3_BUCKETS_STATEMENT_SID}S3'
)
@@ -131,7 +131,7 @@ def check_s3_iam_access(self) -> None:
)
if kms_key_id:
- kms_statement_index = SharePolicyService._get_statement_by_sid(
+ kms_statement_index = S3SharePolicyService._get_statement_by_sid(
policy_document, f'{IAM_S3_BUCKETS_STATEMENT_SID}KMS'
)
kms_target_resources = [f'arn:aws:kms:{self.bucket_region}:{self.source_account_id}:key/{kms_key_id}']
@@ -172,7 +172,7 @@ def grant_s3_iam_access(self):
"""
logger.info(f'Grant target role {self.target_requester_IAMRoleName} access policy')
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
environmentUri=self.target_environment.environmentUri,
account=self.target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -484,7 +484,7 @@ def delete_target_role_access_policy(
):
logger.info('Deleting target role IAM statements...')
- share_policy_service = SharePolicyService(
+ share_policy_service = S3SharePolicyService(
role_name=share.principalIAMRoleName,
account=target_environment.AwsAccountId,
region=self.target_environment.region,
@@ -574,7 +574,7 @@ def handle_share_failure(self, error: Exception) -> bool:
f'with target account {self.target_environment.AwsAccountId}/{self.target_environment.region} '
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_s3_bucket_sharing_failure_alarm(
+ S3ShareAlarmService().trigger_s3_bucket_sharing_failure_alarm(
self.target_bucket, self.share, self.target_environment
)
return True
@@ -592,7 +592,7 @@ def handle_revoke_failure(self, error: Exception) -> bool:
f'with target account {self.target_environment.AwsAccountId}/{self.target_environment.region} '
f'due to: {error}'
)
- DatasetSharingAlarmService().trigger_revoke_s3_bucket_sharing_failure_alarm(
+ S3ShareAlarmService().trigger_revoke_s3_bucket_sharing_failure_alarm(
self.target_bucket, self.share, self.target_environment
)
return True
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
index 8c5bc0f42..3091f7041 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
@@ -16,10 +16,10 @@
from dataall.modules.s3_datasets_shares.services.share_managers import LFShareManager
from dataall.modules.s3_datasets_shares.aws.ram_client import RamClient
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
-from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
+from dataall.modules.s3_datasets_shares.services.s3_share_service import S3ShareService
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
-from dataall.modules.s3_datasets_shares.db.share_object_repositories import S3ShareObjectRepository
+from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
from dataall.modules.shares_base.db.share_object_state_machines import ShareItemSM
from dataall.modules.s3_datasets_shares.services.share_managers.share_manager_utils import ShareErrorFormatter
@@ -154,7 +154,7 @@ def process_approved_shares(self) -> bool:
manager.grant_principals_permissions_to_resource_link_table(table)
log.info('Attaching TABLE READ permissions...')
- S3ShareItemService.attach_dataset_table_read_permission(
+ S3ShareService.attach_dataset_table_read_permission(
self.session, self.share_data.share, table.tableUri
)
@@ -276,7 +276,7 @@ def process_revoked_shares(self) -> bool:
and self.share_data.share.groupUri != self.share_data.dataset.stewards
):
log.info('Deleting TABLE READ permissions...')
- S3ShareItemService.delete_dataset_table_read_permission(
+ S3ShareService.delete_dataset_table_read_permission(
self.session, self.share_data.share, table.tableUri
)
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py
index ab464f4dc..87ec4f6c0 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py
@@ -5,7 +5,7 @@
from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
from dataall.modules.s3_datasets_shares.services.share_managers import S3AccessPointShareManager
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
-from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
+from dataall.modules.s3_datasets_shares.services.s3_share_service import S3ShareService
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
ShareItemStatus,
@@ -76,7 +76,7 @@ def process_approved_shares(self) -> bool:
manager.update_dataset_bucket_key_policy()
log.info('Attaching FOLDER READ permissions...')
- S3ShareItemService.attach_dataset_folder_read_permission(
+ S3ShareService.attach_dataset_folder_read_permission(
self.session, self.share_data.share, folder.locationUri
)
@@ -145,7 +145,7 @@ def process_revoked_shares(self) -> bool:
and self.share_data.share.groupUri != self.share_data.dataset.stewards
):
log.info(f'Deleting FOLDER READ permissions from {folder.locationUri}...')
- S3ShareItemService.delete_dataset_folder_read_permission(
+ S3ShareService.delete_dataset_folder_read_permission(
self.session, manager.share, folder.locationUri
)
diff --git a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py
index e382b05ef..e5a29c904 100644
--- a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py
+++ b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py
@@ -10,7 +10,7 @@
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.base.db import get_engine
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
-from dataall.modules.s3_datasets_shares.db.share_object_repositories import S3ShareObjectRepository
+from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService
from dataall.modules.s3_datasets.aws.sns_dataset_client import SnsDatasetClient
from dataall.modules.s3_datasets.db.dataset_location_repositories import DatasetLocationRepository
diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py
index 0a9e2eafb..596a5771a 100644
--- a/backend/dataall/modules/shares_base/db/share_object_repositories.py
+++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py
@@ -4,11 +4,14 @@
from typing import List
from dataall.base.db import exceptions, paginate
+from dataall.base.db.paginator import Page
from dataall.core.organizations.db.organization_models import Organization
from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
+from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
+
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
PrincipalType,
@@ -349,7 +352,9 @@ def list_shareable_items_of_type(session, share, type, share_type_model, share_t
@staticmethod
def paginated_list_shareable_items(session, subqueries: List[Query], data: dict = None):
- if len(subqueries) == 1:
+ if len(subqueries) == 0:
+ return Page([], 1, 1, 0) # empty page. All modules are turned off
+ elif len(subqueries) == 1:
shareable_objects = subqueries[0].subquery('shareable_objects')
else:
shareable_objects = subqueries[0].union(*subqueries[1:]).subquery('shareable_objects')
@@ -377,3 +382,33 @@ def paginated_list_shareable_items(session, subqueries: List[Query], data: dict
return paginate(
query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
).to_dict()
+
+ @staticmethod
+ def list_active_share_object_for_dataset(session, dataset_uri):
+ share_objects = (
+ session.query(ShareObject)
+ .filter(and_(ShareObject.datasetUri == dataset_uri, ShareObject.deleted.is_(None)))
+ .all()
+ )
+ return share_objects
+
+ @staticmethod
+ def fetch_submitted_shares_with_notifications(session):
+ """
+ A method used by the scheduled ECS task to fetch, across all of data.all, the share objects that are still in
+ 'Submitted' state and have a SHARE_OBJECT_SUBMITTED notification attached
+ """
+ with session() as session:
+ pending_shares = (
+ session.query(ShareObject)
+ .join(
+ Notification,
+ and_(
+ ShareObject.shareUri == func.split_part(Notification.target_uri, '|', 1),
+ ShareObject.datasetUri == func.split_part(Notification.target_uri, '|', 2),
+ ),
+ )
+ .filter(and_(Notification.type == 'SHARE_OBJECT_SUBMITTED', ShareObject.status == 'Submitted'))
+ .all()
+ )
+ return pending_shares
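The fetch_submitted_shares_with_notifications query above joins ShareObject to Notification by splitting Notification.target_uri on '|', which assumes share notifications persist target_uri as a pipe-delimited composite of shareUri and datasetUri. A minimal, self-contained sketch of that matching rule (the delimiter convention is the only assumption):

# Hedged sketch: pure-Python equivalent of the split_part join above,
# assuming notifications store target_uri as '<shareUri>|<datasetUri>'.
def matches_submitted_share(notification_target_uri: str, share_uri: str, dataset_uri: str) -> bool:
    parts = notification_target_uri.split('|')
    # split_part(target_uri, '|', 1) -> shareUri; split_part(target_uri, '|', 2) -> datasetUri
    return len(parts) == 2 and parts[0] == share_uri and parts[1] == dataset_uri

assert matches_submitted_share('share-123|dataset-456', 'share-123', 'dataset-456')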
diff --git a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py
index 0bf633a66..b0308dd47 100644
--- a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py
+++ b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py
@@ -1,3 +1,4 @@
+import json
import logging
import os
@@ -5,6 +6,7 @@
from dataall.core.stacks.aws.ecs import Ecs
from dataall.core.tasks.db.task_models import Task
from dataall.modules.shares_base.services.sharing_service import SharingService
+from dataall.modules.shares_base.tasks.share_reapplier_task import EcsBulkShareRepplyService
log = logging.getLogger(__name__)
@@ -30,21 +32,43 @@ def verify_share(engine, task: Task):
def reapply_share(engine, task: Task):
return EcsShareHandler._manage_share(engine, task, SharingService.reapply_share, 'reapply_share')
+ @staticmethod
+ @Worker.handler(path='ecs.dataset.share.reapply')
+ def reapply_shares_of_dataset(engine, task: Task):
+ envname = os.environ.get('envname', 'local')
+ if envname in ['local', 'dkrcompose']:
+ EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, task.targetUri)
+ else:
+ context = [
+ {'name': 'datasetUri', 'value': task.targetUri},
+ ]
+ return EcsShareHandler._run_share_management_ecs_task(
+ task_definition_param_str='ecs/task_def_arn/share_reapplier',
+ container_name_param_str='ecs/container/share_reapplier',
+ context=context,
+ )
+
@staticmethod
def _manage_share(engine, task: Task, local_handler, ecs_handler: str):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return local_handler(engine, task.targetUri)
else:
- return EcsShareHandler._run_share_management_ecs_task(share_uri=task.targetUri, handler=ecs_handler)
+ share_management_context = [
+ {'name': 'shareUri', 'value': task.targetUri},
+ {'name': 'handler', 'value': ecs_handler},
+ ]
+ return EcsShareHandler._run_share_management_ecs_task(
+ task_definition_param_str='ecs/task_def_arn/share_management',
+ container_name_param_str='ecs/container/share_management',
+ context=share_management_context,
+ )
@staticmethod
- def _run_share_management_ecs_task(share_uri, handler):
- return Ecs.run_ecs_task(
- task_definition_param='ecs/task_def_arn/share_management',
- container_name_param='ecs/container/share_management',
- context=[
- {'name': 'shareUri', 'value': share_uri},
- {'name': 'handler', 'value': handler},
- ],
+ def _run_share_management_ecs_task(task_definition_param_str, container_name_param_str, context):
+ ecs_task_arn = Ecs.run_ecs_task(
+ task_definition_param=task_definition_param_str,
+ container_name_param=container_name_param_str,
+ context=context,
)
+ return {'task_arn': ecs_task_arn}
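The handler refactor above replaces the share-specific ECS runner with a generic one that takes the SSM parameter names for the task definition and container plus a context list, and returns the started task ARN in a dict. A minimal sketch of that calling convention, with Ecs.run_ecs_task stubbed out (the stub and the example URIs are assumptions for illustration only):

# Hedged sketch of the generalized ECS runner contract; the real Ecs.run_ecs_task
# starts a Fargate task from SSM-resolved parameters and returns its ARN.
def run_ecs_task_stub(task_definition_param, container_name_param, context):
    return 'arn:aws:ecs:eu-west-1:111111111111:task/stubbed'

def run_share_management_ecs_task(task_definition_param_str, container_name_param_str, context):
    ecs_task_arn = run_ecs_task_stub(
        task_definition_param=task_definition_param_str,
        container_name_param=container_name_param_str,
        context=context,
    )
    return {'task_arn': ecs_task_arn}

# Reapplying shares for one dataset, mirroring the 'ecs.dataset.share.reapply' handler:
print(run_share_management_ecs_task(
    'ecs/task_def_arn/share_reapplier',
    'ecs/container/share_reapplier',
    [{'name': 'datasetUri', 'value': 'dataset-example-uri'}],
))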
diff --git a/backend/dataall/modules/shares_base/services/share_item_service.py b/backend/dataall/modules/shares_base/services/share_item_service.py
index 5ad7c1472..7237e4f4d 100644
--- a/backend/dataall/modules/shares_base/services/share_item_service.py
+++ b/backend/dataall/modules/shares_base/services/share_item_service.py
@@ -3,12 +3,10 @@
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.tasks.service_handlers import Worker
from dataall.base.context import get_context
-from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.tasks.db.task_models import Task
from dataall.base.db.exceptions import ObjectNotFound, UnauthorizedOperation
from dataall.modules.shares_base.services.shares_enums import (
ShareObjectActions,
- ShareableType,
ShareItemStatus,
ShareItemActions,
ShareItemHealthStatus,
diff --git a/backend/dataall/modules/shares_base/services/share_notification_service.py b/backend/dataall/modules/shares_base/services/share_notification_service.py
index 765138af9..e5b66d383 100644
--- a/backend/dataall/modules/shares_base/services/share_notification_service.py
+++ b/backend/dataall/modules/shares_base/services/share_notification_service.py
@@ -9,6 +9,7 @@
from dataall.base.context import get_context
from dataall.modules.shares_base.services.shares_enums import ShareObjectStatus
from dataall.modules.notifications.db.notification_repositories import NotificationRepository
+from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
log = logging.getLogger(__name__)
@@ -56,11 +57,47 @@ def notify_share_object_submission(self, email_id: str):
self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications
+ def notify_persistent_email_reminder(self, email_id: str):
+ share_link_text = ''
+ if os.environ.get('frontend_domain_url'):
+ share_link_text = (
+ f'Please visit data.all share link '
+ f'to review and take appropriate action or view more details.'
+ )
+
+ msg_intro = f"""Dear User,
+ This is a reminder that a share request for the dataset "{self.dataset.label}" submitted by {email_id}
+ on behalf of principal "{self.share.principalId}" is still pending and has not been addressed.
+ """
+
+ msg_end = """Your prompt attention to this matter is greatly appreciated.
+ Best regards,
+ The Data.all Team
+ """
+
+ subject = f'URGENT REMINDER: Data.all | Action Required on Pending Share Request for {self.dataset.label}'
+ email_notification_msg = msg_intro + share_link_text + msg_end
+
+ notifications = self.register_notifications(
+ notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg_intro
+ )
+
+ self._create_persistent_reminder_notification_task(subject=subject, msg=email_notification_msg)
+ return notifications
+
def notify_share_object_approval(self, email_id: str):
share_link_text = ''
if os.environ.get('frontend_domain_url'):
- share_link_text = f'Please visit data.all share link to take action or view more details'
- msg = f'User {email_id} APPROVED share request for dataset {self.dataset.label} for principal {self.share.principalId}'
+ share_link_text = (
+ f'