Merge pull request #222 from mabarnes/fix-anyv
Fix anyv parallelisation
johnomotani authored Jun 11, 2024
2 parents 6bf4ea1 + f57f3a7 commit 456f221
Showing 7 changed files with 99 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/parallel_test.yml
@@ -54,9 +54,9 @@ jobs:
version: '1.10'
- uses: julia-actions/cache@v1
- run: |
MPILIBPATH=$(find /opt/homebrew/Cellar/open-mpi/ -name libmpi.dylib)
export MPILIBPATH=$(find /opt/homebrew/Cellar/open-mpi/ -name libmpi.dylib)
touch Project.toml
julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.add(["MPI", "MPIPreferences"]); using MPIPreferences; MPIPreferences.use_system_binary(library_names="/opt/homebrew/Cellar/open-mpi/5.0.3/lib/libmpi.dylib")'
julia --project -O3 --check-bounds=no -e "import Pkg; Pkg.add([\"MPI\", \"MPIPreferences\"]); using MPIPreferences; MPIPreferences.use_system_binary(library_names=\"$MPILIBPATH\")"
julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.add(["NCDatasets", "Random", "SpecialFunctions", "Test"]); Pkg.develop(path="moment_kinetics/")'
julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.precompile()'
# Need to use openmpi so that the following arguments work:
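As context for the workflow change above: the library path found by `find` is now exported as MPILIBPATH and passed through to MPIPreferences, instead of hard-coding a versioned Homebrew path. A minimal Julia sketch of that configuration step (assuming MPI.jl and MPIPreferences.jl are already installed and MPILIBPATH is set in the environment, as in the new step):

    using MPIPreferences
    # Point MPI.jl at the system Open MPI library discovered by the workflow,
    # rather than a hard-coded /opt/homebrew/Cellar/open-mpi/<version> path.
    MPIPreferences.use_system_binary(library_names=ENV["MPILIBPATH"])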
36 changes: 32 additions & 4 deletions moment_kinetics/src/array_allocation.jl
@@ -9,6 +9,8 @@ using ..communication
using ..debugging
@debug_initialize_NaN using ..communication: block_rank, _block_synchronize

using MPI

"""
allocate array with dimensions given by dims and entries of type Bool
"""
@@ -55,14 +57,27 @@ function allocate_shared_float(dims...; comm=nothing)
array = allocate_shared(mk_float, dims; comm=comm)
@debug_initialize_NaN begin
# Initialize as NaN to try and catch use of uninitialized values
if block_rank[] == 0
if comm === nothing
comm_rank = block_rank[]
this_comm = comm_block[]
elseif comm == MPI.COMM_NULL
comm_rank = -1
this_comm = nothing
else
# Get MPI.Comm_rank when comm is not nothing
comm_rank = MPI.Comm_rank(comm)
this_comm = comm
end
if comm_rank == 0
array .= NaN
@debug_track_initialized begin
# Track initialization as if the array was not initialized to NaN
array.is_initialized .= false
end
end
_block_synchronize()
if this_comm !== nothing
MPI.Barrier(this_comm)
end
end
return array
end
@@ -85,14 +100,27 @@ function allocate_shared_complex(dims...; comm=nothing)
array = allocate_shared(Complex{mk_float}, dims; comm=comm)
@debug_initialize_NaN begin
# Initialize as NaN to try and catch use of uninitialized values
if block_rank[] == 0
if comm === nothing
comm_rank = block_rank[]
this_comm = comm_block[]
elseif comm == MPI.COMM_NULL
comm_rank = -1
this_comm = nothing
else
# Get MPI.Comm_rank when comm is not nothing
comm_rank = MPI.Comm_rank(comm)
this_comm = comm
end
if comm_rank == 0
array .= NaN
@debug_track_initialized begin
# Track initialization as if the array was not initialized to NaN
array.is_initialized .= false
end
end
_block_synchronize()
if this_comm !== nothing
MPI.Barrier(this_comm)
end
end
return array
end
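The two hunks above apply the same pattern, so a condensed, hedged sketch of it may help (here `default_comm` and `default_rank` stand in for comm_block[] and block_rank[], which are assumptions made to keep this example standalone):

    using MPI

    # Decide which rank initialises the array to NaN and which communicator to
    # synchronise on: the block communicator by default, a caller-supplied
    # communicator if given, or neither if this rank holds a null communicator.
    function init_nan!(array; comm=nothing, default_comm, default_rank)
        if comm === nothing
            comm_rank, this_comm = default_rank, default_comm
        elseif comm == MPI.COMM_NULL
            comm_rank, this_comm = -1, nothing   # this rank is not in the communicator
        else
            comm_rank, this_comm = MPI.Comm_rank(comm), comm
        end
        comm_rank == 0 && (array .= NaN)         # exactly one writer per communicator
        this_comm === nothing || MPI.Barrier(this_comm)
        return array
    end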
47 changes: 40 additions & 7 deletions moment_kinetics/src/communication.jl
@@ -100,7 +100,7 @@ const anyv_subblock_size = Ref{mk_int}()

"""
"""
const anyv_isubblock_index = Ref{mk_int}()
const anyv_isubblock_index = Ref{Union{mk_int,Nothing}}()

"""
"""
@@ -531,6 +536,11 @@ end
# instances, so their is_read and is_written members can be checked and
# reset by _block_synchronize()
const global_debugmpisharedarray_store = Vector{DebugMPISharedArray}(undef, 0)
# 'anyv' regions require a separate array store, because within an anyv region,
# processes in the same shared memory block may still not be synchronized if they are
# in different anyv sub-blocks, so debug checks within an anyv region should only
# consider the anyv-specific arrays.
const global_anyv_debugmpisharedarray_store = Vector{DebugMPISharedArray}(undef, 0)
end

"""
@@ -567,6 +572,19 @@ Array{mk_float}
function allocate_shared(T, dims; comm=nothing, maybe_debug=true)
if comm === nothing
comm = comm_block[]
elseif comm == MPI.COMM_NULL
# If `comm` is a null communicator (on this process), then this array is just a
# dummy that will not be used.
array = Array{T}(undef, (0 for _ ∈ dims)...)

@debug_shared_array begin
# If @debug_shared_array is active, create DebugMPISharedArray instead of Array
if maybe_debug
array = DebugMPISharedArray(array, comm)
end
end

return array
end
br = MPI.Comm_rank(comm)
bs = MPI.Comm_size(comm)
@@ -633,7 +651,11 @@ function allocate_shared(T, dims; comm=nothing, maybe_debug=true)
# If @debug_shared_array is active, create DebugMPISharedArray instead of Array
if maybe_debug
debug_array = DebugMPISharedArray(array, comm)
push!(global_debugmpisharedarray_store, debug_array)
if comm == comm_anyv_subblock[]
push!(global_anyv_debugmpisharedarray_store, debug_array)
else
push!(global_debugmpisharedarray_store, debug_array)
end
return debug_array
end
end
@@ -762,11 +784,17 @@ end
Raises an error if any array has been accessed incorrectly since the previous call
to _block_synchronize()
Can be added when debugging to help in down where an error occurs.
Can be added when debugging to help pin down where an error occurs.
"""
function debug_check_shared_memory(; kwargs...)
for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
debug_check_shared_array(array; kwargs...)
function debug_check_shared_memory(; comm=comm_block[], kwargs...)
if comm == comm_anyv_subblock[]
for (arraynum, array) ∈ enumerate(global_anyv_debugmpisharedarray_store)
debug_check_shared_array(array; comm=comm, kwargs...)
end
else
for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
debug_check_shared_array(array; comm=comm, kwargs...)
end
end
return nothing
end
@@ -909,6 +937,10 @@ sub-blocks, depending on how the species and spatial dimensions are split up.
`_anyv_subblock_synchronize()`.
"""
function _anyv_subblock_synchronize()
if comm_anyv_subblock[] == MPI.COMM_NULL
# No synchronization to do for a null communicator
return nothing
end

MPI.Barrier(comm_anyv_subblock[])

@@ -943,7 +975,7 @@ function _anyv_subblock_synchronize()
# * If an element is written to, only the rank that writes to it should read it.
#
@debug_detect_redundant_block_synchronize previous_was_unnecessary = true
for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
for (arraynum, array) ∈ enumerate(global_anyv_debugmpisharedarray_store)

debug_check_shared_array(array; comm=comm_anyv_subblock[])

@@ -1057,6 +1089,7 @@ end
"""
function free_shared_arrays()
@debug_shared_array resize!(global_debugmpisharedarray_store, 0)
@debug_shared_array resize!(global_anyv_debugmpisharedarray_store, 0)

for w ∈ global_Win_store
MPI.free(w)
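A compact sketch of the two guards this file introduces, under assumed names rather than the package's actual functions: ranks whose anyv sub-block communicator is MPI.COMM_NULL receive a zero-size placeholder instead of a shared-memory array, and synchronisation becomes a no-op for them.

    using MPI

    # Placeholder returned to ranks that are not part of the communicator; it has
    # the right element type and number of dimensions but zero length.
    dummy_array(::Type{T}, dims) where {T} = Array{T}(undef, ntuple(_ -> 0, length(dims)))

    # Sub-block synchronisation: skip the collective entirely on a null
    # communicator; otherwise every rank in the sub-block must reach the Barrier.
    function subblock_synchronize(comm::MPI.Comm)
        comm == MPI.COMM_NULL && return nothing
        MPI.Barrier(comm)
        return nothing
    end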
8 changes: 6 additions & 2 deletions moment_kinetics/src/fokker_planck.jl
@@ -664,7 +664,9 @@ function conserving_corrections!(CC,pdf_in,vpa,vperp,dummy_vpavperp)

# Broadcast x0, x1, x2 to all processes in the 'anyv' subblock
param_vec = [x0, x1, x2, upar]
MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
if comm_anyv_subblock[] != MPI.COMM_NULL
MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
end
(x0, x1, x2, upar) = param_vec

# correct CC
@@ -695,7 +697,9 @@ function density_conserving_correction!(CC,pdf_in,vpa,vperp,dummy_vpavperp)

# Broadcast x0 to all processes in the 'anyv' subblock
param_vec = [x0]
MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
if comm_anyv_subblock[] != MPI.COMM_NULL
MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
end
x0 = param_vec[1]

# correct CC
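Both hunks follow the same pack/broadcast/unpack pattern; a hedged standalone version (the function name is illustrative, not part of the package):

    using MPI

    # Pack the scalars computed by sub-block rank 0 into a Vector so MPI.Bcast!
    # has a mutable buffer to fill on the other ranks, guard against a null
    # communicator, then unpack.
    function bcast_params(x0, x1, x2, comm::MPI.Comm)
        param_vec = [x0, x1, x2]
        if comm != MPI.COMM_NULL
            MPI.Bcast!(param_vec, 0, comm)   # root is sub-block rank 0
        end
        return param_vec[1], param_vec[2], param_vec[3]
    end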
9 changes: 7 additions & 2 deletions moment_kinetics/src/looping.jl
@@ -471,14 +471,19 @@ eval(quote
anyv_subblock_size[] = anyv_split[end]
number_of_anyv_blocks = prod(anyv_split[1:end-1])
anyv_subblock_index = block_rank[] ÷ anyv_subblock_size[]
anyv_rank_within_subblock = block_rank[] % anyv_subblock_size[]
if anyv_subblock_index ≥ number_of_anyv_blocks
anyv_subblock_index = nothing
anyv_rank_within_subblock = -1
else
anyv_rank_within_subblock = block_rank[] % anyv_subblock_size[]
end

# Create communicator for the anyv subblock. OK to do this here as
# communication.setup_distributed_memory_MPI() must have already been called
# to set block_size[] and block_rank[]
comm_anyv_subblock[] = MPI.Comm_split(comm_block[], anyv_subblock_index,
anyv_rank_within_subblock)
anyv_subblock_rank[] = MPI.Comm_rank(comm_anyv_subblock[])
anyv_subblock_rank[] = anyv_rank_within_subblock
anyv_isubblock_index[] = anyv_subblock_index
anyv_nsubblocks_per_block[] = number_of_anyv_blocks
anyv_rank0 = (anyv_subblock_rank[] == 0)
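The key point of this hunk is that ranks which do not fit into a complete sub-block pass `nothing` as the colour to MPI.Comm_split and therefore receive MPI.COMM_NULL, so the rank within the sub-block has to be recorded before the split rather than queried from the (null) communicator afterwards. A standalone sketch of the same split, with assumed names:

    using MPI

    function split_into_subblocks(comm::MPI.Comm, subblock_size::Int)
        rank = MPI.Comm_rank(comm)
        nranks = MPI.Comm_size(comm)
        nsubblocks = nranks ÷ subblock_size
        index = rank ÷ subblock_size
        if index ≥ nsubblocks
            colour, key = nothing, -1          # excluded rank gets MPI.COMM_NULL
        else
            colour, key = index, rank % subblock_size
        end
        subcomm = MPI.Comm_split(comm, colour, key)
        return subcomm, key                    # key is the rank within the sub-block (-1 if excluded)
    end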
13 changes: 8 additions & 5 deletions moment_kinetics/test/fokker_planck_tests.jl
@@ -53,12 +53,12 @@ function create_grids(ngrid,nelement_vpa,nelement_vperp;
#println("made inputs")
#println("vpa: ngrid: ",ngrid," nelement: ",nelement_local_vpa, " Lvpa: ",Lvpa)
#println("vperp: ngrid: ",ngrid," nelement: ",nelement_local_vperp, " Lvperp: ",Lvperp)
vpa, vpa_spectral = define_coordinate(vpa_input)
vperp, vperp_spectral = define_coordinate(vperp_input)

# Set up MPI
initialize_comms!()
setup_distributed_memory_MPI(1,1,1,1)
vpa, vpa_spectral = define_coordinate(vpa_input)
vperp, vperp_spectral = define_coordinate(vperp_input)
looping.setup_loop_ranges!(block_rank[], block_size[];
s=1, sn=1,
r=1, z=1, vperp=vperp.n, vpa=vpa.n,
@@ -279,7 +279,8 @@ function runtests()
calculate_GG=true, calculate_dGdvperp=true)
# extract C[Fs,Fs'] result
# and Rosenbluth potentials for testing
begin_vperp_vpa_region()
begin_s_r_z_anyv_region()
begin_anyv_vperp_vpa_region()
@loop_vperp_vpa ivperp ivpa begin
G_M_num[ivpa,ivperp] = fkpl_arrays.GG[ivpa,ivperp]
H_M_num[ivpa,ivperp] = fkpl_arrays.HH[ivpa,ivperp]
@@ -419,7 +420,8 @@ function runtests()
conserving_corrections!(fkpl_arrays.CC,Fs_M,vpa,vperp,dummy_array)
end
# extract C[Fs,Fs'] result
begin_vperp_vpa_region()
begin_s_r_z_anyv_region()
begin_anyv_vperp_vpa_region()
@loop_vperp_vpa ivperp ivpa begin
C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
end
@@ -579,7 +581,8 @@ function runtests()
density_conserving_correction!(fkpl_arrays.CC,Fs_M,vpa,vperp,dummy_array)
end
# extract C[Fs,Fs'] result
begin_vperp_vpa_region()
begin_s_r_z_anyv_region()
begin_anyv_vperp_vpa_region()
@loop_vperp_vpa ivperp ivpa begin
C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
end
8 changes: 4 additions & 4 deletions test_scripts/2D_FEM_assembly_test.jl
@@ -112,14 +112,14 @@ end
println("made inputs")
println("vpa: ngrid: ",ngrid," nelement: ",nelement_local_vpa, " Lvpa: ",Lvpa)
println("vperp: ngrid: ",ngrid," nelement: ",nelement_local_vperp, " Lvperp: ",Lvperp)
vpa, vpa_spectral = define_coordinate(vpa_input)
vperp, vperp_spectral = define_coordinate(vperp_input)

# Set up MPI
if standalone
initialize_comms!()
end
setup_distributed_memory_MPI(1,1,1,1)
vpa, vpa_spectral = define_coordinate(vpa_input)
vperp, vperp_spectral = define_coordinate(vperp_input)
looping.setup_loop_ranges!(block_rank[], block_size[];
s=1, sn=1,
r=1, z=1, vperp=vperp.n, vpa=vpa.n,
@@ -228,7 +228,6 @@ end
ms = 1.0
msp = 1.0
nussp = 1.0
begin_serial_region()
for ivperp in 1:vperp.n
for ivpa in 1:vpa.n
Fs_M[ivpa,ivperp] = F_Maxwellian(denss,upars,vths,vpa,vperp,ivpa,ivperp)
@@ -280,7 +279,8 @@ end
algebraic_solve_for_d2Gdvperp2=false,calculate_GG=true,calculate_dGdvperp=true)
# extract C[Fs,Fs'] result
# and Rosenbluth potentials for testing
begin_vperp_vpa_region()
begin_s_r_z_anyv_region()
begin_anyv_vperp_vpa_region()
@loop_vperp_vpa ivperp ivpa begin
C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
G_M_num[ivpa,ivperp] = fkpl_arrays.GG[ivpa,ivperp]
