Skip to content

Commit

Permalink
Add stop_long_running_jobs funcitonality to Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Dec 29, 2023
1 parent 5e2df1f commit 7136b4d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
8 changes: 4 additions & 4 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
event_creator(identifiers.EVTYPE_ENSEMBLE_STARTED, None)
)

min_required_realizations = (
self.min_required_realizations if self.stop_long_running else 0
)
if isinstance(queue, Scheduler):
result = await queue.execute()
result = await queue.execute(min_required_realizations)
elif isinstance(queue, JobQueue):
min_required_realizations = (
self.min_required_realizations if self.stop_long_running else 0
)
queue.add_dispatch_information_to_jobs_file()

result = await queue.execute(min_required_realizations)
Expand Down
9 changes: 9 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import time
import uuid
from enum import Enum
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self.returncode: asyncio.Future[int] = asyncio.Future()
self.aborted = asyncio.Event()
self._scheduler = scheduler
self._start_time: Optional[float] = None

@property
def iens(self) -> int:
Expand All @@ -67,6 +69,12 @@ def iens(self) -> int:
def driver(self) -> Driver:
return self._scheduler.driver

@property
def running_duration(self) -> float:
if self._start_time:
return time.time() - self._start_time
return -1

async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
await sem.acquire()
timeout_task: Optional[asyncio.Task[None]] = None
Expand All @@ -79,6 +87,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:

await self._send(State.PENDING)
await self.started.wait()
self._start_time = time.time()

await self._send(State.RUNNING)
if self.real.max_runtime is not None and self.real.max_runtime > 0:
Expand Down
51 changes: 41 additions & 10 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@
import os
import ssl
from dataclasses import asdict
from typing import (
TYPE_CHECKING,
Any,
Mapping,
MutableMapping,
Optional,
Sequence,
)
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, Optional, Sequence

from pydantic.dataclasses import dataclass
from websockets import Headers
Expand Down Expand Up @@ -79,8 +72,43 @@ def kill_all_jobs(self) -> None:
for task in self._tasks.values():
task.cancel()

def stop_long_running_jobs(self, minimum_required_realizations: int) -> None:
pass
async def _stop_long_running_jobs(self, minimum_required_realizations: int) -> None:
LONG_RUNNING_FACTOR = 1.25
while True:
completed_jobs = []
for job_id, job in self._jobs.items():
try:
if job.returncode.result() != 0:
continue
completed_jobs.append(job_id)
except (asyncio.CancelledError, asyncio.InvalidStateError):
continue

finished_realizations = len(completed_jobs)

if not finished_realizations:
logger.error(
"Attempted to stop finished jobs when none was found in scheduler"
)
return

if finished_realizations < minimum_required_realizations:
return

average_runtime = (
sum(self._jobs[job_id].running_duration for job_id in completed_jobs)
/ finished_realizations
)

for job_id, task in self._tasks:
if (
self._jobs[job_id].running_duration
> LONG_RUNNING_FACTOR * average_runtime
and not task.done()
):
task.cancel()
await task
asyncio.sleep(2)

async def _publisher(self) -> None:
if not self._ee_uri:
Expand Down Expand Up @@ -112,11 +140,14 @@ def add_dispatch_information_to_jobs_file(self) -> None:

async def execute(
self,
min_required_realizations: int = 0,
) -> str:
async with background_tasks() as cancel_when_execute_is_done:
cancel_when_execute_is_done(self._publisher())
cancel_when_execute_is_done(self._process_event_queue())
cancel_when_execute_is_done(self.driver.poll())
if min_required_realizations > 0:
cancel_when_execute_is_done(self._stop_long_running_jobs())

start = asyncio.Event()
sem = asyncio.BoundedSemaphore(self._max_running)
Expand Down

0 comments on commit 7136b4d

Please sign in to comment.