
Added references to hive_metastore catalog in all table references an… #2419

Merged · 1 commit · Aug 13, 2024
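The pattern across every hunk is the same: two-part table names like `ucx.clusters` become three-part names rooted at the `hive_metastore` catalog. On a Unity Catalog-enabled workspace, a two-part name resolves against the session's current catalog, so fully qualifying the inventory tables keeps the crawlers reading from the Hive metastore regardless of session state. A minimal sketch of the before/after query shapes (names are illustrative, not taken from the PR):

# Illustrative sketch of the query change this PR applies everywhere.
schema, table = "ucx", "clusters"

before = f"SELECT * FROM {schema}.{table}"                 # resolves in the session's current catalog
after = f"SELECT * FROM hive_metastore.{schema}.{table}"   # pinned to the HMS catalog

assert after == "SELECT * FROM hive_metastore.ucx.clusters"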
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/azure.py

@@ -48,7 +48,7 @@ def snapshot(self) -> Iterable[AzureServicePrincipalInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[AzureServicePrincipalInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield AzureServicePrincipalInfo(*row)
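Each production hunk touches the same `_try_fetch` method of a crawler. A rough sketch of the surrounding pattern, inferred from the visible context and not a copy of the ucx implementation: `snapshot()` first tries to read previously persisted results, and falls back to `_crawl` when nothing is cached.

# Hedged sketch of the crawler caching pattern visible in these hunks;
# the real CrawlerBase in ucx differs in details (error handling, persistence).
from collections.abc import Callable, Iterable

class SketchCrawler:
    def __init__(self, backend, catalog: str, schema: str, table: str) -> None:
        self._backend = backend
        self._catalog = catalog  # "hive_metastore" for the inventory tables
        self._schema = schema
        self._table = table

    def _fetch(self, sql: str) -> Iterable[tuple]:
        return self._backend.fetch(sql)

    def _snapshot(self, fetcher: Callable[[], Iterable], loader: Callable[[], Iterable]) -> Iterable:
        cached = list(fetcher())  # cheap: read the persisted inventory table
        if cached:
            return cached
        loaded = list(loader())   # expensive: crawl the workspace
        # a real implementation would persist `loaded` back to the table here
        return loaded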
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/assessment/clusters.py

@@ -174,7 +174,7 @@ def snapshot(self) -> Iterable[ClusterInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[ClusterInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield ClusterInfo(*row)

@@ -229,5 +229,5 @@ def snapshot(self) -> Iterable[PolicyInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[PolicyInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield PolicyInfo(*row)
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/init_scripts.py

@@ -83,5 +83,5 @@ def snapshot(self) -> Iterable[GlobalInitScriptInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[GlobalInitScriptInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield GlobalInitScriptInfo(*row)
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/assessment/jobs.py

@@ -131,7 +131,7 @@ def snapshot(self) -> Iterable[JobInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[JobInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield JobInfo(*row)
 
     def _check_jar_task(self, all_task: list[RunTask]) -> list[str]:

@@ -190,7 +190,7 @@ def _crawl(self) -> Iterable[SubmitRunInfo]:
         return self._assess_job_runs(submit_runs, all_clusters)
 
     def _try_fetch(self) -> Iterable[SubmitRunInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield SubmitRunInfo(*row)
 
     def _check_spark_conf(self, conf: dict[str, str], source: str) -> list[str]:
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/pipelines.py

@@ -73,5 +73,5 @@ def snapshot(self) -> Iterable[PipelineInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[PipelineInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield PipelineInfo(*row)
6 changes: 3 additions & 3 deletions src/databricks/labs/ucx/hive_metastore/locations.py

@@ -204,7 +204,7 @@ def _add_jdbc_location(self, external_locations, location, table):
     def _external_location_list(self) -> Iterable[ExternalLocation]:
         tables = list(
             self._backend.fetch(
-                f"SELECT location, storage_properties FROM {escape_sql_identifier(self._schema)}.tables WHERE location IS NOT NULL"
+                f"SELECT location, storage_properties FROM {self._catalog}.{escape_sql_identifier(self._schema)}.tables WHERE location IS NOT NULL"
             )
         )
         mounts = Mounts(self._backend, self._ws, self._schema).snapshot()

@@ -215,7 +215,7 @@ def snapshot(self) -> Iterable[ExternalLocation]:
 
     def _try_fetch(self) -> Iterable[ExternalLocation]:
         for row in self._fetch(
-            f"SELECT * FROM {escape_sql_identifier(self._schema)}.{escape_sql_identifier(self._table)}"
+            f"SELECT * FROM {self._catalog}.{escape_sql_identifier(self._schema)}.{escape_sql_identifier(self._table)}"
         ):
             yield ExternalLocation(*row)

@@ -335,7 +335,7 @@ def snapshot(self) -> Iterable[Mount]:
 
     def _try_fetch(self) -> Iterable[Mount]:
         for row in self._fetch(
-            f"SELECT * FROM {escape_sql_identifier(self._schema)}.{escape_sql_identifier(self._table)}"
+            f"SELECT * FROM {self._catalog}.{escape_sql_identifier(self._schema)}.{escape_sql_identifier(self._table)}"
        ):
             yield Mount(*row)
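In locations.py the schema and table names pass through `escape_sql_identifier` while the newly added `{self._catalog}` does not, presumably because the catalog is a fixed, known-safe constant. As a hedged sketch of what such an escaper typically does (the actual ucx helper may differ):

# Hypothetical sketch of an identifier escaper: backtick-quote a name so
# unusual characters survive SQL parsing. Not the actual ucx implementation.
def escape_sql_identifier(name: str) -> str:
    return "`" + name.replace("`", "``") + "`"

assert escape_sql_identifier("location_test") == "`location_test`"
assert escape_sql_identifier("weird`name") == "`weird``name`"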
@@ -138,7 +138,7 @@ def _crawl(self) -> Iterable[MigrationStatus]:
             yield table_migration_status
 
     def _try_fetch(self) -> Iterable[MigrationStatus]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield MigrationStatus(*row)
 
     def _iter_schemas(self):
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/workspace_access/generic.py

@@ -363,7 +363,7 @@ def snapshot(self) -> Iterable[WorkspaceObjectInfo]:
         return self._snapshot(self._try_fetch, self._crawl)
 
     def _try_fetch(self) -> Iterable[WorkspaceObjectInfo]:
-        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+        for row in self._fetch(f"SELECT * FROM {self._catalog}.{self._schema}.{self._table}"):
             yield WorkspaceObjectInfo(
                 path=row["path"], object_type=row["object_type"], object_id=row["object_id"], language=row["language"]
             )
2 changes: 1 addition & 1 deletion tests/unit/assessment/test_clusters.py

@@ -175,7 +175,7 @@ def test_policy_try_fetch():
     ws = mock_workspace_client(policy_ids=['single-user-with-spn-policyid'])
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM ucx.policies": [
+            r"SELECT \* FROM hive_metastore.ucx.policies": [
                 (
                     "single-user-with-spn-policyid",
                     "test_policy",
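The test changes mirror the production ones: every expected query string gains the `hive_metastore.` prefix. Keys given to `MockBackend(rows=...)` appear to be treated as regular expressions matched against the SQL the code emits — hence the raw strings and the escaped `\*` — so each key must track the emitted SQL exactly. A sketch of that matching, under the assumption that a regex search is used:

# Sketch of regex-keyed row matching, assuming MockBackend uses re.search
# on its rows-dict keys; names here are illustrative.
import re

rows = {r"SELECT \* FROM hive_metastore.ucx.policies": [("policy-id", "test_policy")]}

def fetch(sql: str) -> list[tuple]:
    matched: list[tuple] = []
    for pattern, data in rows.items():
        if re.search(pattern, sql):  # key is a regex, not a literal string
            matched.extend(data)
    return matched

assert fetch("SELECT * FROM hive_metastore.ucx.policies") == [("policy-id", "test_policy")]
assert fetch("SELECT * FROM ucx.policies") == []  # old two-part query no longer matches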
18 changes: 9 additions & 9 deletions tests/unit/assessment/test_workflows.py

@@ -16,7 +16,7 @@ def test_assess_azure_service_principals(run_workflow):
 
 def test_runtime_workspace_listing(run_workflow):
     ctx = run_workflow(Assessment.workspace_listing)
-    assert "SELECT * FROM ucx.workspace_objects" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.workspace_objects" in ctx.sql_backend.queries
 
 
 def test_runtime_crawl_grants(run_workflow):

@@ -36,12 +36,12 @@ def test_runtime_crawl_groups(run_workflow):
 
 def test_runtime_crawl_cluster_policies(run_workflow):
     ctx = run_workflow(Assessment.crawl_cluster_policies)
-    assert "SELECT * FROM ucx.policies" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.policies" in ctx.sql_backend.queries
 
 
 def test_runtime_crawl_init_scripts(run_workflow):
     ctx = run_workflow(Assessment.assess_global_init_scripts)
-    assert "SELECT * FROM ucx.global_init_scripts" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.global_init_scripts" in ctx.sql_backend.queries
 
 
 def test_estimate_table_size_for_migration(run_workflow):

@@ -52,32 +52,32 @@ def test_estimate_table_size_for_migration(run_workflow):
 
 def test_runtime_mounts(run_workflow):
     ctx = run_workflow(Assessment.crawl_mounts)
-    assert "SELECT * FROM ucx.mounts" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.mounts" in ctx.sql_backend.queries
 
 
 def test_guess_external_locations(run_workflow):
     ctx = run_workflow(Assessment.guess_external_locations)
-    assert "SELECT * FROM ucx.mounts" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.mounts" in ctx.sql_backend.queries
 
 
 def test_assess_jobs(run_workflow):
     ctx = run_workflow(Assessment.assess_jobs)
-    assert "SELECT * FROM ucx.jobs" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.jobs" in ctx.sql_backend.queries
 
 
 def test_assess_clusters(run_workflow):
     ctx = run_workflow(Assessment.assess_clusters)
-    assert "SELECT * FROM ucx.clusters" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.clusters" in ctx.sql_backend.queries
 
 
 def test_assess_pipelines(run_workflow):
     ctx = run_workflow(Assessment.assess_pipelines)
-    assert "SELECT * FROM ucx.pipelines" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.pipelines" in ctx.sql_backend.queries
 
 
 def test_incompatible_submit_runs(run_workflow):
     ctx = run_workflow(Assessment.assess_incompatible_submit_runs)
-    assert "SELECT * FROM ucx.submit_runs" in ctx.sql_backend.queries
+    assert "SELECT * FROM hive_metastore.ucx.submit_runs" in ctx.sql_backend.queries
 
 
 def test_failing_task_raises_value_error(run_workflow):
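Unlike the regex keys above, these workflow tests assert plain string membership of the fully qualified statement in `ctx.sql_backend.queries`, which suggests the mock backend records every statement it executes verbatim. A minimal sketch of that recording behavior (assumed, not copied from ucx):

# Hypothetical recording backend: keeps a verbatim log of executed SQL so
# tests can assert on exact statements, including the catalog prefix.
from collections.abc import Iterator

class RecordingBackend:
    def __init__(self) -> None:
        self.queries: list[str] = []

    def fetch(self, sql: str) -> Iterator[tuple]:
        self.queries.append(sql)  # log first, then return no rows
        return iter(())

backend = RecordingBackend()
list(backend.fetch("SELECT * FROM hive_metastore.ucx.mounts"))
assert "SELECT * FROM hive_metastore.ucx.mounts" in backend.queries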
34 changes: 24 additions & 10 deletions tests/unit/azure/test_access.py

@@ -28,7 +28,7 @@
 
 def test_save_spn_permissions_no_external_table(caplog):
     w = create_autospec(WorkspaceClient)
-    rows = {"SELECT \\* FROM ucx.external_locations": []}
+    rows = {"SELECT \\* FROM hive_metastore.ucx.external_locations": []}
     backend = MockBackend(rows=rows)
     location = ExternalLocations(w, backend, "ucx")
     installation = MockInstallation()

@@ -48,7 +48,7 @@ def test_save_spn_permissions_no_external_table(caplog):
 
 def test_save_spn_permissions_no_external_tables():
     w = create_autospec(WorkspaceClient)
-    rows = {"SELECT \\* FROM ucx.external_locations": [["s3://bucket1/folder1", "0"]]}
+    rows = {"SELECT \\* FROM hive_metastore.ucx.external_locations": [["s3://bucket1/folder1", "0"]]}
     backend = MockBackend(rows=rows)
     location = ExternalLocations(w, backend, "ucx")
     installation = MockInstallation()

@@ -67,7 +67,9 @@ def test_save_spn_permissions_no_external_tables():
 def test_save_spn_permissions_no_azure_storage_account():
     w = create_autospec(WorkspaceClient)
     rows = {
-        "SELECT \\* FROM ucx.external_locations": [["abfss://[email protected]/folder1", "1"]]
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
+            ["abfss://[email protected]/folder1", "1"]
+        ]
     }
     backend = MockBackend(rows=rows)
     location = ExternalLocations(w, backend, "ucx")

@@ -87,7 +89,7 @@ def test_save_spn_permissions_no_azure_storage_account():
 def test_save_spn_permissions_valid_azure_storage_account():
     w = create_autospec(WorkspaceClient)
     rows = {
-        "SELECT \\* FROM ucx.external_locations": [
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
             ["s3://bucket1/folder1", "1"],
             ["abfss://[email protected]/folder1", "1"],
         ]

@@ -170,7 +172,7 @@ def test_save_spn_permissions_valid_azure_storage_account():
 def test_save_spn_permissions_custom_role_valid_azure_storage_account():
     w = create_autospec(WorkspaceClient)
     rows = {
-        "SELECT \\* FROM ucx.external_locations": [
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
             ["s3://bucket1/folder1", "1"],
             ["abfss://[email protected]/folder1", "1"],
         ]

@@ -586,7 +588,7 @@ def test_create_global_spn_spn_present():
 
 def test_create_global_spn_no_storage():
     w = create_autospec(WorkspaceClient)
-    rows = {"SELECT \\* FROM ucx.external_locations": [["s3://bucket1/folder1", "0"]]}
+    rows = {"SELECT \\* FROM hive_metastore.ucx.external_locations": [["s3://bucket1/folder1", "0"]]}
     backend = MockBackend(rows=rows)
     installation = MockInstallation(
         {

@@ -621,7 +623,11 @@ def test_create_global_spn_no_storage():
 def test_create_global_spn_cluster_policy_not_found():
     w = create_autospec(WorkspaceClient)
     w.cluster_policies.get.side_effect = NotFound()
-    rows = {"SELECT \\* FROM ucx.external_locations": [["abfss://[email protected]/folder1", "1"]]}
+    rows = {
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
+            ["abfss://[email protected]/folder1", "1"]
+        ]
+    }
     backend = MockBackend(rows=rows)
     location = ExternalLocations(w, backend, "ucx")
     installation = MockInstallation(

@@ -659,7 +665,11 @@ def test_create_global_spn():
     w.cluster_policies.get.return_value = cluster_policy
     w.secrets.get_secret.return_value = GetSecretResponse("uber_principal_secret", "mypwd")
     w.warehouses.get_workspace_warehouse_config.return_value = GetWorkspaceWarehouseConfigResponse
-    rows = {"SELECT \\* FROM ucx.external_locations": [["abfss://[email protected]/folder1", "1"]]}
+    rows = {
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
+            ["abfss://[email protected]/folder1", "1"]
+        ]
+    }
     backend = MockBackend(rows=rows)
     location = ExternalLocations(w, backend, "ucx")
     installation = MockInstallation(

@@ -766,7 +776,9 @@ def test_create_access_connectors_for_storage_accounts_one_access_connector(yiel
     w = create_autospec(WorkspaceClient)
 
     rows = {
-        "SELECT \\* FROM ucx.external_locations": [["abfss://[email protected]/folder1", "1"]]
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
+            ["abfss://[email protected]/folder1", "1"]
+        ]
     }
     backend = MockBackend(rows=rows)
 

@@ -824,7 +836,9 @@ def test_create_access_connectors_for_storage_accounts_log_permission_applied(ca
     w = create_autospec(WorkspaceClient)
 
     rows = {
-        "SELECT \\* FROM ucx.external_locations": [["abfss://[email protected]/folder1", "1"]]
+        "SELECT \\* FROM hive_metastore.ucx.external_locations": [
+            ["abfss://[email protected]/folder1", "1"]
+        ]
     }
     backend = MockBackend(rows=rows)
 
14 changes: 7 additions & 7 deletions tests/unit/azure/test_locations.py

@@ -44,7 +44,7 @@ def test_run_service_principal():
     # mock crawled HMS external locations
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/one/", 1),
                 ("abfss://[email protected]/", 2),
             ]

@@ -121,7 +121,7 @@ def test_skip_unsupported_location(caplog):
     ws = create_autospec(WorkspaceClient)
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/one/", 1),
                 ("adl://[email protected]/", 2),
                 ("wasbs://[email protected]/", 2),

@@ -198,7 +198,7 @@ def test_run_managed_identity():
     # mock crawled HMS external locations
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/", 4),
                 ("abfss://[email protected]/a/b/", 5),
             ]

@@ -273,7 +273,7 @@ def test_run_access_connectors():
     """Test run with access connectors based storage credentials"""
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/", 4),
                 ("abfss://[email protected]/a/b/", 5),
             ]

@@ -393,7 +393,7 @@ def test_location_failed_to_read():
     # mock crawled HMS external locations
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/", 1),
                 ("abfss://[email protected]/", 2),
             ]

@@ -461,7 +461,7 @@ def test_overlapping_locations(caplog):
     # mock crawled HMS external locations
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/a/", 1),
                 ("abfss://[email protected]/a/", 1),
             ]

@@ -527,7 +527,7 @@ def test_corner_cases_with_missing_fields(caplog, mocker):
     # mock crawled HMS external locations
     mock_backend = MockBackend(
         rows={
-            r"SELECT \* FROM location_test.external_locations": EXTERNAL_LOCATIONS[
+            r"SELECT \* FROM hive_metastore.location_test.external_locations": EXTERNAL_LOCATIONS[
                 ("abfss://[email protected]/", 1),
                 ("abfss://[email protected]/", 2),
             ]
10 changes: 5 additions & 5 deletions tests/unit/hive_metastore/test_locations.py

@@ -143,7 +143,7 @@ def test_external_locations():
     row_factory = type("Row", (Row,), {"__columns__": ["location", "storage_properties"]})
     sql_backend = MockBackend(
         rows={
-            'SELECT location, storage_properties FROM test.tables WHERE location IS NOT NULL': [
+            'SELECT location, storage_properties FROM hive_metastore.test.tables WHERE location IS NOT NULL': [
                 row_factory(["s3://us-east-1-dev-account-staging-uc-ext-loc-bucket-1/Location/Table", ""]),
                 row_factory(["s3://us-east-1-dev-account-staging-uc-ext-loc-bucket-1/Location/Table2", ""]),
                 row_factory(["s3://us-east-1-dev-account-staging-uc-ext-loc-bucket-23/testloc/Table3", ""]),

@@ -190,7 +190,7 @@ def test_external_locations():
                     ]
                 ),
             ],
-            r"SELECT \* FROM test.mounts": [
+            r"SELECT \* FROM hive_metastore.test.mounts": [
                 ("/mnt/ucx", "s3://us-east-1-ucx-container"),
             ],
         }

@@ -219,7 +219,7 @@ def test_save_external_location_mapping_missing_location():
     ws = create_autospec(WorkspaceClient)
     sbe = MockBackend(
         rows={
-            "SELECT location, storage_properties FROM test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
+            "SELECT location, storage_properties FROM hive_metastore.test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
                 ("s3://test_location/test1/table1", ""),
                 ("gcs://test_location2/test2/table2", ""),
                 ("abfss://[email protected]/test2/table3", ""),

@@ -270,7 +270,7 @@ def test_save_external_location_mapping_no_missing_location():
     ws = create_autospec(WorkspaceClient)
     sbe = MockBackend(
         rows={
-            "SELECT location, storage_properties FROM test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
+            "SELECT location, storage_properties FROM hive_metastore.test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
                 ("s3://test_location/test1/table1", ""),
             ],
         }

@@ -285,7 +285,7 @@ def test_match_table_external_locations():
     ws = create_autospec(WorkspaceClient)
     sbe = MockBackend(
         rows={
-            "SELECT location, storage_properties FROM test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
+            "SELECT location, storage_properties FROM hive_metastore.test.tables WHERE location IS NOT NULL": LOCATION_STORAGE[
                 ("s3://test_location/a/b/c/table1", ""),
                 ("s3://test_location/a/b/table1", ""),
                 ("gcs://test_location2/a/b/table2", ""),