diff --git a/src/databricks/labs/ucx/progress/grants.py b/src/databricks/labs/ucx/progress/grants.py index a393d31d87..5807799fa2 100644 --- a/src/databricks/labs/ucx/progress/grants.py +++ b/src/databricks/labs/ucx/progress/grants.py @@ -25,8 +25,8 @@ def __init__( ) -> None: super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table) - def _encode_record_as_historical(self, record: Grant, snapshot_context: None) -> Historical: - historical = super()._encode_record_as_historical(record, snapshot_context) + def _encode_record_as_historical(self, record: Grant) -> Historical: + historical = super()._encode_record_as_historical(record) failures = [] if not record.uc_grant_sql(): type_, key = record.this_type_and_key() diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 41ba114509..e2072ec8aa 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -3,10 +3,9 @@ import datetime as dt import json import logging -from contextlib import contextmanager from enum import Enum, EnumMeta -from collections.abc import Iterable, Sequence, Generator -from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final +from collections.abc import Iterable, Sequence +from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints from databricks.labs.lsql.backends import SqlBackend @@ -280,29 +279,13 @@ def __init__( def full_name(self) -> str: return f"{self._catalog}.{self._schema}.{self._table}" - @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: - with self._snapshot_context() as ctx: - history_records = [self._encode_record_as_historical(record, ctx) for record in snapshot] - logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") - # This is the only writer, and the mode is 'append'. This is documented as conflict-free. - self._sql_backend.save_table( - escape_sql_identifier(self.full_name), history_records, Historical, mode="append" - ) - - SC: ClassVar = type(None) - - @contextmanager - def _snapshot_context(self) -> Generator[SC, None, None]: - """A context manager that is held open while a snapshot is underway. - - The context itself is passed as a parameter to :method:`_encode_record_as_historical`. As a manager, preparation - and cleanup can take place before and after the snapshot takes place. - """ - yield + history_records = [self._encode_record_as_historical(record) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - def _encode_record_as_historical(self, record: Record, snapshot_context: SC) -> Historical: + def _encode_record_as_historical(self, record: Record) -> Historical: """Encode a snapshot record as a historical log entry.""" # Snapshot context not needed with default implementation. - _ = snapshot_context return self._encoder.to_historical(record) diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py index 17e1471569..198139543c 100644 --- a/src/databricks/labs/ucx/progress/jobs.py +++ b/src/databricks/labs/ucx/progress/jobs.py @@ -48,7 +48,7 @@ def _job_problems(self) -> dict[int, list[str]]: index[job_problem.job_id].append(failure) return index - def _encode_record_as_historical(self, record: JobInfo, snapshot_context: None) -> Historical: - historical = super()._encode_record_as_historical(record, snapshot_context) + def _encode_record_as_historical(self, record: JobInfo) -> Historical: + historical = super()._encode_record_as_historical(record) failures = self._job_problems.get(int(record.job_id), []) return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6550d1526f..87671e72c1 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -1,10 +1,10 @@ -from collections.abc import Generator -from contextlib import contextmanager +import logging +from collections.abc import Iterable from dataclasses import replace -from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus @@ -12,6 +12,8 @@ from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical +logger = logging.getLogger(__name__) + class TableProgressEncoder(ProgressEncoder[Table]): """Encoder class:Table to class:History. @@ -44,21 +46,22 @@ def __init__( ) self._migration_status_refresher = migration_status_refresher - SC: ClassVar = TableMigrationIndex - - @contextmanager - def _snapshot_context(self) -> Generator[SC, None, None]: - yield TableMigrationIndex(self._migration_status_refresher.snapshot()) + def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None: + migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot()) + history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - def _encode_record_as_historical(self, record: Table, snapshot_context: SC) -> Historical: - """Encode record as historical. + def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical: + """Encode a table record, enriching with the migration status. A table failure means that the table is pending migration. Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant grants also being migrated. """ - historical = super()._encode_record_as_historical(record, snapshot_context=None) + historical = super()._encode_record_as_historical(record) failures = [] - if not snapshot_context.is_migrated(record.database, record.name): + if not migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures)