From c1b8b227744db3c690e18c48182190f45812e44c Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Fri, 15 Nov 2024 20:19:36 +0100
Subject: [PATCH] Queuing system submission: check if the job is already
 waiting in the queue or currently running. (#499)

* Queuing system submission: check if the job is already waiting in the queue or currently running.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update shared.py

* fix tests

* Add tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test without working directory parameter

* more fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more tests

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
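Notes:

The core of the change is a resubmission guard in execute_with_pysqa(): the
spawner first looks up a queue ID cached in the task's HDF5 file and asks
pysqa whether that job is still known to the scheduler; only if it is not
does it submit and record a new queue ID. A minimal sketch of the pattern
(the wrapper name submit_once is hypothetical; get_queue_id, dump,
QueueAdapter.get_status_of_job, and QueueAdapter.submit_job are the calls
used in the diff below):

    from pysqa import QueueAdapter

    from executorlib.standalone.hdf import dump, get_queue_id

    def submit_once(command: list, file_name: str, qa: QueueAdapter) -> int:
        # Queue ID cached by a previous submission, or None on the first call.
        queue_id = get_queue_id(file_name=file_name)
        # get_status_of_job() returns None for jobs the scheduler no longer
        # knows about, so waiting or running jobs are never submitted twice.
        if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
            queue_id = qa.submit_job(command=" ".join(command))
            # Persist the new queue ID so the next call finds it.
            dump(file_name=file_name, data_dict={"queue_id": queue_id})
        return queue_id

This makes submission idempotent per task file: calling the spawner again
while the job is waiting or running simply returns the existing queue ID.
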
 executorlib/cache/executor.py                 |  4 +-
 .../cache/queue.py => cache/queue_spawner.py} | 72 +++++++++++--------
 executorlib/cache/shared.py                   |  1 +
 .../subprocess_spawner.py}                    |  5 ++
 executorlib/standalone/cache/__init__.py      |  0
 executorlib/standalone/hdf.py                 | 11 ++-
 executorlib/standalone/inputcheck.py          |  8 +++
 tests/test_cache_executor_mpi.py              |  2 +-
 tests/test_cache_executor_serial.py           | 18 +++--
 tests/test_cache_hdf.py                       | 24 ++++++-
 tests/test_pysqa_subprocess.py                |  2 +-
 tests/test_shared_input_check.py              |  7 ++
 12 files changed, 111 insertions(+), 43 deletions(-)
 rename executorlib/{standalone/cache/queue.py => cache/queue_spawner.py} (60%)
 rename executorlib/{standalone/cache/spawner.py => cache/subprocess_spawner.py} (89%)
 delete mode 100644 executorlib/standalone/cache/__init__.py

diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index ea42bad8..a2de5bf1 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -3,7 +3,7 @@
 from executorlib.base.executor import ExecutorBase
 from executorlib.cache.shared import execute_tasks_h5
-from executorlib.standalone.cache.spawner import (
+from executorlib.cache.subprocess_spawner import (
     execute_in_subprocess,
     terminate_subprocess,
 )
 
@@ -17,7 +17,7 @@
 from executorlib.standalone.thread import RaisingThread
 
 try:
-    from executorlib.standalone.cache.queue import execute_with_pysqa
+    from executorlib.cache.queue_spawner import execute_with_pysqa
 except ImportError:
     # If pysqa is not available fall back to executing tasks in a subprocess
     execute_with_pysqa = execute_in_subprocess
diff --git a/executorlib/standalone/cache/queue.py b/executorlib/cache/queue_spawner.py
similarity index 60%
rename from executorlib/standalone/cache/queue.py
rename to executorlib/cache/queue_spawner.py
index d448d4e6..e30f44b6 100644
--- a/executorlib/standalone/cache/queue.py
+++ b/executorlib/cache/queue_spawner.py
@@ -1,24 +1,29 @@
 import os
 import subprocess
-from typing import List, Optional, Union
+from typing import List, Optional, Tuple, Union
 
 from pysqa import QueueAdapter
 
+from executorlib.standalone.hdf import dump, get_queue_id
+from executorlib.standalone.inputcheck import check_file_exists
+
 
 def execute_with_pysqa(
-    command: str,
-    resource_dict: dict,
-    task_dependent_lst: List[int] = [],
+    command: list,
+    task_dependent_lst: list[int] = [],
+    file_name: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
-) -> int:
+) -> Tuple[int, int]:
     """
     Execute a command by submitting it to the queuing system
 
     Args:
         command (list): The command to be executed.
         task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
+        file_name (str): Name of the HDF5 file which contains the Python function
         resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
                               Example resource dictionary: {
                                   cwd: None,
@@ -30,37 +35,42 @@ def execute_with_pysqa(
     Returns:
         int: queuing system ID
     """
-    if resource_dict is None:
-        resource_dict = {}
-    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
-        cwd = resource_dict["cwd"]
-    else:
-        cwd = cache_directory
+    check_file_exists(file_name=file_name)
+    queue_id = get_queue_id(file_name=file_name)
     qa = QueueAdapter(
         directory=config_directory,
         queue_type=backend,
         execute_command=_pysqa_execute_command,
     )
-    submit_kwargs = {
-        "command": " ".join(command),
-        "dependency_list": [str(qid) for qid in task_dependent_lst],
-        "working_directory": os.path.abspath(cwd),
-    }
-    if "cwd" in resource_dict:
-        del resource_dict["cwd"]
-    unsupported_keys = [
-        "threads_per_core",
-        "gpus_per_core",
-        "openmpi_oversubscribe",
-        "slurm_cmd_args",
-    ]
-    for k in unsupported_keys:
-        if k in resource_dict:
-            del resource_dict[k]
-    if "job_name" not in resource_dict:
-        resource_dict["job_name"] = "pysqa"
-    submit_kwargs.update(resource_dict)
-    return qa.submit_job(**submit_kwargs)
+    if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
+        if resource_dict is None:
+            resource_dict = {}
+        if "cwd" in resource_dict and resource_dict["cwd"] is not None:
+            cwd = resource_dict["cwd"]
+        else:
+            cwd = cache_directory
+        submit_kwargs = {
+            "command": " ".join(command),
+            "dependency_list": [str(qid) for qid in task_dependent_lst],
+            "working_directory": os.path.abspath(cwd),
+        }
+        if "cwd" in resource_dict:
+            del resource_dict["cwd"]
+        unsupported_keys = [
+            "threads_per_core",
+            "gpus_per_core",
+            "openmpi_oversubscribe",
+            "slurm_cmd_args",
+        ]
+        for k in unsupported_keys:
+            if k in resource_dict:
+                del resource_dict[k]
+        if "job_name" not in resource_dict:
+            resource_dict["job_name"] = "pysqa"
+        submit_kwargs.update(resource_dict)
+        queue_id = qa.submit_job(**submit_kwargs)
+        dump(file_name=file_name, data_dict={"queue_id": queue_id})
+    return queue_id
 
 
 def _pysqa_execute_command(
diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py
index e9dba894..e3ae504c 100644
--- a/executorlib/cache/shared.py
+++ b/executorlib/cache/shared.py
@@ -127,6 +127,7 @@ def execute_tasks_h5(
                         file_name=file_name,
                         cores=task_resource_dict["cores"],
                     ),
+                    file_name=file_name,
                     task_dependent_lst=task_dependent_lst,
                     resource_dict=task_resource_dict,
                     config_directory=pysqa_config_directory,
diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/cache/subprocess_spawner.py
similarity index 89%
rename from executorlib/standalone/cache/spawner.py
rename to executorlib/cache/subprocess_spawner.py
index cfc3c207..69bf0c01 100644
--- a/executorlib/standalone/cache/spawner.py
+++ b/executorlib/cache/subprocess_spawner.py
@@ -2,10 +2,13 @@
 import time
 from typing import Optional
 
+from executorlib.standalone.inputcheck import check_file_exists
+
 
 def execute_in_subprocess(
     command: list,
     task_dependent_lst: list = [],
+    file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
@@ -17,6 +20,7 @@ def execute_in_subprocess(
     Args:
         command (list): The command to be executed.
         task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
+        file_name (str): Name of the HDF5 file which contains the Python function
         resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
                               Example resource dictionary: {
                                   cwd: None,
@@ -29,6 +33,7 @@ def execute_in_subprocess(
     Returns:
         subprocess.Popen: The subprocess object.
     """
+    check_file_exists(file_name=file_name)
     while len(task_dependent_lst) > 0:
         task_dependent_lst = [
             task for task in task_dependent_lst if task.poll() is None
diff --git a/executorlib/standalone/cache/__init__.py b/executorlib/standalone/cache/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py
index 9e8c8798..06048f6b 100644
--- a/executorlib/standalone/hdf.py
+++ b/executorlib/standalone/hdf.py
@@ -1,4 +1,4 @@
-from typing import Tuple
+from typing import Optional, Tuple
 
 import cloudpickle
 import h5py
@@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None:
         "args": "input_args",
         "kwargs": "input_kwargs",
         "output": "output",
+        "queue_id": "queue_id",
     }
     with h5py.File(file_name, "a") as fname:
         for data_key, data_value in data_dict.items():
@@ -70,3 +71,11 @@ def get_output(file_name: str) -> Tuple[bool, object]:
             return True, cloudpickle.loads(np.void(hdf["/output"]))
         else:
             return False, None
+
+
+def get_queue_id(file_name: str) -> Optional[int]:
+    with h5py.File(file_name, "r") as hdf:
+        if "queue_id" in hdf:
+            return cloudpickle.loads(np.void(hdf["/queue_id"]))
+        else:
+            return None
diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py
index 040bbe51..a78e2e8d 100644
--- a/executorlib/standalone/inputcheck.py
+++ b/executorlib/standalone/inputcheck.py
@@ -1,5 +1,6 @@
 import inspect
 import multiprocessing
+import os.path
 from concurrent.futures import Executor
 from typing import Callable, List, Optional
 
@@ -188,3 +189,10 @@ def validate_number_of_cores(
     elif max_cores is not None and max_workers is None:
         max_workers = int(max_cores / cores_per_worker)
     return max_workers
+
+
+def check_file_exists(file_name: str):
+    if file_name is None:
+        raise ValueError("file_name is not set.")
+    if not os.path.exists(file_name):
+        raise ValueError("file_name is not written to the file system.")
diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py
index 0b8a657b..e30dd699 100644
--- a/tests/test_cache_executor_mpi.py
+++ b/tests/test_cache_executor_mpi.py
@@ -3,7 +3,7 @@
 import shutil
 import unittest
 
-from executorlib.standalone.cache.spawner import execute_in_subprocess
+from executorlib.cache.subprocess_spawner import execute_in_subprocess
 
 
 try:
diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py
index c5962aac..bb3a6967 100644
--- a/tests/test_cache_executor_serial.py
+++ b/tests/test_cache_executor_serial.py
@@ -4,14 +4,14 @@
 import shutil
 import unittest
 
-from executorlib.standalone.cache.spawner import (
+from executorlib.cache.subprocess_spawner import (
     execute_in_subprocess,
     terminate_subprocess,
 )
 from executorlib.standalone.thread import RaisingThread
 
 try:
-    from executorlib.cache.executor import FileExecutor
+    from executorlib.cache.executor import FileExecutor, create_file_executor
     from executorlib.cache.shared import execute_tasks_h5
 
     skip_h5py_test = False
@@ -46,6 +46,12 @@ def test_executor_dependence_mixed(self):
         self.assertEqual(fs2.result(), 4)
         self.assertTrue(fs2.done())
 
+    def test_create_file_executor_error(self):
+        with self.assertRaises(ValueError):
+            create_file_executor(block_allocation=True)
+        with self.assertRaises(ValueError):
+            create_file_executor(init_function=True)
+
     def test_executor_dependence_error(self):
         with self.assertRaises(ValueError):
             with FileExecutor(
@@ -163,7 +169,7 @@ def test_executor_function_dependence_args(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {"cores": 1},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -176,9 +182,11 @@ def test_executor_function_dependence_args(self):
 
     def test_execute_in_subprocess_errors(self):
         with self.assertRaises(ValueError):
-            execute_in_subprocess(command=[], config_directory="test")
+            execute_in_subprocess(
+                file_name=__file__, command=[], config_directory="test"
+            )
         with self.assertRaises(ValueError):
-            execute_in_subprocess(command=[], backend="flux")
+            execute_in_subprocess(file_name=__file__, command=[], backend="flux")
 
     def tearDown(self):
         if os.path.exists("cache"):
diff --git a/tests/test_cache_hdf.py b/tests/test_cache_hdf.py
index a13a4d9d..9a25fef1 100644
--- a/tests/test_cache_hdf.py
+++ b/tests/test_cache_hdf.py
@@ -4,7 +4,7 @@
 
 
 try:
-    from executorlib.standalone.hdf import dump, load, get_output
+    from executorlib.standalone.hdf import dump, load, get_output, get_queue_id
 
     skip_h5py_test = False
 except ImportError:
@@ -60,12 +60,32 @@ def test_hdf_kwargs(self):
         b = 2
         dump(
             file_name=file_name,
-            data_dict={"fn": my_funct, "args": (), "kwargs": {"a": a, "b": b}},
+            data_dict={
+                "fn": my_funct,
+                "args": (),
+                "kwargs": {"a": a, "b": b},
+                "queue_id": 123,
+            },
         )
         data_dict = load(file_name=file_name)
         self.assertTrue("fn" in data_dict.keys())
         self.assertEqual(data_dict["args"], ())
         self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
+        self.assertEqual(get_queue_id(file_name=file_name), 123)
+        flag, output = get_output(file_name=file_name)
+        self.assertFalse(flag)
+        self.assertIsNone(output)
+
+    def test_hdf_queue_id(self):
+        cache_directory = os.path.abspath("cache")
+        os.makedirs(cache_directory, exist_ok=True)
+        file_name = os.path.join(cache_directory, "test_queue.h5")
+        queue_id = 123
+        dump(
+            file_name=file_name,
+            data_dict={"queue_id": queue_id},
+        )
+        self.assertEqual(get_queue_id(file_name=file_name), 123)
         flag, output = get_output(file_name=file_name)
         self.assertFalse(flag)
         self.assertIsNone(output)
diff --git a/tests/test_pysqa_subprocess.py b/tests/test_pysqa_subprocess.py
index f2906527..54742d70 100644
--- a/tests/test_pysqa_subprocess.py
+++ b/tests/test_pysqa_subprocess.py
@@ -1,7 +1,7 @@
 import unittest
 
 try:
-    from executorlib.standalone.cache.queue import _pysqa_execute_command
+    from executorlib.cache.queue_spawner import _pysqa_execute_command
 
     skip_pysqa_test = False
 except ImportError:
diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py
index 5e3b0766..b1b18b60 100644
--- a/tests/test_shared_input_check.py
+++ b/tests/test_shared_input_check.py
@@ -17,6 +17,7 @@
     check_max_workers_and_cores,
     check_hostname_localhost,
     check_pysqa_config_directory,
+    check_file_exists,
     validate_number_of_cores,
 )
 
@@ -97,6 +98,12 @@ def test_check_pysqa_config_directory(self):
         with self.assertRaises(ValueError):
             check_pysqa_config_directory(pysqa_config_directory="path/to/config")
 
+    def test_check_file_exists(self):
+        with self.assertRaises(ValueError):
+            check_file_exists(file_name=None)
+        with self.assertRaises(ValueError):
check_file_exists(file_name="/path/does/not/exist") + def test_validate_number_of_cores(self): with self.assertRaises(ValueError): validate_number_of_cores(