From efc921413a537ba82e46ce6a28ce67dbe30c9f6d Mon Sep 17 00:00:00 2001 From: xjules Date: Fri, 29 Dec 2023 15:19:21 +0100 Subject: [PATCH] Add stop_long_running_jobs funcitonality to Scheduler --- .../ensemble_evaluator/_builder/_legacy.py | 8 ++-- src/ert/scheduler/job.py | 9 ++++ src/ert/scheduler/scheduler.py | 42 ++++++++++++++----- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index 5a70bedacdc..94fe3c4aff8 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -223,12 +223,12 @@ async def _evaluate_inner( # pylint: disable=too-many-branches event_creator(identifiers.EVTYPE_ENSEMBLE_STARTED, None) ) + min_required_realizations = ( + self.min_required_realizations if self.stop_long_running else 0 + ) if isinstance(queue, Scheduler): - result = await queue.execute() + result = await queue.execute(min_required_realizations) elif isinstance(queue, JobQueue): - min_required_realizations = ( - self.min_required_realizations if self.stop_long_running else 0 - ) queue.add_dispatch_information_to_jobs_file() result = await queue.execute(min_required_realizations) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index d9000944cf4..9bc9a95bd99 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -2,6 +2,7 @@ import asyncio import logging +import time import uuid from enum import Enum from typing import TYPE_CHECKING, Optional @@ -58,6 +59,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None: self.returncode: asyncio.Future[int] = asyncio.Future() self.aborted = asyncio.Event() self._scheduler = scheduler + self._start_time: Optional[float] = None @property def iens(self) -> int: @@ -67,6 +69,12 @@ def iens(self) -> int: def driver(self) -> Driver: return self._scheduler.driver + @property + def running_duration(self) -> float: + if self._start_time: + return time.time() - self._start_time + return -1 + async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: await sem.acquire() timeout_task: Optional[asyncio.Task[None]] = None @@ -79,6 +87,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: await self._send(State.PENDING) await self.started.wait() + self._start_time = time.time() await self._send(State.RUNNING) if self.real.max_runtime is not None and self.real.max_runtime > 0: diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index deadf123935..6fc5184cfdf 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -6,14 +6,7 @@ import os import ssl from dataclasses import asdict -from typing import ( - TYPE_CHECKING, - Any, - Mapping, - MutableMapping, - Optional, - Sequence, -) +from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, Optional, Sequence from pydantic.dataclasses import dataclass from websockets import Headers @@ -79,8 +72,32 @@ def kill_all_jobs(self) -> None: for task in self._tasks.values(): task.cancel() - def stop_long_running_jobs(self, minimum_required_realizations: int) -> None: - pass + async def _stop_long_running_jobs(self, minimum_required_realizations: int) -> None: + LONG_RUNNING_FACTOR = 1.25 + while True: + completed_jobs = [] + for job_id, job in self._jobs.items(): + try: + if job.returncode.result() != 0: + continue + completed_jobs.append(job_id) + except (asyncio.CancelledError, asyncio.InvalidStateError): + continue + + if len(completed_jobs) >= minimum_required_realizations: + average_runtime = sum( + self._jobs[job_id].running_duration for job_id in completed_jobs + ) / len(completed_jobs) + + for job_id, task in self._tasks: + if ( + self._jobs[job_id].running_duration + > LONG_RUNNING_FACTOR * average_runtime + and not task.done() + ): + task.cancel() + await task + await asyncio.sleep(2) async def _publisher(self) -> None: if not self._ee_uri: @@ -112,11 +129,16 @@ def add_dispatch_information_to_jobs_file(self) -> None: async def execute( self, + min_required_realizations: int = 0, ) -> str: async with background_tasks() as cancel_when_execute_is_done: cancel_when_execute_is_done(self._publisher()) cancel_when_execute_is_done(self._process_event_queue()) cancel_when_execute_is_done(self.driver.poll()) + if min_required_realizations > 0: + cancel_when_execute_is_done( + self._stop_long_running_jobs(min_required_realizations) + ) start = asyncio.Event() sem = asyncio.BoundedSemaphore(self._max_running)