Queuing system submission: check if the job is already waiting in the queue or currently running. #499
Conversation
Walkthrough: The changes in this pull request primarily involve refactoring import paths and modifying the …
Changes
Possibly related PRs
for more information, see https://pre-commit.ci
Actionable comments posted: 9
🧹 Outside diff range and nitpick comments (14)
tests/test_cache_executor_mpi.py (1)
Line range hint 1-47: Add test coverage for queue checking functionality.
While the existing tests cover MPI execution, there's no test coverage for the new queue checking functionality mentioned in the PR title (checking if jobs are already waiting/running).
Would you like me to help create test cases that verify:
- Detection of already queued jobs
- Detection of currently running jobs
- Proper handling of duplicate job submissions
This would help ensure the new functionality works as intended.
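The duplicate-submission behavior such tests would exercise can be sketched against a toy in-memory queue. The ToyQueue class and its methods below are invented for illustration and are not part of executorlib or pysqa:

```python
import unittest


class ToyQueue:
    """Hypothetical stand-in for a queuing system with duplicate detection."""

    def __init__(self):
        self._jobs = {}  # job_name -> status ("waiting" or "running")

    def submit(self, job_name: str) -> bool:
        if job_name in self._jobs:
            return False  # already waiting or running: do not resubmit
        self._jobs[job_name] = "waiting"
        return True

    def start(self, job_name: str) -> None:
        self._jobs[job_name] = "running"


class TestDuplicateSubmission(unittest.TestCase):
    def test_waiting_job_not_resubmitted(self):
        q = ToyQueue()
        self.assertTrue(q.submit("job_a"))
        self.assertFalse(q.submit("job_a"))

    def test_running_job_not_resubmitted(self):
        q = ToyQueue()
        q.submit("job_b")
        q.start("job_b")
        self.assertFalse(q.submit("job_b"))
```

Real tests would replace ToyQueue with the FileExecutor and queue backend, but the assertions would take the same shape.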
tests/test_pysqa_subprocess.py (1)
Line range hint 13-45: Add test coverage for queue checking functionality.
The test suite currently lacks coverage for the main PR objective: checking if jobs are already waiting in the queue or currently running. While the existing command execution tests are valuable, we should add test cases for:
- Detecting jobs already in queue
- Detecting currently running jobs
- Edge cases in queue status checking
Would you like me to help create test cases for these scenarios? I can provide a template with test methods covering:
- Job already exists in queue
- Job is currently running
- Queue checking with invalid job IDs
- Edge cases (empty queue, multiple matches, etc.)
executorlib/cache/subprocess_spawner.py (4)
1-6: Remove unused import.
The os.path import is not used in this file and can be safely removed.

```diff
-import os.path
 import subprocess
 import time
 from typing import Optional
 from executorlib.standalone.inputcheck import check_file_exists
```
🧰 Tools
🪛 Ruff
1-1: os.path imported but unused. Remove unused import: os.path (F401)
Line range hint 10-17: Fix mutable default argument.
Using an empty list as a default argument can lead to unexpected behavior because it's mutable and shared between function calls. Use None as the default and initialize the list inside the function.

```diff
 def execute_in_subprocess(
     command: list,
-    task_dependent_lst: list = [],
+    task_dependent_lst: Optional[list] = None,
     file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
```

🧰 Tools
🪛 Ruff
11-11: Do not use mutable data structures for argument defaults. Replace with None; initialize within function (B006)
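The pitfall behind the B006 warning can be demonstrated in a few lines; the function names here are made up for the example:

```python
def append_bad(item, items=[]):
    # The default list is created once at definition time and shared across calls
    items.append(item)
    return items


def append_good(item, items=None):
    # A fresh list is created per call when no argument is given
    if items is None:
        items = []
    items.append(item)
    return items


assert append_bad(1) == [1]
assert append_bad(2) == [1, 2]   # state from the first call leaks in
assert append_good(1) == [1]
assert append_good(2) == [2]     # no leakage
```

The same leakage would apply to task_dependent_lst if a caller ever mutated the shared default.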
24-24: Consider adding type validation for HDF5 files.
The docstring specifies that file_name should be an HDF5 file, but this isn't enforced in the type hints or validation. Consider adding a custom type or validation to ensure the correct file type.
Line range hint 9-56: Implementation doesn't fully address PR objectives.
The PR's objective is to "check if the job is already waiting in the queue or currently running", but the current changes only implement file existence checking. Consider adding queue status verification before spawning a new subprocess.
Some considerations for implementation:
- Track running processes in a shared state
- Add queue status verification before process creation
- Handle cases where duplicate jobs are detected
Would you like me to help design and implement the queue verification functionality?
executorlib/standalone/hdf.py (2)
21-21: Consider adding type validation for queue_id.
While the addition of queue_id storage is good, the dump function should validate that the queue_id is an integer when present in the data_dict to maintain data consistency.
Consider adding validation before storing:

```diff
 def dump(file_name: str, data_dict: dict) -> None:
+    if "queue_id" in data_dict and not isinstance(data_dict["queue_id"], (int, type(None))):
+        raise TypeError("queue_id must be an integer or None")
     group_dict = {
```
Line range hint 1-81: Consider implementing a context manager for HDF5 file operations.
The file contains multiple functions that handle HDF5 file operations with similar patterns. To improve maintainability and ensure consistent error handling, consider implementing a context manager class that encapsulates the common HDF5 file operations.
This would:
- Centralize error handling
- Ensure consistent file access patterns
- Reduce code duplication
- Make it easier to add new operations in the future

Example structure:

```python
class HDF5Handler:
    def __init__(self, file_name: str):
        self.file_name = file_name

    def __enter__(self):
        self.file = h5py.File(self.file_name, "a")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()

    def get_queue_id(self) -> Optional[int]:
        ...  # Implementation here

    def dump(self, data_dict: dict) -> None:
        ...  # Implementation here
```

executorlib/cache/executor.py (1)
Line range hint
27-33: Document the queue checking behavior.
Since the executor now uses execute_with_pysqa by default, which presumably includes queue checking functionality, this behavior should be documented in the class docstring. Add information about queue checking to the docstring:

```diff
 def __init__(
     self,
     cache_directory: str = "cache",
     resource_dict: Optional[dict] = None,
     execute_function: callable = execute_with_pysqa,
     terminate_function: Optional[callable] = None,
     pysqa_config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     disable_dependencies: bool = False,
 ):
     """
     Initialize the FileExecutor.

     Args:
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
             - cores (int): number of MPI cores to be used for each function call
             - cwd (str/None): current working directory where the parallel python task is executed
         execute_function (callable, optional): The function to execute tasks. Defaults to execute_with_pysqa.
+            When using execute_with_pysqa, it checks if the job is already waiting in the queue
+            or currently running to prevent duplicate submissions.
         terminate_function (callable, optional): The function to terminate the tasks.
         pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
         backend (str, optional): name of the backend used to spawn tasks.
         disable_dependencies (boolean): Disable resolving future objects during the submission.
     """
```

tests/test_cache_executor_serial.py (2)
Line range hint
142-146
: Add test coverage for queue state verification.
Given the PR objective to "check if the job is already waiting in the queue or currently running", consider adding test cases to verify:
- Detection of already queued jobs
- Detection of currently running jobs
- Queue state verification

Example test structure:

```python
def test_detect_queued_job(self):
    with FileExecutor(execute_function=execute_in_subprocess) as exe:
        # Submit first job
        fs1 = exe.submit(my_funct, 1, b=2)
        # Attempt to submit identical job
        fs2 = exe.submit(my_funct, 1, b=2)
        # Verify queue detection behavior

def test_detect_running_job(self):
    with FileExecutor(execute_function=execute_in_subprocess) as exe:
        # Submit long-running job
        fs1 = exe.submit(time.sleep, 2)
        # Verify job is detected as running
        # Attempt to submit identical job
        # Verify running job detection behavior
```

Would you like me to help implement these test cases or create a GitHub issue to track this task?
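One way such duplicate detection could work, sketched with invented helper names (job_key and submit_once are not executorlib APIs), is to derive a stable key from the submitted call and skip resubmission when that key is already pending:

```python
import hashlib
import pickle


def job_key(fn, *args, **kwargs):
    # Hypothetical helper: derive a stable identity for a submitted call
    payload = pickle.dumps((fn.__name__, args, sorted(kwargs.items())))
    return hashlib.sha256(payload).hexdigest()


pending = set()  # keys of jobs already waiting or running


def submit_once(fn, *args, **kwargs):
    """Skip submission when an identical job is already pending."""
    key = job_key(fn, *args, **kwargs)
    if key in pending:
        return None  # duplicate: nothing submitted
    pending.add(key)
    return key
```

executorlib instead persists the queue ID in the HDF5 file and asks the queuing system for the job status, but the dedup principle is the same.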
Line range hint
147-150
: Enhance error handling test coverage.
The current error handling tests only cover basic validation. Consider adding test cases for queue-related error scenarios:
- Invalid queue states
- Queue connection errors
- Queue submission failures

Example test structure:

```python
def test_queue_error_handling(self):
    with FileExecutor(execute_function=execute_in_subprocess) as exe:
        # Test invalid queue configuration
        with self.assertRaises(QueueConfigError):
            exe.submit(my_funct, 1, b=2, queue_config={'invalid': 'config'})
        # Test queue connection failure
        with self.assertRaises(QueueConnectionError):
            exe.submit(my_funct, 1, b=2, queue_endpoint='invalid_endpoint')
        # Test queue submission failure
        with self.assertRaises(QueueSubmissionError):
            exe.submit(my_funct, 1, b=2, queue_params={'invalid': 'params'})
```

executorlib/standalone/inputcheck.py (1)
192-193
: Remove extra blank lines.
There are two consecutive blank lines before the function definition, which is inconsistent with the rest of the file.

```diff
-
-
 def check_file_exists(file_name: str):
```
12-15
: Improve type annotations for clarity and consistency.
To enhance readability and follow best practices, use type hints from the typing module. Replace built-in types with those from typing, and ensure consistency across annotations.
Apply this diff to update the type annotations:

```diff
 from typing import List, Optional, Tuple, Union

 def execute_with_pysqa(
-    command: list,
-    task_dependent_lst: list[int] = [],
+    command: List[str],
+    task_dependent_lst: Optional[List[int]] = None,
     file_name: Optional[str] = None,
-    resource_dict: Optional[dict] = None,
+    resource_dict: Optional[Dict[str, Any]] = None,
     ...
 ):
+    if task_dependent_lst is None:
+        task_dependent_lst = []
```

Ensure you import any additional types like Dict and Any from the typing module.

🧰 Tools
🪛 Ruff
13-13: Do not use mutable data structures for argument defaults. Replace with None; initialize within function (B006)
Line range hint
23-36
: Update the docstrings to reflect optional parameters and default values.
Some parameters are marked as optional in the function signature but are not indicated as such in the docstrings. Additionally, default values have changed and should be documented accordingly.
Modify the docstrings for clarity:

```diff
 Args:
     command (list): The command to be executed.
-    task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
+    task_dependent_lst (list, optional): A list of subprocesses that the current subprocess depends on. Defaults to None.
+    file_name (str, optional): Name of the HDF5 file which contains the Python function.
+    resource_dict (dict, optional): Resource dictionary defining execution resources.
     config_directory (str, optional): Path to the config directory.
     backend (str, optional): Name of the backend used to spawn tasks.
     cache_directory (str, optional): The directory to store the HDF5 files.

 Returns:
-    int: queuing system ID
+    int: Queuing system ID.
```

🧰 Tools
🪛 Ruff
13-13: Do not use mutable data structures for argument defaults. Replace with None; initialize within function (B006)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- executorlib/cache/executor.py (2 hunks)
- executorlib/cache/queue_spawner.py (2 hunks)
- executorlib/cache/subprocess_spawner.py (3 hunks)
- executorlib/standalone/hdf.py (3 hunks)
- executorlib/standalone/inputcheck.py (2 hunks)
- tests/test_cache_executor_mpi.py (1 hunks)
- tests/test_cache_executor_serial.py (1 hunks)
- tests/test_pysqa_subprocess.py (1 hunks)
🧰 Additional context used
🪛 Ruff
executorlib/cache/queue_spawner.py
13-13: Do not use mutable data structures for argument defaults
Replace with None
; initialize within function
(B006)
executorlib/cache/subprocess_spawner.py
1-1: os.path
imported but unused
Remove unused import: os.path
(F401)
11-11: Do not use mutable data structures for argument defaults
Replace with None
; initialize within function
(B006)
🔇 Additional comments (6)
tests/test_pysqa_subprocess.py (1)
4-4
: Verify consistent import path updates across the codebase
The import path has been updated from executorlib.standalone.cache.queue
to executorlib.cache.queue_spawner
, which appears to be part of a module reorganization.
Let's verify that all imports have been consistently updated:
✅ Verification successful
The results show that the old import path is not found anywhere in the codebase, and the new import path is consistently used in two files: the test file and executorlib/cache/executor.py
. Let me verify the module structure to ensure this reorganization is complete.
Import path changes are consistent with module reorganization
The verification confirms:
- The old module path executorlib.standalone.cache.queue has been completely removed
- The new module queue_spawner.py exists at the correct location
- Only two files use the new import path, and they do so consistently
- No lingering references to the old path exist in the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining old import paths and verify new import usage
# Expected: No occurrences of old import path, consistent usage of new path
# Check for any remaining old import paths
echo "Checking for old import paths..."
rg "from executorlib.standalone.cache.queue import"
# Check usage of new import path
echo "Checking new import path usage..."
rg "from executorlib.cache.queue_spawner import"
Length of output: 455
Script:
#!/bin/bash
# Check if the old module structure exists and verify the new module structure
echo "Checking module structure..."
fd -t f -p "executorlib/standalone/cache/queue*"
echo "Checking new module location..."
fd -t f -p "executorlib/cache/queue_spawner*"
# Check for any other potential references to the old path
echo "Checking for any other references to old path..."
rg "executorlib.standalone.cache.queue"
Length of output: 420
executorlib/cache/executor.py (2)
6-8
: LGTM: Import path refactoring
The import path changes look good, moving from standalone.cache to the main cache package improves the module organization.
20-23
: Verify queue checking implementation in queue_spawner
The PR aims to check for duplicate jobs in the queue, but the actual implementation of this check would be in the execute_with_pysqa
function. Let's verify its implementation.
tests/test_cache_executor_serial.py (1)
7-10
: Verify consistent import path updates across the codebase
The import path has been updated from executorlib.standalone.cache.spawner
to executorlib.cache.subprocess_spawner
. Let's ensure this change is consistent across all files.
✅ Verification successful
The results show that the old import path executorlib.standalone.cache.spawner
is not found anywhere in the codebase, while the new import path executorlib.cache.subprocess_spawner
is consistently used in three files. Let me verify if these are all the relevant files that should have this import by checking for usage of the imported functions.
Import path changes are consistent and complete
The import path change from executorlib.standalone.cache.spawner
to executorlib.cache.subprocess_spawner
is correctly implemented across the codebase. The verification shows:
- No instances of the old import path remain
- The new import path is used in all files that need these functions
- The functions are defined in executorlib/cache/subprocess_spawner.py and used appropriately in test files
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining old import paths and verify new import paths
# are used consistently across the codebase
echo "Checking for old import paths..."
rg "from executorlib.standalone.cache.spawner import"
echo "Verifying new import paths..."
rg "from executorlib.cache.subprocess_spawner import"
Length of output: 527
Script:
#!/bin/bash
# Check for usage of execute_in_subprocess and terminate_subprocess functions
echo "Checking for execute_in_subprocess usage..."
rg "execute_in_subprocess\(" -A 2
echo -e "\nChecking for terminate_subprocess usage..."
rg "terminate_subprocess\(" -A 2
Length of output: 1070
executorlib/standalone/inputcheck.py (2)
3-3
: LGTM!
The addition of os.path
import is appropriate for the new file existence checking functionality.
194-198
: Verify integration with queue checking functionality
While the function correctly implements file existence checking, let's verify how it integrates with the queue checking functionality mentioned in the PR title.
✅ Verification successful
Let's gather more information about how check_file_exists
is used in the queue-related context.
Based on the findings, I can now generate the final response:
Function properly integrates with queue checking functionality
The check_file_exists
function is correctly integrated into the queue system:
- Used in queue_spawner.py to validate input files before getting queue IDs
- Part of the workflow where job status is checked via qa.get_status_of_job(process_id=queue_id)
- Serves as a prerequisite validation step before queue operations
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check how check_file_exists is used in queue-related code
# Search for references to check_file_exists in queue-related files
echo "Searching for check_file_exists usage in queue-related files:"
rg -l "check_file_exists" | grep -i "queue"
# Search for queue status checking logic
echo -e "\nSearching for queue status checking implementation:"
rg -l "queue" | xargs rg -l "status|running|waiting"
Length of output: 668
Script:
#!/bin/bash
# Check the implementation in queue_spawner.py and its usage of check_file_exists
echo "Content of queue_spawner.py showing check_file_exists usage:"
rg -A 5 -B 5 "check_file_exists" executorlib/cache/queue_spawner.py
echo -e "\nQueue status checking implementation in queue_spawner.py:"
rg -A 10 "status|running|waiting" executorlib/cache/queue_spawner.py
echo -e "\nContent of standalone/queue.py for queue status logic:"
rg -A 10 "status|running|waiting" executorlib/standalone/queue.py
Length of output: 2182
```diff
@@ -3,7 +3,7 @@
 import shutil
 import unittest

-from executorlib.standalone.cache.spawner import execute_in_subprocess
+from executorlib.cache.subprocess_spawner import execute_in_subprocess
```
💡 Codebase verification
Import path refactoring needs attention
The verification revealed mixed results regarding the import path refactoring:
- The new path executorlib/cache/subprocess_spawner.py exists and contains the required execute_in_subprocess function
- However, the module still imports from the old path: from executorlib.standalone.inputcheck import check_file_exists
- Many files (27+) are still using imports from executorlib.standalone

This suggests the refactoring is incomplete and needs to be addressed consistently across the codebase.
🔗 Analysis chain
LGTM! Verify import path refactoring.
The import path update aligns with the module restructuring.
Let's verify the new import path exists and the old one is fully removed:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify import path refactoring is complete
# Expected: New path should exist, old path should be removed
# Check if new module exists
fd -e py "subprocess_spawner.py" executorlib/cache/
# Verify old module is removed
fd -e py "spawner.py" executorlib/standalone/cache/
# Check for any remaining references to old path
rg "executorlib\.standalone\.cache\.spawner"
Length of output: 307
Script:
#!/bin/bash
# Let's verify the import path changes more thoroughly
# 1. Check the content of the new module to ensure it has the required function
echo "=== Content of new subprocess_spawner.py ==="
cat executorlib/cache/subprocess_spawner.py
# 2. Search for any other files still using old import path pattern
echo -e "\n=== Files with old import pattern ==="
rg -l "from executorlib.standalone"
# 3. Check if there are any other references to the old path structure
echo -e "\n=== References to standalone in paths ==="
rg "standalone.*cache"
# 4. Verify the function is actually used in the test file
echo -e "\n=== Usage in test file ==="
cat tests/test_cache_executor_mpi.py
Length of output: 4882
```python
def get_queue_id(file_name: str) -> Optional[int]:
    with h5py.File(file_name, "r") as hdf:
        if "queue_id" in hdf:
            return cloudpickle.loads(np.void(hdf["/queue_id"]))
        else:
            return None
```
🛠️ Refactor suggestion
Add documentation and improve error handling
The new function needs documentation and should handle potential file access errors consistently with other functions in this module.
Consider these improvements:
def get_queue_id(file_name: str) -> Optional[int]:
+ """
+ Retrieve the queue ID from the HDF5 file if available
+
+ Args:
+ file_name (str): file name of the HDF5 file as absolute path
+
+ Returns:
+ Optional[int]: queue ID if available, None otherwise
+
+ Raises:
+ OSError: If the file cannot be accessed
+ TypeError: If the stored queue_id is not an integer
+ """
with h5py.File(file_name, "r") as hdf:
if "queue_id" in hdf:
- return cloudpickle.loads(np.void(hdf["/queue_id"]))
+ queue_id = cloudpickle.loads(np.void(hdf["/queue_id"]))
+ if not isinstance(queue_id, (int, type(None))):
+ raise TypeError("Stored queue_id must be an integer or None")
+ return queue_id
else:
return None
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def get_queue_id(file_name: str) -> Optional[int]:
    """
    Retrieve the queue ID from the HDF5 file if available

    Args:
        file_name (str): file name of the HDF5 file as absolute path

    Returns:
        Optional[int]: queue ID if available, None otherwise

    Raises:
        OSError: If the file cannot be accessed
        TypeError: If the stored queue_id is not an integer
    """
    with h5py.File(file_name, "r") as hdf:
        if "queue_id" in hdf:
            queue_id = cloudpickle.loads(np.void(hdf["/queue_id"]))
            if not isinstance(queue_id, (int, type(None))):
                raise TypeError("Stored queue_id must be an integer or None")
            return queue_id
        else:
            return None
```
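The validation logic in this suggestion can be exercised in isolation. The sketch below substitutes a plain dict for the HDF5 file and the stdlib pickle for cloudpickle, so it only illustrates the type-checking behavior, not the real storage layer:

```python
import pickle
from typing import Optional


def get_queue_id_from_store(store: dict) -> Optional[int]:
    # Mirrors the suggested validation; a plain dict stands in for the HDF5 file
    if "queue_id" in store:
        queue_id = pickle.loads(store["queue_id"])
        if not isinstance(queue_id, (int, type(None))):
            raise TypeError("Stored queue_id must be an integer or None")
        return queue_id
    return None
```

A missing key returns None, a stored integer round-trips, and any other stored type raises TypeError, which is the contract the review proposes for the HDF5-backed version.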
```python
def check_file_exists(file_name: str):
    if file_name is None:
        raise ValueError("file_name is not set.")
    if not os.path.exists(file_name):
        raise ValueError("file_name is not written to the file system.")
```
🛠️ Refactor suggestion
Add type hints and improve error messages
The function should follow Python typing best practices and provide more descriptive error messages.
-def check_file_exists(file_name: str):
+def check_file_exists(file_name: str) -> None:
if file_name is None:
- raise ValueError("file_name is not set.")
+ raise ValueError("The file_name parameter must not be None.")
if not os.path.exists(file_name):
- raise ValueError("file_name is not written to the file system.")
+ raise ValueError(f"The file '{file_name}' does not exist in the file system.")
The changes:
- Added return type annotation to match other validation functions
- Improved error messages to be more descriptive and include the actual filename in the error
```python
def check_file_exists(file_name: str) -> None:
    if file_name is None:
        raise ValueError("The file_name parameter must not be None.")
    if not os.path.exists(file_name):
        raise ValueError(f"The file '{file_name}' does not exist in the file system.")
```
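A quick way to sanity-check the suggested validator, using a temporary file as stand-in input (the block repeats the function so it runs on its own):

```python
import os
import tempfile


def check_file_exists(file_name: str) -> None:
    if file_name is None:
        raise ValueError("The file_name parameter must not be None.")
    if not os.path.exists(file_name):
        raise ValueError(f"The file '{file_name}' does not exist in the file system.")


# An existing file passes silently; a deleted one raises with the path in the message
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    path = tmp.name
check_file_exists(path)  # no exception
os.unlink(path)
```

After the unlink, a second call on the same path raises ValueError naming the missing file.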
```diff
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
-) -> int:
+) -> Tuple[int, int]:
```
Correct the return type annotation to match the actual return value
The function's return type annotation specifies Tuple[int, int], but it only returns a single int (queue_id). To prevent confusion, update the return type annotation to int to match the actual return value.
Apply this diff to fix the return type annotation:
```diff
-) -> Tuple[int, int]:
+) -> int:
     """
     Execute a command by submitting it to the queuing system
     ...
     return queue_id
```
Also applies to: 73-73
```python
    if resource_dict is None:
        resource_dict = {}
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
```
🛠️ Refactor suggestion
Avoid mutating input parameters to prevent side effects
Modifying the resource_dict in place can lead to unexpected behavior if it's passed from outside the function. To prevent unintended side effects, make a shallow copy of resource_dict before making changes.
Apply this diff to avoid mutating the original dictionary:
if resource_dict is None:
resource_dict = {}
+else:
+ resource_dict = resource_dict.copy()
```python
    if resource_dict is None:
        resource_dict = {}
    else:
        resource_dict = resource_dict.copy()
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
```
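The side effect this copy avoids is easy to reproduce; configure_run and configure_run_safe below are invented names for the demonstration:

```python
def configure_run(resource_dict=None):
    # Mutates its argument in place: the caller's dict changes too
    if resource_dict is None:
        resource_dict = {}
    resource_dict["cwd"] = "/tmp/run"
    return resource_dict


def configure_run_safe(resource_dict=None):
    # Copies first, so the caller's dict is left untouched
    resource_dict = {} if resource_dict is None else resource_dict.copy()
    resource_dict["cwd"] = "/tmp/run"
    return resource_dict


caller = {"cores": 2}
configure_run(caller)
assert caller == {"cores": 2, "cwd": "/tmp/run"}  # side effect leaked into caller

caller = {"cores": 2}
configure_run_safe(caller)
assert caller == {"cores": 2}  # caller unchanged
```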
```python
    check_file_exists(file_name=file_name)
    queue_id = get_queue_id(file_name=file_name)
```
Ensure file_name is not None before proceeding
The file_name parameter is optional, but check_file_exists and get_queue_id require a valid file name. If file_name is None, these functions may raise exceptions. Add a check to validate file_name before using it.
Apply this diff to add the validation:
+ if file_name is None:
+ raise ValueError("file_name must be provided")
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)
```python
    if file_name is None:
        raise ValueError("file_name must be provided")
    check_file_exists(file_name=file_name)
    queue_id = get_queue_id(file_name=file_name)
```
```python
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
        cwd = resource_dict["cwd"]
    else:
        cwd = cache_directory
    submit_kwargs = {
        "command": " ".join(command),
        "dependency_list": [str(qid) for qid in task_dependent_lst],
        "working_directory": os.path.abspath(cwd),
```
Handle potential None value for cwd to prevent errors
If both resource_dict["cwd"] and cache_directory are None, cwd will be None, which will cause os.path.abspath(cwd) to raise a TypeError. Ensure cwd is assigned a valid path before calling os.path.abspath.
Consider setting a default value or raising an exception if cwd cannot be determined:
else:
cwd = cache_directory
+ if cwd is None:
+ raise ValueError("Working directory (cwd) must be specified.")
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
```python
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
        cwd = resource_dict["cwd"]
    else:
        cwd = cache_directory
    if cwd is None:
        raise ValueError("Working directory (cwd) must be specified.")
    submit_kwargs = {
        "command": " ".join(command),
        "dependency_list": [str(qid) for qid in task_dependent_lst],
        "working_directory": os.path.abspath(cwd),
    }
```
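The guard can be factored into a small helper for illustration; resolve_cwd is a hypothetical name, not part of the module:

```python
import os


def resolve_cwd(resource_dict: dict, cache_directory):
    # Hypothetical helper mirroring the suggested guard:
    # prefer the explicit cwd, fall back to the cache directory, refuse None
    if resource_dict.get("cwd") is not None:
        cwd = resource_dict["cwd"]
    else:
        cwd = cache_directory
    if cwd is None:
        raise ValueError("Working directory (cwd) must be specified.")
    return os.path.abspath(cwd)
```

With this shape, the TypeError from os.path.abspath(None) is replaced by an explicit, actionable ValueError.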
```diff
-    resource_dict: dict,
-    task_dependent_lst: List[int] = [],
+    command: list,
+    task_dependent_lst: list[int] = [],
```
**Avoid mutable default arguments for function parameters**

Using mutable default arguments like lists can lead to unexpected behavior because the default value is shared across all function calls. In this case, `task_dependent_lst: list[int] = []` should be replaced with `task_dependent_lst: Optional[List[int]] = None` and initialized within the function.
Apply this diff to fix the issue:

```diff
 def execute_with_pysqa(
     command: list,
-    task_dependent_lst: list[int] = [],
+    task_dependent_lst: Optional[List[int]] = None,
     file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     cache_directory: Optional[str] = None,
 ) -> Tuple[int, int]:
+    if task_dependent_lst is None:
+        task_dependent_lst = []
```
📝 Committable suggestion
```python
def execute_with_pysqa(
    command: list,
    task_dependent_lst: Optional[List[int]] = None,
    file_name: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
    cache_directory: Optional[str] = None,
) -> Tuple[int, int]:
    if task_dependent_lst is None:
        task_dependent_lst = []
```
🧰 Tools
🪛 Ruff

> 13-13: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within function. (B006)
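The pitfall Ruff's B006 flags is easy to demonstrate in isolation: the default list is created once, at function definition time, and is then shared by every call that omits the argument. A minimal illustration (names are for demonstration only):

```python
def append_bad(item, acc=[]):
    # BAD: the default list object is created once and reused across calls,
    # so state leaks from one call to the next.
    acc.append(item)
    return acc


def append_good(item, acc=None):
    # GOOD: a fresh list is created on each call that omits acc.
    if acc is None:
        acc = []
    acc.append(item)
    return acc
```

Calling `append_bad` twice without `acc` returns `[1]` and then `[1, 2]`, while `append_good` returns `[1]` and `[2]` as expected.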
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)

executorlib/cache/shared.py (1)

Line range hint `130-134`: **Consider enhancing the job deduplication mechanism.**

While using the file system to check for duplicate jobs works, consider these improvements for robustness:

- Use a dedicated job registry (e.g., SQLite) instead of relying solely on file existence
- Implement unique job IDs for better tracking
- Add explicit error handling for file system issues
- Consider adding a cleanup mechanism for orphaned files

This would make the system more resilient to concurrent access and easier to maintain.
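As an illustration of the first bullet, a dedicated registry can reject duplicate submissions atomically via a primary-key constraint. This is a minimal sketch using the stdlib `sqlite3` module; the table layout and function names are assumptions for illustration, not executorlib code:

```python
import sqlite3


def init_registry(path=":memory:"):
    # Create (or open) the job registry with a unique job_id per entry.
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs (job_id TEXT PRIMARY KEY, queue_id INTEGER)"
    )
    return conn


def register_job(conn, job_id, queue_id):
    # Returns False when the job is already registered (duplicate submission);
    # the PRIMARY KEY constraint makes the check-and-insert atomic.
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO jobs VALUES (?, ?)", (job_id, queue_id))
        return True
    except sqlite3.IntegrityError:
        return False
```

Unlike a bare file-existence check, the database constraint stays correct under concurrent submitters, since the uniqueness check and the insert happen in one transaction.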
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (1)

- `executorlib/cache/shared.py` (1 hunks)

🔇 Additional comments (1)

executorlib/cache/shared.py (1)

`130-130`: 🛠️ Refactor suggestion

**Update documentation and add path validation for the new `file_name` parameter.**

The addition of the `file_name` parameter needs to be properly documented and validated:

- The docstring of `execute_tasks_h5` should be updated to document the new parameter being passed to `execute_function`
- Consider adding path validation to prevent potential path traversal issues

Let's verify all callers of this function to ensure they're updated:
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
tests/test_cache_hdf.py (3)
`63-74`: **Consider grouping related assertions**

While the test is functionally correct, consider grouping the queue_id assertion with the other data_dict validations for better readability and logical flow.

```diff
 self.assertTrue("fn" in data_dict.keys())
 self.assertEqual(data_dict["args"], ())
 self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
+self.assertEqual(data_dict["queue_id"], 123)
 self.assertEqual(get_queue_id(file_name=file_name), 123)
```
🧰 Tools
🪛 Ruff

> 71-71: Use `key in dict` instead of `key in dict.keys()`. Remove `.keys()` (SIM118)
`79-88`: **Consider adding edge cases to queue_id tests**

The test verifies the basic functionality, but consider adding test cases for:

- Missing queue_id
- None/null queue_id
- Invalid queue_id types

Example addition:

```python
def test_hdf_queue_id_edge_cases(self):
    cache_directory = os.path.abspath("cache")
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, "test_queue_edge.h5")
    # Test missing queue_id
    dump(file_name=file_name, data_dict={})
    self.assertIsNone(get_queue_id(file_name=file_name))
    # Test None queue_id
    dump(file_name=file_name, data_dict={"queue_id": None})
    self.assertIsNone(get_queue_id(file_name=file_name))
```
`71-71`: **Optimize dict key lookup**

Replace `.keys()` with direct dictionary membership testing for better performance.

```diff
-self.assertTrue("fn" in data_dict.keys())
+self.assertTrue("fn" in data_dict)
```

🧰 Tools
🪛 Ruff

> 71-71: Use `key in dict` instead of `key in dict.keys()`. Remove `.keys()` (SIM118)
tests/test_shared_input_check.py (1)

`101-105`: **Consider adding more test cases for comprehensive coverage.**

While the current test cases cover basic error scenarios, consider adding tests for:

- Empty string input
- Directory path input
- File with no read permissions
- Symbolic links

Here's a suggested implementation:

```diff
 def test_check_file_exists(self):
     with self.assertRaises(ValueError):
         check_file_exists(file_name=None)
     with self.assertRaises(ValueError):
         check_file_exists(file_name="/path/does/not/exist")
+    with self.assertRaises(ValueError):
+        check_file_exists(file_name="")
+    # Create a temporary directory for testing
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        with self.assertRaises(ValueError):
+            check_file_exists(file_name=tmp_dir)
+        # Create a file with no read permissions
+        tmp_file = os.path.join(tmp_dir, "test.txt")
+        with open(tmp_file, "w") as f:
+            f.write("test")
+        os.chmod(tmp_file, 0o000)
+        with self.assertRaises(ValueError):
+            check_file_exists(file_name=tmp_file)
+        # Create and test a symbolic link
+        sym_link = os.path.join(tmp_dir, "test_link.txt")
+        os.symlink(tmp_file, sym_link)
+        with self.assertRaises(ValueError):
+            check_file_exists(file_name=sym_link)
```

Don't forget to add the required imports:

```python
import os
import tempfile
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (2)

- `tests/test_cache_hdf.py` (2 hunks)
- `tests/test_shared_input_check.py` (2 hunks)
🧰 Additional context used
🪛 Ruff

tests/test_cache_hdf.py

> 71-71: Use `key in dict` instead of `key in dict.keys()`. Remove `.keys()` (SIM118)

🔇 Additional comments (2)

tests/test_cache_hdf.py (1)

`7-7`: **LGTM: Import changes are well-structured**

The addition of the `get_queue_id` import is properly placed within the existing try-except block that handles the h5py dependency.

tests/test_shared_input_check.py (1)

`20-20`: **LGTM!**

The import statement follows the existing pattern and maintains alphabetical order.
Force-pushed from `72f8f50` to `8a06907` (Compare)
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
tests/test_cache_executor_serial.py (2)
`49-54`: **Add docstring to clarify test cases**

While the test cases correctly verify error conditions, it would be helpful to add a docstring explaining why these parameter values are invalid and what the expected behavior should be.

Consider adding documentation like this:

```diff
 def test_create_file_executor_error(self):
+    """
+    Test that create_file_executor raises ValueError when:
+    1. block_allocation is set to True (should be either False or a callable)
+    2. init_function is set to True (should be either None or a callable)
+    """
     with self.assertRaises(ValueError):
         create_file_executor(block_allocation=True)
```
`185-189`: **Consider adding more error test cases**

While the current error tests are good, consider adding more test cases to cover other potential error scenarios:

- Invalid file_name
- Non-existent file
- Empty command list

Example addition:

```python
def test_execute_in_subprocess_errors(self):
    # Existing tests
    with self.assertRaises(ValueError):
        execute_in_subprocess(
            file_name=__file__, command=[], config_directory="test"
        )
    with self.assertRaises(ValueError):
        execute_in_subprocess(file_name=__file__, command=[], backend="flux")
    # Additional test cases
    with self.assertRaises(ValueError):
        execute_in_subprocess(file_name=None, command=["echo", "test"])
    with self.assertRaises(ValueError):
        execute_in_subprocess(file_name="nonexistent.py", command=["echo", "test"])
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📒 Files selected for processing (2)

- `executorlib/cache/subprocess_spawner.py` (3 hunks)
- `tests/test_cache_executor_serial.py` (4 hunks)
🧰 Additional context used
🪛 Ruff

executorlib/cache/subprocess_spawner.py

> 10-10: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within function. (B006)
🔇 Additional comments (4)

executorlib/cache/subprocess_spawner.py (3)

`5-5`: **LGTM!**

The import statement is correctly placed and the imported function is used in the implementation.

`11-11`: **LGTM!**

The `file_name` parameter is well-documented and properly typed as `Optional[str]`.

Also applies to: 23-23

Line range hint `9-16`: **Verify function usage patterns**

Let's verify how this function is used across the codebase to ensure the new optional `file_name` parameter doesn't break existing calls.
✅ Verification successful

Let me gather more information about the implementation to understand how the `file_name` parameter is used within the function.

Based on the implementation and test files, I can now generate the final response:

**The `file_name` parameter is safely optional and existing usage is compatible**

- The parameter is properly defined as `Optional[str] = None`
- All existing usages in tests and implementation handle the optional parameter correctly
- The only direct calls to `execute_in_subprocess` with `file_name` are in error test cases, while all other usages through `FileExecutor` handle the parameter internally
- The function is used primarily through the `FileExecutor` class, which manages the parameter appropriately
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Description: Check usage patterns of execute_in_subprocess
# Expected: Find all calls to verify if they handle the new file_name parameter correctly

# Search for function calls
echo "Searching for execute_in_subprocess calls:"
rg "execute_in_subprocess\(" -A 3

# Search for imports to ensure we catch all usage locations
echo -e "\nSearching for imports:"
rg "from.*subprocess_spawner.*import.*execute_in_subprocess"
```

Length of output: 1235

Script:

```shell
#!/bin/bash
# Check the implementation of execute_in_subprocess to understand file_name usage
echo "Checking execute_in_subprocess implementation:"
rg -A 20 "def execute_in_subprocess" executorlib/cache/subprocess_spawner.py

# Check test files for more context
echo -e "\nChecking test implementation:"
cat tests/test_cache_executor_serial.py
```

Length of output: 7418
🧰 Tools
🪛 Ruff

> 10-10: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within function. (B006)
tests/test_cache_executor_serial.py (1)

`7-14`: **LGTM: Import reorganization looks good**

The imports have been logically reorganized, moving subprocess-related functions to a dedicated module and adding the new `create_file_executor` import.
```diff
@@ -29,6 +33,7 @@ def execute_in_subprocess(
         subprocess.Popen: The subprocess object.
     """
+    check_file_exists(file_name=file_name)
```
**Add None check before file existence validation**

Since `file_name` is optional, we should add a None check before validation.

Apply this fix:

```diff
-    check_file_exists(file_name=file_name)
+    if file_name is not None:
+        check_file_exists(file_name=file_name)
```

📝 Committable suggestion

```python
if file_name is not None:
    check_file_exists(file_name=file_name)
```
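For context, the validator being guarded here can be sketched as follows. This is an illustrative reimplementation consistent with the `ValueError` behavior the tests in this review expect, not the actual executorlib source:

```python
import os


def check_file_exists(file_name):
    # Raise ValueError when the path is unset or does not exist on the
    # file system, matching the error type asserted in the test suite.
    if file_name is None:
        raise ValueError("file_name is not set.")
    if not os.path.exists(file_name):
        raise ValueError("file_name " + file_name + " does not exist.")
```

With the guard applied, callers that legitimately omit `file_name` skip this check entirely, while callers that pass a stale or mistyped path still fail fast.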
```diff
@@ -2,10 +2,13 @@
 import time
 from typing import Optional

+from executorlib.standalone.inputcheck import check_file_exists
+

 def execute_in_subprocess(
     command: list,
     task_dependent_lst: list = [],
```
**Replace mutable default argument with None**

Using mutable default arguments in Python can lead to unexpected behavior because the same list object is shared across function calls.

Apply this fix:

```diff
-    task_dependent_lst: list = [],
+    task_dependent_lst: Optional[list] = None,
```

And update the function body:

```diff
+    if task_dependent_lst is None:
+        task_dependent_lst = []
```

Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff

> 10-10: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within function. (B006)
```diff
@@ -163,7 +169,7 @@ def test_executor_function_dependence_args(self):
     "future_queue": q,
     "cache_directory": cache_dir,
     "execute_function": execute_in_subprocess,
-    "resource_dict": {"cores": 1, "cwd": None},
+    "resource_dict": {"cores": 1},
```
💡 Codebase verification

**Removal of 'cwd' from resource_dict is incorrect and breaks functionality**

The removal of 'cwd' from resource_dict is problematic because:

- The implementation in `queue_spawner.py` and `subprocess_spawner.py` explicitly uses the 'cwd' parameter from resource_dict
- Other test files like `test_cache_executor_pysqa_flux.py` still rely on this parameter
- The `test_executor_working_directory` test in the same file depends on setting 'cwd' in resource_dict

🔗 Analysis chain

**Verify the removal of 'cwd' from resource_dict**

The 'cwd' key has been removed from the resource dictionary. While this might be intentional, we should verify that this doesn't affect the test's functionality or expected behavior.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Search for other test files that might be affected by this change
rg -l "resource_dict.*cwd" tests/

# Check if cwd is used in the actual implementation
rg -l "resource_dict.*cwd" executorlib/
```

Length of output: 228

Script:

```shell
#!/bin/bash
# Check the context of resource_dict and cwd usage in the implementation files
rg "resource_dict.*cwd" executorlib/cache/subprocess_spawner.py executorlib/cache/queue_spawner.py -B 2 -A 2

# Check the context in test files to understand the changes
rg "resource_dict.*cwd" tests/test_cache_executor_serial.py tests/test_cache_executor_pysqa_flux.py -B 2 -A 2

# Look for any recent changes related to this in git history
git log -p -- tests/test_cache_executor_serial.py | grep -A 5 -B 5 "resource_dict"
```

Length of output: 6809
Summary by CodeRabbit

**New Features**

- `check_file_exists` and `get_queue_id` functionalities.
- `FileExecutor` updated to improve task execution behavior.
- Parameter `file_name` added to multiple functions for better task management.

**Bug Fixes**

**Documentation**

**Refactor**

**Tests**

- Tests added for `check_file_exists` and functionality related to `queue_id`.