Skip to content

Commit

Permalink
Queuing system submission: check if the job is already waiting in the…
Browse files Browse the repository at this point in the history
… queue or currently running.
  • Loading branch information
jan-janssen committed Nov 15, 2024
1 parent 76ccca9 commit a123fc9
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 37 deletions.
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
Expand All @@ -17,7 +17,7 @@
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.standalone.cache.queue import execute_with_pysqa
from executorlib.cache.queue_spawner import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import os
import subprocess
from typing import List, Optional, Union
from typing import List, Optional, Union, Tuple

from pysqa import QueueAdapter

from executorlib.standalone.inputcheck import check_file_exists
from executorlib.standalone.hdf import get_queue_id, dump


def execute_with_pysqa(
command: str,
resource_dict: dict,
task_dependent_lst: List[int] = [],
command: list,
task_dependent_lst: list[int] = [],
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> int:
) -> Tuple[int, int]:
"""
Execute a command by submitting it to the queuing system
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
Expand All @@ -30,37 +35,42 @@ def execute_with_pysqa(
Returns:
int: queuing system ID
"""
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)
qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=_pysqa_execute_command,
)
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
return qa.submit_job(**submit_kwargs)
if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
queue_id = qa.submit_job(**submit_kwargs)
dump(file_name=file_name, data_dict={"queue_id": queue_id})
return queue_id


def _pysqa_execute_command(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import os.path
import subprocess
import time
from typing import Optional

from executorlib.standalone.inputcheck import check_file_exists


def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
Expand All @@ -17,6 +21,7 @@ def execute_in_subprocess(
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
Expand All @@ -29,6 +34,7 @@ def execute_in_subprocess(
subprocess.Popen: The subprocess object.
"""
check_file_exists(file_name=file_name)
while len(task_dependent_lst) > 0:
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
Expand Down
Empty file.
11 changes: 10 additions & 1 deletion executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Tuple, Optional

import cloudpickle
import h5py
Expand All @@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None:
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"queue_id": "queue_id",
}
with h5py.File(file_name, "a") as fname:
for data_key, data_value in data_dict.items():
Expand Down Expand Up @@ -70,3 +71,11 @@ def get_output(file_name: str) -> Tuple[bool, object]:
return True, cloudpickle.loads(np.void(hdf["/output"]))
else:
return False, None


def get_queue_id(file_name: str) -> Optional[int]:
    """
    Read the stored queuing system ID from an HDF5 file.

    Args:
        file_name (str): Name of the HDF5 file which is opened in read-only mode.

    Returns:
        Optional[int]: The object deserialized from the "queue_id" entry of the file,
            or None when the file has no "queue_id" entry.
    """
    with h5py.File(file_name, "r") as hdf:
        # Guard clause: no entry stored in this file.
        if "queue_id" not in hdf:
            return None
        # The entry holds a cloudpickle-serialized payload; wrap it in np.void
        # before handing it to cloudpickle for deserialization.
        serialized_id = np.void(hdf["/queue_id"])
        return cloudpickle.loads(serialized_id)
8 changes: 8 additions & 0 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import os.path
import multiprocessing
from concurrent.futures import Executor
from typing import Callable, List, Optional
Expand Down Expand Up @@ -188,3 +189,10 @@ def validate_number_of_cores(
elif max_cores is not None and max_workers is None:
max_workers = int(max_cores / cores_per_worker)
return max_workers


def check_file_exists(file_name: str):
    """
    Validate that a file name was provided and that the path exists.

    Args:
        file_name (str): Path to check. None is treated as "not set".

    Raises:
        ValueError: If file_name is None, or if the path does not exist on the
            file system.
    """
    # A missing file name is reported separately from a missing file.
    if file_name is None:
        raise ValueError("file_name is not set.")
    if os.path.exists(file_name):
        return
    raise ValueError("file_name is not written to the file system.")
2 changes: 1 addition & 1 deletion tests/test_cache_executor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess
from executorlib.cache.subprocess_spawner import execute_in_subprocess


try:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pysqa_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

try:
from executorlib.standalone.cache.queue import _pysqa_execute_command
from executorlib.cache.queue_spawner import _pysqa_execute_command

skip_pysqa_test = False
except ImportError:
Expand Down

0 comments on commit a123fc9

Please sign in to comment.