Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add history log encoder for dashboards #3424

Merged
merged 48 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
59f7535
Set LineageAtom.other to dict by default
JCZuurmond Dec 10, 2024
6b98b1f
Add dashboard progress encoder
JCZuurmond Dec 10, 2024
44db0b4
Get table failures from historical table snapshot
JCZuurmond Dec 10, 2024
9d49d17
Allow other to be None
JCZuurmond Dec 10, 2024
88afb6a
Remove cached properties from dashboard progress encoder
JCZuurmond Dec 11, 2024
7351411
Add first integration test for dashboard progress encoder
JCZuurmond Dec 11, 2024
a1e0583
Test dashboard progress encoder without failures
JCZuurmond Dec 11, 2024
de54166
Test dashboard failure coming from query problem
JCZuurmond Dec 11, 2024
96705d0
Test dashboard failure coming from dfsa
JCZuurmond Dec 11, 2024
c5607b6
Rewrite tests to assert on historical rows
JCZuurmond Dec 11, 2024
15bdfd7
Test used table failure from hive metastore table
JCZuurmond Dec 11, 2024
e5e8896
Merge tests
JCZuurmond Dec 11, 2024
789e1c1
Test Table.from_historical
JCZuurmond Dec 11, 2024
76bb74f
Format
JCZuurmond Dec 11, 2024
d131254
Assert row in integration test
JCZuurmond Dec 11, 2024
dc355ca
Add default attributes to expected data
JCZuurmond Dec 16, 2024
eb04435
Add id attributes to dashboard
JCZuurmond Dec 16, 2024
0121994
Add dashboard ownership to GlobalContext
JCZuurmond Dec 16, 2024
702fff1
Add dashboard progress encoder to runtime context
JCZuurmond Dec 16, 2024
a006639
Force key word argument in dashboard progress encoder
JCZuurmond Dec 16, 2024
7fb3d96
Update dashboard progress encoder integration test
JCZuurmond Dec 16, 2024
552ff06
Expect DFSA message to come from query problem
JCZuurmond Dec 16, 2024
0e66276
Add from table info to Table
JCZuurmond Dec 16, 2024
15c9de2
Isort
JCZuurmond Dec 16, 2024
47f1edc
Remove used tables crawler from TableProgressEncoder
JCZuurmond Dec 16, 2024
ce97801
Pass failure from used table to dashboard
JCZuurmond Dec 16, 2024
cf4129d
Persist dashboard migration progress in workflow
JCZuurmond Dec 16, 2024
2c69264
Force run integration test in CI
JCZuurmond Dec 16, 2024
355b1af
Test Redash ownership is me
JCZuurmond Dec 17, 2024
35f23fe
Test ownership of directory
JCZuurmond Dec 17, 2024
6360a61
Support ownership of directory
JCZuurmond Dec 17, 2024
4d05b3f
Test owner of directory
JCZuurmond Dec 17, 2024
9ce61cd
Test retrieving ownership for invalid path
JCZuurmond Dec 17, 2024
62e1081
Handle invalid path
JCZuurmond Dec 17, 2024
a0bf82d
Add missing type hints
JCZuurmond Dec 17, 2024
a39ab7c
Test warn about unsupported object type
JCZuurmond Dec 17, 2024
4540e74
Warn about unsupported object type
JCZuurmond Dec 17, 2024
2c835f0
Test Lakeview dashboard ownership is me
JCZuurmond Dec 17, 2024
ddd92fb
Test getting user name
JCZuurmond Dec 17, 2024
6b881e6
Handle resource does not exists
JCZuurmond Dec 17, 2024
50bb17f
Handle None types
JCZuurmond Dec 17, 2024
081cb5c
Remove legacy test
JCZuurmond Dec 17, 2024
e50544e
Update unit test
JCZuurmond Dec 17, 2024
c4437fd
Rename me to current_user
JCZuurmond Dec 17, 2024
3cce1b0
The owner of an invalid path should fallback on the workspace admin
JCZuurmond Dec 20, 2024
163b67f
Improve assert message
JCZuurmond Dec 20, 2024
666eb9f
Skip test when running in debug
JCZuurmond Dec 20, 2024
7b89476
Mark test to be skipped
JCZuurmond Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/assessment/dashboards.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
from collections.abc import Iterable, Iterator
from dataclasses import dataclass, field
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.labs.lsql.lakeview import Dashboard as LsqlLakeviewDashboard, Dataset
Expand Down Expand Up @@ -90,6 +91,8 @@ def from_lakeview_dataset(cls, dataset: Dataset, *, parent: str | None = None) -
class Dashboard:
"""UCX representation of a dashboard."""

