Update table-migration workflows to also capture updated migration progress into the history log (#3239)

## Changes

The table-migration workflows already contained tasks at the end that
log information about tables that still need to be migrated. The primary
purpose of this PR is to update these workflows so they also capture
updated progress information into the history log.
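
In outline, every affected workflow now ends with the same chain of tasks. The sketch below is condensed from the `workflows.py` diff further down; the surrounding `Workflow` class and the full `depends_on` lists are omitted, so treat it as illustrative rather than exact:

```python
# Condensed sketch of the progress-capture tasks appended to each
# table-migration workflow; see the workflows.py diff below for the real code.
import datetime as dt

from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import job_task


@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
    ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_progress_tracking_prerequisites])  # runs on the main cluster
def update_table_inventory(self, ctx: RuntimeContext) -> None:
    ctx.tables_crawler.snapshot(force_refresh=True)  # step 1 of 3: refresh the tables inventory

@job_task(job_cluster="user_isolation", depends_on=[update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
    # Step 2 of 3: refresh the migration status and warn about unmigrated tables.
    statuses = ctx.migration_status_refresher.snapshot(force_refresh=True)
    ctx.tables_migrator.warn_about_remaining_non_migrated_tables(statuses)

@job_task(job_cluster="user_isolation", depends_on=[update_migration_status])
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
    # Step 3 of 3: capture the refreshed inventory into the history log.
    ctx.tables_progress.append_inventory_snapshot(ctx.tables_crawler.snapshot())

@job_task(job_cluster="user_isolation", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
    ctx.workflow_run_recorder.record()
```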

Other changes include:

- Updating the documentation for which workflows update which tables.
- ~Updating the (singleton) encoder for table-history so that
initialisation doesn't trigger an implicit refresh of the
`TableMigrationStatus` data. Instead this is controlled at the workflow
level, as intended.~ Moved to #3270.

### Linked issues

~Conflicts with #3200 (will need rebasing).~ (Resolved.)

### Functionality

- updated documentation
- modified existing workflows:

  - `migrate-tables`
  - `migrate-external-hiveserde-tables-in-place-experimental`
  - `migrate-external-tables-ctas`
  - `scan-tables-in-mounts-experimental`
  - `migrate-tables-in-mounts-experimental`

### Tests

- manually tested
- updated and new unit tests
- updated and new integration tests

---------

Co-authored-by: Serge Smertin <[email protected]>
Co-authored-by: Cor Zuurmond <[email protected]>
3 people authored Jan 3, 2025
1 parent a77ca8b commit 36752ed
Showing 8 changed files with 467 additions and 90 deletions.
4 changes: 2 additions & 2 deletions docs/table_persistence.md
@@ -8,7 +8,7 @@ Table utilization per workflow:

| Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
|--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
-| tables                   | RW                  | RW                        |                | RO                      |              | RO             |                             |
+| tables                   | RW                  | RW                        |                | RW                      |              | RW             |                             |
| grants | RW | RW | | RW | | RW | |
| mounts | RW | | | RO | RO | RO | |
| permissions | RW | | RW | RO | | RO | |
@@ -30,7 +30,7 @@ Table utilization per workflow:
| query_problems | RW | RW | | | | | |
| workflow_problems | RW | RW | | | | | |
| udfs | RW | RW | RO | | | | |
-| logs                     | RW                  |                           | RW             | RW                      |              | RW             | RW                          |
+| logs                     | RW                  | RW                        | RW             | RW                      | RW           | RW             | RW                          |
| recon_results | | | | | | | RW |
| redash_dashboards | RW | | | | | | RW |
| lakeview_dashboards | RW | | | | | | RW |
15 changes: 8 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -2,6 +2,7 @@
import logging
import re
from collections import defaultdict
+from collections.abc import Iterable
from functools import partial, cached_property

from databricks.labs.blueprint.parallel import Threads
@@ -18,8 +19,11 @@
TableMapping,
TableToMigrate,
)

-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.table_migration_status import (
+    TableMigrationStatusRefresher,
+    TableMigrationStatus,
+    TableMigrationIndex,
+)
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
Table,
@@ -56,14 +60,11 @@ def __init__(
self._migrate_grants = migrate_grants
self._external_locations = external_locations

-    def get_remaining_tables(self) -> list[Table]:
-        migration_index = self.index(force_refresh=True)
-        table_rows = []
+    def warn_about_remaining_non_migrated_tables(self, migration_statuses: Iterable[TableMigrationStatus]) -> None:
+        migration_index = TableMigrationIndex(migration_statuses)
         for crawled_table in self._tables_crawler.snapshot():
             if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
-                table_rows.append(crawled_table)
                 logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
-        return table_rows

def index(self, *, force_refresh: bool = False):
return self._migration_status_refresher.index(force_refresh=force_refresh)
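
For context on the change above: `get_remaining_tables`, which refreshed the migration status itself and returned the remaining tables, is replaced by `warn_about_remaining_non_migrated_tables`, which only logs warnings and expects the caller to supply the statuses. A rough sketch of the call site, mirroring the `update_migration_status` tasks in the workflows diff below:

```python
# Illustrative only: how the new method is driven from a workflow task.
# The refresher produces the TableMigrationStatus records; the migrator just
# warns about tables that are still on the Hive metastore.
statuses = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(statuses)
```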
@@ -90,7 +90,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
self._tables_crawler = tables_crawler

def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
-        return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
+        return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))

def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
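
The hunk above (presumably from `table_migration_status.py`, where `TableMigrationStatusRefresher` lives; the file header was lost in this excerpt) drops the `list(...)` copy, implying `TableMigrationIndex` accepts any iterable of `TableMigrationStatus`. A hypothetical illustration of building and querying such an index (the variable and table names are placeholders):

```python
# Hypothetical illustration; "refresher" and the table names are placeholders.
statuses = refresher.snapshot(force_refresh=True)  # Iterable[TableMigrationStatus]
index = TableMigrationIndex(statuses)              # no list() copy needed
if not index.is_migrated("some_database", "some_table"):
    print("still on the Hive metastore")
```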
220 changes: 200 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -1,3 +1,5 @@
+import datetime as dt
+
from databricks.labs.ucx.assessment.workflows import Assessment
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task
@@ -57,10 +59,53 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        depends_on=[
+            convert_managed_table,
+            migrate_external_tables_sync,
+            migrate_dbfs_root_delta_tables,
+            migrate_dbfs_root_non_delta_tables,
+            migrate_views,
+            verify_progress_tracking_prerequisites,
+        ],
+    )
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation")
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation"
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()


class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -86,10 +131,44 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views])
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()


class MigrateExternalTablesCTAS(Workflow):
@@ -120,10 +199,51 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        depends_on=[
+            verify_progress_tracking_prerequisites,
+            migrate_views,
+            migrate_hive_serde_ctas,
+            migrate_other_external_ctas,
+        ]
+    )
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()


class ScanTablesInMounts(Workflow):
Expand All @@ -137,10 +257,36 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
replacing any existing content that might be present."""
ctx.tables_in_mounts.snapshot(force_refresh=True)

-    @job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        job_cluster="user_isolation",
+        depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental],
+    )
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()


class MigrateTablesInMounts(Workflow):
@@ -152,7 +298,41 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

-    @job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental])
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()