Skip to content

Commit

Permalink
Let PipelinesMigrator skip unfound jobs (#3554)
Browse files Browse the repository at this point in the history
## Changes
Let `PipelinesMigrator` skip unfound jobs. This might happen when jobs
are deleted inbetween assessment and running pipelines migration

### Linked issues

Resolves #3490

### Functionality

- [x] modified existing command: `databricks labs ucx
migrate-dlt-pipelines`

### Tests

- [x] added unit tests

---------

Co-authored-by: Guenia Izquierdo Delgado <[email protected]>
  • Loading branch information
JCZuurmond and gueniai authored Jan 21, 2025
1 parent 2286190 commit 0220c42
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from databricks.labs.blueprint.parallel import Threads
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.errors import DatabricksError, NotFound
from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings

from databricks.labs.ucx.assessment.jobs import JobsCrawler
Expand Down Expand Up @@ -53,7 +53,11 @@ def _populate_pipeline_job_tasks_mapping(self) -> None:
if not job.job_id:
continue

job_details = self._ws.jobs.get(int(job.job_id))
try:
job_details = self._ws.jobs.get(int(job.job_id))
except NotFound:
logger.warning(f"Skipping non-existing job: {job.job_id}")
continue
if not job_details.settings or not job_details.settings.tasks:
continue

Expand Down
8 changes: 7 additions & 1 deletion tests/integration/hive_metastore/test_pipeline_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@


def test_pipeline_migrate(
ws, make_pipeline, make_random, watchdog_purge_suffix, make_directory, runtime_ctx, make_mounted_location
ws,
make_pipeline,
make_random,
watchdog_purge_suffix,
make_directory,
runtime_ctx,
make_mounted_location,
) -> None:
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")

Expand Down
29 changes: 23 additions & 6 deletions tests/unit/hive_metastore/test_pipeline_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from databricks.labs.lsql.backends import MockBackend
from databricks.sdk.service.jobs import BaseJob, JobSettings, Task, PipelineTask
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
Expand Down Expand Up @@ -91,14 +92,30 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, include_flag, e
ws.api_client.do.assert_has_calls([api_calls])


def test_migrate_pipelines_no_pipelines(
ws,
):
errors = {}
rows = {}
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
def test_migrate_pipelines_no_pipelines(ws) -> None:
sql_backend = MockBackend()
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)]
pipelines_migrator.migrate_pipelines()


def test_migrate_pipelines_skips_not_found_job(caplog, ws) -> None:
job_columns = MockBackend.rows("job_id", "success", "failures", "job_name", "creator")
sql_backend = MockBackend(
rows={
"`hive_metastore`.`inventory_database`.`jobs`": job_columns[
("536591785949415", 1, [], "single-job", "[email protected]")
]
}
)
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")

ws.jobs.get.side_effect = NotFound

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.pipelines_migrate"):
pipelines_migrator.migrate_pipelines()
assert "Skipping non-existing job: 536591785949415" in caplog.messages

0 comments on commit 0220c42

Please sign in to comment.