Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queuing system submission: check if the job is already waiting in the queue or currently running. #499

Merged
merged 12 commits into from
Nov 15, 2024
Merged
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
Expand All @@ -17,7 +17,7 @@
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.standalone.cache.queue import execute_with_pysqa
from executorlib.cache.queue_spawner import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import os
import subprocess
from typing import List, Optional, Union
from typing import List, Optional, Tuple, Union

from pysqa import QueueAdapter

from executorlib.standalone.hdf import dump, get_queue_id
from executorlib.standalone.inputcheck import check_file_exists


def execute_with_pysqa(
command: str,
resource_dict: dict,
task_dependent_lst: List[int] = [],
command: list,
task_dependent_lst: list[int] = [],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid mutable default arguments for function parameters

Using mutable default arguments like lists can lead to unexpected behavior because the default value is shared across all function calls. In this case, task_dependent_lst: list[int] = [] should be replaced with task_dependent_lst: Optional[List[int]] = None, and initialized within the function.

Apply this diff to fix the issue:

 def execute_with_pysqa(
     command: list,
-    task_dependent_lst: list[int] = [],
+    task_dependent_lst: Optional[List[int]] = None,
     file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
 ) -> Tuple[int, int]:
+    if task_dependent_lst is None:
+        task_dependent_lst = []
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
task_dependent_lst: list[int] = [],
def execute_with_pysqa(
command: list,
task_dependent_lst: Optional[List[int]] = None,
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> Tuple[int, int]:
if task_dependent_lst is None:
task_dependent_lst = []
🧰 Tools
🪛 Ruff

13-13: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> int:
) -> Tuple[int, int]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct the return type annotation to match the actual return value

The function's return type annotation specifies Tuple[int, int], but it only returns a single int (queue_id). To prevent confusion, update the return type annotation to int to match the actual return value.

Apply this diff to fix the return type annotation:

-) -> Tuple[int, int]:
+) -> int:
     """
     Execute a command by submitting it to the queuing system
     ...
     return queue_id

Also applies to: 73-73

"""
Execute a command by submitting it to the queuing system

Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
Expand All @@ -30,37 +35,42 @@ def execute_with_pysqa(
Returns:
int: queuing system ID
"""
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure file_name is not None before proceeding

The file_name parameter is optional, but check_file_exists and get_queue_id require a valid file name. If file_name is None, these functions may raise exceptions. Add a check to validate file_name before using it.

Apply this diff to add the validation:

+    if file_name is None:
+        raise ValueError("file_name must be provided")
     check_file_exists(file_name=file_name)
     queue_id = get_queue_id(file_name=file_name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)
if file_name is None:
raise ValueError("file_name must be provided")
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)

qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=_pysqa_execute_command,
)
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
return qa.submit_job(**submit_kwargs)
if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
Comment on lines +46 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid mutating input parameters to prevent side effects

Modifying the resource_dict in place can lead to unexpected behavior if it's passed from outside the function. To prevent unintended side effects, make a shallow copy of resource_dict before making changes.

Apply this diff to avoid mutating the original dictionary:

 if resource_dict is None:
     resource_dict = {}
+else:
+    resource_dict = resource_dict.copy()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
if resource_dict is None:
resource_dict = {}
else:
resource_dict = resource_dict.copy()
if "cwd" in resource_dict and resource_dict["cwd"] is not None:

cwd = resource_dict["cwd"]
else:
cwd = cache_directory
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
Comment on lines +48 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential None value for cwd to prevent errors

If both resource_dict["cwd"] and cache_directory are None, cwd will be None, which will cause os.path.abspath(cwd) to raise a TypeError. Ensure cwd is assigned a valid path before calling os.path.abspath.

Consider setting a default value or raising an exception if cwd cannot be determined:

     else:
         cwd = cache_directory
+    if cwd is None:
+        raise ValueError("Working directory (cwd) must be specified.")
     submit_kwargs = {
         "command": " ".join(command),
         "dependency_list": [str(qid) for qid in task_dependent_lst],
         "working_directory": os.path.abspath(cwd),
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
if cwd is None:
raise ValueError("Working directory (cwd) must be specified.")
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),

}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
queue_id = qa.submit_job(**submit_kwargs)
dump(file_name=file_name, data_dict={"queue_id": queue_id})
return queue_id


def _pysqa_execute_command(
Expand Down
1 change: 1 addition & 0 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def execute_tasks_h5(
file_name=file_name,
cores=task_resource_dict["cores"],
),
file_name=file_name,
task_dependent_lst=task_dependent_lst,
resource_dict=task_resource_dict,
config_directory=pysqa_config_directory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import time
from typing import Optional

from executorlib.standalone.inputcheck import check_file_exists


def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace mutable default argument with None

Using mutable default arguments in Python can lead to unexpected behavior because the same list object is shared across function calls.

Apply this fix:

-    task_dependent_lst: list = [],
+    task_dependent_lst: Optional[list] = None,

And update the function body:

+    if task_dependent_lst is None:
+        task_dependent_lst = []

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff

10-10: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
Expand All @@ -17,6 +20,7 @@ def execute_in_subprocess(
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
Expand All @@ -29,6 +33,7 @@ def execute_in_subprocess(
subprocess.Popen: The subprocess object.

"""
check_file_exists(file_name=file_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add None check before file existence validation

Since file_name is optional, we should add a None check before validation.

Apply this fix:

-    check_file_exists(file_name=file_name)
+    if file_name is not None:
+        check_file_exists(file_name=file_name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
check_file_exists(file_name=file_name)
if file_name is not None:
check_file_exists(file_name=file_name)

while len(task_dependent_lst) > 0:
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
Expand Down
Empty file.
11 changes: 10 additions & 1 deletion executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple

import cloudpickle
import h5py
Expand All @@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None:
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"queue_id": "queue_id",
}
with h5py.File(file_name, "a") as fname:
for data_key, data_value in data_dict.items():
Expand Down Expand Up @@ -70,3 +71,11 @@ def get_output(file_name: str) -> Tuple[bool, object]:
return True, cloudpickle.loads(np.void(hdf["/output"]))
else:
return False, None


def get_queue_id(file_name: str) -> Optional[int]:
    """
    Read the queuing system ID stored in an HDF5 file.

    Args:
        file_name (str): Name of the HDF5 file to read.

    Returns:
        Optional[int]: The unpickled content of the "queue_id" entry, or None if the file has no such entry.
    """
    with h5py.File(file_name, "r") as hdf:
        # Guard clause: a file without a "queue_id" entry yields None.
        if "queue_id" not in hdf:
            return None
        # NOTE(review): the entry is deserialized with cloudpickle, so the HDF5 file
        # must come from a trusted source.
        return cloudpickle.loads(np.void(hdf["/queue_id"]))
8 changes: 8 additions & 0 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import multiprocessing
import os.path
from concurrent.futures import Executor
from typing import Callable, List, Optional

Expand Down Expand Up @@ -188,3 +189,10 @@ def validate_number_of_cores(
elif max_cores is not None and max_workers is None:
max_workers = int(max_cores / cores_per_worker)
return max_workers


def check_file_exists(file_name: Optional[str]) -> None:
    """
    Validate that a file name is set and that it exists on the file system.

    Args:
        file_name (str): Path to check. None is accepted by the signature only so that it can be rejected with an
                         explicit error instead of failing inside os.path.exists().

    Raises:
        ValueError: If file_name is None, or if no file system entry exists at file_name.
    """
    if file_name is None:
        raise ValueError("file_name is not set.")
    if not os.path.exists(file_name):
        raise ValueError("file_name is not written to the file system.")
Comment on lines +194 to +198
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add type hints and improve error messages

The function should follow Python typing best practices and provide more descriptive error messages.

-def check_file_exists(file_name: str):
+def check_file_exists(file_name: str) -> None:
     if file_name is None:
-        raise ValueError("file_name is not set.")
+        raise ValueError("The file_name parameter must not be None.")
     if not os.path.exists(file_name):
-        raise ValueError("file_name is not written to the file system.")
+        raise ValueError(f"The file '{file_name}' does not exist in the file system.")

The changes:

  1. Added return type annotation to match other validation functions
  2. Improved error messages to be more descriptive and include the actual filename in the error
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def check_file_exists(file_name: str):
if file_name is None:
raise ValueError("file_name is not set.")
if not os.path.exists(file_name):
raise ValueError("file_name is not written to the file system.")
def check_file_exists(file_name: str) -> None:
if file_name is None:
raise ValueError("The file_name parameter must not be None.")
if not os.path.exists(file_name):
raise ValueError(f"The file '{file_name}' does not exist in the file system.")

2 changes: 1 addition & 1 deletion tests/test_cache_executor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess
from executorlib.cache.subprocess_spawner import execute_in_subprocess
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Import path refactoring needs attention

The verification revealed mixed results regarding the import path refactoring:

  • The new path executorlib/cache/subprocess_spawner.py exists and contains the required execute_in_subprocess function
  • However, the module still imports from the old path: from executorlib.standalone.inputcheck import check_file_exists
  • Many files (27+) are still using imports from executorlib.standalone

This suggests the refactoring is incomplete and needs to be addressed consistently across the codebase.

🔗 Analysis chain

LGTM! Verify import path refactoring.

The import path update aligns with the module restructuring.

Let's verify the new import path exists and the old one is fully removed:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify import path refactoring is complete
# Expected: New path should exist, old path should be removed

# Check if new module exists
fd -e py "subprocess_spawner.py" executorlib/cache/

# Verify old module is removed
fd -e py "spawner.py" executorlib/standalone/cache/

# Check for any remaining references to old path
rg "executorlib\.standalone\.cache\.spawner"

Length of output: 307


Script:

#!/bin/bash
# Let's verify the import path changes more thoroughly

# 1. Check the content of the new module to ensure it has the required function
echo "=== Content of new subprocess_spawner.py ==="
cat executorlib/cache/subprocess_spawner.py

# 2. Search for any other files still using old import path pattern
echo -e "\n=== Files with old import pattern ==="
rg -l "from executorlib.standalone"

# 3. Check if there are any other references to the old path structure
echo -e "\n=== References to standalone in paths ==="
rg "standalone.*cache"

# 4. Verify the function is actually used in the test file
echo -e "\n=== Usage in test file ==="
cat tests/test_cache_executor_mpi.py

Length of output: 4882



try:
Expand Down
18 changes: 13 additions & 5 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.executor import FileExecutor
from executorlib.cache.executor import FileExecutor, create_file_executor
from executorlib.cache.shared import execute_tasks_h5

skip_h5py_test = False
Expand Down Expand Up @@ -46,6 +46,12 @@ def test_executor_dependence_mixed(self):
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())

def test_create_file_executor_error(self):
    """create_file_executor() raises ValueError for each of the keyword arguments checked here."""
    rejected_kwargs = (
        {"block_allocation": True},
        {"init_function": True},
    )
    for kwargs in rejected_kwargs:
        with self.assertRaises(ValueError):
            create_file_executor(**kwargs)

def test_executor_dependence_error(self):
with self.assertRaises(ValueError):
with FileExecutor(
Expand Down Expand Up @@ -163,7 +169,7 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {"cores": 1},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Removal of 'cwd' from resource_dict is incorrect and breaks functionality

The removal of 'cwd' from resource_dict is problematic because:

  • The implementation in queue_spawner.py and subprocess_spawner.py explicitly uses the 'cwd' parameter from resource_dict
  • Other test files like test_cache_executor_pysqa_flux.py still rely on this parameter
  • The test_executor_working_directory test in the same file depends on setting 'cwd' in resource_dict
🔗 Analysis chain

Verify the removal of 'cwd' from resource_dict

The 'cwd' key has been removed from the resource dictionary. While this might be intentional, we should verify that this doesn't affect the test's functionality or expected behavior.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other test files that might be affected by this change
rg -l "resource_dict.*cwd" tests/
# Check if cwd is used in the actual implementation
rg -l "resource_dict.*cwd" executorlib/

Length of output: 228


Script:

#!/bin/bash
# Check the context of resource_dict and cwd usage in the implementation files
rg "resource_dict.*cwd" executorlib/cache/subprocess_spawner.py executorlib/cache/queue_spawner.py -B 2 -A 2

# Check the context in test files to understand the changes
rg "resource_dict.*cwd" tests/test_cache_executor_serial.py tests/test_cache_executor_pysqa_flux.py -B 2 -A 2

# Look for any recent changes related to this in git history
git log -p -- tests/test_cache_executor_serial.py | grep -A 5 -B 5 "resource_dict"

Length of output: 6809

"terminate_function": terminate_subprocess,
},
)
Expand All @@ -176,9 +182,11 @@ def test_executor_function_dependence_args(self):

def test_execute_in_subprocess_errors(self):
with self.assertRaises(ValueError):
execute_in_subprocess(command=[], config_directory="test")
execute_in_subprocess(
file_name=__file__, command=[], config_directory="test"
)
with self.assertRaises(ValueError):
execute_in_subprocess(command=[], backend="flux")
execute_in_subprocess(file_name=__file__, command=[], backend="flux")

def tearDown(self):
if os.path.exists("cache"):
Expand Down
24 changes: 22 additions & 2 deletions tests/test_cache_hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


try:
from executorlib.standalone.hdf import dump, load, get_output
from executorlib.standalone.hdf import dump, load, get_output, get_queue_id

skip_h5py_test = False
except ImportError:
Expand Down Expand Up @@ -60,12 +60,32 @@ def test_hdf_kwargs(self):
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": (), "kwargs": {"a": a, "b": b}},
data_dict={
"fn": my_funct,
"args": (),
"kwargs": {"a": a, "b": b},
"queue_id": 123,
},
)
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], ())
self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
self.assertFalse(flag)
self.assertIsNone(output)

