Refactor PipelinesMigrator to add include_pipeline_ids (#3495)
## Changes
Replace the skip-pipeline-ids flag with include-pipeline-ids, which allows
the end user to migrate only the pipelines in the given list.
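
As the diff below shows, the migrator's constructor now accepts an optional `include_pipeline_ids` list alongside `exclude_pipeline_ids`. A minimal usage sketch, assuming a configured `WorkspaceClient` and an existing ucx inventory schema (the warehouse ID, schema name, catalog name, and pipeline IDs are illustrative placeholders, not values from this commit):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.lsql.backends import StatementExecutionBackend  # assumed backend; ucx normally wires this up itself

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "<warehouse-id>")  # placeholder warehouse ID
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "ucx_inventory")
jobs_crawler = JobsCrawler(ws, sql_backend, "ucx_inventory")

# Migrate only the two listed pipelines; every other pipeline in the workspace is left untouched.
migrator = PipelinesMigrator(
    ws,
    pipelines_crawler,
    jobs_crawler,
    "main",  # target UC catalog
    include_pipeline_ids=["pipeline-id-1", "pipeline-id-2"],
)
migrator.migrate_pipelines()
```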

### Linked issues
Resolves #3492

### Functionality

- [x] modified flag for migrate-pipelines

### Tests
- [ ] modified unit tests
- [ ] modified integration tests
pritishpai authored Jan 10, 2025
1 parent 27e2002 commit 451bcc5
Showing 3 changed files with 55 additions and 58 deletions.
81 changes: 45 additions & 36 deletions src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py
@@ -8,7 +8,7 @@
from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelineInfo, PipelinesCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler

logger = logging.getLogger(__name__)

@@ -17,12 +17,13 @@ class PipelinesMigrator:
"""
PipelinesMigrator is responsible for migrating pipelines from HMS to UC
It uses the DLT Migration API to migrate the pipelines and also updates the jobs associated with the pipelines if any
The class also provides an option to skip the pipelines that are already migrated or the ones that are explicitly skipped
The class also provides an option to include only the pipelines that should be migrated
:param ws: WorkspaceClient
:param pipelines_crawler: PipelinesCrawler
:param catalog_name: str
:param skip_pipeline_ids: list[str] | None
:param include_pipeline_ids: list[str] | None
:param exclude_pipeline_ids: list[str] | None
"""

def __init__(
@@ -31,13 +32,15 @@ def __init__(
pipelines_crawler: PipelinesCrawler,
jobs_crawler: JobsCrawler,
catalog_name: str,
skip_pipeline_ids: list[str] | None = None,
include_pipeline_ids: list[str] | None = None,
exclude_pipeline_ids: list[str] | None = None,
):
self._ws = ws
self._pipeline_crawler = pipelines_crawler
self._jobs_crawler = jobs_crawler
self._catalog_name = catalog_name
self._skip_pipeline_ids = skip_pipeline_ids or []
self._include_pipeline_ids = include_pipeline_ids
self._exclude_pipeline_ids = exclude_pipeline_ids
self._pipeline_job_tasks_mapping: dict[str, list[dict]] = {}

def _populate_pipeline_job_tasks_mapping(self) -> None:
@@ -65,11 +68,20 @@ def _populate_pipeline_job_tasks_mapping(self) -> None:
self._pipeline_job_tasks_mapping[pipeline_id].append(job_info)
logger.info(f"Found job:{job.job_id} task:{task.task_key} associated with pipeline {pipeline_id}")

def _get_pipelines_to_migrate(self) -> list[PipelineInfo]:
def _get_pipeline_ids_to_migrate(self) -> list[str]:
"""
Returns the list of pipelines in the current workspace
Returns the list of pipeline IDs filtered by the include and exclude lists
"""
return list(self._pipeline_crawler.snapshot())
pipeline_ids_to_migrate = []

if self._include_pipeline_ids:
pipeline_ids_to_migrate = self._include_pipeline_ids
else:
pipeline_ids_to_migrate = [p.pipeline_id for p in self._pipeline_crawler.snapshot()]

if self._exclude_pipeline_ids is not None:
pipeline_ids_to_migrate = list(set(pipeline_ids_to_migrate) - set(self._exclude_pipeline_ids))
return pipeline_ids_to_migrate

def migrate_pipelines(self) -> None:
"""
@@ -83,34 +95,31 @@ def _migrate_pipelines(self) -> list[partial[dict | bool | list | BinaryIO]]:
Create tasks to migrate the pipelines in parallel
"""
# get pipelines to migrate
pipelines_to_migrate = self._get_pipelines_to_migrate()
pipelines_to_migrate = self._get_pipeline_ids_to_migrate()
logger.info(f"Found {len(pipelines_to_migrate)} pipelines to migrate")

tasks = []
for pipeline in pipelines_to_migrate:
if pipeline.pipeline_id in self._skip_pipeline_ids:
logger.info(f"Skipping pipeline {pipeline.pipeline_id}")
continue
tasks.append(partial(self._migrate_pipeline, pipeline))
for pipeline_id in pipelines_to_migrate:
tasks.append(partial(self._migrate_pipeline, pipeline_id))
if not tasks:
return []
Threads.strict("migrate pipelines", tasks)
return tasks

def _migrate_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO | bool:
def _migrate_pipeline(self, pipeline_id: str) -> dict | list | BinaryIO | bool:
"""
Private method to clone the pipeline and handle the exceptions
"""
try:
return self._clone_pipeline(pipeline)
return self._clone_pipeline(pipeline_id)
except DatabricksError as e:
if "Cloning from Hive Metastore to Unity Catalog is currently not supported" in str(e):
logger.error(f"{e}: Please contact Databricks to enable DLT HMS to UC migration API on this workspace")
return False
logger.error(f"Failed to migrate pipeline {pipeline.pipeline_id}: {e}")
logger.error(f"Failed to migrate pipeline {pipeline_id}: {e}")
return False

def _clone_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO:
def _clone_pipeline(self, pipeline_id: str) -> dict | list | BinaryIO:
"""
This method calls the DLT Migration API to clone the pipeline
Stop and rename the old pipeline before cloning the new pipeline
@@ -119,25 +128,25 @@ def _clone_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO:
"""
# Need to get the pipeline again to get the libraries
# else updating name fails with libraries not provided error
get_pipeline = self._ws.pipelines.get(pipeline.pipeline_id)
if get_pipeline.spec:
if get_pipeline.spec.catalog:
pipeline_info = self._ws.pipelines.get(pipeline_id)
if pipeline_info.spec and pipeline_info.pipeline_id:
if pipeline_info.spec.catalog:
# Skip if the pipeline is already migrated to UC
logger.info(f"Pipeline {pipeline.pipeline_id} is already migrated to UC")
logger.info(f"Pipeline {pipeline_info.pipeline_id} is already migrated to UC")
return []

# Stop HMS pipeline
self._ws.pipelines.stop(pipeline.pipeline_id)
self._ws.pipelines.stop(pipeline_info.pipeline_id)
# Rename old pipeline first
self._ws.pipelines.update(
pipeline.pipeline_id,
name=f"{pipeline.pipeline_name} [OLD]",
clusters=get_pipeline.spec.clusters if get_pipeline.spec.clusters else None,
storage=get_pipeline.spec.storage if get_pipeline.spec.storage else None,
continuous=get_pipeline.spec.continuous if get_pipeline.spec.continuous else None,
deployment=get_pipeline.spec.deployment if get_pipeline.spec.deployment else None,
target=get_pipeline.spec.target if get_pipeline.spec.target else None,
libraries=get_pipeline.spec.libraries if get_pipeline.spec.libraries else None,
pipeline_info.pipeline_id,
name=f"{pipeline_info.name} [OLD]",
clusters=pipeline_info.spec.clusters if pipeline_info.spec.clusters else None,
storage=pipeline_info.spec.storage if pipeline_info.spec.storage else None,
continuous=pipeline_info.spec.continuous if pipeline_info.spec.continuous else None,
deployment=pipeline_info.spec.deployment if pipeline_info.spec.deployment else None,
target=pipeline_info.spec.target if pipeline_info.spec.target else None,
libraries=pipeline_info.spec.libraries if pipeline_info.spec.libraries else None,
)

# Clone pipeline
@@ -149,21 +158,21 @@ def _clone_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO:
'catalog': self._catalog_name,
'clone_mode': 'MIGRATE_TO_UC',
'configuration': {'pipelines.migration.ignoreExplicitPath': 'true'},
'name': f"{pipeline.pipeline_name}",
'name': f"{pipeline_info.name}",
}
res = self._ws.api_client.do(
'POST', f'/api/2.0/pipelines/{pipeline.pipeline_id}/clone', body=body, headers=headers
'POST', f'/api/2.0/pipelines/{pipeline_info.pipeline_id}/clone', body=body, headers=headers
)
assert isinstance(res, dict)
if 'pipeline_id' not in res:
logger.warning(f"Failed to clone pipeline {pipeline.pipeline_id}")
logger.warning(f"Failed to clone pipeline {pipeline_info.pipeline_id}")
return res

# After successful clone, update jobs
# Currently there is no SDK method available to migrate the DLT pipelines
# We are directly using the DLT Migration API in the interim; once the SDK method is available, we can replace this
if pipeline.pipeline_id in self._pipeline_job_tasks_mapping:
for pipeline_job_task_mapping in self._pipeline_job_tasks_mapping[pipeline.pipeline_id]:
if pipeline_info.pipeline_id in self._pipeline_job_tasks_mapping:
for pipeline_job_task_mapping in self._pipeline_job_tasks_mapping[pipeline_info.pipeline_id]:
self._ws.jobs.update(
pipeline_job_task_mapping['job_id'],
new_settings=JobSettings(
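The selection logic in `_get_pipeline_ids_to_migrate` above gives `include_pipeline_ids` precedence over the full crawler snapshot, and subtracts `exclude_pipeline_ids` afterwards. A self-contained sketch of that filtering rule, with illustrative IDs that are not taken from the commit:

```python
def select_pipeline_ids(
    snapshot_ids: list[str],
    include_pipeline_ids: list[str] | None = None,
    exclude_pipeline_ids: list[str] | None = None,
) -> list[str]:
    # An include list, when given, replaces the workspace snapshot entirely.
    candidates = include_pipeline_ids if include_pipeline_ids else snapshot_ids
    # The exclude list is subtracted from whichever set was chosen above.
    if exclude_pipeline_ids is not None:
        candidates = list(set(candidates) - set(exclude_pipeline_ids))
    return candidates


assert select_pipeline_ids(["a", "b", "c"], include_pipeline_ids=["b", "c"]) == ["b", "c"]
assert sorted(select_pipeline_ids(["a", "b", "c"], exclude_pipeline_ids=["b"])) == ["a", "c"]
```

Cloning itself still goes through a raw `POST /api/2.0/pipelines/{pipeline_id}/clone` call with `clone_mode: MIGRATE_TO_UC`, since, as the comment in the diff notes, the SDK does not yet expose this endpoint.
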
2 changes: 1 addition & 1 deletion tests/integration/hive_metastore/test_pipeline_migrate.py
@@ -120,7 +120,7 @@ def test_pipeline_migrate(
runtime_ctx.pipelines_crawler,
runtime_ctx.jobs_crawler,
runtime_ctx.make_catalog().name,
skip_pipeline_ids=[created_pipelines[1].pipeline_id],
include_pipeline_ids=[created_pipelines[0].pipeline_id, created_pipelines[1].pipeline_id],
)
pipelines_migrator.migrate_pipelines()

30 changes: 9 additions & 21 deletions tests/unit/hive_metastore/test_pipeline_migrate.py
@@ -13,11 +13,12 @@


@pytest.mark.parametrize(
"pipeline_spec,expected,api_calls",
"pipeline_spec, include_flag, expected, api_calls",
[
# empty spec
(
("empty-spec", "pipe1", 1, "[]", "creator1"),
"empty-spec",
1,
call(
'POST',
@@ -26,16 +27,17 @@
'catalog': 'catalog_name',
'clone_mode': 'MIGRATE_TO_UC',
'configuration': {'pipelines.migration.ignoreExplicitPath': 'true'},
'name': '[]',
'name': 'New DLT Pipeline',
},
headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
),
),
# migrated dlt spec
(("migrated-dlt-spec", "pipe2", 1, "[]", "creator2"), 0, None),
(("migrated-dlt-spec", "pipe2", 1, "[]", "creator2"), "migrated-dlt-spec", 0, None),
# spec with spn
(
("spec-with-spn", "pipe3", 1, "[]", "creator3"),
"spec-with-spn",
1,
call(
'POST',
@@ -44,16 +46,16 @@
'catalog': 'catalog_name',
'clone_mode': 'MIGRATE_TO_UC',
'configuration': {'pipelines.migration.ignoreExplicitPath': 'true'},
'name': '[]',
'name': 'New DLT Pipeline',
},
headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
),
),
# skip pipeline
(("skip-pipeline", "pipe3", 1, "[]", "creator3"), 0, None),
(("skip-pipeline", "pipe3", 1, "[]", "creator3"), "some-other-spec", 0, None),
],
)
def test_migrate_pipelines(ws, mock_installation, pipeline_spec, expected, api_calls):
def test_migrate_pipelines(ws, mock_installation, pipeline_spec, include_flag, expected, api_calls):
errors = {}
rows = {
"`hive_metastore`.`inventory_database`.`pipelines`": [pipeline_spec],
@@ -65,7 +67,7 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, expected, api_c
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
pipelines_migrator = PipelinesMigrator(
ws, pipelines_crawler, jobs_crawler, "catalog_name", skip_pipeline_ids=["skip-pipeline"]
ws, pipelines_crawler, jobs_crawler, "catalog_name", include_pipeline_ids=[include_flag]
)

ws.jobs.get.side_effect = [
@@ -88,20 +90,6 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, expected, api_c
if api_calls:
ws.api_client.do.assert_has_calls([api_calls])

ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)]
ws.jobs.get.side_effect = [
BaseJob(
job_id=536591785949415,
settings=JobSettings(
name="single-job",
tasks=[Task(pipeline_task=PipelineTask(pipeline_id="empty-spec"), task_key="task_key")],
),
),
BaseJob(
job_id=536591785949417,
),
]


def test_migrate_pipelines_no_pipelines(
ws,
