Skip to content

Commit

Permalink
Track UsedTables on TableProgressEncoder (#3373)
Browse files Browse the repository at this point in the history
<!-- REMOVE IRRELEVANT COMMENTS BEFORE CREATING A PULL REQUEST -->
## Changes

Track `UsedTables` on `TableProgressEncoder`

### Linked issues

Resolves #3061

### Functionality

- [x] modified existing workflow: `migration-progress-experimental`


### Tests

- [ ] manually tested
- [x] added unit tests
- [x] added integration tests
  • Loading branch information
JCZuurmond authored Nov 26, 2024
1 parent 780008e commit bccc103
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 35 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def tables_progress(self) -> ProgressEncoder[Table]:
self.sql_backend,
self.table_ownership,
self.migration_status_refresher,
[self.used_tables_crawler_for_paths, self.used_tables_crawler_for_queries],
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down
50 changes: 34 additions & 16 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import replace

Expand All @@ -11,29 +12,25 @@
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


logger = logging.getLogger(__name__)


class TableProgressEncoder(ProgressEncoder[Table]):
"""Encoder class:Table to class:History.
A progress failure for a table means:
- the table is not migrated yet
- the associated grants have a failure
"""
"""Encoder class:Table to class:History."""

def __init__(
self,
sql_backend: SqlBackend,
ownership: TableOwnership,
migration_status_refresher: CrawlerBase[TableMigrationStatus],
used_tables_crawlers: list[UsedTablesCrawler],
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(
sql_backend,
Expand All @@ -42,27 +39,48 @@ def __init__(
run_id,
workspace_id,
catalog,
schema,
table,
"multiworkspace",
"historical",
)
self._migration_status_refresher = migration_status_refresher
self._used_tables_crawlers = used_tables_crawlers

def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
used_hive_tables = self._get_used_hive_tables()
history_records = []
for record in snapshot:
history_record = self._encode_table_as_historical(record, migration_index, used_hive_tables)
history_records.append(history_record)
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
# The mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
"""Encode a table record, enriching with the migration status.
def _get_used_hive_tables(self) -> dict[str, list[UsedTable]]:
    """Group crawled used-table references by full name, keeping only Hive metastore tables.

    Returns:
        A mapping from `catalog.schema.table` full name to the used-table references
        (from all configured crawlers) that point at that Hive metastore table.
    """
    grouped: dict[str, list[UsedTable]] = defaultdict(list)
    for crawler in self._used_tables_crawlers:
        for table_reference in crawler.snapshot():
            # Only Hive metastore references are relevant for migration progress.
            if table_reference.catalog_name != "hive_metastore":
                continue
            grouped[table_reference.full_name].append(table_reference)
    return grouped

def _encode_table_as_historical(
self, record: Table, migration_index: TableMigrationIndex, used_hive_tables: dict[str, list[UsedTable]]
) -> Historical:
"""Encode a table record, enriching with the migration status and used table references.
Possible failures, the table is
- Pending migration
- A Hive table referenced by code
A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
for tables that are migrated to UC with their relevant grants also being migrated.
Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus
possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant
grants also being migrated.
"""
historical = super()._encode_record_as_historical(record)
failures = []
if not migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
for used_table in used_hive_tables.get(record.full_name, []):
failures.append(f"Used by {used_table.source_type}: {used_table.source_id}")
return replace(historical, failures=historical.failures + failures)
4 changes: 4 additions & 0 deletions src/databricks/labs/ucx/source_code/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ def parse(cls, value: str, default_schema: str, is_read=True, is_write=False) ->
catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0], is_read=is_read, is_write=is_write
)

@property
def full_name(self) -> str:
    """Fully-qualified `catalog.schema.table` name of this table reference."""
    return f"{self.catalog_name}.{self.schema_name}.{self.table_name}"

catalog_name: str = SourceInfo.UNKNOWN
schema_name: str = SourceInfo.UNKNOWN
table_name: str = SourceInfo.UNKNOWN
Expand Down
62 changes: 62 additions & 0 deletions tests/integration/progress/test_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import datetime as dt

import pytest

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable


@pytest.mark.parametrize("is_migrated_table", [True, False])
@pytest.mark.parametrize("is_used_table", [True, False])
def test_table_progress_encoder_table_failures(
    runtime_ctx,
    az_cli_ctx,
    make_catalog,
    is_migrated_table: bool,
    is_used_table: bool,
) -> None:
    """The history record for a Hive table carries a failure per pending migration and per code reference."""
    expected_failures = []
    if not is_migrated_table:
        expected_failures.append("Pending migration")
    if is_used_table:
        expected_failures.append("Used by NOTEBOOK: test/test.py")

    az_cli_ctx.progress_tracking_installation.run()
    ctx = runtime_ctx.replace(
        parent_run_id=1,
        sql_backend=az_cli_ctx.sql_backend,
        ucx_catalog=az_cli_ctx.ucx_catalog,
    )
    # Setting both the `upgraded_to` and `upgraded_from` table properties during table creation is not possible.
    # This works because the `upgraded_to` value is not used for matching; the property only needs to be present.
    source_properties = {"upgraded_to": "upgraded_to.name_does.not_matter"} if is_migrated_table else {}
    source_table_info = ctx.make_table(tbl_properties=source_properties)
    target_properties = {"upgraded_from": source_table_info.full_name} if is_migrated_table else {}
    ctx.make_table(catalog_name=make_catalog().name, tbl_properties=target_properties)
    table_reference = UsedTable(
        catalog_name="hive_metastore" if is_used_table else "catalog",
        schema_name=source_table_info.schema_name,
        table_name=source_table_info.name,
        source_id="test/test.py",
        source_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="test/test.py")],
        assessment_start_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        assessment_end_timestamp=dt.datetime.now(tz=dt.timezone.utc),
    )
    ctx.used_tables_crawler_for_paths.dump_all([table_reference])

    crawled_table = Table(
        source_table_info.catalog_name,
        source_table_info.schema_name,
        source_table_info.name,
        source_table_info.table_type.value,
        source_table_info.data_source_format.value,
    )
    ctx.tables_progress.append_inventory_snapshot([crawled_table])

    history_table = escape_sql_identifier(ctx.tables_progress.full_name)
    rows = list(ctx.sql_backend.fetch(f"SELECT * FROM {history_table}"))

    assert len(rows) == 1, "Expected one historical entry"
    assert rows[0].failures == expected_failures
57 changes: 38 additions & 19 deletions tests/unit/progress/test_tables.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime as dt
from unittest.mock import create_autospec

import pytest

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.table_migration_status import (
Expand All @@ -10,23 +9,28 @@
)
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.progress.tables import TableProgressEncoder
from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
def test_table_progress_encoder_no_failures(mock_backend) -> None:
table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
migration_status_crawler.snapshot.return_value = (
TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
)
used_tables_crawler = create_autospec(UsedTablesCrawler)
used_tables_crawler.snapshot.return_value = []
encoder = TableProgressEncoder(
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
mock_backend,
ownership,
migration_status_crawler,
[used_tables_crawler],
run_id=1,
workspace_id=123456789,
catalog="test",
)

encoder.append_inventory_snapshot([table])
Expand All @@ -36,29 +40,44 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
assert len(rows[0].failures) == 0
ownership.owner_of.assert_called_once()
migration_status_crawler.snapshot.assert_called_once()
used_tables_crawler.snapshot.assert_called_once()


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
def test_table_progress_encoder_pending_migration_failure(mock_backend) -> None:
table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
migration_status_crawler.snapshot.return_value = (
TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated.
)
used_tables_crawler_for_paths = create_autospec(UsedTablesCrawler)
used_table = UsedTable(
catalog_name=table.catalog,
schema_name=table.database,
table_name=table.name,
source_id="test/test.py",
source_timestamp=dt.datetime.now(tz=dt.timezone.utc),
source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="test/test.py")],
assessment_start_timestamp=dt.datetime.now(tz=dt.timezone.utc),
assessment_end_timestamp=dt.datetime.now(tz=dt.timezone.utc),
)
used_tables_crawler_for_paths.snapshot.return_value = [used_table]
encoder = TableProgressEncoder(
mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
mock_backend,
ownership,
migration_status_crawler,
[used_tables_crawler_for_paths],
run_id=1,
workspace_id=123456789,
catalog="test",
)

encoder.append_inventory_snapshot([table])

rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
assert rows[0].failures == ["Pending migration"]
assert rows[0].failures == ["Pending migration", "Used by NOTEBOOK: test/test.py"]
ownership.owner_of.assert_called_once()
migration_status_crawler.snapshot.assert_called_once()
used_tables_crawler_for_paths.snapshot.assert_called_once()
14 changes: 14 additions & 0 deletions tests/unit/source_code/test_base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import dataclasses

import pytest

from databricks.labs.ucx.source_code.base import (
Advice,
Advisory,
Convention,
Deprecation,
Failure,
UsedTable,
)


Expand Down Expand Up @@ -40,3 +43,14 @@ def test_deprecation_initialization() -> None:
def test_convention_initialization() -> None:
convention = Convention('code5', 'This is a convention', 1, 1, 2, 2)
assert isinstance(convention, Advice)


@pytest.mark.parametrize(
    "table_reference, expected",
    [
        (UsedTable(), "unknown.unknown.unknown"),
        (UsedTable(catalog_name="catalog", schema_name="schema", table_name="table"), "catalog.schema.table"),
    ],
)
def test_used_table_full_name(table_reference: UsedTable, expected: str) -> None:
    """`UsedTable.full_name` is the dot-joined catalog, schema and table names."""
    assert table_reference.full_name == expected

0 comments on commit bccc103

Please sign in to comment.