Use queue.Queue().get() rather than queue.Queue().get_nowait() #247

Merged · 5 commits · Jan 31, 2024

Changes from all commits
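The change itself is small but touches every executor: the broker thread previously polled its task queue with `queue.Queue().get_nowait()`, sleeping `sleep_interval` seconds between attempts, and now simply blocks on `queue.Queue().get()`. That makes the `sleep_interval` tuning knob obsolete, so it is deleted from every public signature below. A minimal standalone sketch of the two patterns (illustrative only, not pympipool code):

```python
import queue
import time

q = queue.Queue()

# Before: poll with get_nowait() and sleep between attempts. The thread
# wakes up every sleep_interval even when idle, and a task that arrives
# right after a failed poll waits up to sleep_interval before it is seen.
sleep_interval = 0.1
q.put("task-1")
while True:
    try:
        task = q.get_nowait()
        break
    except queue.Empty:
        time.sleep(sleep_interval)
print(task)

# After: a plain blocking get() parks the thread until an item arrives,
# so there are no idle wake-ups and no latency knob to tune.
q.put("task-2")
print(q.get())
```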
7 changes: 0 additions & 7 deletions pympipool/__init__.py
@@ -38,7 +38,6 @@ class Executor:
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
-sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -77,7 +76,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
-sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
@@ -93,7 +91,6 @@ def __new__(
oversubscribe=False,
init_function=None,
cwd=None,
-sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
@@ -113,7 +110,6 @@ def __new__(
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
-sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -136,7 +132,6 @@ def __new__(
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
-sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
elif slurm_installed:
@@ -145,7 +140,6 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
-sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
else:
@@ -168,6 +162,5 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
-sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
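With these deletions, `sleep_interval` disappears from the public `Executor` signature while the remaining arguments are unchanged. A hedged usage sketch (the submitted function and argument values are illustrative; `submit()` follows the standard `concurrent.futures` interface):

```python
from pympipool import Executor

def calc(i, j):
    return i + j

# No sleep_interval argument anymore; only the parameters kept by this PR
# (cores_per_worker, init_function, cwd, hostname_localhost, ...) remain.
with Executor(cores_per_worker=1, hostname_localhost=True) as exe:
    fs = exe.submit(calc, 1, 2)
    print(fs.result())  # 3
```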
3 changes: 0 additions & 3 deletions pympipool/flux/executor.py
@@ -26,7 +26,6 @@ class PyFluxExecutor(ExecutorBase):
gpus_per_worker (int): number of GPUs per worker - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
-sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
@@ -65,7 +64,6 @@ def __init__(
gpus_per_worker=0,
init_function=None,
cwd=None,
-sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
@@ -76,7 +74,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
3 changes: 0 additions & 3 deletions pympipool/mpi/executor.py
@@ -22,7 +22,6 @@ class PyMPIExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
-sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -59,7 +58,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
-sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
@@ -69,7 +67,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
19 changes: 7 additions & 12 deletions pympipool/shared/executorbase.py
@@ -7,7 +7,6 @@
import os
import queue
import sys
-from time import sleep

import cloudpickle

@@ -176,7 +175,6 @@ def executor_broker(
future_queue,
max_workers,
executor_class,
-sleep_interval=0.1,
**kwargs,
):
meta_future_lst = _get_executor_dict(
@@ -185,17 +183,14 @@
**kwargs,
)
     while True:
-        try:
-            task_dict = future_queue.get_nowait()
-        except queue.Empty:
-            sleep(sleep_interval)
+        if execute_task_dict(
+            task_dict=future_queue.get(), meta_future_lst=meta_future_lst
+        ):
+            future_queue.task_done()
         else:
-            if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
-                future_queue.task_done()
-            else:
-                future_queue.task_done()
-                future_queue.join()
-                break
+            future_queue.task_done()
+            future_queue.join()
+            break


def execute_task_dict(task_dict, meta_future_lst):
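The rewritten `executor_broker()` loop also leans on the queue for shutdown: `execute_task_dict()` returning `False` signals a shutdown request, after which the broker marks the item done, drains the queue via `join()`, and exits. A standalone sketch of this blocking-`get()` plus `task_done()`/`join()` handshake (names simplified, not the actual pympipool internals):

```python
import queue
import threading

def broker(future_queue):
    while True:
        task_dict = future_queue.get()  # blocks; no polling, no sleep
        if task_dict.get("shutdown", False):
            future_queue.task_done()
            future_queue.join()  # wait until all queued items are marked done
            break
        print("executing", task_dict["fn"])
        future_queue.task_done()

q = queue.Queue()
threading.Thread(target=broker, args=(q,)).start()
q.put({"fn": "task-1"})
q.put({"fn": "task-2"})
q.put({"shutdown": True})
q.join()  # returns once the broker has handled everything
```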
3 changes: 0 additions & 3 deletions pympipool/shell/executor.py
@@ -67,7 +67,6 @@ class SubprocessExecutor(ExecutorBase):

Args:
max_workers (int): defines the number workers which can execute functions in parallel
-sleep_interval (float): synchronization interval - default 0.1

Examples:

@@ -82,7 +81,6 @@ class SubprocessExecutor(ExecutorBase):
def __init__(
self,
max_workers=1,
-sleep_interval=0.1,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -91,7 +89,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": SubprocessSingleExecutor,
},
)
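The `Examples` block in the `SubprocessExecutor` docstring is collapsed in this view; for context, a hedged sketch of the usage pattern (the command and keyword arguments are assumptions following the `subprocess` conventions the class wraps, not taken from this diff):

```python
from pympipool import SubprocessExecutor

# Each submitted item is a command list as subprocess expects; extra
# keyword arguments (here universal_newlines) are assumed to be passed
# through to the underlying subprocess call.
with SubprocessExecutor(max_workers=2) as exe:
    future = exe.submit(["echo", "test"], universal_newlines=True)
    print(future.result())  # "test\n"
```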
2 changes: 0 additions & 2 deletions pympipool/slurm/executor.py
@@ -62,7 +62,6 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
-sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
@@ -72,7 +71,6 @@ def __init__(
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments