Try to skip BlockingIOError #509
Conversation
Check out this pull request on ReviewNB to see visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB.
Walkthrough
The pull request introduces modifications to two files: executorlib/standalone/hdf.py, which now opens HDF5 files with explicit keyword arguments and retries on BlockingIOError, and notebooks/2-hpc-submission.ipynb, which switches the notebook kernel from Python 3 to Flux.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (1)
executorlib/standalone/hdf.py (1)
Line range hint 1-90: Consider a more systematic approach to error handling
While handling BlockingIOError is a good improvement, the current implementation could benefit from a more systematic approach:
- Implement a centralized retry mechanism (as suggested in the comments below)
- Consider adding logging to track retry attempts
- Consider implementing a context manager for consistent file handling across all functions
- Add proper error handling for other potential IOErrors
Example context manager implementation:
```python
import logging
import time
from contextlib import contextmanager

import h5py


@contextmanager
def safe_hdf5_access(file_name: str, mode: str = "r"):
    """Context manager for safe HDF5 file access with retry logic."""
    attempts = 0
    max_attempts = 3
    while attempts < max_attempts:
        try:
            with h5py.File(name=file_name, mode=mode) as hdf:
                yield hdf
            break
        except BlockingIOError:
            attempts += 1
            if attempts == max_attempts:
                raise
            delay = 0.1 * attempts
            logging.warning(
                f"BlockingIOError on {file_name}, retry {attempts} after {delay}s"
            )
            time.sleep(delay)
```
This would simplify all functions to use:
```python
def get_output(file_name: str) -> Tuple[bool, object]:
    with safe_hdf5_access(file_name) as hdf:
        if "output" in hdf:
            return True, cloudpickle.loads(np.void(hdf["/output"]))
        return False, None
```
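The same context manager could also cover the write path. As a rough sketch (not part of the original suggestion), dump could be rewritten on top of safe_hdf5_access, reusing the group_dict mapping already defined in hdf.py:

```python
def dump(file_name: str, data_dict: dict) -> None:
    # group_dict maps data_dict keys to HDF5 dataset names; only the
    # entries visible in this review's diff are reproduced here.
    group_dict = {
        "output": "output",
        "queue_id": "queue_id",
    }
    # Retries on BlockingIOError are handled inside safe_hdf5_access.
    with safe_hdf5_access(file_name, mode="a") as hdf:
        for data_key, data_value in data_dict.items():
            if data_key in group_dict.keys():
                hdf.create_dataset(
                    name="/" + group_dict[data_key],
                    data=np.void(cloudpickle.dumps(data_value)),
                )
```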
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
executorlib/standalone/hdf.py
(3 hunks)notebooks/2-hpc-submission.ipynb
(1 hunks)
🔇 Additional comments (4)
notebooks/2-hpc-submission.ipynb (4)
Line range hint 67-96: LGTM! Well-structured code examples.
The code examples effectively demonstrate both SLURM and Flux usage with proper resource specifications and template configurations.
Line range hint 180-191: LGTM! Proper cache cleanup implementation.
The cleanup section correctly handles the removal of cached files with appropriate error handling.
1-1: Verify alignment with PR objective.
The PR objective mentions "Try to skip BlockingIOError", but there's no visible error handling in the notebook. This change appears to be implemented in executorlib/standalone/hdf.py but not demonstrated in this notebook.
Let's check if there are any error handling examples in the repository:
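The verification script itself was not preserved here; a search along these lines (a hypothetical sketch mirroring the rg-based scripts used later in this review) would cover the question:

```bash
#!/bin/bash
# Hypothetical check: look for BlockingIOError handling in the code and notebooks
rg -n "BlockingIOError" --type py
rg -n "BlockingIOError" notebooks/
```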
1-1: LGTM! Verify Flux kernel availability.
The kernel change from Python 3 to Flux and Python version update to 3.12.7 align well with the notebook's content about the Flux framework.
Let's verify the Flux kernel is properly configured:
✅ Verification successful
Flux kernel configuration is properly documented
The kernel change is well-supported by the repository:
- Installation instructions for the Flux kernel are documented in docs/installation.md
- The binder/postBuild script includes automated setup of the Flux kernel
- The notebook content demonstrates Flux-specific features
The kernel change from Python 3 to Flux aligns with the notebook's content about the Flux framework and is properly supported by the repository's infrastructure. No issues found with this change.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Check if Flux kernel is available in Jupyter
jupyter kernelspec list 2>/dev/null | grep -A 1 "flux"
```
Length of output: 42
Script:
```bash
#!/bin/bash
# Let's check the notebook metadata and content more thoroughly
cat notebooks/2-hpc-submission.ipynb | jq -r '.metadata.kernelspec'
# Also check if there are any other notebooks with Flux kernel references
fd ".ipynb$" | xargs cat | jq -r 'select(.metadata.kernelspec.name == "flux") | .metadata.kernelspec'
```
Length of output: 313
Script:
```bash
#!/bin/bash
# Let's check if there are any installation or setup instructions for the Flux kernel
rg -i "flux.*kernel|kernel.*flux" -A 5 -B 5
# Also check if there are any related configuration files
fd -e yaml -e json -e toml -e ini | xargs rg -l "flux.*kernel|kernel.*flux"
```
Length of output: 13970
```diff
@@ -20,7 +20,7 @@ def dump(file_name: str, data_dict: dict) -> None:
         "output": "output",
         "queue_id": "queue_id",
     }
-    with h5py.File(file_name, "a") as fname:
+    with h5py.File(name=file_name, mode="a") as fname:
```
🛠️ Refactor suggestion
Add BlockingIOError handling for consistency
While the keyword argument usage improves clarity, this function should handle BlockingIOError consistently with other functions in this file.
Consider wrapping the file operation in a try-except block:
```diff
-    with h5py.File(name=file_name, mode="a") as fname:
+    try:
+        with h5py.File(name=file_name, mode="a") as fname:
+            for data_key, data_value in data_dict.items():
+                if data_key in group_dict.keys():
+                    fname.create_dataset(
+                        name="/" + group_dict[data_key],
+                        data=np.void(cloudpickle.dumps(data_value)),
+                    )
+    except BlockingIOError:
+        return dump(file_name=file_name, data_dict=data_dict)
```
Committable suggestion skipped: line range outside the PR's diff.
```python
    try:
        with h5py.File(name=file_name, mode="r") as hdf:
            if "output" in hdf:
                return True, cloudpickle.loads(np.void(hdf["/output"]))
            else:
                return False, None
    except BlockingIOError:
        return get_output(file_name=file_name)
```
Apply the same retry mechanism here
This function has the same unbounded recursion issues as discussed in the load function.
Apply the retry decorator pattern suggested in the load comment below:
```diff
+@retry_on_blocking()
 def get_output(file_name: str) -> Tuple[bool, object]:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            # ... existing code ...
-    except BlockingIOError:
-        return get_output(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        # ... rest of the existing code ...
```
Committable suggestion skipped: line range outside the PR's diff.
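Spelled out in full (a sketch combining this diff with the excerpt above; it assumes the retry_on_blocking decorator proposed in the load comment below):

```python
@retry_on_blocking()
def get_output(file_name: str) -> Tuple[bool, object]:
    # Bounded retries come from the decorator; no recursive fallback needed.
    with h5py.File(name=file_name, mode="r") as hdf:
        if "output" in hdf:
            return True, cloudpickle.loads(np.void(hdf["/output"]))
        return False, None
```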
```python
    try:
        with h5py.File(name=file_name, mode="r") as hdf:
            if "queue_id" in hdf:
                return cloudpickle.loads(np.void(hdf["/queue_id"]))
            else:
                return None
    except BlockingIOError:
        return get_queue_id(file_name=file_name)
```
Apply retry mechanism and improve type safety
This function has two issues:
- Same unbounded recursion issues as other functions
- Type safety: The function's return type hint suggests Optional[int] but it could return other types if stored in HDF5
Apply the retry decorator and add type checking:
```diff
+@retry_on_blocking()
 def get_queue_id(file_name: str) -> Optional[int]:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            if "queue_id" in hdf:
-                return cloudpickle.loads(np.void(hdf["/queue_id"]))
-            else:
-                return None
-    except BlockingIOError:
-        return get_queue_id(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        if "queue_id" in hdf:
+            value = cloudpickle.loads(np.void(hdf["/queue_id"]))
+            if not isinstance(value, int):
+                raise TypeError(f"Queue ID must be an integer, got {type(value)}")
+            return value
+        return None
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
@retry_on_blocking()
def get_queue_id(file_name: str) -> Optional[int]:
    with h5py.File(name=file_name, mode="r") as hdf:
        if "queue_id" in hdf:
            value = cloudpickle.loads(np.void(hdf["/queue_id"]))
            if not isinstance(value, int):
                raise TypeError(f"Queue ID must be an integer, got {type(value)}")
            return value
        return None
```
```python
    try:
        with h5py.File(name=file_name, mode="r") as hdf:
            data_dict = {}
            if "function" in hdf:
                data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
            else:
                raise TypeError("Function not found in HDF5 file.")
            if "input_args" in hdf:
                data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
            else:
                data_dict["args"] = ()
            if "input_kwargs" in hdf:
                data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
            else:
                data_dict["kwargs"] = {}
            return data_dict
    except BlockingIOError:
        return load(file_name=file_name)
```
Critical: Improve error handling robustness
The current implementation has several issues:
- Unbounded recursion could lead to stack overflow
- Immediate retries without delay could cause CPU thrashing
- Other potential IOErrors are not handled
Consider implementing a more robust retry mechanism:
```diff
+import time
+from functools import wraps
+
+def retry_on_blocking(max_attempts=3, delay=0.1):
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            attempts = 0
+            while attempts < max_attempts:
+                try:
+                    return func(*args, **kwargs)
+                except BlockingIOError:
+                    attempts += 1
+                    if attempts == max_attempts:
+                        raise
+                    time.sleep(delay * attempts)  # Linear backoff between retries
+            return func(*args, **kwargs)
+        return wrapper
+    return decorator
+
+@retry_on_blocking()
 def load(file_name: str) -> dict:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            # ... existing code ...
-    except BlockingIOError:
-        return load(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        # ... rest of the existing code ...
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
import time
from functools import wraps


def retry_on_blocking(max_attempts=3, delay=0.1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except BlockingIOError:
                    attempts += 1
                    if attempts == max_attempts:
                        raise
                    time.sleep(delay * attempts)  # Linear backoff between retries
            return func(*args, **kwargs)
        return wrapper
    return decorator


@retry_on_blocking()
def load(file_name: str) -> dict:
    with h5py.File(name=file_name, mode="r") as hdf:
        data_dict = {}
        if "function" in hdf:
            data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
        else:
            raise TypeError("Function not found in HDF5 file.")
        if "input_args" in hdf:
            data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
        else:
            data_dict["args"] = ()
        if "input_kwargs" in hdf:
            data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
        else:
            data_dict["kwargs"] = {}
        return data_dict
```
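A quick sanity check of the decorator's retry behavior (a hypothetical snippet, not part of the suggestion above) could look like:

```python
# Hypothetical check: the wrapped function is retried on BlockingIOError
# and succeeds once the error stops occurring.
calls = {"n": 0}

@retry_on_blocking(max_attempts=3, delay=0.0)
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise BlockingIOError
    return "ok"

assert flaky() == "ok"
assert calls["n"] == 3  # two BlockingIOErrors, then success
```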
Summary by CodeRabbit
- Bug Fixes
  - HDF5 file operations in executorlib/standalone/hdf.py now retry on BlockingIOError instead of failing immediately.
- Documentation
  - The notebooks/2-hpc-submission.ipynb kernel was updated from Python 3 to Flux (Python 3.12.7).