Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crawl Table ACLs in all databases #122

Merged
merged 33 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cd2bbf0
remove with_table_acls, add warehouse_id to configs
Aug 29, 2023
8dbef32
add entrypoint for tacl_job
Aug 29, 2023
4574565
add coverage
Aug 29, 2023
38b32cc
add config file
Aug 29, 2023
ce558fd
crawler first draft not working
Aug 29, 2023
b071150
lint
Aug 29, 2023
5cc2efc
cannot find wheel package
Aug 30, 2023
bb9641d
amend entrypoint
Aug 30, 2023
46e0c6b
create tacl job
Aug 30, 2023
d0274d0
progress with job creation dbfs wheel
Sep 1, 2023
b5d61f8
add entrypoint, replace - to _ in project name
Sep 1, 2023
30b7da8
draft commit
Sep 3, 2023
4cd7ca0
Merge branch 'main' into schedule-crawler-job
Sep 3, 2023
79ae854
add tacl config
Sep 3, 2023
2147cdf
add config, dbfs wheel
Sep 3, 2023
eb6a7c1
draft commit
Sep 3, 2023
ea9973a
add tacl dbs to config
Sep 4, 2023
2102626
fmt
Sep 4, 2023
3bdc4d4
tacl crawl support for multiple databases
Sep 4, 2023
2428a60
removed out of scope job scheduling
Sep 4, 2023
e159a4c
remove entrypoint
Sep 4, 2023
087d644
removed job scheduler test
Sep 4, 2023
ecb2dfd
fmt
Sep 4, 2023
ecec9a6
implement tacl all db crawler
Sep 4, 2023
34bc208
change database field to private
Sep 5, 2023
0f94680
change logging to logger
Sep 5, 2023
f763005
Merge branch 'main' into schedule-crawler-job
saraivdbx Sep 5, 2023
48d8bd6
implement state machine for crawlerbase
Sep 5, 2023
7624b55
Merge branch 'schedule-crawler-job' of https://github.com/databricks/…
Sep 5, 2023
1a9568e
replace migration config with taclconfig and inventory
Sep 5, 2023
e150735
replace taclconfig with databases
Sep 5, 2023
7e54360
change config for tacltoolkit
Sep 5, 2023
97fd520
Update tests/unit/test_grants.py
nfx Sep 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/migration_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ inventory:
database: default
name: uc_migration_inventory

tacl:
databases: [ "default" ]

with_table_acls: False
warehouse_id: None

groups:
selected: [ "analyst" ]
Expand Down
21 changes: 19 additions & 2 deletions notebooks/toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
# COMMAND ----------

from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit
from databricks.labs.ucx.config import MigrationConfig, InventoryConfig, GroupsConfig, InventoryTable
from databricks.labs.ucx.config import MigrationConfig, InventoryConfig, GroupsConfig, InventoryTable, TaclConfig
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

# COMMAND ----------

Expand All @@ -47,17 +48,23 @@
# COMMAND ----------

config = MigrationConfig(
with_table_acls=False,
inventory=InventoryConfig(table=InventoryTable(catalog="main", database="default", name="ucx_migration_inventory")),
groups=GroupsConfig(
# use this option to select specific groups manually
selected=["groupA", "groupB"],
# use this option to select all groups automatically
# auto=True
),
tacl=TaclConfig(
# use this option to select specific databases manually
databases=["default"],
# use this option to select all databases automatically
# auto=True
),
log_level="DEBUG",
)
toolkit = GroupMigrationToolkit(config)
tacltoolkit = TaclToolkit(toolkit._ws, config)

# COMMAND ----------

Expand Down Expand Up @@ -152,6 +159,16 @@

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Inventorize Table ACLs

# COMMAND ----------

tacltoolkit.grants_snapshot()

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Cleanup the inventory table
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
]
25 changes: 20 additions & 5 deletions src/databricks/labs/ucx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,28 @@ def from_dict(cls, raw: dict):
return cls(**raw)


@dataclass
class TaclConfig:
databases: list[str] | None = None
auto: bool | None = None

def __post_init__(self):
if not self.databases and self.auto is None:
msg = "Either selected or auto must be set"
raise ValueError(msg)
if self.databases and self.auto is False:
msg = "No selected groups provided, but auto-collection is disabled"
raise ValueError(msg)

@classmethod
def from_dict(cls, raw: dict):
return cls(**raw)


@dataclass
class MigrationConfig:
inventory: InventoryConfig
with_table_acls: bool
tacl: TaclConfig
groups: GroupsConfig
connect: ConnectConfig | None = None
num_threads: int | None = 4
Expand All @@ -102,9 +120,6 @@ class MigrationConfig:
def __post_init__(self):
if self.connect is None:
self.connect = ConnectConfig()
if self.with_table_acls:
msg = "Table ACLS are not yet implemented"
raise NotImplementedError(msg)

