From ba8b4c6a15c5a31e8475d96703506001339c39a9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 1 Nov 2024 08:46:10 +0100 Subject: [PATCH] Refactor register_job to register_jobs --- .../labs/ucx/assessment/sequencing.py | 31 +++++++++------ .../integration/assessment/test_sequencing.py | 6 +-- tests/unit/assessment/test_sequencing.py | 38 +++++++++---------- 3 files changed, 42 insertions(+), 33 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index bc9b4f1260..4fa660f558 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -8,7 +8,7 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError -from databricks.sdk.service import jobs +from databricks.sdk.service.jobs import Job, JobCluster, Task from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo @@ -156,20 +156,29 @@ def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLoca # Outgoing references contains edges in the graph pointing from a node to a set of nodes that the node # references. These references follow the API references, e.g. a job contains tasks in the - # `jobs.Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks. + # `Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks. self._outgoing_references: dict[MigrationNodeKey, set[MigrationNode]] = defaultdict(set) - def register_job(self, job: jobs.Job) -> MaybeMigrationNode: + def register_jobs(self, *jobs: Job) -> list[MaybeMigrationNode]: """Register a job. Args: - job (jobs.Job) : The job to register. + jobs (Job) : The jobs to register. Returns: - MaybeMigrationNode : A maybe migration node, which has the migration node if no problems occurred during - registering. Otherwise, the maybe migration node contains the dependency problems occurring during - registering the job. + list[MaybeMigrationNode] : Each element contains a maybe migration node for each job respectively. If no + problems occurred during registering the job, the maybe migration node contains the migration node. + Otherwise, the maybe migration node contains the dependency problems occurring during registering the + job. """ + nodes: list[MaybeMigrationNode] = [] + for job in jobs: + node = self._register_job(job) + nodes.append(node) + return nodes + + def _register_job(self, job: Job) -> MaybeMigrationNode: + """Register a single job.""" problems: list[DependencyProblem] = [] job_node = self._nodes.get(("JOB", str(job.job_id)), None) if job_node: @@ -211,11 +220,11 @@ def register_job(self, job: jobs.Job) -> MaybeMigrationNode: problems.append(problem) return MaybeMigrationNode(job_node, problems) - def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> MaybeMigrationNode: + def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode: """Register a workflow task. TODO: - Handle following jobs.Task attributes: + Handle following Task attributes: - for_each_task - libraries - notebook_task @@ -255,7 +264,7 @@ def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> May problems.append(problem) return MaybeMigrationNode(task_node, problems) - def _register_job_cluster(self, cluster: jobs.JobCluster, parent: MigrationNode) -> MaybeMigrationNode: + def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode: """Register a job cluster. A job cluster is defined within a job and therefore is found when defined on the job by definition. @@ -276,7 +285,7 @@ def _register_cluster(self, cluster_id: str) -> MaybeMigrationNode: """Register a cluster. TODO - Handle following jobs.Task attributes: + Handle following Task attributes: - init_scripts - instance_pool_id (maybe_not) - policy_id diff --git a/tests/integration/assessment/test_sequencing.py b/tests/integration/assessment/test_sequencing.py index 5aa5fc2682..3b93682dce 100644 --- a/tests/integration/assessment/test_sequencing.py +++ b/tests/integration/assessment/test_sequencing.py @@ -7,7 +7,7 @@ def test_migration_sequencing_simple_job(make_job, runtime_ctx) -> None: """Sequence a simple job""" job = make_job() - maybe_job_node = runtime_ctx.migration_sequencer.register_job(job) + maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] assert not maybe_job_node.failed steps = runtime_ctx.migration_sequencer.generate_steps() @@ -30,7 +30,7 @@ def test_migration_sequencing_job_with_task_referencing_cluster( ) job = make_job(tasks=[task]) - maybe_job_node = runtime_ctx.migration_sequencer.register_job(job) + maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] assert not maybe_job_node.failed steps = runtime_ctx.migration_sequencer.generate_steps() @@ -45,7 +45,7 @@ def test_migration_sequencing_job_with_task_referencing_non_existing_cluster(run settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) - maybe_node = runtime_ctx.migration_sequencer.register_job(job) + maybe_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] assert maybe_node.failed assert maybe_node.problems == [ DependencyProblem( diff --git a/tests/unit/assessment/test_sequencing.py b/tests/unit/assessment/test_sequencing.py index 02d2f478aa..1708c9a537 100644 --- a/tests/unit/assessment/test_sequencing.py +++ b/tests/unit/assessment/test_sequencing.py @@ -19,7 +19,7 @@ def admin_locator(ws): return AdministratorLocator(ws, finders=[lambda _ws: admin_finder]) -def test_register_job_with_existing_cluster(ws, admin_locator) -> None: +def test_register_jobs_with_existing_cluster(ws, admin_locator) -> None: """Register a job with a task referencing an existing cluster.""" task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -33,12 +33,12 @@ def get_cluster(cluster_id: str) -> ClusterDetails: ws.clusters.get.side_effect = get_cluster sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert not maybe_node.failed -def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None: +def test_register_jobs_with_non_existing_cluster(ws, admin_locator) -> None: """Register a job with a task referencing a non-existing cluster.""" task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id") settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -47,7 +47,7 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None: ws.clusters.get.side_effect = ResourceDoesNotExist("Unknown cluster") sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert maybe_node.failed assert maybe_node.problems == [ @@ -58,7 +58,7 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None: ] -def test_register_job_with_existing_job_cluster_key(ws, admin_locator) -> None: +def test_register_jobs_with_existing_job_cluster_key(ws, admin_locator) -> None: """Register a job with a task referencing a existing job cluster.""" job_cluster = jobs.JobCluster("existing-id", ClusterSpec()) task = jobs.Task(task_key="test-task", job_cluster_key="existing-id") @@ -66,12 +66,12 @@ def test_register_job_with_existing_job_cluster_key(ws, admin_locator) -> None: job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert not maybe_node.failed -def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> None: +def test_register_jobs_with_non_existing_job_cluster_key(ws, admin_locator) -> None: """Register a job with a task referencing a non-existing job cluster.""" task = jobs.Task(task_key="test-task", job_cluster_key="non-existing-id") settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -79,7 +79,7 @@ def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> No sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert maybe_node.failed assert maybe_node.problems == [ @@ -90,7 +90,7 @@ def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> No ] -def test_register_job_with_new_cluster(ws, admin_locator) -> None: +def test_register_jobs_with_new_cluster(ws, admin_locator) -> None: """Register a job with a task with a new cluster definition.""" task = jobs.Task(task_key="test-task", new_cluster=ClusterSpec()) settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -98,12 +98,12 @@ def test_register_job_with_new_cluster(ws, admin_locator) -> None: ws.jobs.get.return_value = job sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert not maybe_node.failed -def test_register_job_with_task_dependency(ws, admin_locator) -> None: +def test_register_jobs_with_task_dependency(ws, admin_locator) -> None: """Register a job with two tasks having a dependency.""" task1 = jobs.Task(task_key="task1") task_dependency = jobs.TaskDependency(task1.task_key) @@ -113,12 +113,12 @@ def test_register_job_with_task_dependency(ws, admin_locator) -> None: job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert not maybe_node.failed -def test_register_job_with_non_existing_task_dependency(ws, admin_locator) -> None: +def test_register_jobs_with_non_existing_task_dependency(ws, admin_locator) -> None: """Register a job with a non-existing task dependency.""" task_dependency = jobs.TaskDependency("non-existing-id") task = jobs.Task(task_key="task2", depends_on=[task_dependency]) @@ -126,7 +126,7 @@ def test_register_job_with_non_existing_task_dependency(ws, admin_locator) -> No job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - maybe_node = sequencer.register_job(job) + maybe_node = sequencer.register_jobs(job)[0] assert maybe_node.failed assert maybe_node.problems == [ @@ -160,7 +160,7 @@ def get_cluster(cluster_id: str) -> ClusterDetails: ws.clusters.get.side_effect = get_cluster sequencer = MigrationSequencer(ws, admin_locator) - sequencer.register_job(job) + sequencer.register_jobs(job) steps = list(sequencer.generate_steps()) @@ -208,7 +208,7 @@ def test_sequence_steps_from_job_task_with_existing_job_cluster_key(ws, admin_lo settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster]) job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - sequencer.register_job(job) + sequencer.register_jobs(job) steps = list(sequencer.generate_steps()) @@ -254,7 +254,7 @@ def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> Non settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - sequencer.register_job(job) + sequencer.register_jobs(job) steps = list(sequencer.generate_steps()) @@ -292,7 +292,7 @@ def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locato settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - sequencer.register_job(job) + sequencer.register_jobs(job) steps = list(sequencer.generate_steps()) @@ -334,7 +334,7 @@ def test_sequence_steps_from_job_task_referencing_other_task(ws, admin_locator) job = jobs.Job(job_id=1234, settings=settings) sequencer = MigrationSequencer(ws, admin_locator) - maybe_job_node = sequencer.register_job(job) + maybe_job_node = sequencer.register_jobs(job)[0] assert not maybe_job_node.failed steps = list(sequencer.generate_steps())