__id_attributes__: ClassVar[tuple[str, ...]] = ("id",)

id: str
"""The ID for this dashboard."""

Expand Down Expand Up @@ -414,7 +417,7 @@ def _maybe_direct_owner(self, record: Dashboard) -> str | None:
def _get_user_name(self, user_id: str) -> str | None:
    """Resolve a workspace user id to a user name.

    Args:
        user_id: The id of the workspace user to look up.

    Returns:
        The user's user name, or None when the user cannot be retrieved.
    """
    try:
        user = self._ws.users.get(user_id)
        # Diff residue left an unreachable duplicate return here; the PR's
        # final state deliberately returns the stable user name only.
        return user.user_name
    except DatabricksError as e:
        logger.warning(f"Could not retrieve user: {user_id}", exc_info=e)
        return None
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.assessment.dashboards import DashboardOwnership
from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator
Expand Down Expand Up @@ -266,6 +267,10 @@ def udfs_crawler(self) -> UdfsCrawler:
def udf_ownership(self) -> UdfOwnership:
return UdfOwnership(self.administrator_locator)

@cached_property
FastLee marked this conversation as resolved.
Show resolved Hide resolved
def dashboard_ownership(self) -> DashboardOwnership:
return DashboardOwnership(self.administrator_locator, self.workspace_client, self.workspace_path_ownership)

@cached_property
def tables_crawler(self) -> TablesCrawler:
return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
Expand Down
15 changes: 14 additions & 1 deletion src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from databricks.sdk import WorkspaceClient, core

from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.assessment.dashboards import Dashboard
from databricks.labs.ucx.assessment.clusters import (
ClustersCrawler,
PoliciesCrawler,
Expand All @@ -25,6 +26,7 @@
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.dashboards import DashboardProgressEncoder
from databricks.labs.ucx.progress.grants import Grant, GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
Expand Down Expand Up @@ -227,7 +229,6 @@ def tables_progress(self) -> ProgressEncoder[Table]:
self.sql_backend,
self.table_ownership,
self.migration_status_refresher,
[self.used_tables_crawler_for_paths, self.used_tables_crawler_for_queries],
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand All @@ -244,6 +245,18 @@ def udfs_progress(self) -> ProgressEncoder[Udf]:
self.config.ucx_catalog,
)

@cached_property
def dashboards_progress(self) -> ProgressEncoder[Dashboard]:
return DashboardProgressEncoder(
self.sql_backend,
self.dashboard_ownership,
used_tables_crawlers=[self.used_tables_crawler_for_queries],
inventory_database=self.config.inventory_database,
job_run_id=self.parent_run_id,
workspace_id=self.workspace_id,
catalog=self.config.ucx_catalog,
)

@cached_property
def migration_sequencer(self) -> MigrationSequencer:
return MigrationSequencer(self.workspace_client, self.administrator_locator)
22 changes: 18 additions & 4 deletions src/databricks/labs/ucx/framework/owners.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from databricks.labs.blueprint.paths import WorkspacePath
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, InternalError
from databricks.sdk.errors import InternalError, InvalidParameterValue, NotFound, ResourceDoesNotExist
from databricks.sdk.retries import retried
from databricks.sdk.service.iam import User, PermissionLevel
from databricks.sdk.service.iam import User, ObjectPermissions, PermissionLevel
from databricks.sdk.service.workspace import ObjectType

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -219,18 +219,32 @@ def _maybe_direct_owner(self, record: WorkspacePath) -> str | None:

@staticmethod
def _maybe_type_and_id(path: WorkspacePath) -> tuple[str, str] | None:
    """Map a workspace path to its permissions-API (endpoint, object id) pair.

    Returns None when the path status cannot be retrieved, when the object
    info lacks an id or type, or when the object type has no known endpoint.
    """
    try:
        info = path._object_info  # pylint: disable=protected-access
    except (InvalidParameterValue, ResourceDoesNotExist):
        logger.warning(f"Cannot retrieve status for: {path}")
        return None
    if not info.object_id or not info.object_type:
        return None
    # Dispatch table instead of a match statement: object type -> API endpoint.
    endpoint_by_type = {
        ObjectType.NOTEBOOK: 'notebooks',
        ObjectType.FILE: 'files',
        ObjectType.DIRECTORY: 'directories',
    }
    endpoint = endpoint_by_type.get(info.object_type)
    if endpoint is None:
        logger.warning(f"Unsupported object type: {info.object_type.value}")
        return None
    return endpoint, str(info.object_id)

