Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPI fix for restarts #369

Merged
merged 1 commit into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ steps:
slurm_nodes: 3
slurm_tasks_per_node: 1

- label: "MPI Checkpointer unit tests"
key: "checkpointer_mpi_tests"
command: "mpiexec julia --color=yes --project=test/ test/mpi_tests/checkpointer_mpi_tests.jl --run_name checkpointer_mpi --job_id checkpointer_mpi"
timeout_in_minutes: 20
env:
CLIMACORE_DISTRIBUTED: "MPI"
agents:
config: cpu
queue: central
slurm_ntasks: 2

- label: "Perf flame graph diff tests"
command: "julia --color=yes --project=perf/ perf/flame_test.jl --run_name flame_test --job_id flame_perf_target"
timeout_in_minutes: 5
Expand Down Expand Up @@ -253,6 +264,9 @@ steps:
slurm_ntasks: 2
slurm_mem: 20GB

- label: "batch script"
command: "sbatch test/mpi_tests/local_checks.sh"

# short high-res performance test
- label: "Unthreaded AMIP FINE" # also reported by longruns with a flame graph
key: "unthreaded_amip_fine"
Expand Down Expand Up @@ -285,3 +299,5 @@ steps:
- build_history staging # name of branch to plot
artifact_paths:
- "build_history.html"


