Try to skip BlockingIOError #509

Closed · wants to merge 1 commit
61 changes: 35 additions & 26 deletions executorlib/standalone/hdf.py
@@ -20,7 +20,7 @@ def dump(file_name: str, data_dict: dict) -> None:
         "output": "output",
         "queue_id": "queue_id",
     }
-    with h5py.File(file_name, "a") as fname:
+    with h5py.File(name=file_name, mode="a") as fname:
🛠️ Refactor suggestion

Add BlockingIOError handling for consistency

While the keyword argument usage improves clarity, this function should handle BlockingIOError consistently with other functions in this file.

Consider wrapping the file operation in a try-except block:

-    with h5py.File(name=file_name, mode="a") as fname:
+    try:
+        with h5py.File(name=file_name, mode="a") as fname:
+            for data_key, data_value in data_dict.items():
+                if data_key in group_dict.keys():
+                    fname.create_dataset(
+                        name="/" + group_dict[data_key],
+                        data=np.void(cloudpickle.dumps(data_value)),
+                    )
+    except BlockingIOError:
+        return dump(file_name=file_name, data_dict=data_dict)

Committable suggestion skipped: line range outside the PR's diff.
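As a standalone sketch of the recursive-retry shape this suggestion proposes (the `fake_dump` stub and the `state` counter are illustrative stand-ins for the real `h5py.File(..., mode="a")` write, not part of the PR; note that, as the later comments point out, this recursion is unbounded if the lock is never released):

```python
state = {"n": 0}


def fake_dump(file_name, data_dict):
    # Stub standing in for the real h5py append-mode write:
    # fail once with BlockingIOError, then succeed.
    state["n"] += 1
    if state["n"] < 2:
        raise BlockingIOError("file is locked")
    return None


def dump(file_name, data_dict):
    try:
        return fake_dump(file_name, data_dict)
    except BlockingIOError:
        # Recursive retry, as in the suggestion above.
        return dump(file_name=file_name, data_dict=data_dict)


dump("scratch.h5", {"fn": None})
```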

         for data_key, data_value in data_dict.items():
             if data_key in group_dict.keys():
                 fname.create_dataset(
@@ -39,21 +39,24 @@ def load(file_name: str) -> dict:
     Returns:
         dict: dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}}
     """
-    with h5py.File(file_name, "r") as hdf:
-        data_dict = {}
-        if "function" in hdf:
-            data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
-        else:
-            raise TypeError("Function not found in HDF5 file.")
-        if "input_args" in hdf:
-            data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
-        else:
-            data_dict["args"] = ()
-        if "input_kwargs" in hdf:
-            data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
-        else:
-            data_dict["kwargs"] = {}
-        return data_dict
+    try:
+        with h5py.File(name=file_name, mode="r") as hdf:
+            data_dict = {}
+            if "function" in hdf:
+                data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
+            else:
+                raise TypeError("Function not found in HDF5 file.")
+            if "input_args" in hdf:
+                data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
+            else:
+                data_dict["args"] = ()
+            if "input_kwargs" in hdf:
+                data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
+            else:
+                data_dict["kwargs"] = {}
+            return data_dict
+    except BlockingIOError:
+        return load(file_name=file_name)
Comment on lines +42 to +59
⚠️ Potential issue

Critical: Improve error handling robustness

The current implementation has several issues:

  1. Unbounded recursion could lead to stack overflow
  2. Immediate retries without delay could cause CPU thrashing
  3. Other potential IOErrors are not handled

Consider implementing a more robust retry mechanism:

+import time
+from functools import wraps
+
+def retry_on_blocking(max_attempts=3, delay=0.1):
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            attempts = 0
+            while attempts < max_attempts:
+                try:
+                    return func(*args, **kwargs)
+                except BlockingIOError:
+                    attempts += 1
+                    if attempts == max_attempts:
+                        raise
+                    time.sleep(delay * attempts)  # linear backoff
+            return func(*args, **kwargs)
+        return wrapper
+    return decorator
+
+@retry_on_blocking()
 def load(file_name: str) -> dict:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            # ... existing code ...
-    except BlockingIOError:
-        return load(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        # ... rest of the existing code ...
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            data_dict = {}
-            if "function" in hdf:
-                data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
-            else:
-                raise TypeError("Function not found in HDF5 file.")
-            if "input_args" in hdf:
-                data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
-            else:
-                data_dict["args"] = ()
-            if "input_kwargs" in hdf:
-                data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
-            else:
-                data_dict["kwargs"] = {}
-            return data_dict
-    except BlockingIOError:
-        return load(file_name=file_name)
+import time
+from functools import wraps
+
+
+def retry_on_blocking(max_attempts=3, delay=0.1):
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            attempts = 0
+            while attempts < max_attempts:
+                try:
+                    return func(*args, **kwargs)
+                except BlockingIOError:
+                    attempts += 1
+                    if attempts == max_attempts:
+                        raise
+                    time.sleep(delay * attempts)  # linear backoff
+            return func(*args, **kwargs)
+        return wrapper
+    return decorator
+
+
+@retry_on_blocking()
+def load(file_name: str) -> dict:
+    with h5py.File(name=file_name, mode="r") as hdf:
+        data_dict = {}
+        if "function" in hdf:
+            data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
+        else:
+            raise TypeError("Function not found in HDF5 file.")
+        if "input_args" in hdf:
+            data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
+        else:
+            data_dict["args"] = ()
+        if "input_kwargs" in hdf:
+            data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
+        else:
+            data_dict["kwargs"] = {}
+        return data_dict
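As a quick sanity check of the decorator proposed above, the retry logic can be exercised standalone (the `flaky_load` stub and the `calls` counter are illustrative, not part of the PR — the stub raises `BlockingIOError` twice before succeeding, so the wrapper's third attempt returns):

```python
import time
from functools import wraps


def retry_on_blocking(max_attempts=3, delay=0.1):
    """Retry a callable on BlockingIOError with a bounded, linearly
    growing delay; re-raise once max_attempts is exhausted."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except BlockingIOError:
                    attempts += 1
                    if attempts == max_attempts:
                        raise
                    time.sleep(delay * attempts)  # linear backoff
            return func(*args, **kwargs)
        return wrapper
    return decorator


calls = {"n": 0}


@retry_on_blocking(max_attempts=3, delay=0.0)
def flaky_load():
    # Stub for a locked HDF5 read: fail twice, then succeed.
    calls["n"] += 1
    if calls["n"] < 3:
        raise BlockingIOError("file is locked")
    return {"fn": "ok"}


result = flaky_load()
```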



def get_output(file_name: str) -> Tuple[bool, object]:
@@ -66,16 +69,22 @@ def get_output(file_name: str) -> Tuple[bool, object]:
     Returns:
         Tuple[bool, object]: boolean flag indicating if output is available and the output object itself
     """
-    with h5py.File(file_name, "r") as hdf:
-        if "output" in hdf:
-            return True, cloudpickle.loads(np.void(hdf["/output"]))
-        else:
-            return False, None
+    try:
+        with h5py.File(name=file_name, mode="r") as hdf:
+            if "output" in hdf:
+                return True, cloudpickle.loads(np.void(hdf["/output"]))
+            else:
+                return False, None
+    except BlockingIOError:
+        return get_output(file_name=file_name)
Comment on lines +72 to +79
⚠️ Potential issue

Apply the same retry mechanism here

This function has the same unbounded recursion issues as discussed in the load function.

Apply the retry decorator pattern suggested above:

+@retry_on_blocking()
 def get_output(file_name: str) -> Tuple[bool, object]:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            # ... existing code ...
-    except BlockingIOError:
-        return get_output(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        # ... rest of the existing code ...

Committable suggestion skipped: line range outside the PR's diff.
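For completeness, the same bounded behavior can be had without a decorator, as a plain iterative loop — a sketch with a hypothetical `call_with_retry` helper and a `read_once` stub standing in for the real `h5py` read (neither is part of the PR):

```python
import time
from typing import Callable, Tuple


def call_with_retry(func: Callable, max_attempts: int = 3, delay: float = 0.1):
    """Bounded iterative retry: no recursion depth to exhaust, and the
    last BlockingIOError propagates instead of retrying forever."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except BlockingIOError:
            if attempt == max_attempts:
                raise
            time.sleep(delay * attempt)


state = {"n": 0}


def read_once() -> Tuple[bool, object]:
    # Stub for the locked-file read: fail once, then succeed.
    state["n"] += 1
    if state["n"] == 1:
        raise BlockingIOError("resource temporarily unavailable")
    return True, "output-object"


flag, output = call_with_retry(read_once, max_attempts=3, delay=0.0)
```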



def get_queue_id(file_name: str) -> Optional[int]:
-    with h5py.File(file_name, "r") as hdf:
-        if "queue_id" in hdf:
-            return cloudpickle.loads(np.void(hdf["/queue_id"]))
-        else:
-            return None
+    try:
+        with h5py.File(name=file_name, mode="r") as hdf:
+            if "queue_id" in hdf:
+                return cloudpickle.loads(np.void(hdf["/queue_id"]))
+            else:
+                return None
+    except BlockingIOError:
+        return get_queue_id(file_name=file_name)
Comment on lines +83 to +90
⚠️ Potential issue

Apply retry mechanism and improve type safety

This function has two issues:

  1. Same unbounded recursion issues as other functions
  2. Type safety: The function's return type hint suggests Optional[int] but it could return other types if stored in HDF5

Apply the retry decorator and add type checking:

+@retry_on_blocking()
 def get_queue_id(file_name: str) -> Optional[int]:
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            if "queue_id" in hdf:
-                return cloudpickle.loads(np.void(hdf["/queue_id"]))
-            else:
-                return None
-    except BlockingIOError:
-        return get_queue_id(file_name=file_name)
+    with h5py.File(name=file_name, mode="r") as hdf:
+        if "queue_id" in hdf:
+            value = cloudpickle.loads(np.void(hdf["/queue_id"]))
+            if not isinstance(value, int):
+                raise TypeError(f"Queue ID must be an integer, got {type(value)}")
+            return value
+        return None
📝 Committable suggestion


Suggested change
-    try:
-        with h5py.File(name=file_name, mode="r") as hdf:
-            if "queue_id" in hdf:
-                return cloudpickle.loads(np.void(hdf["/queue_id"]))
-            else:
-                return None
-    except BlockingIOError:
-        return get_queue_id(file_name=file_name)
+@retry_on_blocking()
+def get_queue_id(file_name: str) -> Optional[int]:
+    with h5py.File(name=file_name, mode="r") as hdf:
+        if "queue_id" in hdf:
+            value = cloudpickle.loads(np.void(hdf["/queue_id"]))
+            if not isinstance(value, int):
+                raise TypeError(f"Queue ID must be an integer, got {type(value)}")
+            return value
+        return None
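The suggested type guard can be exercised without touching HDF5 — a sketch with a hypothetical `validate_queue_id` helper that mirrors the `isinstance` check above (the helper name is illustrative, not part of the PR):

```python
from typing import Optional


def validate_queue_id(value: object) -> Optional[int]:
    """Mirror of the suggested guard: pass ints through, allow None,
    reject anything else with a TypeError."""
    if value is None:
        return None
    if not isinstance(value, int):
        raise TypeError(f"Queue ID must be an integer, got {type(value)}")
    return value


qid = validate_queue_id(4711)
```

Note that because `bool` is a subclass of `int`, a stored `True` would pass this guard; a stricter check could exclude it explicitly.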
