Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL External HMS Support for HMS Federation #3385

Merged
merged 10 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts):


@ucx.command
def create_federated_catalog(w: WorkspaceClient, _: Prompts):
def create_federated_catalog(w: WorkspaceClient, prompts: Prompts):
"""(Experimental) Create federated catalog from current workspace Hive Metastore."""
ctx = WorkspaceContext(w)
ctx.federation.register_internal_hms_as_federated_catalog()
ctx.federation.create_from_cli(prompts)


@ucx.command
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/workspace_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def federation_enabler(self):
@cached_property
def federation(self):
return HiveMetastoreFederation(
self.installation,
self.workspace_client,
self.external_locations,
self.workspace_info,
Expand Down
148 changes: 134 additions & 14 deletions src/databricks/labs/ucx/hive_metastore/federation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import collections
import logging
import re
from dataclasses import dataclass, replace
from typing import ClassVar

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest
from databricks.sdk.service.catalog import (
Expand All @@ -14,13 +18,26 @@
)

from databricks.labs.ucx.account.workspaces import WorkspaceInfo
from databricks.labs.ucx.assessment.secrets import SecretsMixin
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore import ExternalLocations


logger = logging.getLogger(__name__)


@dataclass
class ExtHms:
FastLee marked this conversation as resolved.
Show resolved Hide resolved
# This is a dataclass that represents the external Hive Metastore connection information
db_type: str
FastLee marked this conversation as resolved.
Show resolved Hide resolved
FastLee marked this conversation as resolved.
Show resolved Hide resolved
host: str
port: str
database: str
user: str | None
password: str | None
version: str | None
FastLee marked this conversation as resolved.
Show resolved Hide resolved


class HiveMetastoreFederationEnabler:
def __init__(self, installation: Installation):
self._installation = installation
Expand All @@ -31,58 +48,161 @@ def enable(self):
self._installation.save(config)


class HiveMetastoreFederation:
class HiveMetastoreFederation(SecretsMixin):
def __init__(
self,
workspace_client: WorkspaceClient,
installation: Installation,
ws: WorkspaceClient,
external_locations: ExternalLocations,
workspace_info: WorkspaceInfo,
enable_hms_federation: bool = False,
):
self._workspace_client = workspace_client
self._ws = ws
self._external_locations = external_locations
self._workspace_info = workspace_info
self._enable_hms_federation = enable_hms_federation
self._installation = installation

def register_internal_hms_as_federated_catalog(self) -> CatalogInfo:
# Supported databases and version for HMS Federation
supported_db_vers: ClassVar[dict[str, list[str]]] = {
FastLee marked this conversation as resolved.
Show resolved Hide resolved
"mysql": ["2.3", "0.13"],
}

def create_from_cli(self, prompts: Prompts):
FastLee marked this conversation as resolved.
Show resolved Hide resolved
if not self._enable_hms_federation:
raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation')
name = self._workspace_info.current()
connection_info = self._get_or_create_connection(name)

ext_hms = None
try:
ext_hms = self._get_ext_hms()
except ValueError:
logger.info('Failed to retrieve external Hive Metastore connection information')
FastLee marked this conversation as resolved.
Show resolved Hide resolved

if ext_hms and prompts.confirm(
f'A supported external Hive Metastore connection was identified: {ext_hms.db_type}. Use this connection?'
):
connection_info = self._get_or_create_ext_connection(name, ext_hms)
else:
connection_info = self._get_or_create_int_connection(name)
assert connection_info.name is not None
FastLee marked this conversation as resolved.
Show resolved Hide resolved
return self._register_federated_catalog(connection_info)

def _get_ext_hms(self) -> ExtHms:
FastLee marked this conversation as resolved.
Show resolved Hide resolved
config = self._installation.load(WorkspaceConfig)
FastLee marked this conversation as resolved.
Show resolved Hide resolved
if not config.spark_conf:
raise ValueError('Spark config not found')
spark_config = config.spark_conf
jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL')
if not jdbc_url:
raise ValueError('JDBC URL not found')
version = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version')
# extract major version from version using regex
if not version:
raise ValueError('Hive Metastore version not found')
major_version = re.match(r'(\d+\.\d+)', version)
FastLee marked this conversation as resolved.
Show resolved Hide resolved
if not major_version:
raise ValueError(f'Invalid Hive Metastore version: {version}')
version = major_version.group(1)
ext_hms = replace(self._split_jdbc_url(jdbc_url), version=version)
supported_versions = self.supported_db_vers.get(ext_hms.db_type)
if not supported_versions:
raise ValueError(f'Unsupported Hive Metastore: {ext_hms.db_type}')
if version not in supported_versions:
raise ValueError(f'Unsupported Hive Metastore Version: {ext_hms.db_type} - {version}')
if not ext_hms.user:
ext_hms = replace(
ext_hms,
user=self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionUserName'),
)
if not ext_hms.password:
ext_hms = replace(
ext_hms,
password=self._get_value_from_config_key(
spark_config, 'spark.hadoop.javax.jdo.option.ConnectionPassword'
),
)
return ext_hms

@classmethod
def _split_jdbc_url(cls, jdbc_url: str) -> ExtHms:
# Define the regex pattern to match the JDBC URL components
pattern = re.compile(
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved
r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)(\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+))?'
)
match = pattern.match(jdbc_url)
if not match:
raise ValueError(f'Unsupported JDBC URL: {jdbc_url}')