def test_hdf_queue_id(self):
cache_directory = os.path.abspath("cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_queue.h5")
queue_id = 123
dump(
file_name=file_name,
data_dict={"queue_id": queue_id},
)
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
self.assertFalse(flag)
self.assertIsNone(output)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pysqa_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

try:
from executorlib.standalone.cache.queue import _pysqa_execute_command
from executorlib.cache.queue_spawner import _pysqa_execute_command

skip_pysqa_test = False
except ImportError:
Expand Down
7 changes: 7 additions & 0 deletions tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
check_max_workers_and_cores,
check_hostname_localhost,
check_pysqa_config_directory,
check_file_exists,
validate_number_of_cores,
)

Expand Down Expand Up @@ -97,6 +98,12 @@ def test_check_pysqa_config_directory(self):
with self.assertRaises(ValueError):
check_pysqa_config_directory(pysqa_config_directory="path/to/config")

def test_check_file_exists(self):
    """check_file_exists() raises ValueError for a missing file name and for a non-existent path."""
    for invalid_name in (None, "/path/does/not/exist"):
        with self.assertRaises(ValueError):
            check_file_exists(file_name=invalid_name)

def test_validate_number_of_cores(self):
with self.assertRaises(ValueError):
validate_number_of_cores(
Expand Down
Loading