Skip to content

Commit

Permalink
Raise ValueError when requesting more cores than available
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Nov 9, 2024
1 parent b314e72 commit 0f2e0e0
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 6 deletions.
8 changes: 6 additions & 2 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ class ExecutorBase(FutureExecutor):
Base class for the executor.
Args:
FutureExecutor: Base class for the executor.
max_cores (int): defines the number cores which can be used in parallel
"""

def __init__(self):
def __init__(self, max_cores: Optional[int] = None):
"""
Initialize the ExecutorBase class.
"""
cloudpickle_register(ind=3)
self._max_cores = max_cores
self._future_queue: queue.Queue = queue.Queue()
self._process: Optional[RaisingThread] = None

Expand Down Expand Up @@ -86,6 +87,9 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
Returns:
Future: A Future representing the given call.
"""
cores = resource_dict.get("cores", None)
if cores is not None and self._max_cores is not None and cores > self._max_cores:
raise ValueError("The specified number of cores is larger than the available number of cores.")
check_resource_dict(function=fn)
f = Future()
self._future_queue.put(
Expand Down
2 changes: 1 addition & 1 deletion executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
backend (str, optional): name of the backend used to spawn tasks.
disable_dependencies (boolean): Disable resolving future objects during the submission.
"""
super().__init__()
super().__init__(max_cores=None)
default_resource_dict = {
"cores": 1,
"cwd": None,
Expand Down
2 changes: 1 addition & 1 deletion executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
plot_dependency_graph: bool = False,
**kwargs: Any,
) -> None:
super().__init__()
super().__init__(max_cores=kwargs.get("max_cores", None))
executor = create_executor(*args, **kwargs)
self._set_process(
RaisingThread(
Expand Down
4 changes: 2 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(
executor_kwargs: dict = {},
spawner: BaseSpawner = MpiExecSpawner,
):
super().__init__()
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
self._set_process(
Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(
executor_kwargs: dict = {},
spawner: BaseSpawner = MpiExecSpawner,
):
super().__init__()
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["max_cores"] = max_cores
Expand Down
6 changes: 6 additions & 0 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def test_meta_executor_single(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_oversubscribe(self):
with self.assertRaises(ValueError):
with Executor(max_cores=1, backend="local", block_allocation=True) as exe:
cloudpickle_register(ind=1)
fs_1 = exe.submit(calc, 1, resource_dict={"cores": 2})

@unittest.skipIf(
skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
)
Expand Down

0 comments on commit 0f2e0e0

Please sign in to comment.