From 3f732a683084bc9f98c7882ebfe9db4cc4587718 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Tue, 26 Nov 2024 14:47:38 -0500 Subject: [PATCH 01/10] Added skeleton --- src/databricks/labs/ucx/cli.py | 5 +- .../labs/ucx/contexts/workspace_cli.py | 1 + .../labs/ucx/hive_metastore/federation.py | 102 +++++++++++++++--- 3 files changed, 92 insertions(+), 16 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 63889d8a99..3059f66553 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -870,11 +870,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts): @ucx.command -def create_federated_catalog(w: WorkspaceClient, _: Prompts): +def create_federated_catalog(w: WorkspaceClient, prompts: Prompts): """(Experimental) Create federated catalog from current workspace Hive Metastore.""" ctx = WorkspaceContext(w) - ctx.federation.register_internal_hms_as_federated_catalog() - + ctx.federation.create_from_cli(prompts) @ucx.command def enable_hms_federation(w: WorkspaceClient, _: Prompts, ctx: WorkspaceContext | None = None): diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index 4308f1c61e..e56b0766d9 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -197,6 +197,7 @@ def federation_enabler(self): @cached_property def federation(self): return HiveMetastoreFederation( + self.installation, self.workspace_client, self.external_locations, self.workspace_info, diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index a5c12a4d1c..267795eedd 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -1,7 +1,11 @@ import collections import logging +import re +from dataclasses import dataclass +from typing import ClassVar from databricks.labs.blueprint.installation import Installation +from databricks.labs.blueprint.tui import Prompts from databricks.sdk import WorkspaceClient from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest from databricks.sdk.service.catalog import ( @@ -14,13 +18,27 @@ ) from databricks.labs.ucx.account.workspaces import WorkspaceInfo +from databricks.labs.ucx.assessment.secrets import SecretsMixin from databricks.labs.ucx.config import WorkspaceConfig +from databricks.labs.ucx.contexts.workspace_cli import WorkspaceContext from databricks.labs.ucx.hive_metastore import ExternalLocations logger = logging.getLogger(__name__) +@dataclass +class ExtHms: + # This is a dataclass that represents the external Hive Metastore connection information + database: str + db_type: str + host: str + password: str + port: int + user: str + version: str + + class HiveMetastoreFederationEnabler: def __init__(self, installation: Installation): self._installation = installation @@ -31,27 +49,63 @@ def enable(self): self._installation.save(config) -class HiveMetastoreFederation: +class HiveMetastoreFederation(SecretsMixin): def __init__( self, - workspace_client: WorkspaceClient, + installation: Installation, + ws: WorkspaceClient, external_locations: ExternalLocations, workspace_info: WorkspaceInfo, enable_hms_federation: bool = False, ): - self._workspace_client = workspace_client + self._ws = ws self._external_locations = external_locations self._workspace_info = workspace_info self._enable_hms_federation = enable_hms_federation + self._installation = installation + + supported_db_vers: ClassVar[dict[str, list[str]]] = { + "mysql": ["2.3.0", "0.13"], + } + + def create_from_cli(self, prompts: Prompts): + + + def _get_ext_hms(self) -> ExtHms: + config = self._installation.load(WorkspaceConfig) + if not config.spark_config: + raise ValueError('Spark config not found') + spark_config = config.spark_config + jdbc_url = self._get_value_from_config_key(spark_config,'spark.datasource.hive.metastore.jdbc.url') + + @classmethod + def _split_jdbc_url(cls, jdbc_url: str) -> tuple[str, str, int, str, str, str]: + # Define the regex pattern to match the JDBC URL components + pattern = re.compile( + r'jdbc:(?P[a-zA-Z0-9]+)://(?P[^:/]+):(?P\d+)/(?P[^?]+)(\?user=(?P[^&]+)&password=(?P[^&]+))?' + ) + match = pattern.match(jdbc_url) + if not match: + raise ValueError(f'Unsupported JDBC URL: {jdbc_url}') + + db_type = match.group('db_type') + host = match.group('host') + port = int(match.group('port')) + database = match.group('database') + user = match.group('user') + password = match.group('password') + + return db_type, host, port, database, user, password + def register_internal_hms_as_federated_catalog(self) -> CatalogInfo: if not self._enable_hms_federation: raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation') name = self._workspace_info.current() - connection_info = self._get_or_create_connection(name) + connection_info = self._get_or_create_int_connection(name) assert connection_info.name is not None try: - return self._workspace_client.catalogs.create( + return self._ws.catalogs.create( name=connection_info.name, connection_name=connection_info.name, options={"authorized_paths": self._get_authorized_paths()}, @@ -59,30 +113,52 @@ def register_internal_hms_as_federated_catalog(self) -> CatalogInfo: except BadRequest as err: if err.error_code == 'CATALOG_ALREADY_EXISTS': logger.info(f'Catalog {connection_info.name} already exists') - for catalog_info in self._workspace_client.catalogs.list(): + for catalog_info in self._ws.catalogs.list(): if catalog_info.name == connection_info.name: return catalog_info raise err - def _get_or_create_connection(self, name: str) -> ConnectionInfo: + def _get_or_create_int_connection(self, name: str) -> ConnectionInfo: try: - return self._workspace_client.connections.create( + return self._ws.connections.create( name=name, connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change options={"builtin": "true"}, ) except AlreadyExists: - for connection in self._workspace_client.connections.list(): + for connection in self._ws.connections.list(): + if connection.name == name: + return connection + raise NotFound(f'Connection {name} not found') + + def _get_or_create_ext_connection(self, name:str, ext_hms: ExtHms) -> ConnectionInfo: + try: + return self._ws.connections.create( + name=name, + connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change + options={ + "builtin": "true", + "database": ext_hms.database, + "db_type": ext_hms.db_type, + "host": ext_hms.host, + "password": ext_hms.password, + "port": ext_hms.port, + "user": ext_hms.user, + "version": ext_hms.version, + }, + ) + except AlreadyExists: + for connection in self._ws.connections.list(): if connection.name == name: return connection raise NotFound(f'Connection {name} not found') def _get_authorized_paths(self) -> str: existing = {} - for external_location in self._workspace_client.external_locations.list(): + for external_location in self._ws.external_locations.list(): existing[external_location.url] = external_location authorized_paths = [] - current_user = self._workspace_client.current_user.me() + current_user = self._ws.current_user.me() if not current_user.user_name: raise NotFound('Current user not found') for external_location_info in self._external_locations.external_locations_with_root(): @@ -103,11 +179,11 @@ def _add_missing_permissions_if_needed(self, location_name: str, current_user: s grants = self._location_grants(location_name) if Privilege.CREATE_FOREIGN_SECURABLE not in grants[current_user]: change = PermissionsChange(principal=current_user, add=[Privilege.CREATE_FOREIGN_SECURABLE]) - self._workspace_client.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change]) + self._ws.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change]) def _location_grants(self, location_name: str) -> dict[str, set[Privilege]]: grants: dict[str, set[Privilege]] = collections.defaultdict(set) - result = self._workspace_client.grants.get(SecurableType.EXTERNAL_LOCATION, location_name) + result = self._ws.grants.get(SecurableType.EXTERNAL_LOCATION, location_name) if not result.privilege_assignments: return grants for assignment in result.privilege_assignments: From 08164138e4a7e9b73b36d4da1d5683a0b5dd034e Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Tue, 26 Nov 2024 15:44:28 -0500 Subject: [PATCH 02/10] Added external hms lookup --- src/databricks/labs/ucx/cli.py | 1 + .../labs/ucx/hive_metastore/federation.py | 102 ++++++++++++------ 2 files changed, 72 insertions(+), 31 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 3059f66553..a6d5fca73a 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -875,6 +875,7 @@ def create_federated_catalog(w: WorkspaceClient, prompts: Prompts): ctx = WorkspaceContext(w) ctx.federation.create_from_cli(prompts) + @ucx.command def enable_hms_federation(w: WorkspaceClient, _: Prompts, ctx: WorkspaceContext | None = None): """(Experimental) Create federated catalog from current workspace Hive Metastore.""" diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 267795eedd..56b1902912 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -1,7 +1,7 @@ import collections import logging import re -from dataclasses import dataclass +from dataclasses import dataclass, replace from typing import ClassVar from databricks.labs.blueprint.installation import Installation @@ -20,7 +20,6 @@ from databricks.labs.ucx.account.workspaces import WorkspaceInfo from databricks.labs.ucx.assessment.secrets import SecretsMixin from databricks.labs.ucx.config import WorkspaceConfig -from databricks.labs.ucx.contexts.workspace_cli import WorkspaceContext from databricks.labs.ucx.hive_metastore import ExternalLocations @@ -30,13 +29,13 @@ @dataclass class ExtHms: # This is a dataclass that represents the external Hive Metastore connection information - database: str db_type: str host: str - password: str - port: int - user: str - version: str + port: str + database: str + user: str | None + password: str | None + version: str | None class HiveMetastoreFederationEnabler: @@ -69,17 +68,60 @@ def __init__( } def create_from_cli(self, prompts: Prompts): + if not self._enable_hms_federation: + raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation') + name = self._workspace_info.current() + ext_hms = None + try: + ext_hms = self._get_ext_hms() + except ValueError: + logger.info('Failed to get external Hive Metastore connection information') + + if ext_hms and prompts.confirm( + f'Identified a supported external Hive Metastore connection: {ext_hms.db_type}. Use this connection?' + ): + connection_info = self._get_or_create_ext_connection(name, ext_hms) + else: + connection_info = self._get_or_create_int_connection(name) + assert connection_info.name is not None + return self._register_federated_catalog(connection_info) def _get_ext_hms(self) -> ExtHms: config = self._installation.load(WorkspaceConfig) - if not config.spark_config: + if not config.spark_conf: + raise ValueError('Spark config not found') + spark_config = config.spark_conf + if not spark_config: raise ValueError('Spark config not found') - spark_config = config.spark_config - jdbc_url = self._get_value_from_config_key(spark_config,'spark.datasource.hive.metastore.jdbc.url') + jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL') + if not jdbc_url: + raise ValueError('JDBC URL not found') + version = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version') + if not version: + raise ValueError('Hive Metastore version not found') + ext_hms = replace(self._split_jdbc_url(jdbc_url), version=version) + supported_versions = self.supported_db_vers.get(ext_hms.db_type) + if not supported_versions: + raise ValueError(f'Unsupported Hive Metastore: {ext_hms.db_type}') + if version not in supported_versions: + raise ValueError(f'Unsupported Hive Metastore Version: {ext_hms.db_type} - {version}') + if not ext_hms.user: + ext_hms = replace( + ext_hms, + user=self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionUserName'), + ) + if not ext_hms.password: + ext_hms = replace( + ext_hms, + password=self._get_value_from_config_key( + spark_config, 'spark.hadoop.javax.jdo.option.ConnectionPassword' + ), + ) + return ext_hms @classmethod - def _split_jdbc_url(cls, jdbc_url: str) -> tuple[str, str, int, str, str, str]: + def _split_jdbc_url(cls, jdbc_url: str) -> ExtHms: # Define the regex pattern to match the JDBC URL components pattern = re.compile( r'jdbc:(?P[a-zA-Z0-9]+)://(?P[^:/]+):(?P\d+)/(?P[^?]+)(\?user=(?P[^&]+)&password=(?P[^&]+))?' @@ -90,20 +132,14 @@ def _split_jdbc_url(cls, jdbc_url: str) -> tuple[str, str, int, str, str, str]: db_type = match.group('db_type') host = match.group('host') - port = int(match.group('port')) + port = match.group('port') database = match.group('database') user = match.group('user') password = match.group('password') - return db_type, host, port, database, user, password + return ExtHms(db_type, host, port, database, user, password, None) - - def register_internal_hms_as_federated_catalog(self) -> CatalogInfo: - if not self._enable_hms_federation: - raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation') - name = self._workspace_info.current() - connection_info = self._get_or_create_int_connection(name) - assert connection_info.name is not None + def _register_federated_catalog(self, connection_info) -> CatalogInfo: try: return self._ws.catalogs.create( name=connection_info.name, @@ -131,21 +167,25 @@ def _get_or_create_int_connection(self, name: str) -> ConnectionInfo: return connection raise NotFound(f'Connection {name} not found') - def _get_or_create_ext_connection(self, name:str, ext_hms: ExtHms) -> ConnectionInfo: + def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> ConnectionInfo: + options: dict[str, str] = { + "builtin": "true", + "database": ext_hms.database, + "db_type": ext_hms.db_type, + "host": ext_hms.host, + "port": ext_hms.port, + } + if ext_hms.user: + options["user"] = ext_hms.user + if ext_hms.password: + options["password"] = ext_hms.password + if ext_hms.version: + options["version"] = ext_hms.version try: return self._ws.connections.create( name=name, connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change - options={ - "builtin": "true", - "database": ext_hms.database, - "db_type": ext_hms.db_type, - "host": ext_hms.host, - "password": ext_hms.password, - "port": ext_hms.port, - "user": ext_hms.user, - "version": ext_hms.version, - }, + options=options, ) except AlreadyExists: for connection in self._ws.connections.list(): From a0da09f38df50a1895981101ab0fa0d37ca2ef75 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Mon, 2 Dec 2024 10:04:42 -0500 Subject: [PATCH 03/10] Added test --- .../labs/ucx/hive_metastore/federation.py | 6 +- .../hive_metastore/test_federation.py | 9 +- tests/unit/hive_metastore/test_federation.py | 91 +++++++++++++++++-- 3 files changed, 93 insertions(+), 13 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 56b1902912..4c446dc7f2 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -79,7 +79,7 @@ def create_from_cli(self, prompts: Prompts): logger.info('Failed to get external Hive Metastore connection information') if ext_hms and prompts.confirm( - f'Identified a supported external Hive Metastore connection: {ext_hms.db_type}. Use this connection?' + f'A supported external Hive Metastore connection was identified: {ext_hms.db_type}. Use this connection?' ): connection_info = self._get_or_create_ext_connection(name, ext_hms) else: @@ -92,8 +92,6 @@ def _get_ext_hms(self) -> ExtHms: if not config.spark_conf: raise ValueError('Spark config not found') spark_config = config.spark_conf - if not spark_config: - raise ValueError('Spark config not found') jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL') if not jdbc_url: raise ValueError('JDBC URL not found') @@ -169,7 +167,7 @@ def _get_or_create_int_connection(self, name: str) -> ConnectionInfo: def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> ConnectionInfo: options: dict[str, str] = { - "builtin": "true", + "builtin": "false", "database": ext_hms.database, "db_type": ext_hms.db_type, "host": ext_hms.host, diff --git a/tests/integration/hive_metastore/test_federation.py b/tests/integration/hive_metastore/test_federation.py index fe6501bcec..b9a82b6a0d 100644 --- a/tests/integration/hive_metastore/test_federation.py +++ b/tests/integration/hive_metastore/test_federation.py @@ -1,6 +1,7 @@ from unittest.mock import create_autospec import pytest +from databricks.labs.blueprint.tui import MockPrompts from databricks.sdk import WorkspaceClient from databricks.labs.ucx.account.workspaces import WorkspaceInfo @@ -14,13 +15,15 @@ def ws(): @pytest.mark.skip("needs to be enabled") -def test_federation(ws, sql_backend): +def test_federation(ws, ctx, sql_backend): schema = 'ucx' + installation = ctx.installation tables_crawler = TablesCrawler(sql_backend, schema) mounts_crawler = MountsCrawler(sql_backend, ws, schema) external_locations = ExternalLocations(ws, sql_backend, schema, tables_crawler, mounts_crawler) workspace_info = create_autospec(WorkspaceInfo) workspace_info.current.return_value = 'some_thing' - federation = HiveMetastoreFederation(ws, external_locations, workspace_info) - federation.register_internal_hms_as_federated_catalog() + federation = HiveMetastoreFederation(installation, ws, external_locations, workspace_info) + prompts = MockPrompts({}) + federation.create_from_cli(prompts) workspace_info.current.assert_called_once() diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index c1ee7b3a30..32d3926ad8 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -1,6 +1,8 @@ +import base64 from unittest.mock import create_autospec, call from databricks.labs.blueprint.installation import MockInstallation +from databricks.labs.blueprint.tui import MockPrompts from databricks.sdk import WorkspaceClient from databricks.sdk.errors import AlreadyExists from databricks.sdk.service.catalog import ( @@ -14,6 +16,7 @@ ConnectionInfo, ) from databricks.sdk.service.iam import User +from databricks.sdk.service.workspace import GetSecretResponse from databricks.labs.ucx.account.workspaces import WorkspaceInfo from databricks.labs.ucx.config import WorkspaceConfig @@ -26,7 +29,7 @@ from databricks.labs.ucx.hive_metastore.locations import ExternalLocation -def test_create_federated_catalog(): +def test_create_federated_catalog_int(mock_installation): workspace_client = create_autospec(WorkspaceClient) external_locations = create_autospec(ExternalLocations) workspace_info = create_autospec(WorkspaceInfo) @@ -47,8 +50,11 @@ def test_create_federated_catalog(): privilege_assignments=[PrivilegeAssignment(privileges=[Privilege.MANAGE], principal='any')] ) - hms_fed = HiveMetastoreFederation(workspace_client, external_locations, workspace_info, enable_hms_federation=True) - hms_fed.register_internal_hms_as_federated_catalog() + hms_fed = HiveMetastoreFederation( + mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + ) + + hms_fed.create_from_cli(MockPrompts({})) workspace_client.connections.create.assert_called_with( name='a', @@ -77,7 +83,75 @@ def test_create_federated_catalog(): assert calls == workspace_client.grants.method_calls -def test_already_existing_connection(): +def test_create_federated_catalog_ext(mock_installation): + workspace_client = create_autospec(WorkspaceClient) + external_locations = create_autospec(ExternalLocations) + workspace_info = create_autospec(WorkspaceInfo) + + workspace_info.current.return_value = 'a' + external_locations.snapshot.return_value = [ + ExternalLocation('s3://b/c/d', 1), + ] + workspace_client.current_user.me.return_value = User(user_name='serge') + workspace_client.connections.create.return_value = CatalogInfo(name='a') + workspace_client.secrets.get_secret.return_value = GetSecretResponse( + key='secret_key', value=base64.standard_b64encode('bar'.encode()).decode() + ) + workspace_client.external_locations.list.return_value = [ + ExternalLocationInfo(url='s3://b/c/d', name='b'), + ] + workspace_client.grants.get.return_value = PermissionsList( + privilege_assignments=[PrivilegeAssignment(privileges=[Privilege.MANAGE], principal='any')] + ) + mock_installation.load = lambda _: WorkspaceConfig( + inventory_database='ucx', + spark_conf={ + "spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver", + "spark.hadoop.javax.jdo.option.ConnectionPassword": "{{secrets/secret_scope/secret_key}}", + "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore", + "spark.hadoop.javax.jdo.option.ConnectionUserName": "foo", + "spark.sql.hive.metastore.jars": "maven", + "spark.sql.hive.metastore.version": "2.3.0", + }, + ) + + hms_fed = HiveMetastoreFederation( + mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + ) + + hms_fed.create_from_cli(MockPrompts({"A supported external Hive Metastore.*": "yes"})) + + workspace_client.connections.create.assert_called_with( + name='a', + connection_type=ConnectionType.HIVE_METASTORE, + options={ + 'builtin': 'false', + 'database': 'metastore', + 'db_type': 'mysql', + 'host': 'hostname.us-east-2.rds.amazonaws.com', + 'password': 'bar', + 'port': '3306', + 'user': 'foo', + 'version': '2.3.0', + }, + ) + workspace_client.catalogs.create.assert_called_with( + name='a', + connection_name='a', + options={"authorized_paths": 's3://b/c/d'}, + ) + calls = [ + call.get(SecurableType.EXTERNAL_LOCATION, 'b'), + call.update( + SecurableType.EXTERNAL_LOCATION, + 'b', + changes=[PermissionsChange(principal='serge', add=[Privilege.CREATE_FOREIGN_CATALOG])], + ), + ] + assert calls == workspace_client.grants.method_calls + + +def test_already_existing_connection(mock_installation): workspace_client = create_autospec(WorkspaceClient) external_locations = create_autospec(ExternalLocations) workspace_info = create_autospec(WorkspaceInfo) @@ -99,8 +173,13 @@ def test_already_existing_connection(): privilege_assignments=[PrivilegeAssignment(privileges=[Privilege.MANAGE], principal='any')] ) - hms_fed = HiveMetastoreFederation(workspace_client, external_locations, workspace_info, enable_hms_federation=True) - hms_fed.register_internal_hms_as_federated_catalog() + hms_fed = HiveMetastoreFederation( + mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + ) + + prompts = MockPrompts({}) + + hms_fed.create_from_cli(prompts) workspace_client.connections.create.assert_called_with( name='a', From a4bbc5b802d1b007d45bb7c33ff727a940b54b03 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 5 Dec 2024 16:15:57 -0500 Subject: [PATCH 04/10] Addressed issue with version --- src/databricks/labs/ucx/hive_metastore/federation.py | 12 +++++++++--- tests/unit/hive_metastore/test_federation.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 4c446dc7f2..a040e6cbc4 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -63,8 +63,9 @@ def __init__( self._enable_hms_federation = enable_hms_federation self._installation = installation + # Supported databases and version for HMS Federation supported_db_vers: ClassVar[dict[str, list[str]]] = { - "mysql": ["2.3.0", "0.13"], + "mysql": ["2.3", "0.13"], } def create_from_cli(self, prompts: Prompts): @@ -76,7 +77,7 @@ def create_from_cli(self, prompts: Prompts): try: ext_hms = self._get_ext_hms() except ValueError: - logger.info('Failed to get external Hive Metastore connection information') + logger.info('Failed to retrieve external Hive Metastore connection information') if ext_hms and prompts.confirm( f'A supported external Hive Metastore connection was identified: {ext_hms.db_type}. Use this connection?' @@ -96,8 +97,13 @@ def _get_ext_hms(self) -> ExtHms: if not jdbc_url: raise ValueError('JDBC URL not found') version = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version') + # extract major version from version using regex if not version: raise ValueError('Hive Metastore version not found') + major_version = re.match(r'(\d+\.\d+)', version) + if not major_version: + raise ValueError(f'Invalid Hive Metastore version: {version}') + version = major_version.group(1) ext_hms = replace(self._split_jdbc_url(jdbc_url), version=version) supported_versions = self.supported_db_vers.get(ext_hms.db_type) if not supported_versions: @@ -167,7 +173,7 @@ def _get_or_create_int_connection(self, name: str) -> ConnectionInfo: def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> ConnectionInfo: options: dict[str, str] = { - "builtin": "false", + # TODO: Fix once the FEDERATION end point is fixed. Include "builtin": "false" in options "database": ext_hms.database, "db_type": ext_hms.db_type, "host": ext_hms.host, diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index 32d3926ad8..66b109052b 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -132,7 +132,7 @@ def test_create_federated_catalog_ext(mock_installation): 'password': 'bar', 'port': '3306', 'user': 'foo', - 'version': '2.3.0', + 'version': '2.3', }, ) workspace_client.catalogs.create.assert_called_with( From b040c92b9fd6cda949a594a0aa425c7ca1e74627 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 13 Dec 2024 17:59:04 -0500 Subject: [PATCH 05/10] Addressed Changes to privilege values --- tests/unit/hive_metastore/test_federation.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index 66b109052b..96be9679e3 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -125,7 +125,6 @@ def test_create_federated_catalog_ext(mock_installation): name='a', connection_type=ConnectionType.HIVE_METASTORE, options={ - 'builtin': 'false', 'database': 'metastore', 'db_type': 'mysql', 'host': 'hostname.us-east-2.rds.amazonaws.com', @@ -145,7 +144,7 @@ def test_create_federated_catalog_ext(mock_installation): call.update( SecurableType.EXTERNAL_LOCATION, 'b', - changes=[PermissionsChange(principal='serge', add=[Privilege.CREATE_FOREIGN_CATALOG])], + changes=[PermissionsChange(principal='serge', add=[Privilege.CREATE_FOREIGN_SECURABLE])], ), ] assert calls == workspace_client.grants.method_calls From b1620d5fc5fd8d981331855baa57a00f52a968dc Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Wed, 18 Dec 2024 11:42:46 -0500 Subject: [PATCH 06/10] Fixed testing issues --- tests/unit/hive_metastore/test_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index 96be9679e3..e3148eb5c2 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -89,7 +89,7 @@ def test_create_federated_catalog_ext(mock_installation): workspace_info = create_autospec(WorkspaceInfo) workspace_info.current.return_value = 'a' - external_locations.snapshot.return_value = [ + external_locations.external_locations_with_root.return_value = [ ExternalLocation('s3://b/c/d', 1), ] workspace_client.current_user.me.return_value = User(user_name='serge') From b4c3151fbfa786f0896eff5b9ac2da1ddadfe94a Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 19 Dec 2024 10:44:08 -0500 Subject: [PATCH 07/10] Addressed Review Comments --- src/databricks/labs/ucx/aws/locations.py | 1 + src/databricks/labs/ucx/azure/locations.py | 1 + .../labs/ucx/contexts/application.py | 2 +- .../labs/ucx/contexts/workspace_cli.py | 8 +- .../labs/ucx/hive_metastore/federation.py | 159 ++++++++++-------- .../labs/ucx/hive_metastore/locations.py | 1 + .../hive_metastore/test_federation.py | 3 +- tests/unit/hive_metastore/test_federation.py | 32 +++- 8 files changed, 124 insertions(+), 83 deletions(-) diff --git a/src/databricks/labs/ucx/aws/locations.py b/src/databricks/labs/ucx/aws/locations.py index 863c589e8c..0f2ae3dfee 100644 --- a/src/databricks/labs/ucx/aws/locations.py +++ b/src/databricks/labs/ucx/aws/locations.py @@ -21,6 +21,7 @@ def __init__( external_locations: ExternalLocations, aws_resource_permissions: AWSResourcePermissions, principal_acl: PrincipalACL, + *, enable_hms_federation: bool = False, ): self._ws = ws diff --git a/src/databricks/labs/ucx/azure/locations.py b/src/databricks/labs/ucx/azure/locations.py index 2e70654f47..5a539a9dd1 100644 --- a/src/databricks/labs/ucx/azure/locations.py +++ b/src/databricks/labs/ucx/azure/locations.py @@ -20,6 +20,7 @@ def __init__( resource_permissions: AzureResourcePermissions, azurerm: AzureResources, principal_acl: PrincipalACL, + *, enable_hms_federation: bool = False, ): self._ws = ws diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index a01f321c35..65a0c7a871 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -415,7 +415,7 @@ def external_locations(self) -> ExternalLocations: self.inventory_database, self.tables_crawler, self.mounts_crawler, - self.config.enable_hms_federation, + enable_hms_federation=self.config.enable_hms_federation, ) @cached_property diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index e56b0766d9..9ad07d12ca 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -118,7 +118,7 @@ def external_locations_migration(self) -> AWSExternalLocationsMigration | Extern self.external_locations, self.aws_resource_permissions, self.principal_acl, - self.config.enable_hms_federation, + enable_hms_federation=self.config.enable_hms_federation, ) if self.is_azure: return ExternalLocationsMigration( @@ -127,7 +127,7 @@ def external_locations_migration(self) -> AWSExternalLocationsMigration | Extern self.azure_resource_permissions, self.azure_resources, self.principal_acl, - self.config.enable_hms_federation, + enable_hms_federation=self.config.enable_hms_federation, ) raise NotImplementedError @@ -197,11 +197,11 @@ def federation_enabler(self): @cached_property def federation(self): return HiveMetastoreFederation( - self.installation, self.workspace_client, self.external_locations, self.workspace_info, - self.config.enable_hms_federation, + self.config, + enable_hms_federation=self.config.enable_hms_federation, ) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index a040e6cbc4..8274caba1a 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -2,7 +2,10 @@ import logging import re from dataclasses import dataclass, replace +from functools import cached_property from typing import ClassVar +from packaging.version import Version, InvalidVersion + from databricks.labs.blueprint.installation import Installation from databricks.labs.blueprint.tui import Prompts @@ -27,9 +30,13 @@ @dataclass -class ExtHms: - # This is a dataclass that represents the external Hive Metastore connection information - db_type: str +class ExternalHmsInfo: + """ + This is a dataclass that represents the external Hive Metastore connection information. + It supports non glue external metastores. + """ + + database_type: str host: str port: str database: str @@ -37,6 +44,14 @@ class ExtHms: password: str | None version: str | None + def as_dict(self) -> dict[str, str]: + return { + "database": self.database, + "db_type": self.database_type, + "host": self.host, + "port": self.port, + } + class HiveMetastoreFederationEnabler: def __init__(self, installation: Installation): @@ -51,81 +66,88 @@ def enable(self): class HiveMetastoreFederation(SecretsMixin): def __init__( self, - installation: Installation, ws: WorkspaceClient, external_locations: ExternalLocations, workspace_info: WorkspaceInfo, + config: WorkspaceConfig, + *, enable_hms_federation: bool = False, ): self._ws = ws self._external_locations = external_locations self._workspace_info = workspace_info self._enable_hms_federation = enable_hms_federation - self._installation = installation + self._config = config # Supported databases and version for HMS Federation - supported_db_vers: ClassVar[dict[str, list[str]]] = { + supported_database_versions: ClassVar[dict[str, list[str]]] = { "mysql": ["2.3", "0.13"], } - def create_from_cli(self, prompts: Prompts): + def create_from_cli(self, prompts: Prompts) -> None: if not self._enable_hms_federation: raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation') - name = self._workspace_info.current() - ext_hms = None - try: - ext_hms = self._get_ext_hms() - except ValueError: - logger.info('Failed to retrieve external Hive Metastore connection information') + name = prompts.question( + 'Enter the name of the Hive Metastore connection and catalog', default=self._workspace_info.current() + ) - if ext_hms and prompts.confirm( - f'A supported external Hive Metastore connection was identified: {ext_hms.db_type}. Use this connection?' + if self._external_hms and prompts.confirm( + f'A supported external Hive Metastore connection was identified: {self._external_hms.database_type}. ' + f'Use this connection?' ): - connection_info = self._get_or_create_ext_connection(name, ext_hms) + connection_info = self._get_or_create_ext_connection(name, self._external_hms) else: connection_info = self._get_or_create_int_connection(name) + assert connection_info.name is not None - return self._register_federated_catalog(connection_info) + self._register_federated_catalog(connection_info) - def _get_ext_hms(self) -> ExtHms: - config = self._installation.load(WorkspaceConfig) - if not config.spark_conf: - raise ValueError('Spark config not found') - spark_config = config.spark_conf + @cached_property + def _external_hms(self) -> ExternalHmsInfo | None: + if not self._config.spark_conf: + logger.info('Spark config not found') + return None + spark_config = self._config.spark_conf jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL') if not jdbc_url: - raise ValueError('JDBC URL not found') - version = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version') - # extract major version from version using regex - if not version: - raise ValueError('Hive Metastore version not found') - major_version = re.match(r'(\d+\.\d+)', version) - if not major_version: - raise ValueError(f'Invalid Hive Metastore version: {version}') - version = major_version.group(1) - ext_hms = replace(self._split_jdbc_url(jdbc_url), version=version) - supported_versions = self.supported_db_vers.get(ext_hms.db_type) + logger.info('JDBC URL not found') + return None + version_value = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version') + if not version_value: + logger.info('Hive Metastore version not found') + return None + try: + version = Version(version_value) + except InvalidVersion: + logger.info('Hive Metastore version is not valid') + return None + major_minor_version = f"{version.major}.{version.minor}" + external_hms = replace(self._split_jdbc_url(jdbc_url), version=major_minor_version) + supported_versions = self.supported_database_versions.get(external_hms.database_type) if not supported_versions: - raise ValueError(f'Unsupported Hive Metastore: {ext_hms.db_type}') + logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}') + return None if version not in supported_versions: - raise ValueError(f'Unsupported Hive Metastore Version: {ext_hms.db_type} - {version}') - if not ext_hms.user: - ext_hms = replace( - ext_hms, + logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}') + return None + + if not external_hms.user: + external_hms = replace( + external_hms, user=self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionUserName'), ) - if not ext_hms.password: - ext_hms = replace( - ext_hms, + if not external_hms.password: + external_hms = replace( + external_hms, password=self._get_value_from_config_key( spark_config, 'spark.hadoop.javax.jdo.option.ConnectionPassword' ), ) - return ext_hms + return external_hms @classmethod - def _split_jdbc_url(cls, jdbc_url: str) -> ExtHms: + def _split_jdbc_url(cls, jdbc_url: str) -> ExternalHmsInfo: # Define the regex pattern to match the JDBC URL components pattern = re.compile( r'jdbc:(?P[a-zA-Z0-9]+)://(?P[^:/]+):(?P\d+)/(?P[^?]+)(\?user=(?P[^&]+)&password=(?P[^&]+))?' @@ -141,9 +163,12 @@ def _split_jdbc_url(cls, jdbc_url: str) -> ExtHms: user = match.group('user') password = match.group('password') - return ExtHms(db_type, host, port, database, user, password, None) + return ExternalHmsInfo(db_type, host, port, database, user, password, None) - def _register_federated_catalog(self, connection_info) -> CatalogInfo: + def _register_federated_catalog( + self, + connection_info, + ) -> CatalogInfo: try: return self._ws.catalogs.create( name=connection_info.name, @@ -166,25 +191,22 @@ def _get_or_create_int_connection(self, name: str) -> ConnectionInfo: options={"builtin": "true"}, ) except AlreadyExists: - for connection in self._ws.connections.list(): - if connection.name == name: - return connection + return self._get_existing_connection(name) + + def _get_existing_connection(self, name: str) -> ConnectionInfo: + for connection in self._ws.connections.list(): + if connection.name == name: + return connection raise NotFound(f'Connection {name} not found') - def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> ConnectionInfo: - options: dict[str, str] = { - # TODO: Fix once the FEDERATION end point is fixed. Include "builtin": "false" in options - "database": ext_hms.database, - "db_type": ext_hms.db_type, - "host": ext_hms.host, - "port": ext_hms.port, - } - if ext_hms.user: - options["user"] = ext_hms.user - if ext_hms.password: - options["password"] = ext_hms.password - if ext_hms.version: - options["version"] = ext_hms.version + def _get_or_create_ext_connection(self, name: str, external_hms: ExternalHmsInfo) -> ConnectionInfo: + options = external_hms.as_dict() + if external_hms.user: + options["user"] = external_hms.user + if external_hms.password: + options["password"] = external_hms.password + if external_hms.version: + options["version"] = external_hms.version try: return self._ws.connections.create( name=name, @@ -192,10 +214,7 @@ def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> Connectio options=options, ) except AlreadyExists: - for connection in self._ws.connections.list(): - if connection.name == name: - return connection - raise NotFound(f'Connection {name} not found') + return self._get_existing_connection(name) def _get_authorized_paths(self) -> str: existing = {} @@ -205,7 +224,13 @@ def _get_authorized_paths(self) -> str: current_user = self._ws.current_user.me() if not current_user.user_name: raise NotFound('Current user not found') - for external_location_info in self._external_locations.external_locations_with_root(): + # Get the external locations. If not using external HMS, include the root DBFS location. + if self._external_hms is not None: + external_locations = self._external_locations.external_locations_with_root() + else: + external_locations = list(self._external_locations.snapshot()) + + for external_location_info in external_locations: location = ExternalLocations.clean_location(external_location_info.location) existing_location = existing.get(location) if not existing_location: diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py index 226bc791db..5a8d703150 100644 --- a/src/databricks/labs/ucx/hive_metastore/locations.py +++ b/src/databricks/labs/ucx/hive_metastore/locations.py @@ -155,6 +155,7 @@ def __init__( schema: str, tables_crawler: TablesCrawler, mounts_crawler: 'MountsCrawler', + *, enable_hms_federation: bool = False, ): super().__init__(sql_backend, "hive_metastore", schema, "external_locations", ExternalLocation) diff --git a/tests/integration/hive_metastore/test_federation.py b/tests/integration/hive_metastore/test_federation.py index b9a82b6a0d..0fed17e7d9 100644 --- a/tests/integration/hive_metastore/test_federation.py +++ b/tests/integration/hive_metastore/test_federation.py @@ -17,13 +17,12 @@ def ws(): @pytest.mark.skip("needs to be enabled") def test_federation(ws, ctx, sql_backend): schema = 'ucx' - installation = ctx.installation tables_crawler = TablesCrawler(sql_backend, schema) mounts_crawler = MountsCrawler(sql_backend, ws, schema) external_locations = ExternalLocations(ws, sql_backend, schema, tables_crawler, mounts_crawler) workspace_info = create_autospec(WorkspaceInfo) workspace_info.current.return_value = 'some_thing' - federation = HiveMetastoreFederation(installation, ws, external_locations, workspace_info) + federation = HiveMetastoreFederation(ws, external_locations, workspace_info, ctx.config) prompts = MockPrompts({}) federation.create_from_cli(prompts) workspace_info.current.assert_called_once() diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index e3148eb5c2..44a91de716 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -35,7 +35,7 @@ def test_create_federated_catalog_int(mock_installation): workspace_info = create_autospec(WorkspaceInfo) workspace_info.current.return_value = 'a' - external_locations.external_locations_with_root.return_value = [ + external_locations.snapshot.return_value = [ ExternalLocation('s3://b/c/d', 1), ExternalLocation('s3://e/f/g', 1), ExternalLocation('s3://h/i/j', 1), @@ -51,10 +51,14 @@ def test_create_federated_catalog_int(mock_installation): ) hms_fed = HiveMetastoreFederation( - mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + workspace_client, + external_locations, + workspace_info, + mock_installation.load(WorkspaceConfig), + enable_hms_federation=True, ) - hms_fed.create_from_cli(MockPrompts({})) + hms_fed.create_from_cli(MockPrompts({".*": ""})) workspace_client.connections.create.assert_called_with( name='a', @@ -116,13 +120,19 @@ def test_create_federated_catalog_ext(mock_installation): ) hms_fed = HiveMetastoreFederation( - mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + workspace_client, + external_locations, + workspace_info, + mock_installation.load(WorkspaceConfig), + enable_hms_federation=True, ) - hms_fed.create_from_cli(MockPrompts({"A supported external Hive Metastore.*": "yes"})) + hms_fed.create_from_cli( + MockPrompts({"A supported external Hive Metastore.*": "yes", "Enter the name*": "fed_source"}) + ) workspace_client.connections.create.assert_called_with( - name='a', + name='fed_source', connection_type=ConnectionType.HIVE_METASTORE, options={ 'database': 'metastore', @@ -156,7 +166,7 @@ def test_already_existing_connection(mock_installation): workspace_info = create_autospec(WorkspaceInfo) workspace_info.current.return_value = 'a' - external_locations.external_locations_with_root.return_value = [ + external_locations.snapshot.return_value = [ ExternalLocation('s3://b/c/d', 1), ExternalLocation('s3://e/f/g', 1), ExternalLocation('s3://h/i/j', 1), @@ -173,10 +183,14 @@ def test_already_existing_connection(mock_installation): ) hms_fed = HiveMetastoreFederation( - mock_installation, workspace_client, external_locations, workspace_info, enable_hms_federation=True + workspace_client, + external_locations, + workspace_info, + mock_installation.load(WorkspaceConfig), + enable_hms_federation=True, ) - prompts = MockPrompts({}) + prompts = MockPrompts({".*": ""}) hms_fed.create_from_cli(prompts) From 31093f5576c5f8ba63b6455554853da669389284 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 19 Dec 2024 10:54:26 -0500 Subject: [PATCH 08/10] Fixed Version Issue --- src/databricks/labs/ucx/hive_metastore/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 8274caba1a..f1f5e69406 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -128,7 +128,7 @@ def _external_hms(self) -> ExternalHmsInfo | None: if not supported_versions: logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}') return None - if version not in supported_versions: + if major_minor_version not in supported_versions: logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}') return None From 758c00c5e591a95a9db1b2065604ad05ff8840c6 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 20 Dec 2024 09:26:12 -0500 Subject: [PATCH 09/10] Addressed Review Comments --- src/databricks/labs/ucx/aws/access.py | 2 +- src/databricks/labs/ucx/hive_metastore/locations.py | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/aws/access.py b/src/databricks/labs/ucx/aws/access.py index 138eb4916d..539471feed 100644 --- a/src/databricks/labs/ucx/aws/access.py +++ b/src/databricks/labs/ucx/aws/access.py @@ -226,7 +226,7 @@ def get_roles_to_migrate(self) -> list[AWSCredentialCandidate]: """ Identify the roles that need to be migrated to UC from the UC compatible roles list. """ - external_locations = self._locations.external_locations_with_root() + external_locations = list(self._locations.external_locations_with_root()) logger.info(f"Found {len(external_locations)} external locations") compatible_roles = self.load_uc_compatible_roles() roles: dict[str, AWSCredentialCandidate] = {} diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py index 5a8d703150..40ceef378a 100644 --- a/src/databricks/labs/ucx/hive_metastore/locations.py +++ b/src/databricks/labs/ucx/hive_metastore/locations.py @@ -175,21 +175,20 @@ def clean_location(location: str) -> str: # Having s3a and s3 as separate locations will cause issues when trying to find overlapping locations return re.sub(r"^s3a:/", r"s3:/", location).rstrip("/") - def external_locations_with_root(self) -> list[ExternalLocation]: + def external_locations_with_root(self) -> Iterable[ExternalLocation]: """ Produces a list of external locations with the DBFS root location appended to the list. Utilizes the snapshot method. Used for HMS Federation. - Returns: - List of ExternalLocation objects + Yields: + Iterable[Result]: Combination of all the external locations and the DBFS root location """ - external_locations = list(self.snapshot()) + yield from self.snapshot() dbfs_root = self._get_dbfs_root() if dbfs_root: - external_locations.append(dbfs_root) - return external_locations + yield dbfs_root def _get_dbfs_root(self) -> ExternalLocation | None: """ From 3b5fec104d535aca07bf4246d625ca3f1a94fd79 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 20 Dec 2024 09:30:56 -0500 Subject: [PATCH 10/10] Fixed Unit Test --- src/databricks/labs/ucx/hive_metastore/federation.py | 2 +- tests/unit/hive_metastore/test_locations.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index f1f5e69406..f81248779e 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -228,7 +228,7 @@ def _get_authorized_paths(self) -> str: if self._external_hms is not None: external_locations = self._external_locations.external_locations_with_root() else: - external_locations = list(self._external_locations.snapshot()) + external_locations = self._external_locations.snapshot() for external_location_info in external_locations: location = ExternalLocations.clean_location(external_location_info.location) diff --git a/tests/unit/hive_metastore/test_locations.py b/tests/unit/hive_metastore/test_locations.py index 9cbc009458..b42a330811 100644 --- a/tests/unit/hive_metastore/test_locations.py +++ b/tests/unit/hive_metastore/test_locations.py @@ -737,7 +737,7 @@ def test_resolve_dbfs_root_in_hms_federation(): external_locations = ExternalLocations( ws, sql_backend, "test", tables_crawler, mounts_crawler, enable_hms_federation=True ) - results = external_locations.external_locations_with_root() + results = list(external_locations.external_locations_with_root()) mounts_crawler.snapshot.assert_not_called() assert results == [ ExternalLocation("s3://test_location/test1", 1),