Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise ValueError when requesting more cores than available #489

Merged
merged 2 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ class ExecutorBase(FutureExecutor):
Base class for the executor.

Args:
FutureExecutor: Base class for the executor.
max_cores (int): defines the number cores which can be used in parallel
"""

def __init__(self):
def __init__(self, max_cores: Optional[int] = None):
"""
Initialize the ExecutorBase class.
"""
cloudpickle_register(ind=3)
self._max_cores = max_cores
self._future_queue: queue.Queue = queue.Queue()
self._process: Optional[RaisingThread] = None

Expand Down Expand Up @@ -86,6 +87,15 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
Returns:
Future: A Future representing the given call.
"""
cores = resource_dict.get("cores", None)
if (
cores is not None
and self._max_cores is not None
and cores > self._max_cores
):
raise ValueError(
"The specified number of cores is larger than the available number of cores."
)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put(
Expand Down
2 changes: 1 addition & 1 deletion executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
backend (str, optional): name of the backend used to spawn tasks.
disable_dependencies (boolean): Disable resolving future objects during the submission.
"""
super().__init__()
super().__init__(max_cores=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Passing None for max_cores bypasses core validation

Setting max_cores=None in the superclass constructor will disable the core validation mechanism, contradicting the PR's objective of raising ValueError when requesting more cores than available.

Consider this fix:

-        super().__init__(max_cores=None)
+        super().__init__(max_cores=resource_dict.get("cores"))

Committable suggestion skipped: line range outside the PR's diff.

default_resource_dict = {
"cores": 1,
"cwd": None,
Expand Down
2 changes: 1 addition & 1 deletion executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
plot_dependency_graph: bool = False,
**kwargs: Any,
) -> None:
super().__init__()
super().__init__(max_cores=kwargs.get("max_cores", None))
executor = create_executor(*args, **kwargs)
self._set_process(
RaisingThread(
Expand Down
4 changes: 2 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(
executor_kwargs: dict = {},
spawner: BaseSpawner = MpiExecSpawner,
):
super().__init__()
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
self._set_process(
Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(
executor_kwargs: dict = {},
spawner: BaseSpawner = MpiExecSpawner,
):
super().__init__()
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["max_cores"] = max_cores
Expand Down
6 changes: 6 additions & 0 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def test_meta_executor_single(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_oversubscribe(self):
with self.assertRaises(ValueError):
with Executor(max_cores=1, backend="local", block_allocation=True) as exe:
cloudpickle_register(ind=1)
fs_1 = exe.submit(calc, 1, resource_dict={"cores": 2})

@unittest.skipIf(
skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
)
Expand Down
Loading