diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index ff7ed4df..a0a44fb3 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -18,14 +18,15 @@ class ExecutorBase(FutureExecutor): Base class for the executor. Args: - FutureExecutor: Base class for the executor. + max_cores (int): defines the number cores which can be used in parallel """ - def __init__(self): + def __init__(self, max_cores: Optional[int] = None): """ Initialize the ExecutorBase class. """ cloudpickle_register(ind=3) + self._max_cores = max_cores self._future_queue: queue.Queue = queue.Queue() self._process: Optional[RaisingThread] = None @@ -86,6 +87,15 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut Returns: Future: A Future representing the given call. """ + cores = resource_dict.get("cores", None) + if ( + cores is not None + and self._max_cores is not None + and cores > self._max_cores + ): + raise ValueError( + "The specified number of cores is larger than the available number of cores." + ) check_resource_dict(function=fn) f = Future() self._future_queue.put( diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 3ecfd6f3..ea42bad8 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -48,7 +48,7 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. """ - super().__init__() + super().__init__(max_cores=None) default_resource_dict = { "cores": 1, "cwd": None, diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index cd5ce42c..e36301a9 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -60,7 +60,7 @@ def __init__( plot_dependency_graph: bool = False, **kwargs: Any, ) -> None: - super().__init__() + super().__init__(max_cores=kwargs.get("max_cores", None)) executor = create_executor(*args, **kwargs) self._set_process( RaisingThread( diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index aebca15d..d163cbfe 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -131,7 +131,7 @@ def __init__( executor_kwargs: dict = {}, spawner: BaseSpawner = MpiExecSpawner, ): - super().__init__() + super().__init__(max_cores=executor_kwargs.get("max_cores", None)) executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner self._set_process( @@ -183,7 +183,7 @@ def __init__( executor_kwargs: dict = {}, spawner: BaseSpawner = MpiExecSpawner, ): - super().__init__() + super().__init__(max_cores=executor_kwargs.get("max_cores", None)) executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner executor_kwargs["max_cores"] = max_cores diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 9a4309b2..4876cbfc 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -52,6 +52,12 @@ def test_meta_executor_single(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_oversubscribe(self): + with self.assertRaises(ValueError): + with Executor(max_cores=1, backend="local", block_allocation=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1, resource_dict={"cores": 2}) + @unittest.skipIf( skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." )