db_type = match.group('db_type')
host = match.group('host')
port = match.group('port')
database = match.group('database')
user = match.group('user')
password = match.group('password')

return ExtHms(db_type, host, port, database, user, password, None)

def _register_federated_catalog(self, connection_info) -> CatalogInfo:
try:
return self._workspace_client.catalogs.create(
return self._ws.catalogs.create(
name=connection_info.name,
connection_name=connection_info.name,
options={"authorized_paths": self._get_authorized_paths()},
)
except BadRequest as err:
if err.error_code == 'CATALOG_ALREADY_EXISTS':
logger.info(f'Catalog {connection_info.name} already exists')
for catalog_info in self._workspace_client.catalogs.list():
for catalog_info in self._ws.catalogs.list():
if catalog_info.name == connection_info.name:
return catalog_info
raise err

def _get_or_create_connection(self, name: str) -> ConnectionInfo:
def _get_or_create_int_connection(self, name: str) -> ConnectionInfo:
FastLee marked this conversation as resolved.
Show resolved Hide resolved
try:
return self._workspace_client.connections.create(
return self._ws.connections.create(
name=name,
connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change
options={"builtin": "true"},
)
except AlreadyExists:
for connection in self._workspace_client.connections.list():
for connection in self._ws.connections.list():
if connection.name == name:
return connection
raise NotFound(f'Connection {name} not found')

def _get_or_create_ext_connection(self, name: str, ext_hms: ExtHms) -> ConnectionInfo:
options: dict[str, str] = {
FastLee marked this conversation as resolved.
Show resolved Hide resolved
# TODO: Fix once the FEDERATION end point is fixed. Include "builtin": "false" in options
"database": ext_hms.database,
"db_type": ext_hms.db_type,
"host": ext_hms.host,
"port": ext_hms.port,
}
if ext_hms.user:
options["user"] = ext_hms.user
if ext_hms.password:
options["password"] = ext_hms.password
if ext_hms.version:
options["version"] = ext_hms.version
try:
return self._ws.connections.create(
name=name,
connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change
options=options,
)
except AlreadyExists:
for connection in self._ws.connections.list():
FastLee marked this conversation as resolved.
Show resolved Hide resolved
if connection.name == name:
return connection
raise NotFound(f'Connection {name} not found')

def _get_authorized_paths(self) -> str:
existing = {}
for external_location in self._workspace_client.external_locations.list():
for external_location in self._ws.external_locations.list():
existing[external_location.url] = external_location
authorized_paths = []
current_user = self._workspace_client.current_user.me()
current_user = self._ws.current_user.me()
if not current_user.user_name:
raise NotFound('Current user not found')
for external_location_info in self._external_locations.external_locations_with_root():
Expand All @@ -103,11 +223,11 @@ def _add_missing_permissions_if_needed(self, location_name: str, current_user: s
grants = self._location_grants(location_name)
if Privilege.CREATE_FOREIGN_SECURABLE not in grants[current_user]:
change = PermissionsChange(principal=current_user, add=[Privilege.CREATE_FOREIGN_SECURABLE])
self._workspace_client.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])
self._ws.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])

def _location_grants(self, location_name: str) -> dict[str, set[Privilege]]:
grants: dict[str, set[Privilege]] = collections.defaultdict(set)
result = self._workspace_client.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
result = self._ws.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
if not result.privilege_assignments:
return grants
for assignment in result.privilege_assignments:
Expand Down
9 changes: 6 additions & 3 deletions tests/integration/hive_metastore/test_federation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest.mock import create_autospec

import pytest
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.account.workspaces import WorkspaceInfo
Expand All @@ -14,13 +15,15 @@ def ws():


@pytest.mark.skip("needs to be enabled")
def test_federation(ws, sql_backend):
def test_federation(ws, ctx, sql_backend):
schema = 'ucx'
installation = ctx.installation
tables_crawler = TablesCrawler(sql_backend, schema)
mounts_crawler = MountsCrawler(sql_backend, ws, schema)
external_locations = ExternalLocations(ws, sql_backend, schema, tables_crawler, mounts_crawler)
workspace_info = create_autospec(WorkspaceInfo)
workspace_info.current.return_value = 'some_thing'
federation = HiveMetastoreFederation(ws, external_locations, workspace_info)
federation.register_internal_hms_as_federated_catalog()
federation = HiveMetastoreFederation(installation, ws, external_locations, workspace_info)
prompts = MockPrompts({})
federation.create_from_cli(prompts)
workspace_info.current.assert_called_once()
Loading
Loading