Skip to content

Commit

Permalink
Rewrite Redash to use Query instead of LegacyQuery
Browse files: browse the repository at this point in the history
  • Loading branch information
JCZuurmond committed Dec 9, 2024
1 parent e4f2c4a commit 703c657
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 39 deletions.
15 changes: 6 additions & 9 deletions src/databricks/labs/ucx/assessment/dashboards.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def list_queries(self, dashboard: Dashboard | None = None) -> Iterator[Query]:
This public method does not adhere to the common crawler layout, still, it is implemented to avoid/postpone
another crawler for the queries by retrieving the queries every time they are requested.
"""
for query in self.list_legacy_queries(dashboard):
for query in self._list_legacy_queries(dashboard):
yield Query.from_legacy_query(query)

def list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[LegacyQuery]:
def _list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[LegacyQuery]:
"""List legacy queries.
Args:
Expand All @@ -223,7 +223,10 @@ def list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[Le
This public method does not adhere to the common crawler layout, still, it is implemented to avoid/postpone
another crawler for the queries by retrieving the queries every time they are requested.
"""
queries_iterator = self._list_legacy_queries(dashboard)
if dashboard:
queries_iterator = self._list_legacy_queries_from_dashboard(dashboard)
else:
queries_iterator = self._list_all_legacy_queries()
# Redash APIs are very slow to paginate, especially for large number of dashboards, so we limit the listing
# to a small number of items in debug mode for the assessment workflow just to complete.
counter = itertools.count()
Expand All @@ -233,12 +236,6 @@ def list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[Le
except StopIteration:
break

def _list_legacy_queries(self, dashboard: Dashboard | None = None) -> Iterator[LegacyQuery]:
"""List legacy queries."""
if dashboard:
return self._list_legacy_queries_from_dashboard(dashboard)
return self._list_all_legacy_queries()

def _list_all_legacy_queries(self) -> Iterator[LegacyQuery]:
"""List all queries."""
if self._include_query_ids:
Expand Down
35 changes: 18 additions & 17 deletions src/databricks/labs/ucx/source_code/redash.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
from dataclasses import replace

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installation import Installation, SerdeError

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import LegacyQuery, UpdateQueryRequestQuery
from databricks.sdk.errors.platform import DatabricksError

from databricks.labs.ucx.assessment.dashboards import Dashboard, RedashDashboardCrawler
from databricks.labs.ucx.assessment.dashboards import Dashboard, RedashDashboardCrawler, Query
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.source_code.base import CurrentSessionState
from databricks.labs.ucx.source_code.linters.from_table import FromTableSqlLinter
Expand Down Expand Up @@ -35,7 +35,7 @@ def migrate_dashboards(self, *dashboard_ids: str) -> None:
if self.MIGRATED_TAG in dashboard.tags:
logger.debug(f"Dashboard {dashboard.name} already migrated by UCX")
continue
for query in self._crawler.list_legacy_queries(dashboard):
for query in self._crawler.list_queries(dashboard):
self._fix_query(query)
self._ws.dashboards.update(dashboard.id, tags=self._get_migrated_tags(dashboard.tags))

Expand All @@ -44,7 +44,7 @@ def revert_dashboards(self, *dashboard_ids: str) -> None:
if self.MIGRATED_TAG not in dashboard.tags:
logger.debug(f"Dashboard {dashboard.name} was not migrated by UCX")
continue
for query in self._crawler.list_legacy_queries(dashboard):
for query in self._crawler.list_queries(dashboard):
self._revert_query(query)
self._ws.dashboards.update(dashboard.id, tags=self._get_original_tags(dashboard.tags))

Expand All @@ -63,7 +63,7 @@ def _list_dashboards(self, *dashboard_ids: str, force_refresh: bool = False) ->
break
return dashboards_filtered

def _fix_query(self, query: LegacyQuery) -> None:
def _fix_query(self, query: Query) -> None:
assert query.id is not None
assert query.query is not None
# query already migrated
Expand All @@ -87,26 +87,27 @@ def _fix_query(self, query: LegacyQuery) -> None:
return

@staticmethod
def _get_session_state(query: LegacyQuery) -> CurrentSessionState:
def _get_session_state(query: Query) -> CurrentSessionState:
session_state = CurrentSessionState()
if query.options is None:
return session_state
if query.options.catalog:
session_state = replace(session_state, catalog=query.options.catalog)
if query.options.schema:
session_state = replace(session_state, schema=query.options.schema)
if query.catalog:
session_state = replace(session_state, catalog=query.catalog)
if query.schema:
session_state = replace(session_state, schema=query.schema)
return session_state

def _revert_query(self, query: LegacyQuery) -> None:
def _revert_query(self, query: Query) -> None:
assert query.id is not None
assert query.query is not None
if self.MIGRATED_TAG not in (query.tags or []):
logger.debug(f"Query {query.name} was not migrated by UCX")
return
backup_query = self._installation.load(LegacyQuery, filename=f'backup/queries/{query.id}.json')
update_query = UpdateQueryRequestQuery(
query_text=backup_query.query, tags=self._get_original_tags(backup_query.tags)
)
backup_query: Query | LegacyQuery
try:
backup_query = self._installation.load(Query, filename=f'backup/queries/{query.id}.json')
except SerdeError:
# Previous versions store queries as LegacyQuery
backup_query = self._installation.load(LegacyQuery, filename=f'backup/queries/{query.id}.json')
update_query = UpdateQueryRequestQuery(query_text=backup_query.query, tags=self._get_original_tags(query.tags))
try:
self._ws.queries.update(query.id, update_mask="query_text,tags", query=update_query)
except DatabricksError:
Expand Down
31 changes: 18 additions & 13 deletions tests/unit/source_code/test_redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import pytest
from databricks.labs.blueprint.installation import MockInstallation
from databricks.sdk.errors import PermissionDenied
from databricks.sdk.service.sql import LegacyQuery, QueryOptions, UpdateQueryRequestQuery
from databricks.sdk.service.sql import LegacyQuery, UpdateQueryRequestQuery

from databricks.labs.ucx.assessment.dashboards import Dashboard, RedashDashboardCrawler
from databricks.labs.ucx.assessment.dashboards import Dashboard, Query, RedashDashboardCrawler
from databricks.labs.ucx.source_code.redash import Redash


Expand All @@ -20,27 +20,30 @@ def redash_installation():
return installation


def list_legacy_queries(dashboard: Dashboard) -> list[LegacyQuery]:
def list_queries(dashboard: Dashboard) -> list[Query]:
queries = [
LegacyQuery(
Query(
id="1",
name="test_query",
query="SELECT * FROM old.things",
options=QueryOptions(catalog="hive_metastore", schema="default"),
catalog="hive_metastore",
schema="default",
tags=["test_tag"],
),
LegacyQuery(
Query(
id="2",
name="test_query",
query="SELECT * FROM old.things",
options=QueryOptions(catalog="hive_metastore", schema="default"),
catalog="hive_metastore",
schema="default",
tags=["test_tag"],
),
LegacyQuery(
Query(
id="3",
name="test_query",
query="SELECT * FROM old.things",
options=QueryOptions(catalog="hive_metastore", schema="default"),
catalog="hive_metastore",
schema="default",
tags=["test_tag", Redash.MIGRATED_TAG],
),
]
Expand All @@ -61,7 +64,7 @@ def redash_dashboard_crawler():
Dashboard(id="2", query_ids=["1", "2", "3"], tags=[Redash.MIGRATED_TAG]),
Dashboard(id="3", tags=[]),
]
crawler.list_legacy_queries.side_effect = list_legacy_queries
crawler.list_queries.side_effect = list_queries
return crawler


Expand All @@ -73,10 +76,12 @@ def test_migrate_all_dashboards(ws, empty_index, redash_installation, redash_das
redash_installation.assert_file_written(
"backup/queries/1.json",
{
'catalog': 'hive_metastore',
'id': '1',
'name': 'test_query',
'options': {'catalog': 'hive_metastore', 'schema': 'default'},
'parent': 'ORPHAN',
'query': 'SELECT * FROM old.things',
'schema': 'default',
'tags': ['test_tag'],
},
)
Expand Down Expand Up @@ -128,7 +133,7 @@ def test_migrate_dashboard_gets_no_queries_when_dashboard_is_empty(
redash_dashboard_crawler.snapshot.assert_called_once()


def test_migrate_dashboard_lists_legacy_queries_from_dashboard(
def test_migrate_dashboard_lists_queries_from_dashboard(
ws, empty_index, redash_installation, redash_dashboard_crawler
) -> None:
dashboard = Dashboard(id="1", query_ids=["1"])
Expand All @@ -137,5 +142,5 @@ def test_migrate_dashboard_lists_legacy_queries_from_dashboard(

redash.migrate_dashboards()

redash_dashboard_crawler.list_legacy_queries.assert_called_with(dashboard)
redash_dashboard_crawler.list_queries.assert_called_with(dashboard)
redash_dashboard_crawler.snapshot.assert_called_once()

0 comments on commit 703c657

Please sign in to comment.