Write cache first #492
Conversation
Walkthrough

The changes in this pull request primarily enhance the caching behavior of task execution.
Possibly related PRs
📜 Recent review details

Configuration used: CodeRabbit UI

📒 Files selected for processing (1)

tests/test_local_executor.py

🧰 Additional context used

🪛 Ruff

tests/test_local_executor.py

21-21: `h5py` imported but unused; consider using `importlib.util.find_spec` to test for availability (F401)

🔇 Additional comments (3)

tests/test_local_executor.py (3)
The conditional import of `h5py` is handled appropriately, so the h5py-dependent tests are skipped when the library is unavailable.

Also applies to: 20-25
The tearDown method ensures proper cleanup of the cache directory after each test.
While the current tests cover basic functionality and error handling, consider adding tests for:
- Cache hits (same input executed twice)
- Cache misses (different inputs)
- Cache behavior with errors
- Verification that results are actually coming from cache
The suggestion from the previous review for implementing a cache hit test is still valid. Would you like me to help implement these additional test cases?
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
tests/test_executor_backend_mpi.py (1)
Line range hint `100-124`: **Make time-based assertions more robust.**

The current time-based assertions could be flaky due to system load or timing variations. Consider these improvements:

```diff
 time_2 = time.time()
-self.assertTrue(time_2 - time_1 > 1)
+# Allow for some timing variation but ensure significant delay
+self.assertGreater(time_2 - time_1, 0.9)
 time_3 = time.time()
 fs_2 = exe.submit(mpi_funct_sleep, 1)
 self.assertEqual(fs_2.result(), [(1, 2, 0), (1, 2, 1)])
 self.assertTrue(fs_2.done())
 time_4 = time.time()
-self.assertTrue(time_3 - time_4 < 1)
+# Cached result should be significantly faster
+self.assertLess(time_4 - time_3, 0.1)
```

This change:
- Uses a slightly lower threshold (0.9s) for the initial execution to account for timing variations
- Uses a reasonable upper bound (0.1s) for cached execution
- Fixes the time comparison in the cache check (it was comparing in the wrong order)
executorlib/interactive/shared.py (1)
`627-640`: **Add cache directory validation**

The function assumes the cache directory is writable and has sufficient space. This could lead to runtime errors if the directory is not accessible or the disk is full.

Consider adding validation:

```diff
+def _validate_cache_directory(cache_dir: str) -> None:
+    """Validate cache directory is writable and has sufficient space."""
+    if not os.access(cache_dir, os.W_OK):
+        raise PermissionError(f"Cache directory {cache_dir} is not writable")
+    import shutil
+    if shutil.disk_usage(cache_dir).free < 1024 * 1024:  # 1MB minimum
+        raise OSError(f"Insufficient disk space in cache directory {cache_dir}")
+
 def _execute_task_with_cache(...):
     os.makedirs(cache_directory, exist_ok=True)
+    _validate_cache_directory(cache_directory)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
executorlib/interactive/shared.py (1 hunks)
tests/test_executor_backend_mpi.py (1 hunks)
🔇 Additional comments (2)
tests/test_executor_backend_mpi.py (1)
`100-100`: **Parameter naming inconsistency detected.**

The change from `max_cores` to `max_workers` in this test method creates an inconsistency with other test methods in the file that still use `max_cores` (e.g., `test_meta_executor_serial`, `test_meta_executor_single`). This suggests an incomplete refactoring.
Let's verify the parameter usage across the codebase:
Consider updating all test methods to use the new `max_workers` parameter for consistency. Here is the list of methods that need to be updated:
- `test_meta_executor_serial`
- `test_meta_executor_single`
- `test_oversubscribe`
- `test_errors`
executorlib/interactive/shared.py (1)
`634-638`: **Verify cleanup on exception**
The error handling path shuts down the interface but doesn't clean up any partially written cache files.
Let's verify if there are any other places in the codebase handling cache cleanup:
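Independently of that search, a hedged sketch of what such cleanup could look like (`cleanup_partial_cache` is a hypothetical helper, not code from the PR):

```python
import os


def cleanup_partial_cache(file_name: str) -> None:
    """Remove a partially written cache file; a missing file is fine.

    Hypothetical helper -- the PR itself does not include it.
    """
    try:
        os.remove(file_name)
    except FileNotFoundError:
        pass  # nothing was written, nothing to clean up
```

Called from the `except` branch before re-raising, this would prevent a crash mid-`dump` from leaving a truncated file that a later run misreads as a cache hit.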
```python
data_dict["output"] = result
dump(file_name=file_name, data_dict=data_dict)
f.set_result(result)
```
🛠️ Refactor suggestion
Consider atomic cache writes
The current implementation writes the cache file directly, which could lead to corrupted cache files if the process is interrupted between lines 631-633.
Consider using atomic writes with a temporary file:
```diff
-data_dict["output"] = result
-dump(file_name=file_name, data_dict=data_dict)
-f.set_result(result)
+data_dict["output"] = result
+temp_file = file_name + '.tmp'
+dump(file_name=temp_file, data_dict=data_dict)
+os.replace(temp_file, file_name)
+f.set_result(result)
```
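The diff above is schematic; as a self-contained illustration of the same write-then-rename pattern, here is a sketch that uses `pickle` as a stand-in for executorlib's `dump` helper (`atomic_dump` is a hypothetical name):

```python
import os
import pickle
import tempfile


def atomic_dump(file_name: str, data_dict: dict) -> None:
    """Write data_dict to file_name without ever exposing a partial file."""
    dir_name = os.path.dirname(os.path.abspath(file_name))
    # Create the temporary file in the target directory so the final
    # rename stays on one filesystem and os.replace remains atomic.
    fd, temp_file = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            pickle.dump(data_dict, handle)
        os.replace(temp_file, file_name)
    except BaseException:
        os.remove(temp_file)  # discard the partial file on any failure
        raise
```

`os.replace` is atomic on both POSIX and Windows as long as source and destination are on the same filesystem, which is why the temporary file is created next to the target rather than in the system temp directory.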
Committable suggestion skipped: line range outside the PR's diff.
```python
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
    try:
        result = interface.send_and_receive_dict(input_dict=task_dict)
        data_dict["output"] = result
        dump(file_name=file_name, data_dict=data_dict)
        f.set_result(result)
    except Exception as thread_exception:
        interface.shutdown(wait=True)
        future_queue.task_done()
        f.set_exception(exception=thread_exception)
        raise thread_exception
    else:
        future_queue.task_done()
```
Potential race condition in cache writing
The cache writing operation happens after getting the result but before setting the future result. This could lead to a race condition if multiple processes try to write to the same cache file simultaneously.
Consider adding file locking mechanism:
```diff
 try:
     result = interface.send_and_receive_dict(input_dict=task_dict)
     data_dict["output"] = result
+    import fcntl
+    with open(file_name, 'wb') as f:
+        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
         dump(file_name=file_name, data_dict=data_dict)
+        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
     f.set_result(result)
```
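Note that the diff above takes the lock on a handle that `dump` does not write through; a variant that serializes writers via a sidecar lock file may be more robust. A sketch under the assumptions that `pickle` stands in for `dump` and that `fcntl` (Unix-only) is acceptable:

```python
import fcntl  # Unix-only advisory locking
import pickle
from contextlib import contextmanager


@contextmanager
def locked(file_name: str):
    """Hold an exclusive advisory lock on a sidecar .lock file."""
    with open(file_name + ".lock", "w") as lock_file:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)


def dump_with_lock(file_name: str, data_dict: dict) -> None:
    """Serialize concurrent writers to the same cache file."""
    with locked(file_name):
        with open(file_name, "wb") as handle:
            pickle.dump(data_dict, handle)
```

Advisory locks only coordinate processes that also take the lock, so every writer (and ideally every reader) of the cache file would need to go through the same helper.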
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
tests/test_local_executor.py (2)
`20-25`: **Consider using consistent dependency checking approach**

The code uses different approaches for checking library availability: `importlib.util.find_spec` for mpi4py, and try-except for h5py.

Consider using the same approach for consistency:

```diff
-try:
-    import h5py
-
-    skip_h5py_test = False
-except ImportError:
-    skip_h5py_test = True
+skip_h5py_test = importlib.util.find_spec("h5py") is None
```

🧰 Tools
🪛 Ruff
21-21: `h5py` imported but unused; consider using `importlib.util.find_spec` to test for availability (F401)
`485-488`: **Consider adding a setUp method**

The test class has a tearDown method to clean the cache directory but lacks a setUp method to ensure the directory exists and is clean before each test. This could lead to test failures if the directory doesn't exist.

Consider adding:

```diff
+def setUp(self):
+    os.makedirs("./cache", exist_ok=True)
+
 def tearDown(self):
     shutil.rmtree("./cache")
```
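An alternative worth weighing (an assumption, not part of the PR): give each test its own temporary cache directory via `tempfile`, which avoids collisions between tests or parallel runs sharing a hard-coded `./cache` (`CacheTestBase` is an illustrative name):

```python
import shutil
import tempfile
import unittest


class CacheTestBase(unittest.TestCase):
    def setUp(self):
        # A fresh, uniquely named directory per test avoids collisions
        # between tests (or parallel test runs) sharing "./cache".
        self.cache_directory = tempfile.mkdtemp(prefix="cache-")

    def tearDown(self):
        # ignore_errors guards against a test that already removed it
        shutil.rmtree(self.cache_directory, ignore_errors=True)
```

Tests would then pass `self.cache_directory` wherever they currently hard-code `"./cache"`.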
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
tests/test_local_executor.py (3 hunks)
🧰 Additional context used
🪛 Ruff
tests/test_local_executor.py
21-21: `h5py` imported but unused; consider using `importlib.util.find_spec` to test for availability (F401)
```python
@unittest.skipIf(
    skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
def test_execute_task_cache(self):
    f = Future()
    q = Queue()
    q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f})
    q.put({"shutdown": True, "wait": True})
    cloudpickle_register(ind=1)
    execute_parallel_tasks(
        future_queue=q,
        cores=1,
        openmpi_oversubscribe=False,
        spawner=MpiExecSpawner,
        cache_directory="./cache",
    )
    self.assertEqual(f.result(), 1)
    q.join()
```
🛠️ Refactor suggestion
Enhance cache testing coverage
The current test only verifies basic functionality. Consider adding tests for:
- Cache hits (same input executed twice)
- Cache misses (different inputs)
- Cache behavior with errors
- Verification that results are actually coming from cache
Here's a suggested test to verify cache hits:
```python
def test_execute_task_cache_hit(self):
    # First execution
    f1 = Future()
    q1 = Queue()
    q1.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f1})
    q1.put({"shutdown": True, "wait": True})
    cloudpickle_register(ind=1)
    execute_parallel_tasks(
        future_queue=q1,
        cores=1,
        openmpi_oversubscribe=False,
        spawner=MpiExecSpawner,
        cache_directory="./cache",
    )
    result1 = f1.result()
    q1.join()
    # Second execution (should hit cache)
    f2 = Future()
    q2 = Queue()
    q2.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f2})
    q2.put({"shutdown": True, "wait": True})
    execute_parallel_tasks(
        future_queue=q2,
        cores=1,
        openmpi_oversubscribe=False,
        spawner=MpiExecSpawner,
        cache_directory="./cache",
    )
    result2 = f2.result()
    q2.join()
    self.assertEqual(result1, 1)
    self.assertEqual(result2, 1)
    # Verify cache file exists
    self.assertTrue(os.path.exists("./cache"))
```
Would you like me to generate additional test cases or open a GitHub issue to track this enhancement?
Summary by CodeRabbit

**Bug Fixes**

**New Features**

**Tests**
- Renamed the test parameter `max_cores` to `max_workers` to better reflect executor instantiation changes.