Skip to content

Commit

Permalink
Merge pull request #258 from pyiron/localhost
Browse files Browse the repository at this point in the history
add additional parameter to control if hostname of localhost is used
  • Loading branch information
jan-janssen authored Jan 31, 2024
2 parents 94b23a4 + ecb96ce commit eca7c31
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 69 deletions.
35 changes: 0 additions & 35 deletions .github/workflows/check-macos-latest.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/unittest-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
include:
- operating-system: macos-11
- operating-system: macos-latest
python-version: '3.12'
label: osx-64-py-3-12-mpich
prefix: /Users/runner/miniconda3/envs/my-env
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unittest-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
include:
- operating-system: macos-11
- operating-system: macos-latest
python-version: '3.12'
label: osx-64-py-3-12-openmpi
prefix: /Users/runner/miniconda3/envs/my-env
Expand Down
5 changes: 5 additions & 0 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass
Expand All @@ -87,6 +88,7 @@ def __new__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -120,6 +122,7 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
elif slurm_installed:
return PySlurmExecutor(
Expand All @@ -128,6 +131,7 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
else:
if threads_per_core != 1:
Expand All @@ -150,4 +154,5 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
7 changes: 7 additions & 0 deletions pympipool/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class PyFluxExecutor(ExecutorBase):
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
Examples:
Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -69,6 +71,7 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
Expand All @@ -93,6 +96,8 @@ class PyFluxSingleTaskExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
"""

def __init__(
Expand All @@ -103,6 +108,7 @@ def __init__(
init_function=None,
cwd=None,
executor=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -112,6 +118,7 @@ def __init__(
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
Expand Down
6 changes: 6 additions & 0 deletions pympipool/mpi/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class PyMPIExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
Examples:
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -63,6 +65,7 @@ def __init__(
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
Expand All @@ -82,6 +85,7 @@ class PyMPISingleTaskExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
"""

Expand All @@ -91,6 +95,7 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -103,6 +108,7 @@ def __init__(
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
self._process.start()
Expand Down
10 changes: 6 additions & 4 deletions pympipool/shared/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ def __del__(self):
def interface_bootup(
command_lst,
connections,
hostname_localhost=False,
):
command_lst += [
"--host",
gethostname(),
]
if not hostname_localhost:
command_lst += [
"--host",
gethostname(),
]
interface = SocketInterface(interface=connections)
command_lst += [
"--zmqport",
Expand Down
3 changes: 3 additions & 0 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def execute_parallel_tasks(
future_queue,
cores,
interface_class,
hostname_localhost=False,
**kwargs,
):
"""
Expand All @@ -128,11 +129,13 @@ def execute_parallel_tasks(
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
interface_class:
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
"""
execute_parallel_tasks_loop(
interface=interface_bootup(
command_lst=_get_backend_path(cores=cores),
connections=interface_class(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
),
future_queue=future_queue,
)
Expand Down
6 changes: 6 additions & 0 deletions pympipool/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class PySlurmExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
Examples:
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -65,6 +67,7 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
Expand All @@ -89,6 +92,7 @@ class PySlurmSingleTaskExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
"""

Expand All @@ -100,6 +104,7 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -114,6 +119,7 @@ def __init__(
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
self._process.start()
Expand Down
8 changes: 4 additions & 4 deletions tests/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def calc(i):

class TestFuture(unittest.TestCase):
def test_pool_serial(self):
with PyMPISingleTaskExecutor(cores=1) as p:
with PyMPISingleTaskExecutor(cores=1, hostname_localhost=True) as p:
output = p.submit(calc, i=2)
self.assertTrue(isinstance(output, Future))
self.assertFalse(output.done())
Expand All @@ -20,7 +20,7 @@ def test_pool_serial(self):
self.assertEqual(output.result(), np.array(4))

def test_pool_serial_multi_core(self):
with PyMPISingleTaskExecutor(cores=2) as p:
with PyMPISingleTaskExecutor(cores=2, hostname_localhost=True) as p:
output = p.submit(calc, i=2)
self.assertTrue(isinstance(output, Future))
self.assertFalse(output.done())
Expand Down Expand Up @@ -48,7 +48,7 @@ def callback(future):
def submit():
# Executor only exists in this scope and can get garbage collected after
# this function is exits
future = PyMPISingleTaskExecutor().submit(slow_callable)
future = PyMPISingleTaskExecutor(hostname_localhost=True).submit(slow_callable)
future.add_done_callback(callback)
return future

Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self):
def run(self):
self.running = True

future = PyMPISingleTaskExecutor().submit(self.return_42)
future = PyMPISingleTaskExecutor(hostname_localhost=True).submit(self.return_42)
future.add_done_callback(self.finished)

return future
Expand Down
15 changes: 9 additions & 6 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def test_meta_executor_future(self):
meta_future = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
future_obj = list(meta_future.keys())[0]
executor_obj = list(meta_future.values())[0]
Expand All @@ -47,6 +48,7 @@ def test_execute_task_dict(self):
meta_future_lst = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
f = Future()
self.assertTrue(
Expand All @@ -68,6 +70,7 @@ def test_execute_task_dict_error(self):
meta_future_lst = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
with self.assertRaises(ValueError):
execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
Expand All @@ -78,15 +81,15 @@ def test_executor_broker(self):
f = Future()
q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor)
executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor, hostname_localhost=True)
self.assertTrue(f.done())
self.assertEqual(f.result(), 1)
q.join()


class TestMetaExecutor(unittest.TestCase):
def test_meta_executor_serial(self):
with PyMPIExecutor(max_workers=2) as exe:
with PyMPIExecutor(max_workers=2, hostname_localhost=True) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
self.assertEqual(fs_1.result(), 1)
Expand All @@ -95,7 +98,7 @@ def test_meta_executor_serial(self):
self.assertTrue(fs_2.done())

def test_meta_executor_single(self):
with PyMPIExecutor(max_workers=1) as exe:
with PyMPIExecutor(max_workers=1, hostname_localhost=True) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
self.assertEqual(fs_1.result(), 1)
Expand All @@ -104,13 +107,13 @@ def test_meta_executor_single(self):
self.assertTrue(fs_2.done())

def test_meta_executor_parallel(self):
with PyMPIExecutor(max_workers=1, cores_per_worker=2) as exe:
with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe:
fs_1 = exe.submit(mpi_funct, 1)
self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2)
PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True)
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1)
PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True)
Loading

0 comments on commit eca7c31

Please sign in to comment.