Skip to content

Commit

Permalink
Add mount point inventorizer (#209)
Browse files Browse the repository at this point in the history
Add a Mount Point inventorizer
  • Loading branch information
william-conti authored Sep 19, 2023
1 parent c079fb3 commit 4362237
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 83 deletions.
29 changes: 29 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/list_mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
from dataclasses import dataclass

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend

logger = logging.getLogger(__name__)


@dataclass
class Mount:
    """A single workspace mount point, as returned by `dbutils.fs.mounts()`."""

    # mount point path (first element of the MountInfo tuple)
    name: str
    # source location the mount point resolves to (second element of the MountInfo tuple)
    source: str


class Mounts(CrawlerBase):
    """Crawler that inventorizes workspace mount points into the `mounts` table."""

    def __init__(self, backend: SqlBackend, ws: WorkspaceClient, inventory_database: str):
        super().__init__(backend, "hive_metastore", inventory_database, "mounts")
        self._dbutils = ws.dbutils

    def inventorize_mounts(self):
        """List all mount points of the workspace and append them to the inventory table."""
        self._append_records(self._list_mounts())

    def _list_mounts(self) -> list[Mount]:
        # dbutils.fs.mounts() yields (mountPoint, source, encryptionType) tuples;
        # the encryption type is deliberately dropped from the inventory.
        return [Mount(mount_point, source) for mount_point, source, _ in self._dbutils.fs.mounts()]
53 changes: 53 additions & 0 deletions src/databricks/labs/ucx/mixins/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import logging
import os
import pathlib
import shutil
import string
import subprocess
import sys
from pathlib import Path
from typing import BinaryIO, Optional

import pytest
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.service import compute, iam, jobs, pipelines, workspace
from databricks.sdk.service.sql import CreateWarehouseRequestWarehouseType
from databricks.sdk.service.workspace import ImportFormat

_LOG = logging.getLogger(__name__)

Expand All @@ -36,6 +40,55 @@ def inner(**kwargs):
_LOG.debug(f"ignoring error while {name} {x} teardown: {e}")


@pytest.fixture
def fresh_wheel_file(tmp_path) -> Path:
    """Build the project wheel from the current source tree in a temp dir.

    Returns the path to the single built `databricks_labs_ucx-*.whl`.
    Raises RuntimeError (carrying pip's stderr) if the build fails, or if
    zero or more than one matching wheel is produced.
    """
    this_file = Path(__file__)
    project_root = this_file.parent.parent.parent.parent.parent.parent.absolute()
    # TODO: we can dynamically determine this with python -m build .
    wheel_name = "databricks_labs_ucx"

    build_root = tmp_path / fresh_wheel_file.__name__
    shutil.copytree(project_root, build_root)
    try:
        # check=True raises CalledProcessError on a non-zero exit code, so a
        # separate returncode check afterwards would be dead code.
        subprocess.run(
            [sys.executable, "-m", "pip", "wheel", "."],
            capture_output=True,
            cwd=build_root,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(e.stderr) from None

    found_wheels = list(build_root.glob(f"{wheel_name}-*.whl"))
    if not found_wheels:
        msg = f"cannot find {wheel_name}-*.whl"
        raise RuntimeError(msg)
    if len(found_wheels) > 1:
        conflicts = ", ".join(str(whl) for whl in found_wheels)
        msg = f"more than one wheel match: {conflicts}"
        raise RuntimeError(msg)
    return found_wheels[0]


@pytest.fixture
def wsfs_wheel(ws, fresh_wheel_file, make_random):
    """Upload the freshly built wheel into the current user's workspace tree.

    Yields the workspace path of the uploaded wheel; the containing
    randomized directory is deleted on teardown.
    """
    user_name = ws.current_user.me().user_name
    upload_dir = f"/Users/{user_name}/wheels/{make_random(10)}"
    ws.workspace.mkdirs(upload_dir)

    remote_wheel = f"{upload_dir}/{fresh_wheel_file.name}"
    with fresh_wheel_file.open("rb") as payload:
        ws.workspace.upload(remote_wheel, payload, format=ImportFormat.AUTO)

    yield remote_wheel

    ws.workspace.delete(upload_dir, recursive=True)


@pytest.fixture
def make_random():
import random
Expand Down
15 changes: 15 additions & 0 deletions src/databricks/labs/ucx/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
from databricks.labs.ucx.framework.tasks import task, trigger
from databricks.labs.ucx.hive_metastore import TaclToolkit
from databricks.labs.ucx.hive_metastore.list_mounts import Mounts
from databricks.labs.ucx.workspace_access import GroupMigrationToolkit

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -51,6 +52,20 @@ def crawl_grants(cfg: MigrationConfig):
tacls.grants_snapshot()


@task("assessment", depends_on=[setup_schema])
def inventorize_mounts(cfg: MigrationConfig):
    """In this part of the assessment, we're going to scope the mount points that are going to be
    migrated into Unity Catalog. Since these objects are not supported in the UC paradigm, part of the migration phase
    is to migrate them into Unity Catalog External Locations.
    The assessment goes into the workspace to list all the mount points that have been created, and then stores them in
    the `$inventory.mounts` table, which will allow you to have a snapshot of your existing mount point infrastructure.
    """
    ws = WorkspaceClient(config=cfg.to_databricks_config())
    mounts = Mounts(backend=RuntimeBackend(), ws=ws, inventory_database=cfg.inventory_database)
    mounts.inventorize_mounts()


@task("assessment", depends_on=[setup_schema])
def inventorize_permissions(cfg: MigrationConfig):
"""As we embark on the complex migration journey from Hive Metastore to the Databricks Unity Catalog, a pivotal
Expand Down
30 changes: 30 additions & 0 deletions tests/integration/framework/test_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from databricks.sdk.service.workspace import AclPermission

from databricks.labs.ucx.mixins.compute import CommandExecutor
from databricks.labs.ucx.mixins.fixtures import * # noqa: F403

load_debug_env_if_runs_from_ide("ucws") # noqa: F405
Expand Down Expand Up @@ -66,3 +67,32 @@ def test_job(make_job):

def test_pipeline(make_pipeline):
logger.info(f"created {make_pipeline()}")


def test_this_wheel_installs(ws, wsfs_wheel):
    """Install the uploaded wheel on a cluster and import it to verify the installation."""
    executor = CommandExecutor(ws)
    executor.install_notebook_library(f"/Workspace{wsfs_wheel}")

    installed_version = executor.run(
        """
        from databricks.labs.ucx.__about__ import __version__
        print(__version__)
        """
    )

    assert installed_version is not None


def test_sql_backend_works(ws, wsfs_wheel):
    """Run a query through RuntimeBackend on a cluster that has the wheel installed."""
    executor = CommandExecutor(ws)
    executor.install_notebook_library(f"/Workspace{wsfs_wheel}")

    database_names = executor.run(
        """
        from databricks.labs.ucx.framework.crawlers import RuntimeBackend
        backend = RuntimeBackend()
        return backend.fetch("SHOW DATABASES")
        """
    )

    assert len(database_names) > 0
35 changes: 35 additions & 0 deletions tests/integration/hive_metastore/test_mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from databricks.labs.ucx.hive_metastore.list_mounts import Mount
from databricks.labs.ucx.mixins.compute import CommandExecutor


@pytest.mark.skip(reason="Needs to have mountpoints already created ")
def test_mount_listing(ws, wsfs_wheel, make_schema, sql_fetch_all):
    """End-to-end check: run the Mounts crawler on a cluster, then verify
    that rows landed in the `$inventory.mounts` table."""
    _, inventory_database = make_schema(catalog="hive_metastore").split(".")
    commands = CommandExecutor(ws)
    commands.install_notebook_library(f"/Workspace{wsfs_wheel}")

    commands.run(
        f"""
        from databricks.labs.ucx.hive_metastore.list_mounts import Mounts
        from databricks.labs.ucx.config import MigrationConfig, GroupsConfig, TaclConfig
        from databricks.sdk import WorkspaceClient
        from databricks.labs.ucx.framework.crawlers import RuntimeBackend
        cfg = MigrationConfig(
            inventory_database="{inventory_database}",
            groups=GroupsConfig(auto=True),
            tacl=TaclConfig(databases=["default"]))
        ws = WorkspaceClient(config=cfg.to_databricks_config())
        mounts = Mounts(backend=RuntimeBackend(), ws=ws, inventory_database=cfg.inventory_database)
        mounts.inventorize_mounts()
        """
    )
    mounts = sql_fetch_all(f"SELECT * FROM hive_metastore.{inventory_database}.mounts")
    # Re-hydrate every row into a Mount record; at least one mount must be present.
    results = [Mount(*mount) for mount in mounts]

    assert len(results) > 0
83 changes: 0 additions & 83 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import logging
import os
import random
import shutil
import subprocess
import sys
from io import BytesIO
from pathlib import Path

import pytest
from databricks.sdk.service import compute, jobs
from databricks.sdk.service.iam import PermissionLevel
from databricks.sdk.service.workspace import ImportFormat
Expand All @@ -16,89 +12,10 @@
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.install import Installer
from databricks.labs.ucx.mixins.compute import CommandExecutor

logger = logging.getLogger(__name__)


@pytest.fixture
def fresh_wheel_file(tmp_path) -> Path:
    """Build the project wheel from the current source tree in a temp dir.

    Returns the path to the single built `databricks_labs_ucx-*.whl`.
    Raises RuntimeError (carrying pip's stderr) if the build fails, or if
    zero or more than one matching wheel is produced.
    """
    this_file = Path(__file__)
    project_root = this_file.parent.parent.parent.absolute()
    # TODO: we can dynamically determine this with python -m build .
    wheel_name = "databricks_labs_ucx"

    build_root = tmp_path / fresh_wheel_file.__name__
    shutil.copytree(project_root, build_root)
    try:
        # check=True raises CalledProcessError on a non-zero exit code, so a
        # separate returncode check afterwards would be dead code.
        subprocess.run(
            [sys.executable, "-m", "pip", "wheel", "."],
            capture_output=True,
            cwd=build_root,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(e.stderr) from None

    found_wheels = list(build_root.glob(f"{wheel_name}-*.whl"))
    if not found_wheels:
        msg = f"cannot find {wheel_name}-*.whl"
        raise RuntimeError(msg)
    if len(found_wheels) > 1:
        conflicts = ", ".join(str(whl) for whl in found_wheels)
        msg = f"more than one wheel match: {conflicts}"
        raise RuntimeError(msg)
    return found_wheels[0]


@pytest.fixture
def wsfs_wheel(ws, fresh_wheel_file, make_random):
    """Upload the freshly built wheel into the current user's workspace tree.

    Yields the workspace path of the uploaded wheel; the containing
    randomized directory is deleted on teardown.
    """
    user_name = ws.current_user.me().user_name
    upload_dir = f"/Users/{user_name}/wheels/{make_random(10)}"
    ws.workspace.mkdirs(upload_dir)

    remote_wheel = f"{upload_dir}/{fresh_wheel_file.name}"
    with fresh_wheel_file.open("rb") as payload:
        ws.workspace.upload(remote_wheel, payload, format=ImportFormat.AUTO)

    yield remote_wheel

    ws.workspace.delete(upload_dir, recursive=True)


def test_this_wheel_installs(ws, wsfs_wheel):
    """Install the uploaded wheel on a cluster and import it to verify the installation."""
    executor = CommandExecutor(ws)
    executor.install_notebook_library(f"/Workspace{wsfs_wheel}")

    installed_version = executor.run(
        """
        from databricks.labs.ucx.__about__ import __version__
        print(__version__)
        """
    )

    assert installed_version is not None


def test_sql_backend_works(ws, wsfs_wheel):
    """Run a query through RuntimeBackend on a cluster that has the wheel installed."""
    executor = CommandExecutor(ws)
    executor.install_notebook_library(f"/Workspace{wsfs_wheel}")

    database_names = executor.run(
        """
        from databricks.labs.ucx.framework.crawlers import RuntimeBackend
        backend = RuntimeBackend()
        return backend.fetch("SHOW DATABASES")
        """
    )

    assert len(database_names) > 0


def test_assessment_job_with_no_inventory_database(
request,
ws,
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/hive_metastore/test_mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from unittest.mock import MagicMock

from databricks.sdk.dbutils import MountInfo

from databricks.labs.ucx.hive_metastore.list_mounts import Mount, Mounts
from tests.unit.framework.mocks import MockBackend


def test_list_mounts_should_return_a_list_of_mount_without_encryption_type():
    """The crawler drops the encryption-type field and appends Mount rows to the backend."""
    ws = MagicMock()
    ws.dbutils.fs.mounts.return_value = [
        MountInfo("mp_1", "path_1", "info_1"),
        MountInfo("mp_2", "path_2", "info_2"),
    ]

    mock_backend = MockBackend()
    crawler = Mounts(mock_backend, ws, "test")

    crawler.inventorize_mounts()

    assert mock_backend.rows_written_for("hive_metastore.test.mounts", "append") == [
        Mount("mp_1", "path_1"),
        Mount("mp_2", "path_2"),
    ]

0 comments on commit 4362237

Please sign in to comment.