Update table-migration workflows to also capture updated migration progress into the history log #3239

Merged: 53 commits, Jan 3, 2025
Showing changes from 7 commits
Commits
905fefc
Remove some unnecessary code.
asnare Nov 6, 2024
c23c38a
Update table history logger to not trigger a refresh of the migration…
asnare Nov 6, 2024
9ebb26c
Consistent import.
asnare Nov 7, 2024
e861963
Update the various table-migration workflows to also update the histo…
asnare Nov 7, 2024
cfe1486
All workflows update the logs table.
asnare Nov 7, 2024
145cd7d
Table migration workflows also update the tables inventory (at the end).
asnare Nov 7, 2024
5289582
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
6e5e4ae
Switch to multi-line f""" """-string.
asnare Nov 11, 2024
778ad10
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
1ce8e05
Fix mock return value for crawler snapshot.
asnare Nov 11, 2024
78787df
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
788789f
Switch to specialisation (limited to TableProgressEncoder) for ensuri…
asnare Nov 12, 2024
891b3b7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
f9bf219
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 13, 2024
da3b15b
Back out changes relating to the way the migration-status information…
asnare Nov 13, 2024
ac8586a
Back out more changes that are either not needed or made on other PRs.
asnare Nov 13, 2024
4b3717f
Remove comment that is no longer relevant.
asnare Nov 13, 2024
c80a9c6
Verify prerequisites for updating the migration-progress prior to the…
asnare Nov 13, 2024
f638cb5
No need to mention the assessment; we won't reach this point of the w…
asnare Nov 13, 2024
2d398f4
Use TODO marker instead of warning to highlight what we'd prefer to h…
asnare Nov 13, 2024
df9f689
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 13, 2024
4e579fd
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
e4d4220
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 18, 2024
14cdc77
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
1a32bd7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 19, 2024
1df72bf
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 20, 2024
dbafbac
Ensure the assessment has finished before table-migration runs, and t…
asnare Nov 20, 2024
839ac76
Ensure the progress-migration catalog is configured.
asnare Nov 20, 2024
862b5bf
Remove unused fixture.
asnare Nov 20, 2024
54d06f7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 20, 2024
12a1d9f
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 21, 2024
7774a78
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 25, 2024
a074fd7
Remove unused fixture.
asnare Nov 25, 2024
034c91b
Split over several lines to make debugging easier.
asnare Nov 25, 2024
dd74dd9
Refactor for debugging.
asnare Nov 25, 2024
4dde220
Fix test to invoke the workflow its verifying.
asnare Nov 25, 2024
007e23b
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 3, 2024
2dbc2e4
Adjust the debugging convenience.
asnare Dec 3, 2024
375276e
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 4, 2024
e30d2cc
Configure test with new infrastructure.
asnare Dec 4, 2024
c777794
Fix linting problems.
asnare Dec 4, 2024
b763886
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 9, 2024
c2127a5
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 9, 2024
babbc69
Rename task to verify progress-tracking prerequisites.
asnare Dec 9, 2024
1e45dc0
Formatting.
asnare Dec 9, 2024
c966766
Rename method to better indicate purpose.
asnare Dec 9, 2024
5e653de
Inline a local variable.
asnare Dec 9, 2024
ade16ae
Remove misleading TODO marker in lieu of #3422.
asnare Dec 10, 2024
bad20f2
Use alternate plural spelling.
asnare Dec 10, 2024
c13b423
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 11, 2024
7aa937f
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 11, 2024
fe34a3b
Use context consistently
JCZuurmond Dec 12, 2024
e0aeb4c
Merge branch 'main' into more-workflow-history-snapshots
asnare Jan 2, 2025
4 changes: 2 additions & 2 deletions docs/table_persistence.md
@@ -8,7 +8,7 @@ Table utilization per workflow:

| Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
|--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
| tables | RW | RW | | RO | | RO | |
| tables | RW | RW | | RW | | RW | |
| grants | RW | RW | | RW | | RW | |
| mounts | RW | | | RO | RO | RO | |
| permissions | RW | | RW | RO | | RO | |
@@ -30,7 +30,7 @@ Table utilization per workflow:
| query_problems | RW | RW | | | | | |
| workflow_problems | RW | RW | | | | | |
| udfs | RW | RW | RO | | | | |
| logs | RW | | RW | RW | | RW | RW |
| logs | RW | RW | RW | RW | RW | RW | RW |
| recon_results | | | | | | | RW |

**RW** - Read/Write, the job generates or updates the table.<br/>
10 changes: 5 additions & 5 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -22,10 +22,10 @@
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder, Grant
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
@@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
)

@cached_property
def grants_progress(self) -> GrantProgressEncoder:
def grants_progress(self) -> ProgressEncoder[Grant]:
return GrantProgressEncoder(
self.sql_backend,
self.grant_ownership,
@@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
)

@cached_property
def tables_progress(self) -> TableProgressEncoder:
def tables_progress(self) -> ProgressEncoder[Table]:
return TableProgressEncoder(
self.sql_backend,
self.table_ownership,
self.migration_status_refresher.index(force_refresh=False),
self.migration_status_refresher,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
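The `tables_progress` change above hands the encoder the refresher itself instead of an index computed up front. A minimal sketch of the effect, using hypothetical stand-in classes (`FakeRefresher` and `LazyEncoder` are illustrative names, not UCX classes, and the diff does not show whether the real encoder caches its index): building the encoder triggers no load, and the load happens only when a record is first looked up.

```python
from functools import cached_property


class FakeRefresher:
    """Hypothetical stand-in for a migration-status refresher; counts how often it loads."""

    def __init__(self) -> None:
        self.loads = 0

    def index(self) -> dict[tuple[str, str], bool]:
        # Pretend this is an expensive scan of the migration-status table.
        self.loads += 1
        return {("sales", "orders"): True}


class LazyEncoder:
    """Hypothetical encoder that keeps the refresher and builds its index on first use."""

    def __init__(self, refresher: FakeRefresher) -> None:
        self._refresher = refresher

    @cached_property
    def _index(self) -> dict[tuple[str, str], bool]:
        # Evaluated once, the first time a record needs it (assumed behaviour).
        return self._refresher.index()

    def is_migrated(self, schema: str, table: str) -> bool:
        return self._index.get((schema, table), False)


refresher = FakeRefresher()
encoder = LazyEncoder(refresher)
loads_after_construction = refresher.loads
first = encoder.is_migrated("sales", "orders")
second = encoder.is_migrated("sales", "missing")
loads_after_use = refresher.loads
```

Under these assumptions the context property can be created on any cluster without touching the migration-status data; only the task that actually encodes records pays for the load.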
16 changes: 9 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -2,6 +2,7 @@
import logging
import re
from collections import defaultdict
from collections.abc import Iterable
from functools import partial, cached_property

from databricks.labs.blueprint.parallel import Threads
@@ -19,7 +20,11 @@
TableToMigrate,
)

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationStatus,
TableMigrationIndex,
)
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
Table,
@@ -55,14 +60,11 @@ def __init__(
self._migrate_grants = migrate_grants
self._external_locations = external_locations

def get_remaining_tables(self) -> list[Table]:
self.index(force_refresh=True)
table_rows = []
def check_remaining_tables(self, migration_status: Iterable[TableMigrationStatus]):
migration_index = TableMigrationIndex(migration_status)
for crawled_table in self._tables_crawler.snapshot():
if not self._is_migrated(crawled_table.database, crawled_table.name):
table_rows.append(crawled_table)
if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
return table_rows

def index(self, *, force_refresh: bool = False):
return self._migration_status_refresher.index(force_refresh=force_refresh)
src/databricks/labs/ucx/hive_metastore/table_migration_status.py
@@ -53,7 +53,7 @@ def key(self):


class TableMigrationIndex:
def __init__(self, tables: list[TableMigrationStatus]):
def __init__(self, tables: Iterable[TableMigrationStatus]):
self._index = {(ms.src_schema, ms.src_table): ms for ms in tables}

def is_migrated(self, schema: str, table: str) -> bool:
@@ -84,7 +84,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
self._tables_crawler = tables_crawler

def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))

def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
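Widening the constructor parameter from `list` to `Iterable` works because the dict comprehension makes a single pass over its input, so the intermediate `list(...)` copy can be dropped. A small sketch of that behaviour, with a hypothetical `build_index` helper standing in for the constructor:

```python
from collections.abc import Iterable, Iterator


def build_index(rows: Iterable[tuple[str, str]]) -> dict[tuple[str, str], tuple[str, str]]:
    # A single pass over `rows` is enough, so any iterable (including a generator) works.
    return {(schema, table): (schema, table) for schema, table in rows}


def rows() -> Iterator[tuple[str, str]]:
    yield ("sales", "orders")
    yield ("sales", "customers")


gen = rows()
index = build_index(gen)
# The generator is exhausted after indexing; callers must not expect to iterate it again.
leftover = list(gen)
```

The trade-off is that a caller passing a one-shot iterator cannot reuse it afterwards, which matters if the same snapshot is needed for more than one purpose.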
187 changes: 167 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -57,10 +57,50 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
"""(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory."""

@job_task(
depends_on=[
convert_managed_table,
migrate_external_tables_sync,
migrate_dbfs_root_delta_tables,
migrate_dbfs_root_non_delta_tables,
migrate_views,
setup_tacl,
],
job_cluster="tacl",
)
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The TACL cluster is not UC-enabled, so the snapshot cannot be written immediately to the history log.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[update_table_inventory], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture them into the
# history log.
# WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -86,10 +126,40 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
"""(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory."""

@job_task(depends_on=[migrate_hive_serde_in_place, migrate_views, setup_tacl], job_cluster="tacl")
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The TACL cluster is not UC-enabled, so the snapshot cannot be written immediately to the history log.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[update_table_inventory], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture them into the
# history log.
# WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateExternalTablesCTAS(Workflow):
@@ -120,10 +190,42 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
"""(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory."""

@job_task(
depends_on=[migrate_other_external_ctas, migrate_hive_serde_ctas, migrate_views, setup_tacl], job_cluster="tacl"
)
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The TACL cluster is not UC-enabled, so the snapshot cannot be written immediately to the history log.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[update_table_inventory], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture them into the
# history log.
# WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class ScanTablesInMounts(Workflow):
@@ -137,10 +239,25 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
replacing any existing content that might be present."""
ctx.tables_in_mounts.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(depends_on=[scan_tables_in_mounts_experimental], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateTablesInMounts(Workflow):
@@ -152,7 +269,37 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

@job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
"""(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory."""

@job_task(depends_on=[migrate_tables_in_mounts_experimental, setup_tacl], job_cluster="tacl")
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The TACL cluster is not UC-enabled, so the snapshot cannot be written immediately to the history log.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[update_table_inventory], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture them into the
# history log.
# WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()
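The three-step tail added to each workflow relies entirely on `depends_on` to run in order across two clusters. As a rough illustration only (the real scheduling is done by the Databricks job, not by this code), the dependency chain of `MigrateTablesInMounts` can be written as a graph and sorted with the standard library's `graphlib`. The dependencies of the first task are not visible in the diff and are assumed to be empty here.

```python
from graphlib import TopologicalSorter

# Task name -> set of tasks it depends on, mirroring the depends_on arguments
# shown for MigrateTablesInMounts. The first task's dependencies are an assumption.
depends_on = {
    "migrate_tables_in_mounts_experimental": set(),
    "setup_tacl": set(),
    "update_table_inventory": {"migrate_tables_in_mounts_experimental", "setup_tacl"},
    "update_migration_status": {"update_table_inventory"},
    "update_tables_history_log": {"update_migration_status"},
    "record_workflow_run": {"update_tables_history_log"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(depends_on).static_order())
```

Because `setup_tacl` has no dependencies of its own, it can start alongside the migration task, which is what lets the TACL cluster warm up before the inventory refresh needs it.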