Skip to content

Commit

Permalink
Add stop_long_running_jobs functionality to Scheduler
Browse files Browse the repository at this point in the history
Add test for long_running_jobs
Add explicit iens to init mock
Remove duplicate scheduler
  • Loading branch information
xjules committed Jan 3, 2024
1 parent fc7c5ba commit ef9c45c
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 26 deletions.
14 changes: 5 additions & 9 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,12 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
event_creator(identifiers.EVTYPE_ENSEMBLE_STARTED, None)
)

if isinstance(queue, Scheduler):
queue.add_dispatch_information_to_jobs_file()
result = await queue.execute()
elif isinstance(queue, JobQueue):
min_required_realizations = (
self.min_required_realizations if self.stop_long_running else 0
)
queue.add_dispatch_information_to_jobs_file()
min_required_realizations = (
self.min_required_realizations if self.stop_long_running else 0
)

result = await queue.execute(min_required_realizations)
queue.add_dispatch_information_to_jobs_file()
result = await queue.execute(min_required_realizations)

except Exception:
logger.exception(
Expand Down
13 changes: 13 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import time
import uuid
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -67,6 +68,8 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self._scheduler: Scheduler = scheduler
self._callback_status_msg: str = ""
self._requested_max_submit: Optional[int] = None
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None

@property
def iens(self) -> int:
Expand All @@ -76,6 +79,14 @@ def iens(self) -> int:
def driver(self) -> Driver:
return self._scheduler.driver

@property
def running_duration(self) -> float:
    """Return how long this job has been (or was) running, in seconds.

    Returns:
        ``-1`` if no start time has been recorded yet; the difference
        between the recorded end and start times if both are set;
        otherwise the time elapsed since the recorded start time,
        measured with ``time.time()``.
    """
    # Compare against None explicitly: a timestamp of 0.0 is a valid
    # value and must not be mistaken for "not set".
    if self._start_time is None:
        return -1
    if self._end_time is not None:
        return self._end_time - self._start_time
    return time.time() - self._start_time

async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
await sem.acquire()
timeout_task: Optional[asyncio.Task[None]] = None
Expand All @@ -88,6 +99,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:

await self._send(State.PENDING)
await self.started.wait()
self._start_time = time.time()

await self._send(State.RUNNING)
if self.real.max_runtime is not None and self.real.max_runtime > 0:
Expand All @@ -106,6 +118,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
)

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._end_time = time.time()
await self._send(State.COMPLETED)
else:
assert callback_status in (
Expand Down
45 changes: 34 additions & 11 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,7 @@
from collections import defaultdict
from dataclasses import asdict
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
MutableMapping,
Optional,
Sequence,
)
from typing import TYPE_CHECKING, Any, Dict, MutableMapping, Optional, Sequence

from pydantic.dataclasses import dataclass
from websockets import Headers
Expand Down Expand Up @@ -83,8 +76,31 @@ def kill_all_jobs(self) -> None:
for task in self._tasks.values():
task.cancel()

def stop_long_running_jobs(self, minimum_required_realizations: int) -> None:
pass
async def _stop_long_running_jobs(
    self, minimum_required_realizations: int, long_running_factor: float = 1.25
) -> None:
    """Periodically cancel jobs that run much longer than the completed ones.

    Loops forever (until this coroutine is cancelled), re-evaluating every
    0.1 seconds. A job counts as completed when its ``returncode`` future
    holds the result ``0``. Once at least ``minimum_required_realizations``
    jobs have completed, every task that is not yet done and whose job's
    ``running_duration`` exceeds ``long_running_factor`` times the average
    ``running_duration`` of the completed jobs is cancelled and awaited.

    Args:
        minimum_required_realizations: Number of completed jobs required
            before any job is considered for cancellation.
        long_running_factor: Multiplier applied to the average runtime of
            completed jobs to obtain the cancellation threshold.
    """
    while True:
        completed_jobs = []
        for job_id, job in self._jobs.items():
            try:
                if job.returncode.result() == 0:
                    completed_jobs.append(job_id)
            except (asyncio.CancelledError, asyncio.InvalidStateError):
                # The return code is not available (yet), or was cancelled.
                continue

        # Require at least one completed job so the average below can
        # never divide by zero, even for a non-positive minimum.
        if completed_jobs and len(completed_jobs) >= minimum_required_realizations:
            average_runtime = sum(
                self._jobs[job_id].running_duration for job_id in completed_jobs
            ) / len(completed_jobs)
            threshold = long_running_factor * average_runtime

            # Iterate over a snapshot: control is yielded at `await task`,
            # so the live dict must not be iterated across that point.
            for job_id, task in list(self._tasks.items()):
                if task.done():
                    continue
                if self._jobs[job_id].running_duration > threshold:
                    task.cancel()
                    # Wait for the task to finish handling the cancellation.
                    # NOTE(review): if the task lets CancelledError propagate,
                    # this await re-raises it here and ends the loop - confirm
                    # the job task handles cancellation itself.
                    await task
        await asyncio.sleep(0.1)

