Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle PermissionDenied when listing accessible workspaces #2733

Merged
merged 13 commits into from
Sep 24, 2024
27 changes: 16 additions & 11 deletions src/databricks/labs/ucx/account/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,25 @@ def get_accessible_workspaces(self) -> list[Workspace]:
return accessible_workspaces

def can_administer(self, workspace: Workspace) -> bool:
"""Evaluate if the user can administer a workspace.

A user can administer a workspace if the user can access the workspace and is a member of the workspace "admins"
group.

Args:
workspace (Workspace): The workspace to check if the user can administer.

Returns:
bool: True if the user can administer the workspace, False otherwise.
"""
try:
# check if user has access to workspace
ws = self.client_for(workspace)
except (PermissionDenied, NotFound, ValueError) as err:
logger.warning(f"{workspace.deployment_name}: Encounter error {err}. Skipping...")
return False
current_user = ws.current_user.me()
if current_user.groups is None:
current_user = ws.current_user.me()
except (PermissionDenied, NotFound, ValueError) as e:
logger.warning(f"User cannot access workspace: {workspace.deployment_name}", exc_info=e)
return False
# check if user is a workspace admin
if "admins" not in [g.display for g in current_user.groups]:
logger.warning(
f"{workspace.deployment_name}: User {current_user.user_name} is not a workspace admin. Skipping..."
)
if current_user.groups is None or "admins" not in {g.display for g in current_user.groups}:
logger.warning(f"User '{current_user.user_name}' is not a workspace admin: {workspace.deployment_name}")
return False
return True

Expand Down
57 changes: 56 additions & 1 deletion tests/unit/account/test_workspaces.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import io
import json
from unittest.mock import create_autospec
Expand All @@ -6,7 +7,7 @@
from databricks.labs.blueprint.installation import Installation, MockInstallation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.errors import NotFound, ResourceConflict
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceConflict
from databricks.sdk.service import iam
from databricks.sdk.service.iam import ComplexValue, Group, ResourceMeta, User
from databricks.sdk.service.provisioning import Workspace
Expand Down Expand Up @@ -494,3 +495,57 @@ def get_workspace_client(workspace) -> WorkspaceClient:
acc.config.auth_type = "databricks-cli"
account_workspaces = AccountWorkspaces(acc)
assert len(account_workspaces.get_accessible_workspaces()) == 1


def test_account_workspaces_can_administer_when_user_in_admins_group() -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.return_value = User(user_name="test", groups=[ComplexValue(display="admins")])
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

assert account_workspaces.can_administer(workspace)


@pytest.mark.parametrize("groups", [[ComplexValue(display="not-admins")], None])
def test_account_workspaces_cannot_administer_when_user_not_in_admins_group(caplog, groups) -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.return_value = User(user_name="test", groups=groups)
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User 'test' is not a workspace admin: test" in caplog.messages


def test_account_workspaces_can_administer_handles_not_found_error_for_get_workspace_client(caplog) -> None:
acc = create_autospec(AccountClient)
acc.get_workspace_client.side_effect = NotFound
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User cannot access workspace: test" in caplog.messages


def test_account_workspaces_can_administer_handles_permission_denied_error_for_current_user(caplog) -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.side_effect = PermissionDenied(
"This API is disabled for users without the databricks-sql-access or workspace-access entitlements"
)
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User cannot access workspace: test" in caplog.messages