Merge workspace_access.Crawler and workspace_access.Applier interfaces to workspace_access.AclSupport #436

Merged
merged 4 commits into from
Oct 11, 2023
30 changes: 13 additions & 17 deletions docs/local-group-migration.md
@@ -17,18 +17,12 @@ To deliver this migration, the following steps are performed:

> Please note that inherited permissions will not be inventorized / migrated. We only cover direct permissions.

-On a very high-level, the permissions inventorization process is split into two steps:
+On a very high-level, the permissions crawling process is split into two steps:

-1. collect all existing permissions into a persistent storage.
-2. apply the collected permissions to the target resources.
+1. collect all existing permissions into a persistent storage - see `workspace_access.AclSupport.get_crawler_tasks`.
+2. apply the collected permissions to the target resources - see `workspace_access.AclSupport.get_apply_task`.

-The first step is performed by the `Crawler` and the second by the `Applier`.
-
-Crawler and applier are intrinsically connected to each other due to SerDe (serialization/deserialization) logic.
-
-We implement separate crawlers and applier for each supported resource type.
-
-Please note that `table ACLs` logic is currently handled separately from the logic described in this document.
+We implement `workspace_access.AclSupport` for each supported resource type.
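
To make the single-interface idea concrete, here is a minimal sketch of one class that covers both the crawl and the apply step. It is not code from ucx: `InMemoryAclSupport`, the `demo` object type, and the use of a plain `set[str]` in place of `GroupMigrationState` are assumptions made for illustration, and the `destination` argument is left out for brevity.

```python
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from functools import partial


@dataclass
class Permissions:
    # Stand-in for workspace_access.Permissions (fields as used in this PR).
    object_id: str
    object_type: str
    raw: str


class AclSupport(ABC):
    # Trimmed copy of the merged interface; migration_state is simplified
    # to a set of group names (an assumption for this sketch).
    @abstractmethod
    def get_crawler_tasks(self) -> Iterator[Callable[..., Permissions | None]]: ...

    @abstractmethod
    def get_apply_task(self, item: Permissions, migration_state: set[str]) -> Callable[[], None] | None: ...

    @abstractmethod
    def object_types(self) -> set[str]: ...


class InMemoryAclSupport(AclSupport):
    """Hypothetical implementation backed by a dict of object_id -> group names."""

    def __init__(self, acls: dict[str, list[str]]):
        self._acls = acls

    def get_crawler_tasks(self):
        # One callable per object; nothing is fetched until the callable runs.
        for object_id in self._acls:
            yield partial(self._crawl, object_id)

    def _crawl(self, object_id: str) -> Permissions:
        return Permissions(object_id, "demo", json.dumps(self._acls[object_id]))

    def get_apply_task(self, item, migration_state):
        groups = json.loads(item.raw)
        if not any(group in migration_state for group in groups):
            return None  # irrelevant item: no task instead of a no-op callable
        return partial(self._apply, item.object_id, groups)

    @staticmethod
    def _apply(object_id: str, groups: list[str]) -> None:
        pass  # a real implementation would call a permissions API here

    def object_types(self):
        return {"demo"}


support = InMemoryAclSupport({"a": ["analysts"], "b": ["admins"]})
items = [task() for task in support.get_crawler_tasks()]
tasks = [support.get_apply_task(item, {"analysts"}) for item in items]
```

In this sketch only object `a` mentions a migrated group, so `tasks` holds one callable and one `None`; a caller would simply skip the `None` entries.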

## Logical objects and relevant APIs

@@ -147,7 +141,9 @@ Additional info:

#### Known issues

-- Folder names with forward-slash (`/`) in directory name will be skipped by the inventory. Databricks UI no longer allows creating folders with a forward slash. See [this issue](https://github.com/databrickslabs/ucx/issues/230) for more details.
+- Folder names with forward-slash (`/`) in directory name will be skipped by the inventory. Databricks UI no longer
+allows creating folders with a forward slash. See [this issue](https://github.com/databrickslabs/ucx/issues/230) for
+more details.

### Secrets (uses Secrets API)

@@ -163,16 +159,16 @@ Additional info:
- put method: `ws.secrets.put_acl`


-## Crawler and serialization logic
+## AclSupport and serialization logic

Crawlers are expected to return a list of callable functions that will be later used to get the permissions.
-Each of these functions shall return a `PermissionInventoryItem` that should be serializable into a Delta Table.
+Each of these functions shall return a `workspace_access.Permissions` that should be serializable into a Delta Table.
The permission payload differs between different crawlers, therefore each crawler should implement a serialization
method.
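
As a rough illustration of the paragraph above, a crawler task might serialize its payload as sketched here. The `Permissions` dataclass is a stand-in with the three fields this PR uses (`object_id`, `object_type`, `raw`); `serialize_acl` and `deserialize_acl` are hypothetical helper names, and JSON is assumed as the encoding because the generic support in this PR reads `item.raw` with `json.loads`.

```python
import json
from dataclasses import dataclass


@dataclass
class Permissions:
    # Stand-in for workspace_access.Permissions; field names follow their usage in this PR.
    object_id: str
    object_type: str
    raw: str  # serialized payload, kept as a string so the row fits a flat table schema


def serialize_acl(object_type: str, object_id: str, payload: dict) -> Permissions:
    # Hypothetical helper: each crawler decides how its payload becomes a string.
    return Permissions(object_id=object_id, object_type=object_type, raw=json.dumps(payload, sort_keys=True))


def deserialize_acl(item: Permissions) -> dict:
    # The apply side reverses the step to get a structured payload back.
    return json.loads(item.raw)


item = serialize_acl("clusters", "0123-abc", {"access_control_list": [{"group_name": "analysts"}]})
restored = deserialize_acl(item)
```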

## Applier and deserialization logic

-Appliers are expected to accept a list of `PermissionInventoryItem` and generate a list of callables that will apply the
+Appliers are expected to accept a list of `workspace_access.Permissions` and generate a list of callables that will apply the
given permissions.
Each applier should implement a deserialization method that will convert the raw payload into a typed one.
Each permission item should have a crawler type associated with it, so that the applier can use the correct
@@ -189,10 +185,10 @@ We do this inside the `applier`, by returning a `noop` callable if the object is
To crawl the permissions, we use the following logic:
1. Go through the list of all crawlers.
2. Get the list of all objects of the given type.
-3. For each object, generate a callable that will return a `PermissionInventoryItem`.
+3. For each object, generate a callable that will return a `workspace_access.Permissions`.
 4. Execute the callables in parallel
-5. Collect the results into a list of `PermissionInventoryItem`.
-6. Save the list of `PermissionInventoryItem` into a Delta Table.
+5. Collect the results into a list of `workspace_access.Permissions`.
+6. Save the list of `workspace_access.Permissions` into a Delta Table.
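
The crawl steps listed above can be sketched roughly as follows. This is an illustration under stated assumptions, not the ucx implementation: `FakeSupport` and `crawl` are hypothetical names, `Permissions` is a stand-in dataclass, and a `ThreadPoolExecutor` is assumed for the parallel step because the document does not say which mechanism is used. Writing to a Delta Table is left out.

```python
from __future__ import annotations

from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import partial


@dataclass
class Permissions:
    # Stand-in for workspace_access.Permissions.
    object_id: str
    object_type: str
    raw: str


class FakeSupport:
    # Hypothetical crawler: yields one task per known object id.
    def __init__(self, object_type: str, object_ids: list[str]):
        self._object_type = object_type
        self._object_ids = object_ids

    def get_crawler_tasks(self):
        for object_id in self._object_ids:
            yield partial(Permissions, object_id, self._object_type, "{}")


def crawl(supports: Iterable[FakeSupport], num_threads: int = 4) -> list[Permissions]:
    # Steps 1-3: walk every crawler and gather its callables.
    tasks = [task for support in supports for task in support.get_crawler_tasks()]
    # Step 4: run the callables in parallel; map() returns results in submission order.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        results = list(pool.map(lambda task: task(), tasks))
    # Step 5: keep only tasks that produced a result.
    return [item for item in results if item is not None]


inventory = crawl([FakeSupport("clusters", ["c-1", "c-2"]), FakeSupport("jobs", ["j-1"])])
# Step 6 (saving to a Delta Table) is omitted; `inventory` is what would be written.
```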

## Applying the permissions

9 changes: 4 additions & 5 deletions src/databricks/labs/ucx/mixins/fixtures.py
@@ -471,15 +471,14 @@ def create(
         if single_node:
             kwargs["num_workers"] = 0
             if "spark_conf" in kwargs:
-                kwargs["spark_conf"] = {
-                    **kwargs["spark_conf"],
-                    **{"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]"},
+                kwargs["spark_conf"] = kwargs["spark_conf"] | {
+                    "spark.databricks.cluster.profile": "singleNode",
+                    "spark.master": "local[*]",
                 }
             else:
                 kwargs["spark_conf"] = {"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]"}
             kwargs["custom_tags"] = {"ResourceClass": "SingleNode"}
-            kwargs["node_type_id"] = ws.clusters.select_node_type(local_disk=True)
-        elif "instance_pool_id" not in kwargs:
+        if "instance_pool_id" not in kwargs:
             kwargs["node_type_id"] = ws.clusters.select_node_type(local_disk=True)

         return ws.clusters.create(
33 changes: 7 additions & 26 deletions src/databricks/labs/ucx/workspace_access/base.py
@@ -1,7 +1,6 @@
 from abc import abstractmethod
 from collections.abc import Callable, Iterator
 from dataclasses import dataclass
-from functools import partial
 from logging import Logger
 from typing import Literal

@@ -21,39 +20,21 @@ class Permissions:
 Destination = Literal["backup", "account"]


-class Crawler:
+class AclSupport:
     @abstractmethod
     def get_crawler_tasks(self) -> Iterator[Callable[..., Permissions | None]]:
         """
         This method should return a list of crawler tasks (e.g. partials or just any callables)
         :return:
         """

-
-# TODO: this class has to become typing.Protocol and keep only abstract methods
-# See https://www.oreilly.com/library/view/fluent-python-2nd/9781492056348/ch13.html
-class Applier:
-    @abstractmethod
-    def is_item_relevant(self, item: Permissions, migration_state: GroupMigrationState) -> bool:
-        """TODO: remove it, see https://github.com/databrickslabs/ucx/issues/410"""
-
     @abstractmethod
-    def _get_apply_task(
-        self, item: Permissions, migration_state: GroupMigrationState, destination: Destination
-    ) -> partial:
-        """
-        This method should return an instance of ApplierTask.
-        """
-
     def get_apply_task(
         self, item: Permissions, migration_state: GroupMigrationState, destination: Destination
-    ) -> partial:
-        # we explicitly put the relevance check here to avoid "forgotten implementation" in child classes
-        if self.is_item_relevant(item, migration_state):
-            return self._get_apply_task(item, migration_state, destination)
-        else:
+    ) -> Callable[[], None] | None:
+        """This method returns a Callable, that applies permissions to a destination group, based on
+        the group migration state. The callable is required not to have any shared mutable state."""

-            def noop():
-                pass
-
-            return partial(noop)
+    @abstractmethod
+    def object_types(self) -> set[str]:
+        """This method returns a set of strings, that represent object types that are applicable by this instance."""
95 changes: 54 additions & 41 deletions src/databricks/labs/ucx/workspace_access/generic.py
@@ -1,3 +1,4 @@
+import datetime
 import json
 import logging
 from collections.abc import Callable, Iterator
@@ -11,8 +12,7 @@

 from databricks.labs.ucx.mixins.hardening import rate_limited
 from databricks.labs.ucx.workspace_access.base import (
-    Applier,
-    Crawler,
+    AclSupport,
     Destination,
     Permissions,
 )
@@ -31,17 +31,49 @@ class RetryableError(DatabricksError):
     pass


-class GenericPermissionsSupport(Crawler, Applier):
-    def __init__(self, ws: WorkspaceClient, listings: list[Callable[..., Iterator[GenericPermissionsInfo]]]):
+class Listing:
+    def __init__(self, func: Callable[..., list], id_attribute: str, object_type: str):
+        self._func = func
+        self._id_attribute = id_attribute
+        self._object_type = object_type
+
+    def object_types(self) -> set[str]:
+        return {self._object_type}
+
+    def __iter__(self):
+        started = datetime.datetime.now()
+        for item in self._func():
+            yield GenericPermissionsInfo(getattr(item, self._id_attribute), self._object_type)
+        since = datetime.datetime.now() - started
+        logger.info(f"Listed {self._object_type} in {since}")
+
+
+class GenericPermissionsSupport(AclSupport):
+    def __init__(self, ws: WorkspaceClient, listings: list[Listing]):
         self._ws = ws
         self._listings = listings

     def get_crawler_tasks(self):
         for listing in self._listings:
-            for info in listing():
+            for info in listing:
                 yield partial(self._crawler_task, info.request_type, info.object_id)

-    def is_item_relevant(self, item: Permissions, migration_state: GroupMigrationState) -> bool:
+    def object_types(self) -> set[str]:
+        all_object_types = set()
+        for listing in self._listings:
+            for object_type in listing.object_types():
+                all_object_types.add(object_type)
+        return all_object_types
+
+    def get_apply_task(self, item: Permissions, migration_state: GroupMigrationState, destination: Destination):
+        if not self._is_item_relevant(item, migration_state):
+            return None
+        object_permissions = iam.ObjectPermissions.from_dict(json.loads(item.raw))
+        new_acl = self._prepare_new_acl(object_permissions, migration_state, destination)
+        return partial(self._applier_task, item.object_type, item.object_id, new_acl)
+
+    @staticmethod
+    def _is_item_relevant(item: Permissions, migration_state: GroupMigrationState) -> bool:
         # passwords and tokens are represented on the workspace-level
         if item.object_id in ("tokens", "passwords"):
             return True
@@ -50,14 +82,6 @@ def is_item_relevant(self, item: Permissions, migration_state: GroupMigrationSta
         ]
         return any(g in mentioned_groups for g in [info.workspace.display_name for info in migration_state.groups])

-    def _get_apply_task(
-        self, item: Permissions, migration_state: GroupMigrationState, destination: Destination
-    ) -> partial:
-        new_acl = self._prepare_new_acl(
-            iam.ObjectPermissions.from_dict(json.loads(item.raw)), migration_state, destination
-        )
-        return partial(self._applier_task, item.object_type, item.object_id, new_acl)
-
     @rate_limited(max_requests=30)
     def _applier_task(self, object_type: str, object_id: str, acl: list[iam.AccessControlRequest]):
         self._ws.permissions.update(object_type, object_id, access_control_list=acl)
@@ -121,20 +145,17 @@ def _prepare_new_acl(
         return acl_requests


-def listing_wrapper(
-    func: Callable[..., list], id_attribute: str, object_type: str
-) -> Callable[..., Iterator[GenericPermissionsInfo]]:
-    def wrapper() -> Iterator[GenericPermissionsInfo]:
-        for item in func():
-            yield GenericPermissionsInfo(
-                object_id=getattr(item, id_attribute),
-                request_type=object_type,
-            )
-
-    return wrapper
+class WorkspaceListing(Listing):
+    def __init__(self, ws: WorkspaceClient, num_threads=20, start_path: str | None = "/"):
+        super().__init__(..., ..., ...)
+        self._ws = ws
+        self._num_threads = num_threads
+        self._start_path = start_path

+    def object_types(self) -> set[str]:
+        return {"notebooks", "directories", "repos", "files"}

-def workspace_listing(ws: WorkspaceClient, num_threads=20, start_path: str | None = "/"):
+    @staticmethod
     def _convert_object_type_to_request_type(_object: workspace.ObjectInfo) -> str | None:
         match _object.object_type:
             case workspace.ObjectType.NOTEBOOK:
@@ -151,17 +172,15 @@ def _convert_object_type_to_request_type(_object: workspace.ObjectInfo) -> str |
             case None:
                 return None

-    def inner():
+    def __iter__(self):
         from databricks.labs.ucx.workspace_access.listing import WorkspaceListing

-        ws_listing = WorkspaceListing(ws, num_threads=num_threads, with_directories=False)
-        for _object in ws_listing.walk(start_path):
-            request_type = _convert_object_type_to_request_type(_object)
+        ws_listing = WorkspaceListing(self._ws, num_threads=self._num_threads, with_directories=False)
+        for _object in ws_listing.walk(self._start_path):
+            request_type = self._convert_object_type_to_request_type(_object)
             if request_type:
                 yield GenericPermissionsInfo(object_id=str(_object.object_id), request_type=request_type)
-
-    return inner


 def models_listing(ws: WorkspaceClient):
     def inner() -> Iterator[ml.ModelDatabricks]:
@@ -193,12 +212,6 @@ def inner() -> Iterator[ml.Experiment]:
     return inner


-def authorization_listing():
-    def inner():
-        for _value in ["passwords", "tokens"]:
-            yield GenericPermissionsInfo(
-                object_id=_value,
-                request_type="authorization",
-            )
-
-    return inner
+def tokens_and_passwords():
+    for _value in ["passwords", "tokens"]:
+        yield GenericPermissionsInfo(_value, "authorization")
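
The `Listing` class introduced in `generic.py` replaces the `listing_wrapper` closure with an iterable object. A trimmed, standalone sketch of that idea follows; timing and logging are dropped, `GenericPermissionsInfo` is a stand-in dataclass, and `fake_list_clusters` is a hypothetical substitute for a real SDK listing call.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class GenericPermissionsInfo:
    # Stand-in with the two fields the diff uses.
    object_id: str
    request_type: str


class Listing:
    def __init__(self, func, id_attribute: str, object_type: str):
        self._func = func
        self._id_attribute = id_attribute
        self._object_type = object_type

    def object_types(self) -> set:
        return {self._object_type}

    def __iter__(self):
        # The listing function is called on every iteration, so the object can be iterated repeatedly.
        for item in self._func():
            yield GenericPermissionsInfo(getattr(item, self._id_attribute), self._object_type)


def fake_list_clusters():
    # Hypothetical stand-in for a workspace listing call.
    return [SimpleNamespace(cluster_id="c-1"), SimpleNamespace(cluster_id="c-2")]


listing = Listing(fake_list_clusters, "cluster_id", "clusters")
infos = list(listing)
```

Compared with a closure, the object form lets a caller ask `object_types()` without running the listing itself.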
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/workspace_access/groups.py
@@ -42,6 +42,7 @@ def is_in_scope(self, attr: str, group: Group) -> bool:
         return False

     def get_by_workspace_group_name(self, workspace_group_name: str) -> MigrationGroupInfo | None:
+        # TODO: this method is deprecated, replace all usages by get_target_principal()
         found = [g for g in self.groups if g.workspace.display_name == workspace_group_name]
         if len(found) == 0:
             return None