Skip to content

Commit

Permalink
Detect tables that are not present in the mapping file
Browse files Browse the repository at this point in the history
Introduces #1221
  • Loading branch information
aminmovahed-db committed Aug 5, 2024
1 parent 642ecec commit 3b541e5
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ def as_uc_table_key(self):
def as_hms_table_key(self):
    """Return the source table key as ``hive_metastore.<src_schema>.<src_table>``."""
    schema, table = self.src_schema, self.src_table
    return f"hive_metastore.{schema}.{table}"

@dataclass
class TableNotMapped:
    """Record pairing a workspace name with a table that is not mapped."""

    # Workspace name, stored exactly as supplied by the caller.
    workspace_name: str
    # Identifier of the table, stored exactly as supplied by the caller.
    src_table: str

    @classmethod
    def initial(cls, workspace_name: str, table: str) -> "TableNotMapped":
        """Build a record from a workspace name and a table identifier."""
        return cls(workspace_name, table)


@dataclass
class TableToMigrate:
Expand All @@ -81,6 +93,7 @@ def __eq__(self, other):

class TableMapping:
FILENAME = 'mapping.csv'
FILENAME_UNMAPPED = 'unmapped_tables.csv'
UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip"

def __init__(
Expand All @@ -103,10 +116,21 @@ def current_tables(self, tables: TablesCrawler, workspace_name: str, catalog_nam
for table in tables_snapshot:
yield Rule.initial(workspace_name, catalog_name, table, self._recon_tolerance_percent)

@staticmethod
def tables_not_mapped(tables_crawler: TablesCrawler, current_tables: list[Rule], workspace_name: str):
    """Yield a ``TableNotMapped`` record for each crawled table that no rule covers.

    Args:
        tables_crawler: Object whose ``snapshot()`` returns the crawled tables;
            each table's ``key`` is compared against the rules.
        current_tables: Rules whose ``as_hms_table_key`` values mark tables as mapped.
        workspace_name: Workspace name stored on every yielded record.

    Yields:
        ``TableNotMapped.initial(workspace_name, key)`` for every crawled table
        key that is absent from the rules' keys, in snapshot order.
    """
    # Membership is tested once per crawled table, so build the lookup a single
    # time as a set instead of scanning a list on every iteration.
    # NOTE(review): as_hms_table_key is read as an attribute here, so it is
    # presumably a property on Rule — confirm.
    mapped_keys = {rule.as_hms_table_key for rule in current_tables}
    for crawled_table in tables_crawler.snapshot():
        key = crawled_table.key
        if key not in mapped_keys:
            yield TableNotMapped.initial(workspace_name, key)

def save(self, tables: TablesCrawler, workspace_info: WorkspaceInfo) -> str:
    """Save the current table mapping and, if any, the list of unmapped tables.

    Args:
        tables: Crawler passed through to ``current_tables`` and ``tables_not_mapped``.
        workspace_info: Object whose ``current()`` gives the workspace name.

    Returns:
        Whatever ``self._installation.save`` returns for the mapping file
        (``FILENAME``).
    """
    workspace_name = workspace_info.current()
    # Derive the default catalog name by collapsing each run of non-word
    # characters in the workspace name into a single underscore.
    default_catalog_name = re.sub(r"\W+", "_", workspace_name)
    # current_tables() and tables_not_mapped() are generators. Materialize each
    # exactly once: a generator has no len(), and re-listing an exhausted one
    # would silently save an empty mapping.
    current_tables = list(self.current_tables(tables, workspace_name, default_catalog_name))
    unmapped_tables = list(self.tables_not_mapped(tables, current_tables, workspace_name))
    if unmapped_tables:
        self._installation.save(unmapped_tables, filename=self.FILENAME_UNMAPPED)
    return self._installation.save(current_tables, filename=self.FILENAME)

def load(self) -> list[Rule]:
Expand Down

0 comments on commit 3b541e5

Please sign in to comment.