Add table in mount migration #1225

Merged: 38 commits merged into main from add_table_in_mount_migration on May 23, 2024

Commits (38)
b5675f2
Adding table in mount migration
william-conti Apr 2, 2024
ce9d24b
Refactoring
william-conti Apr 2, 2024
0c12eaa
removing useless changes
william-conti Apr 2, 2024
a1f35ba
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 2, 2024
362c547
fixing oopsie
william-conti Apr 2, 2024
9e6099c
wip
william-conti Apr 3, 2024
64f74af
adding csv support
william-conti Apr 4, 2024
297fde6
fixing log
william-conti Apr 4, 2024
e0a2953
Fixes
william-conti Apr 8, 2024
32878be
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 8, 2024
0e70761
pr fixes
william-conti Apr 8, 2024
bc792c0
removing irrelevant changes
william-conti Apr 8, 2024
74fff05
removing changes
william-conti Apr 8, 2024
e420e5f
removing cheats
william-conti Apr 8, 2024
c2b3714
removing useless boilerplate
william-conti Apr 8, 2024
a5fa67a
fixing fmt
william-conti Apr 8, 2024
bd9b6e0
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 9, 2024
bfe0983
merges conflicts
william-conti Apr 9, 2024
4aec2b1
adding default owner permission
william-conti Apr 9, 2024
d11467e
end to end tests
william-conti Apr 9, 2024
627c9ed
fmt
william-conti Apr 9, 2024
93428d2
PR review
william-conti Apr 10, 2024
d685971
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 10, 2024
e7422c6
first pr returns
william-conti Apr 10, 2024
f955b69
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 12, 2024
7e9615b
PR returns
william-conti Apr 12, 2024
4a233e9
removing useless change
william-conti Apr 12, 2024
595181f
adding formats
william-conti Apr 12, 2024
9ba50a4
Merge branch 'main' into add_table_in_mount_migration
william-conti Apr 19, 2024
a888162
Merge branch 'main' into add_table_in_mount_migration
william-conti May 21, 2024
c2026e1
Merge issues
william-conti May 21, 2024
b15c558
removing features, fixing tests
william-conti May 21, 2024
d1e7503
removing public methods
william-conti May 21, 2024
9669208
Merge branch 'main' into add_table_in_mount_migration
william-conti May 21, 2024
0d931c3
Merge branch 'main' into add_table_in_mount_migration
william-conti May 21, 2024
c47ad0e
Merge branch 'main' into add_table_in_mount_migration
william-conti May 22, 2024
a139a83
Merge branch 'main' into add_table_in_mount_migration
william-conti May 23, 2024
115db64
Merge branch 'main' into add_table_in_mount_migration
william-conti May 23, 2024
23 changes: 23 additions & 0 deletions README.md
@@ -468,6 +468,29 @@ There are 3 main table migration workflows, targeting different table types. All
- Consider creating an instance pool, and setting its id when prompted during the UCX installation. This instance pool will be specified in the cluster policy used by all UCX workflows job clusters.
- You may also manually edit the job cluster configuration per job or per task after the workflows are deployed.

### [EXPERIMENTAL] Scan tables in mounts Workflow
#### <b>Always run this workflow AFTER the assessment has finished</b>
- This experimental workflow attempts to find all tables inside the mount points present in your workspace.
- If you do not run this workflow, then `migrate-tables-in-mounts-experimental` won't do anything.
- It writes all results to `hive_metastore.<inventory_database>.tables`; you can query the tables it found by filtering on database values that start with `mounted_`.
- Each run refreshes the results, overwriting the tables in mounts found by previous runs.
- The following formats are supported:
  - DELTA
  - PARQUET
  - CSV
  - JSON
- Partitioned DELTA and PARQUET tables are also detected.
- You can configure these workflows with the following options, available in `conf.yml` (see the sketch after this list):
  - `include_mounts`: a list of mount points to scan; by default, the workflow scans all mount points.
  - `exclude_paths_in_mount`: a list of paths to exclude in all mount points.
  - `include_paths_in_mount`: a list of paths to include in all mount points.
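
A minimal `conf.yml` sketch, assuming the option names above; the `default_table_owner` parameter from the next section is included for completeness, and all values are placeholders rather than defaults:

```yaml
# Hypothetical values -- adjust to your workspace.
include_mounts:
  - /mnt/datalake
exclude_paths_in_mount:
  - /tmp/
include_paths_in_mount:
  - /data/
default_table_owner: data-platform-team  # used by the migrate workflow below
```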

### [EXPERIMENTAL] Migrate tables in mounts Workflow
- An experimental workflow that migrates tables in mount points using a `CREATE TABLE` command, and optionally sets a default table owner when one is provided in the `default_table_owner` conf parameter.
- You must do the following in order to make this work:
- run the Assessment [workflow](#assessment-workflow)
- run the scan tables in mounts [workflow](#EXPERIMENTAL-scan-tables-in-mounts-workflow)
- run the [`create-table-mapping` command](#create-table-mapping-command)
  - or manually create a `mapping.csv` file in Workspace -> Applications -> ucx (a sketch follows this list)
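
A hypothetical `mapping.csv` sketch; the column layout here mirrors what the `create-table-mapping` command emits and should be verified against your own installation's output:

```text
workspace_name,catalog_name,src_schema,dst_schema,src_table,dst_table
my-workspace,main,mounted_datalake,sales,orders,orders
```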


[[back to top](#databricks-labs-ucx)]

## Jobs Static Code Analysis Workflow
64 changes: 44 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/mapping.py
@@ -158,6 +158,8 @@ def _get_databases_in_scope(self, databases: set[str]):
return Threads.strict("checking databases for skip property", tasks)

def _get_database_in_scope_task(self, database: str) -> str | None:
if database.startswith("mounted_"):
return database
describe = {}
try:
for value in self._sql_backend.fetch(f"DESCRIBE SCHEMA EXTENDED {escape_sql_identifier(database)}"):
@@ -182,14 +184,11 @@ def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToM
if self.exists_in_uc(table, rule.as_uc_table_key):
logger.info(f"The intended target for {table.key}, {rule.as_uc_table_key}, already exists.")
return None
try:
result = self._sql_backend.fetch(
f"SHOW TBLPROPERTIES {escape_sql_identifier(table.database)}.{escape_sql_identifier(table.name)}"
)
except DatabricksError as err:
logger.warning(f"Failed to get properties for Table {table.key}: {err}")
properties = self._get_table_properties(table)
if not properties:
return None
for value in result:

for value in properties:
if value["key"] == self.UCX_SKIP_PROPERTY:
logger.info(f"{table.key} is marked to be skipped")
return None
@@ -210,20 +209,45 @@ def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToM

return table_to_migrate

def _get_table_properties(self, table: Table):
table_identifier = f"{escape_sql_identifier(table.database)}.{escape_sql_identifier(table.name)}"
if table.is_table_in_mount:
table_identifier = f"delta.`{table.location}`"

try:
return self._sql_backend.fetch(f"SHOW TBLPROPERTIES {table_identifier}")
except DatabricksError as err:
logger.warning(f"Failed to get properties for Table {table.key}: {err}")
return None
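
As an aside, the identifier switch above means a table in a mount is addressed by its storage path rather than by schema and name; schematically (paths are hypothetical):

```python
# Regular hive_metastore table -- identified by escaped database and name:
#   SHOW TBLPROPERTIES `my_schema`.`my_table`
# Table in a mount -- it has no usable schema/name identity, so the Delta
# files are addressed directly by location:
#   SHOW TBLPROPERTIES delta.`dbfs:/mnt/datalake/orders`
```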

def exists_in_uc(self, src_table: Table, target_key: str):
# Attempts to get the target table info from UC; returns True if it exists.
try:
table_info = self._ws.tables.get(target_key)
if not table_info.properties:
return True
upgraded_from = table_info.properties.get("upgraded_from")
if upgraded_from and upgraded_from != src_table.key:
raise ResourceConflict(
f"Expected to be migrated from {src_table.key}, but got {upgraded_from}. "
"You can skip this error using the CLI command: "
"databricks labs ucx skip "
f"--schema {src_table.database} --table {src_table.name}"
)
table_info = self._try_get_table_in_uc(target_key)
if not table_info:
return False
# Corner case for tables in mounts where the table exists in UC, but the location is not the same
# as the one provided in the mapping
if src_table.is_table_in_mount and table_info.storage_location != src_table.location:
raise ResourceConflict(
f"Expected to be migrated from {src_table.key}, but got {table_info.storage_location}. "
"You can skip this error using the CLI command: "
"databricks labs ucx skip "
f"--schema {src_table.database} --table {src_table.name}"
)
if not table_info.properties:
return True
upgraded_from = table_info.properties.get("upgraded_from")
if upgraded_from and upgraded_from != src_table.key and not src_table.is_table_in_mount:
raise ResourceConflict(
f"Expected to be migrated from {src_table.key}, but got {upgraded_from}. "
"You can skip this error using the CLI command: "
"databricks labs ucx skip "
f"--schema {src_table.database} --table {src_table.name}"
)
return True

def _try_get_table_in_uc(self, target_key: str):
try:
return self._ws.tables.get(target_key)
except NotFound:
return False
return None
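
For illustration, a minimal sketch of the new conflict check; the `Table` constructor arguments below are assumptions for readability, not the exact dataclass signature:

```python
# Hypothetical walk-through of the table-in-mount branch of exists_in_uc().
from databricks.labs.ucx.hive_metastore.tables import Table

src = Table(
    catalog="hive_metastore",
    database="mounted_datalake",  # databases for tables in mounts start with "mounted_"
    name="orders",
    object_type="EXTERNAL",
    table_format="DELTA",
    location="abfss://lake@account.dfs.core.windows.net/orders",
)
# If the target key resolves in UC but table_info.storage_location differs
# from src.location, exists_in_uc() raises ResourceConflict instead of
# reporting a clean match; the error message suggests `databricks labs ucx skip`
# to bypass the table.
```
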
47 changes: 38 additions & 9 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -6,6 +6,7 @@

from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.hive_metastore import TablesCrawler, Mounts
@@ -168,6 +169,8 @@
return self._migrate_table_create_ctas(src_table.src, src_table.rule, grants, mounts)
if src_table.src.what == What.EXTERNAL_SYNC:
return self._migrate_external_table(src_table.src, src_table.rule, grants)
if src_table.src.what == What.TABLE_IN_MOUNT:
return self._migrate_table_in_mount(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_HIVESERDE:
# This hiveserde_in_place_migrate is used to determine if current hiveserde migration should use in-place migration or CTAS.
# We will provide two workflows for hiveserde table migration:
@@ -212,9 +215,9 @@
logger.debug(f"Migrating view {src_view.src.key} to using SQL query: {view_migrate_sql}")
try:
self._backend.execute(view_migrate_sql)
self._backend.execute(src_view.src.sql_alter_to(src_view.rule.as_uc_table_key))
self._backend.execute(self._sql_alter_to(src_view.src, src_view.rule.as_uc_table_key))
self._backend.execute(
src_view.src.sql_alter_from(src_view.rule.as_uc_table_key, self._ws.get_workspace_id())
self._sql_alter_from(src_view.src, src_view.rule.as_uc_table_key, self._ws.get_workspace_id())
)
except DatabricksError as e:
logger.warning(f"Failed to migrate view {src_view.src.key} to {src_view.rule.as_uc_table_key}: {e}")
@@ -241,7 +244,7 @@
f"Status code: {sync_result.status_code}. Description: {sync_result.description}"
)
return False
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _migrate_external_table_hiveserde_in_place(
@@ -280,8 +283,8 @@
)
try:
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key))
self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()))
except DatabricksError as e:
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
return False
@@ -295,8 +298,8 @@
)
try:
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key))
self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()))
except DatabricksError as e:
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
return False
@@ -316,8 +319,23 @@
logger.debug(f"Migrating table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}")
try:
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key))
self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()))
except DatabricksError as e:
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
return False

return self._migrate_acl(src_table, rule, grants)

def _migrate_table_in_mount(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
target_table_key = rule.as_uc_table_key
try:
table_schema = self._backend.fetch(f"DESCRIBE TABLE delta.`{src_table.location}`;")
table_migrate_sql = src_table.sql_migrate_table_in_mount(target_table_key, table_schema)
logger.info(
f"Migrating table in mount {src_table.location} to UC table {rule.as_uc_table_key} using SQL query: {table_migrate_sql}"
)
self._backend.execute(table_migrate_sql)
self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()))
except DatabricksError as e:
logger.warning(f"Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}")
return False
@@ -460,3 +478,14 @@
grant = dataclasses.replace(grant, principal=matched_group[0])
matched_grants.append(grant)
return matched_grants

def _sql_alter_to(self, table: Table, target_table_key: str):
return f"ALTER {table.kind} {escape_sql_identifier(table.key)} SET TBLPROPERTIES ('upgraded_to' = '{target_table_key}');"

def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int):
source = table.location if table.is_table_in_mount else table.key
return (
f"ALTER {table.kind} {escape_sql_identifier(target_table_key)} SET TBLPROPERTIES "
f"('upgraded_from' = '{source}'"
f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');"
)
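
A standalone sketch of what these helpers emit, assuming `Table.UPGRADED_FROM_WS_PARAM` is the `upgraded_from_workspace_id` property and using simplified identifier escaping:

```python
# Hypothetical mirror of TablesMigrator._sql_alter_from, for illustration only.
def sql_alter_from_sketch(kind: str, target_key: str, source: str, ws_id: int) -> str:
    # For a table in a mount, `source` is the mount location, because the
    # source has no hive_metastore table key; otherwise it is the table key.
    return (
        f"ALTER {kind} `{target_key}` SET TBLPROPERTIES "
        f"('upgraded_from' = '{source}', 'upgraded_from_workspace_id' = '{ws_id}');"
    )

print(sql_alter_from_sketch(
    "TABLE", "main.sales.orders",
    "abfss://lake@account.dfs.core.windows.net/orders", 1234,
))
# ALTER TABLE `main.sales.orders` SET TBLPROPERTIES ('upgraded_from' =
#   'abfss://lake@account.dfs.core.windows.net/orders',
#   'upgraded_from_workspace_id' = '1234');
```
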
52 changes: 33 additions & 19 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -1,7 +1,7 @@
import logging
import re
import typing
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from enum import Enum, auto
from functools import partial
@@ -28,6 +28,7 @@ class What(Enum):
DBFS_ROOT_DELTA = auto()
DBFS_ROOT_NON_DELTA = auto()
VIEW = auto()
TABLE_IN_MOUNT = auto()
DB_DATASET = auto()
UNKNOWN = auto()

@@ -87,6 +88,8 @@ def is_delta(self) -> bool:

@property
def key(self) -> str:
if self.is_table_in_mount:
return f"{self.catalog}.{self.database}.{self.location}".lower()
return f"{self.catalog}.{self.database}.{self.name}".lower()

@property
@@ -103,16 +106,6 @@ def __eq__(self, other):
def kind(self) -> str:
return "VIEW" if self.view_text is not None else "TABLE"

def sql_alter_to(self, target_table_key):
return f"ALTER {self.kind} {escape_sql_identifier(self.key)} SET TBLPROPERTIES ('upgraded_to' = '{target_table_key}');"

def sql_alter_from(self, target_table_key, ws_id):
return (
f"ALTER {self.kind} {escape_sql_identifier(target_table_key)} SET TBLPROPERTIES "
f"('upgraded_from' = '{self.key}'"
f" , '{self.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');"
)

def sql_unset_upgraded_to(self):
return f"ALTER {self.kind} {escape_sql_identifier(self.key)} UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"

@@ -147,14 +140,6 @@ def is_format_supported_for_sync(self) -> bool:
return False
return self.table_format.upper() in {"DELTA", "PARQUET", "CSV", "JSON", "ORC", "TEXT", "AVRO"}

@property
def is_format_supported_for_create_like(self) -> bool:
# Based on documentation
# https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-table-like.html
if self.table_format is None:
return False
return self.table_format.upper() in {"DELTA", "PARQUET", "CSV", "JSON", "TEXT"}

@property
def is_databricks_dataset(self) -> bool:
if not self.location:
@@ -164,10 +149,16 @@ def is_databricks_dataset(self) -> bool:
return True
return False

@property
def is_table_in_mount(self) -> bool:
return self.database.startswith("mounted_") and self.is_delta

@property
def what(self) -> What:
if self.is_databricks_dataset:
return What.DB_DATASET
if self.is_table_in_mount:
return What.TABLE_IN_MOUNT
if self.is_dbfs_root and self.table_format == "DELTA":
return What.DBFS_ROOT_DELTA
if self.is_dbfs_root:
@@ -307,6 +298,29 @@ def sql_migrate_dbfs(self, target_table_key):
def sql_migrate_view(self, target_table_key):
return f"CREATE VIEW IF NOT EXISTS {escape_sql_identifier(target_table_key)} AS {self.view_text};"

def sql_migrate_table_in_mount(self, target_table_key: str, table_schema: Iterator[typing.Any]):
fields = []
partitioned_fields = []
next_fields_are_partitioned = False
for key, value, _ in table_schema:
if key == "# Partition Information":
continue
if key == "# col_name":
next_fields_are_partitioned = True
continue
if next_fields_are_partitioned:
partitioned_fields.append(key)
else:
fields.append(f"{key} {value}")

partitioned_str = ""
if partitioned_fields:
partitioning_columns = ", ".join(partitioned_fields)
partitioned_str = f"PARTITIONED BY ({partitioning_columns})"
schema = ", ".join(fields)

return f"CREATE TABLE IF NOT EXISTS {escape_sql_identifier(target_table_key)} ({schema}) {partitioned_str} LOCATION '{self.location}';"


@dataclass
class TableError:
24 changes: 22 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -179,9 +179,9 @@
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""


class MigrateTablesInMounts(Workflow):
class ScanTablesInMounts(Workflow):
def __init__(self):
super().__init__('migrate-tables-in-mounts-experimental')
super().__init__('scan-tables-in-mounts-experimental')

@job_task
def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
@@ -199,3 +199,23 @@
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""


class MigrateTablesInMounts(Workflow):
def __init__(self):
super().__init__('migrate-tables-in-mounts-experimental')

@job_task(job_cluster="table_migration", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental])
def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

@job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental])
def refresh_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.index_full_refresh()

@job_task(dashboard="migration_main", depends_on=[refresh_migration_status])
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""