2 changes: 1 addition & 1 deletion experiments/AMIP/modular/coupler_driver_modular.jl
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ cs = CoupledSimulation{FT}(
if restart_dir !== "unspecified"
for sim in cs.model_sims
if get_model_state_vector(sim) !== nothing
restart_model_state!(sim, restart_t; input_dir = restart_dir)
restart_model_state!(sim, comms_ctx, restart_t; input_dir = restart_dir)
end
end
end
Expand Down
23 changes: 14 additions & 9 deletions src/Checkpointer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ function checkpoint_model_state(
@info "Saving checkpoint " * Interfacer.name(sim) * " model state to HDF5 on day $day second $sec"
mkpath(joinpath(output_dir, "checkpoint"))
output_file = joinpath(output_dir, "checkpoint", "checkpoint_" * Interfacer.name(sim) * "_$t.hdf5")
hdfwriter = InputOutput.HDF5Writer(output_file, comms_ctx)
InputOutput.HDF5.write_attribute(hdfwriter.file, "time", t)
InputOutput.write!(hdfwriter, Y, "model_state")
Base.close(hdfwriter)
checkpoint_writer = InputOutput.HDF5Writer(output_file, comms_ctx)
InputOutput.HDF5.write_attribute(checkpoint_writer.file, "time", t)
InputOutput.write!(checkpoint_writer, Y, "model_state")
Base.close(checkpoint_writer)
return nothing

end

"""
restart_model_state!(sim::Interfacer.ComponentModelSimulation, t::Int; input_dir = "input")
restart_model_state!(sim::Interfacer.ComponentModelSimulation, comms_ctx::ClimaComms.AbstractCommsContext, t::Int; input_dir = "input")

Sets the model state of a simulation from an HDF5 file corresponding to a given time, `t` (in seconds).
"""
function restart_model_state!(sim::Interfacer.ComponentModelSimulation, t::Int; input_dir = "input")
function restart_model_state!(
sim::Interfacer.ComponentModelSimulation,
comms_ctx::ClimaComms.AbstractCommsContext,
LenkaNovak marked this conversation as resolved.
Show resolved Hide resolved
t::Int;
input_dir = "input",
)
Y = get_model_state_vector(sim)
day = floor(Int, t / (60 * 60 * 24))
sec = floor(Int, t % (60 * 60 * 24))
Expand All @@ -59,9 +64,9 @@ function restart_model_state!(sim::Interfacer.ComponentModelSimulation, t::Int;
@info "Setting " Interfacer.name(sim) " state to checkpoint: $input_file, corresponding to day $day second $sec"

# open file and read
hdfreader = InputOutput.HDF5Reader(input_file)
Y_new = InputOutput.read_field(hdfreader, "model_state")
Base.close(hdfreader)
restart_reader = InputOutput.HDF5Reader(input_file, comms_ctx)
Y_new = InputOutput.read_field(restart_reader, "model_state")
Base.close(restart_reader)

# set new state
Y .= Y_new
Expand Down
1 change: 1 addition & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
IntervalSets = "8197267c-284f-5f27-9208-e0e47529a953"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
MPIPreferences = "3da0fdf6-3ccc-4f1b-acd9-58baa6c99267"
NCDatasets = "85f8d34a-cbdd-5861-8df4-14fed0d494ab"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Expand Down
2 changes: 1 addition & 1 deletion test/checkpointer_tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ end

# new sim run
sim_new = DummySimulation(Fields.FieldVector(T = zeros(boundary_space)))
restart_model_state!(sim_new, t, input_dir = "test_checkpoint")
restart_model_state!(sim_new, comms_ctx, t, input_dir = "test_checkpoint")
@test sim_new.state.T == sim.state.T

# remove checkpoint directory
Expand Down
44 changes: 44 additions & 0 deletions test/mpi_tests/checkpointer_mpi_tests.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#=
Unit tests for ClimaCoupler Checkpointer module functions to exercise MPI

These are in a separate testing file from the other Checkpointer unit tests so
that MPI can be enabled for testing of these functions.
=#

using ClimaCore: Meshes, Domains, Topologies, Spaces, Fields, InputOutput
using ClimaCoupler: TestHelper
using ClimaComms
using Test
import ClimaCoupler: Interfacer
import ClimaCoupler.Checkpointer: get_model_state_vector, restart_model_state!, checkpoint_model_state

# set up MPI communications context
# The context is built for the single-threaded CPU device; NOTE(review): whether it is
# actually MPI-backed presumably depends on the `CLIMACORE_DISTRIBUTED` environment
# variable set by the CI step — confirm against ClimaComms.
const comms_ctx = ClimaComms.context(ClimaComms.CPUSingleThreaded())
# `init` returns this process' id and the total number of processes
const pid, nprocs = ClimaComms.init(comms_ctx)
# log the process id so each participating process is visible in the test output
@info pid
# synchronize all processes before any test code runs
ClimaComms.barrier(comms_ctx)

# floating-point type passed to `TestHelper.create_space` below
FT = Float64

"""
    DummySimulation{S} <: Interfacer.AtmosModelSimulation

Minimal simulation stub used by the checkpointer tests: it carries nothing but
a `state` of type `S`.
"""
struct DummySimulation{S} <: Interfacer.AtmosModelSimulation
    state::S
end

# Extend the Checkpointer accessor (imported above) so that the stub's stored
# state is what gets checkpointed and restored.
function get_model_state_vector(sim::DummySimulation)
    return sim.state
end
# Round-trip check: a state saved with `checkpoint_model_state` must be recovered
# by `restart_model_state!` when both are given the same communications context.
@testset "checkpoint_model_state, restart_model_state!" begin
    checkpoint_time = 1
    space = TestHelper.create_space(FT; comms_ctx = comms_ctx)

    # reference simulation: field of ones, saved as a checkpoint
    reference_state = Fields.FieldVector(T = ones(space))
    sim = DummySimulation(reference_state)
    checkpoint_model_state(sim, comms_ctx, checkpoint_time; output_dir = "test_checkpoint")

    # fresh simulation: field of zeros, to be overwritten from the checkpoint
    restarted_state = Fields.FieldVector(T = zeros(space))
    restarted_sim = DummySimulation(restarted_state)
    restart_model_state!(restarted_sim, comms_ctx, checkpoint_time; input_dir = "test_checkpoint")
    @test restarted_sim.state.T == sim.state.T

    # synchronize all processes first, then let only the root process delete
    # the checkpoint directory
    ClimaComms.barrier(comms_ctx)
    ClimaComms.iamroot(comms_ctx) && rm("./test_checkpoint/", force = true, recursive = true)
end
41 changes: 41 additions & 0 deletions test/mpi_tests/local_checks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/bin/bash
#SBATCH --time=24:00:00
#SBATCH --nodes=1
#SBATCH --job-name=mpi_restart_test
#SBATCH --reservation=clima
#SBATCH --mem=32GB
#SBATCH --ntasks=2

# End-to-end check of MPI restarts for the modular AMIP driver:
#   1. set up the Julia environments (root project, AMIP experiment, artifacts),
#   2. run a short spin-up that writes checkpoints,
#   3. start a second run that initializes from one of those checkpoints.

module purge
module load julia/1.8.5 openmpi/4.1.1 hdf5/1.12.1-ompi411

# Stop at the first failing command: without this, a failed setup or spin-up
# would still launch the restart run, which would then fail (or read stale
# checkpoints) for reasons unrelated to the restart code itself.
set -e

export JULIA_MPI_BINARY=system
# one Julia thread per task unless SLURM specifies otherwise
export JULIA_NUM_THREADS=${SLURM_CPUS_PER_TASK:=1}
export CLIMACORE_DISTRIBUTED="MPI"
export JULIA_HDF5_PATH=""

export RUN_NAME=amip_restart_mpi_test
export RESTART_DIR="experiments/AMIP/modular/output/amip/${RUN_NAME}_artifacts/"
export RESTART_T=200

DRIVER_PROJECT=experiments/AMIP/modular/
DRIVER_SCRIPT=experiments/AMIP/modular/coupler_driver_modular.jl

# configure MPI to use the system binary, then build the root project
julia -e 'using Pkg; Pkg.add("MPIPreferences"); using MPIPreferences; use_system_binary()'
julia --project -e 'using Pkg; Pkg.instantiate()'
julia --project -e 'using Pkg; Pkg.build("MPI")'
julia --project -e 'using Pkg; Pkg.build("HDF5")'
# public entry point, consistent with the precompile calls below
julia --project -e 'using Pkg; Pkg.precompile()'

julia --project="$DRIVER_PROJECT" -e 'using Pkg; Pkg.instantiate(;verbose=true)'
julia --project="$DRIVER_PROJECT" -e 'using Pkg; Pkg.precompile()'
julia --project="$DRIVER_PROJECT" -e 'using Pkg; Pkg.status()'

julia --project=artifacts -e 'using Pkg; Pkg.instantiate(;verbose=true)'
julia --project=artifacts -e 'using Pkg; Pkg.precompile()'
julia --project=artifacts -e 'using Pkg; Pkg.status()'
julia --project=artifacts artifacts/download_artifacts.jl

# Driver options shared by the spin-up and the restart run. Keeping them in one
# place guarantees that both runs use the same model configuration.
COMMON_ARGS=(
    --run_name "$RUN_NAME"
    --coupled true
    --anim true
    --surface_setup PrescribedSurface
    --dt_cpl 200
    --energy_check false
    --mode_name amip
    --mono_surface false
    --vert_diff true
    --moist equil
    --rad clearsky
    --precip_model 0M
    --z_elem 35
    --dz_bottom 50
    --h_elem 12
    --kappa_4 3e16
    --rayleigh_sponge true
    --alpha_rayleigh_uh 0
    --dt 200secs
    --t_end 0.1days
    --job_id "$RUN_NAME"
    --dt_save_to_sol 1000days
    --dt_save_to_disk 10days
    --apply_limiter false
    --FLOAT_TYPE Float64
)

# run spin up
# - specify `--monthly_checkpoint true` to save monthly checkpoints of all model prognostic states
mpiexec julia --color=yes --project="$DRIVER_PROJECT" "$DRIVER_SCRIPT" \
    "${COMMON_ARGS[@]}" \
    --start_date 19790101 \
    --monthly_checkpoint true

# init using a restart
# - specify the directory of the `checkpoint/` folder (i.e., `--restart_dir`) and time (in secs; `--restart_t`) of the restart file
mpiexec julia --color=yes --project="$DRIVER_PROJECT" "$DRIVER_SCRIPT" \
    "${COMMON_ARGS[@]}" \
    --restart_dir "$RESTART_DIR" \
    --restart_t "$RESTART_T" \
    --start_date 19790102