Skip to content

Commit

Permalink
Provide a standalone task to handle completed jobs duration aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Jan 3, 2024
1 parent ef9c45c commit 6bb7106
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
7 changes: 5 additions & 2 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def running_duration(self) -> float:
if self._end_time:
return self._end_time - self._start_time
return time.time() - self._start_time
return -1
return 0

async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
await sem.acquire()
Expand Down Expand Up @@ -118,7 +118,6 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
)

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._end_time = time.time()
await self._send(State.COMPLETED)
else:
assert callback_status in (
Expand Down Expand Up @@ -192,6 +191,10 @@ async def _send(self, state: State) -> None:
if state in (State.FAILED, State.ABORTED):
await self._handle_failure()

if state == State.COMPLETED:
self._end_time = time.time()
await self._scheduler.completed_jobs.put(self.iens)

status = STATE_TO_LEGACY[state]
event = CloudEvent(
{
Expand Down
35 changes: 19 additions & 16 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def __init__(
}

self._events: asyncio.Queue[Any] = asyncio.Queue()
self._average_job_runtime: float = 0
self._completed_jobs_num: int = 0
self.completed_jobs: asyncio.Queue[int] = asyncio.Queue()

self._cancelled = False
self._max_submit = max_submit
self._max_running = max_running
Expand All @@ -76,26 +80,24 @@ def kill_all_jobs(self) -> None:
for task in self._tasks.values():
task.cancel()

async def _stop_long_running_jobs(self, minimum_required_realizations: int) -> None:
LONG_RUNNING_FACTOR = 1.25
async def _update_avg_job_runtime(self) -> None:
while True:
job_id = await self.completed_jobs.get()
self._average_job_runtime = (
self._average_job_runtime * self._completed_jobs_num
+ self._jobs[job_id].running_duration
) / (self._completed_jobs_num + 1)
self._completed_jobs_num += 1

async def _stop_long_running_jobs(
self, minimum_required_realizations: int, long_running_factor: int = 1.25
) -> None:
while True:
completed_jobs = []
for job_id, job in self._jobs.items():
try:
if job.returncode.result() != 0:
continue
completed_jobs.append(job_id)
except (asyncio.CancelledError, asyncio.InvalidStateError):
continue

if len(completed_jobs) >= minimum_required_realizations:
average_runtime = sum(
self._jobs[job_id].running_duration for job_id in completed_jobs
) / len(completed_jobs)
if self._completed_jobs_num >= minimum_required_realizations:
for job_id, task in self._tasks.items():
if (
self._jobs[job_id].running_duration
> LONG_RUNNING_FACTOR * average_runtime
> long_running_factor * self._average_job_runtime
and not task.done()
):
task.cancel()
Expand Down Expand Up @@ -154,6 +156,7 @@ async def execute(
cancel_when_execute_is_done(
self._stop_long_running_jobs(min_required_realizations)
)
cancel_when_execute_is_done(self._update_avg_job_runtime())

start = asyncio.Event()
sem = asyncio.BoundedSemaphore(self._max_running)
Expand Down

0 comments on commit 6bb7106

Please sign in to comment.