From b389e3d63480b592435b6e07b5b32a5e9fb9c341 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 26 Nov 2024 09:57:14 +0100 Subject: [PATCH] Rewrite to not use a cached_property on TableProgressEncoder --- src/databricks/labs/ucx/progress/tables.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index db7ddbb478..d1d6eab1a9 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -2,7 +2,6 @@ from collections import defaultdict from collections.abc import Iterable from dataclasses import replace -from functools import cached_property from databricks.labs.lsql.backends import SqlBackend @@ -48,13 +47,16 @@ def __init__( def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None: migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot()) - history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot] + used_hive_tables = self._get_used_hive_tables() + history_records = [] + for record in snapshot: + history_record = self._encode_table_as_historical(record, migration_index, used_hive_tables) + history_records.append(history_record) logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.") # The mode is 'append'. This is documented as conflict-free. self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - @cached_property - def _used_hive_tables(self) -> dict[str, list[UsedTable]]: + def _get_used_hive_tables(self) -> dict[str, list[UsedTable]]: used_tables: dict[str, list[UsedTable]] = defaultdict(list[UsedTable]) for crawler in self._used_tables_crawlers: for used_table in crawler.snapshot(): @@ -62,7 +64,9 @@ def _used_hive_tables(self) -> dict[str, list[UsedTable]]: used_tables[used_table.full_name].append(used_table) return used_tables - def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical: + def _encode_table_as_historical( + self, record: Table, migration_index: TableMigrationIndex, used_hive_tables: dict[str, list[UsedTable]] + ) -> Historical: """Encode a table record, enriching with the migration status and used table references. Possible failures, the table is @@ -77,6 +81,6 @@ def _encode_table_as_historical(self, record: Table, migration_index: TableMigra failures = [] if not migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") - for used_table in self._used_hive_tables.get(record.full_name, []): + for used_table in used_hive_tables.get(record.full_name, []): failures.append(f"Used by {used_table.source_type}: {used_table.source_id}") return replace(historical, failures=historical.failures + failures)