diff --git a/moto/batch/models.py b/moto/batch/models.py index 2fc633c54b34..887f6f48c7de 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -484,6 +484,7 @@ def __init__( all_jobs: Dict[str, "Job"], timeout: Optional[Dict[str, int]], array_properties: Dict[str, Any], + provided_job_id: Optional[str] = None, ): threading.Thread.__init__(self) DockerModel.__init__(self) @@ -494,7 +495,7 @@ def __init__( ) self.job_name = name - self.job_id = str(mock_random.uuid4()) + self.job_id = provided_job_id or str(mock_random.uuid4()) self.job_definition = job_def self.container_overrides: Dict[str, Any] = container_overrides or {} self.job_queue = job_queue @@ -527,6 +528,7 @@ def __init__( self.attempts: List[Dict[str, Any]] = [] self.latest_attempt: Optional[Dict[str, Any]] = None + self._child_jobs: Optional[List[Job]] = None def describe_short(self) -> Dict[str, Any]: result = { @@ -564,20 +566,26 @@ def describe(self) -> Dict[str, Any]: if self.timeout: result["timeout"] = self.timeout result["attempts"] = self.attempts - if self.array_properties: - size = self.array_properties["size"] + if self._child_jobs: + child_statuses = { + "STARTING": 0, + "FAILED": 0, + "RUNNING": 0, + "SUCCEEDED": 0, + "RUNNABLE": 0, + "SUBMITTED": 0, + "PENDING": 0, + } + for child_job in self._child_jobs: + if child_job.status is not None: + child_statuses[child_job.status] += 1 result["arrayProperties"] = { - "statusSummary": { - "STARTING": 0, - "FAILED": 0, - "RUNNING": 0, - "SUCCEEDED": size, - "RUNNABLE": 0, - "SUBMITTED": 0, - "PENDING": 0, - }, - "size": size, + "statusSummary": child_statuses, + "size": len(self._child_jobs), } + if len(self._child_jobs) == child_statuses["SUCCEEDED"]: + self.status = "SUCCEEDED" + result["status"] = self.status return result def _container_details(self) -> Dict[str, Any]: @@ -693,7 +701,7 @@ def run(self) -> None: ) for m in self._get_container_property("mountPoints", []) ], - "name": f"{self.job_name}-{self.job_id}", + "name": f"{self.job_name}-{self.job_id.replace(':', '-')}", } ) else: @@ -1722,7 +1730,7 @@ def submit_job( job_name: str, job_def_id: str, job_queue: str, - array_properties: Optional[Dict[str, int]], + array_properties: Dict[str, int], depends_on: Optional[List[Dict[str, str]]] = None, container_overrides: Optional[Dict[str, Any]] = None, timeout: Optional[Dict[str, int]] = None, @@ -1755,8 +1763,32 @@ def submit_job( ) self._jobs[job.job_id] = job - # Here comes the fun - job.start() + if "size" in array_properties: + child_jobs = [] + for array_index in range(array_properties["size"]): + provided_job_id = f"{job.job_id}:{array_index}" + child_job = Job( + job_name, + job_def, + queue, + log_backend=self.logs_backend, + container_overrides=container_overrides, + depends_on=depends_on, + all_jobs=self._jobs, + timeout=timeout, + array_properties={"statusSummary": {}, "index": array_index}, + provided_job_id=provided_job_id, + ) + child_jobs.append(child_job) + self._jobs[child_job.job_id] = child_job + child_job.start() + + # The 'parent' job doesn't need to be executed + # it just needs to keep track of it's children + job._child_jobs = child_jobs + else: + # Here comes the fun + job.start() return job_name, job.job_id def describe_jobs(self, jobs: Optional[List[str]]) -> List[Dict[str, Any]]: @@ -1766,24 +1798,12 @@ def describe_jobs(self, jobs: Optional[List[str]]) -> List[Dict[str, Any]]: result = [] for key, job in self._jobs.items(): - if len(job_filter) > 0: - for f in job_filter: - # account for array job - if ":" in f: - real_f, job_index = f.split(":") - if ( - real_f == key - and int(job_index) >= 0 - and int(job_index) < job.array_properties["size"] - ): - child_job = job.describe() - child_job["jobArn"] = f"{child_job['jobArn']}:{job_index}" - child_job["jobId"] = f"{child_job['jobId']}:{job_index}" - result.append(child_job) - - if key not in job_filter and job.arn not in job_filter: - # didn't find our job, skip - continue + if ( + len(job_filter) > 0 + and key not in job_filter + and job.arn not in job_filter + ): + continue result.append(job.describe()) diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 1768a6b691b2..ef933f3e407e 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -210,7 +210,7 @@ def submitjob(self) -> str: job_name = self._get_param("jobName") job_queue = self._get_param("jobQueue") timeout = self._get_param("timeout") - array_properties = self._get_param("arrayProperties") + array_properties = self._get_param("arrayProperties", {}) name, job_id = self.batch_backend.submit_job( job_name, diff --git a/moto/batch_simple/models.py b/moto/batch_simple/models.py index d8c03d6758f1..6d8079ced596 100644 --- a/moto/batch_simple/models.py +++ b/moto/batch_simple/models.py @@ -42,7 +42,7 @@ def __getattribute__(self, name: str) -> Any: "url_bases", ]: return object.__getattribute__(self, name) - if name in ["submit_job"]: + if name in ["submit_job", "_mark_job_as_finished"]: def newfunc(*args: Any, **kwargs: Any) -> Any: attr = object.__getattribute__(self, name) @@ -57,7 +57,7 @@ def submit_job( job_name: str, job_def_id: str, job_queue: str, - array_properties: Optional[Dict[str, int]], + array_properties: Dict[str, Any], depends_on: Optional[List[Dict[str, str]]] = None, container_overrides: Optional[Dict[str, Any]] = None, timeout: Optional[Dict[str, int]] = None, @@ -80,14 +80,40 @@ def submit_job( depends_on=depends_on, all_jobs=self._jobs, timeout=timeout, - array_properties=array_properties or {}, + array_properties=array_properties, ) - self.backend._jobs[job.job_id] = job + if "size" in array_properties: + child_jobs: List[Job] = [] + for array_index in range(array_properties.get("size", 0)): + provided_job_id = f"{job.job_id}:{array_index}" + child_job = Job( + job_name, + job_def, + queue, + log_backend=self.logs_backend, + container_overrides=container_overrides, + depends_on=depends_on, + all_jobs=self._jobs, + timeout=timeout, + array_properties={"statusSummary": {}, "index": array_index}, + provided_job_id=provided_job_id, + ) + self._mark_job_as_finished(include_start_attempt=True, job=child_job) + child_jobs.append(child_job) + self._mark_job_as_finished(include_start_attempt=False, job=job) + job._child_jobs = child_jobs + else: + self._mark_job_as_finished(include_start_attempt=True, job=job) + + return job_name, job.job_id + + def _mark_job_as_finished(self, include_start_attempt: bool, job: Job) -> None: + self.backend._jobs[job.job_id] = job job.job_started_at = datetime.datetime.now() job.log_stream_name = job._stream_name - job._start_attempt() - + if include_start_attempt: + job._start_attempt() # We don't want to actually run the job - just mark it as succeeded or failed # depending on whether env var MOTO_SIMPLE_BATCH_FAIL_AFTER is set # if MOTO_SIMPLE_BATCH_FAIL_AFTER is set to an integer then batch will @@ -106,7 +132,5 @@ def submit_job( else: job._mark_stopped(success=True) - return job_name, job.job_id - batch_simple_backends = BackendDict(BatchSimpleBackend, "batch") diff --git a/tests/test_batch/test_batch_jobs.py b/tests/test_batch/test_batch_jobs.py index 0d1dfdef4e35..402dbc3c3523 100644 --- a/tests/test_batch/test_batch_jobs.py +++ b/tests/test_batch/test_batch_jobs.py @@ -86,7 +86,6 @@ def test_submit_job_by_name(): assert resp_jobs["jobs"][0]["jobDefinition"] == job_definition_arn -@mock_logs @mock_ec2 @mock_ecs @mock_iam @@ -110,23 +109,25 @@ def test_submit_job_array_size(): # Verify job_id = resp["jobId"] child_job_1_id = f"{job_id}:0" - child_job_2_id = f"{job_id}:1" - non_existent_child_job_3_id = f"{job_id}:2" - - resp_main_job = batch_client.describe_jobs(jobs=[job_id]) - resp_child_job_1_id = batch_client.describe_jobs(jobs=[child_job_1_id]) - resp_child_job_2_id = batch_client.describe_jobs(jobs=[child_job_2_id]) - resp_non_existent_child_job_3_id = batch_client.describe_jobs( - jobs=[non_existent_child_job_3_id] - ) - job = resp_main_job["jobs"][0] + job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0] + + assert job["arrayProperties"]["size"] == 2 + assert job["attempts"] == [] + + _wait_for_job_status(batch_client, job_id, "SUCCEEDED") - assert "arrayProperties" in job + job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0] + # If the main job is successful, that means that all child jobs are successful assert job["arrayProperties"]["size"] == 2 - assert resp_child_job_1_id["jobs"] - assert resp_child_job_2_id["jobs"] - assert not resp_non_existent_child_job_3_id["jobs"] + assert job["arrayProperties"]["statusSummary"]["SUCCEEDED"] == 2 + # Main job still has no attempts - because only the child jobs are executed + assert job["attempts"] == [] + + child_job_1 = batch_client.describe_jobs(jobs=[child_job_1_id])["jobs"][0] + assert child_job_1["status"] == "SUCCEEDED" + # Child job was executed + assert len(child_job_1["attempts"]) == 1 # SLOW TESTS diff --git a/tests/test_batch_simple/test_batch_jobs.py b/tests/test_batch_simple/test_batch_jobs.py index 7b8b21fa5472..2d2c940ff90e 100644 --- a/tests/test_batch_simple/test_batch_jobs.py +++ b/tests/test_batch_simple/test_batch_jobs.py @@ -43,7 +43,6 @@ def test_submit_job_by_name(): assert "logStreamName" in job["container"] -@mock_logs @mock_ec2 @mock_ecs @mock_iam @@ -64,23 +63,22 @@ def test_submit_job_array_size(): # Verify job_id = resp["jobId"] child_job_1_id = f"{job_id}:0" - child_job_2_id = f"{job_id}:1" - non_existent_child_job_3_id = f"{job_id}:2" - - resp_main_job = batch_client.describe_jobs(jobs=[job_id]) - resp_child_job_1_id = batch_client.describe_jobs(jobs=[child_job_1_id]) - resp_child_job_2_id = batch_client.describe_jobs(jobs=[child_job_2_id]) - resp_non_existent_child_job_3_id = batch_client.describe_jobs( - jobs=[non_existent_child_job_3_id] - ) - job = resp_main_job["jobs"][0] + job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0] + + assert job["arrayProperties"]["size"] == 2 + assert job["attempts"] == [] - assert "arrayProperties" in job + # If the main job is successful, that means that all child jobs are successful assert job["arrayProperties"]["size"] == 2 - assert resp_child_job_1_id["jobs"] - assert resp_child_job_2_id["jobs"] - assert not resp_non_existent_child_job_3_id["jobs"] + assert job["arrayProperties"]["statusSummary"]["SUCCEEDED"] == 2 + # Main job still has no attempts - because only the child jobs are executed + assert job["attempts"] == [] + + child_job_1 = batch_client.describe_jobs(jobs=[child_job_1_id])["jobs"][0] + assert child_job_1["status"] == "SUCCEEDED" + # Child job was executed + assert len(child_job_1["attempts"]) == 1 @mock_batch_simple