diff --git a/docs/table_persistence.md b/docs/table_persistence.md index 12cac2a2c6..85fa36fcbb 100644 --- a/docs/table_persistence.md +++ b/docs/table_persistence.md @@ -32,6 +32,8 @@ Table utilization per workflow: | udfs | RW | RW | RO | | | | | | logs | RW | | RW | RW | | RW | RW | | recon_results | | | | | | | RW | +| redash_dashboards | RW | | | | | | RW | +| lakeview_dashboards | RW | | | | | | RW | **RW** - Read/Write, the job generates or updates the table.
**RO** - Read Only @@ -139,3 +141,16 @@ This is used by the permission crawler. | object_type | string | type of object (NOTEBOOK, DIRECTORY, REPO, FILE, LIBRARY) | | path | string | full path of the object in the workspace | | language | string | language of the object (applicable for notebooks only) | + + +#### _$inventory_.redash_dashboards and _$inventory_.lakeview_dashboards + +Holds a list of all Redash or Lakeview dashboards. This is used by the `QueryLinter` and `Redash` migration. + +| Column | Datatype | Description | Comments | +|-----------|--------------|---------------------------------------------------------------------------------------------|----------| +| id | string | The ID for this dashboard. | | +| name | string | The title of the dashboard that appears in list views and at the top of the dashboard page. | | +| parent | string | The identifier of the workspace folder containing the object. | | +| query_ids | list[string] | The IDs of the queries referenced by this dashboard. | | +| tags | list[string] | The tags set on this dashboard. | | diff --git a/src/databricks/labs/ucx/assessment/dashboards.py b/src/databricks/labs/ucx/assessment/dashboards.py new file mode 100644 index 0000000000..67fe12e629 --- /dev/null +++ b/src/databricks/labs/ucx/assessment/dashboards.py @@ -0,0 +1,416 @@ +from __future__ import annotations + +import itertools +import json +import logging +from collections.abc import Iterable, Iterator +from dataclasses import dataclass, field + +from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.lsql.lakeview import Dashboard as LsqlLakeviewDashboard, Dataset +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import DatabricksError +from databricks.sdk.service.dashboards import Dashboard as SdkLakeviewDashboard +from databricks.sdk.service.sql import Dashboard as SdkRedashDashboard, LegacyQuery + +from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership +from databricks.labs.ucx.framework.utils import escape_sql_identifier + + +logger = logging.getLogger(__name__) + + +@dataclass +class Query: + """UCX representation of a Query. + + Note: + This class is not persisted into an inventory table. 
If you decide to persist this class, consider (future) + differences between Redash and Lakeview queries + """ + + id: str + """The ID for this query.""" + + name: str | None = None + """The title of this query that appears in list views, widget headings, and on the query page.""" + + parent: str | None = None + """The identifier of the workspace folder containing the object.""" + + query: str | None = None + """The text of the query to be run.""" + + catalog: str | None = None + """The name of the catalog to execute this query in.""" + + schema: str | None = None + """The name of the schema to execute this query in.""" + + tags: list[str] = field(default_factory=list) + """The tags set on this dashboard.""" + + @classmethod + def from_legacy_query(cls, query: LegacyQuery) -> Query: + """Create query from a :class:LegacyQuery""" + if not query.id: + raise ValueError(f"Query id is required: {query}") + kwargs: dict[str, str | list[str]] = {"id": query.id} + if query.name: + kwargs["name"] = query.name + if query.parent: + kwargs["parent"] = query.parent + if query.query: + kwargs["query"] = query.query + if query.options and query.options.catalog: + kwargs["catalog"] = query.options.catalog + if query.options and query.options.schema: + kwargs["schema"] = query.options.schema + if query.tags: + kwargs["tags"] = query.tags + return cls(**kwargs) # type: ignore + + @classmethod + def from_lakeview_dataset(cls, dataset: Dataset, *, parent: str | None = None) -> Query: + """Create query from a :class:Dataset""" + if not dataset.name: + raise ValueError(f"Dataset name is required: {dataset}") + kwargs = {"id": dataset.name} + if dataset.display_name: + kwargs["name"] = dataset.display_name + if parent: + kwargs["parent"] = parent + if dataset.query: + kwargs["query"] = dataset.query + return cls(**kwargs) # type: ignore + + +@dataclass +class Dashboard: + """UCX representation of a dashboard.""" + + id: str + """The ID for this dashboard.""" + + name: str | None = None + """The title of the dashboard that appears in list views and at the top of the dashboard page.""" + + parent: str | None = None + """The identifier of the workspace folder containing the object.""" + + query_ids: list[str] = field(default_factory=list) + """The IDs of the queries referenced by this dashboard.""" + + tags: list[str] = field(default_factory=list) + """The tags set on this dashboard.""" + + creator_id: str | None = None + """The ID of the user who owns the dashboard.""" + + @classmethod + def from_sdk_redash_dashboard(cls, dashboard: SdkRedashDashboard) -> Dashboard: + assert dashboard.id + kwargs: dict[str, str | list[str] | None] = {"id": dashboard.id} + if dashboard.name: + kwargs["name"] = dashboard.name + if dashboard.parent: + kwargs["parent"] = dashboard.parent + if dashboard.tags: + kwargs["tags"] = dashboard.tags + if dashboard.user_id: + kwargs["creator_id"] = str(dashboard.user_id) + query_ids = [] + for widget in dashboard.widgets or []: + if widget.visualization is None: + continue + if widget.visualization.query is None: + continue + if widget.visualization.query.id is None: + continue + query_ids.append(widget.visualization.query.id) + if query_ids: + kwargs["query_ids"] = query_ids + return cls(**kwargs) # type: ignore + + @classmethod + def from_sdk_lakeview_dashboard(cls, dashboard: SdkLakeviewDashboard) -> Dashboard: + assert dashboard.dashboard_id + kwargs: dict[str, str | list[str] | None] = {"id": dashboard.dashboard_id} + if dashboard.display_name: + kwargs["name"] = dashboard.display_name + 
if dashboard.parent_path: + kwargs["parent"] = dashboard.parent_path + lsql_dashboard = _convert_sdk_to_lsql_lakeview_dashboard(dashboard) + query_ids = [dataset.name for dataset in lsql_dashboard.datasets] + if query_ids: + kwargs["query_ids"] = query_ids + return cls(**kwargs) # type: ignore + + +class RedashDashboardCrawler(CrawlerBase[Dashboard]): + """Crawler for Redash dashboards.""" + + def __init__( + self, + ws: WorkspaceClient, + sql_backend: SqlBackend, + schema: str, + *, + include_dashboard_ids: list[str] | None = None, + include_query_ids: list[str] | None = None, + debug_listing_upper_limit: int | None = None, + ): + super().__init__(sql_backend, "hive_metastore", schema, "redash_dashboards", Dashboard) + self._ws = ws + self._include_dashboard_ids = include_dashboard_ids + self._include_query_ids = include_query_ids + self._debug_listing_upper_limit = debug_listing_upper_limit + + def _crawl(self) -> Iterable[Dashboard]: + dashboards = [] + for sdk_dashboard in self._list_dashboards(): + if sdk_dashboard.id is None: + continue + dashboard = Dashboard.from_sdk_redash_dashboard(sdk_dashboard) + dashboards.append(dashboard) + return dashboards + + def _list_dashboards(self) -> list[SdkRedashDashboard]: + if self._include_dashboard_ids is not None: + return self._get_dashboards(*self._include_dashboard_ids) + try: + dashboards_iterator = self._ws.dashboards.list() + except DatabricksError as e: + logger.error("Cannot list Redash dashboards", exc_info=e) + return [] + dashboards: list[SdkRedashDashboard] = [] + # Redash APIs are very slow to paginate, especially for large number of dashboards, so we limit the listing + # to a small number of items in debug mode for the assessment workflow just to complete. + while self._debug_listing_upper_limit is None or self._debug_listing_upper_limit > len(dashboards): + try: + dashboards.append(next(dashboards_iterator)) + except StopIteration: + break + except DatabricksError as e: + logger.error("Cannot list next Redash dashboards page", exc_info=e) + break + return dashboards + + def _get_dashboards(self, *dashboard_ids: str) -> list[SdkRedashDashboard]: + dashboards = [] + for dashboard_id in dashboard_ids: + dashboard = self._get_dashboard(dashboard_id) + if dashboard: + dashboards.append(dashboard) + return dashboards + + def _get_dashboard(self, dashboard_id: str) -> SdkRedashDashboard | None: + try: + return self._ws.dashboards.get(dashboard_id) + except DatabricksError as e: + logger.warning(f"Cannot get Redash dashboard: {dashboard_id}", exc_info=e) + return None + + def _try_fetch(self) -> Iterable[Dashboard]: + for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): + yield Dashboard(*row) + + def list_queries(self, dashboard: Dashboard | None = None) -> Iterator[Query]: + """List queries. + + Args: + dashboard (DashboardType | None) : List queries for dashboard. If None, list all queries. + Defaults to None. + + Note: + This public method does not adhere to the common crawler layout, still, it is implemented to avoid/postpone + another crawler for the queries by retrieving the queries every time they are requested. + """ + for query in self._list_legacy_queries(dashboard): + yield Query.from_legacy_query(query) + + def _list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[LegacyQuery]: + """List legacy queries. + + Args: + dashboard (DashboardType | None) : List queries for dashboard. If None, list all queries. + Defaults to None. 
+ """ + if dashboard: + queries_iterator = self._list_legacy_queries_from_dashboard(dashboard) + else: + queries_iterator = self._list_all_legacy_queries() + # Redash APIs are very slow to paginate, especially for large number of dashboards, so we limit the listing + # to a small number of items in debug mode for the assessment workflow just to complete. + counter = itertools.count() + while self._debug_listing_upper_limit is None or self._debug_listing_upper_limit > next(counter): + try: + yield next(queries_iterator) + except StopIteration: + break + + def _list_all_legacy_queries(self) -> Iterator[LegacyQuery]: + """List all queries.""" + if self._include_query_ids is not None: + yield from self._get_legacy_queries(*self._include_query_ids) + else: + try: + yield from self._ws.queries_legacy.list() + except DatabricksError as e: + logger.error("Cannot list Redash queries", exc_info=e) + + def _list_legacy_queries_from_dashboard(self, dashboard: Dashboard) -> Iterator[LegacyQuery]: + """List queries from dashboard.""" + if self._include_query_ids is not None: + query_ids = set(dashboard.query_ids) & set(self._include_query_ids) + else: + query_ids = set(dashboard.query_ids) + yield from self._get_legacy_queries(*query_ids) + + def _get_legacy_queries(self, *query_ids: str) -> Iterator[LegacyQuery]: + """Get a legacy queries.""" + for query_id in query_ids: + query = self._get_legacy_query(query_id) + if query: + yield query + + def _get_legacy_query(self, query_id: str) -> LegacyQuery | None: + """Get a legacy query.""" + try: + return self._ws.queries_legacy.get(query_id) + except DatabricksError as e: + logger.warning(f"Cannot get Redash query: {query_id}", exc_info=e) + return None + + +def _convert_sdk_to_lsql_lakeview_dashboard(dashboard: SdkLakeviewDashboard) -> LsqlLakeviewDashboard: + """Parse a lsql Lakeview dashboard from an SDK Lakeview dashboard. + + Returns : + LsqlLakeviewDashboard : The parsed dashboard. If the parsing fails, it is an empty dashboard, i.e. a + dashboard without datasets and pages. 
+ """ + lsql_dashboard = LsqlLakeviewDashboard([], []) + if dashboard.serialized_dashboard is not None: + try: + lsql_dashboard = LsqlLakeviewDashboard.from_dict(json.loads(dashboard.serialized_dashboard)) + except (KeyError, ValueError) as e: + logger.warning(f"Error when parsing Lakeview dashboard: {dashboard.dashboard_id}", exc_info=e) + return lsql_dashboard + + +class LakeviewDashboardCrawler(CrawlerBase[Dashboard]): + """Crawler for Lakeview dashboards.""" + + def __init__( + self, + ws: WorkspaceClient, + sql_backend: SqlBackend, + schema: str, + *, + include_dashboard_ids: list[str] | None = None, + include_query_ids: list[str] | None = None, + ): + super().__init__(sql_backend, "hive_metastore", schema, "lakeview_dashboards", Dashboard) + self._ws = ws + self._include_dashboard_ids = include_dashboard_ids + self._include_query_ids = include_query_ids + + def _crawl(self) -> Iterable[Dashboard]: + dashboards = [] + for sdk_dashboard in self._list_dashboards(): + if sdk_dashboard.dashboard_id is None: + continue + dashboard = Dashboard.from_sdk_lakeview_dashboard(sdk_dashboard) + dashboards.append(dashboard) + return dashboards + + def _list_dashboards(self) -> list[SdkLakeviewDashboard]: + if self._include_dashboard_ids is not None: + return self._get_dashboards(*self._include_dashboard_ids) + try: + # If the API listing limit becomes an issue in testing, please see the `:class:RedashDashboardCrawler` + # for an example on how to implement a (debug) rate limit + return list(self._ws.lakeview.list()) # TODO: Add dashboard summary view? + except DatabricksError as e: + logger.error("Cannot list Lakeview dashboards", exc_info=e) + return [] + + def _get_dashboards(self, *dashboard_ids: str) -> list[SdkLakeviewDashboard]: + dashboards = [] + for dashboard_id in dashboard_ids: + dashboard = self._get_dashboard(dashboard_id) + if dashboard: + dashboards.append(dashboard) + return dashboards + + def _get_dashboard(self, dashboard_id: str) -> SdkLakeviewDashboard | None: + try: + return self._ws.lakeview.get(dashboard_id) + except DatabricksError as e: + logger.warning(f"Cannot get Lakeview dashboard: {dashboard_id}", exc_info=e) + return None + + def _try_fetch(self) -> Iterable[Dashboard]: + for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): + yield Dashboard(*row) + + def list_queries(self, dashboard: Dashboard | None = None) -> Iterator[Query]: + """List queries. + + Args: + dashboard (DashboardType | None) : List queries for dashboard. If None, list all queries. + Defaults to None. + + Note: + This public method does not adhere to the common crawler layout, still, it is implemented to avoid/postpone + another crawler for the queries by retrieving the queries every time they are requested. + + Different to the Redash crawler, Lakeview queries are part of the (serialized) dashboard definition. + """ + if dashboard: + sdk_dashboard = self._get_dashboard(dashboard_id=dashboard.id) + sdk_dashboards = [sdk_dashboard] if sdk_dashboard else [] + else: + sdk_dashboards = self._list_dashboards() + for sdk_dashboard in sdk_dashboards: + lsql_dashboard = _convert_sdk_to_lsql_lakeview_dashboard(sdk_dashboard) + for dataset in lsql_dashboard.datasets: + if self._include_query_ids is not None and dataset.name not in self._include_query_ids: + continue + yield Query.from_lakeview_dataset(dataset, parent=sdk_dashboard.dashboard_id) + + +class DashboardOwnership(Ownership[Dashboard]): + """Determine ownership of dashboard in the inventory. 
+ + This is the dashboard creator (if known) otherwise the parent (path) owner (if known). + """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + ws: WorkspaceClient, + workspace_path_ownership: WorkspacePathOwnership, + ) -> None: + super().__init__(administrator_locator) + self._ws = ws + self._workspace_path_ownership = workspace_path_ownership + + def _maybe_direct_owner(self, record: Dashboard) -> str | None: + if record.creator_id: + creator_name = self._get_user_name(record.creator_id) + if creator_name: + return creator_name + if record.parent: + return self._workspace_path_ownership.owner_of_path(record.parent) + return None + + def _get_user_name(self, user_id: str) -> str | None: + try: + user = self._ws.users.get(user_id) + return user.display_name or user.user_name + except DatabricksError as e: + logger.warning(f"Could not retrieve user: {user_id}", exc_info=e) + return None diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index be63b38074..cd0a00be2b 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -class Assessment(Workflow): +class Assessment(Workflow): # pylint: disable=too-many-public-methods def __init__(self): super().__init__('assessment') @@ -190,6 +190,16 @@ def crawl_groups(self, ctx: RuntimeContext): ctx.group_manager.snapshot() @job_task + def crawl_redash_dashboards(self, ctx: RuntimeContext): + """Scans all Redash dashboards.""" + ctx.redash_crawler.snapshot() + + @job_task + def crawl_lakeview_dashboards(self, ctx: RuntimeContext): + """Scans all Lakeview dashboards.""" + ctx.lakeview_crawler.snapshot() + + @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards]) def assess_dashboards(self, ctx: RuntimeContext): """Scans all dashboards for migration issues in SQL code of embedded widgets. diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index f4aece8f8a..63889d8a99 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -766,14 +766,21 @@ def migrate_dbsql_dashboards( else: workspace_contexts = _get_workspace_contexts(w, a, run_as_collection) for workspace_context in workspace_contexts: - workspace_context.redash.migrate_dashboards(dashboard_id) + if dashboard_id: + workspace_context.redash.migrate_dashboards(dashboard_id) + else: + workspace_context.redash.migrate_dashboards() @ucx.command -def revert_dbsql_dashboards(w: WorkspaceClient, dashboard_id: str | None = None): +def revert_dbsql_dashboards(w: WorkspaceClient, dashboard_id: str | None = None, ctx: WorkspaceContext | None = None): """Revert migrated DBSQL Dashboard queries back to their original state""" - ctx = WorkspaceContext(w) - ctx.redash.revert_dashboards(dashboard_id) + ctx = ctx or WorkspaceContext(w) + ctx.redash_crawler.snapshot(force_refresh=True) # Need the latest tags before reverting dashboards + if dashboard_id: + ctx.redash.revert_dashboards(dashboard_id) + else: + ctx.redash.revert_dashboards() @ucx.command(is_account=True) diff --git a/src/databricks/labs/ucx/config.py b/src/databricks/labs/ucx/config.py index 370c0d854a..b7755baf9d 100644 --- a/src/databricks/labs/ucx/config.py +++ b/src/databricks/labs/ucx/config.py @@ -71,9 +71,12 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes # [INTERNAL ONLY] Whether the assessment should capture only specific object permissions. 
include_object_permissions: list[str] | None = None - # [INTERNAL ONLY] Whether the assessment should lint only specific dashboards. + # [INTERNAL ONLY] Limit the dashboards to the given list include_dashboard_ids: list[str] | None = None + # [INTERNAL ONLY] Limit the queries to the given list + include_query_ids: list[str] | None = None + enable_hms_federation: bool = False managed_table_external_storage: str = 'CLONE' diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 82c75324d3..3bb70290d3 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -28,6 +28,7 @@ from databricks.labs.ucx.account.workspaces import WorkspaceInfo from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler +from databricks.labs.ucx.assessment.dashboards import LakeviewDashboardCrawler, RedashDashboardCrawler from databricks.labs.ucx.assessment.export import AssessmentExporter from databricks.labs.ucx.aws.credentials import CredentialManager from databricks.labs.ucx.config import WorkspaceConfig @@ -284,6 +285,27 @@ def table_ownership(self) -> TableOwnership: self.workspace_path_ownership, ) + @cached_property + def redash_crawler(self) -> RedashDashboardCrawler: + return RedashDashboardCrawler( + self.workspace_client, + self.sql_backend, + self.inventory_database, + include_dashboard_ids=self.config.include_dashboard_ids, + include_query_ids=self.config.include_query_ids, + debug_listing_upper_limit=self.config.debug_listing_upper_limit, + ) + + @cached_property + def lakeview_crawler(self) -> LakeviewDashboardCrawler: + return LakeviewDashboardCrawler( + self.workspace_client, + self.sql_backend, + self.inventory_database, + include_dashboard_ids=self.config.include_dashboard_ids, + include_query_ids=self.config.include_query_ids, + ) + @cached_property def default_securable_ownership(self) -> DefaultSecurableOwnership: # validate that the default_owner_group is set and is a valid group (the current user is a member) @@ -552,13 +574,12 @@ def workflow_linter(self) -> WorkflowLinter: @cached_property def query_linter(self) -> QueryLinter: return QueryLinter( - self.workspace_client, self.sql_backend, self.inventory_database, TableMigrationIndex([]), self.directfs_access_crawler_for_queries, self.used_tables_crawler_for_queries, - self.config.include_dashboard_ids, + [self.redash_crawler, self.lakeview_crawler], self.config.debug_listing_upper_limit, ) @@ -584,6 +605,7 @@ def redash(self) -> Redash: self.migration_status_refresher.index(), self.workspace_client, self.installation, + self.redash_crawler, ) @cached_property diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py index d92403a58a..c32edeff84 100644 --- a/src/databricks/labs/ucx/install.py +++ b/src/databricks/labs/ucx/install.py @@ -47,9 +47,11 @@ SpotInstancePolicy, ) from databricks.sdk.useragent import with_extra + from databricks.labs.ucx.__about__ import __version__ from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo from databricks.labs.ucx.assessment.clusters import ClusterInfo, PolicyInfo +from databricks.labs.ucx.assessment.dashboards import Dashboard from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo from databricks.labs.ucx.assessment.pipelines import PipelineInfo @@ -123,6 +125,8 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: 
str): functools.partial(table, "used_tables_in_paths", UsedTable), functools.partial(table, "used_tables_in_queries", UsedTable), functools.partial(table, "inferred_grants", Grant), + functools.partial(table, "redash_dashboards", Dashboard), + functools.partial(table, "lakeview_dashboards", Dashboard), ], ) deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql") diff --git a/src/databricks/labs/ucx/queries/assessment/main/38_0_dashboards.md b/src/databricks/labs/ucx/queries/assessment/main/38_0_dashboards.md new file mode 100644 index 0000000000..88125cd4c2 --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/38_0_dashboards.md @@ -0,0 +1,8 @@ +--- +height: 4 +--- + +# Dashboards + +The table below displays the dashboards in the workspace. The dashboard queries are linted, these linting outcomes are +displayed in the tables above. diff --git a/src/databricks/labs/ucx/queries/assessment/main/38_1_dashboards.sql b/src/databricks/labs/ucx/queries/assessment/main/38_1_dashboards.sql new file mode 100644 index 0000000000..038ea2d1ae --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/38_1_dashboards.sql @@ -0,0 +1,32 @@ +/* +--title 'Dashboards' +--width 6 +--overrides '{"spec": { + "encodings": { + "columns": [ + {"fieldName": "dashboard_type", "title": "Type", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]}, + {"fieldName": "name", "title": "Name", "type": "string", "displayAs": "link", "linkUrlTemplate": "{{ dashboard_link }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "booleanValues": ["false", "true"]} + ] + }, + "invisibleColumns": [ + {"fieldName": "dashboard_link", "title": "dashboard_link", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]} + ] + }}' +*/ +SELECT + dashboard_type, + name, + dashboard_link +FROM ( + SELECT + 'Redash' AS dashboard_type, + name, + CONCAT('/sql/dashboards/', id) AS dashboard_link + FROM inventory.redash_dashboards + UNION ALL + SELECT + 'Lakeview' AS dashboard_type, + name, + CONCAT('/dashboardsv3/', id, '/published') AS dashboard_link + FROM inventory.lakeview_dashboards +) diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 0e4f18230f..d02dbcdb70 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -195,10 +195,17 @@ def from_dict(cls, data: dict[str, Any]) -> Self: UNKNOWN = "unknown" source_id: str = UNKNOWN - source_timestamp: datetime = datetime.fromtimestamp(0) + + source_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False) + """Unused attribute, kept for legacy reasons""" + source_lineage: list[LineageAtom] = field(default_factory=list) - assessment_start_timestamp: datetime = datetime.fromtimestamp(0) - assessment_end_timestamp: datetime = datetime.fromtimestamp(0) + + assessment_start_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False) + """Unused attribute, kept for legacy reasons""" + + assessment_end_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False) + """Unused attribute, kept for legacy reasons""" def replace_source( self, diff --git a/src/databricks/labs/ucx/source_code/queries.py b/src/databricks/labs/ucx/source_code/queries.py index fda1de768a..aca7e6e0a2 100644 --- a/src/databricks/labs/ucx/source_code/queries.py +++ 
b/src/databricks/labs/ucx/source_code/queries.py @@ -4,18 +4,16 @@ from dataclasses import dataclass, field from datetime import datetime, timezone -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.sql import Dashboard, LegacyQuery from databricks.sdk.service.workspace import Language from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.ucx.assessment.dashboards import Dashboard, LakeviewDashboardCrawler, RedashDashboardCrawler, Query from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.source_code.base import CurrentSessionState, LineageAtom, UsedTable from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccess from databricks.labs.ucx.source_code.linters.context import LinterContext -from databricks.labs.ucx.source_code.redash import Redash from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler logger = logging.getLogger(__name__) @@ -45,21 +43,19 @@ class QueryLinter: def __init__( self, - ws: WorkspaceClient, sql_backend: SqlBackend, inventory_database: str, migration_index: TableMigrationIndex, directfs_crawler: DirectFsAccessCrawler, used_tables_crawler: UsedTablesCrawler, - include_dashboard_ids: list[str] | None, + dashboard_crawlers: list[LakeviewDashboardCrawler | RedashDashboardCrawler], debug_listing_upper_limit: int | None = None, ): - self._ws = ws self._sql_backend = sql_backend self._migration_index = migration_index self._directfs_crawler = directfs_crawler self._used_tables_crawler = used_tables_crawler - self._include_dashboard_ids = include_dashboard_ids + self._dashboard_crawlers = dashboard_crawlers self._debug_listing_upper_limit = debug_listing_upper_limit self._catalog = "hive_metastore" @@ -127,20 +123,29 @@ def _dump_used_tables( self._used_tables_crawler.dump_all(processed_tables) def _lint_dashboards(self, context: _ReportingContext) -> None: - for dashboard_id in self._dashboard_ids_in_scope(): - dashboard = self._ws.dashboards.get(dashboard_id=dashboard_id) - logger.info(f"Linting dashboard_id={dashboard_id}: {dashboard.name}") - problems, dfsas, tables = self._lint_and_collect_from_dashboard(dashboard, context.linted_queries) + for dashboard, queries in self._list_dashboards_with_queries(): + logger.info(f"Linting dashboard: {dashboard.name} ({dashboard.id})") + queries_to_lint = [] + for query in queries: + if query.id in context.linted_queries: + continue + queries_to_lint.append(query) + context.linted_queries.add(query.id) + problems, dfsas, tables = self._lint_dashboard_with_queries(dashboard, queries_to_lint) context.all_problems.extend(problems) context.all_dfsas.extend(dfsas) context.all_tables.extend(tables) + def _list_dashboards_with_queries(self) -> Iterable[tuple[Dashboard, list[Query]]]: + for crawler in self._dashboard_crawlers: + for dashboard in crawler.snapshot(): + yield dashboard, list(crawler.list_queries(dashboard)) + def _lint_queries(self, context: _ReportingContext) -> None: - for query in self._queries_in_scope(): - assert query.id is not None + for query in self._list_queries(): if query.id in context.linted_queries: continue - logger.info(f"Linting query_id={query.id}: {query.name}") + logger.info(f"Linting query: {query.name} ({query.id})") context.linted_queries.add(query.id) problems = self.lint_query(query) context.all_problems.extend(problems) @@ -149,140 +154,84 @@ def _lint_queries(self, 
context: _ReportingContext) -> None: tables = self.collect_used_tables_from_query("no-dashboard-id", query) context.all_tables.extend(tables) - def _dashboard_ids_in_scope(self) -> list[str]: - if self._include_dashboard_ids is not None: # an empty list is accepted - return self._include_dashboard_ids - items_listed = 0 - dashboard_ids = [] - # redash APIs are very slow to paginate, especially for large number of dashboards, so we limit the listing - # to a small number of items in debug mode for the assessment workflow just to complete. - for dashboard in self._ws.dashboards.list(): - if self._debug_listing_upper_limit is not None and items_listed >= self._debug_listing_upper_limit: - logger.warning(f"Debug listing limit reached: {self._debug_listing_upper_limit}") - break - if dashboard.id is None: - continue - dashboard_ids.append(dashboard.id) - items_listed += 1 - return dashboard_ids - - def _queries_in_scope(self) -> list[LegacyQuery]: - if self._include_dashboard_ids is not None: # an empty list is accepted - return [] - items_listed = 0 - legacy_queries = [] - for query in self._ws.queries_legacy.list(): - if self._debug_listing_upper_limit is not None and items_listed >= self._debug_listing_upper_limit: - logger.warning(f"Debug listing limit reached: {self._debug_listing_upper_limit}") - break - legacy_queries.append(query) - items_listed += 1 - return legacy_queries + def _list_queries(self) -> Iterable[Query]: + for crawler in self._dashboard_crawlers: + yield from crawler.list_queries() - def _lint_and_collect_from_dashboard( - self, dashboard: Dashboard, linted_queries: set[str] + def _lint_dashboard_with_queries( + self, dashboard: Dashboard, queries: list[Query] ) -> tuple[Iterable[QueryProblem], Iterable[DirectFsAccess], Iterable[UsedTable]]: - dashboard_queries = Redash.get_queries_from_dashboard(dashboard) query_problems: list[QueryProblem] = [] query_dfsas: list[DirectFsAccess] = [] query_tables: list[UsedTable] = [] - dashboard_id = dashboard.id or "" - dashboard_parent = dashboard.parent or "" - dashboard_name = dashboard.name or "" - for query in dashboard_queries: - if query.id is None: - continue - if query.id in linted_queries: - continue - linted_queries.add(query.id) + for query in queries: problems = self.lint_query(query) for problem in problems: query_problems.append( dataclasses.replace( problem, - dashboard_id=dashboard_id, - dashboard_parent=dashboard_parent, - dashboard_name=dashboard_name, + dashboard_id=dashboard.id, + dashboard_parent=dashboard.parent or "PARENT", + dashboard_name=dashboard.name or "UNKNOWN", ) ) - dfsas = self.collect_dfsas_from_query(dashboard_id, query) + dfsas = self.collect_dfsas_from_query(dashboard.id, query) for dfsa in dfsas: atom = LineageAtom( object_type="DASHBOARD", - object_id=dashboard_id, - other={"parent": dashboard_parent, "name": dashboard_name}, + object_id=dashboard.id, + other={"parent": dashboard.parent or "PARENT", "name": dashboard.name or "UNKNOWN"}, ) source_lineage = [atom] + dfsa.source_lineage query_dfsas.append(dataclasses.replace(dfsa, source_lineage=source_lineage)) - tables = self.collect_used_tables_from_query(dashboard_id, query) + tables = self.collect_used_tables_from_query(dashboard.id, query) for table in tables: atom = LineageAtom( object_type="DASHBOARD", - object_id=dashboard_id, - other={"parent": dashboard_parent, "name": dashboard_name}, + object_id=dashboard.id, + other={"parent": dashboard.parent or "PARENT", "name": dashboard.name or "UNKNOWN"}, ) source_lineage = [atom] + 
table.source_lineage query_tables.append(dataclasses.replace(table, source_lineage=source_lineage)) return query_problems, query_dfsas, query_tables - def lint_query(self, query: LegacyQuery) -> Iterable[QueryProblem]: + def lint_query(self, query: Query) -> Iterable[QueryProblem]: if not query.query: return ctx = LinterContext(self._migration_index, CurrentSessionState()) linter = ctx.linter(Language.SQL) - query_id = query.id or "" - query_parent = query.parent or "" - query_name = query.name or "" for advice in linter.lint(query.query): yield QueryProblem( dashboard_id="", dashboard_parent="", dashboard_name="", - query_id=query_id, - query_parent=query_parent, - query_name=query_name, + query_id=query.id, + query_parent=query.parent or "PARENT", + query_name=query.name or "UNKNOWN", code=advice.code, message=advice.message, ) - def collect_dfsas_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[DirectFsAccess]: - if query.query is None: + def collect_dfsas_from_query(self, dashboard_id: str, query: Query) -> Iterable[DirectFsAccess]: + if not query.query: return ctx = LinterContext(self._migration_index, CurrentSessionState()) collector = ctx.dfsa_collector(Language.SQL) source_id = f"{dashboard_id}/{query.id}" - source_name = query.name or "" - source_timestamp = self._read_timestamp(query.updated_at) - source_lineage = [LineageAtom(object_type="QUERY", object_id=source_id, other={"name": source_name})] + source_lineage = [ + LineageAtom(object_type="QUERY", object_id=source_id, other={"name": query.name or "UNKNOWN"}) + ] for dfsa in collector.collect_dfsas(query.query): - yield dfsa.replace_source( - source_id=source_id, source_timestamp=source_timestamp, source_lineage=source_lineage - ) + yield dfsa.replace_source(source_id=source_id, source_lineage=source_lineage) - def collect_used_tables_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[UsedTable]: - if query.query is None: + def collect_used_tables_from_query(self, dashboard_id: str, query: Query) -> Iterable[UsedTable]: + if not query.query: return ctx = LinterContext(self._migration_index, CurrentSessionState()) collector = ctx.tables_collector(Language.SQL) source_id = f"{dashboard_id}/{query.id}" - source_name = query.name or "" - source_timestamp = self._read_timestamp(query.updated_at) - source_lineage = [LineageAtom(object_type="QUERY", object_id=source_id, other={"name": source_name})] + source_lineage = [ + LineageAtom(object_type="QUERY", object_id=source_id, other={"name": query.name or "UNKNOWN"}) + ] for table in collector.collect_tables(query.query): - yield table.replace_source( - source_id=source_id, source_timestamp=source_timestamp, source_lineage=source_lineage - ) - - @classmethod - def _read_timestamp(cls, timestamp: str | None) -> datetime: - if timestamp is not None: - methods = [ - datetime.fromisoformat, - lambda s: datetime.fromisoformat(s[:-1]), # ipython breaks on final 'Z' - ] - for method in methods: - try: - return method(timestamp) - except ValueError: - pass - return datetime.now() + yield table.replace_source(source_id=source_id, source_lineage=source_lineage) diff --git a/src/databricks/labs/ucx/source_code/redash.py b/src/databricks/labs/ucx/source_code/redash.py index afac0491ed..1ceede32c2 100644 --- a/src/databricks/labs/ucx/source_code/redash.py +++ b/src/databricks/labs/ucx/source_code/redash.py @@ -1,13 +1,13 @@ import logging -from collections.abc import Iterator from dataclasses import replace -from databricks.labs.blueprint.installation import 
Installation +from databricks.labs.blueprint.installation import Installation, SerdeError from databricks.sdk import WorkspaceClient -from databricks.sdk.service.sql import Dashboard, LegacyQuery, UpdateQueryRequestQuery +from databricks.sdk.service.sql import LegacyQuery, UpdateQueryRequestQuery from databricks.sdk.errors.platform import DatabricksError +from databricks.labs.ucx.assessment.dashboards import Dashboard, RedashDashboardCrawler, Query from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.linters.from_table import FromTableSqlLinter @@ -18,41 +18,43 @@ class Redash: MIGRATED_TAG = "Migrated by UCX" - def __init__(self, index: TableMigrationIndex, ws: WorkspaceClient, installation: Installation): + def __init__( + self, + index: TableMigrationIndex, + ws: WorkspaceClient, + installation: Installation, + dashboard_crawler: RedashDashboardCrawler, + ): self._index = index self._ws = ws self._installation = installation + self._crawler = dashboard_crawler - def migrate_dashboards(self, dashboard_id: str | None = None) -> None: - for dashboard in self._list_dashboards(dashboard_id): - assert dashboard.id is not None - if dashboard.tags is not None and self.MIGRATED_TAG in dashboard.tags: + def migrate_dashboards(self, *dashboard_ids: str) -> None: + for dashboard in self._list_dashboards(*dashboard_ids): + if self.MIGRATED_TAG in dashboard.tags: logger.debug(f"Dashboard {dashboard.name} already migrated by UCX") continue - for query in self.get_queries_from_dashboard(dashboard): + for query in self._crawler.list_queries(dashboard): self._fix_query(query) self._ws.dashboards.update(dashboard.id, tags=self._get_migrated_tags(dashboard.tags)) - def revert_dashboards(self, dashboard_id: str | None = None) -> None: - for dashboard in self._list_dashboards(dashboard_id): - assert dashboard.id is not None - if dashboard.tags is None or self.MIGRATED_TAG not in dashboard.tags: + def revert_dashboards(self, *dashboard_ids: str) -> None: + for dashboard in self._list_dashboards(*dashboard_ids): # Refresh for up-to-date tags + if self.MIGRATED_TAG not in dashboard.tags: logger.debug(f"Dashboard {dashboard.name} was not migrated by UCX") continue - for query in self.get_queries_from_dashboard(dashboard): + for query in self._crawler.list_queries(dashboard): self._revert_query(query) self._ws.dashboards.update(dashboard.id, tags=self._get_original_tags(dashboard.tags)) - def _list_dashboards(self, dashboard_id: str | None) -> list[Dashboard]: - try: - if dashboard_id is None: - return list(self._ws.dashboards.list()) - return [self._ws.dashboards.get(dashboard_id)] - except DatabricksError as e: - logger.warning(f"Cannot list dashboards: {e}") - return [] + def _list_dashboards(self, *dashboard_ids: str) -> list[Dashboard]: + """List the Redash dashboards.""" + # Cached property is not used as this class in used from the CLI, thus called once per Python process + dashboards = [d for d in self._crawler.snapshot() if not dashboard_ids or d.id in dashboard_ids] + return dashboards - def _fix_query(self, query: LegacyQuery) -> None: + def _fix_query(self, query: Query) -> None: assert query.id is not None assert query.query is not None # query already migrated @@ -76,35 +78,27 @@ def _fix_query(self, query: LegacyQuery) -> None: return @staticmethod - def _get_session_state(query: LegacyQuery) -> CurrentSessionState: + def _get_session_state(query: Query) -> 
CurrentSessionState: session_state = CurrentSessionState() - if query.options is None: - return session_state - if query.options.catalog: - session_state = replace(session_state, catalog=query.options.catalog) - if query.options.schema: - session_state = replace(session_state, schema=query.options.schema) + if query.catalog: + session_state = replace(session_state, catalog=query.catalog) + if query.schema: + session_state = replace(session_state, schema=query.schema) return session_state - def _revert_query(self, query: LegacyQuery) -> None: + def _revert_query(self, query: Query) -> None: assert query.id is not None assert query.query is not None - if query.tags is None: - return - # find the backup query - is_migrated = False - for tag in query.tags: - if tag == self.MIGRATED_TAG: - is_migrated = True - - if not is_migrated: + if self.MIGRATED_TAG not in (query.tags or []): logger.debug(f"Query {query.name} was not migrated by UCX") return - - backup_query = self._installation.load(LegacyQuery, filename=f'backup/queries/{query.id}.json') - update_query = UpdateQueryRequestQuery( - query_text=backup_query.query, tags=self._get_original_tags(backup_query.tags) - ) + backup_query: Query | LegacyQuery + try: + backup_query = self._installation.load(Query, filename=f'backup/queries/{query.id}.json') + except SerdeError: + # Previous versions store queries as LegacyQuery + backup_query = self._installation.load(LegacyQuery, filename=f'backup/queries/{query.id}.json') + update_query = UpdateQueryRequestQuery(query_text=backup_query.query, tags=self._get_original_tags(query.tags)) try: self._ws.queries.update(query.id, update_mask="query_text,tags", query=update_query) except DatabricksError: @@ -121,16 +115,3 @@ def _get_original_tags(self, tags: list[str] | None) -> list[str] | None: if tags is None: return None return [tag for tag in tags if tag != self.MIGRATED_TAG] - - @staticmethod - def get_queries_from_dashboard(dashboard: Dashboard) -> Iterator[LegacyQuery]: - if dashboard.widgets is None: - return - for widget in dashboard.widgets: - if widget is None: - continue - if widget.visualization is None: - continue - if widget.visualization.query is None: - continue - yield widget.visualization.query diff --git a/tests/integration/assessment/test_dashboards.py b/tests/integration/assessment/test_dashboards.py index c94ef21a50..cf84afb4bf 100644 --- a/tests/integration/assessment/test_dashboards.py +++ b/tests/integration/assessment/test_dashboards.py @@ -1,154 +1,68 @@ -from datetime import datetime, timezone, timedelta - -import pytest - -from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable -from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess -from databricks.labs.ucx.source_code.jobs import JobProblem -from databricks.sdk.service.iam import PermissionLevel - -from databricks.labs.ucx.source_code.queries import QueryProblem - - -def _populate_workflow_problems(installation_ctx): - job_problems = [ - JobProblem( - job_id=12345, - job_name="Peter the Job", - task_key="23456", - path="parent/child.py", - code="sql-parse-error", - message="Could not parse SQL", - start_line=1234, - start_col=22, - end_line=1234, - end_col=32, - ) - ] - installation_ctx.sql_backend.save_table( - f'{installation_ctx.inventory_database}.workflow_problems', - job_problems, - JobProblem, - mode='overwrite', - ) +from databricks.sdk.service.sql import Dashboard as SdkRedashDashboard +from databricks.sdk.service.dashboards import Dashboard as SdkLakeviewDashboard +from 
databricks.labs.ucx.assessment.dashboards import ( + LakeviewDashboardCrawler, + Dashboard, + RedashDashboardCrawler, +) + + +def test_redash_dashboard_crawler_crawls_dashboards(ws, make_dashboard, inventory_schema, sql_backend) -> None: + dashboard: SdkRedashDashboard = make_dashboard() + crawler = RedashDashboardCrawler(ws, sql_backend, inventory_schema) + + dashboards = list(crawler.snapshot()) + + assert len(dashboards) >= 1 + assert dashboard.id in {d.id for d in dashboards}, f"Missing dashboard: {dashboard.id}" + + +def test_redash_dashboard_crawler_crawls_dashboard(ws, make_dashboard, inventory_schema, sql_backend) -> None: + dashboard: SdkRedashDashboard = make_dashboard() + assert dashboard.id + make_dashboard() # Ignore second dashboard + crawler = RedashDashboardCrawler(ws, sql_backend, inventory_schema, include_dashboard_ids=[dashboard.id]) + + dashboards = list(crawler.snapshot()) + + assert dashboards == [Dashboard.from_sdk_redash_dashboard(dashboard)] -def _populate_dashboard_problems(installation_ctx): - query_problems = [ - QueryProblem( - dashboard_id="12345", - dashboard_parent="dashbards/parent", - dashboard_name="my_dashboard", - query_id="23456", - query_parent="queries/parent", - query_name="my_query", - code="sql-parse-error", - message="Could not parse SQL", - ) - ] - installation_ctx.sql_backend.save_table( - f'{installation_ctx.inventory_database}.query_problems', - query_problems, - QueryProblem, - mode='overwrite', - ) +def test_redash_dashboard_crawler_crawls_dashboards_with_debug_listing_upper_limit( + ws, make_dashboard, inventory_schema, sql_backend +) -> None: + for _ in range(2): # Create two dashboards, expect one to be snapshotted due to upper limit below + make_dashboard() + crawler = RedashDashboardCrawler(ws, sql_backend, inventory_schema, debug_listing_upper_limit=1) -def _populate_directfs_problems(installation_ctx): - dfsas = [ - DirectFsAccess( - path="some_path", - is_read=False, - is_write=True, - source_id="xyz.py", - source_timestamp=datetime.now(timezone.utc) - timedelta(hours=2.0), - source_lineage=[ - LineageAtom(object_type="WORKFLOW", object_id="my_workflow_id", other={"name": "my_workflow"}), - LineageAtom(object_type="TASK", object_id="my_workflow_id/my_task_id"), - LineageAtom(object_type="NOTEBOOK", object_id="my_notebook_path"), - LineageAtom(object_type="FILE", object_id="my file_path"), - ], - assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0), - assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0), - ) - ] - installation_ctx.directfs_access_crawler_for_paths.dump_all(dfsas) - dfsas = [ - DirectFsAccess( - path="some_path", - is_read=False, - is_write=True, - source_id="xyz.py", - source_timestamp=datetime.now(timezone.utc) - timedelta(hours=2.0), - source_lineage=[ - LineageAtom(object_type="DASHBOARD", object_id="my_dashboard_id", other={"name": "my_dashboard"}), - LineageAtom(object_type="QUERY", object_id="my_dashboard_id/my_query_id", other={"name": "my_query"}), - ], - assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0), - assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0), - ) - ] - installation_ctx.directfs_access_crawler_for_queries.dump_all(dfsas) - - -def _populate_used_tables(installation_ctx): - tables = [ - UsedTable( - catalog_name="hive_metastore", - schema_name="staff_db", - table_name="employees", - is_read=False, - is_write=True, - source_id="xyz.py", - source_timestamp=datetime.now(timezone.utc) - 
timedelta(hours=2.0), - source_lineage=[ - LineageAtom(object_type="WORKFLOW", object_id="my_workflow_id", other={"name": "my_workflow"}), - LineageAtom(object_type="TASK", object_id="my_workflow_id/my_task_id"), - LineageAtom(object_type="NOTEBOOK", object_id="my_notebook_path"), - LineageAtom(object_type="FILE", object_id="my file_path"), - ], - assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0), - assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0), - ) - ] - installation_ctx.used_tables_crawler_for_paths.dump_all(tables) - tables = [ - UsedTable( - catalog_name="hive_metastore", - schema_name="customers_db", - table_name="customers", - is_read=False, - is_write=True, - source_id="xyz.py", - source_timestamp=datetime.now(timezone.utc) - timedelta(hours=2.0), - source_lineage=[ - LineageAtom(object_type="DASHBOARD", object_id="my_dashboard_id", other={"name": "my_dashboard"}), - LineageAtom(object_type="QUERY", object_id="my_dashboard_id/my_query_id", other={"name": "my_query"}), - ], - assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0), - assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0), - ) - ] - installation_ctx.used_tables_crawler_for_queries.dump_all(tables) - - -@pytest.mark.skip("Development tool") -def test_dashboard_with_prepopulated_data(installation_ctx, make_cluster_policy, make_cluster_policy_permissions): - """the purpose of this test is to prepopulate data used by the dashboard without running an actual -lengthy- assessment""" - ucx_group, _ = installation_ctx.make_ucx_group() - cluster_policy = make_cluster_policy() - make_cluster_policy_permissions( - object_id=cluster_policy.policy_id, - permission_level=PermissionLevel.CAN_USE, - group_name=ucx_group.display_name, + dashboards = list(crawler.snapshot()) + + assert len(dashboards) == 1 + + +def test_lakeview_dashboard_crawler_crawls_dashboards( + ws, make_lakeview_dashboard, inventory_schema, sql_backend +) -> None: + dashboard: SdkLakeviewDashboard = make_lakeview_dashboard() + crawler = LakeviewDashboardCrawler(ws, sql_backend, inventory_schema) + + dashboards = list(crawler.snapshot()) + + assert len(dashboards) >= 1 + assert dashboard.dashboard_id in {d.id for d in dashboards}, f"Missing dashboard: {dashboard.dashboard_id}" + + +def test_lakeview_dashboard_crawler_crawls_dashboard( + ws, make_lakeview_dashboard, inventory_schema, sql_backend +) -> None: + dashboard: SdkLakeviewDashboard = make_lakeview_dashboard() + assert dashboard.dashboard_id + make_lakeview_dashboard() # Ignore second dashboard + crawler = LakeviewDashboardCrawler( + ws, sql_backend, inventory_schema, include_dashboard_ids=[dashboard.dashboard_id] ) - installation_ctx.__dict__['include_object_permissions'] = [f"cluster-policies:{cluster_policy.policy_id}"] - installation_ctx.workspace_installation.run() - print(f"\nInventory database is {installation_ctx.inventory_database}\n") - # populate data - _populate_workflow_problems(installation_ctx) - _populate_dashboard_problems(installation_ctx) - _populate_directfs_problems(installation_ctx) - _populate_used_tables(installation_ctx) - # put a breakpoint here - print("Put a breakpoint here! 
Then go check the dashboard in your workspace ;-)\n") + + dashboards = list(crawler.snapshot()) + + assert dashboards == [Dashboard.from_sdk_lakeview_dashboard(dashboard)] diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index fdc1be4481..62d5e58371 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -31,6 +31,7 @@ def test_running_real_assessment_job( tmp_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 2+2 AS four") view = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 2+2 AS four", view=True) non_delta = installation_ctx.make_table(schema_name=source_schema.name, non_delta=True) + installation_ctx.make_linting_resources() installation_ctx.workspace_installation.run() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 62246637c1..300f34f80c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -22,13 +22,13 @@ from databricks.sdk import AccountClient, WorkspaceClient from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried -from databricks.sdk.service import iam, dashboards +from databricks.sdk.service import iam from databricks.sdk.service.catalog import FunctionInfo, SchemaInfo, TableInfo from databricks.sdk.service.compute import CreatePolicyResponse -from databricks.sdk.service.dashboards import Dashboard as SDKDashboard +from databricks.sdk.service.dashboards import Dashboard as SdkLakeviewDashboard from databricks.sdk.service.iam import Group from databricks.sdk.service.jobs import Job, SparkPythonTask -from databricks.sdk.service.sql import Dashboard, WidgetPosition, WidgetOptions, LegacyQuery +from databricks.sdk.service.sql import Dashboard as SdkRedashDashboard, WidgetPosition, WidgetOptions, LegacyQuery from databricks.labs.ucx.__about__ import __version__ from databricks.labs.ucx.account.workspaces import AccountWorkspaces @@ -79,46 +79,47 @@ def inventory_schema(make_schema): def make_lakeview_dashboard(ws, make_random, env_or_skip, watchdog_purge_suffix): """Create a lakeview dashboard.""" warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID") - serialized_dashboard = { - "datasets": [{"name": "fourtytwo", "displayName": "count", "query": "SELECT 42 AS count"}], - "pages": [ - { - "name": "count", - "displayName": "Counter", - "layout": [ - { - "widget": { - "name": "counter", - "queries": [ - { - "name": "main_query", - "query": { - "datasetName": "fourtytwo", - "fields": [{"name": "count", "expression": "`count`"}], - "disaggregated": True, - }, - } - ], - "spec": { - "version": 2, - "widgetType": "counter", - "encodings": {"value": {"fieldName": "count", "displayName": "count"}}, + + def create(*, display_name: str = "", query: str = "SELECT 42 AS count") -> SdkLakeviewDashboard: + serialized_dashboard = { + "datasets": [{"name": "query", "displayName": "count", "query": query}], + "pages": [ + { + "name": "count", + "displayName": "Counter", + "layout": [ + { + "widget": { + "name": "counter", + "queries": [ + { + "name": "main_query", + "query": { + "datasetName": "query", + "fields": [{"name": "count", "expression": "`count`"}], + "disaggregated": True, + }, + } + ], + "spec": { + "version": 2, + "widgetType": "counter", + "encodings": {"value": {"fieldName": "count", "displayName": "count"}}, + }, }, - }, - "position": {"x": 0, "y": 0, "width": 1, "height": 3}, - } - ], - } - ], - } + "position": {"x": 
0, "y": 0, "width": 1, "height": 3}, + } + ], + } + ], + } - def create(display_name: str = "") -> SDKDashboard: if display_name: display_name = f"{display_name} ({make_random()})" else: display_name = f"created_by_ucx_{make_random()}_{watchdog_purge_suffix}" dashboard = ws.lakeview.create( - dashboard=dashboards.Dashboard( + dashboard=SdkLakeviewDashboard( display_name=display_name, serialized_dashboard=json.dumps(serialized_dashboard), warehouse_id=warehouse_id, @@ -127,7 +128,7 @@ def create(display_name: str = "") -> SDKDashboard: ws.lakeview.publish(dashboard.dashboard_id) return dashboard - def delete(dashboard: SDKDashboard) -> None: + def delete(dashboard: SdkLakeviewDashboard) -> None: ws.lakeview.trash(dashboard.dashboard_id) yield from factory("dashboard", create, delete) @@ -144,7 +145,7 @@ def make_dashboard( This fixture is used to test migrating legacy dashboards to Lakeview. """ - def create(query: LegacyQuery | None = None) -> Dashboard: + def create(query: LegacyQuery | None = None) -> SdkRedashDashboard: if not query: query = make_query() assert query @@ -181,9 +182,9 @@ def create(query: LegacyQuery | None = None) -> Dashboard: ), ) logger.info(f"Dashboard Created {dashboard_name}: {ws.config.host}/sql/dashboards/{dashboard.id}") - return dashboard + return ws.dashboards.get(dashboard.id) # Dashboard with widget - def remove(dashboard: Dashboard) -> None: + def remove(dashboard: SdkRedashDashboard) -> None: try: assert dashboard.id is not None ws.dashboards.delete(dashboard_id=dashboard.id) @@ -466,6 +467,7 @@ def __init__( # pylint: disable=too-many-arguments make_notebook_fixture, make_query_fixture, make_dashboard_fixture, + make_lakeview_dashboard_fixture, make_cluster_policy_fixture, make_cluster_policy_permissions_fixture, env_or_skip_fixture, @@ -488,6 +490,7 @@ def __init__( # pylint: disable=too-many-arguments self._make_notebook = make_notebook_fixture self._make_query = make_query_fixture self._make_dashboard = make_dashboard_fixture + self._make_lakeview_dashboard = make_lakeview_dashboard_fixture self._make_cluster_policy = make_cluster_policy_fixture self._make_cluster_policy_permissions = make_cluster_policy_permissions_fixture self._env_or_skip = env_or_skip_fixture @@ -497,7 +500,9 @@ def __init__( # pylint: disable=too-many-arguments self._udfs: list[FunctionInfo] = [] self._grants: list[Grant] = [] self._jobs: list[Job] = [] - self._dashboards: list[Dashboard] = [] + self._queries: list[LegacyQuery] = [] + self._lakeview_query_id: str | None = None + self._dashboards: list[SdkRedashDashboard | SdkLakeviewDashboard] = [] # TODO: add methods to pre-populate the following: self._spn_infos: list[AzureServicePrincipalInfo] = [] @@ -575,8 +580,21 @@ def make_job(self, **kwargs) -> Job: self._jobs.append(job) return job - def make_dashboard(self, **kwargs) -> Dashboard: - dashboard = self._make_dashboard(**kwargs) + def make_query(self, **kwargs) -> LegacyQuery: + query = self._make_query(**kwargs) + self._queries.append(query) + return query + + def make_dashboard(self, *, query: LegacyQuery | None = None, **kwargs) -> SdkRedashDashboard: + dashboard = self._make_dashboard(query=query, **kwargs) + if query: + self._queries.append(query) + self._dashboards.append(dashboard) + return dashboard + + def make_lakeview_dashboard(self, **kwargs) -> SdkLakeviewDashboard: + dashboard = self._make_lakeview_dashboard(**kwargs) + self._lakeview_query_id = "query" # Hardcoded query name in the `make_lakeview_dashboard` fixture self._dashboards.append(dashboard) return 
dashboard @@ -592,9 +610,9 @@ def make_linting_resources(self) -> None: self.make_job(content="spark.table('old.stuff')") self.make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask) self.make_job(content="spark.table('some.table')", task_type=SparkPythonTask) - query_1 = self._make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`') + query_1 = self.make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`') self._make_dashboard(query=query_1) - query_2 = self._make_query(sql_query='SELECT * from my_schema.my_table') + query_2 = self.make_query(sql_query='SELECT * from my_schema.my_table') self._make_dashboard(query=query_2) def add_table(self, table: TableInfo): @@ -612,6 +630,7 @@ def config(self) -> WorkspaceConfig: include_databases=self.created_databases, include_job_ids=self.created_jobs, include_dashboard_ids=self.created_dashboards, + include_query_ids=self.created_queries, ) @cached_property @@ -719,9 +738,24 @@ def created_groups(self) -> list[str]: def created_jobs(self) -> list[int]: return [job.job_id for job in self._jobs if job.job_id is not None] + @property + def created_queries(self) -> list[str]: + query_ids = {query.id for query in self._queries if query.id} + if self._lakeview_query_id: + query_ids.add(self._lakeview_query_id) + return list(query_ids) + @property def created_dashboards(self) -> list[str]: - return [dashboard.id for dashboard in self._dashboards if dashboard.id is not None] + dashboard_ids = [] + for dashboard in self._dashboards: + if isinstance(dashboard, SdkRedashDashboard) and dashboard.id: + dashboard_ids.append(dashboard.id) + elif isinstance(dashboard, SdkLakeviewDashboard) and dashboard.dashboard_id: + dashboard_ids.append(dashboard.dashboard_id) + else: + raise ValueError(f"Unsupported dashboard: {dashboard}") + return dashboard_ids @cached_property def azure_service_principal_crawler(self) -> StaticServicePrincipalCrawler: @@ -772,6 +806,7 @@ def runtime_ctx( # pylint: disable=too-many-arguments make_notebook, make_query, make_dashboard, + make_lakeview_dashboard, make_cluster_policy, make_cluster_policy_permissions, env_or_skip, @@ -787,6 +822,7 @@ def runtime_ctx( # pylint: disable=too-many-arguments make_notebook, make_query, make_dashboard, + make_lakeview_dashboard, make_cluster_policy, make_cluster_policy_permissions, env_or_skip, @@ -927,6 +963,7 @@ def __init__( # pylint: disable=too-many-arguments make_notebook_fixture, make_query_fixture, make_dashboard_fixture, + make_lakeview_dashboard_fixture, make_cluster_policy, make_cluster_policy_permissions, ws_fixture, @@ -942,6 +979,7 @@ def __init__( # pylint: disable=too-many-arguments make_notebook_fixture, make_query_fixture, make_dashboard_fixture, + make_lakeview_dashboard_fixture, make_cluster_policy, make_cluster_policy_permissions, env_or_skip_fixture, @@ -1036,6 +1074,7 @@ def config(self) -> WorkspaceConfig: include_databases=self.created_databases, include_job_ids=self.created_jobs, include_dashboard_ids=self.created_dashboards, + include_query_ids=self.created_queries, include_object_permissions=self.include_object_permissions, warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"), ucx_catalog=self.ucx_catalog, @@ -1108,7 +1147,7 @@ def prompts(self) -> MockPrompts: @pytest.fixture -def installation_ctx( # pylint: disable=too-many-arguments +def installation_ctx( # pylint: disable=too-many-arguments,too-many-locals ws, sql_backend, make_catalog, @@ -1124,6 +1163,7 @@ def installation_ctx( # pylint: 
disable=too-many-arguments make_notebook, make_query, make_dashboard, + make_lakeview_dashboard, make_cluster_policy, make_cluster_policy_permissions, watchdog_purge_suffix, @@ -1142,6 +1182,7 @@ def installation_ctx( # pylint: disable=too-many-arguments make_notebook, make_query, make_dashboard, + make_lakeview_dashboard, make_cluster_policy, make_cluster_policy_permissions, ws, diff --git a/tests/integration/source_code/test_dashboards.py b/tests/integration/source_code/test_dashboards.py new file mode 100644 index 0000000000..fbff91d49e --- /dev/null +++ b/tests/integration/source_code/test_dashboards.py @@ -0,0 +1,154 @@ +import datetime as dt + +import pytest + +from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess +from databricks.labs.ucx.source_code.jobs import JobProblem +from databricks.sdk.service.iam import PermissionLevel + +from databricks.labs.ucx.source_code.queries import QueryProblem + + +def _populate_workflow_problems(installation_ctx): + job_problems = [ + JobProblem( + job_id=12345, + job_name="Peter the Job", + task_key="23456", + path="parent/child.py", + code="sql-parse-error", + message="Could not parse SQL", + start_line=1234, + start_col=22, + end_line=1234, + end_col=32, + ) + ] + installation_ctx.sql_backend.save_table( + f'{installation_ctx.inventory_database}.workflow_problems', + job_problems, + JobProblem, + mode='overwrite', + ) + + +def _populate_dashboard_problems(installation_ctx): + query_problems = [ + QueryProblem( + dashboard_id="12345", + dashboard_parent="dashbards/parent", + dashboard_name="my_dashboard", + query_id="23456", + query_parent="queries/parent", + query_name="my_query", + code="sql-parse-error", + message="Could not parse SQL", + ) + ] + installation_ctx.sql_backend.save_table( + f'{installation_ctx.inventory_database}.query_problems', + query_problems, + QueryProblem, + mode='overwrite', + ) + + +def _populate_directfs_problems(installation_ctx): + dfsas = [ + DirectFsAccess( + path="some_path", + is_read=False, + is_write=True, + source_id="xyz.py", + source_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=2.0), + source_lineage=[ + LineageAtom(object_type="WORKFLOW", object_id="my_workflow_id", other={"name": "my_workflow"}), + LineageAtom(object_type="TASK", object_id="my_workflow_id/my_task_id"), + LineageAtom(object_type="NOTEBOOK", object_id="my_notebook_path"), + LineageAtom(object_type="FILE", object_id="my file_path"), + ], + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5.0), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=2.0), + ) + ] + installation_ctx.directfs_access_crawler_for_paths.dump_all(dfsas) + dfsas = [ + DirectFsAccess( + path="some_path", + is_read=False, + is_write=True, + source_id="xyz.py", + source_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=2.0), + source_lineage=[ + LineageAtom(object_type="DASHBOARD", object_id="my_dashboard_id", other={"name": "my_dashboard"}), + LineageAtom(object_type="QUERY", object_id="my_dashboard_id/my_query_id", other={"name": "my_query"}), + ], + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5.0), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=2.0), + ) + ] + installation_ctx.directfs_access_crawler_for_queries.dump_all(dfsas) + + +def _populate_used_tables(installation_ctx): + tables = [ + UsedTable( + 
catalog_name="hive_metastore", + schema_name="staff_db", + table_name="employees", + is_read=False, + is_write=True, + source_id="xyz.py", + source_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=2.0), + source_lineage=[ + LineageAtom(object_type="WORKFLOW", object_id="my_workflow_id", other={"name": "my_workflow"}), + LineageAtom(object_type="TASK", object_id="my_workflow_id/my_task_id"), + LineageAtom(object_type="NOTEBOOK", object_id="my_notebook_path"), + LineageAtom(object_type="FILE", object_id="my file_path"), + ], + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5.0), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=2.0), + ) + ] + installation_ctx.used_tables_crawler_for_paths.dump_all(tables) + tables = [ + UsedTable( + catalog_name="hive_metastore", + schema_name="customers_db", + table_name="customers", + is_read=False, + is_write=True, + source_id="xyz.py", + source_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=2.0), + source_lineage=[ + LineageAtom(object_type="DASHBOARD", object_id="my_dashboard_id", other={"name": "my_dashboard"}), + LineageAtom(object_type="QUERY", object_id="my_dashboard_id/my_query_id", other={"name": "my_query"}), + ], + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=5.0), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=2.0), + ) + ] + installation_ctx.used_tables_crawler_for_queries.dump_all(tables) + + +@pytest.mark.skip("Development tool") +def test_dashboard_with_prepopulated_data(installation_ctx, make_cluster_policy, make_cluster_policy_permissions): + """the purpose of this test is to prepopulate data used by the dashboard without running an actual -lengthy- assessment""" + ucx_group, _ = installation_ctx.make_ucx_group() + cluster_policy = make_cluster_policy() + make_cluster_policy_permissions( + object_id=cluster_policy.policy_id, + permission_level=PermissionLevel.CAN_USE, + group_name=ucx_group.display_name, + ) + installation_ctx.__dict__['include_object_permissions'] = [f"cluster-policies:{cluster_policy.policy_id}"] + installation_ctx.workspace_installation.run() + print(f"\nInventory database is {installation_ctx.inventory_database}\n") + # populate data + _populate_workflow_problems(installation_ctx) + _populate_dashboard_problems(installation_ctx) + _populate_directfs_problems(installation_ctx) + _populate_used_tables(installation_ctx) + # put a breakpoint here + print("Put a breakpoint here! 
Then go check the dashboard in your workspace ;-)\n") diff --git a/tests/integration/source_code/test_directfs_access.py b/tests/integration/source_code/test_directfs_access.py index 60692d54e1..9aa8943f6a 100644 --- a/tests/integration/source_code/test_directfs_access.py +++ b/tests/integration/source_code/test_directfs_access.py @@ -1,33 +1,78 @@ +import pytest + from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom from databricks.labs.ucx.source_code.jobs import WorkflowLinter -from databricks.labs.ucx.source_code.queries import QueryLinter -def test_query_dfsa_ownership(runtime_ctx, make_query, make_dashboard, inventory_schema, sql_backend) -> None: - """Verify the ownership of a direct-fs record for a query.""" +def test_legacy_query_dfsa_ownership(runtime_ctx) -> None: + """Verify the ownership of a direct-fs record for a legacy query.""" + query = runtime_ctx.make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`") + dashboard = runtime_ctx.make_dashboard(query=query) - # A dashboard with a query that contains a direct filesystem reference. - query = make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`") - dashboard = make_dashboard(query=query) + runtime_ctx.query_linter.refresh_report() - # Produce a DFSA record for the query. - linter = QueryLinter( - runtime_ctx.workspace_client, - sql_backend, - inventory_schema, - TableMigrationIndex([]), - runtime_ctx.directfs_access_crawler_for_queries, - runtime_ctx.used_tables_crawler_for_queries, - include_dashboard_ids=[dashboard.id], - ) - linter.refresh_report() + dfsas = list(runtime_ctx.directfs_access_crawler_for_queries.snapshot()) + # By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect + assert dfsas == [ + DirectFsAccess( + source_id=f"{dashboard.id}/{query.id}", + source_lineage=[ + LineageAtom( + object_type="DASHBOARD", + object_id=dashboard.id, + other={"parent": dashboard.parent, "name": dashboard.name}, + ), + LineageAtom( + object_type="QUERY", + object_id=f"{dashboard.id}/{query.id}", + other={"name": query.name}, + ), + ], + path="dbfs://some_folder/some_file.csv", + is_read=True, + is_write=False, + ) + ] + + owner = runtime_ctx.directfs_access_ownership.owner_of(dfsas[0]) + assert owner == runtime_ctx.workspace_client.current_user.me().user_name - # Find a record for the query. - records = runtime_ctx.directfs_access_crawler_for_queries.snapshot() - query_record = next(record for record in records if record.source_id == f"{dashboard.id}/{query.id}") - # Verify ownership can be made. 
- owner = runtime_ctx.directfs_access_ownership.owner_of(query_record) +@pytest.mark.xfail(reason="https://github.com/databrickslabs/ucx/issues/3411") +def test_lakeview_query_dfsa_ownership(runtime_ctx) -> None: + """Verify the ownership of a direct-fs record for a Lakeview query.""" + # `make_lakeview_dashboard` fixture expects query as string + dashboard = runtime_ctx.make_lakeview_dashboard(query="SELECT * from csv.`dbfs://some_folder/some_file.csv`") + + runtime_ctx.query_linter.refresh_report() + + dfsas = list(runtime_ctx.directfs_access_crawler_for_queries.snapshot()) + # By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect + # The "query" in the source and object id, and "count" in the name are hardcoded in the + # `make_lakeview_dashboard` fixture + assert dfsas == [ + DirectFsAccess( + source_id=f"{dashboard.dashboard_id}/query", + source_lineage=[ + LineageAtom( + object_type="DASHBOARD", + object_id=dashboard.dashboard_id, + other={"parent": dashboard.parent_path, "name": dashboard.display_name}, + ), + LineageAtom( + object_type="QUERY", + object_id=f"{dashboard.dashboard_id}/query", + other={"name": "count"}, + ), + ], + path="dbfs://some_folder/some_file.csv", + is_read=True, + is_write=False, + ) + ] + + owner = runtime_ctx.directfs_access_ownership.owner_of(dfsas[0]) assert owner == runtime_ctx.workspace_client.current_user.me().user_name diff --git a/tests/integration/source_code/test_queries.py b/tests/integration/source_code/test_queries.py index 0802710287..75213905d3 100644 --- a/tests/integration/source_code/test_queries.py +++ b/tests/integration/source_code/test_queries.py @@ -1,60 +1,79 @@ -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler -from databricks.labs.ucx.source_code.queries import QueryLinter -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.lsql.backends import Row +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom, UsedTable -def test_query_linter_lints_queries_and_stores_dfsas_and_tables( - simple_ctx, ws, sql_backend, make_query, make_dashboard -): - queries = [make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")] - dashboards = [make_dashboard(query=queries[0])] - queries.append(make_query(sql_query="SELECT * from some_schema.some_table")) - dashboards.append(make_dashboard(query=queries[1])) - linter = QueryLinter( - ws, - sql_backend, - simple_ctx.inventory_database, - TableMigrationIndex([]), - simple_ctx.directfs_access_crawler_for_queries, - simple_ctx.used_tables_crawler_for_queries, - None, - ) - linter.refresh_report() - all_problems = sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database) - problems = [row for row in all_problems if row["query_name"] == queries[0].name] - assert len(problems) == 1 - dfsa_crawler = DirectFsAccessCrawler.for_queries(sql_backend, simple_ctx.inventory_database) - all_dfsas = dfsa_crawler.snapshot() - source_id = f"{dashboards[0].id}/{queries[0].id}" - dfsas = [dfsa for dfsa in all_dfsas if dfsa.source_id == source_id] - assert len(dfsas) == 1 - assert len(dfsas[0].source_lineage) == 2 - lineage = dfsas[0].source_lineage[0] - assert lineage.object_type == "DASHBOARD" - assert lineage.object_id == dashboards[0].id - assert lineage.other - assert lineage.other.get("parent", None) == 
dashboards[0].parent - assert lineage.other.get("name", None) == dashboards[0].name - lineage = dfsas[0].source_lineage[1] - assert lineage.object_type == "QUERY" - assert lineage.object_id == source_id - assert lineage.other - assert lineage.other.get("name", None) == queries[0].name - used_tables_crawler = UsedTablesCrawler.for_queries(sql_backend, simple_ctx.inventory_database) - all_tables = used_tables_crawler.snapshot() - source_id = f"{dashboards[1].id}/{queries[1].id}" - tables = [table for table in all_tables if table.source_id == source_id] - assert len(tables) == 1 - assert len(tables[0].source_lineage) == 2 - lineage = tables[0].source_lineage[0] - assert lineage.object_type == "DASHBOARD" - assert lineage.object_id == dashboards[1].id - assert lineage.other - assert lineage.other.get("parent", None) == dashboards[1].parent - assert lineage.other.get("name", None) == dashboards[1].name - lineage = tables[0].source_lineage[1] - assert lineage.object_type == "QUERY" - assert lineage.object_id == source_id - assert lineage.other - assert lineage.other.get("name", None) == queries[1].name + +def test_query_linter_lints_queries_and_stores_dfsas_and_tables(simple_ctx) -> None: + query_with_dfsa = simple_ctx.make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`") + dashboard_with_dfsa = simple_ctx.make_dashboard(query=query_with_dfsa) + # Lakeview dashboard expects a string, not a legacy query + dashboard_with_used_table = simple_ctx.make_lakeview_dashboard(query="SELECT * FROM some_schema.some_table") + + simple_ctx.query_linter.refresh_report() + + problems = list(simple_ctx.sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database)) + assert problems == [ + Row( + dashboard_id=dashboard_with_dfsa.id, + dashboard_parent=dashboard_with_dfsa.parent, + dashboard_name=dashboard_with_dfsa.name, + query_id=query_with_dfsa.id, + query_parent=query_with_dfsa.parent, + query_name=query_with_dfsa.name, + code='direct-filesystem-access-in-sql-query', + message='The use of direct filesystem references is deprecated: dbfs://some_folder/some_file.csv', + ) + ] + + dfsas = list(simple_ctx.directfs_access_crawler_for_queries.snapshot()) + # By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect + assert dfsas == [ + DirectFsAccess( + source_id=f"{dashboard_with_dfsa.id}/{query_with_dfsa.id}", + source_lineage=[ + LineageAtom( + object_type="DASHBOARD", + object_id=dashboard_with_dfsa.id, + other={"parent": dashboard_with_dfsa.parent, "name": dashboard_with_dfsa.name}, + ), + LineageAtom( + object_type="QUERY", + object_id=f"{dashboard_with_dfsa.id}/{query_with_dfsa.id}", + other={"name": query_with_dfsa.name}, + ), + ], + path="dbfs://some_folder/some_file.csv", + is_read=True, + is_write=False, + ) + ] + + used_tables = list(simple_ctx.used_tables_crawler_for_queries.snapshot()) + # By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect + # The "query" in the source and object id, and "count" in the name are hardcoded in the + # `make_lakeview_dashboard` fixture + assert used_tables == [ + UsedTable( + source_id=f"{dashboard_with_used_table.dashboard_id}/query", + source_lineage=[ + LineageAtom( + object_type="DASHBOARD", + object_id=dashboard_with_used_table.dashboard_id, + other={ + "parent": dashboard_with_used_table.parent_path, + "name": dashboard_with_used_table.display_name, + }, + ), + LineageAtom( + object_type="QUERY", + 
object_id=f"{dashboard_with_used_table.dashboard_id}/query", + other={"name": "count"}, + ), + ], + catalog_name="hive_metastore", + schema_name="some_schema", + table_name="some_table", + is_read=True, + is_write=False, + ) + ] diff --git a/tests/integration/source_code/test_redash.py b/tests/integration/source_code/test_redash.py index 7256a9e950..c55ab24630 100644 --- a/tests/integration/source_code/test_redash.py +++ b/tests/integration/source_code/test_redash.py @@ -1,31 +1,44 @@ -from databricks.labs.ucx.source_code.redash import Redash -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.sql import Query, Dashboard +import datetime as dt + +from databricks.sdk.retries import retried -from ..conftest import MockInstallationContext +from databricks.labs.ucx.source_code.redash import Redash +from databricks.sdk.service.sql import Dashboard -def test_fix_dashboard(ws: WorkspaceClient, installation_ctx: MockInstallationContext, make_dashboard, make_query): - dashboard: Dashboard = make_dashboard() - another_query: Query = make_query() +def test_migrate_dashboards_sets_migration_tags(installation_ctx) -> None: + query_in_dashboard, query_outside_dashboard = installation_ctx.make_query(), installation_ctx.make_query() + assert query_in_dashboard.id and query_outside_dashboard.id, "Query from fixture misses id" + dashboard: Dashboard = installation_ctx.make_dashboard(query=query_in_dashboard) + assert dashboard.id, "Dashboard from fixture misses id" installation_ctx.workspace_installation.run() + installation_ctx.redash.migrate_dashboards(dashboard.id) - # make sure the query is marked as migrated - queries = Redash.get_queries_from_dashboard(dashboard) - for query in queries: - assert query.id is not None - content = ws.queries.get(query.id) - assert content.tags is not None and Redash.MIGRATED_TAG in content.tags - - # make sure a different query does not get migrated - assert another_query.id is not None - another_query = ws.queries.get(another_query.id) - assert another_query.tags is not None and len(another_query.tags) == 1 - assert Redash.MIGRATED_TAG not in another_query.tags - - # revert the dashboard, make sure the query has only a single tag - installation_ctx.redash.revert_dashboards(dashboard.id) - for query in queries: - assert query.id is not None - content = ws.queries.get(query.id) - assert content.tags is not None and len(content.tags) == 1 + + @retried(on=[ValueError], timeout=dt.timedelta(seconds=90)) + def wait_for_migrated_tag_in_dashboard(dashboard_id: str) -> None: + dashboard_latest = installation_ctx.workspace_client.dashboards.get(dashboard_id) + if Redash.MIGRATED_TAG not in (dashboard_latest.tags or []): + raise ValueError(f"Missing group migration tag in dashboard: {dashboard_id}") + + wait_for_migrated_tag_in_dashboard(dashboard.id) + + query_migrated = installation_ctx.workspace_client.queries.get(query_in_dashboard.id) + assert Redash.MIGRATED_TAG in (query_migrated.tags or []) + + query_not_migrated = installation_ctx.workspace_client.queries.get(query_outside_dashboard.id) + assert Redash.MIGRATED_TAG not in (query_not_migrated.tags or []) + + installation_ctx.redash_crawler.snapshot(force_refresh=True) # Update the dashboard tags + installation_ctx.redash.revert_dashboards(dashboard.id) # Revert removes migrated tag + + @retried(on=[ValueError], timeout=dt.timedelta(seconds=90)) + def wait_for_migrated_tag_not_in_dashboard(dashboard_id: str) -> None: + dashboard_latest = installation_ctx.workspace_client.dashboards.get(dashboard_id) + 
if Redash.MIGRATED_TAG in (dashboard_latest.tags or []): + raise ValueError(f"Group migration tag still in dashboard: {dashboard_id}") + + wait_for_migrated_tag_not_in_dashboard(dashboard.id) + + query_reverted = installation_ctx.workspace_client.queries.get(query_in_dashboard.id) + assert Redash.MIGRATED_TAG not in (query_reverted.tags or []) diff --git a/tests/unit/assessment/test_dashboards.py b/tests/unit/assessment/test_dashboards.py new file mode 100644 index 0000000000..cf0ae8f719 --- /dev/null +++ b/tests/unit/assessment/test_dashboards.py @@ -0,0 +1,647 @@ +import logging +import json +from collections.abc import Iterator +from unittest.mock import call, create_autospec + +import pytest +from databricks.labs.lsql.lakeview import Dashboard as LsqlLakeviewDashboard, Dataset +from databricks.labs.lsql.backends import Row +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import NotFound, PermissionDenied, TooManyRequests +from databricks.sdk.service.dashboards import Dashboard as SdkLakeviewDashboard +from databricks.sdk.service.iam import User +from databricks.sdk.service.sql import ( + Dashboard as SdkRedashDashboard, + LegacyVisualization, + LegacyQuery, + Widget, + QueryOptions, +) + +from databricks.labs.ucx.assessment.dashboards import ( + LakeviewDashboardCrawler, + Dashboard, + DashboardOwnership, + RedashDashboardCrawler, + Query, +) +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership + + +@pytest.mark.parametrize( + "legacy_query, expected", + [ + (LegacyQuery(id="qid"), Query("qid")), + ( + LegacyQuery( + id="qid", + name="Query", + query="SELECT 42 AS count", + parent="parent", + tags=["tag1", "tag2"], + options=QueryOptions(catalog="catalog", schema="schema"), + ), + Query("qid", "Query", "parent", "SELECT 42 AS count", "catalog", "schema", ["tag1", "tag2"]), + ), + ], +) +def test_query_from_legacy_query(legacy_query: LegacyQuery, expected: Query) -> None: + query = Query.from_legacy_query(legacy_query) + assert query == expected + + +@pytest.mark.parametrize( + "dataset, parent, expected", + [ + (Dataset("qid", "SELECT 42 AS count"), None, Query("qid", query="SELECT 42 AS count")), + ( + Dataset("qid", "SELECT 42 AS count", display_name="Query"), + "parent", + Query("qid", "Query", "parent", "SELECT 42 AS count"), + ), + ], +) +def test_query_from_lakeview_dataset(dataset: Dataset, parent: str | None, expected: Query) -> None: + query = Query.from_lakeview_dataset(dataset, parent=parent) + assert query == expected + + +@pytest.mark.parametrize( + "sdk_dashboard, expected", + [ + (SdkRedashDashboard(id="id"), Dashboard("id")), + ( + SdkRedashDashboard( + id="did", + name="name", + parent="parent", + tags=["tag1", "tag2"], + widgets=[ + Widget(visualization=LegacyVisualization(query=LegacyQuery(id="qid1"))), + Widget(visualization=LegacyVisualization(query=LegacyQuery(id="qid2"))), + ], + user_id=123456789, + ), + Dashboard("did", "name", "parent", ["qid1", "qid2"], ["tag1", "tag2"], "123456789"), + ), + ( + SdkRedashDashboard( + id="did", + name="name", + parent="parent", + tags=["tag1", "tag2"], + widgets=[ + Widget(), + Widget(visualization=LegacyVisualization()), + Widget(visualization=LegacyVisualization(query=LegacyQuery(id="qid1"))), + ], + ), + Dashboard("did", "name", "parent", ["qid1"], ["tag1", "tag2"]), + ), + ], +) +def test_redash_dashboard_from_sdk_dashboard(sdk_dashboard: SdkRedashDashboard, expected: Dashboard) -> None: + dashboard = Dashboard.from_sdk_redash_dashboard(sdk_dashboard) + 
assert dashboard == expected + + +def test_redash_dashboard_crawler_snapshot_persists_dashboards(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [ + SdkRedashDashboard( + id="did", + name="name", + parent="parent", + tags=["tag1", "tag2"], + widgets=[ + Widget(visualization=LegacyVisualization(query=LegacyQuery(id="qid1"))), + Widget(visualization=LegacyVisualization(query=LegacyQuery(id="qid2"))), + ], + ), + ] + ws.dashboards.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [ + Row(id="did", name="name", parent="parent", query_ids=["qid1", "qid2"], tags=["tag1", "tag2"], creator_id=None) + ] + ws.dashboards.list.assert_called_once() + + +def test_redash_dashboard_crawler_handles_databricks_error_on_list(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.dashboards.list.side_effect = PermissionDenied("Missing permission") + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert len(rows) == 0 + assert "Cannot list Redash dashboards" in caplog.messages + ws.dashboards.list.assert_called_once() + + +def test_redash_dashboard_crawler_handles_databricks_error_on_iterate(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [SdkRedashDashboard(id="did1"), SdkRedashDashboard(id="did2")] + + def list_dashboards() -> Iterator[SdkRedashDashboard]: + for dashboard in dashboards: + yield dashboard + raise TooManyRequests("Exceeded API limit") + + ws.dashboards.list.side_effect = list_dashboards + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + assert "Cannot list next Redash dashboards page" in caplog.messages + ws.dashboards.list.assert_called_once() + + +def test_redash_dashboard_crawler_stops_when_debug_listing_upper_limit_reached(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [SdkRedashDashboard(id="did1"), SdkRedashDashboard(id="did2")] + ws.dashboards.list.side_effect = lambda: (dashboard for dashboard in dashboards) + crawler = RedashDashboardCrawler(ws, mock_backend, "test", debug_listing_upper_limit=1) + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + ws.dashboards.list.assert_called_once() + + +def test_redash_dashboard_crawler_includes_dashboard_ids(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.dashboards.get.return_value = SdkRedashDashboard(id="did1") + crawler = RedashDashboardCrawler(ws, mock_backend, "test", include_dashboard_ids=["did1"]) + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, 
query_ids=[], tags=[], creator_id=None)] + ws.dashboards.get.assert_called_once_with("did1") + ws.dashboards.list.assert_not_called() + + +def test_redash_dashboard_crawler_skips_not_found_dashboard_ids(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + + def get_dashboards(dashboard_id: str) -> SdkRedashDashboard: + if dashboard_id == "did1": + return SdkRedashDashboard(id="did1") + raise NotFound(f"Did not find dashboard: {dashboard_id}") + + ws.dashboards.get.side_effect = get_dashboards + crawler = RedashDashboardCrawler(ws, mock_backend, "test", include_dashboard_ids=["did1", "did2"]) + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + assert "Cannot get Redash dashboard: did2" in caplog.messages + ws.dashboards.get.assert_has_calls([call("did1"), call("did2")]) + ws.dashboards.list.assert_not_called() + + +def list_legacy_queries() -> list[LegacyQuery]: + queries = [ + LegacyQuery(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count"), + LegacyQuery(id="qid2", name="Second query", parent="parent", query="SELECT 21 AS count"), + ] + return queries + + +def get_legacy_query(query_id: str) -> LegacyQuery: + for query in list_legacy_queries(): + if query.id == query_id: + return query + raise NotFound(f"Legacy query: {query_id}") + + +def test_redash_dashboard_crawler_list_queries_includes_query_ids(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.list.side_effect = list_legacy_queries + ws.queries_legacy.get.side_effect = get_legacy_query + crawler = RedashDashboardCrawler(ws, mock_backend, "test", include_query_ids=["qid1"]) + + queries = list(crawler.list_queries()) + + assert queries == [Query(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count")] + ws.queries_legacy.list.assert_not_called() + ws.queries_legacy.get.assert_called_once() + + +def test_redash_dashboard_crawler_list_queries_includes_query_ids_from_dashboard(mock_backend) -> None: + dashboard = Dashboard("did", query_ids=["qid1", "qid2"]) + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.list.side_effect = list_legacy_queries + ws.queries_legacy.get.side_effect = get_legacy_query + crawler = RedashDashboardCrawler(ws, mock_backend, "test", include_query_ids=["qid1"]) + + queries = list(crawler.list_queries(dashboard)) + + assert queries == [Query(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count")] + ws.queries_legacy.list.assert_not_called() + ws.queries_legacy.get.assert_called_once() + + +def test_redash_dashboard_crawler_skips_not_found_query_ids(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.list.side_effect = list_legacy_queries + ws.queries_legacy.get.side_effect = get_legacy_query + crawler = RedashDashboardCrawler(ws, mock_backend, "test", include_query_ids=["qid1", "non-existing-id"]) + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries()) + + assert queries == [Query(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count")] + assert "Cannot get Redash query: non-existing-id" in caplog.messages + ws.queries_legacy.list.assert_not_called() + 
ws.queries_legacy.get.assert_has_calls([call("qid1"), call("non-existing-id")]) + + +def test_redash_dashboard_crawler_snapshot_skips_dashboard_without_id(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [SdkRedashDashboard(id="did1"), SdkRedashDashboard()] # Second misses dashboard id + ws.dashboards.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.redash_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + ws.dashboards.list.assert_called_once() + + +def test_redash_dashboard_crawler_list_queries(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.list.return_value = [ + LegacyQuery(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count"), + LegacyQuery(id="qid2", name="Second query", parent="parent", query="SELECT 21 AS count"), + ] + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + queries = list(crawler.list_queries()) + + assert queries == [ + Query("qid1", "First query", "parent", "SELECT 42 AS count"), + Query("qid2", "Second query", "parent", "SELECT 21 AS count"), + ] + ws.queries_legacy.list.assert_called_once() + + +def test_redash_dashboard_crawler_list_queries_handles_permission_denied(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.list.side_effect = PermissionDenied("Missing permissions") + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries()) + + assert len(queries) == 0 + assert "Cannot list Redash queries" in caplog.messages + ws.queries_legacy.list.assert_called_once() + + +def test_redash_dashboard_crawler_list_queries_from_dashboard(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.get.return_value = LegacyQuery( + id="qid", name="Query", parent="parent", query="SELECT 42 AS count" + ) + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + queries = list(crawler.list_queries(dashboard=Dashboard("did", query_ids=["qid"]))) + + assert queries == [Query("qid", "Query", "parent", "SELECT 42 AS count")] + ws.queries_legacy.get.assert_called_once_with("qid") + + +def test_redash_dashboard_crawler_list_queries_handles_not_found(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.queries_legacy.get.side_effect = NotFound("Query not found: qid") + crawler = RedashDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries(dashboard=Dashboard("did", query_ids=["qid"]))) + + assert len(queries) == 0 + assert "Cannot get Redash query: qid" in caplog.messages + ws.queries_legacy.get.assert_called_once_with("qid") + + +def test_redash_dashboard_crawler_list_queries_stops_when_debug_listing_upper_limit_reached(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + legacy_queries = [LegacyQuery(id="qid1"), LegacyQuery(id="qid2")] + ws.queries_legacy.list.side_effect = lambda: (query for query in legacy_queries) + crawler = RedashDashboardCrawler(ws, mock_backend, "test", debug_listing_upper_limit=1) + + queries = list(crawler.list_queries()) + + assert len(queries) == 
1 + ws.queries_legacy.list.assert_called_once() + + +@pytest.mark.parametrize( + "sdk_dashboard, expected", + [ + (SdkLakeviewDashboard(dashboard_id="id"), Dashboard("id")), + ( + SdkLakeviewDashboard( + dashboard_id="did", + display_name="name", + parent_path="parent", + serialized_dashboard=json.dumps( + LsqlLakeviewDashboard( + datasets=[Dataset("qid1", "SELECT 1"), Dataset("qid2", "SELECT 2")], + pages=[], + ).as_dict() + ), + ), + Dashboard("did", "name", "parent", ["qid1", "qid2"]), + ), + ( + SdkLakeviewDashboard( + dashboard_id="did", + display_name="name", + parent_path="parent", + serialized_dashboard=json.dumps(LsqlLakeviewDashboard(datasets=[], pages=[]).as_dict()), + ), + Dashboard("did", "name", "parent", []), + ), + ], +) +def test_lakeview_dashboard_from_sdk_dashboard(sdk_dashboard: SdkLakeviewDashboard, expected: Dashboard) -> None: + dashboard = Dashboard.from_sdk_lakeview_dashboard(sdk_dashboard) + assert dashboard == expected + + +def test_lakeview_dashboard_crawler_snapshot_persists_dashboards(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [ + SdkLakeviewDashboard( + dashboard_id="did", + display_name="name", + parent_path="parent", + serialized_dashboard=json.dumps( + LsqlLakeviewDashboard( + datasets=[Dataset("qid1", "SELECT 1"), Dataset("qid2", "SELECT 2")], + pages=[], + ).as_dict() + ), + ), + ] + ws.lakeview.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.lakeview_dashboards", "overwrite") + assert rows == [Row(id="did", name="name", parent="parent", query_ids=["qid1", "qid2"], tags=[], creator_id=None)] + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_handles_databricks_error_on_list(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.lakeview.list.side_effect = PermissionDenied("Missing permission") + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.lakeview_dashboards", "overwrite") + assert len(rows) == 0 + assert "Cannot list Lakeview dashboards" in caplog.messages + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_includes_dashboard_ids(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.lakeview.get.return_value = SdkLakeviewDashboard(dashboard_id="did1") + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test", include_dashboard_ids=["did1"]) + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.lakeview_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + ws.lakeview.get.assert_called_once_with("did1") + ws.lakeview.list.assert_not_called() + + +def test_lakeview_dashboard_crawler_skips_not_found_dashboard_ids(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + + def get_dashboards(dashboard_id: str) -> SdkLakeviewDashboard: + if dashboard_id == "did1": + return SdkLakeviewDashboard(dashboard_id="did1") + raise NotFound(f"Did not find dashboard: {dashboard_id}") + + ws.lakeview.get.side_effect = get_dashboards + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test", include_dashboard_ids=["did1", "did2"]) + + with 
caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.lakeview_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + assert "Cannot get Lakeview dashboard: did2" in caplog.messages + ws.lakeview.get.assert_has_calls([call("did1"), call("did2")]) + ws.lakeview.list.assert_not_called() + + +def test_lakeview_dashboard_crawler_list_queries_includes_query_ids(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + datasets = [ + Dataset("qid1", "SELECT 42 AS count", "First query"), + Dataset("qid2", "SELECT 21 AS count", "Second query"), + ] + dashboard = SdkLakeviewDashboard( + dashboard_id="parent", + serialized_dashboard=json.dumps(LsqlLakeviewDashboard(datasets=datasets, pages=[]).as_dict()), + ) + ws.lakeview.list.return_value = [dashboard] + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test", include_query_ids=["qid1"]) + + queries = list(crawler.list_queries()) + + assert queries == [Query(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count")] + ws.lakeview.list.assert_called_once() + ws.lakeview.get.assert_not_called() + + +def test_lakeview_dashboard_crawler_list_queries_includes_query_ids_from_dashboard(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + datasets = [ + Dataset("qid1", "SELECT 42 AS count", "First query"), + Dataset("qid2", "SELECT 21 AS count", "Second query"), + ] + dashboard = SdkLakeviewDashboard( + dashboard_id="parent", + serialized_dashboard=json.dumps(LsqlLakeviewDashboard(datasets=datasets, pages=[]).as_dict()), + ) + ws.lakeview.get.return_value = dashboard + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test", include_query_ids=["qid1"]) + + queries = list(crawler.list_queries(Dashboard("parent"))) + + assert queries == [Query(id="qid1", name="First query", parent="parent", query="SELECT 42 AS count")] + ws.lakeview.list.assert_not_called() + ws.lakeview.get.assert_called_once_with("parent") + + +def test_lakeview_dashboard_crawler_snapshot_skips_dashboard_without_id(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [SdkLakeviewDashboard(dashboard_id="did1"), SdkLakeviewDashboard()] # Second misses dashboard id + ws.lakeview.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + crawler.snapshot() + + rows = mock_backend.rows_written_for("hive_metastore.test.lakeview_dashboards", "overwrite") + assert rows == [Row(id="did1", name=None, parent=None, query_ids=[], tags=[], creator_id=None)] + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_list_queries(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [ + SdkLakeviewDashboard( + dashboard_id="parent", + serialized_dashboard=json.dumps( + LsqlLakeviewDashboard(datasets=[Dataset("qid1", "SELECT 42 AS count", "Query")], pages=[]).as_dict() + ), + ), + ] + ws.lakeview.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + queries = list(crawler.list_queries()) + + assert queries == [Query("qid1", "Query", "parent", "SELECT 42 AS count")] + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_list_queries_handles_permission_denied(caplog, mock_backend) -> None: 
+ ws = create_autospec(WorkspaceClient) + ws.lakeview.list.side_effect = PermissionDenied("Missing permissions") + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries()) + + assert len(queries) == 0 + assert "Cannot list Lakeview dashboards" in caplog.messages + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_list_queries_handles_corrupted_serialized_dashboard(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboards = [SdkLakeviewDashboard(dashboard_id="did", serialized_dashboard='{"invalid": "json}')] + ws.lakeview.list.side_effect = lambda: (dashboard for dashboard in dashboards) # Expects an iterator + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries()) + + assert len(queries) == 0 + assert "Error when parsing Lakeview dashboard: did" in caplog.messages + ws.lakeview.list.assert_called_once() + + +def test_lakeview_dashboard_crawler_list_queries_calls_query_api_get(mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + dashboard = SdkLakeviewDashboard( + dashboard_id="parent", + serialized_dashboard=json.dumps( + LsqlLakeviewDashboard(datasets=[Dataset("qid", "SELECT 42 AS count", "Query")], pages=[]).as_dict() + ), + ) + ws.lakeview.get.return_value = dashboard + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + queries = list(crawler.list_queries(Dashboard("did"))) + + assert queries == [Query("qid", "Query", "parent", "SELECT 42 AS count")] + ws.lakeview.get.assert_called_once_with("did") + + +def test_lakeview_dashboard_crawler_list_queries_handles_not_found(caplog, mock_backend) -> None: + ws = create_autospec(WorkspaceClient) + ws.lakeview.get.side_effect = NotFound("Query not found: qid") + crawler = LakeviewDashboardCrawler(ws, mock_backend, "test") + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.assessment.dashboards"): + queries = list(crawler.list_queries(Dashboard("did"))) + + assert len(queries) == 0 + assert "Cannot get Lakeview dashboard: did" in caplog.messages + ws.lakeview.get.assert_called_once_with("did") + + +def test_dashboard_ownership_owner_of_from_user_display_name() -> None: + administrator_locator = create_autospec(AdministratorLocator) + ws = create_autospec(WorkspaceClient) + ws.users.get.return_value = User(display_name="Cor") + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = DashboardOwnership(administrator_locator, ws, workspace_path_ownership) + + owner = ownership.owner_of(Dashboard("id", creator_id="123456789")) + + assert owner == "Cor" + administrator_locator.get_workspace_administrator.assert_not_called() + ws.users.get.assert_called_with("123456789") + workspace_path_ownership.owner_of_path.assert_not_called() + + +def test_dashboard_ownership_owner_of_from_user_email() -> None: + administrator_locator = create_autospec(AdministratorLocator) + ws = create_autospec(WorkspaceClient) + ws.users.get.return_value = User(user_name="cor.zuurmond@databricks.com") + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = DashboardOwnership(administrator_locator, ws, workspace_path_ownership) + + owner = ownership.owner_of(Dashboard("id", creator_id="123456789")) + + assert owner == "cor.zuurmond@databricks.com" + 
administrator_locator.get_workspace_administrator.assert_not_called() + ws.users.get.assert_called_with("123456789") + workspace_path_ownership.owner_of_path.assert_not_called() + + +def test_dashboard_ownership_owner_of_from_workspace_path_owner() -> None: + administrator_locator = create_autospec(AdministratorLocator) + ws = create_autospec(WorkspaceClient) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + workspace_path_ownership.owner_of_path.return_value = "Cor" + ownership = DashboardOwnership(administrator_locator, ws, workspace_path_ownership) + + owner = ownership.owner_of(Dashboard("id", parent="path")) + + assert owner == "Cor" + administrator_locator.get_workspace_administrator.assert_not_called() + ws.users.get.assert_not_called() + workspace_path_ownership.owner_of_path.assert_called_with("path") + + +def test_dashboard_ownership_owner_of_from_administrator_locator() -> None: + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "Cor" + ws = create_autospec(WorkspaceClient) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = DashboardOwnership(administrator_locator, ws, workspace_path_ownership) + + owner = ownership.owner_of(Dashboard("id")) + + assert owner == "Cor" + administrator_locator.get_workspace_administrator.assert_called_once() + ws.users.get.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() diff --git a/tests/unit/source_code/test_queries.py b/tests/unit/source_code/test_queries.py index 6b42c6449d..e09938db8f 100644 --- a/tests/unit/source_code/test_queries.py +++ b/tests/unit/source_code/test_queries.py @@ -1,11 +1,11 @@ -from unittest import mock from unittest.mock import create_autospec import pytest -from databricks.sdk import WorkspaceClient +from databricks.labs.lsql.backends import Row from databricks.sdk.service.sql import LegacyQuery +from databricks.labs.ucx.assessment.dashboards import Dashboard, RedashDashboardCrawler, Query from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler from databricks.labs.ucx.source_code.queries import QueryLinter from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler @@ -27,45 +27,67 @@ def test_query_linter_collects_dfsas_from_queries( name, query, dfsa_paths, is_read, is_write, migration_index, mock_backend ) -> None: - ws = create_autospec(WorkspaceClient) dfsa_crawler = create_autospec(DirectFsAccessCrawler) used_tables_crawler = create_autospec(UsedTablesCrawler) + dashboard_crawler = create_autospec(RedashDashboardCrawler) query = LegacyQuery.from_dict({"parent": "workspace", "name": name, "query": query}) - linter = QueryLinter(ws, mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, None) + linter = QueryLinter(mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, [dashboard_crawler]) + dfsas = linter.collect_dfsas_from_query("no-dashboard-id", query) - ws.assert_not_called() - dfsa_crawler.assert_not_called() - used_tables_crawler.assert_not_called() + assert set(dfsa.path for dfsa in dfsas) == set(dfsa_paths) assert all(dfsa.is_read == is_read for dfsa in dfsas) assert all(dfsa.is_write == is_write for dfsa in dfsas) + dfsa_crawler.assert_not_called() + used_tables_crawler.assert_not_called() + dashboard_crawler.snapshot.assert_not_called() def test_query_linter_refresh_report_writes_query_problems(migration_index, mock_backend) -> None: - ws = 
create_autospec(WorkspaceClient) dfsa_crawler = create_autospec(DirectFsAccessCrawler) used_tables_crawler = create_autospec(UsedTablesCrawler) - linter = QueryLinter(ws, mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, None) + dashboard_crawler = create_autospec(RedashDashboardCrawler) + linter = QueryLinter(mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, [dashboard_crawler]) linter.refresh_report() assert mock_backend.has_rows_written_for("`hive_metastore`.`test`.`query_problems`") - ws.dashboards.list.assert_called_once() dfsa_crawler.assert_not_called() used_tables_crawler.assert_not_called() + dashboard_crawler.snapshot.assert_called_once() + dashboard_crawler.list_queries.assert_called_once() def test_lints_queries(migration_index, mock_backend) -> None: - with mock.patch("databricks.labs.ucx.source_code.queries.Redash") as mocked_redash: - query = LegacyQuery(id="123", query="SELECT * from nowhere") - mocked_redash.get_queries_from_dashboard.return_value = [query] - ws = create_autospec(WorkspaceClient) - dfsa_crawler = create_autospec(DirectFsAccessCrawler) - used_tables_crawler = create_autospec(UsedTablesCrawler) - linter = QueryLinter(ws, mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, ["1"]) - linter.refresh_report() + dfsa_crawler = create_autospec(DirectFsAccessCrawler) + used_tables_crawler = create_autospec(UsedTablesCrawler) + dashboard_crawler = create_autospec(RedashDashboardCrawler) + dashboard_crawler.snapshot.return_value = [Dashboard("did", "dname", "dparent", query_ids=["qid"])] + dashboard_crawler.list_queries.return_value = [ + Query( + id="qid", + name="qname", + parent="qparent", + query="SELECT * FROM old.things", + ) + ] + linter = QueryLinter(mock_backend, "test", migration_index, dfsa_crawler, used_tables_crawler, [dashboard_crawler]) + + linter.refresh_report() - assert mock_backend.has_rows_written_for("`hive_metastore`.`test`.`query_problems`") - ws.dashboards.list.assert_not_called() - dfsa_crawler.assert_not_called() - used_tables_crawler.assert_not_called() + rows = mock_backend.rows_written_for("`hive_metastore`.`test`.`query_problems`", "overwrite") + assert rows == [ + Row( + dashboard_id="did", + dashboard_parent="dparent", + dashboard_name="dname", + query_id="qid", + query_parent="qparent", + query_name="qname", + code="table-migrated-to-uc", + message="Table old.things is migrated to brand.new.stuff in Unity Catalog", + ) + ] + dfsa_crawler.assert_not_called() + used_tables_crawler.assert_not_called() + dashboard_crawler.snapshot.assert_called_once() diff --git a/tests/unit/source_code/test_redash.py b/tests/unit/source_code/test_redash.py index c60f892498..bc39430d92 100644 --- a/tests/unit/source_code/test_redash.py +++ b/tests/unit/source_code/test_redash.py @@ -1,115 +1,81 @@ -from unittest.mock import create_autospec, call +from unittest.mock import create_autospec import pytest from databricks.labs.blueprint.installation import MockInstallation +from databricks.sdk.errors import PermissionDenied +from databricks.sdk.service.sql import LegacyQuery, UpdateQueryRequestQuery -from databricks.sdk.service.sql import LegacyQuery, Dashboard, Widget, LegacyVisualization, QueryOptions - +from databricks.labs.ucx.assessment.dashboards import Dashboard, Query, RedashDashboardCrawler from databricks.labs.ucx.source_code.redash import Redash -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.sql import UpdateQueryRequestQuery -from databricks.sdk.errors 
import PermissionDenied, NotFound - @pytest.fixture -def redash_ws(): - workspace_client = create_autospec(WorkspaceClient) - workspace_client.workspace.get_status.side_effect = NotFound("error") - workspace_client.queries.create.return_value = LegacyQuery(id="123") - workspace_client.dashboards.list.return_value = [ - Dashboard( +def redash_installation(): + installation = MockInstallation( + { + "backup/queries/1.json": {"id": "1", "query": "SELECT * FROM old.things"}, + "backup/queries/3.json": {"id": "3", "query": "SELECT * FROM old.things", "tags": ["test_tag"]}, + } + ) + return installation + + +def list_queries(dashboard: Dashboard) -> list[Query]: + queries = [ + Query( id="1", - widgets=[ - Widget( - visualization=LegacyVisualization( - query=LegacyQuery( - id="1", - name="test_query", - query="SELECT * FROM old.things", - options=QueryOptions(catalog="hive_metastore", schema="default"), - tags=["test_tag"], - ) - ) - ), - Widget( - visualization=LegacyVisualization( - query=LegacyQuery( - id="1", - name="test_query", - query="SELECT * FROM old.things", - tags=[Redash.MIGRATED_TAG], - ) - ) - ), - None, - ], + name="test_query", + query="SELECT * FROM old.things", + catalog="hive_metastore", + schema="default", + tags=["test_tag"], ), - Dashboard( + Query( id="2", - tags=[Redash.MIGRATED_TAG], - widgets=[ - Widget( - visualization=LegacyVisualization( - query=LegacyQuery( - id="1", - name="test_query", - query="SELECT * FROM old.things", - tags=[Redash.MIGRATED_TAG], - ) - ) - ), - Widget(visualization=LegacyVisualization(query=LegacyQuery(id="2", query="SELECT"))), - Widget( - visualization=LegacyVisualization( - query=LegacyQuery(id="3", query="SELECT", tags=[Redash.MIGRATED_TAG]) - ) - ), - ], + name="test_query", + query="SELECT * FROM old.things", + catalog="hive_metastore", + schema="default", + tags=["test_tag"], + ), + Query( + id="3", + name="test_query", + query="SELECT * FROM old.things", + catalog="hive_metastore", + schema="default", + tags=["test_tag", Redash.MIGRATED_TAG], ), - Dashboard(id="3", tags=[]), ] - workspace_client.dashboards.get.return_value = Dashboard( - id="2", - tags=[Redash.MIGRATED_TAG], - widgets=[ - Widget( - visualization=LegacyVisualization( - query=LegacyQuery( - id="1", - name="test_query", - query="SELECT * FROM old.things", - tags=[Redash.MIGRATED_TAG], - ) - ) - ) - ], - ) - - return workspace_client + query_mapping = {query.id: query for query in queries} + return [query_mapping[query_id] for query_id in dashboard.query_ids if query_id in query_mapping] @pytest.fixture -def redash_installation(): - installation = MockInstallation( - { - "backup/queries/1.json": {"id": "1", "query": "original_query"}, - "backup/queries/3.json": {"id": "3", "query": "original_query", "tags": ["test_tag"]}, - } - ) - return installation +def redash_dashboard_crawler(): + crawler = create_autospec(RedashDashboardCrawler) + crawler.snapshot.return_value = [ + Dashboard(id="1", query_ids=["1"]), + Dashboard(id="2", query_ids=["1", "2", "3"], tags=[Redash.MIGRATED_TAG]), + Dashboard(id="3", tags=[]), + ] + crawler.list_queries.side_effect = list_queries + return crawler + +def test_migrate_all_dashboards(ws, empty_index, redash_installation, redash_dashboard_crawler) -> None: + redash = Redash(empty_index, ws, redash_installation, redash_dashboard_crawler) -def test_migrate_all_dashboards(redash_ws, empty_index, redash_installation) -> None: - redash = Redash(empty_index, redash_ws, redash_installation) redash.migrate_dashboards() + 
     redash_installation.assert_file_written(
         "backup/queries/1.json",
         {
+            'catalog': 'hive_metastore',
             'id': '1',
             'name': 'test_query',
-            'options': {'catalog': 'hive_metastore', 'schema': 'default'},
             'query': 'SELECT * FROM old.things',
+            'schema': 'default',
             'tags': ['test_tag'],
         },
     )
@@ -117,71 +83,58 @@ def test_migrate_all_dashboards(redash_ws, empty_index, redash_installation) ->
         query_text="SELECT * FROM old.things",
         tags=[Redash.MIGRATED_TAG, 'test_tag'],
     )
-    redash_ws.queries.update.assert_called_with(
+    ws.queries.update.assert_called_with(
         "1",
         update_mask="query_text,tags",
         query=query,
     )
+    redash_dashboard_crawler.snapshot.assert_called_once()
 
 
-def test_migrate_all_dashboards_error(redash_ws, empty_index, redash_installation, caplog) -> None:
-    redash_ws.dashboards.list.side_effect = PermissionDenied("error")
-    redash = Redash(empty_index, redash_ws, redash_installation)
-    redash.migrate_dashboards()
-    assert "Cannot list dashboards" in caplog.text
-
+def test_revert_single_dashboard(caplog, ws, empty_index, redash_installation, redash_dashboard_crawler) -> None:
+    ws.queries.get.return_value = LegacyQuery(id="1", query="original_query")
+    redash = Redash(empty_index, ws, redash_installation, redash_dashboard_crawler)
 
-def test_revert_single_dashboard(redash_ws, empty_index, redash_installation, caplog) -> None:
-    redash_ws.queries.get.return_value = LegacyQuery(id="1", query="original_query")
-    redash = Redash(empty_index, redash_ws, redash_installation)
-    redash.revert_dashboards("2")
-    query = UpdateQueryRequestQuery(query_text="original_query")
-    redash_ws.queries.update.assert_called_with(
-        "1",
-        update_mask="query_text,tags",
-        query=query,
-    )
-    redash_ws.queries.update.side_effect = PermissionDenied("error")
     redash.revert_dashboards("2")
-    assert "Cannot restore" in caplog.text
+
+    query = UpdateQueryRequestQuery(query_text="SELECT * FROM old.things", tags=["test_tag"])
+    ws.queries.update.assert_called_with("3", update_mask="query_text,tags", query=query)
+    ws.queries.update.side_effect = PermissionDenied("error")
+    redash_dashboard_crawler.snapshot.assert_called_once()
 
 
-def test_revert_dashboards(redash_ws, empty_index, redash_installation) -> None:
-    redash_ws.queries.get.return_value = LegacyQuery(id="1", query="original_query")
-    redash = Redash(empty_index, redash_ws, redash_installation)
+def test_revert_dashboards(ws, empty_index, redash_installation, redash_dashboard_crawler) -> None:
+    ws.queries.get.return_value = LegacyQuery(id="1", query="original_query")
    redash = Redash(empty_index, ws, redash_installation, redash_dashboard_crawler)
+
     redash.revert_dashboards()
-    calls = [
-        call("1", update_mask="query_text,tags", query=UpdateQueryRequestQuery(query_text="original_query")),
-        call(
-            "3",
-            update_mask="query_text,tags",
-            query=UpdateQueryRequestQuery(query_text="original_query", tags=["test_tag"]),
-        ),
-    ]
-    redash_ws.queries.update.assert_has_calls(calls)
+
+    query = UpdateQueryRequestQuery(query_text="SELECT * FROM old.things", tags=["test_tag"])
+    ws.queries.update.assert_called_with("3", update_mask="query_text,tags", query=query)
+    redash_dashboard_crawler.snapshot.assert_called_once()
 
 
-def test_get_queries_from_dashboard(redash_ws) -> None:
-    empty_dashboard = Dashboard(
-        id="1",
-    )
-    assert len(list(Redash.get_queries_from_dashboard(empty_dashboard))) == 0
-    dashboard = Dashboard(
-        id="1",
-        widgets=[
-            Widget(),
-            Widget(visualization=LegacyVisualization()),
-            Widget(
-                visualization=LegacyVisualization(
-                    query=LegacyQuery(
-                        id="1",
name="test_query", - query="SELECT * FROM old.things", - ) - ) - ), - ], - ) - queries = list(Redash.get_queries_from_dashboard(dashboard)) - assert len(queries) == 1 - assert queries[0].id == "1" + +def test_migrate_dashboard_gets_no_queries_when_dashboard_is_empty( + ws, empty_index, redash_installation, redash_dashboard_crawler +) -> None: + empty_dashboard = Dashboard(id="1") + redash_dashboard_crawler.snapshot.return_value = [empty_dashboard] + redash = Redash(empty_index, ws, redash_installation, redash_dashboard_crawler) + + redash.migrate_dashboards() + + ws.queries_legacy.get.assert_not_called() + redash_dashboard_crawler.snapshot.assert_called_once() + + +def test_migrate_dashboard_lists_queries_from_dashboard( + ws, empty_index, redash_installation, redash_dashboard_crawler +) -> None: + dashboard = Dashboard(id="1", query_ids=["1"]) + redash_dashboard_crawler.snapshot.return_value = [dashboard] + redash = Redash(empty_index, ws, redash_installation, redash_dashboard_crawler) + + redash.migrate_dashboards() + + redash_dashboard_crawler.list_queries.assert_called_with(dashboard) + redash_dashboard_crawler.snapshot.assert_called_once() diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 0477f55767..ddb681aaf5 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -22,6 +22,7 @@ from databricks.sdk.service.workspace import ExportFormat, ImportFormat, ObjectInfo, ObjectType from databricks.labs.ucx.assessment.aws import AWSResources, AWSRoleAction +from databricks.labs.ucx.assessment.dashboards import RedashDashboardCrawler from databricks.labs.ucx.aws.access import AWSResourcePermissions from databricks.labs.ucx.azure.access import AzureResourcePermissions from databricks.labs.ucx.azure.resources import AzureResource, AzureResources, StorageAccount @@ -73,6 +74,7 @@ from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.progress.install import VerifyProgressTracking from databricks.labs.ucx.source_code.linters.files import LocalFileMigrator +from databricks.labs.ucx.source_code.redash import Redash def create_workspace_client_mock(workspace_id: int) -> WorkspaceClient: @@ -1134,26 +1136,40 @@ def test_create_missing_principal_azure(ws, caplog, acc_client): assert str(failure.value) == "Unsupported cloud provider" -@pytest.mark.parametrize("run_as_collection", [False, True]) -def test_migrate_dbsql_dashboards_list_dashboards( - run_as_collection, - workspace_clients, - acc_client, -) -> None: - if not run_as_collection: - workspace_clients = [workspace_clients[0]] - migrate_dbsql_dashboards( - workspace_clients[0], - run_as_collection=run_as_collection, - a=acc_client, - ) - for workspace_client in workspace_clients: - workspace_client.dashboards.list.assert_called_once() +def test_migrate_dbsql_dashboards_calls_migrate_dashboards_on_redash(ws) -> None: + redash = create_autospec(Redash) + ctx = WorkspaceContext(ws).replace(redash=redash) + migrate_dbsql_dashboards(ws, ctx=ctx) + redash.migrate_dashboards.assert_called_once() + + +def test_migrate_dbsql_dashboards_calls_migrate_dashboards_on_redash_with_dashboard_id(ws) -> None: + redash = create_autospec(Redash) + ctx = WorkspaceContext(ws).replace(redash=redash) + migrate_dbsql_dashboards(ws, dashboard_id="id", ctx=ctx) + redash.migrate_dashboards.assert_called_once_with("id") + + +def test_revert_dbsql_dashboards_calls_revert_dashboards_on_redash(ws): + redash = create_autospec(Redash) + redash_crawler = create_autospec(RedashDashboardCrawler) + ctx = 
+
+    revert_dbsql_dashboards(ws, ctx=ctx)
+
+    redash.revert_dashboards.assert_called_once_with()
+    redash_crawler.snapshot.assert_called_once_with(force_refresh=True)
+
+
+def test_revert_dbsql_dashboards_calls_revert_dashboards_on_redash_with_dashboard_id(ws):
+    redash = create_autospec(Redash)
+    redash_crawler = create_autospec(RedashDashboardCrawler)
+    ctx = WorkspaceContext(ws).replace(redash=redash, redash_crawler=redash_crawler)
 
+    revert_dbsql_dashboards(ws, dashboard_id="id", ctx=ctx)
 
-def test_revert_dbsql_dashboards(ws, caplog):
-    revert_dbsql_dashboards(ws)
-    ws.dashboards.list.assert_called_once()
+    redash.revert_dashboards.assert_called_once_with("id")
+    redash_crawler.snapshot.assert_called_once_with(force_refresh=True)
 
 
 def test_cli_missing_awscli(ws, mocker, caplog):