Skip to content

Commit

Permalink
Handle PermissionDenied when listing accessible workspaces (databri…
Browse files Browse the repository at this point in the history
…ckslabs#2733)

## Changes
Handle `PermissionDenied` when listing accessible workspaces

### Linked issues
Resolves databrickslabs#2732

### Functionality

- [x] modified existing command: that user the accesible workspaces
(most account level commands)

### Tests

- [ ] manually tested
- [x] added unit tests
  • Loading branch information
JCZuurmond authored and jgarciaf106 committed Sep 26, 2024
1 parent 204ab4a commit 0a6e72f
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
27 changes: 16 additions & 11 deletions src/databricks/labs/ucx/account/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,25 @@ def get_accessible_workspaces(self) -> list[Workspace]:
return accessible_workspaces

def can_administer(self, workspace: Workspace) -> bool:
"""Evaluate if the user can administer a workspace.
A user can administer a workspace if the user can access the workspace and is a member of the workspace "admins"
group.
Args:
workspace (Workspace): The workspace to check if the user can administer.
Returns:
bool: True if the user can administer the workspace, False otherwise.
"""
try:
# check if user has access to workspace
ws = self.client_for(workspace)
except (PermissionDenied, NotFound, ValueError) as err:
logger.warning(f"{workspace.deployment_name}: Encounter error {err}. Skipping...")
return False
current_user = ws.current_user.me()
if current_user.groups is None:
current_user = ws.current_user.me()
except (PermissionDenied, NotFound, ValueError) as e:
logger.warning(f"User cannot access workspace: {workspace.deployment_name}", exc_info=e)
return False
# check if user is a workspace admin
if "admins" not in [g.display for g in current_user.groups]:
logger.warning(
f"{workspace.deployment_name}: User {current_user.user_name} is not a workspace admin. Skipping..."
)
if current_user.groups is None or "admins" not in {g.display for g in current_user.groups}:
logger.warning(f"User '{current_user.user_name}' is not a workspace admin: {workspace.deployment_name}")
return False
return True

Expand Down
57 changes: 56 additions & 1 deletion tests/unit/account/test_workspaces.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import io
import json
from unittest.mock import create_autospec
Expand All @@ -6,7 +7,7 @@
from databricks.labs.blueprint.installation import Installation, MockInstallation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.errors import NotFound, ResourceConflict
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceConflict
from databricks.sdk.service import iam
from databricks.sdk.service.iam import ComplexValue, Group, ResourceMeta, User
from databricks.sdk.service.provisioning import Workspace
Expand Down Expand Up @@ -494,3 +495,57 @@ def get_workspace_client(workspace) -> WorkspaceClient:
acc.config.auth_type = "databricks-cli"
account_workspaces = AccountWorkspaces(acc)
assert len(account_workspaces.get_accessible_workspaces()) == 1


def test_account_workspaces_can_administer_when_user_in_admins_group() -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.return_value = User(user_name="test", groups=[ComplexValue(display="admins")])
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

assert account_workspaces.can_administer(workspace)


@pytest.mark.parametrize("groups", [[ComplexValue(display="not-admins")], None])
def test_account_workspaces_cannot_administer_when_user_not_in_admins_group(caplog, groups) -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.return_value = User(user_name="test", groups=groups)
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User 'test' is not a workspace admin: test" in caplog.messages


def test_account_workspaces_can_administer_handles_not_found_error_for_get_workspace_client(caplog) -> None:
acc = create_autospec(AccountClient)
acc.get_workspace_client.side_effect = NotFound
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User cannot access workspace: test" in caplog.messages


def test_account_workspaces_can_administer_handles_permission_denied_error_for_current_user(caplog) -> None:
acc = create_autospec(AccountClient)
ws = create_autospec(WorkspaceClient)
acc.get_workspace_client.return_value = ws
ws.current_user.me.side_effect = PermissionDenied(
"This API is disabled for users without the databricks-sql-access or workspace-access entitlements"
)
account_workspaces = AccountWorkspaces(acc)
workspace = Workspace(deployment_name="test")

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.account.workspaces"):
can_administer = account_workspaces.can_administer(workspace)
assert not can_administer
assert "User cannot access workspace: test" in caplog.messages

0 comments on commit 0a6e72f

Please sign in to comment.