Skip to content

Commit

Permalink
Merge pull request #264 from pyiron/mpi_only_for_parallel
Browse files Browse the repository at this point in the history
Use mpiexec only for parallel execution
  • Loading branch information
jan-janssen authored Feb 18, 2024
2 parents d4a2f7e + f76f7b6 commit d9edd2d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 4 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This workflow is used to run the unittest of pyiron

name: Benchmark

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:

runs-on: ${{ matrix.operating-system }}
strategy:
matrix:
include:
- operating-system: ubuntu-latest
python-version: '3.12'
label: linux-64-py-3-12-openmpi
prefix: /Users/runner/miniconda3/envs/my-env
environment-file: .ci_support/environment-openmpi.yml

- operating-system: ubuntu-latest
python-version: '3.12'
label: linux-64-py-3-12-mpich
prefix: /usr/share/miniconda3/envs/my-env
environment-file: .ci_support/environment-mpich.yml

steps:
- uses: actions/checkout@v2
- uses: conda-incubator/[email protected]
with:
python-version: ${{ matrix.python-version }}
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: ${{ matrix.environment-file }}
- name: Test
shell: bash -l {0}
timeout-minutes: 10
run: |
pip install versioneer[toml]==0.29
pip install . --no-deps --no-build-isolation
python tests/benchmark/llh.py static >> timing.log
python tests/benchmark/llh.py process >> timing.log
python tests/benchmark/llh.py thread >> timing.log
mpiexec -n 4 python -m mpi4py.futures tests/benchmark/llh.py mpi4py >> timing.log
python tests/benchmark/llh.py pympipool >> timing.log
cat timing.log
python -m unittest tests/benchmark/test_results.py
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
11 changes: 7 additions & 4 deletions pympipool/shared/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,13 @@ def generate_command(self, command_lst):


def generate_mpiexec_command(cores, oversubscribe=False):
command_prepend_lst = [MPI_COMMAND, "-n", str(cores)]
if oversubscribe:
command_prepend_lst += ["--oversubscribe"]
return command_prepend_lst
if cores == 1:
return []
else:
command_prepend_lst = [MPI_COMMAND, "-n", str(cores)]
if oversubscribe:
command_prepend_lst += ["--oversubscribe"]
return command_prepend_lst


def generate_slurm_command(
Expand Down
47 changes: 47 additions & 0 deletions tests/benchmark/llh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import sys
from time import time


def llh_numpy(mean, sigma):
import numpy
data = numpy.random.normal(size=100000000).astype('float64')
s = (data - mean) ** 2 / (2 * (sigma ** 2))
pdfs = numpy.exp(- s)
pdfs /= numpy.sqrt(2 * numpy.pi) * sigma
return numpy.log(pdfs).sum()


def run_with_executor(executor=None, mean=0.1, sigma=1.1, runs=32, **kwargs):
with executor(**kwargs) as exe:
future_lst = [exe.submit(llh_numpy, mean=mean, sigma=sigma) for i in range(runs)]
return [f.result() for f in future_lst]


def run_static(mean=0.1, sigma=1.1, runs=32):
return [llh_numpy(mean=mean, sigma=sigma) for i in range(runs)]


if __name__ == "__main__":
run_mode = sys.argv[1]
start_time = time()
if run_mode == "static":
run_static(mean=0.1, sigma=1.1, runs=32)
elif run_mode == "process":
from concurrent.futures import ProcessPoolExecutor
run_with_executor(executor=ProcessPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4)
elif run_mode == "thread":
from concurrent.futures import ThreadPoolExecutor
run_with_executor(executor=ThreadPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4)
elif run_mode == "pympipool":
from pympipool.mpi.executor import PyMPIExecutor
run_with_executor(executor=PyMPIExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4)
elif run_mode == "flux":
from pympipool.flux.executor import PyFluxExecutor
run_with_executor(executor=PyFluxExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4)
elif run_mode == "mpi4py":
from mpi4py.futures import MPIPoolExecutor
run_with_executor(executor=MPIPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4)
else:
raise ValueError(run_mode)
stop_time = time()
print(run_mode, stop_time-start_time)
17 changes: 17 additions & 0 deletions tests/benchmark/test_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import unittest


class TestResults(unittest.TestCase):
def test_result(self):
with open("timing.log") as f:
content = f.readlines()
timing_dict = {l.split()[0]: float(l.split()[1]) for l in content}
self.assertEqual(min(timing_dict, key=timing_dict.get), "process")
self.assertEqual(max(timing_dict, key=timing_dict.get), "static")
self.assertTrue(timing_dict["process"] < timing_dict["pympipool"])
self.assertTrue(timing_dict["pympipool"] < timing_dict["process"] * 1.1)
self.assertTrue(timing_dict["process"] < timing_dict["mpi4py"])
self.assertTrue(timing_dict["pympipool"] < timing_dict["mpi4py"])
self.assertTrue(timing_dict["mpi4py"] < timing_dict["process"] * 1.15)
self.assertTrue(timing_dict["thread"] < timing_dict["static"])
self.assertTrue(timing_dict["mpi4py"] < timing_dict["thread"])

0 comments on commit d9edd2d

Please sign in to comment.