Generic shares_base module and specific s3_datasets_shares module - part 8 (share_item_service) #1350

Merged
merged 1 commit into from
Jun 19, 2024
Merged
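In short, this part of the refactor splits the old s3_datasets_shares ShareItemService in two: the generic share item operations move behind a ShareItemService in shares_base (which is where resolvers.py now imports it from), while the S3/Glue-specific helpers stay in s3_datasets_shares under the new name S3ShareItemService. The sketch below only illustrates the resulting shape; the class and method names come from the diff, but the bodies are placeholders rather than the real implementations.

# Placeholder sketch of the split, not the actual data.all code.

class ShareItemService:
    """Generic, storage-agnostic share item operations (shares_base)."""

    @staticmethod
    def verify_items_share_object(uri, item_uris):
        # The real method flags the items as PendingVerify and queues an
        # 'ecs.share.verify' task for the worker.
        return True


class S3ShareItemService:
    """S3/Glue-specific helpers kept in s3_datasets_shares."""

    @staticmethod
    def get_glue_database_for_share(glueDatabase, account_id, region):
        # The real method checks whether a catalog account exists and returns
        # the database to share accordingly (see the hunk further down).
        return glueDatabase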
@@ -10,7 +10,8 @@
from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_state_machines import ShareItemSM
from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER, GET_SHARE_OBJECT
from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService
from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
from dataall.modules.shares_base.services.share_item_service import ShareItemService
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.services.dataset_permissions import (
MANAGE_DATASETS,
@@ -194,7 +195,7 @@ def get_s3_consumption_data(uri):
separator='-',
)
# Check if the share was made with a Glue Database
datasetGlueDatabase = ShareItemService._get_glue_database_for_share(
datasetGlueDatabase = S3ShareItemService.get_glue_database_for_share(
dataset.GlueDatabaseName, dataset.AwsAccountId, dataset.region
)
old_shared_db_name = f'{datasetGlueDatabase}_shared_{uri}'[:254]
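As a quick aside, old_shared_db_name above is built from a simple rule: the dataset's Glue database name, the literal '_shared_', and the uri argument, truncated to 254 characters. A standalone illustration with hypothetical values:

# Hypothetical values, only to illustrate the naming rule used above.
datasetGlueDatabase = 'sales_raw'
uri = 'abcd1234'
old_shared_db_name = f'{datasetGlueDatabase}_shared_{uri}'[:254]
print(old_shared_db_name)  # sales_raw_shared_abcd1234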
@@ -1,220 +1,21 @@
import logging

from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.tasks.service_handlers import Worker
from dataall.base.context import get_context
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.tasks.db.task_models import Task
from dataall.base.db import utils
from dataall.base.db.exceptions import ObjectNotFound, UnauthorizedOperation
from dataall.modules.shares_base.services.shares_enums import (
ShareObjectActions,
ShareableType,
ShareItemStatus,
ShareItemActions,
ShareItemHealthStatus,
)
from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_state_machines import (
ShareObjectSM,
ShareItemSM,
)
from dataall.modules.shares_base.services.share_exceptions import ShareItemsFound
from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService
from dataall.modules.shares_base.services.share_permissions import (
GET_SHARE_OBJECT,
ADD_ITEM,
REMOVE_ITEM,
LIST_ENVIRONMENT_SHARED_WITH_OBJECTS,
APPROVE_SHARE_OBJECT,
)
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset, DatasetStorageLocation
from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetStorageLocation
from dataall.modules.s3_datasets.services.dataset_permissions import DATASET_TABLE_READ, DATASET_FOLDER_READ

log = logging.getLogger(__name__)


class ShareItemService:
@staticmethod
def _get_share_uri(session, uri):
share_item = ShareObjectRepository.get_share_item_by_uri(session, uri)
share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri)
return share.shareUri

@staticmethod
@ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT)
def verify_items_share_object(uri, item_uris):
context = get_context()
with context.db_engine.scoped_session() as session:
verify_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in item_uris]
for item in verify_items:
setattr(item, 'healthStatus', ShareItemHealthStatus.PendingVerify.value)

verify_share_items_task: Task = Task(action='ecs.share.verify', targetUri=uri)
session.add(verify_share_items_task)

Worker.queue(engine=context.db_engine, task_ids=[verify_share_items_task.taskUri])
return True

@staticmethod
@ResourcePolicyService.has_resource_permission(APPROVE_SHARE_OBJECT)
def reapply_items_share_object(uri, item_uris):
context = get_context()
with context.db_engine.scoped_session() as session:
verify_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in item_uris]
for item in verify_items:
setattr(item, 'healthStatus', ShareItemHealthStatus.PendingReApply.value)

reapply_share_items_task: Task = Task(action='ecs.share.reapply', targetUri=uri)
session.add(reapply_share_items_task)

Worker.queue(engine=context.db_engine, task_ids=[reapply_share_items_task.taskUri])
return True

@staticmethod
@ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT)
def revoke_items_share_object(uri, revoked_uris):
context = get_context()
with context.db_engine.scoped_session() as session:
share = ShareObjectRepository.get_share_by_uri(session, uri)
dataset = DatasetRepository.get_dataset_by_uri(session, share.datasetUri)
revoked_items_states = ShareObjectRepository.get_share_items_states(session, uri, revoked_uris)
revoked_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in revoked_uris]

if not revoked_items_states:
raise ShareItemsFound(
action='Revoke Items from Share Object',
message='Nothing to be revoked.',
)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareObjectActions.RevokeItems.value)

for item_state in revoked_items_states:
item_sm = ShareItemSM(item_state)
new_state = item_sm.run_transition(ShareObjectActions.RevokeItems.value)
for item in revoked_items:
if item.status == item_state:
item_sm.update_state_single_item(session, item, new_state)

share_sm.update_state(session, share, new_share_state)

ShareNotificationService(session=session, dataset=dataset, share=share).notify_share_object_rejection(
email_id=context.username
)

revoke_share_task: Task = Task(
action='ecs.share.revoke',
targetUri=uri,
payload={'environmentUri': share.environmentUri},
)
session.add(revoke_share_task)

Worker.queue(engine=context.db_engine, task_ids=[revoke_share_task.taskUri])

return share

@staticmethod
@ResourcePolicyService.has_resource_permission(ADD_ITEM)
def add_shared_item(uri: str, data: dict = None):
context = get_context()
with context.db_engine.scoped_session() as session:
item_type = data.get('itemType')
item_uri = data.get('itemUri')
share = ShareObjectRepository.get_share_by_uri(session, uri)
dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(session, share.datasetUri)
target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value)
share_sm.update_state(session, share, new_share_state)

item = ShareObjectRepository.get_share_item(session, item_type, item_uri)
if not item:
raise ObjectNotFound('ShareObjectItem', item_uri)

if item_type == ShareableType.Table.value and item.region != target_environment.region:
raise UnauthorizedOperation(
action=ADD_ITEM,
message=f'Lake Formation cross region sharing is not supported. '
f'Table {item.itemUri} is in {item.region} and target environment '
f'{target_environment.name} is in {target_environment.region} ',
)

share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri)

s3_access_point_name = utils.slugify(
share.datasetUri + '-' + share.principalId,
max_length=50,
lowercase=True,
regex_pattern='[^a-zA-Z0-9-]',
separator='-',
)
log.info(f'S3AccessPointName={s3_access_point_name}')

if not share_item:
share_item = ShareObjectItem(
shareUri=uri,
itemUri=item_uri,
itemType=item_type,
itemName=item.name,
status=ShareItemStatus.PendingApproval.value,
owner=context.username,
)
session.add(share_item)
return share_item

@staticmethod
@ResourcePolicyService.has_resource_permission(REMOVE_ITEM, parent_resource=_get_share_uri)
def remove_shared_item(uri: str):
with get_context().db_engine.scoped_session() as session:
share_item = ShareObjectRepository.get_share_item_by_uri(session, uri)
if (
share_item.itemType == ShareableType.Table.value
and share_item.status == ShareItemStatus.Share_Failed.value
):
share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri)
ResourcePolicyService.delete_resource_policy(
session=session,
group=share.groupUri,
resource_uri=share_item.itemUri,
)

item_sm = ShareItemSM(share_item.status)
item_sm.run_transition(ShareItemActions.RemoveItem.value)
ShareObjectRepository.remove_share_object_item(session, share_item)
return True

@staticmethod
@ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT)
def resolve_shared_item(uri, item: ShareObjectItem):
with get_context().db_engine.scoped_session() as session:
return ShareObjectRepository.get_share_item(session, item.itemType, item.itemUri)

@staticmethod
def check_existing_shared_items(share):
with get_context().db_engine.scoped_session() as session:
return ShareObjectRepository.check_existing_shared_items(session, share.shareUri)

@staticmethod
def list_shareable_objects(share, filter, is_revokable=False):
states = None
if is_revokable:
states = ShareItemSM.get_share_item_revokable_states()

with get_context().db_engine.scoped_session() as session:
return ShareObjectRepository.list_shareable_items(session, share, states, filter)

@staticmethod
@ResourcePolicyService.has_resource_permission(LIST_ENVIRONMENT_SHARED_WITH_OBJECTS)
def paginated_shared_with_environment_datasets(session, uri, data) -> dict:
return ShareObjectRepository.paginate_shared_datasets(session, uri, data)

class S3ShareItemService:
@staticmethod
def _get_glue_database_for_share(glueDatabase, account_id, region):
def get_glue_database_for_share(glueDatabase, account_id, region): # TODO: IN S3_DATASETS_SHARES
# Check if a catalog account exists and return database accordingly
try:
catalog_dict = GlueClient(
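Going by the comment in the hunk above ('Check if a catalog account exists and return database accordingly'), get_glue_database_for_share resolves which Glue database should actually be shared. A self-contained sketch of that fallback, with the GlueClient call (truncated above) stubbed out as a catalog_lookup callable and an assumed 'database_name' key:

def get_glue_database_for_share(glueDatabase, account_id, region, catalog_lookup):
    # catalog_lookup stands in for the GlueClient call truncated above; it is
    # assumed to return catalog details when the database lives in a catalog
    # account, or None otherwise.
    catalog = catalog_lookup(glueDatabase, account_id, region)
    if catalog:
        return catalog['database_name']  # key name is an assumption
    return glueDatabase

# Hypothetical usage: no catalog account registered, so the dataset's own
# database name is returned unchanged.
print(get_glue_database_for_share('sales_raw', '111122223333', 'eu-west-1', lambda *args: None))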
@@ -16,7 +16,7 @@
from dataall.modules.s3_datasets_shares.services.share_managers import LFShareManager
from dataall.modules.s3_datasets_shares.aws.ram_client import RamClient
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService
from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_state_machines import ShareItemSM
from dataall.modules.s3_datasets_shares.services.share_managers.share_manager_utils import ShareErrorFormatter
@@ -151,7 +151,7 @@ def process_approved_shares(self) -> bool:
manager.grant_principals_permissions_to_resource_link_table(table)

log.info('Attaching TABLE READ permissions...')
ShareItemService.attach_dataset_table_read_permission(
S3ShareItemService.attach_dataset_table_read_permission(
self.session, self.share_data.share, table.tableUri
)

@@ -273,7 +273,7 @@ def process_revoked_shares(self) -> bool:
and self.share_data.share.groupUri != self.share_data.dataset.stewards
):
log.info('Deleting TABLE READ permissions...')
ShareItemService.delete_dataset_table_read_permission(
S3ShareItemService.delete_dataset_table_read_permission(
self.session, self.share_data.share, table.tableUri
)

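Taken together, the two hunks above show the table READ permission lifecycle around the renamed service: attach_dataset_table_read_permission when an approved table share is processed, and delete_dataset_table_read_permission on revoke, guarded so that dataset stewards keep their access (the full guard is truncated in this view). A deliberately simplified, self-contained sketch:

# Simplified sketch of the attach/delete lifecycle above; the real guard has
# additional conditions that are truncated in this view.
def process_table_share(action, share_group, dataset_stewards, table_uri, permissions):
    if action == 'approve':
        permissions.add((share_group, table_uri))      # attach_dataset_table_read_permission
    elif action == 'revoke' and share_group != dataset_stewards:
        permissions.discard((share_group, table_uri))  # delete_dataset_table_read_permission
    return permissions

# Hypothetical usage: the permission granted on approval is removed on revoke.
perms = set()
process_table_share('approve', 'analytics-team', 'data-stewards', 'tbl-123', perms)
process_table_share('revoke', 'analytics-team', 'data-stewards', 'tbl-123', perms)
print(perms)  # set()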
@@ -5,7 +5,7 @@
from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
from dataall.modules.s3_datasets_shares.services.share_managers import S3AccessPointShareManager
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService
from dataall.modules.s3_datasets_shares.services.share_item_service import S3ShareItemService
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
ShareItemStatus,
@@ -74,7 +74,7 @@ def process_approved_shares(self) -> bool:
manager.update_dataset_bucket_key_policy()

log.info('Attaching FOLDER READ permissions...')
ShareItemService.attach_dataset_folder_read_permission(
S3ShareItemService.attach_dataset_folder_read_permission(
self.session, self.share_data.share, folder.locationUri
)

@@ -143,7 +143,7 @@ def process_revoked_shares(self) -> bool:
and self.share_data.share.groupUri != self.share_data.dataset.stewards
):
log.info(f'Deleting FOLDER READ permissions from {folder.locationUri}...')
ShareItemService.delete_dataset_folder_read_permission(
S3ShareItemService.delete_dataset_folder_read_permission(
self.session, manager.share, folder.locationUri
)

6 changes: 3 additions & 3 deletions backend/dataall/modules/shares_base/api/resolvers.py
@@ -7,11 +7,11 @@
from dataall.base.db.exceptions import RequiredParameter
from dataall.modules.shares_base.services.shares_enums import ShareObjectPermission
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService
from dataall.modules.shares_base.services.share_item_service import ShareItemService
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
from dataall.modules.shares_base.services.share_logs_service import ShareLogsService
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository # TODO
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset # TODO


log = logging.getLogger(__name__)