NOTE(review): this patch was reconstructed from a copy whose line breaks and
indentation had been collapsed. Blank context lines and leading whitespace are
best-effort; whitespace-only context mismatches can be tolerated with
`git apply --ignore-whitespace`. The added lines of refresh_status() differ
from the original patch (quoted "Driver" annotation, `-> None`, docstring,
reviewer note, missing space in the log message), so the post-image hash on
the first `index` line no longer matches and the new-file offsets of the last
two job_queue_node.py hunks were adjusted accordingly.

diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py
index 497e6c277bf..d16d0ca540c 100644
--- a/src/ert/job_queue/job_queue_node.py
+++ b/src/ert/job_queue/job_queue_node.py
@@ -3,6 +3,7 @@
 import logging
 import random
 import time
+from pathlib import Path
 from threading import Lock, Semaphore, Thread
 from typing import TYPE_CHECKING, Callable, Optional
 
@@ -58,26 +59,26 @@ def __call__(
         )
 
 
-class JobQueueNode(BaseCClass):  # type: ignore
-    TYPE_NAME = "job_queue_node"
-
-    _alloc = ResPrototype(
-        "void* job_queue_node_alloc(char*,"
-        "char*,"
-        "char*,"
-        "int, "
-        "char*,"
-        "char*"
-        ")",
-        bind=False,
-    )
-    _free = ResPrototype("void job_queue_node_free(job_queue_node)")
-    _get_status = ResPrototype(
-        "job_status_type_enum job_queue_node_get_status(job_queue_node)"
-    )
-    _set_queue_status = ResPrototype(
-        "void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
-    )
+class JobQueueNode:
+    # TYPE_NAME = "job_queue_node"
+
+    # _alloc = ResPrototype(
+    #     "void* job_queue_node_alloc(char*,"
+    #     "char*,"
+    #     "char*,"
+    #     "int, "
+    #     "char*,"
+    #     "char*"
+    #     ")",
+    #     bind=False,
+    # )
+    # _free = ResPrototype("void job_queue_node_free(job_queue_node)")
+    # _get_status = ResPrototype(
+    #     "job_status_type_enum job_queue_node_get_status(job_queue_node)"
+    # )
+    # _set_queue_status = ResPrototype(
+    #     "void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
+    # )
 
     def __init__(
         self,
@@ -101,23 +102,36 @@ def __init__(
         self._start_time: Optional[float] = None
         self._end_time: Optional[float] = None
         self._timed_out = False
+        self._status: Optional[JobStatus] = None
         self._status_msg = ""
 
-        c_ptr = self._alloc(
-            run_arg.job_name,
-            run_arg.runpath,
-            job_script,
-            num_cpu,
-            status_file,
-            exit_file,
-        )
-        if c_ptr is not None:
-            super().__init__(c_ptr)
-        else:
-            raise ValueError("Unable to create job node object")
+        # c-struct attributes
+        self._status_file: str = status_file
+        self._exit_file: str = exit_file
+        self._run_cmd: str = job_script
+        self._job_name: str = run_arg.job_name
+        self._run_path: str = run_arg.runpath
+        self._num_cpu: int = num_cpu
+        self._queue_index: int = 0
+        self._submit_attempt: int = 0
+        self._confirmed_running: bool = False
+        # c_ptr = self._alloc(
+        #     run_arg.job_name,
+        #     run_arg.runpath,
+        #     job_script,
+        #     num_cpu,
+        #     status_file,
+        #     exit_file,
+        # )
+
+        # if c_ptr is not None:
+        #     super().__init__(c_ptr)
+        # else:
+        #     raise ValueError("Unable to create job node object")
 
     def free(self) -> None:
-        self._free()
+        # self._free()
+        pass
 
     def __str__(self) -> str:
         return (
@@ -137,7 +151,36 @@ def timed_out(self) -> bool:
 
     @property
     def submit_attempt(self) -> int:
-        return _get_submit_attempt(self)
+        # return _get_submit_attempt(self)
+        return self._submit_attempt
+
+    def refresh_status(self, driver: "Driver") -> None:
+        """Refresh the cached queue status of this node.
+
+        A RUNNING job counts as confirmed once its status file exists. If it
+        is still unconfirmed more than MAX_CONFIRMED_WAIT seconds after the
+        start time, an error is logged and the status is set to
+        DO_KILL_NODE_FAILURE. While the status is RUNNING, PENDING, SUBMITTED
+        or UNKNOWN it is then replaced by driver.get_status(self).
+        """
+        if self.queue_status == JobStatus.RUNNING and not self._confirmed_running:
+            self._confirmed_running = Path(self._status_file).exists()
+            if not self._confirmed_running:
+                MAX_CONFIRMED_WAIT = 10 * 60
+                # NOTE(review): assumes _start_time is set once RUNNING - confirm
+                if (time.time() - self._start_time) > MAX_CONFIRMED_WAIT:
+                    logger.error(
+                        f"max_confirm_wait {MAX_CONFIRMED_WAIT} has passed since sim_start "
+                        f"without success; {self._job_name} is assumed dead (attempt {self._submit_attempt})"
+                    )
+                    self._status = JobStatus.DO_KILL_NODE_FAILURE
+        if self.queue_status in [
+            JobStatus.RUNNING,
+            JobStatus.PENDING,
+            JobStatus.SUBMITTED,
+            JobStatus.UNKNOWN,
+        ]:
+            self._status = driver.get_status(self)
 
     def _poll_queue_status(self, driver: "Driver") -> JobStatus:
         result, msg = _refresh_status(self, driver)
@@ -147,7 +190,8 @@ def _poll_queue_status(self, driver: "Driver") -> JobStatus:
 
     @property
     def queue_status(self) -> JobStatus:
-        return self._get_status()
+        # return self._get_status()
+        return self._status
 
     @queue_status.setter
     def queue_status(self, value: JobStatus) -> None:
diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py
index d2557fff1a3..3f1d5c3656d 100644
--- a/src/ert/job_queue/queue.py
+++ b/src/ert/job_queue/queue.py
@@ -86,10 +86,11 @@ def _queue_state_event_type(state: str) -> str:
 
 
 class Driver:
-    pass
+    def get_status(self, node: JobQueueNode) -> JobStatus:
+        return JobStatus.RUNNING
 
 
-class JobQueue:  # type: ignore
+class JobQueue:
     # TYPE_NAME = "job_queue"
     # _alloc = ResPrototype("void* job_queue_alloc(void*)", bind=False)
     # _free = ResPrototype("void job_queue_free( job_queue )")