@staticmethod
def _infer_from_first_can_manage(object_permissions):
def _infer_from_first_can_manage(object_permissions: ObjectPermissions) -> str | None:
if object_permissions.access_control_list is None:
return None
for acl in object_permissions.access_control_list:
if acl.all_permissions is None:
return None
for permission in acl.all_permissions:
if permission.permission_level != PermissionLevel.CAN_MANAGE:
continue
Expand Down
47 changes: 47 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
import re
import typing
Expand All @@ -14,6 +16,7 @@
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import NotFound
from databricks.sdk.service.catalog import TableInfo

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.utils import escape_sql_identifier
Expand Down Expand Up @@ -86,6 +89,50 @@ def __post_init__(self) -> None:
if isinstance(self.table_format, str): # Should not happen according to type hint, still safer
self.table_format = self.table_format.upper()

@classmethod
def from_table_info(cls, table: TableInfo) -> Table:
    """Build a Table from a Unity Catalog TableInfo.

    Args:
        table: The table info returned by the Unity Catalog API.

    Raises:
        ValueError: When the catalog/schema/table name, the table type, or
            the data source format is missing on the table info.
    """
    if table.catalog_name is None or table.schema_name is None or table.name is None:
        raise ValueError(f"Catalog, schema and table name are missing: {table}")
    if table.table_type is None:
        raise ValueError(f"Table type is missing: {table.table_type}")
    if table.data_source_format is None:
        raise ValueError(f"Data source format is missing: {table.data_source_format}")
    attributes: dict[str, str | bool] = {
        "catalog": table.catalog_name,
        "database": table.schema_name,
        "name": table.name,
        "object_type": table.table_type.value,
        "table_format": table.data_source_format.value,
    }
    if table.storage_location:
        attributes["location"] = table.storage_location
    if table.view_definition:
        attributes["view_text"] = table.view_definition
    properties = table.properties or {}
    if "upgraded_to" in properties:
        # NOTE(review): bool() of any non-empty string (even "false") is True,
        # so mere presence of the property marks the table upgraded — confirm
        # this is intended.
        attributes["upgraded_to"] = bool(properties.get("upgraded_to"))
    return cls(**attributes)  # type: ignore

@classmethod
def from_historical_data(cls, data: dict[str, str]) -> Table:
    """Reconstruct a Table from a historical-snapshot data mapping.

    Required keys: catalog, database, name, object_type, table_format.
    The optional keys are copied through only when present in the mapping.
    """
    required = ("catalog", "database", "name", "object_type", "table_format")
    kwargs: dict[str, str | bool] = {key: data[key] for key in required}
    for optional in ("location", "view_text", "upgraded_to", "storage_properties"):
        if optional in data:
            kwargs[optional] = data[optional]
    if "is_partitioned" in data:
        # NOTE(review): bool() of any non-empty string (even "false") is True —
        # verify the snapshot stores this flag in a truthy-compatible form.
        kwargs["is_partitioned"] = bool(data["is_partitioned"])
    return cls(**kwargs)  # type: ignore

@property
def is_delta(self) -> bool:
if self.table_format is None:
Expand Down
126 changes: 126 additions & 0 deletions src/databricks/labs/ucx/progress/dashboards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import collections
import logging
from collections.abc import Iterable
from dataclasses import replace

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.assessment.dashboards import Dashboard, DashboardOwnership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.source_code.queries import QueryProblem
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


logger = logging.getLogger(__name__)


DashboardIdToFailuresType = dict[str, list[str]] # dict[<dashboard id>, list[<failure message>]]


