Skip to content

Commit

Permalink
Switch to specialisation (limited to TableProgressEncoder) for ensuri…
Browse files Browse the repository at this point in the history
…ng the migration index is available during encoding.
  • Loading branch information
asnare committed Nov 12, 2024
1 parent 78787df commit 788789f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/progress/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def __init__(
) -> None:
super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table)

def _encode_record_as_historical(self, record: Grant, snapshot_context: None) -> Historical:
historical = super()._encode_record_as_historical(record, snapshot_context)
def _encode_record_as_historical(self, record: Grant) -> Historical:
historical = super()._encode_record_as_historical(record)
failures = []
if not record.uc_grant_sql():
type_, key = record.this_type_and_key()
Expand Down
31 changes: 7 additions & 24 deletions src/databricks/labs/ucx/progress/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import datetime as dt
import json
import logging
from contextlib import contextmanager
from enum import Enum, EnumMeta
from collections.abc import Iterable, Sequence, Generator
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final
from collections.abc import Iterable, Sequence
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints

from databricks.labs.lsql.backends import SqlBackend

Expand Down Expand Up @@ -280,29 +279,13 @@ def __init__(
def full_name(self) -> str:
return f"{self._catalog}.{self._schema}.{self._table}"

@final
def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None:
with self._snapshot_context() as ctx:
history_records = [self._encode_record_as_historical(record, ctx) for record in snapshot]
logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
# This is the only writer, and the mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(
escape_sql_identifier(self.full_name), history_records, Historical, mode="append"
)

SC: ClassVar = type(None)

@contextmanager
def _snapshot_context(self) -> Generator[SC, None, None]:
"""A context manager that is held open while a snapshot is underway.
The context itself is passed as a parameter to :method:`_encode_record_as_historical`. As a manager, preparation
and cleanup can take place before and after the snapshot takes place.
"""
yield
history_records = [self._encode_record_as_historical(record) for record in snapshot]
logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
# This is the only writer, and the mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _encode_record_as_historical(self, record: Record, snapshot_context: SC) -> Historical:
def _encode_record_as_historical(self, record: Record) -> Historical:
"""Encode a snapshot record as a historical log entry."""
# Snapshot context not needed with default implementation.
_ = snapshot_context
return self._encoder.to_historical(record)
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/progress/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _job_problems(self) -> dict[int, list[str]]:
index[job_problem.job_id].append(failure)
return index

def _encode_record_as_historical(self, record: JobInfo, snapshot_context: None) -> Historical:
historical = super()._encode_record_as_historical(record, snapshot_context)
def _encode_record_as_historical(self, record: JobInfo) -> Historical:
historical = super()._encode_record_as_historical(record)
failures = self._job_problems.get(int(record.job_id), [])
return replace(historical, failures=historical.failures + failures)
27 changes: 15 additions & 12 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from collections.abc import Generator
from contextlib import contextmanager
import logging
from collections.abc import Iterable
from dataclasses import replace
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.utils import escape_sql_identifier

from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical

logger = logging.getLogger(__name__)


class TableProgressEncoder(ProgressEncoder[Table]):
"""Encoder class:Table to class:History.
Expand Down Expand Up @@ -44,21 +46,22 @@ def __init__(
)
self._migration_status_refresher = migration_status_refresher

SC: ClassVar = TableMigrationIndex

@contextmanager
def _snapshot_context(self) -> Generator[SC, None, None]:
yield TableMigrationIndex(self._migration_status_refresher.snapshot())
def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
# This is the only writer, and the mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _encode_record_as_historical(self, record: Table, snapshot_context: SC) -> Historical:
"""Encode record as historical.
def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
"""Encode a table record, enriching with the migration status.
A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
for tables that are migrated to UC with their relevant grants also being migrated.
"""
historical = super()._encode_record_as_historical(record, snapshot_context=None)
historical = super()._encode_record_as_historical(record)
failures = []
if not snapshot_context.is_migrated(record.database, record.name):
if not migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
return replace(historical, failures=historical.failures + failures)

0 comments on commit 788789f

Please sign in to comment.