Ensure that table exists, even when crawlers produce zero records #373

Merged (2 commits) on Oct 4, 2023
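This PR threads each crawler's result dataclass through to the SQL backend so that `save_table` can issue `CREATE TABLE IF NOT EXISTS` before checking the row count: a crawl that finds nothing now still leaves an empty, queryable inventory table behind. A minimal sketch of the before/after contract (simplified signatures; the real code is in the diffs below):

    # Before: the schema class was inferred from the first record, so an empty
    # crawl had to return before any DDL ran; later reads of the inventory
    # table then failed with TABLE_OR_VIEW_NOT_FOUND.
    def save_table_before(self, full_name: str, rows: list, mode="append"):
        if len(rows) == 0:
            return                      # table never created
        klass = rows[0].__class__       # needs at least one record
        self.execute(f"CREATE TABLE IF NOT EXISTS {full_name} (...) USING DELTA")

    # After: the dataclass is an explicit argument, so the DDL runs
    # unconditionally, even for zero records.
    def save_table_after(self, full_name: str, rows: list, klass: type, mode="append"):
        self.create_table(full_name, klass)  # always ensures the table exists
        if len(rows) == 0:
            return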
9 changes: 5 additions & 4 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -86,7 +86,7 @@ def spark_version_compatibility(spark_version: str) -> str:

 class PipelinesCrawler(CrawlerBase):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "pipelines")
+        super().__init__(sbe, "hive_metastore", schema, "pipelines", PipelineInfo)
         self._ws = ws

     def _crawl(self) -> list[PipelineInfo]:
@@ -117,7 +117,7 @@ def _try_fetch(self) -> list[PipelineInfo]:

 class ClustersCrawler(CrawlerBase):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "clusters")
+        super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
         self._ws = ws

     def _crawl(self) -> list[ClusterInfo]:
@@ -174,10 +174,11 @@ def _try_fetch(self) -> list[ClusterInfo]:

 class JobsCrawler(CrawlerBase):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "jobs")
+        super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
         self._ws = ws

-    def _get_cluster_configs_from_all_jobs(self, all_jobs, all_clusters_by_id):
+    @staticmethod
+    def _get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
         for j in all_jobs:
             if j.settings.job_clusters is not None:
                 for jc in j.settings.job_clusters:
24 changes: 12 additions & 12 deletions src/databricks/labs/ucx/framework/crawlers.py
@@ -22,9 +22,13 @@ def fetch(self, sql) -> Iterator[any]:
         raise NotImplementedError

     @abstractmethod
-    def save_table(self, full_name: str, rows: list[any], mode: str = "append"):
+    def save_table(self, full_name: str, rows: list[any], klass: type, mode: str = "append"):
         raise NotImplementedError

+    def create_table(self, full_name: str, klass: type):
+        ddl = f"CREATE TABLE IF NOT EXISTS {full_name} ({self._schema_for(klass)}) USING DELTA"
+        self.execute(ddl)
+
     _builtin_type_mapping: ClassVar[dict[type, str]] = {str: "STRING", int: "INT", bool: "BOOLEAN", float: "FLOAT"}

     @classmethod
@@ -82,18 +86,14 @@ def fetch(self, sql) -> Iterator[any]:
         logger.debug(f"[api][fetch] {sql}")
         return self._sql.execute_fetch_all(self._warehouse_id, sql)

-    def save_table(self, full_name: str, rows: list[any], mode="append"):
+    def save_table(self, full_name: str, rows: list[any], klass: type, mode="append"):
         if mode == "overwrite":
             msg = "Overwrite mode is not yet supported"
             raise NotImplementedError(msg)
         rows = self._filter_none_rows(rows, full_name)
+        self.create_table(full_name, klass)
         if len(rows) == 0:
             return
-
-        klass = rows[0].__class__
Collaborator commented on the removed line `klass = rows[0].__class__`: Previously we got the klass by looking at the first record; now we add it as an explicit argument. Do we really want to add an explicit argument?
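The explicit argument is what makes the zero-records case work at all: an empty list has no first record to take `__class__` from, which is why the old code had to return before issuing any DDL. A tiny illustration:

    rows: list[PipelineInfo] = []   # a workspace with no pipelines
    rows[0].__class__               # IndexError: nothing to infer a schema from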

ddl = f"CREATE TABLE IF NOT EXISTS {full_name} ({self._schema_for(klass)}) USING DELTA"
self.execute(ddl)

fields = dataclasses.fields(klass)
field_names = [f.name for f in fields]
for i in range(0, len(rows), self._max_records_per_batch):
Expand Down Expand Up @@ -140,18 +140,19 @@ def fetch(self, sql) -> Iterator[any]:
         logger.debug(f"[spark][fetch] {sql}")
         return self._spark.sql(sql).collect()

-    def save_table(self, full_name: str, rows: list[any], mode: str = "append"):
+    def save_table(self, full_name: str, rows: list[any], klass: type, mode: str = "append"):
         rows = self._filter_none_rows(rows, full_name)

         if len(rows) == 0:
+            self.create_table(full_name, klass)
             return
         # pyspark deals well with lists of dataclass instances, as long as schema is provided
         df = self._spark.createDataFrame(rows, self._schema_for(rows[0]))
         df.write.saveAsTable(full_name, mode=mode)

class CrawlerBase:
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str):
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type):
"""
Initializes a CrawlerBase instance.

Expand All @@ -168,6 +169,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str):
self._backend = backend
self._fetch = backend.fetch
self._exec = backend.execute
self._klass = klass

@property
def _full_name(self) -> str:
Expand Down Expand Up @@ -244,7 +246,5 @@ def _snapshot(self, fetcher, loader) -> list[any]:
return loaded_records

def _append_records(self, items):
if len(items) == 0:
return
logger.debug(f"[{self._full_name}] found {len(items)} new records for {self._table}")
self._backend.save_table(self._full_name, items, mode="append")
self._backend.save_table(self._full_name, items, self._klass, mode="append")
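`create_table` delegates the column list to `_schema_for`, whose body sits outside this diff. Judging from `_builtin_type_mapping` above and the DDL asserted in the tests below, it maps dataclass fields to Spark SQL types roughly like this sketch (an approximation, not the actual source):

    import dataclasses
    import typing

    _builtin_type_mapping = {str: "STRING", int: "INT", bool: "BOOLEAN", float: "FLOAT"}

    def _schema_for(klass: type) -> str:
        columns = []
        for f in dataclasses.fields(klass):
            field_type, nullable = f.type, False
            # Optional[X] is Union[X, None]: unwrap it and drop the NOT NULL.
            if typing.get_origin(field_type) is typing.Union and type(None) in typing.get_args(field_type):
                field_type = typing.get_args(field_type)[0]
                nullable = True
            columns.append(f"{f.name} {_builtin_type_mapping[field_type]}" + ("" if nullable else " NOT NULL"))
        return ", ".join(columns)

    @dataclasses.dataclass
    class Foo:
        first: str
        second: bool

    assert _schema_for(Foo) == "first STRING NOT NULL, second BOOLEAN NOT NULL"

For `Foo`, `create_table` would then execute `CREATE TABLE IF NOT EXISTS a.b.c (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA`, which is exactly what tests/unit/framework/test_crawlers.py asserts.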
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/data_objects.py
@@ -18,7 +18,7 @@ class ExternalLocationCrawler(CrawlerBase):
     _prefix_size: typing.ClassVar[list[int]] = [1, 12]

     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "external_locations")
+        super().__init__(sbe, "hive_metastore", schema, "external_locations", ExternalLocation)
         self._ws = ws

     def _external_locations(self, tables: list[Row], mounts) -> list[ExternalLocation]:
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/grants.py
@@ -119,7 +119,7 @@ def uc_grant_sql(self):

 class GrantsCrawler(CrawlerBase):
     def __init__(self, tc: TablesCrawler):
-        super().__init__(tc._backend, tc._catalog, tc._schema, "grants")
+        super().__init__(tc._backend, tc._catalog, tc._schema, "grants", Grant)
         self._tc = tc

     def snapshot(self) -> list[Grant]:
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/mounts.py
@@ -16,7 +16,7 @@ class Mount:

 class Mounts(CrawlerBase):
     def __init__(self, backend: SqlBackend, ws: WorkspaceClient, inventory_database: str):
-        super().__init__(backend, "hive_metastore", inventory_database, "mounts")
+        super().__init__(backend, "hive_metastore", inventory_database, "mounts", Mount)
         self._dbutils = ws.dbutils

     def inventorize_mounts(self):
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/tables.py
@@ -82,7 +82,7 @@ def __init__(self, backend: SqlBackend, schema):
             backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
             schema: The schema name for the inventory persistence.
         """
-        super().__init__(backend, "hive_metastore", schema, "tables")
+        super().__init__(backend, "hive_metastore", schema, "tables", Table)

     def _all_databases(self) -> Iterator[Row]:
         yield from self._fetch("SHOW DATABASES")
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/workspace_access/manager.py
@@ -15,7 +15,7 @@ class PermissionManager(CrawlerBase):
     def __init__(
         self, backend: SqlBackend, inventory_database: str, crawlers: list[Crawler], appliers: dict[str, Applier]
     ):
-        super().__init__(backend, "hive_metastore", inventory_database, "permissions")
+        super().__init__(backend, "hive_metastore", inventory_database, "permissions", Permissions)
         self._crawlers = crawlers
         self._appliers = appliers
5 changes: 3 additions & 2 deletions tests/unit/framework/mocks.py
@@ -40,8 +40,9 @@ def fetch(self, sql) -> Iterator[any]:
         logger.debug(f"Returning rows: {rows}")
         return iter(rows)

-    def save_table(self, full_name: str, rows: list[any], mode: str = "append"):
-        self._save_table.append((full_name, rows, mode))
+    def save_table(self, full_name: str, rows: list[any], klass, mode: str = "append"):
+        if klass.__class__ == type:
+            self._save_table.append((full_name, rows, mode))

     def rows_written_for(self, full_name: str, mode: str) -> list[any]:
         rows = []
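The `klass.__class__ == type` guard is a light sanity check on the new positional argument: every plain class has `type` as its `__class__`, so the mock records a write only when an actual class is passed in the `klass` slot. For illustration (hypothetical `Foo`):

    import dataclasses

    @dataclasses.dataclass
    class Foo:
        first: str
        second: bool

    assert Foo.__class__ == type            # a class: the write is recorded
    assert Foo("a", True).__class__ == Foo  # an instance would fail the guard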
27 changes: 13 additions & 14 deletions tests/unit/framework/test_crawlers.py
@@ -35,17 +35,17 @@ class Bar:

 def test_invalid():
     with pytest.raises(ValueError):
-        CrawlerBase(MockBackend(), "a.a.a", "b", "c")
+        CrawlerBase(MockBackend(), "a.a.a", "b", "c", Bar)


 def test_full_name():
-    cb = CrawlerBase(MockBackend(), "a", "b", "c")
+    cb = CrawlerBase(MockBackend(), "a", "b", "c", Bar)
     assert "a.b.c" == cb._full_name


 def test_snapshot_appends_to_existing_table():
     b = MockBackend()
-    cb = CrawlerBase(b, "a", "b", "c")
+    cb = CrawlerBase(b, "a", "b", "c", Bar)

     result = cb._snapshot(fetcher=lambda: [], loader=lambda: [Foo(first="first", second=True)])

@@ -55,7 +55,7 @@ def test_snapshot_appends_to_existing_table():

 def test_snapshot_appends_to_new_table():
     b = MockBackend()
-    cb = CrawlerBase(b, "a", "b", "c")
+    cb = CrawlerBase(b, "a", "b", "c", Bar)

     def fetcher():
         msg = ".. TABLE_OR_VIEW_NOT_FOUND .."
@@ -69,7 +69,7 @@ def fetcher():

 def test_snapshot_wrong_error():
     b = MockBackend()
-    cb = CrawlerBase(b, "a", "b", "c")
+    cb = CrawlerBase(b, "a", "b", "c", Bar)

     def fetcher():
         msg = "always fails"
@@ -119,25 +119,25 @@ def test_statement_execution_backend_fetch_happy(mocker):
 def test_statement_execution_backend_save_table_overwrite(mocker):
     seb = StatementExecutionBackend(mocker.Mock(), "abc")
     with pytest.raises(NotImplementedError):
-        seb.save_table("a.b.c", [1, 2, 3], mode="overwrite")
+        seb.save_table("a.b.c", [1, 2, 3], Bar, mode="overwrite")


 def test_statement_execution_backend_save_table_empty_records(mocker):
     execute_sql = mocker.patch("databricks.labs.ucx.mixins.sql.StatementExecutionExt.execute")

     seb = StatementExecutionBackend(mocker.Mock(), "abc")

-    seb.save_table("a.b.c", [])
+    seb.save_table("a.b.c", [], Bar)

-    execute_sql.assert_not_called()
+    execute_sql.assert_called()


 def test_statement_execution_backend_save_table_two_records(mocker):
     execute_sql = mocker.patch("databricks.labs.ucx.mixins.sql.StatementExecutionExt.execute")

     seb = StatementExecutionBackend(mocker.Mock(), "abc")

-    seb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False)])
+    seb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False)], Foo)

     assert [
         mocker.call(
@@ -152,8 +152,7 @@ def test_statement_execution_backend_save_table_in_batches_of_two(mocker):

     seb = StatementExecutionBackend(mocker.Mock(), "abc", max_records_per_batch=2)

-    seb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False), Foo("ccc", True)])
-
+    seb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False), Foo("ccc", True)], Foo)
     assert [
         mocker.call(
             "abc", "CREATE TABLE IF NOT EXISTS a.b.c (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA"
@@ -203,7 +202,7 @@ def test_runtime_backend_save_table(mocker):

     rb = RuntimeBackend()

-    rb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False)])
+    rb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False)], Bar)

     rb._spark.createDataFrame.assert_called_with(
         [Foo(first="aaa", second=True), Foo(first="bbb", second=False)],
@@ -221,7 +220,7 @@ def test_runtime_backend_save_table_with_row_containing_none(mocker):

     rb = RuntimeBackend()

-    rb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False), Foo("ccc", None)])
+    rb.save_table("a.b.c", [Foo("aaa", True), Foo("bbb", False), Foo("ccc", None)], Bar)

     rb._spark.createDataFrame.assert_called_with(
         [Foo(first="aaa", second=True), Foo(first="bbb", second=False)],
@@ -239,7 +238,7 @@ def test_runtime_backend_save_table_with_row_containing_none_with_nullable_class():

     rb = RuntimeBackend()

-    rb.save_table("a.b.c", [Baz("aaa", "ccc"), Baz("bbb", None)])
+    rb.save_table("a.b.c", [Baz("aaa", "ccc"), Baz("bbb", None)], Bar)

     rb._spark.createDataFrame.assert_called_with(
         [Baz(first="aaa", second="ccc"), Baz(first="bbb", second=None)],
2 changes: 1 addition & 1 deletion tests/unit/hive_metastore/test_mounts.py
@@ -19,4 +19,4 @@ def test_list_mounts_should_return_a_list_of_mount_without_encryption_type():
     instance.inventorize_mounts()

     expected = [Mount("mp_1", "path_1"), Mount("mp_2", "path_2")]
-    assert backend.rows_written_for("hive_metastore.test.mounts", "append") == expected
+    assert expected == backend.rows_written_for("hive_metastore.test.mounts", "append")