Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crawl Table ACLs in all databases #122

Merged
merged 33 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cd2bbf0
remove with_table_acls, add warehouse_id to configs
Aug 29, 2023
8dbef32
add entrypoint for tacl_job
Aug 29, 2023
4574565
add coverage
Aug 29, 2023
38b32cc
add config file
Aug 29, 2023
ce558fd
crawler first draft not working
Aug 29, 2023
b071150
lint
Aug 29, 2023
5cc2efc
cannot find wheel package
Aug 30, 2023
bb9641d
amend entrypoint
Aug 30, 2023
46e0c6b
create tacl job
Aug 30, 2023
d0274d0
progress with job creation dbfs wheel
Sep 1, 2023
b5d61f8
add entrypoint, replace - to _ in project name
Sep 1, 2023
30b7da8
draft commit
Sep 3, 2023
4cd7ca0
Merge branch 'main' into schedule-crawler-job
Sep 3, 2023
79ae854
add tacl config
Sep 3, 2023
2147cdf
add config, dbfs wheel
Sep 3, 2023
eb6a7c1
draft commit
Sep 3, 2023
ea9973a
add tacl dbs to config
Sep 4, 2023
2102626
fmt
Sep 4, 2023
3bdc4d4
tacl crawl support for multiple databases
Sep 4, 2023
2428a60
removed out of scope job scheduling
Sep 4, 2023
e159a4c
remove entrypoint
Sep 4, 2023
087d644
removed job scheduler test
Sep 4, 2023
ecb2dfd
fmt
Sep 4, 2023
ecec9a6
implement tacl all db crawler
Sep 4, 2023
34bc208
change database field to private
Sep 5, 2023
0f94680
change logging to logger
Sep 5, 2023
f763005
Merge branch 'main' into schedule-crawler-job
saraivdbx Sep 5, 2023
48d8bd6
implement state machine for crawlerbase
Sep 5, 2023
7624b55
Merge branch 'schedule-crawler-job' of https://github.com/databricks/…
Sep 5, 2023
1a9568e
replace migration config with taclconfig and inventory
Sep 5, 2023
e150735
replace taclconfig with databases
Sep 5, 2023
7e54360
change config for tacltoolkit
Sep 5, 2023
97fd520
Update tests/unit/test_grants.py
nfx Sep 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/migration_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ inventory:
database: default
name: uc_migration_inventory

tacl:
databases: [ "default" ]

with_table_acls: False
warehouse_id: None

groups:
selected: [ "analyst" ]
Expand Down
21 changes: 19 additions & 2 deletions notebooks/toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
# COMMAND ----------

from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit
from databricks.labs.ucx.config import MigrationConfig, InventoryConfig, GroupsConfig, InventoryTable
from databricks.labs.ucx.config import MigrationConfig, InventoryConfig, GroupsConfig, InventoryTable, TaclConfig
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

# COMMAND ----------

Expand All @@ -47,17 +48,23 @@
# COMMAND ----------

config = MigrationConfig(
with_table_acls=False,
inventory=InventoryConfig(table=InventoryTable(catalog="main", database="default", name="ucx_migration_inventory")),
groups=GroupsConfig(
# use this option to select specific groups manually
selected=["groupA", "groupB"],
# use this option to select all groups automatically
# auto=True
),
tacl=TaclConfig(
# use this option to select specific databases manually
databases=["default"],
# use this option to select all databases automatically
# auto=True
),
log_level="DEBUG",
)
toolkit = GroupMigrationToolkit(config)
tacltoolkit = TaclToolkit(toolkit._ws, config)

# COMMAND ----------

Expand Down Expand Up @@ -152,6 +159,16 @@

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Inventorize Table ACLs

# COMMAND ----------

tacltoolkit.grants_snapshot()

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Cleanup the inventory table
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
]
25 changes: 20 additions & 5 deletions src/databricks/labs/ucx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,28 @@ def from_dict(cls, raw: dict):
return cls(**raw)


@dataclass
class TaclConfig:
databases: list[str] | None = None
auto: bool | None = None

def __post_init__(self):
if not self.databases and self.auto is None:
msg = "Either selected or auto must be set"
raise ValueError(msg)
if self.databases and self.auto is False:
msg = "No selected groups provided, but auto-collection is disabled"
raise ValueError(msg)

@classmethod
def from_dict(cls, raw: dict):
return cls(**raw)


@dataclass
class MigrationConfig:
inventory: InventoryConfig
with_table_acls: bool
tacl: TaclConfig
groups: GroupsConfig
connect: ConnectConfig | None = None
num_threads: int | None = 4
Expand All @@ -102,9 +120,6 @@ class MigrationConfig:
def __post_init__(self):
if self.connect is None:
self.connect = ConnectConfig()
if self.with_table_acls:
msg = "Table ACLS are not yet implemented"
raise NotImplementedError(msg)

def as_dict(self) -> dict:
from dataclasses import fields, is_dataclass
Expand All @@ -126,7 +141,7 @@ def inner(x):
def from_dict(cls, raw: dict) -> "MigrationConfig":
return cls(
inventory=InventoryConfig.from_dict(raw.get("inventory", {})),
with_table_acls=raw.get("with_table_acls", False),
tacl=TaclConfig.from_dict(raw.get("tacl", {})),
groups=GroupsConfig.from_dict(raw.get("groups", {})),
connect=ConnectConfig.from_dict(raw.get("connect", {})),
num_threads=raw.get("num_threads", 4),
Expand Down
25 changes: 12 additions & 13 deletions src/databricks/labs/ucx/tacl/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,20 @@ def __init__(self, tc: TablesCrawler):
super().__init__(tc._backend, tc._catalog, tc._schema, "grants")
self._tc = tc

def snapshot(self, catalog: str, database: str) -> list[Grant]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this changeset looks larger than necessary. what justifies changing the signatures of 5 downstream methods versus iterating over databases in a loop?

doesn't this code look more maintainable in comparison?

for db in databases:
    for grant in grants_crawler.snapshot('hive_metastore', db):
        yield grant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up implementing this approach because I attempted the same versions - yield and return - just like what you suggested - and both resulted in only taking a snapshot of the first database in the iteration.

return self._snapshot(
Grant, partial(self._try_load, catalog, database), partial(self._crawl, catalog, database)
)
def snapshot(self, catalog: str, databases: list) -> list[Grant]:
return self._snapshot(Grant, partial(self._try_load, catalog), partial(self._crawl, catalog, databases))

def _try_load(self, catalog: str, database: str):
for row in self._fetch(
f'SELECT * FROM {self._full_name} WHERE catalog = "{catalog}" AND database = "{database}"'
):
def _try_load(self, catalog: str):
for row in self._fetch(f'SELECT * FROM {self._full_name} WHERE catalog = "{catalog}"'):
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
yield Grant(*row)

def _crawl(self, catalog: str, database: str) -> list[Grant]:
def _crawl(self, catalog: str, databases: list) -> list[Grant]:
"""
Crawls and lists grants for tables and views within the specified catalog and database.

Args:
catalog (str): The catalog name.
database (str): The database name.
list[databases]: List of names of databases.

Returns:
list[Grant]: A list of Grant objects representing the listed grants.
Expand All @@ -157,9 +153,12 @@ def _crawl(self, catalog: str, database: str) -> list[Grant]:
list[Grant]: A list of Grant objects representing the grants found in the specified catalog and database.
"""
catalog = self._valid(catalog)
database = self._valid(database)
tasks = [partial(self._grants, catalog=catalog), partial(self._grants, catalog=catalog, database=database)]
for table in self._tc.snapshot(catalog, database):
tasks = [partial(self._grants, catalog=catalog)]
table_snapshot = self._tc.snapshot(catalog, databases)
for database in databases:
tasks.append(partial(self._grants, catalog=catalog, database=database))
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
for table in table_snapshot:
database = table.database
fn = partial(self._grants, catalog=catalog, database=database)
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
if table.kind == "VIEW":
tasks.append(partial(fn, view=table.name))
Expand Down
28 changes: 13 additions & 15 deletions src/databricks/labs/ucx/tacl/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,40 +84,38 @@ def __init__(self, backend: SqlBackend, catalog, schema):
def _all_databases(self) -> Iterator[str]:
    """Yield each row returned by running ``SHOW DATABASES`` through ``self._fetch``.

    NOTE(review): the return annotation says ``Iterator[str]``, but
    ``TaclToolkit.__init__`` calls ``.as_dict()["databaseName"]`` on every
    item, so the items look like row objects rather than plain strings —
    confirm and adjust the annotation if so.
    """
    yield from self._fetch("SHOW DATABASES")

def snapshot(self, catalog: str, database: str) -> list[Table]:
def snapshot(self, catalog: str, databases: list) -> list[Table]:
"""
Takes a snapshot of tables in the specified catalog and database.

Args:
catalog (str): The catalog name.
database (str): The database name.
databases (list): The list of names of databases.

Returns:
list[Table]: A list of Table objects representing the snapshot of tables.
"""
return self._snapshot(
Table, partial(self._try_load, catalog, database), partial(self._crawl, catalog, database)
)

def _try_load(self, catalog: str, database: str):
return self._snapshot(Table, partial(self._try_load, catalog), partial(self._crawl, catalog, databases))

def _try_load(self, catalog: str):
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(
f'SELECT * FROM {self._full_name} WHERE catalog = "{catalog}" AND database = "{database}"'
):
for row in self._fetch(f'SELECT * FROM {self._full_name} WHERE catalog = "{catalog}"'):
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
yield Table(*row)

def _crawl(self, catalog: str, database: str) -> list[Table]:
def _crawl(self, catalog: str, databases: list) -> list[Table]:
"""Crawls and lists tables within the specified catalog and database.

After performing initial scan of all tables, starts making parallel
DESCRIBE TABLE EXTENDED queries for every table.
"""
catalog = self._valid(catalog)
database = self._valid(database)
logger.debug(f"[{catalog}.{database}] listing tables")
tasks = []
for _, table, _is_tmp in self._fetch(f"SHOW TABLES FROM {catalog}.{database}"):
tasks.append(partial(self._describe, catalog, database, table))
for db in databases:
catalog = self._valid(catalog)
database = self._valid(db)
logger.debug(f"[{catalog}.{database}] listing tables")
for _, table, _is_tmp in self._fetch(f"SHOW TABLES FROM {catalog}.{database}"):
tasks.append(partial(self._describe, catalog, database, table))
return ThreadedExecution.gather("listing tables", tasks)

def _describe(self, catalog: str, database: str, table: str) -> Table:
Expand Down
26 changes: 20 additions & 6 deletions src/databricks/labs/ucx/toolkits/table_acls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.tacl._internal import (
RuntimeBackend,
SqlBackend,
Expand All @@ -8,17 +11,28 @@
from databricks.labs.ucx.tacl.grants import GrantsCrawler
from databricks.labs.ucx.tacl.tables import TablesCrawler

logger = logging.getLogger(__name__)


class TaclToolkit:
def __init__(self, ws: WorkspaceClient, inventory_catalog, inventory_schema, warehouse_id=None):
self._tc = TablesCrawler(self._backend(ws, warehouse_id), inventory_catalog, inventory_schema)
def __init__(self, ws: WorkspaceClient, config: MigrationConfig, warehouse_id=None):
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
self.inventory_catalog = config.inventory.table.catalog
self.inventory_schema = config.inventory.table.database

self._tc = TablesCrawler(self._backend(ws, warehouse_id), self.inventory_catalog, self.inventory_schema)
self._gc = GrantsCrawler(self._tc)

def database_snapshot(self, schema):
return self._tc.snapshot("hive_metastore", schema)
self._databases = (
config.tacl.databases
if config.tacl.databases
else [database.as_dict()["databaseName"] for database in self._tc._all_databases()]
)

def database_snapshot(self):
return self._tc.snapshot("hive_metastore", self._databases)

def grants_snapshot(self, schema):
return self._gc.snapshot("hive_metastore", schema)
def grants_snapshot(self):
return self._gc.snapshot("hive_metastore", self._databases)

@staticmethod
def _backend(ws: WorkspaceClient, warehouse_id: str | None = None) -> SqlBackend:
Expand Down
90 changes: 66 additions & 24 deletions tests/integration/test_tacls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import (
GroupsConfig,
InventoryConfig,
InventoryTable,
MigrationConfig,
TaclConfig,
)
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

logger = logging.getLogger(__name__)


def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, make_table):
def test_describe_all_tables_in_databases(ws: WorkspaceClient, make_catalog, make_schema, make_table):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]

