Skip to content

Commit

Permalink
Batch: Ensure array jobs are actually executed
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers committed Oct 12, 2023
1 parent 18ad5f3 commit 301d385
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 74 deletions.
90 changes: 55 additions & 35 deletions moto/batch/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ def __init__(
all_jobs: Dict[str, "Job"],
timeout: Optional[Dict[str, int]],
array_properties: Dict[str, Any],
provided_job_id: Optional[str] = None,
):
threading.Thread.__init__(self)
DockerModel.__init__(self)
Expand All @@ -494,7 +495,7 @@ def __init__(
)

self.job_name = name
self.job_id = str(mock_random.uuid4())
self.job_id = provided_job_id or str(mock_random.uuid4())
self.job_definition = job_def
self.container_overrides: Dict[str, Any] = container_overrides or {}
self.job_queue = job_queue
Expand Down Expand Up @@ -527,6 +528,7 @@ def __init__(

self.attempts: List[Dict[str, Any]] = []
self.latest_attempt: Optional[Dict[str, Any]] = None
self._child_jobs: Optional[List[Job]] = None

def describe_short(self) -> Dict[str, Any]:
result = {
Expand Down Expand Up @@ -564,20 +566,26 @@ def describe(self) -> Dict[str, Any]:
if self.timeout:
result["timeout"] = self.timeout
result["attempts"] = self.attempts
if self.array_properties:
size = self.array_properties["size"]
if self._child_jobs:
child_statuses = {
"STARTING": 0,
"FAILED": 0,
"RUNNING": 0,
"SUCCEEDED": 0,
"RUNNABLE": 0,
"SUBMITTED": 0,
"PENDING": 0,
}
for child_job in self._child_jobs:
if child_job.status is not None:
child_statuses[child_job.status] += 1
result["arrayProperties"] = {
"statusSummary": {
"STARTING": 0,
"FAILED": 0,
"RUNNING": 0,
"SUCCEEDED": size,
"RUNNABLE": 0,
"SUBMITTED": 0,
"PENDING": 0,
},
"size": size,
"statusSummary": child_statuses,
"size": len(self._child_jobs),
}
if len(self._child_jobs) == child_statuses["SUCCEEDED"]:
self.status = "SUCCEEDED"
result["status"] = self.status
return result

def _container_details(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -693,7 +701,7 @@ def run(self) -> None:
)
for m in self._get_container_property("mountPoints", [])
],
"name": f"{self.job_name}-{self.job_id}",
"name": f"{self.job_name}-{self.job_id.replace(':', '-')}",
}
)
else:
Expand Down Expand Up @@ -1722,7 +1730,7 @@ def submit_job(
job_name: str,
job_def_id: str,
job_queue: str,
array_properties: Optional[Dict[str, int]],
array_properties: Dict[str, int],
depends_on: Optional[List[Dict[str, str]]] = None,
container_overrides: Optional[Dict[str, Any]] = None,
timeout: Optional[Dict[str, int]] = None,
Expand Down Expand Up @@ -1755,8 +1763,32 @@ def submit_job(
)
self._jobs[job.job_id] = job

# Here comes the fun
job.start()
if "size" in array_properties:
child_jobs = []
for array_index in range(array_properties["size"]):
provided_job_id = f"{job.job_id}:{array_index}"
child_job = Job(
job_name,
job_def,
queue,
log_backend=self.logs_backend,
container_overrides=container_overrides,
depends_on=depends_on,
all_jobs=self._jobs,
timeout=timeout,
array_properties={"statusSummary": {}, "index": array_index},
provided_job_id=provided_job_id,
)
child_jobs.append(child_job)
self._jobs[child_job.job_id] = child_job
child_job.start()

# The 'parent' job doesn't need to be executed
# it just needs to keep track of its children
job._child_jobs = child_jobs
else:
# Here comes the fun
job.start()
return job_name, job.job_id

def describe_jobs(self, jobs: Optional[List[str]]) -> List[Dict[str, Any]]:
Expand All @@ -1766,24 +1798,12 @@ def describe_jobs(self, jobs: Optional[List[str]]) -> List[Dict[str, Any]]:

result = []
for key, job in self._jobs.items():
if len(job_filter) > 0:
for f in job_filter:
# account for array job
if ":" in f:
real_f, job_index = f.split(":")
if (
real_f == key
and int(job_index) >= 0
and int(job_index) < job.array_properties["size"]
):
child_job = job.describe()
child_job["jobArn"] = f"{child_job['jobArn']}:{job_index}"
child_job["jobId"] = f"{child_job['jobId']}:{job_index}"
result.append(child_job)

if key not in job_filter and job.arn not in job_filter:
# didn't find our job, skip
continue
if (
len(job_filter) > 0
and key not in job_filter
and job.arn not in job_filter
):
continue

result.append(job.describe())

Expand Down
2 changes: 1 addition & 1 deletion moto/batch/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def submitjob(self) -> str:
job_name = self._get_param("jobName")
job_queue = self._get_param("jobQueue")
timeout = self._get_param("timeout")
array_properties = self._get_param("arrayProperties")
array_properties = self._get_param("arrayProperties", {})

name, job_id = self.batch_backend.submit_job(
job_name,
Expand Down
40 changes: 32 additions & 8 deletions moto/batch_simple/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __getattribute__(self, name: str) -> Any:
"url_bases",
]:
return object.__getattribute__(self, name)
if name in ["submit_job"]:
if name in ["submit_job", "_mark_job_as_finished"]:

def newfunc(*args: Any, **kwargs: Any) -> Any:
attr = object.__getattribute__(self, name)
Expand All @@ -57,7 +57,7 @@ def submit_job(
job_name: str,
job_def_id: str,
job_queue: str,
array_properties: Optional[Dict[str, int]],
array_properties: Dict[str, Any],
depends_on: Optional[List[Dict[str, str]]] = None,
container_overrides: Optional[Dict[str, Any]] = None,
timeout: Optional[Dict[str, int]] = None,
Expand All @@ -80,14 +80,40 @@ def submit_job(
depends_on=depends_on,
all_jobs=self._jobs,
timeout=timeout,
array_properties=array_properties or {},
array_properties=array_properties,
)
self.backend._jobs[job.job_id] = job

if "size" in array_properties:
child_jobs: List[Job] = []
for array_index in range(array_properties.get("size", 0)):
provided_job_id = f"{job.job_id}:{array_index}"
child_job = Job(
job_name,
job_def,
queue,
log_backend=self.logs_backend,
container_overrides=container_overrides,
depends_on=depends_on,
all_jobs=self._jobs,
timeout=timeout,
array_properties={"statusSummary": {}, "index": array_index},
provided_job_id=provided_job_id,
)
self._mark_job_as_finished(include_start_attempt=True, job=child_job)
child_jobs.append(child_job)
self._mark_job_as_finished(include_start_attempt=False, job=job)
job._child_jobs = child_jobs
else:
self._mark_job_as_finished(include_start_attempt=True, job=job)

return job_name, job.job_id

def _mark_job_as_finished(self, include_start_attempt: bool, job: Job) -> None:
self.backend._jobs[job.job_id] = job
job.job_started_at = datetime.datetime.now()
job.log_stream_name = job._stream_name
job._start_attempt()

if include_start_attempt:
job._start_attempt()
# We don't want to actually run the job - just mark it as succeeded or failed
# depending on whether env var MOTO_SIMPLE_BATCH_FAIL_AFTER is set
# if MOTO_SIMPLE_BATCH_FAIL_AFTER is set to an integer then batch will
Expand All @@ -106,7 +132,5 @@ def submit_job(
else:
job._mark_stopped(success=True)

return job_name, job.job_id


batch_simple_backends = BackendDict(BatchSimpleBackend, "batch")
31 changes: 16 additions & 15 deletions tests/test_batch/test_batch_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def test_submit_job_by_name():
assert resp_jobs["jobs"][0]["jobDefinition"] == job_definition_arn


@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
Expand All @@ -110,23 +109,25 @@ def test_submit_job_array_size():
# Verify
job_id = resp["jobId"]
child_job_1_id = f"{job_id}:0"
child_job_2_id = f"{job_id}:1"
non_existent_child_job_3_id = f"{job_id}:2"

resp_main_job = batch_client.describe_jobs(jobs=[job_id])
resp_child_job_1_id = batch_client.describe_jobs(jobs=[child_job_1_id])
resp_child_job_2_id = batch_client.describe_jobs(jobs=[child_job_2_id])
resp_non_existent_child_job_3_id = batch_client.describe_jobs(
jobs=[non_existent_child_job_3_id]
)

job = resp_main_job["jobs"][0]
job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]

assert job["arrayProperties"]["size"] == 2
assert job["attempts"] == []

_wait_for_job_status(batch_client, job_id, "SUCCEEDED")

assert "arrayProperties" in job
job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
# If the main job is successful, that means that all child jobs are successful
assert job["arrayProperties"]["size"] == 2
assert resp_child_job_1_id["jobs"]
assert resp_child_job_2_id["jobs"]
assert not resp_non_existent_child_job_3_id["jobs"]
assert job["arrayProperties"]["statusSummary"]["SUCCEEDED"] == 2
# Main job still has no attempts - because only the child jobs are executed
assert job["attempts"] == []

child_job_1 = batch_client.describe_jobs(jobs=[child_job_1_id])["jobs"][0]
assert child_job_1["status"] == "SUCCEEDED"
# Child job was executed
assert len(child_job_1["attempts"]) == 1


# SLOW TESTS
Expand Down
28 changes: 13 additions & 15 deletions tests/test_batch_simple/test_batch_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def test_submit_job_by_name():
assert "logStreamName" in job["container"]


@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
Expand All @@ -64,23 +63,22 @@ def test_submit_job_array_size():
# Verify
job_id = resp["jobId"]
child_job_1_id = f"{job_id}:0"
child_job_2_id = f"{job_id}:1"
non_existent_child_job_3_id = f"{job_id}:2"

resp_main_job = batch_client.describe_jobs(jobs=[job_id])
resp_child_job_1_id = batch_client.describe_jobs(jobs=[child_job_1_id])
resp_child_job_2_id = batch_client.describe_jobs(jobs=[child_job_2_id])
resp_non_existent_child_job_3_id = batch_client.describe_jobs(
jobs=[non_existent_child_job_3_id]
)

job = resp_main_job["jobs"][0]
job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]

assert job["arrayProperties"]["size"] == 2
assert job["attempts"] == []

assert "arrayProperties" in job
# If the main job is successful, that means that all child jobs are successful
assert job["arrayProperties"]["size"] == 2
assert resp_child_job_1_id["jobs"]
assert resp_child_job_2_id["jobs"]
assert not resp_non_existent_child_job_3_id["jobs"]
assert job["arrayProperties"]["statusSummary"]["SUCCEEDED"] == 2
# Main job still has no attempts - because only the child jobs are executed
assert job["attempts"] == []

child_job_1 = batch_client.describe_jobs(jobs=[child_job_1_id])["jobs"][0]
assert child_job_1["status"] == "SUCCEEDED"
# Child job was executed
assert len(child_job_1["attempts"]) == 1


@mock_batch_simple
Expand Down

0 comments on commit 301d385

Please sign in to comment.