Generic shares_base module and specific s3_datasets_shares module - part 11 (renaming and cleaning up s3_shares) (#1359)

### Feature or Bugfix
- Refactoring

### Detail
As explained in the design for #1123 and #1283, we are implementing generic
`datasets_base` and `shares_base` modules that can be used by any type of
dataset and any type of shareable object.
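
For context, the mechanism behind this is a plug-in pattern: each dataset-type module implements a shared interface and registers it with the generic services. The snippet below is a minimal sketch of that pattern, not code from this PR; the real contract is `DatasetServiceInterface` in `datasets_base`, which defines more hooks than shown here.

```python
# Minimal sketch of the plug-in pattern (simplified; the real
# DatasetServiceInterface in datasets_base defines more hooks).
from abc import ABC, abstractmethod
from typing import List


class DatasetServiceInterface(ABC):
    """Contract a dataset-type module implements to extend the generic services."""

    @abstractmethod
    def check_before_delete(self, session, uri, **kwargs) -> bool:
        """Return True if deletion may proceed for this dataset type."""


class DatasetService:
    """Generic service that delegates type-specific logic to registered interfaces."""

    _interfaces: List[DatasetServiceInterface] = []

    @classmethod
    def register(cls, interface: DatasetServiceInterface) -> None:
        cls._interfaces.append(interface)

    @classmethod
    def check_before_delete(cls, session, uri, **kwargs) -> bool:
        # Every registered dataset type must approve the deletion.
        return all(i.check_before_delete(session, uri, **kwargs) for i in cls._interfaces)
```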

This is one of the last PRs focused on renaming files and cleaning up the
s3_datasets_shares module. The first step is a consolidation of the file and
class names in the services so that they clearly refer to s3_shares:
- `services.managed_share_policy_service.SharePolicyService` -->
`services.s3_share_managed_policy_service.S3SharePolicyService`
- `services.dataset_sharing_alarm_service.DatasetSharingAlarmService`
--> `services.s3_share_alarm_service.S3ShareAlarmService`
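
Downstream consumer imports are updated accordingly; for example (taken from the `__init__.py` diff further down):

```python
# Before:
# from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
# After:
from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService
```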

👀 The main refactoring happens in what used to be
`services.dataset_sharing_service`:
- The part that implements the `DatasetServiceInterface` has been moved
to `services/s3_share_dataset_service.py` as the `S3ShareDatasetService`.
- The part used by the resolvers and by other methods has been renamed to
`services/s3_share_service.py`, and the methods for the folder/table
permissions have been moved into the `S3ShareService` as well (from
`share_item_service`).
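
For orientation, a condensed sketch of the resulting split (the module paths, class names, and the import are from this PR; the bodies are elided):

```python
# services/s3_share_dataset_service.py - plugs S3 shares into the generic
# dataset services; registered via DatasetService.register(S3ShareDatasetService())
# in the module's __init__.py (see the diff below).
from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface


class S3ShareDatasetService(DatasetServiceInterface):
    ...


# services/s3_share_service.py - static methods backing the GraphQL resolvers,
# now also carrying the folder/table permission helpers previously found in
# share_item_service.
class S3ShareService:
    @staticmethod
    def list_shared_tables_by_env_dataset(dataset_uri, env_uri): ...

    @staticmethod
    def verify_dataset_share_objects(uri, share_uris): ...
```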

Lastly, one method previously in `share_item_service` has been moved
directly into the `GlueClient` as `get_glue_database_from_catalog`.
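
A hypothetical call site is sketched below; the import path and method name are from this PR, but the `GlueClient` constructor arguments are assumptions for illustration.

```python
# Hypothetical usage sketch - the constructor arguments are illustrative,
# not taken from this diff.
from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient

client = GlueClient(account_id='111122223333', region='eu-west-1', database='example_db')

# Returns the database name from the source catalog account when one is
# registered; otherwise falls back to the client's own database.
database_name = client.get_glue_database_from_catalog()
```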


### Relates
- #1283 
- #1123 
- #955 

### Security
Please answer the questions below briefly where applicable, or write `N/A`.
Based on [OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this includes
fetching data from storage outside the application (e.g. a database, an S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the keys used controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
dlpzx authored Jun 27, 2024
1 parent 1c09015 commit 2e80de4
Showing 20 changed files with 371 additions and 497 deletions.
20 changes: 10 additions & 10 deletions backend/dataall/modules/s3_datasets_shares/__init__.py
@@ -23,21 +23,21 @@ def depends_on() -> List[Type['ModuleInterface']]:
def __init__(self):
from dataall.core.environment.services.environment_resource_manager import EnvironmentResourceManager
from dataall.modules.s3_datasets_shares import api
from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService
from dataall.modules.s3_datasets.services.dataset_service import DatasetService
from dataall.modules.datasets_base.services.dataset_list_service import DatasetListService
from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareEnvironmentResource
from dataall.modules.s3_datasets_shares.services.s3_share_dataset_service import S3ShareDatasetService
from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareEnvironmentResource
from dataall.modules.shares_base.services.share_processor_manager import (
ShareProcessorManager,
ShareProcessorDefinition,
)
from dataall.modules.shares_base.services.shares_enums import ShareableType
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetBucket, DatasetStorageLocation

EnvironmentResourceManager.register(ShareEnvironmentResource())
DatasetService.register(DatasetSharingService())
DatasetListService.register(DatasetSharingService())
EnvironmentResourceManager.register(S3ShareEnvironmentResource())
DatasetService.register(S3ShareDatasetService())
DatasetListService.register(S3ShareDatasetService())

ShareProcessorManager.register_processor(
ShareProcessorDefinition(ShareableType.Table, None, DatasetTable, DatasetTable.tableUri)
@@ -77,7 +77,7 @@ def depends_on() -> List[Type['ModuleInterface']]:
]

def __init__(self):
log.info('S3 Sharing handlers have been imported')
log.info('s3_datasets_shares handlers have been imported')


class S3DatasetsSharesCdkModuleInterface(ModuleInterface):
@@ -89,9 +89,9 @@ def is_supported(modes):

def __init__(self):
import dataall.modules.s3_datasets_shares.cdk
from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService

log.info('CDK module data_sharing has been imported')
log.info('CDK module s3_datasets_shares has been imported')


class S3DatasetsSharesECSShareModuleInterface(ModuleInterface):
@@ -144,4 +144,4 @@ def __init__(self):
)
)

log.info('ECS Share module s3_data_sharing has been imported')
log.info('ECS Share module s3_datasets_shares has been imported')
16 changes: 7 additions & 9 deletions backend/dataall/modules/s3_datasets_shares/api/resolvers.py
@@ -3,7 +3,7 @@
from dataall.base.api.context import Context
from dataall.base.db.exceptions import RequiredParameter
from dataall.base.feature_toggle_checker import is_feature_enabled
from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
from dataall.modules.s3_datasets_shares.services.s3_share_service import S3ShareService


log = logging.getLogger(__name__)
@@ -41,32 +41,30 @@ def validate_dataset_share_selector_input(data):


def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str):
return DatasetSharingService.list_shared_tables_by_env_dataset(datasetUri, envUri)
return S3ShareService.list_shared_tables_by_env_dataset(datasetUri, envUri)


@is_feature_enabled('modules.s3_datasets.features.aws_actions')
def get_dataset_shared_assume_role_url(context: Context, source, datasetUri: str = None):
return DatasetSharingService.get_dataset_shared_assume_role_url(uri=datasetUri)
return S3ShareService.get_dataset_shared_assume_role_url(uri=datasetUri)


def verify_dataset_share_objects(context: Context, source, input):
RequestValidator.validate_dataset_share_selector_input(input)
dataset_uri = input.get('datasetUri')
verify_share_uris = input.get('shareUris')
return DatasetSharingService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)
return S3ShareService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)


def get_s3_consumption_data(context: Context, source, shareUri: str):
return DatasetSharingService.get_s3_consumption_data(uri=shareUri)
return S3ShareService.get_s3_consumption_data(uri=shareUri)


def list_shared_databases_tables_with_env_group(context: Context, source, environmentUri: str, groupUri: str):
return DatasetSharingService.list_shared_databases_tables_with_env_group(
environmentUri=environmentUri, groupUri=groupUri
)
return S3ShareService.list_shared_databases_tables_with_env_group(environmentUri=environmentUri, groupUri=groupUri)


def resolve_shared_db_name(context: Context, source, **kwargs):
return DatasetSharingService.resolve_shared_db_name(
return S3ShareService.resolve_shared_db_name(
source.GlueDatabaseName, source.shareUri, source.targetEnvAwsAccountId, source.targetEnvRegion
)
12 changes: 12 additions & 0 deletions backend/dataall/modules/s3_datasets_shares/aws/glue_client.py
@@ -175,6 +175,18 @@ def get_source_catalog(self):
raise e
return None

def get_glue_database_from_catalog(self):
# Check if a catalog account exists and return database accordingly
try:
catalog_dict = self.get_source_catalog()

if catalog_dict is not None:
return catalog_dict.get('database_name')
else:
return self._database
except Exception as e:
raise e

def get_database_tags(self):
# Get tags from the glue database
account_id = self._account_id
backend/dataall/modules/s3_datasets_shares/db/{share_object_repositories.py → s3_share_object_repositories.py}
@@ -2,30 +2,27 @@
from warnings import warn
from typing import List

from sqlalchemy import and_, or_, func, case
from sqlalchemy import and_, or_
from sqlalchemy.orm import Query

from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
from dataall.core.environment.db.environment_models import Environment
from dataall.core.environment.services.environment_resource_manager import EnvironmentResource
from dataall.base.db import exceptions, paginate
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
ShareObjectStatus,
ShareItemStatus,
ShareableType,
PrincipalType,
)
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset, DatasetBucket
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase

logger = logging.getLogger(__name__)


class ShareEnvironmentResource(EnvironmentResource):
class S3ShareEnvironmentResource(EnvironmentResource):
@staticmethod
def count_resources(session, environment, group_uri) -> int:
return S3ShareObjectRepository.count_S3_principal_shares(
@@ -395,124 +392,6 @@ def list_s3_dataset_shares_with_existing_shared_items(
query = query.filter(ShareObjectItem.itemType == item_type)
return query.all()

@staticmethod # TODO!!!
def list_shareable_items(session, share, states, data): # TODO
# All tables from dataset with a column isShared
# marking the table as part of the shareObject
tables = (
session.query(
DatasetTable.tableUri.label('itemUri'),
func.coalesce('DatasetTable').label('itemType'),
DatasetTable.GlueTableName.label('itemName'),
DatasetTable.description.label('description'),
ShareObjectItem.shareItemUri.label('shareItemUri'),
ShareObjectItem.status.label('status'),
ShareObjectItem.healthStatus.label('healthStatus'),
ShareObjectItem.healthMessage.label('healthMessage'),
ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
case(
[(ShareObjectItem.shareItemUri.isnot(None), True)],
else_=False,
).label('isShared'),
)
.outerjoin(
ShareObjectItem,
and_(
ShareObjectItem.shareUri == share.shareUri,
DatasetTable.tableUri == ShareObjectItem.itemUri,
),
)
.filter(DatasetTable.datasetUri == share.datasetUri)
)
if states:
tables = tables.filter(ShareObjectItem.status.in_(states))

# All folders from the dataset with a column isShared
# marking the folder as part of the shareObject
locations = (
session.query(
DatasetStorageLocation.locationUri.label('itemUri'),
func.coalesce('DatasetStorageLocation').label('itemType'),
DatasetStorageLocation.S3Prefix.label('itemName'),
DatasetStorageLocation.description.label('description'),
ShareObjectItem.shareItemUri.label('shareItemUri'),
ShareObjectItem.status.label('status'),
ShareObjectItem.healthStatus.label('healthStatus'),
ShareObjectItem.healthMessage.label('healthMessage'),
ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
case(
[(ShareObjectItem.shareItemUri.isnot(None), True)],
else_=False,
).label('isShared'),
)
.outerjoin(
ShareObjectItem,
and_(
ShareObjectItem.shareUri == share.shareUri,
DatasetStorageLocation.locationUri == ShareObjectItem.itemUri,
),
)
.filter(DatasetStorageLocation.datasetUri == share.datasetUri)
)
if states:
locations = locations.filter(ShareObjectItem.status.in_(states))

s3_buckets = (
session.query(
DatasetBucket.bucketUri.label('itemUri'),
func.coalesce('S3Bucket').label('itemType'),
DatasetBucket.S3BucketName.label('itemName'),
DatasetBucket.description.label('description'),
ShareObjectItem.shareItemUri.label('shareItemUri'),
ShareObjectItem.status.label('status'),
ShareObjectItem.healthStatus.label('healthStatus'),
ShareObjectItem.healthMessage.label('healthMessage'),
ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
case(
[(ShareObjectItem.shareItemUri.isnot(None), True)],
else_=False,
).label('isShared'),
)
.outerjoin(
ShareObjectItem,
and_(
ShareObjectItem.shareUri == share.shareUri,
DatasetBucket.bucketUri == ShareObjectItem.itemUri,
),
)
.filter(DatasetBucket.datasetUri == share.datasetUri)
)
if states:
s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))

shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
query = session.query(shareable_objects)

if data:
if data.get('term'):
term = data.get('term')
query = query.filter(
or_(
shareable_objects.c.itemName.ilike(term + '%'),
shareable_objects.c.description.ilike(term + '%'),
)
)
if 'isShared' in data:
is_shared = data.get('isShared')
query = query.filter(shareable_objects.c.isShared == is_shared)

if 'isHealthy' in data:
# healthy_status = ShareItemHealthStatus.Healthy.value
query = (
query.filter(shareable_objects.c.healthStatus == ShareItemHealthStatus.Healthy.value)
if data.get('isHealthy')
else query.filter(shareable_objects.c.healthStatus != ShareItemHealthStatus.Healthy.value)
)

return paginate(
query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
).to_dict()

# the next 2 methods are used in subscription task
@staticmethod
def find_share_items_by_item_uri(session, item_uri):
backend/dataall/modules/s3_datasets_shares/services/{dataset_sharing_alarm_service.py → s3_share_alarm_service.py}
@@ -9,7 +9,7 @@
log = logging.getLogger(__name__)


class DatasetSharingAlarmService(AlarmService):
class S3ShareAlarmService(AlarmService):
"""Contains set of alarms for datasets"""

def trigger_table_sharing_failure_alarm(
backend/dataall/modules/s3_datasets_shares/services/s3_share_dataset_service.py (new file)
@@ -0,0 +1,105 @@
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.base.db import exceptions
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER
from dataall.modules.s3_datasets.services.dataset_permissions import (
DELETE_DATASET,
DELETE_DATASET_TABLE,
DELETE_DATASET_FOLDER,
)
from dataall.modules.datasets_base.services.datasets_enums import DatasetRole, DatasetTypes
from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface


import logging

log = logging.getLogger(__name__)


class S3ShareDatasetService(DatasetServiceInterface):
@property
def dataset_type(self):
return DatasetTypes.S3

@staticmethod
def resolve_additional_dataset_user_role(session, uri, username, groups):
"""Implemented as part of the DatasetServiceInterface"""
share = S3ShareObjectRepository.get_share_by_dataset_attributes(session, uri, username, groups)
if share is not None:
return DatasetRole.Shared.value
return None

@staticmethod
def check_before_delete(session, uri, **kwargs):
"""Implemented as part of the DatasetServiceInterface"""
action = kwargs.get('action')
if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
existing_s3_shared_items = S3ShareObjectRepository.check_existing_s3_shared_items(session, uri)
if existing_s3_shared_items:
raise exceptions.ResourceShared(
action=action,
message='Revoke all shares for this item before deletion',
)
elif action in [DELETE_DATASET]:
shares = S3ShareObjectRepository.list_s3_dataset_shares_with_existing_shared_items(
session=session, dataset_uri=uri
)
if shares:
raise exceptions.ResourceShared(
action=DELETE_DATASET,
message='Revoke all dataset shares before deletion.',
)
else:
raise exceptions.RequiredParameter('Delete action')
return True

@staticmethod
def execute_on_delete(session, uri, **kwargs):
"""Implemented as part of the DatasetServiceInterface"""
action = kwargs.get('action')
if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
S3ShareObjectRepository.delete_s3_share_item(session, uri)
elif action in [DELETE_DATASET]:
S3ShareObjectRepository.delete_s3_shares_with_no_shared_items(session, uri)
else:
raise exceptions.RequiredParameter('Delete action')
return True

@staticmethod
def append_to_list_user_datasets(session, username, groups):
"""Implemented as part of the DatasetServiceInterface"""
return S3ShareObjectRepository.list_user_s3_shared_datasets(session, username, groups)

@staticmethod
def extend_attach_steward_permissions(session, dataset, new_stewards, **kwargs):
"""Implemented as part of the DatasetServiceInterface"""
dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
if dataset_shares:
for share in dataset_shares:
ResourcePolicyService.attach_resource_policy(
session=session,
group=new_stewards,
permissions=SHARE_OBJECT_APPROVER,
resource_uri=share.shareUri,
resource_type=ShareObject.__name__,
)
if dataset.stewards != dataset.SamlAdminGroupName:
ResourcePolicyService.delete_resource_policy(
session=session,
group=dataset.stewards,
resource_uri=share.shareUri,
)

@staticmethod
def extend_delete_steward_permissions(session, dataset, **kwargs):
"""Implemented as part of the DatasetServiceInterface"""
dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
if dataset_shares:
for share in dataset_shares:
if dataset.stewards != dataset.SamlAdminGroupName:
ResourcePolicyService.delete_resource_policy(
session=session,
group=dataset.stewards,
resource_uri=share.shareUri,
)