Skip to content

Commit

Permalink
Scheduler limits bugfix (#92)
Browse files Browse the repository at this point in the history
* Add regression test for keyerror in job limits util

* Don't assume all jobs share the same keys in their limits
  • Loading branch information
naegelyd-insitro authored Mar 5, 2024
1 parent 4383286 commit 2160e8b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
14 changes: 7 additions & 7 deletions redun/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1311,25 +1311,25 @@ def _add_job_pending_limits(self, job: Job, eval_args: Tuple[Tuple, dict]) -> No
"""
self._jobs_pending_limits.append((job, eval_args))

def _add_limits(self, limits1, limits2):
return {
limit_name: limits1.get(limit_name, 0) + limits2.get(limit_name, 0)
for limit_name in set(limits1) | set(limits2)
}

def _check_jobs_pending_limits(self) -> None:
"""
Checks whether Jobs pending due to limits can now satisfy limits and run.
"""

def _add_limits(limits1, limits2):
return {
limit_name: limits1[limit_name] + limits2[limit_name]
for limit_name in set(limits1) | set(limits2)
}

ready_jobs: List[Tuple[Job, Tuple[tuple, dict]]] = []
not_ready_jobs: List[Tuple[Job, Tuple[tuple, dict]]] = []
ready_job_limits: Dict[str, int] = defaultdict(int)

# Resource limits that will be consumed by ready jobs.
for job, eval_args in self._jobs_pending_limits:
job_limits = job.get_limits()
proposed_limits = _add_limits(job_limits, ready_job_limits)
proposed_limits = self._add_limits(job_limits, ready_job_limits)
if self._is_job_within_limits(proposed_limits):
ready_jobs.append((job, eval_args))
ready_job_limits = proposed_limits
Expand Down
8 changes: 7 additions & 1 deletion redun/tests/test_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ def test_scheduler_limits_utils() -> None:
"""
scheduler = Scheduler(config=Config(CONFIG_DICT))

job_limits = {"api": 1, "memory": MEMORY_LIMIT}
job_limits1 = {"api": 1, "memory": MEMORY_LIMIT}
job_limits2 = {"api": 1}

# Confirm we can add limits together where not all keys are present in both jobs. Regression
# test added for:
# https://github.com/insitro/redun/issues/91
job_limits = scheduler._add_limits(job_limits1, job_limits2)

# Check that a job that should have enough resources to run is correctly deemed within limits.
assert scheduler._is_job_within_limits(job_limits)
Expand Down

0 comments on commit 2160e8b

Please sign in to comment.