Skip to content

Commit

Permalink
Extended test suite for HMS->HMS TACL migration (#439)
Browse files Browse the repository at this point in the history
  • Loading branch information
nfx authored Oct 11, 2023
1 parent 9479258 commit aaf7c00
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 25 deletions.
17 changes: 17 additions & 0 deletions docs/local-group-migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,21 @@ from databricks.labs.ucx.config import GroupsConfig

group_manager = GroupManager(ws, GroupsConfig(auto=True))
group_manager.ws_local_group_deletion_recovery()
```

3. Recover Table ACL from `$inventory.grants` to `$inventory.permissions`:

```python
from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
from databricks.labs.ucx.workspace_access.manager import PermissionManager
from databricks.labs.ucx.workspace_access.tacl import TableAclSupport
from databricks.labs.ucx.framework.crawlers import RuntimeBackend

sql_backend = RuntimeBackend()
inventory_schema = cfg.inventory_database
tables = TablesCrawler(sql_backend, inventory_schema)
grants = GrantsCrawler(tables)
tacl = TableAclSupport(grants, sql_backend)
permission_manager = PermissionManager(sql_backend, inventory_schema, [tacl])
permission_manager.inventorize_permissions()
```
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/framework/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def _execute(self):
with ThreadPoolExecutor(self._num_threads) as pool:
futures = []
for task in self._tasks:
if task is None:
continue
future = pool.submit(self._wrap_result(task, self._name))
future.add_done_callback(self._progress_report)
futures.append(future)
Expand Down Expand Up @@ -98,7 +100,7 @@ def inner(*args, **kwargs):
try:
return func(*args, **kwargs), None
except Exception as err:
logger.error(f"{name} task failed: {err!s}")
logger.error(f"{name} task failed: {err!s}", exc_info=err)
return None, err

return inner
15 changes: 13 additions & 2 deletions src/databricks/labs/ucx/workspace_access/manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import os
from collections.abc import Callable, Iterator
Expand Down Expand Up @@ -80,7 +81,7 @@ def apply_group_permissions(self, migration_state: GroupMigrationState, destinat
if len(migration_state.groups) == 0:
logger.info("No valid groups selected, nothing to do.")
return True
items = sorted(self._load_all(), key=lambda i: i.object_type)
items = sorted(self.load_all(), key=lambda i: i.object_type)
logger.info(
f"Applying the permissions to {destination} groups. "
f"Total groups to apply permissions: {len(migration_state.groups)}. "
Expand All @@ -104,6 +105,9 @@ def apply_group_permissions(self, migration_state: GroupMigrationState, destinat
tasks_for_support = [
relevant_support.get_apply_task(item, migration_state, destination) for item in items_subset
]
tasks_for_support = [_ for _ in tasks_for_support if _ is not None]
if len(tasks_for_support) == 0:
continue
logger.info(f"Total tasks for {object_type}: {len(tasks_for_support)}")
applier_tasks.extend(tasks_for_support)

Expand Down Expand Up @@ -132,16 +136,23 @@ def cleanup(self):
logger.info("Inventory table cleanup complete")

def _save(self, items: list[Permissions]):
# keep in mind, that object_type and object_id are not primary keys.
self._append_records(items) # TODO: update instead of append
logger.info("Successfully saved the items to inventory table")

def _load_all(self) -> list[Permissions]:
def load_all(self) -> list[Permissions]:
logger.info(f"Loading inventory table {self._full_name}")
return [
Permissions(object_id, object_type, raw)
for object_id, object_type, raw in self._fetch(f"SELECT object_id, object_type, raw FROM {self._full_name}")
]

def load_all_for(self, object_type: str, object_id: str, klass: type) -> any:
for perm in self.load_all():
if object_type == perm.object_type and object_id == perm.object_id:
raw = json.loads(perm.raw)
yield klass(**raw)

def _get_crawler_tasks(self) -> Iterator[Callable[..., Permissions | None]]:
for support in self._acl_support:
yield from support.get_crawler_tasks()
58 changes: 49 additions & 9 deletions tests/integration/workspace_access/test_groups.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import logging
from datetime import timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.retries import retried
from databricks.sdk.service.iam import PermissionLevel

from databricks.labs.ucx.config import GroupsConfig
from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.workspace_access.generic import (
GenericPermissionsSupport, Listing,
GenericPermissionsSupport,
Listing,
)
from databricks.labs.ucx.workspace_access.groups import GroupManager
from databricks.labs.ucx.workspace_access.manager import PermissionManager
from databricks.labs.ucx.workspace_access.tacl import TableAclSupport

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -89,47 +94,82 @@ def test_replace_workspace_groups_with_account_groups(
generic_permissions = GenericPermissionsSupport(
ws, [Listing(ws.cluster_policies.list, "policy_id", "cluster-policies")]
)
permission_manager = PermissionManager(sql_backend, inventory_schema, [generic_permissions])
tables = TablesCrawler(sql_backend, inventory_schema)
grants = GrantsCrawler(tables)
tacl = TableAclSupport(grants, sql_backend)
permission_manager = PermissionManager(sql_backend, inventory_schema, [generic_permissions, tacl])

permission_manager.inventorize_permissions()

dummy_grants = list(permission_manager.load_all_for("TABLE", dummy_table.full_name, Grant))
assert 2 == len(dummy_grants)

table_permissions = grants.for_table_info(dummy_table)
print(table_permissions)
assert ws_group.display_name in table_permissions
assert "SELECT" in table_permissions[ws_group.display_name]

permission_manager.apply_group_permissions(group_manager.migration_state, destination="backup")

@retried(on=[AssertionError], timeout=timedelta(seconds=30))
def check_table_permissions_for_backup_group():
table_permissions = grants.for_table_info(dummy_table)
assert group_info.workspace.display_name in table_permissions
assert group_info.backup.display_name in table_permissions
assert "SELECT" in table_permissions[group_info.workspace.display_name]
assert "SELECT" in table_permissions[group_info.backup.display_name]

check_table_permissions_for_backup_group()

policy_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert PermissionLevel.CAN_USE == policy_permissions[group_info.workspace.display_name]
assert PermissionLevel.CAN_USE == policy_permissions[group_info.backup.display_name]

group_manager.replace_workspace_groups_with_account_groups()

table_permissions = grants.for_table_info(dummy_table)
print(table_permissions)
@retried(on=[AssertionError], timeout=timedelta(seconds=30))
def check_table_permissions_for_account_group():
table_permissions = grants.for_table_info(dummy_table)
assert group_info.account.display_name in table_permissions
assert group_info.backup.display_name in table_permissions
assert "SELECT" in table_permissions[group_info.backup.display_name]

check_table_permissions_for_account_group()

policy_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert group_info.workspace.display_name not in policy_permissions
assert PermissionLevel.CAN_USE == policy_permissions[group_info.backup.display_name]

permission_manager.apply_group_permissions(group_manager.migration_state, destination="account")

table_permissions = grants.for_table_info(dummy_table)
print(table_permissions)
@retried(on=[AssertionError], timeout=timedelta(seconds=30))
def check_table_permissions_for_account_group():
table_permissions = grants.for_table_info(dummy_table)
assert group_info.account.display_name in table_permissions
assert group_info.backup.display_name in table_permissions
assert "SELECT" in table_permissions[group_info.backup.display_name]
assert "SELECT" in table_permissions[group_info.account.display_name]

check_table_permissions_for_account_group()

policy_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert PermissionLevel.CAN_USE == policy_permissions[group_info.account.display_name]
assert PermissionLevel.CAN_USE == policy_permissions[group_info.backup.display_name]

# TODO: check hive grants as well

for _info in group_manager.migration_state.groups:
ws.groups.delete(_info.backup.id)

policy_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert group_info.backup.display_name not in policy_permissions

@retried(on=[AssertionError], timeout=timedelta(seconds=30))
def check_table_permissions_after_backup_delete():
table_permissions = grants.for_table_info(dummy_table)
assert group_info.backup.display_name not in table_permissions
assert group_info.account.display_name in table_permissions
assert "SELECT" in table_permissions[group_info.account.display_name]

check_table_permissions_after_backup_delete()


def test_group_listing(ws: WorkspaceClient, make_ucx_group):
ws_group, acc_group = make_ucx_group()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ def test_permissions_save_and_load(ws, sql_backend, inventory_schema, env_or_ski
]

pi._save(saved)
loaded = pi._load_all()
loaded = pi.load_all()

assert saved == loaded
63 changes: 63 additions & 0 deletions tests/integration/workspace_access/test_tacl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging

from databricks.sdk.service.iam import PermissionLevel

from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
from databricks.labs.ucx.workspace_access.generic import (
GenericPermissionsSupport,
Listing,
)
from databricks.labs.ucx.workspace_access.manager import PermissionManager
from databricks.labs.ucx.workspace_access.tacl import TableAclSupport

logger = logging.getLogger(__name__)


def test_recover_permissions_from_grants(
ws,
sql_backend,
inventory_schema,
make_ucx_group,
make_group,
make_acc_group,
make_cluster_policy,
make_cluster_policy_permissions,
make_table,
):
ws_group, _ = make_ucx_group()
cluster_policy = make_cluster_policy()
make_cluster_policy_permissions(
object_id=cluster_policy.policy_id,
permission_level=PermissionLevel.CAN_USE,
group_name=ws_group.display_name,
)

dummy_table = make_table()
sql_backend.execute(f"GRANT SELECT ON TABLE {dummy_table.full_name} TO `{ws_group.display_name}`")

generic_permissions = GenericPermissionsSupport(
ws, [Listing(ws.cluster_policies.list, "policy_id", "cluster-policies")]
)
tables = TablesCrawler(sql_backend, inventory_schema)
grants = GrantsCrawler(tables)
tacl = TableAclSupport(grants, sql_backend)

# simulate: Table ACLs were not part of $inventory.permissions
permission_manager = PermissionManager(sql_backend, inventory_schema, [generic_permissions])
permission_manager.inventorize_permissions()

object_types = set()
for perm in permission_manager.load_all():
object_types.add(perm.object_type)
assert {"cluster-policies"} == object_types

# simulate: recovery of Table ACLs as part of $inventory.permissions
permission_manager = PermissionManager(sql_backend, inventory_schema, [tacl])
permission_manager.inventorize_permissions()

# simulate: normal flow
permission_manager = PermissionManager(sql_backend, inventory_schema, [generic_permissions, tacl])
object_types = set()
for perm in permission_manager.load_all():
object_types.add(perm.object_type)
assert {"CATALOG", "TABLE", "DATABASE", "cluster-policies"} == object_types
4 changes: 1 addition & 3 deletions tests/unit/workspace_access/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
Destination,
Permissions,
)
from databricks.labs.ucx.workspace_access.groups import (
GroupMigrationState,
)
from databricks.labs.ucx.workspace_access.groups import GroupMigrationState


def test_applier():
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/workspace_access/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_load_all():
)
pi = PermissionManager(b, "test_database", [])

output = pi._load_all()
output = pi.load_all()
assert output[0] == Permissions("object1", "clusters", "test acl")


Expand Down
14 changes: 6 additions & 8 deletions tests/unit/workspace_access/test_tacl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
from databricks.labs.ucx.workspace_access.base import Permissions
from databricks.labs.ucx.workspace_access.groups import (
GroupMigrationState,
MigrationGroupInfo,
)
from databricks.labs.ucx.workspace_access.groups import GroupMigrationState
from databricks.labs.ucx.workspace_access.tacl import TableAclSupport

from ..framework.mocks import MockBackend
Expand Down Expand Up @@ -51,7 +48,9 @@ def test_tacl_applier(mocker):
),
)
migration_state = GroupMigrationState()
migration_state.add(Group(display_name="abc"), Group(display_name="tmp-backup-abc"), Group(display_name="account-abc"))
migration_state.add(
Group(display_name="abc"), Group(display_name="tmp-backup-abc"), Group(display_name="account-abc")
)
task = table_acl_support.get_apply_task(permissions, migration_state, "backup")
task()

Expand All @@ -77,9 +76,8 @@ def test_tacl_applier_no_target_principal(mocker):
)
migration_state = GroupMigrationState()
migration_state.add(
Group(display_name="abc"),
Group(display_name="tmp-backup-abc"),
Group(display_name="account-abc"))
Group(display_name="abc"), Group(display_name="tmp-backup-abc"), Group(display_name="account-abc")
)
task = table_acl_support.get_apply_task(permissions, migration_state, "backup")
assert task is None

Expand Down

0 comments on commit aaf7c00

Please sign in to comment.