def as_dict(self) -> dict:
from dataclasses import fields, is_dataclass
Expand All @@ -126,7 +141,7 @@ def inner(x):
def from_dict(cls, raw: dict) -> "MigrationConfig":
return cls(
inventory=InventoryConfig.from_dict(raw.get("inventory", {})),
with_table_acls=raw.get("with_table_acls", False),
tacl=TaclConfig.from_dict(raw.get("tacl", {})),
groups=GroupsConfig.from_dict(raw.get("groups", {})),
connect=ConnectConfig.from_dict(raw.get("connect", {})),
num_threads=raw.get("num_threads", 4),
Expand Down
18 changes: 14 additions & 4 deletions src/databricks/labs/ucx/tacl/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,25 @@ def _snapshot(self, klass, fetcher, loader) -> list[any]:
Returns:
list[any]: A list of data records, either fetched or loaded.
"""
loaded = False
trigger_load = ValueError("trigger records load")
while True:
try:
logger.debug(f"[{self._full_name}] fetching {self._table} inventory")
return list(fetcher())
cached_results = list(fetcher())
if len(cached_results) == 0 and loaded:
return cached_results
if len(cached_results) == 0 and not loaded:
raise trigger_load
return cached_results
except Exception as e:
if "TABLE_OR_VIEW_NOT_FOUND" not in str(e):
if not (e == trigger_load or "TABLE_OR_VIEW_NOT_FOUND" in str(e)):
raise e
logger.debug(f"[{self._full_name}] {self._table} inventory not found, crawling")
self._append_records(klass, loader())
logger.debug(f"[{self._full_name}] crawling new batch for {self._table}")
loaded_records = list(loader())
if len(loaded_records) > 0:
self._append_records(klass, loaded_records)
loaded = True

@staticmethod
def _row_to_sql(row, fields):
Expand Down
34 changes: 28 additions & 6 deletions src/databricks/labs/ucx/toolkits/table_acls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.tacl._internal import (
RuntimeBackend,
SqlBackend,
Expand All @@ -8,17 +11,36 @@
from databricks.labs.ucx.tacl.grants import GrantsCrawler
from databricks.labs.ucx.tacl.tables import TablesCrawler

logger = logging.getLogger(__name__)


class TaclToolkit:
def __init__(self, ws: WorkspaceClient, inventory_catalog, inventory_schema, warehouse_id=None):
self._tc = TablesCrawler(self._backend(ws, warehouse_id), inventory_catalog, inventory_schema)
def __init__(self, ws: WorkspaceClient, config: MigrationConfig, warehouse_id=None):
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
self.inventory_catalog = config.inventory.table.catalog
self.inventory_schema = config.inventory.table.database

self._tc = TablesCrawler(self._backend(ws, warehouse_id), self.inventory_catalog, self.inventory_schema)
self._gc = GrantsCrawler(self._tc)

def database_snapshot(self, schema):
return self._tc.snapshot("hive_metastore", schema)
self._databases = (
config.tacl.databases
if config.tacl.databases
else [database.as_dict()["databaseName"] for database in self._tc._all_databases()]
)

def database_snapshot(self):
tables = []
for db in self._databases:
for t in self._tc.snapshot("hive_metastore", db):
tables.append(t)
return tables

def grants_snapshot(self, schema):
return self._gc.snapshot("hive_metastore", schema)
def grants_snapshot(self):
grants = []
for db in self._databases:
for grant in self._gc.snapshot("hive_metastore", db):
grants.append(grant)
return grants

@staticmethod
def _backend(ws: WorkspaceClient, warehouse_id: str | None = None) -> SqlBackend:
Expand Down
90 changes: 66 additions & 24 deletions tests/integration/test_tacls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import (
GroupsConfig,
InventoryConfig,
InventoryTable,
MigrationConfig,
TaclConfig,
)
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

logger = logging.getLogger(__name__)


def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, make_table):
def test_describe_all_tables_in_databases(ws: WorkspaceClient, make_catalog, make_schema, make_table):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]

logger.info("setting up fixtures")
schema = make_schema(catalog="hive_metastore")
managed_table = make_table(schema=schema)
external_table = make_table(schema=schema, external=True)
tmp_table = make_table(schema=schema, ctas="SELECT 2+2 AS four")
view = make_table(schema=schema, ctas="SELECT 2+2 AS four", view=True)
non_delta = make_table(schema=schema, non_detla=True)
schema_a = make_schema(catalog="hive_metastore")
schema_b = make_schema(catalog="hive_metastore")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
managed_table = make_table(schema=schema_a)
external_table = make_table(schema=schema_b, external=True)
tmp_table = make_table(schema=schema_a, ctas="SELECT 2+2 AS four")
view = make_table(schema=schema_b, ctas="SELECT 2+2 AS four", view=True)
non_delta = make_table(schema=schema_a, non_detla=True)

logger.info(
f"managed_table={managed_table}, "
Expand All @@ -28,10 +36,26 @@ def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, mak

inventory_schema = make_schema(catalog=make_catalog())
inventory_catalog, inventory_schema = inventory_schema.split(".")
tak = TaclToolkit(ws, inventory_catalog, inventory_schema, warehouse_id)

databases = [schema_a.split(".")[1], schema_b.split(".")[1]]

config = MigrationConfig(
inventory=InventoryConfig(
table=InventoryTable(catalog=inventory_catalog, database=inventory_schema, name="ucx_migration_inventory")
),
groups=GroupsConfig(
auto=True,
),
tacl=TaclConfig(
databases=databases,
),
log_level="DEBUG",
)

tak = TaclToolkit(ws, config, warehouse_id)

all_tables = {}
for t in tak.database_snapshot(schema.split(".")[1]):
for t in tak.database_snapshot():
all_tables[t.key] = t

assert len(all_tables) == 5
Expand All @@ -43,28 +67,46 @@ def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, mak
assert all_tables[view].view_text == "SELECT 2+2 AS four"


def test_all_grants_in_database(ws: WorkspaceClient, sql_exec, make_catalog, make_schema, make_table, make_group):
def test_all_grants_in_databases(ws: WorkspaceClient, sql_exec, make_catalog, make_schema, make_table, make_group):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]

group_a = make_group()
group_b = make_group()
schema = make_schema()
table = make_table(schema=schema, external=True)
group_a = make_group(display_name="sdk_group_a")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
group_b = make_group(display_name="sdk_group_b")
schema_a = make_schema()
schema_b = make_schema()
table_a = make_table(schema=schema_a)
table_b = make_table(schema=schema_b)

sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_a.display_name}`")
sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_b.display_name}`")
sql_exec(f"GRANT SELECT ON TABLE {table} TO `{group_a.display_name}`")
sql_exec(f"GRANT MODIFY ON SCHEMA {schema} TO `{group_b.display_name}`")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
sql_exec(f"GRANT USAGE ON SCHEMA default TO {group_a.display_name}")
sql_exec(f"GRANT USAGE ON SCHEMA default TO {group_b.display_name}")
sql_exec(f"GRANT SELECT ON TABLE {table_a} TO {group_a.display_name}")
sql_exec(f"GRANT SELECT ON TABLE {table_b} TO {group_b.display_name}")
sql_exec(f"GRANT MODIFY ON SCHEMA {schema_b} TO {group_b.display_name}")

inventory_schema = make_schema(catalog=make_catalog())
inventory_catalog, inventory_schema = inventory_schema.split(".")
tak = TaclToolkit(ws, inventory_catalog, inventory_schema, warehouse_id)

config = MigrationConfig(
inventory=InventoryConfig(
table=InventoryTable(catalog=inventory_catalog, database=inventory_schema, name="ucx_migration_inventory")
),
groups=GroupsConfig(
auto=True,
),
tacl=TaclConfig(
auto=True,
),
log_level="DEBUG",
)

tak = TaclToolkit(ws, config, warehouse_id)

all_grants = {}
for grant in tak.grants_snapshot(schema.split(".")[1]):
logger.info(f"grant:\n{grant}\n hive: {grant.hive_grant_sql()}\n uc: {grant.uc_grant_sql()}")
for grant in tak.grants_snapshot():
logging.info(f"grant:\n{grant}\n hive: {grant.hive_grant_sql()}\n uc: {grant.uc_grant_sql()}")
all_grants[f"{grant.principal}.{grant.object_key}"] = grant.action_type

assert len(all_grants) >= 2, "must have at least two grants"
assert all_grants[f"{group_a.display_name}.{table}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{schema}"] == "MODIFY"
assert len(all_grants) >= 3, "must have at least three grants"
assert all_grants[f"{group_a.display_name}.{table_a}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{table_b}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{schema_b}"] == "MODIFY"
12 changes: 5 additions & 7 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from functools import partial
from pathlib import Path

import pytest
import yaml

from databricks.labs.ucx.config import (
GroupsConfig,
InventoryConfig,
InventoryTable,
MigrationConfig,
TaclConfig,
)


Expand All @@ -19,12 +19,9 @@ def test_initialization():
MigrationConfig,
inventory=InventoryConfig(table=InventoryTable(catalog="catalog", database="database", name="name")),
groups=GroupsConfig(auto=True),
tacl=TaclConfig(databases=["default"]),
)

with pytest.raises(NotImplementedError):
mc(with_table_acls=True)

mc(with_table_acls=False)
mc()


# path context manager
Expand Down Expand Up @@ -54,9 +51,10 @@ def test_reader(tmp_path: Path):
MigrationConfig,
inventory=InventoryConfig(table=InventoryTable(catalog="catalog", database="database", name="name")),
groups=GroupsConfig(auto=True),
tacl=TaclConfig(databases=["default"]),
)

config: MigrationConfig = mc(with_table_acls=False)
config: MigrationConfig = mc()
config_file = tmp_path / "config.yml"

as_dict = config.as_dict()
Expand Down
Loading