From 3f07a87d9fc2e1946d473f8299916683c39b31f9 Mon Sep 17 00:00:00 2001
From: Kacper Derlatka <51274280+Delcior@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:03:52 +0100
Subject: [PATCH] add support for threading and for user-specified MPI
dimension. Closes #35 (#98)
* mpi: outer, threads:2
* add TODO ids
* add TODO label (3D support)
* skipping multi-threading tests if JIT disabled
* add pylint-disable comment
* print number of cores on workers
* log info about supported plotting options
* debug: print num_threads per worker
* debug: omit pylint
* debug: omit pylint
* hardcode threads
* fix issue number
* mpi_dim as scenario ctor argument; new test for mpi_indices; more readable code around send/recv tags for threading
* pylint fixes
* pylint fixes ++
* pylint disable too-many-args for whole file
* check thread number every time
* increase workflow timeout
* check if the 3rd-order-terms issue indeed is present?
* indeed 3rd-order terms cause problems with mpi_dim=INNER :(
* increase timeout to 120min
* remove 4th worker from the test
* increase timeout to 60min
* updates to README to reflect threading support changes
* removing debug leftover
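
For reference, a minimal usage sketch (not itself part of the diff below; the grid,
Courant numbers and thread count are made-up illustrative values) of how a scenario
is now constructed with a user-chosen MPI dimension and per-rank threading, mirroring
what the updated contract test does:

```python
import numba
import numba_mpi as mpi
from PyMPDATA import Options
from PyMPDATA.impl.enumerations import OUTER  # INNER is the other valid mpi_dim choice

from scenarios.cartesian import CartesianScenario

numba.set_num_threads(2)  # threads used by PyMPDATA within each MPI rank
simulation = CartesianScenario(
    mpi_dim=OUTER,  # selects which dimension is split across MPI ranks
    mpdata_options=Options(n_iters=2),
    n_threads=2,
    grid=(64, 32),
    rank=mpi.rank(),
    size=mpi.size(),
    courant_field_multiplier=(0.5, 0.25),
)
```

Run under an MPI launcher (e.g. `mpiexec -n 2 python ...`) so that each rank builds
its own subdomain and additionally uses two threads within the rank.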
---------
Co-authored-by: Sylwester Arabas
---
- ## Package architecture
```mermaid
@@ -143,7 +137,7 @@ licence: [GPL v3](https://www.gnu.org/licenses/gpl-3.0.html)
 - MPI support for PyMPDATA implemented externally (i.e., not incurring any overhead or additional dependencies for PyMPDATA users)
 - MPI calls within Numba njitted code (hence not using `mpi4py`, but leveraging `numba-mpi`)
-- hybrid threading (internal in PyMPDATA, in the inner dimension) + MPI (outer dimension) parallelisation
+- hybrid domain decomposition parallelisation: threading (internal in PyMPDATA, in the inner dimension) + MPI (either inner or outer dimension)
 - portability across major OSes (currently Linux & macOS; no Windows support due [challenges in getting HDF5/MPI-IO to work there](https://docs.h5py.org/en/stable/build.html#source-installation-on-windows))
 - full test coverage including CI builds asserting on same results with multi-node vs. single-node computations
 - Continuous Integration with different OSes and different MPI implementation
diff --git a/scenarios/_scenario.py b/scenarios/_scenario.py
index a1a75d6..24ba10a 100644
--- a/scenarios/_scenario.py
+++ b/scenarios/_scenario.py
@@ -1,17 +1,20 @@
-# pylint: disable=too-few-public-methods
 """
 Provides base _Scenario base class that every scenario should inherit
 """
+
 from PyMPDATA import Solver
+from PyMPDATA.impl.enumerations import INNER, OUTER


-class _Scenario:
+class _Scenario:  # pylint: disable=too-few-public-methods
     """Base class for every Scenario. Provides logic for advance() function"""

-    def __init__(self, *, stepper, advectee, advector, g_factor=None):
+    # pylint: disable=too-many-arguments
+    def __init__(self, *, mpi_dim, stepper, advectee, advector, g_factor=None):
+        self.mpi_dim = mpi_dim
         self.solver = Solver(
             stepper=stepper, advectee=advectee, advector=advector, g_factor=g_factor
         )

-    def advance(self, dataset, output_steps, x_range):
+    def advance(self, dataset, output_steps, mpi_range):
         """Logic for performing simulation.
         Returns wall time of one timestep (in clock ticks)"""
         steps_done = 0
         wall_time = 0
@@ -21,5 +24,12 @@ def advance(self, dataset, output_steps, x_range):
             wall_time_per_timestep = self.solver.advance(n_steps=n_steps)
             wall_time += wall_time_per_timestep * n_steps
             steps_done += n_steps
-            dataset[x_range, :, index] = self.solver.advectee.get()
+            data = self.solver.advectee.get()
+            dataset[
+                (
+                    mpi_range if self.mpi_dim == OUTER else slice(None),
+                    mpi_range if self.mpi_dim == INNER else slice(None),
+                    slice(index, index + 1),
+                )
+            ] = data.reshape((data.shape[0], data.shape[1], 1))
         return wall_time
diff --git a/scenarios/cartesian.py b/scenarios/cartesian.py
index 2f129b4..cdeab96 100644
--- a/scenarios/cartesian.py
+++ b/scenarios/cartesian.py
@@ -4,6 +4,7 @@
 from matplotlib import pyplot
 from PyMPDATA import ScalarField, Stepper, VectorField
 from PyMPDATA.boundary_conditions import Periodic
+from PyMPDATA.impl.enumerations import INNER, OUTER

 from PyMPDATA_MPI.domain_decomposition import mpi_indices
 from PyMPDATA_MPI.mpi_periodic import MPIPeriodic
@@ -24,16 +25,22 @@ def __init__(  # pylint: disable=too-many-arguments
         rank,
         size,
         courant_field_multiplier,
+        mpi_dim,
     ):  # pylint: disable=too-many-locals, invalid-name
         halo = mpdata_options.n_halo
-        xi, yi = mpi_indices(grid, rank, size)
-        nx, ny = xi.shape
+        xyi = mpi_indices(grid=grid, rank=rank, size=size, mpi_dim=mpi_dim)
+        nx, ny = xyi[mpi_dim].shape

-        boundary_conditions = (MPIPeriodic(size=size), Periodic())
+        mpi_periodic = MPIPeriodic(size=size, mpi_dim=mpi_dim)
+        periodic = Periodic()
+        boundary_conditions = (
+            mpi_periodic if mpi_dim == OUTER else periodic,
+            mpi_periodic if mpi_dim == INNER else periodic,
+        )
         advectee = ScalarField(
-            data=self.initial_condition(xi, yi, grid),
+            data=self.initial_condition(*xyi, grid),
             halo=mpdata_options.n_halo,
             boundary_conditions=boundary_conditions,
         )
@@ -52,11 +59,16 @@ def __init__(  # pylint: disable=too-many-arguments
             n_threads=n_threads,
             left_first=tuple([rank % 2 == 0] * 2),
             # TODO #70 (see also https://github.com/open-atmos/PyMPDATA/issues/386)
-            buffer_size=((ny + 2 * halo) * halo)
+            buffer_size=(
+                (ny if mpi_dim == OUTER else nx + 2 * halo) * halo
+            )
             # TODO #38 support for 3D domain
             * 2  # for temporary send/recv buffer on one side
-            * 2,  # for complex dtype
+            * 2  # for complex dtype
+            * (2 if mpi_dim == OUTER else n_threads),
+        )
+        super().__init__(
+            mpi_dim=mpi_dim, stepper=stepper, advectee=advectee, advector=advector
         )
-        super().__init__(stepper=stepper, advectee=advectee, advector=advector)

     @staticmethod
     def initial_condition(xi, yi, grid):
diff --git a/scenarios/spherical.py b/scenarios/spherical.py
index faf4649..7264cd3 100644
--- a/scenarios/spherical.py
+++ b/scenarios/spherical.py
@@ -87,7 +87,15 @@ class SphericalScenario(_Scenario):
     """

     def __init__(  # pylint: disable=too-many-arguments
-        self, *, mpdata_options, n_threads, grid, rank, size, courant_field_multiplier
+        self,
+        *,
+        mpi_dim,
+        mpdata_options,
+        n_threads,
+        grid,
+        rank,
+        size,
+        courant_field_multiplier,
     ):
         # pylint: disable=too-many-locals,invalid-name
         self.settings = WilliamsonAndRasch89Settings(
@@ -96,7 +104,7 @@ def __init__(  # pylint: disable=too-many-arguments
             output_steps=range(0, 5120 // 3, 100),  # original: 5120
         )

-        xi, _ = mpi_indices(grid, rank, size)
+        xi, _ = mpi_indices(grid=grid, rank=rank, size=size, mpi_dim=mpi_dim)
         mpi_nlon, mpi_nlat = xi.shape

         assert size == 1 or mpi_nlon < self.settings.nlon
@@ -105,8 +113,8 @@ def __init__(  # pylint: disable=too-many-arguments
         assert x0 == xi[0, 0]

         boundary_conditions = (
-            MPIPeriodic(size=size),
-            MPIPolar(mpi_grid=(mpi_nlon, mpi_nlat), grid=grid),
+            MPIPeriodic(size=size, mpi_dim=mpi_dim),
+            MPIPolar(mpi_grid=(mpi_nlon, mpi_nlat), grid=grid, mpi_dim=mpi_dim),
         )

         advector_x = courant_field_multiplier[0] * np.array(
@@ -179,7 +187,11 @@ def __init__(  # pylint: disable=too-many-arguments
             * 2,  # for complex dtype
         )
         super().__init__(
-            stepper=stepper, advectee=advectee, advector=advector, g_factor=g_factor
+            mpi_dim=mpi_dim,
+            stepper=stepper,
+            advectee=advectee,
+            advector=advector,
+            g_factor=g_factor,
         )

     def quick_look(self, state):
diff --git a/tests/local/contract_tests/test_single_vs_multi_node.py b/tests/local/contract_tests/test_single_vs_multi_node.py
index 04988a9..14924c6 100644
--- a/tests/local/contract_tests/test_single_vs_multi_node.py
+++ b/tests/local/contract_tests/test_single_vs_multi_node.py
@@ -4,12 +4,14 @@
 import shutil
 from pathlib import Path

-import mpi4py
+import numba
 import numba_mpi as mpi
 import numpy as np
 import pytest
 from matplotlib import pyplot
+from mpi4py import MPI
 from PyMPDATA import Options
+from PyMPDATA.impl.enumerations import INNER, OUTER

 from PyMPDATA_MPI.domain_decomposition import subdomain
 from PyMPDATA_MPI.hdf_storage import HDFStorage
@@ -23,32 +25,35 @@
     {"n_iters": 3},
 )

-COURANT_FIELD_MULTIPLIER = (
-    (0.5, 0.25),
-    (-0.5, 0.25),
-    (0.5, -0.25),
-    (-0.5, -0.25),
-)
+COURANT_FIELD_MULTIPLIER = ((0.5, 0.25), (-0.5, 0.25), (0.5, -0.25), (-0.5, -0.25))
+
+CARTESIAN_OUTPUT_STEPS = range(0, 2, 1)
+
+SPHERICAL_OUTPUT_STEPS = range(0, 2000, 100)


 @pytest.mark.parametrize(
-    "scenario_class, output_steps",
+    "scenario_class, output_steps, n_threads",
     (
-        (CartesianScenario, range(0, 24, 2)),
-        (SphericalScenario, range(0, 2000, 100)),
+        (CartesianScenario, CARTESIAN_OUTPUT_STEPS, 1),
+        (CartesianScenario, CARTESIAN_OUTPUT_STEPS, 2),
+        (CartesianScenario, CARTESIAN_OUTPUT_STEPS, 3),
+        (SphericalScenario, SPHERICAL_OUTPUT_STEPS, 1),  # TODO #56
     ),
 )
 @pytest.mark.parametrize("options_kwargs", OPTIONS_KWARGS)
-@pytest.mark.parametrize("n_threads", (1,))  # TODO #35 : 2+
 @pytest.mark.parametrize("courant_field_multiplier", COURANT_FIELD_MULTIPLIER)
-def test_single_vs_multi_node(  # pylint: disable=too-many-arguments
+@pytest.mark.parametrize("mpi_dim", (INNER, OUTER))
+def test_single_vs_multi_node(  # pylint: disable=too-many-arguments,too-many-branches,too-many-statements
+    *,
+    mpi_dim,
     scenario_class,
     mpi_tmp_path_fixed,
     options_kwargs,
     n_threads,
     courant_field_multiplier,
     output_steps,
-    grid=(64, 32),
+    grid=(64, 32),  # TODO #101
 ):
     """
     Test is divided into three logical stages.
@@ -57,7 +62,6 @@
     Each iteration uses different domain decomposition.
     Last stage is responsible for comparing results to ground truth
     (which is simulation performed on single node environment)
-
     """
     # pylint: disable=too-many-locals
     if scenario_class is SphericalScenario and options_kwargs["n_iters"] > 1:
@@ -66,6 +70,18 @@
     if scenario_class is SphericalScenario and mpi.size() > 2:
         pytest.skip("TODO #56")

+    if scenario_class is SphericalScenario and mpi_dim == INNER:
+        pytest.skip("TODO #56")
+
+    if n_threads > 1 and options_kwargs.get("nonoscillatory", False):
+        pytest.skip("TODO #99")
+
+    if mpi_dim == INNER and options_kwargs.get("third_order_terms", False):
+        pytest.skip("TODO #102")
+
+    if n_threads > 1 and numba.config.DISABLE_JIT:  # pylint: disable=no-member
+        pytest.skip("threading requires Numba JIT to be enabled")
+
     plot = True and (
         "CI_PLOTS_PATH" in os.environ
         and courant_field_multiplier == COURANT_FIELD_MULTIPLIER[0]
@@ -73,7 +89,6 @@
             options_kwargs == OPTIONS_KWARGS[-1]
             or scenario_class is SphericalScenario
         )
     )
-
     # arrange
     options_str = (
         str(options_kwargs)
@@ -93,6 +108,7 @@
     dataset_name = "test"

     # act
+    numba.set_num_threads(n_threads)
     for mpi_max_size, path in paths.items():
         truncated_size = min(mpi_max_size, mpi.size())
         rank = mpi.rank()
@@ -122,11 +138,12 @@
         )

         with Storage.mpi_context(
-            path, "r+", mpi4py.MPI.COMM_WORLD.Split(rank < truncated_size, rank)
+            path, "r+", MPI.COMM_WORLD.Split(rank < truncated_size, rank)
         ) as storage:
             dataset = setup_dataset_and_sync_all_workers(storage, dataset_name)
             if rank < truncated_size:
                 simulation = scenario_class(
+                    mpi_dim=mpi_dim,
                     mpdata_options=Options(**options_kwargs),
                     n_threads=n_threads,
                     grid=grid,
@@ -134,16 +151,18 @@
                     size=truncated_size,
                     courant_field_multiplier=courant_field_multiplier,
                 )
-                x_range = slice(*subdomain(grid[0], rank, truncated_size))
+                mpi_range = slice(
+                    *subdomain(grid[simulation.mpi_dim], rank, truncated_size)
+                )

-                simulation.advance(dataset, output_steps, x_range)
+                simulation.advance(dataset, output_steps, mpi_range)

                 # plot
                 if plot:
                     tmp = np.empty_like(dataset[:, :, -1])
                     for i, _ in enumerate(output_steps):
                         tmp[:] = np.nan
-                        tmp[x_range, :] = dataset[x_range, :, i]
+                        tmp[:, mpi_range] = dataset[:, mpi_range, i]
                         simulation.quick_look(tmp)
                         filename = f"step={i:04d}.svg"
                         pyplot.savefig(plot_path / filename)
diff --git a/tests/local/unit_tests/test_domain_decomposition.py b/tests/local/unit_tests/test_domain_decomposition.py
new file mode 100644
index 0000000..7e84bec
--- /dev/null
+++ b/tests/local/unit_tests/test_domain_decomposition.py
@@ -0,0 +1,33 @@
+"""
+tests for domain decomposition utilities
+"""
+
+import pytest
+from PyMPDATA.impl.enumerations import INNER, OUTER
+
+from PyMPDATA_MPI.domain_decomposition import mpi_indices
+
+
+@pytest.mark.parametrize(
+    "grid, rank, size, mpi_dim, expected",
+    (
+        # size=1
+        ((2, 2), 0, 1, OUTER, [[[0, 0], [1, 1]], [[0, 1], [0, 1]]]),
+        ((2, 2), 0, 1, INNER, [[[0, 0], [1, 1]], [[0, 1], [0, 1]]]),
+        # size=2
+        ((2, 2), 0, 2, OUTER, [[[0, 0]], [[0, 1]]]),
+        ((2, 2), 1, 2, OUTER, [[[1, 1]], [[0, 1]]]),
+        ((2, 2), 0, 2, INNER, [[[0], [1]], [[0], [0]]]),
+        ((2, 2), 1, 2, INNER, [[[0], [1]], [[1], [1]]]),
+    ),
+)
+def test_mpi_indices(grid, rank, size, mpi_dim, expected):
+    """tests the subdomain-aware index-generation logic"""
+    # arrange
+    sut = mpi_indices
+
+    # act
+    xyi = sut(grid=grid, rank=rank, size=size, mpi_dim=mpi_dim)
+
+    # assert
+    assert (xyi == expected).all()
diff --git a/tests/local/unit_tests/test_simulation.py b/tests/local/unit_tests/test_simulation.py
deleted file mode 100644
index c049e2e..0000000
--- a/tests/local/unit_tests/test_simulation.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# pylint: disable=missing-module-docstring,missing-function-docstring,missing-class-docstring,invalid-name
-
-import pytest
-
-from PyMPDATA_MPI.domain_decomposition import mpi_indices
-
-
-@pytest.mark.parametrize(
-    "grid, rank, size, expected",
-    (
-        ((2, 3), 0, 1, [[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]),
-        ((2, 3), 0, 2, [[0.0, 0.0, 0.0]]),
-        ((2, 3), 1, 2, [[1.0, 1.0, 1.0]]),
-        ((3, 2), 0, 1, [[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]]),
-        ((3, 2), 0, 2, [[0.0, 0.0], [1.0, 1.0]]),
-        ((3, 2), 1, 2, [[2.0, 2.0]]),
-    ),
-)
-def test_mpi_indices(grid, rank, size, expected):
-    xi, _ = mpi_indices(grid, rank, size)
-    assert (xi == expected).all()
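
As a self-contained illustration of the dataset-slicing logic that `_Scenario.advance()`
now uses (the `write_block` helper and the array shapes below are hypothetical, chosen
only to show how the tuple of slices selects the MPI-decomposed dimension):

```python
import numpy as np
from PyMPDATA.impl.enumerations import INNER, OUTER


def write_block(dataset, data, mpi_range, index, mpi_dim):
    """place a rank-local 2D block into the global (x, y, step) array,
    slicing along whichever dimension was decomposed across MPI ranks"""
    dataset[
        (
            mpi_range if mpi_dim == OUTER else slice(None),
            mpi_range if mpi_dim == INNER else slice(None),
            slice(index, index + 1),
        )
    ] = data.reshape((data.shape[0], data.shape[1], 1))


# e.g., rank 0 of 2 owns rows 0..1 of a 4-by-3 grid when decomposing along OUTER
dataset = np.zeros((4, 3, 1))
write_block(dataset, np.ones((2, 3)), mpi_range=slice(0, 2), index=0, mpi_dim=OUTER)
assert dataset[:2, :, 0].all() and not dataset[2:, :, 0].any()
```

The same tuple-of-slices construct lets a single code path handle both decompositions,
which is why `advance()` now takes the generic `mpi_range` argument instead of `x_range`.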