diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 15cdb67a..aa9bc881 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -38,7 +38,6 @@ class Executor: oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -77,7 +76,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -93,7 +91,6 @@ def __new__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -113,7 +110,6 @@ def __new__( oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -136,7 +132,6 @@ def __new__( gpus_per_worker=gpus_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) elif slurm_installed: @@ -145,7 +140,6 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) else: @@ -168,6 +162,5 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 4e5b7b81..31a724ed 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -26,7 +26,6 @@ class PyFluxExecutor(ExecutorBase): gpus_per_worker (int): number of GPUs per worker - defaults to 0 init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -65,7 +64,6 @@ def __init__( gpus_per_worker=0, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -76,7 +74,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "hostname_localhost": hostname_localhost, "executor_class": PyFluxSingleTaskExecutor, # Executor Arguments diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 94add9f5..eefa2431 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -22,7 +22,6 @@ class PyMPIExecutor(ExecutorBase): oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -59,7 +58,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, hostname_localhost=False, ): super().__init__() @@ -69,7 +67,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": PyMPISingleTaskExecutor, "hostname_localhost": hostname_localhost, # Executor Arguments diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 40f4a96a..b0bc3cd7 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -7,7 +7,6 @@ import os import queue import sys -from time import sleep import cloudpickle @@ -176,7 +175,6 @@ def executor_broker( future_queue, max_workers, executor_class, - sleep_interval=0.1, **kwargs, ): meta_future_lst = _get_executor_dict( @@ -185,17 +183,14 @@ def executor_broker( **kwargs, ) while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - sleep(sleep_interval) + if execute_task_dict( + task_dict=future_queue.get(), meta_future_lst=meta_future_lst + ): + future_queue.task_done() else: - if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): - future_queue.task_done() - else: - future_queue.task_done() - future_queue.join() - break + future_queue.task_done() + future_queue.join() + break def execute_task_dict(task_dict, meta_future_lst): diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index d9eda6fa..8be8ada7 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -67,7 +67,6 @@ class SubprocessExecutor(ExecutorBase): Args: max_workers (int): defines the number workers which can execute functions in parallel - sleep_interval (float): synchronization interval - default 0.1 Examples: @@ -82,7 +81,6 @@ class SubprocessExecutor(ExecutorBase): def __init__( self, max_workers=1, - sleep_interval=0.1, ): super().__init__() self._process = RaisingThread( @@ -91,7 +89,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": SubprocessSingleExecutor, }, ) diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 397d61b0..4824441f 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -62,7 +62,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, hostname_localhost=False, ): super().__init__() @@ -72,7 +71,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "hostname_localhost": hostname_localhost, "executor_class": PySlurmSingleTaskExecutor, # Executor Arguments