class DashboardProgressEncoder(ProgressEncoder[Dashboard]):
    """Encoder class:Dashboard to class:History."""

    def __init__(
        self,
        sql_backend: SqlBackend,
        ownership: DashboardOwnership,
        *,
        used_tables_crawlers: list[UsedTablesCrawler],
        inventory_database: str,
        job_run_id: int,
        workspace_id: int,
        catalog: str,
    ) -> None:
        super().__init__(
            sql_backend,
            ownership,
            Dashboard,
            job_run_id,
            workspace_id,
            catalog,
            "multiworkspace",
            "historical",
        )
        self._inventory_database = inventory_database
        self._used_tables_crawlers = used_tables_crawlers

    def append_inventory_snapshot(self, snapshot: Iterable[Dashboard]) -> None:
        """Encode each dashboard as a historical record and append it to the history table."""
        problems_by_dashboard = self._get_query_problems()
        failures_by_dashboard = self._get_tables_failures()
        history_records = [
            self._encode_dashboard_as_historical(dashboard, problems_by_dashboard, failures_by_dashboard)
            for dashboard in snapshot
        ]
        logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
        # The mode is 'append'. This is documented as conflict-free.
        self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

    def _get_query_problems(self) -> DashboardIdToFailuresType:
        """Index the linter's query problems by the dashboard they belong to."""
        problems_by_dashboard = collections.defaultdict(list)
        rows = self._sql_backend.fetch(
            'SELECT * FROM query_problems',
            catalog='hive_metastore',
            schema=self._inventory_database,
        )
        for row in rows:
            problem = QueryProblem(**row.asDict())
            message = (
                f'[{problem.code}] {problem.query_name} ({problem.dashboard_id}/{problem.query_id}) : {problem.message}'
            )
            problems_by_dashboard[problem.dashboard_id].append(message)
        return problems_by_dashboard

    def _get_used_tables(self) -> dict[str, list[UsedTable]]:
        """Index the used tables by the dashboard whose queries reference them."""
        tables_by_dashboard = collections.defaultdict(list)
        for crawler in self._used_tables_crawlers:
            for used_table in crawler.snapshot():
                # The dashboard and query source lineage are added by the QueryLinter
                lineage = used_table.source_lineage
                if len(lineage) < 2:
                    continue
                if lineage[0].object_type != "DASHBOARD":  # Note: this skips dangling queries
                    continue
                if lineage[1].object_type != "QUERY":
                    continue
                tables_by_dashboard[lineage[0].object_id].append(used_table)
        return tables_by_dashboard

    def _get_tables_failures(self) -> DashboardIdToFailuresType:
        """Collect, per dashboard, the failures of the tables its queries use."""
        failures_by_table: dict[str, list[str]] = {}
        rows = self._sql_backend.fetch(
            "SELECT * FROM objects_snapshot WHERE object_type = 'Table'",
            catalog=self._catalog,
            schema=self._schema,
        )
        for row in rows:
            historical = Historical(**row.asDict())
            table = Table.from_historical_data(historical.data)
            failures_by_table[table.full_name] = historical.failures
        failures_by_dashboard = collections.defaultdict(list)
        for dashboard_id, used_tables_in_dashboard in self._get_used_tables().items():
            for used_table in used_tables_in_dashboard:
                for failure in failures_by_table.get(used_table.full_name, []):
                    failures_by_dashboard[dashboard_id].append(f"{failure}: {used_table.full_name}")
        return failures_by_dashboard

    def _encode_dashboard_as_historical(
        self,
        record: Dashboard,
        query_problems: DashboardIdToFailuresType,
        tables_failures: DashboardIdToFailuresType,
    ) -> Historical:
        """Encode a dashboard as a historical record.

        Failures are detected by the QueryLinter:
        - Query problems
        - Direct filesystem access by code used in query
        - Hive metastore tables
        """
        historical = super()._encode_record_as_historical(record)
        extra_failures = query_problems.get(record.id, []) + tables_failures.get(record.id, [])
        return replace(historical, failures=historical.failures + extra_failures)
22 changes: 2 additions & 20 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import replace

Expand All @@ -12,8 +11,6 @@
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


logger = logging.getLogger(__name__)
Expand All @@ -27,7 +24,6 @@ def __init__(
sql_backend: SqlBackend,
ownership: TableOwnership,
migration_status_refresher: CrawlerBase[TableMigrationStatus],
used_tables_crawlers: list[UsedTablesCrawler],
run_id: int,
workspace_id: int,
catalog: str,
Expand All @@ -43,30 +39,18 @@ def __init__(
"historical",
)
self._migration_status_refresher = migration_status_refresher
self._used_tables_crawlers = used_tables_crawlers

def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
used_hive_tables = self._get_used_hive_tables()
history_records = []
for record in snapshot:
history_record = self._encode_table_as_historical(record, migration_index, used_hive_tables)
history_record = self._encode_table_as_historical(record, migration_index)
history_records.append(history_record)
logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
# The mode is 'append'. This is documented as conflict-free.
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _get_used_hive_tables(self) -> dict[str, list[UsedTable]]:
used_tables: dict[str, list[UsedTable]] = defaultdict(list[UsedTable])
for crawler in self._used_tables_crawlers:
for used_table in crawler.snapshot():
if used_table.catalog_name == "hive_metastore":
used_tables[used_table.full_name].append(used_table)
return used_tables

def _encode_table_as_historical(
self, record: Table, migration_index: TableMigrationIndex, used_hive_tables: dict[str, list[UsedTable]]
) -> Historical:
def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
"""Encode a table record, enriching with the migration status and used table references.

Possible failures, the table is
Expand All @@ -81,6 +65,4 @@ def _encode_table_as_historical(
failures = []
if not migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
for used_table in used_hive_tables.get(record.full_name, []):
failures.append(f"Used by {used_table.source_type}: {used_table.source_id}")
return replace(historical, failures=historical.failures + failures)
Loading
Loading