Skip to content

Commit

Permalink
Rename job to something better in fm_dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Feb 5, 2025
1 parent be468ee commit 68271ad
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 43 deletions.
62 changes: 35 additions & 27 deletions src/_ert/forward_model_runner/fm_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
)
from _ert.forward_model_runner.runner import ForwardModelRunner

JOBS_FILE = "jobs.json"
# This is incorrectly named, but is kept to avoid a breaking change.
# "job" was previously used for what is now called a "forward_model_step".
FORWARD_MODEL_DESCRIPTION_FILE = "jobs.json"

# On shared filesystems, race conditions between different computers can occur,
# yielding FileNotFoundError due to synchronization issues. This constant
# determines how long (in seconds) we wait for synchronization to happen:
FILE_RETRY_TIME = 30

logger = logging.getLogger(__name__)

Expand All @@ -41,7 +48,7 @@ def _setup_reporters(


def _setup_logging(directory: str = "logs"):
job_runner_logger = logging.getLogger("_ert.forward_model_runner")
fm_runner_logger = logging.getLogger("_ert.forward_model_runner")
memory_csv_logger = logging.getLogger("_ert.forward_model_memory_profiler")

os.makedirs(directory, exist_ok=True)
Expand All @@ -64,28 +71,29 @@ def _setup_logging(directory: str = "logs"):
# Write the CSV header to the file:
memory_csv_logger.info(ProcessTreeStatus().csv_header())

job_runner_logger.addHandler(handler)
job_runner_logger.setLevel(logging.DEBUG)


JOBS_JSON_RETRY_TIME = 30
fm_runner_logger.addHandler(handler)
fm_runner_logger.setLevel(logging.DEBUG)


def _wait_for_retry():
time.sleep(JOBS_JSON_RETRY_TIME)
time.sleep(FILE_RETRY_TIME)


def _read_jobs_file(retry=True):
def _read_fm_description_file(retry=True):
try:
with open(JOBS_FILE, encoding="utf-8") as json_file:
with open(FORWARD_MODEL_DESCRIPTION_FILE, encoding="utf-8") as json_file:
return json.load(json_file)
except json.JSONDecodeError as e:
raise OSError("Job Runner cli failed to load JSON-file.") from e
raise OSError(
"fm_dispatch failed to load JSON-file describing the forward model."
) from e
except FileNotFoundError as e:
if retry:
logger.error(f"Could not find file {JOBS_FILE}, retrying")
logger.error(
f"Could not find file {FORWARD_MODEL_DESCRIPTION_FILE}, retrying"
)
_wait_for_retry()
return _read_jobs_file(retry=False)
return _read_fm_description_file(retry=False)
else:
raise e

Expand All @@ -94,7 +102,7 @@ def _report_all_messages(
messages: Generator[Message], reporters: list[reporting.Reporter]
) -> None:
for msg in messages:
logger.info(f"Job status: {msg}")
logger.info(f"Forward model status: {msg}")
i = 0
while i < len(reporters):
reporter = reporters[i]
Expand Down Expand Up @@ -134,17 +142,17 @@ def sigterm_handler(_signo, _stack_frame):
def fm_dispatch(args):
parser = argparse.ArgumentParser(
description=(
"Run all the jobs specified in jobs.json, "
"or specify the names of the jobs to run."
"Run all the forward model steps specified in jobs.json, "
"or specify the names of the steps to run."
)
)
parser.add_argument("run_path", nargs="?", help="Path where jobs.json is located")
parser.add_argument(
"job",
"steps",
nargs="*",
help=(
"One or more jobs to be executed from the jobs.json file. "
"If no jobs are specified, all jobs will be executed."
"One or more forward model steps to be executed from the jobs.json file. "
"If no steps are specified, all steps will be executed."
),
)

Expand All @@ -159,14 +167,14 @@ def fm_dispatch(args):
# Make sure that logging is setup _after_ we have moved to the runpath directory
_setup_logging()

jobs_data = _read_jobs_file()
fm_description = _read_fm_description_file()

experiment_id = jobs_data.get("experiment_id")
ens_id = jobs_data.get("ens_id")
ee_token = jobs_data.get("ee_token")
dispatch_url = jobs_data.get("dispatch_url")
experiment_id = fm_description.get("experiment_id")
ens_id = fm_description.get("ens_id")
ee_token = fm_description.get("ee_token")
dispatch_url = fm_description.get("dispatch_url")

is_interactive_run = len(parsed_args.job) > 0
is_interactive_run = len(parsed_args.steps) > 0
reporters = _setup_reporters(
is_interactive_run,
ens_id,
Expand All @@ -175,9 +183,9 @@ def fm_dispatch(args):
experiment_id,
)

job_runner = ForwardModelRunner(jobs_data)
fm_runner = ForwardModelRunner(fm_description)
signal.signal(signal.SIGTERM, lambda _, __: _stop_reporters_and_sigkill(reporters))
_report_all_messages(job_runner.run(parsed_args.job), reporters)
_report_all_messages(fm_runner.run(parsed_args.steps), reporters)


def main():
Expand Down
4 changes: 1 addition & 3 deletions tests/ert/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@

@pytest.fixture(autouse=True)
def no_jobs_file_retry(monkeypatch):
monkeypatch.setattr(
_ert.forward_model_runner.fm_dispatch, "JOBS_JSON_RETRY_TIME", 0
)
monkeypatch.setattr(_ert.forward_model_runner.fm_dispatch, "FILE_RETRY_TIME", 0)


@pytest.fixture(autouse=True)
Expand Down
27 changes: 14 additions & 13 deletions tests/ert/unit_tests/forward_model_runner/test_fm_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import _ert.forward_model_runner.fm_dispatch
from _ert.forward_model_runner.fm_dispatch import (
JOBS_FILE,
FORWARD_MODEL_DESCRIPTION_FILE,
_report_all_messages,
_setup_reporters,
fm_dispatch,
Expand Down Expand Up @@ -55,7 +55,7 @@ def test_terminate_steps():
executable = os.path.realpath("dummy_executable")
os.chmod("dummy_executable", stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)

step_list = {
fm_description = {
"global_environment": {},
"global_update_path": {},
"jobList": [
Expand All @@ -81,8 +81,8 @@ def test_terminate_steps():
"ert_pid": "",
}

with open(JOBS_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(step_list))
with open(FORWARD_MODEL_DESCRIPTION_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(fm_description))

# macOS doesn't provide /usr/bin/setsid, so we roll our own
with open("setsid", "w", encoding="utf-8") as f:
Expand Down Expand Up @@ -151,7 +151,7 @@ def test_memory_profile_is_logged_as_csv(monkeypatch):
* fm_step_repeats,
}

with open(JOBS_FILE, "w", encoding="utf-8") as f:
with open(FORWARD_MODEL_DESCRIPTION_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(forward_model_steps))

monkeypatch.setattr(
Expand Down Expand Up @@ -181,7 +181,7 @@ def test_fm_dispatch_run_subset_specified_as_parameter():
executable = os.path.realpath("dummy_executable")
os.chmod("dummy_executable", stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)

job_list = {
fm_description = {
"global_environment": {},
"global_update_path": {},
"jobList": [
Expand Down Expand Up @@ -241,8 +241,8 @@ def test_fm_dispatch_run_subset_specified_as_parameter():
"ert_pid": "",
}

with open(JOBS_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(job_list))
with open(FORWARD_MODEL_DESCRIPTION_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(fm_description))

# macOS doesn't provide /usr/bin/setsid, so we roll our own
with open("setsid", "w", encoding="utf-8") as f:
Expand Down Expand Up @@ -287,7 +287,7 @@ def test_no_jobs_json_file_raises_IOError(tmp_path):


def test_invalid_jobs_json_raises_OSError(tmp_path):
(tmp_path / JOBS_FILE).write_text("not json")
(tmp_path / FORWARD_MODEL_DESCRIPTION_FILE).write_text("not json")

with pytest.raises(OSError):
fm_dispatch(["script.py", str(tmp_path)])
Expand All @@ -314,11 +314,12 @@ def test_retry_of_jobs_json_file_read(unused_tcp_port, tmp_path, monkeypatch, ca

def create_jobs_file_after_lock():
_wait_until(
lambda: f"Could not find file {JOBS_FILE}, retrying" in caplog.text,
lambda: f"Could not find file {FORWARD_MODEL_DESCRIPTION_FILE}, retrying"
in caplog.text,
2,
"Did not get expected log message from missing jobs.json",
f"Did not get expected log message from missing {FORWARD_MODEL_DESCRIPTION_FILE}",
)
(tmp_path / JOBS_FILE).write_text(jobs_json)
(tmp_path / FORWARD_MODEL_DESCRIPTION_FILE).write_text(jobs_json)
lock.release()

with MockZMQServer(unused_tcp_port):
Expand Down Expand Up @@ -349,7 +350,7 @@ def test_setup_reporters(is_interactive_run, ens_id):


@pytest.mark.usefixtures("use_tmpdir")
def test_fm_dispatch_kills_itself_after_unsuccessful_job(unused_tcp_port):
def test_fm_dispatch_kills_itself_after_unsuccessful_step(unused_tcp_port):
port = unused_tcp_port
jobs_json = json.dumps(
{"ens_id": "_id_", "dispatch_url": f"tcp://localhost:{port}"}
Expand Down

0 comments on commit 68271ad

Please sign in to comment.