Skip to content

Commit

Permalink
Rewrite to not use a cached_property on TableProgressEncoder
Browse files: browse the repository at this point in the history
  • Loading branch information
JCZuurmond committed Nov 26, 2024
1 parent 9b9db61 commit b389e3d
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,7 +2,6 @@
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import replace
from functools import cached_property

from databricks.labs.lsql.backends import SqlBackend

Expand Down Expand Up @@ -48,21 +47,26 @@ def __init__(

def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
used_hive_tables = self._get_used_hive_tables()
history_records = []
for record in snapshot:
history_record = self._encode_table_as_historical(record, migration_index, used_hive_tables)
history_records.append(history_record)
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
# The mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

@cached_property
def _used_hive_tables(self) -> dict[str, list[UsedTable]]:
def _get_used_hive_tables(self) -> dict[str, list[UsedTable]]:
used_tables: dict[str, list[UsedTable]] = defaultdict(list[UsedTable])
for crawler in self._used_tables_crawlers:
for used_table in crawler.snapshot():
if used_table.catalog_name == "hive_metastore":
used_tables[used_table.full_name].append(used_table)
return used_tables

def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
def _encode_table_as_historical(
self, record: Table, migration_index: TableMigrationIndex, used_hive_tables: dict[str, list[UsedTable]]
) -> Historical:
"""Encode a table record, enriching with the migration status and used table references.
Possible failures, the table is
Expand All @@ -77,6 +81,6 @@ def _encode_table_as_historical(self, record: Table, migration_index: TableMigra
failures = []
if not migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
for used_table in self._used_hive_tables.get(record.full_name, []):
for used_table in used_hive_tables.get(record.full_name, []):
failures.append(f"Used by {used_table.source_type}: {used_table.source_id}")
return replace(historical, failures=historical.failures + failures)

0 comments on commit b389e3d

Please sign in to comment.