logger.info("setting up fixtures")
schema = make_schema(catalog="hive_metastore")
managed_table = make_table(schema=schema)
external_table = make_table(schema=schema, external=True)
tmp_table = make_table(schema=schema, ctas="SELECT 2+2 AS four")
view = make_table(schema=schema, ctas="SELECT 2+2 AS four", view=True)
non_delta = make_table(schema=schema, non_detla=True)
schema_a = make_schema(catalog="hive_metastore")
schema_b = make_schema(catalog="hive_metastore")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
managed_table = make_table(schema=schema_a)
external_table = make_table(schema=schema_b, external=True)
tmp_table = make_table(schema=schema_a, ctas="SELECT 2+2 AS four")
view = make_table(schema=schema_b, ctas="SELECT 2+2 AS four", view=True)
non_delta = make_table(schema=schema_a, non_detla=True)

logger.info(
f"managed_table={managed_table}, "
Expand All @@ -28,10 +36,26 @@ def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, mak

inventory_schema = make_schema(catalog=make_catalog())
inventory_catalog, inventory_schema = inventory_schema.split(".")
tak = TaclToolkit(ws, inventory_catalog, inventory_schema, warehouse_id)

databases = [schema_a.split(".")[1], schema_b.split(".")[1]]

config = MigrationConfig(
inventory=InventoryConfig(
table=InventoryTable(catalog=inventory_catalog, database=inventory_schema, name="ucx_migration_inventory")
),
groups=GroupsConfig(
auto=True,
),
tacl=TaclConfig(
databases=databases,
),
log_level="DEBUG",
)

tak = TaclToolkit(ws, config, warehouse_id)

all_tables = {}
for t in tak.database_snapshot(schema.split(".")[1]):
for t in tak.database_snapshot():
all_tables[t.key] = t

assert len(all_tables) == 5
Expand All @@ -43,28 +67,46 @@ def test_describe_all_tables(ws: WorkspaceClient, make_catalog, make_schema, mak
assert all_tables[view].view_text == "SELECT 2+2 AS four"


def test_all_grants_in_database(ws: WorkspaceClient, sql_exec, make_catalog, make_schema, make_table, make_group):
def test_all_grants_in_databases(ws: WorkspaceClient, sql_exec, make_catalog, make_schema, make_table, make_group):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]

group_a = make_group()
group_b = make_group()
schema = make_schema()
table = make_table(schema=schema, external=True)
group_a = make_group(display_name="sdk_group_a")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
group_b = make_group(display_name="sdk_group_b")
schema_a = make_schema()
schema_b = make_schema()
table_a = make_table(schema=schema_a)
table_b = make_table(schema=schema_b)

sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_a.display_name}`")
sql_exec(f"GRANT USAGE ON SCHEMA default TO `{group_b.display_name}`")
sql_exec(f"GRANT SELECT ON TABLE {table} TO `{group_a.display_name}`")
sql_exec(f"GRANT MODIFY ON SCHEMA {schema} TO `{group_b.display_name}`")
saraivdbx marked this conversation as resolved.
Show resolved Hide resolved
sql_exec(f"GRANT USAGE ON SCHEMA default TO {group_a.display_name}")
sql_exec(f"GRANT USAGE ON SCHEMA default TO {group_b.display_name}")
sql_exec(f"GRANT SELECT ON TABLE {table_a} TO {group_a.display_name}")
sql_exec(f"GRANT SELECT ON TABLE {table_b} TO {group_b.display_name}")
sql_exec(f"GRANT MODIFY ON SCHEMA {schema_b} TO {group_b.display_name}")

inventory_schema = make_schema(catalog=make_catalog())
inventory_catalog, inventory_schema = inventory_schema.split(".")
tak = TaclToolkit(ws, inventory_catalog, inventory_schema, warehouse_id)

config = MigrationConfig(
inventory=InventoryConfig(
table=InventoryTable(catalog=inventory_catalog, database=inventory_schema, name="ucx_migration_inventory")
),
groups=GroupsConfig(
auto=True,
),
tacl=TaclConfig(
auto=True,
),
log_level="DEBUG",
)

tak = TaclToolkit(ws, config, warehouse_id)

all_grants = {}
for grant in tak.grants_snapshot(schema.split(".")[1]):
logger.info(f"grant:\n{grant}\n hive: {grant.hive_grant_sql()}\n uc: {grant.uc_grant_sql()}")
for grant in tak.grants_snapshot():
logging.info(f"grant:\n{grant}\n hive: {grant.hive_grant_sql()}\n uc: {grant.uc_grant_sql()}")
all_grants[f"{grant.principal}.{grant.object_key}"] = grant.action_type

assert len(all_grants) >= 2, "must have at least two grants"
assert all_grants[f"{group_a.display_name}.{table}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{schema}"] == "MODIFY"
assert len(all_grants) >= 3, "must have at least three grants"
assert all_grants[f"{group_a.display_name}.{table_a}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{table_b}"] == "SELECT"
assert all_grants[f"{group_b.display_name}.{schema_b}"] == "MODIFY"
Loading