Cache: Terminate processes when closing executor #447

Merged
merged 7 commits into from
Oct 27, 2024

Conversation

jan-janssen
Member

@jan-janssen jan-janssen commented Oct 27, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a new function to facilitate the termination of subprocesses, enhancing task management.
    • Added an optional parameter for task execution that allows for graceful shutdowns.
  • Bug Fixes

    • Improved error handling during task execution and termination processes.
  • Tests

    • Updated test cases to incorporate the new termination functionality, ensuring robust execution validation.

Contributor

coderabbitai bot commented Oct 27, 2024

Walkthrough

The pull request introduces modifications to enhance subprocess management in the executorlib package. Key changes include the addition of a terminate_function parameter in the FileExecutor class and the implementation of a new terminate_subprocess function in spawner.py. The execute_tasks_h5 function in shared.py is updated to support task termination on shutdown commands. The changes extend to test cases, integrating the new termination functionality into the TestCacheExecutorSerial class, ensuring that the enhancements are covered in the testing framework.

Changes

File: Change Summary

executorlib/cache/executor.py
  • Added terminate_subprocess import.
  • Updated FileExecutor constructor to include terminate_function parameter, defaulting to None.
  • Conditional logic added to assign terminate_function when using execute_in_subprocess.
  • Passed terminate_function to RaisingThread.

executorlib/cache/shared.py
  • Added terminate_function parameter to execute_tasks_h5.
  • Updated control flow in execute_tasks_h5 for task termination.
  • Removed execute_in_subprocess function.
  • Adjusted import statements to remove subprocess.

executorlib/standalone/cache/spawner.py
  • Added execute_in_subprocess and terminate_subprocess functions for subprocess management.

tests/test_cache_executor_serial.py
  • Imported terminate_subprocess and execute_in_subprocess from spawner.py.
  • Integrated terminate_function into kwargs for multiple test methods.

tests/test_cache_executor_mpi.py
  • Removed imports of execute_tasks_h5 and execute_in_subprocess from shared.py.
  • Adjusted test logic to utilize FileExecutor.

Possibly related PRs

  • Move cache functionality to shared #434: The changes in the main PR regarding the FileExecutor class and the introduction of the terminate_function parameter are related to the modifications in executorlib/cache/executor.py, which also involves the execute_in_subprocess function that is now imported from a different module.
  • Delete SubprocessExecutor and ShellExecutor #440: The removal of the SubprocessExecutor and ShellExecutor classes aligns with the changes in the main PR, as both involve modifications to how subprocesses are managed and executed, particularly in relation to the FileExecutor.
  • Split shared cache in backend and frontend #443: The restructuring of cache functionality and the import paths in the backend and frontend modules may relate to the changes in task execution and management in the main PR, particularly with the focus on how tasks are executed and terminated.

🐇 In the land of code, where functions play,
A new termination, brightens the day.
With threads that now listen, to stop and to yield,
Our tasks dance in order, their fate now revealed.
So hop along, friends, let’s execute with glee,
For subprocesses now bow, to our new decree! 🌟



# Conflicts:
#	executorlib/cache/executor.py
#	executorlib/cache/shared.py
#	tests/test_cache_executor_serial.py
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
executorlib/cache/executor.py (2)

19-19: Add documentation for the terminate_function parameter

The docstring should be updated to include documentation for the new terminate_function parameter.

         Args:
             cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
             cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
+            terminate_function (callable, optional): The function to terminate task execution. Defaults to terminate_subprocess when execute_function is execute_in_subprocess.

Also applies to: 22-29


42-42: LGTM: Thread initialization properly updated

The terminate_function is correctly passed to the RaisingThread kwargs.

Consider adding error handling in the RaisingThread implementation to gracefully handle cases where terminate_function is None but termination is requested. This would make the system more robust against potential misconfigurations.

tests/test_cache_executor_serial.py (1)

Line range hint 115-118: Enhance tearDown to ensure process cleanup

The current tearDown only removes the cache directory but doesn't explicitly ensure that all spawned processes are terminated. This could lead to orphaned processes if tests fail.

Consider enhancing the tearDown method:

 def tearDown(self):
+    # Ensure all test processes are terminated
+    if hasattr(self, '_current_process'):
+        terminate_subprocess(self._current_process)
     if os.path.exists("cache"):
         shutil.rmtree("cache")
executorlib/cache/shared.py (1)

Line range hint 77-93: Improve docstring for terminate_function parameter.

The docstring should indicate that the parameter is optional and describe its purpose more clearly.

-        terminate_function (callable): The function to terminate the tasks.
+        terminate_function (Optional[callable], optional): Function to terminate tasks during shutdown.
+            If provided, this function will be called for each running task when shutdown is requested.
+            Defaults to None.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 451cc92 and 9307b01.

📒 Files selected for processing (3)
  • executorlib/cache/executor.py (4 hunks)
  • executorlib/cache/shared.py (4 hunks)
  • tests/test_cache_executor_serial.py (4 hunks)
🔇 Additional comments (5)
executorlib/cache/executor.py (2)

2-2: LGTM: Import changes are well-organized

The new imports for Optional and terminate_subprocess are properly organized and necessary for the new functionality.

Also applies to: 5-9


30-31: LGTM: Default termination function logic is sound

The conditional logic correctly pairs terminate_subprocess with execute_in_subprocess while maintaining flexibility for custom termination functions.

Let's verify that terminate_subprocess is always used with execute_in_subprocess:

✅ Verification successful

The search results show that:

  1. In executorlib/cache/executor.py, the default pairing is correctly implemented where terminate_subprocess is set when execute_in_subprocess is used
  2. In the test files (test_cache_executor_serial.py), whenever execute_in_subprocess is used, it's properly paired with terminate_subprocess
  3. No instances were found where execute_in_subprocess is used without terminate_subprocess

Verified: Default termination function is correctly paired

The codebase consistently pairs execute_in_subprocess with terminate_subprocess in both implementation and tests. The default termination function logic is working as intended.
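The pairing described above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the code in executorlib/cache/executor.py: `resolve_terminate_function` is a hypothetical helper, and the two functions above it are simplified stand-ins that only reuse the names from this PR.

```python
import subprocess
import time
from typing import Callable, Optional


def execute_in_subprocess(command: list, cwd: Optional[str] = None) -> subprocess.Popen:
    # Simplified stand-in: start the command and return the process handle.
    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)


def terminate_subprocess(task: subprocess.Popen) -> None:
    # Simplified stand-in: request termination, then wait for the exit status.
    task.terminate()
    while task.poll() is None:
        time.sleep(0.1)


def resolve_terminate_function(
    execute_function: Callable = execute_in_subprocess,
    terminate_function: Optional[Callable] = None,
) -> Optional[Callable]:
    # Hypothetical helper: only the default execute function receives a default
    # terminate function; a custom execute function keeps whatever was passed.
    if execute_function == execute_in_subprocess and terminate_function is None:
        return terminate_subprocess
    return terminate_function
```

A custom execute function with no terminate function resolves to None here, which is the case the later validation suggestion in this review is concerned with.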

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any instances where execute_in_subprocess might be used without terminate_subprocess
rg -l "execute_in_subprocess" | xargs rg "execute_in_subprocess" -A 10 -B 10

Length of output: 10763

tests/test_cache_executor_serial.py (2)

11-15: LGTM: Clean import organization

The new import for terminate_subprocess is properly organized within the existing grouped import statement and correctly placed inside the try-except block.


58-58: 🛠️ Refactor suggestion

Consider adding specific tests for process termination

While the terminate_function parameter is correctly added to all test methods, there are no explicit test cases verifying the process termination behavior.

Consider adding test cases that:

  1. Verify processes are properly terminated when the executor is closed
  2. Test error cases and cleanup when processes fail to terminate
  3. Ensure no processes are left running after test completion

Example test case structure:

def test_process_termination(self):
    with FileExecutor() as exe:
        # Start a long-running task
        future = exe.submit(time.sleep, 10)
        # Verify process exists
        # Close executor
    # Verify process was terminated

Also applies to: 82-82, 106-106
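A runnable variant of such a test might look like the sketch below. It exercises a termination helper directly on a subprocess.Popen handle rather than going through FileExecutor, because the executor's internals are not shown in this conversation. The helper has the same shape as the terminate_subprocess function in this PR; the test class and method names are illustrative only.

```python
import subprocess
import sys
import time
import unittest


def terminate_subprocess(task):
    # Same shape as the function added in this PR.
    task.terminate()
    while task.poll() is None:
        time.sleep(0.1)


class TestProcessTermination(unittest.TestCase):
    def test_long_running_process_is_terminated(self):
        # Start a child process that would otherwise run for a minute.
        task = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
        self.addCleanup(task.kill)  # safety net if an assertion below fails
        self.assertIsNone(task.poll())  # the child is still running
        terminate_subprocess(task)
        self.assertIsNotNone(task.poll())  # an exit status is now available
```

Registering `task.kill` with `addCleanup` means the child is stopped even when an assertion fails, which addresses the orphaned-process concern raised for tearDown.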

executorlib/cache/shared.py (1)

6-6: LGTM: Import statements are appropriate.

The added imports support the new process termination functionality.

Also applies to: 8-8

Comment on lines +111 to +113
    if terminate_function is not None:
        for task in process_dict.values():
            terminate_function(task=task)
Contributor

⚠️ Potential issue

Add error handling and logging to termination logic.

The current implementation might leave processes running if termination fails for any task.

Consider this more robust implementation:

     if terminate_function is not None:
+        termination_errors = []
         for task in process_dict.values():
-            terminate_function(task=task)
+            try:
+                terminate_function(task=task)
+            except Exception as e:
+                termination_errors.append(f"Failed to terminate task: {e}")
+        if termination_errors:
+            raise RuntimeError(f"Errors during shutdown: {'; '.join(termination_errors)}")
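The effect of collecting errors can be seen in a small self-contained sketch. `terminate_all`, `FakeTask`, and `fake_terminate` are hypothetical names introduced only for this illustration; the real tasks in the PR are process handles.

```python
from typing import Callable, Dict


def terminate_all(process_dict: Dict[str, object], terminate_function: Callable) -> None:
    # Try every task before reporting, so one failure cannot skip the rest.
    termination_errors = []
    for key, task in process_dict.items():
        try:
            terminate_function(task=task)
        except Exception as e:
            termination_errors.append(f"{key}: {e}")
    if termination_errors:
        raise RuntimeError(f"Errors during shutdown: {'; '.join(termination_errors)}")


class FakeTask:
    # Hypothetical stand-in for a process handle, used only in this demo.
    def __init__(self, fail: bool = False):
        self.fail = fail
        self.terminated = False


def fake_terminate(task: FakeTask) -> None:
    if task.fail:
        raise OSError("could not signal process")
    task.terminated = True


tasks = {"a": FakeTask(), "b": FakeTask(fail=True), "c": FakeTask()}
try:
    terminate_all(tasks, fake_terminate)
    message = None
except RuntimeError as err:
    message = str(err)
```

Task "c" is still terminated even though "b" fails first, which a plain loop without the try/except would not guarantee.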

Comment on lines 72 to 76
def terminate_subprocess(task):
    task.terminate()
    while task.poll() is None:
        time.sleep(0.1)

Contributor

⚠️ Potential issue

Add timeout and error handling to process termination.

The current implementation has several potential issues:

  1. No timeout could lead to indefinite waiting
  2. Fixed sleep interval might not be optimal
  3. Missing error handling
  4. Missing docstring

Consider this improved implementation:

def terminate_subprocess(task):
+    """
+    Terminate a subprocess and wait for it to complete.
+    
+    Args:
+        task: The subprocess.Popen instance to terminate
+        
+    Raises:
+        TimeoutError: If process doesn't terminate within timeout
+        RuntimeError: If termination fails
+    """
+    try:
+        task.terminate()
-    task.terminate()
-    while task.poll() is None:
-        time.sleep(0.1)
+        timeout = 30  # seconds
+        wait_start = time.time()
+        while task.poll() is None:
+            if time.time() - wait_start > timeout:
+                task.kill()  # Force kill if terminate doesn't work
+                raise TimeoutError(f"Process failed to terminate within {timeout} seconds")
+            time.sleep(min(0.1, timeout/10))  # Sleep at most 0.1 s per poll
+    except TimeoutError:
+        raise  # Keep the documented TimeoutError instead of wrapping it
+    except Exception as e:
+        raise RuntimeError(f"Failed to terminate process: {e}") from e

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
executorlib/cache/executor.py (2)

30-30: Enhance parameter documentation

The documentation for terminate_function could be more detailed to explain its purpose and behavior.

Consider expanding the documentation:

-            terminate_function (callable, optional): The function to terminate the tasks.
+            terminate_function (callable, optional): Function to terminate running tasks when the executor is closed.
+                If None and execute_function is execute_in_subprocess, defaults to terminate_subprocess.
+                For custom execute_functions, provide a matching terminate_function to ensure proper cleanup.

Line range hint 1-48: Consider documenting the termination behavior

The implementation of process termination is well-structured, but the behavior and timing of termination should be documented at the class level.

Consider adding class-level documentation:

  1. When termination occurs (during cleanup, on errors, etc.)
  2. How termination affects running tasks
  3. Any cleanup guarantees or limitations
  4. Examples of custom termination functions

This will help users implement custom execute/terminate function pairs correctly.
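As one possible illustration of a custom pair, the sketch below assumes the calling conventions visible in the snippets quoted in this review: the execute function receives `command`, `task_dependent_lst`, and `cwd` and returns a process handle, and the terminate function is called with the handle as the `task` keyword. `execute_quietly` and `terminate_with_kill` are hypothetical names and are not part of executorlib.

```python
import subprocess
import sys
from typing import Optional


def execute_quietly(
    command: list,
    task_dependent_lst: Optional[list] = None,
    cwd: Optional[str] = None,
) -> subprocess.Popen:
    # Hypothetical custom execute function: wait for dependencies, then start
    # the command with its output discarded.
    for dependency in task_dependent_lst or []:
        dependency.wait()
    return subprocess.Popen(
        command, cwd=cwd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )


def terminate_with_kill(task: subprocess.Popen) -> None:
    # Matching terminate function: skip finished tasks, force-stop the rest.
    if task.poll() is None:
        task.kill()
        task.wait()


task = execute_quietly([sys.executable, "-c", "import time; time.sleep(60)"])
terminate_with_kill(task=task)
```

The important property is that the terminate function understands whatever object the execute function returns; here both sides agree on subprocess.Popen.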

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 9307b01 and 54924b5.

📒 Files selected for processing (3)
  • executorlib/cache/executor.py (4 hunks)
  • executorlib/cache/shared.py (4 hunks)
  • tests/test_cache_executor_serial.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_cache_executor_serial.py
🔇 Additional comments (8)
executorlib/cache/executor.py (4)

5-9: LGTM: Import changes are well-organized

The addition of terminate_subprocess to the imports is properly grouped with related functionality.


46-46: Verify RaisingThread implementation

The terminate_function is correctly passed to RaisingThread, but we should verify its implementation.

Let's check the RaisingThread and execute_tasks_h5 implementations:

#!/bin/bash
# Check RaisingThread implementation
ast-grep --pattern 'class RaisingThread { $$$ }'

# Check execute_tasks_h5 implementation
ast-grep --pattern 'def execute_tasks_h5($$$) { $$$ }'

33-34: 🛠️ Refactor suggestion

Consider validating custom execute_functions

While the default pairing of execute_in_subprocess with terminate_subprocess is good, there's no validation for custom execution functions.

Let's check if there are any custom execute_functions in use:

#!/bin/bash
# Search for FileExecutor instantiations with custom execute_functions
rg "FileExecutor\(.*execute_function\s*=(?!.*execute_in_subprocess)"

Consider adding validation:

         if execute_function == execute_in_subprocess and terminate_function is None:
             terminate_function = terminate_subprocess
+        elif execute_function != execute_in_subprocess and terminate_function is None:
+            raise ValueError("terminate_function must be provided when using custom execute_function")

20-20: LGTM: Parameter addition is well-typed and backward compatible

The new terminate_function parameter is properly typed and maintains backward compatibility with its default value.

Let's verify if there are any direct instantiations of FileExecutor that might need updates:

✅ Verification successful

No impact on existing instantiations - parameter is optional with safe default

The verification shows that all existing FileExecutor instantiations in the codebase use either no parameters or other named parameters (cwd, cores_per_worker). Since the new terminate_function parameter is optional with a None default value, these existing usages remain fully compatible and no updates are required.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for FileExecutor instantiations to ensure compatibility
rg -l "FileExecutor\(" | xargs rg -A 2 "FileExecutor\("

Length of output: 490


Script:

#!/bin/bash
# Search for FileExecutor instantiations with better pattern matching
rg -A 3 "FileExecutor\s*\(" --type py

# Also search for imports to understand usage
rg "from.*FileExecutor|import.*FileExecutor" --type py

Length of output: 1843

executorlib/cache/shared.py (4)

6-6: LGTM!

The time import is correctly placed with other standard library imports and is necessary for the new termination functionality.


75-79: Critical: Implement robust process termination.

The current implementation needs significant improvements for production reliability. I agree with the previous review feedback that remains unaddressed:

  1. Add timeout to prevent indefinite hanging
  2. Implement error handling for termination failures
  3. Add proper documentation
  4. Use adaptive sleep intervals

87-87: LGTM!

The terminate_function parameter is well-typed and properly documented.

Also applies to: 98-98


116-118: Critical: Add error handling to process termination.

The current implementation could leave processes running if any termination fails. I agree with the previous review that suggested collecting and reporting errors during shutdown.

Let's verify the potential impact:

#!/bin/bash
# Search for other shutdown handlers that might be affected
rg -l 'shutdown.*=.*True' 

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (1)
executorlib/cache/executor.py (1)

46-46: Consider error handling for termination failures.

While passing the terminate_function to RaisingThread is correct, there's no visible error handling if the termination fails.

Consider implementing a try-catch block in the termination logic to handle potential failures gracefully and possibly log the errors.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 54924b5 and cf5b211.

📒 Files selected for processing (5)
  • executorlib/cache/executor.py (4 hunks)
  • executorlib/cache/shared.py (3 hunks)
  • executorlib/standalone/cache/spawner.py (1 hunks)
  • tests/test_cache_executor_mpi.py (0 hunks)
  • tests/test_cache_executor_serial.py (4 hunks)
💤 Files with no reviewable changes (1)
  • tests/test_cache_executor_mpi.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • executorlib/cache/shared.py
  • tests/test_cache_executor_serial.py
🧰 Additional context used
🪛 Ruff
executorlib/standalone/cache/spawner.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

🔇 Additional comments (5)
executorlib/standalone/cache/spawner.py (1)

1-39: Verify integration with FileExecutor

The implementation provides the necessary functionality for process termination, but let's verify its integration with the FileExecutor class.

✅ Verification successful



Implementation correctly integrates process termination

The code review confirms proper integration of the termination functionality:

  1. FileExecutor correctly imports and sets up terminate_subprocess as the default termination function
  2. execute_tasks_h5 properly handles termination by:
    • Accepting terminate_function as an optional parameter
    • Implementing shutdown logic that calls terminate_function on all active processes when shutdown is requested
  3. The termination is properly integrated into the process management flow, ensuring clean cleanup of subprocesses
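The shutdown flow summarized above can be pictured with a simplified loop. This is a sketch under assumptions and not the execute_tasks_h5 implementation: the dictionary-based queue messages, the `run_until_shutdown` name, and the direct use of subprocess.Popen are all hypothetical, and only the `terminate_function(task=task)` call over `process_dict.values()` mirrors the code quoted in this review.

```python
import queue
import subprocess
import sys
import time
from typing import Callable, Optional


def terminate_subprocess(task: subprocess.Popen) -> None:
    # Same shape as the function added in this PR.
    task.terminate()
    while task.poll() is None:
        time.sleep(0.1)


def run_until_shutdown(
    task_queue: queue.Queue,
    terminate_function: Optional[Callable] = None,
) -> dict:
    # Hypothetical loop: each queue item is a dict that either carries a
    # command to start or a shutdown request.
    process_dict = {}
    while True:
        item = task_queue.get()
        if item.get("shutdown"):
            if terminate_function is not None:
                for task in process_dict.values():
                    terminate_function(task=task)
            return process_dict
        process_dict[item["key"]] = subprocess.Popen(item["command"])


q = queue.Queue()
q.put({"key": "job_1", "command": [sys.executable, "-c", "import time; time.sleep(60)"]})
q.put({"shutdown": True})
processes = run_until_shutdown(q, terminate_function=terminate_subprocess)
```

When `terminate_function` is None the loop returns without touching the children, so they keep running after shutdown; that is the behavior the optional parameter leaves in place for callers who do not supply one.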
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that FileExecutor properly uses the terminate_subprocess function

# Test: Check if FileExecutor imports and uses the terminate_subprocess function
rg -l "from.*spawner.*import.*terminate_subprocess" --type py
rg -A 5 "terminate_subprocess" --type py

Length of output: 2833


Script:

#!/bin/bash
# Check FileExecutor's process termination implementation
ast-grep --pattern 'class FileExecutor {
  $$$
  def _set_process($$$) {
    $$$
  }
  $$$
}'

# Check for shutdown/cleanup methods
rg -A 10 "def (shutdown|cleanup|close|terminate)" executorlib/cache/executor.py

Length of output: 177


Script:

#!/bin/bash
# Check the complete FileExecutor implementation
cat executorlib/cache/executor.py

# Also check for any test cases that verify process termination
rg -A 10 "def test.*terminate" tests/

Length of output: 2037


Script:

#!/bin/bash
# Check how execute_tasks_h5 uses the terminate_function
rg -A 20 "def execute_tasks_h5" executorlib/cache/shared.py

Length of output: 828


Script:

#!/bin/bash
# Get the complete implementation of execute_tasks_h5
cat executorlib/cache/shared.py

Length of output: 7619


executorlib/cache/executor.py (4)

5-9: LGTM: Clean import organization.

The imports are well-organized, with the new terminate_subprocess import logically grouped with related functionality.


20-20: LGTM: Well-documented parameter addition.

The terminate_function parameter is properly typed as Optional[callable] and well-documented in the docstring.

Also applies to: 30-30


Line range hint 1-48: Verify thread safety of termination process.

The executor manages concurrent tasks, so we should ensure the termination process is thread-safe.

#!/bin/bash
# Description: Look for potential race conditions in thread handling
# Expected: Find synchronization mechanisms around thread termination

# Search for thread synchronization primitives
rg -A 3 "Lock|Event|Condition|Semaphore" executorlib/
# Look for concurrent access to shared resources during termination
ast-grep --pattern 'def terminate($$$) { $$$ }'

33-34: Verify the default termination behavior.

The code automatically assigns terminate_subprocess as the termination function when using execute_in_subprocess. This coupling should be tested to ensure proper process cleanup.

        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
Contributor

⚠️ Potential issue

Add error handling for subprocess creation

The subprocess creation should be wrapped in a try-except block to handle potential errors.

Here's a suggested implementation:

-    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
+    try:
+        return subprocess.Popen(
+            command,
+            universal_newlines=True,
+            cwd=cwd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE
+        )
+    except (OSError, subprocess.SubprocessError) as e:
+        raise RuntimeError(f"Failed to execute command {command}: {str(e)}")
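A small runnable sketch of this kind of wrapper is shown below. `start_command` is a hypothetical name, and the sketch deliberately leaves out the stdout and stderr pipes from the suggestion, since pipes that are never read can block a child that writes a lot of output.

```python
import subprocess
import sys
from typing import Optional


def start_command(command: list, cwd: Optional[str] = None) -> subprocess.Popen:
    # Hypothetical wrapper: translate launch failures into one exception type.
    try:
        return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
    except (OSError, subprocess.SubprocessError) as e:
        raise RuntimeError(f"Failed to execute command {command}: {e}") from e


try:
    start_command(["this-command-does-not-exist-447"])
    launch_error = None
except RuntimeError as err:
    launch_error = err
```

Using `raise ... from e` keeps the original OSError available as `__cause__`, so the caller can still inspect why the launch failed.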

Comment on lines +6 to +10
def execute_in_subprocess(
    command: list,
    task_dependent_lst: list = [],
    cwd: Optional[str] = None,
) -> subprocess.Popen:
Contributor

⚠️ Potential issue

Fix mutable default argument and add error handling

The function uses a mutable list as a default argument, which Ruff flags as B006.

Apply this diff to fix the mutable default argument. With this change, the function body must treat None as an empty list before entering the while loop:

 def execute_in_subprocess(
     command: list,
-    task_dependent_lst: list = [],
+    task_dependent_lst: Optional[list] = None,
     cwd: Optional[str] = None,
 ) -> subprocess.Popen:
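The pitfall behind B006 can be reproduced in a few lines. The function names below are made up for the demonstration and do not come from executorlib.

```python
from typing import Optional


def track_shared(task, task_lst: list = []):
    # The default list is created once, when the function is defined.
    task_lst.append(task)
    return task_lst


def track_fresh(task, task_lst: Optional[list] = None):
    # A new list is created on every call that omits the argument.
    if task_lst is None:
        task_lst = []
    task_lst.append(task)
    return task_lst


shared_first = track_shared("a")
shared_second = track_shared("b")  # same list object as shared_first
fresh_first = track_fresh("a")
fresh_second = track_fresh("b")  # independent list
```

In the execute_in_subprocess snippet quoted in this review the list is only rebound and never mutated in place, so the shared default looks harmless as written; the None default mainly guards against future edits that append to it.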

Comment on lines +23 to +27
    while len(task_dependent_lst) > 0:
        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
Contributor

⚠️ Potential issue

Add timeout and error handling for dependent tasks

The while loop could potentially run indefinitely if dependent tasks never complete. Consider adding a timeout mechanism and proper error handling.

Here's a suggested implementation:

+    if task_dependent_lst is None:
+        task_dependent_lst = []
+    start_time = time.time()
+    timeout = 3600  # 1 hour timeout, adjust as needed
     while len(task_dependent_lst) > 0:
+        if time.time() - start_time > timeout:
+            raise TimeoutError("Dependent tasks did not complete within the timeout period")
         task_dependent_lst = [
             task for task in task_dependent_lst if task.poll() is None
         ]
+        time.sleep(0.1)  # Add small delay to prevent CPU spinning

Committable suggestion was skipped due to low confidence.
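The waiting logic can also be isolated into a helper, which makes the timeout easy to test on its own. The sketch below is one way to do that; `wait_for_tasks` and its parameters are hypothetical, and it uses time.monotonic so that the deadline is not affected by system clock adjustments.

```python
import subprocess
import sys
import time
from typing import List


def wait_for_tasks(
    tasks: List[subprocess.Popen],
    timeout: float = 3600.0,
    poll_interval: float = 0.1,
) -> None:
    # Hypothetical helper: block until every task has exited or the deadline passes.
    deadline = time.monotonic() + timeout
    pending = list(tasks)
    while True:
        pending = [task for task in pending if task.poll() is None]
        if not pending:
            return
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"{len(pending)} dependent task(s) still running after {timeout} seconds"
            )
        time.sleep(poll_interval)  # avoid spinning the CPU between polls


quick = subprocess.Popen([sys.executable, "-c", "pass"])
wait_for_tasks([quick], timeout=30.0)
```

Whether a fixed one-hour limit suits long-running dependent jobs is a separate design question; exposing the timeout as a parameter leaves that decision to the caller.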

Comment on lines +30 to +39
def terminate_subprocess(task):
    """
    Terminate a subprocess and wait for it to complete.

    Args:
        task (subprocess.Popen): The subprocess.Popen instance to terminate
    """
    task.terminate()
    while task.poll() is None:
        time.sleep(0.1)
Contributor

🛠️ Refactor suggestion

Enhance subprocess termination with graceful shutdown and timeout

The current implementation could be improved with proper type hints, graceful shutdown attempt, timeout mechanism, and error handling.

Here's a suggested implementation:

-def terminate_subprocess(task):
+def terminate_subprocess(task: subprocess.Popen, timeout: float = 10.0) -> None:
     """
     Terminate a subprocess and wait for it to complete.
 
     Args:
         task (subprocess.Popen): The subprocess.Popen instance to terminate
+        timeout (float): Maximum time to wait for process termination in seconds
+
+    Raises:
+        TimeoutError: If the process doesn't terminate within the timeout period
     """
-    task.terminate()
-    while task.poll() is None:
-        time.sleep(0.1)
+    try:
+        # First attempt graceful shutdown
+        task.terminate()
+        start_time = time.time()
+        
+        while task.poll() is None:
+            if time.time() - start_time > timeout:
+                # Force kill if graceful shutdown fails
+                task.kill()
+                if task.poll() is None:
+                    raise TimeoutError(f"Failed to terminate process {task.pid} within {timeout} seconds")
+            time.sleep(0.1)
+    except Exception as e:
+        raise RuntimeError(f"Error while terminating process {task.pid}: {str(e)}")
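An alternative that avoids the manual polling loop is to rely on Popen.wait with a timeout, which is part of the standard library. The sketch below is not the code merged in this PR; `terminate_with_timeout` is a hypothetical name. It escalates from terminate to kill and always reaps the child.

```python
import subprocess
import sys


def terminate_with_timeout(task: subprocess.Popen, timeout: float = 10.0) -> int:
    # Ask politely first; Popen.wait raises TimeoutExpired if the child lingers.
    task.terminate()
    try:
        return task.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Escalate, then reap the child so that it does not remain as a zombie.
        task.kill()
        return task.wait()


task = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = terminate_with_timeout(task, timeout=5.0)
```

Because the final wait follows kill, the function returns an exit code instead of raising a TimeoutError, and no broad except clause is needed that could mask the original error type.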
