From 0220c422980a740b5ce645c10d5593a1c89f45af Mon Sep 17 00:00:00 2001 From: Cor Date: Tue, 21 Jan 2025 18:10:44 +0100 Subject: [PATCH] Let `PipelinesMigrator` skip unfound jobs (#3554) ## Changes Let `PipelinesMigrator` skip unfound jobs. This might happen when jobs are deleted in between assessment and running pipelines migration ### Linked issues Resolves #3490 ### Functionality - [x] modified existing command: `databricks labs ucx migrate-dlt-pipelines` ### Tests - [x] added unit tests --------- Co-authored-by: Guenia Izquierdo Delgado --- .../ucx/hive_metastore/pipelines_migrate.py | 8 +++-- .../hive_metastore/test_pipeline_migrate.py | 8 ++++- .../hive_metastore/test_pipeline_migrate.py | 29 +++++++++++++++---- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py b/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py index b01233c427..497b864227 100644 --- a/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py @@ -4,7 +4,7 @@ from databricks.labs.blueprint.parallel import Threads from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import DatabricksError +from databricks.sdk.errors import DatabricksError, NotFound from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings from databricks.labs.ucx.assessment.jobs import JobsCrawler @@ -53,7 +53,11 @@ def _populate_pipeline_job_tasks_mapping(self) -> None: if not job.job_id: continue - job_details = self._ws.jobs.get(int(job.job_id)) + try: + job_details = self._ws.jobs.get(int(job.job_id)) + except NotFound: + logger.warning(f"Skipping non-existing job: {job.job_id}") + continue if not job_details.settings or not job_details.settings.tasks: continue diff --git a/tests/integration/hive_metastore/test_pipeline_migrate.py b/tests/integration/hive_metastore/test_pipeline_migrate.py index 24ccfa6ba3..b1335e85e4 100644 --- 
a/tests/integration/hive_metastore/test_pipeline_migrate.py +++ b/tests/integration/hive_metastore/test_pipeline_migrate.py @@ -11,7 +11,13 @@ def test_pipeline_migrate( - ws, make_pipeline, make_random, watchdog_purge_suffix, make_directory, runtime_ctx, make_mounted_location + ws, + make_pipeline, + make_random, + watchdog_purge_suffix, + make_directory, + runtime_ctx, + make_mounted_location, ) -> None: src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore") diff --git a/tests/unit/hive_metastore/test_pipeline_migrate.py b/tests/unit/hive_metastore/test_pipeline_migrate.py index c07c460aa7..8294dea2c6 100644 --- a/tests/unit/hive_metastore/test_pipeline_migrate.py +++ b/tests/unit/hive_metastore/test_pipeline_migrate.py @@ -4,6 +4,7 @@ from databricks.labs.lsql.backends import MockBackend from databricks.sdk.service.jobs import BaseJob, JobSettings, Task, PipelineTask +from databricks.sdk.errors import NotFound from databricks.labs.ucx.assessment.jobs import JobsCrawler from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler @@ -91,14 +92,30 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, include_flag, e ws.api_client.do.assert_has_calls([api_calls]) -def test_migrate_pipelines_no_pipelines( - ws, -): - errors = {} - rows = {} - sql_backend = MockBackend(fails_on_first=errors, rows=rows) +def test_migrate_pipelines_no_pipelines(ws) -> None: + sql_backend = MockBackend() pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database") jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database") pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name") ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)] pipelines_migrator.migrate_pipelines() + + +def test_migrate_pipelines_skips_not_found_job(caplog, ws) -> None: + job_columns = MockBackend.rows("job_id", "success", "failures", "job_name", "creator") + sql_backend = 
MockBackend( + rows={ + "`hive_metastore`.`inventory_database`.`jobs`": job_columns[ + ("536591785949415", 1, [], "single-job", "anonymous@databricks.com") + ] + } + ) + pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database") + jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database") + pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name") + + ws.jobs.get.side_effect = NotFound + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.pipelines_migrate"): + pipelines_migrator.migrate_pipelines() + assert "Skipping non-existing job: 536591785949415" in caplog.messages