From 74d01e1edee0b7afd6f605e9210637d467688d6c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 15 Jan 2024 18:05:57 +0100 Subject: [PATCH 1/2] Use queue.Queue().get() rather than queue.Queue().get_nowait() --- pympipool/__init__.py | 7 ------- pympipool/flux/executor.py | 3 --- pympipool/mpi/executor.py | 4 +--- pympipool/shared/executorbase.py | 17 +++++------------ pympipool/shell/executor.py | 3 --- pympipool/slurm/executor.py | 2 -- 6 files changed, 6 insertions(+), 30 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 1567c721..4de0298c 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -38,7 +38,6 @@ class Executor: oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 Examples: ``` @@ -70,7 +69,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. @@ -85,7 +83,6 @@ def __new__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, ): """ @@ -104,7 +101,6 @@ def __new__( oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 """ if flux_installed: if oversubscribe: @@ -119,7 +115,6 @@ def __new__( gpus_per_worker=gpus_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, ) elif slurm_installed: return PySlurmExecutor( @@ -127,7 +122,6 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, ) else: if threads_per_core != 1: @@ -149,5 +143,4 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, - sleep_interval=sleep_interval, ) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 9ab1e52d..342206b5 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -26,7 +26,6 @@ class PyFluxExecutor(ExecutorBase): gpus_per_worker (int): number of GPUs per worker - defaults to 0 init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux Examples: @@ -58,7 +57,6 @@ def __init__( gpus_per_worker=0, init_function=None, cwd=None, - sleep_interval=0.1, executor=None, ): super().__init__() @@ -68,7 +66,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": PyFluxSingleTaskExecutor, # Executor Arguments "cores": cores_per_worker, diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 33b53f68..b849408d 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -22,7 +22,7 @@ class PyMPIExecutor(ExecutorBase): oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 + Examples: @@ -52,7 +52,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, ): super().__init__() self._process = RaisingThread( @@ -61,7 +60,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": PyMPISingleTaskExecutor, # Executor Arguments "cores": cores_per_worker, diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 7c8918cd..3ceab3bb 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -7,7 +7,6 @@ import os import queue import sys -from time import sleep import cloudpickle @@ -167,7 +166,6 @@ def executor_broker( future_queue, max_workers, executor_class, - sleep_interval=0.1, **kwargs, ): meta_future_lst = _get_executor_dict( @@ -176,17 +174,12 @@ def executor_broker( **kwargs, ) while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - sleep(sleep_interval) + if execute_task_dict(task_dict=future_queue.get(), meta_future_lst=meta_future_lst): + future_queue.task_done() else: - if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): - future_queue.task_done() - else: - future_queue.task_done() - future_queue.join() - break + future_queue.task_done() + future_queue.join() + break def execute_task_dict(task_dict, meta_future_lst): diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index d9eda6fa..8be8ada7 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -67,7 +67,6 @@ class SubprocessExecutor(ExecutorBase): Args: max_workers (int): defines the number workers which can execute functions in parallel - sleep_interval (float): synchronization interval - default 0.1 Examples: @@ -82,7 +81,6 @@ class SubprocessExecutor(ExecutorBase): def __init__( self, max_workers=1, - sleep_interval=0.1, ): super().__init__() self._process = RaisingThread( @@ -91,7 +89,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": SubprocessSingleExecutor, }, ) diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 1414493e..aa3c61e7 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -55,7 +55,6 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, - sleep_interval=0.1, ): super().__init__() self._process = RaisingThread( @@ -64,7 +63,6 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, - "sleep_interval": sleep_interval, "executor_class": PySlurmSingleTaskExecutor, # Executor Arguments "cores": cores_per_worker, From 35884a4b082c084b719c25559af4bf6e3fd701ae Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 16 Jan 2024 00:56:15 +0100 Subject: [PATCH 2/2] black formatting --- pympipool/shared/executorbase.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 3ceab3bb..431c25a5 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -174,7 +174,9 @@ def executor_broker( **kwargs, ) while True: - if execute_task_dict(task_dict=future_queue.get(), meta_future_lst=meta_future_lst): + if execute_task_dict( + task_dict=future_queue.get(), meta_future_lst=meta_future_lst + ): future_queue.task_done() else: future_queue.task_done()