diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py
index bb8db8cf32..8b6313ad0b 100644
--- a/src/databricks/labs/ucx/contexts/workflow_task.py
+++ b/src/databricks/labs/ucx/contexts/workflow_task.py
@@ -226,6 +226,7 @@ def tables_progress(self) -> ProgressEncoder[Table]:
             self.sql_backend,
             self.table_ownership,
             self.migration_status_refresher,
+            [self.used_tables_crawler_for_paths, self.used_tables_crawler_for_queries],
             self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py
index 4922e8efd2..d1d6eab1a9 100644
--- a/src/databricks/labs/ucx/progress/tables.py
+++ b/src/databricks/labs/ucx/progress/tables.py
@@ -1,4 +1,5 @@
 import logging
+from collections import defaultdict
 from collections.abc import Iterable
 from dataclasses import replace
 
@@ -11,29 +12,25 @@
 from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.labs.ucx.progress.install import Historical
+from databricks.labs.ucx.source_code.base import UsedTable
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
 
 logger = logging.getLogger(__name__)
 
 
 class TableProgressEncoder(ProgressEncoder[Table]):
-    """Encoder class:Table to class:History.
-
-    A progress failure for a table means:
-    - the table is not migrated yet
-    - the associated grants have a failure
-    """
+    """Encoder class:Table to class:History."""
 
     def __init__(
         self,
         sql_backend: SqlBackend,
         ownership: TableOwnership,
         migration_status_refresher: CrawlerBase[TableMigrationStatus],
+        used_tables_crawlers: list[UsedTablesCrawler],
         run_id: int,
         workspace_id: int,
         catalog: str,
-        schema: str = "multiworkspace",
-        table: str = "historical",
     ) -> None:
         super().__init__(
             sql_backend,
@@ -42,27 +39,48 @@ def __init__(
             run_id,
             workspace_id,
             catalog,
-            schema,
-            table,
+            "multiworkspace",
+            "historical",
         )
         self._migration_status_refresher = migration_status_refresher
+        self._used_tables_crawlers = used_tables_crawlers
 
     def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
         migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
-        history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
+        used_hive_tables = self._get_used_hive_tables()
+        history_records = []
+        for record in snapshot:
+            history_record = self._encode_table_as_historical(record, migration_index, used_hive_tables)
+            history_records.append(history_record)
         logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
         # The mode is 'append'. This is documented as conflict-free.
         self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")
 
-    def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
-        """Encode a table record, enriching with the migration status.
+    def _get_used_hive_tables(self) -> dict[str, list[UsedTable]]:
+        used_tables: dict[str, list[UsedTable]] = defaultdict(list[UsedTable])
+        for crawler in self._used_tables_crawlers:
+            for used_table in crawler.snapshot():
+                if used_table.catalog_name == "hive_metastore":
+                    used_tables[used_table.full_name].append(used_table)
+        return used_tables
+
+    def _encode_table_as_historical(
+        self, record: Table, migration_index: TableMigrationIndex, used_hive_tables: dict[str, list[UsedTable]]
+    ) -> Historical:
+        """Encode a table record, enriching with the migration status and used table references.
+
+        Possible failures; the table is:
+        - Pending migration
+        - A Hive table referenced by code
 
-        A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
-        might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
-        for tables that are migrated to UC with their relevant grants also being migrated.
+        Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus
+        possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant
+        grants also being migrated.
         """
         historical = super()._encode_record_as_historical(record)
         failures = []
         if not migration_index.is_migrated(record.database, record.name):
             failures.append("Pending migration")
+        for used_table in used_hive_tables.get(record.full_name, []):
+            failures.append(f"Used by {used_table.source_type}: {used_table.source_id}")
         return replace(historical, failures=historical.failures + failures)
diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py
index 659b38b2b7..a854540208 100644
--- a/src/databricks/labs/ucx/source_code/base.py
+++ b/src/databricks/labs/ucx/source_code/base.py
@@ -260,6 +260,10 @@ def parse(cls, value: str, default_schema: str, is_read=True, is_write=False) ->
             catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0], is_read=is_read, is_write=is_write
         )
 
+    @property
+    def full_name(self) -> str:
+        return ".".join([self.catalog_name, self.schema_name, self.table_name])
+
     catalog_name: str = SourceInfo.UNKNOWN
     schema_name: str = SourceInfo.UNKNOWN
     table_name: str = SourceInfo.UNKNOWN
diff --git a/tests/integration/progress/test_tables.py b/tests/integration/progress/test_tables.py
new file mode 100644
index 0000000000..69cd412c09
--- /dev/null
+++ b/tests/integration/progress/test_tables.py
@@ -0,0 +1,62 @@
+import datetime as dt
+
+import pytest
+
+from databricks.labs.ucx.framework.utils import escape_sql_identifier
+from databricks.labs.ucx.hive_metastore.tables import Table
+from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable
+
+
+@pytest.mark.parametrize("is_migrated_table", [True, False])
+@pytest.mark.parametrize("is_used_table", [True, False])
+def test_table_progress_encoder_table_failures(
+    runtime_ctx,
+    az_cli_ctx,
+    make_catalog,
+    is_migrated_table: bool,
+    is_used_table: bool,
+) -> None:
+    failures = []
+    if not is_migrated_table:
+        failures.append("Pending migration")
+    if is_used_table:
+        failures.append("Used by NOTEBOOK: test/test.py")
+
+    az_cli_ctx.progress_tracking_installation.run()
+    runtime_ctx = runtime_ctx.replace(
+        parent_run_id=1,
+        sql_backend=az_cli_ctx.sql_backend,
+        ucx_catalog=az_cli_ctx.ucx_catalog,
+    )
+    # Setting both the `upgraded_to` and `upgraded_from` table property values during table creation is not possible.
+    # The dummy value below works because `upgraded_to` is not used for matching; the property only needs to be present.
+    hive_tbl_properties = {"upgraded_to": "upgraded_to.name_does.not_matter"} if is_migrated_table else {}
+    hive_table_info = runtime_ctx.make_table(tbl_properties=hive_tbl_properties)
+    uc_tbl_properties = {"upgraded_from": hive_table_info.full_name} if is_migrated_table else {}
+    runtime_ctx.make_table(catalog_name=make_catalog().name, tbl_properties=uc_tbl_properties)
+    hive_used_table = UsedTable(
+        catalog_name="hive_metastore" if is_used_table else "catalog",
+        schema_name=hive_table_info.schema_name,
+        table_name=hive_table_info.name,
+        source_id="test/test.py",
+        source_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+        source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="test/test.py")],
+        assessment_start_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+        assessment_end_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+    )
+    runtime_ctx.used_tables_crawler_for_paths.dump_all([hive_used_table])
+
+    hive_table = Table(
+        hive_table_info.catalog_name,
+        hive_table_info.schema_name,
+        hive_table_info.name,
+        hive_table_info.table_type.value,
+        hive_table_info.data_source_format.value,
+    )
+    runtime_ctx.tables_progress.append_inventory_snapshot([hive_table])
+
+    history_table_name = escape_sql_identifier(runtime_ctx.tables_progress.full_name)
+    records = list(runtime_ctx.sql_backend.fetch(f"SELECT * FROM {history_table_name}"))
+
+    assert len(records) == 1, "Expected one historical entry"
+    assert records[0].failures == failures
diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py
index baf08de26d..ef6447cdb5 100644
--- a/tests/unit/progress/test_tables.py
+++ b/tests/unit/progress/test_tables.py
@@ -1,7 +1,6 @@
+import datetime as dt
 from unittest.mock import create_autospec
 
-import pytest
-
 from databricks.labs.ucx.framework.owners import Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore.table_migration_status import (
@@ -10,23 +9,28 @@
 )
 from databricks.labs.ucx.hive_metastore.tables import Table
 from databricks.labs.ucx.progress.tables import TableProgressEncoder
+from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
 
 
-@pytest.mark.parametrize(
-    "table",
-    [
-        Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
-    ],
-)
-def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
+def test_table_progress_encoder_no_failures(mock_backend) -> None:
+    table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
     migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
     migration_status_crawler.snapshot.return_value = (
         TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
     )
+    used_tables_crawler = create_autospec(UsedTablesCrawler)
+    used_tables_crawler.snapshot.return_value = []
     encoder = TableProgressEncoder(
-        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
+        mock_backend,
+        ownership,
+        migration_status_crawler,
+        [used_tables_crawler],
+        run_id=1,
+        workspace_id=123456789,
+        catalog="test",
     )
 
     encoder.append_inventory_snapshot([table])
@@ -36,29 +40,44 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
     assert len(rows[0].failures) == 0
     ownership.owner_of.assert_called_once()
     migration_status_crawler.snapshot.assert_called_once()
+    used_tables_crawler.snapshot.assert_called_once()
 
 
-@pytest.mark.parametrize(
-    "table",
-    [
-        Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
-    ],
-)
-def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
+def test_table_progress_encoder_pending_migration_failure(mock_backend) -> None:
+    table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
     migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
     migration_status_crawler.snapshot.return_value = (
         TableMigrationStatus(table.database, table.name),  # No destination: therefore not yet migrated.
     )
+    used_tables_crawler_for_paths = create_autospec(UsedTablesCrawler)
+    used_table = UsedTable(
+        catalog_name=table.catalog,
+        schema_name=table.database,
+        table_name=table.name,
+        source_id="test/test.py",
+        source_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+        source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="test/test.py")],
+        assessment_start_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+        assessment_end_timestamp=dt.datetime.now(tz=dt.timezone.utc),
+    )
+    used_tables_crawler_for_paths.snapshot.return_value = [used_table]
     encoder = TableProgressEncoder(
-        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
+        mock_backend,
+        ownership,
+        migration_status_crawler,
+        [used_tables_crawler_for_paths],
+        run_id=1,
+        workspace_id=123456789,
+        catalog="test",
     )
 
     encoder.append_inventory_snapshot([table])
 
     rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
     assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
-    assert rows[0].failures == ["Pending migration"]
+    assert rows[0].failures == ["Pending migration", "Used by NOTEBOOK: test/test.py"]
     ownership.owner_of.assert_called_once()
     migration_status_crawler.snapshot.assert_called_once()
+    used_tables_crawler_for_paths.snapshot.assert_called_once()
diff --git a/tests/unit/source_code/test_base.py b/tests/unit/source_code/test_base.py
index e1e1cebde5..7b14b9a7e3 100644
--- a/tests/unit/source_code/test_base.py
+++ b/tests/unit/source_code/test_base.py
@@ -1,11 +1,14 @@
 import dataclasses
 
+import pytest
+
 from databricks.labs.ucx.source_code.base import (
     Advice,
     Advisory,
     Convention,
     Deprecation,
     Failure,
+    UsedTable,
 )
 
 
@@ -40,3 +43,14 @@ def test_deprecation_initialization() -> None:
 def test_convention_initialization() -> None:
     convention = Convention('code5', 'This is a convention', 1, 1, 2, 2)
     assert isinstance(convention, Advice)
+
+
+@pytest.mark.parametrize(
+    "used_table, expected_name",
+    [
+        (UsedTable(), "unknown.unknown.unknown"),
+        (UsedTable(catalog_name="catalog", schema_name="schema", table_name="table"), "catalog.schema.table"),
+    ],
+)
+def test_used_table_full_name(used_table: UsedTable, expected_name: str) -> None:
+    assert used_table.full_name == expected_name
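
A minimal sketch of how the new `UsedTable.full_name` property and the "Used by ..." failure message fit together; field values are illustrative and mirror the tests above, and the expectation that this lineage resolves to a `NOTEBOOK` source type is taken from those tests rather than from the patch itself:

from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable

# A code reference to a Hive table, as the used-table crawlers would record it.
used_table = UsedTable(
    catalog_name="hive_metastore",
    schema_name="schema",
    table_name="table",
    source_id="test/test.py",
    source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="test/test.py")],
)

# The new property joins the three name parts; _get_used_hive_tables() keys its index on it.
assert used_table.full_name == "hive_metastore.schema.table"

# _encode_table_as_historical() appends one failure per code reference; per the tests above,
# this lineage yields a NOTEBOOK source type.
failure = f"Used by {used_table.source_type}: {used_table.source_id}"
assert failure == "Used by NOTEBOOK: test/test.py"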