def set_realization(self, realization: Realization) -> None:
self._jobs[realization.iens] = Job(self, realization)
Expand Down Expand Up @@ -126,11 +142,18 @@ def add_dispatch_information_to_jobs_file(self) -> None:
for job in self._jobs.values():
self._update_jobs_json(job.iens, job.real.run_arg.runpath)

async def execute(self, minimum_required_realizations: int = 0) -> str:
async def execute(
self,
min_required_realizations: int = 0,
) -> str:
async with background_tasks() as cancel_when_execute_is_done:
cancel_when_execute_is_done(self._publisher())
cancel_when_execute_is_done(self._process_event_queue())
cancel_when_execute_is_done(self.driver.poll())
if min_required_realizations > 0:
cancel_when_execute_is_done(
self._stop_long_running_jobs(min_required_realizations)
)

start = asyncio.Event()
sem = asyncio.BoundedSemaphore(self._max_running)
Expand Down
19 changes: 13 additions & 6 deletions tests/unit_tests/scheduler/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@ def __init__(self, init=None, wait=None, kill=None):
self._mock_wait = wait
self._mock_kill = kill

async def _init(self, *args, **kwargs):
async def _init(self, iens, *args, **kwargs):
if self._mock_init is not None:
await self._mock_init(*args, **kwargs)
await self._mock_init(iens, *args, **kwargs)
return iens

async def _wait(self, *args):
async def _wait(self, iens):
if self._mock_wait is not None:
result = await self._mock_wait()
if self._mock_wait.__code__.co_argcount > 0:
result = await self._mock_wait(iens)
else:
result = await self._mock_wait()
return True if result is None else bool(result)
return True

async def _kill(self, *args):
async def _kill(self, iens, *args):
if self._mock_kill is not None:
await self._mock_kill()
if self._mock_kill.__code__.co_argcount > 0:
await self._mock_kill(iens)
else:
await self._mock_kill()


@pytest.fixture
Expand Down
31 changes: 31 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,34 @@ async def init(iens, *args, **kwargs):
assert sch.is_active()
await execute_task
assert not sch.is_active()


@pytest.mark.timeout(6)
async def test_that_long_running_jobs_were_stopped(storage, tmp_path, mock_driver):
    """Check that the slow realizations are killed once five have completed.

    Ten realizations are run with ``min_required_realizations=5``. The mocked
    wait sleeps 0.5 seconds for iens 0-5 and 10 seconds for iens 6-9; the
    mocked kill records which iens it was called for.
    """
    ensemble_size = 10
    killed_iens = []

    async def kill(iens):
        killed_iens.append(iens)

    async def wait(iens):
        # Realizations with iens > 5 sleep for 10 seconds and should be
        # killed; the others finish after half a second.
        await asyncio.sleep(0.5 if iens < 6 else 10)
        return True

    experiment = storage.create_experiment()
    ensemble = experiment.create_ensemble(name="foo", ensemble_size=ensemble_size)

    realizations = []
    for iens in range(ensemble_size):
        realizations.append(create_stub_realization(ensemble, tmp_path, iens))

    driver = mock_driver(wait=wait, kill=kill)
    sch = scheduler.Scheduler(driver, realizations)

    status = await sch.execute(min_required_realizations=5)
    assert status == EVTYPE_ENSEMBLE_STOPPED
    assert killed_iens == [6, 7, 8, 9]

0 comments on commit ef9c45c

Please sign in to comment.