Skip to content

Commit

Permalink
Merge pull request #2452 from eisenhauer/MPMDFix
Browse files Browse the repository at this point in the history
Fix so that MPMD tests run
  • Loading branch information
eisenhauer authored Sep 29, 2020
2 parents 7cd3a84 + c963567 commit 994ae04
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 184 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ option(ADIOS2_BUILD_TESTING "Build the ADIOS2 testing tree"
cmake_dependent_option(ADIOS2_RUN_MPI_MPMD_TESTS
"Enable the parallel MPMD tests" ON
"ADIOS2_BUILD_TESTING;ADIOS2_HAVE_MPI" OFF)
mark_as_advanced(ADIOS2_RUN_MPMD_TESTS)
mark_as_advanced(ADIOS2_RUN_MPI_MPMD_TESTS)

include(CTest)
set(BUILD_TESTING ${ADIOS2_BUILD_TESTING})
Expand Down
13 changes: 12 additions & 1 deletion source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_ReceivedRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
Expand Down Expand Up @@ -463,7 +464,17 @@ void SscReader::SyncReadPattern()

if (m_Verbosity >= 10)
{
ssc::PrintBlockVec(m_LocalReadPattern, "Local Read Pattern");
for (int i = 0; i < m_ReaderSize; ++i)
{
m_Comm.Barrier();
if (i == m_ReaderRank)
{
ssc::PrintBlockVec(m_LocalReadPattern,
"\n\nGlobal Read Pattern on Rank " +
std::to_string(m_ReaderRank));
}
}
m_Comm.Barrier();
}
}

Expand Down
31 changes: 23 additions & 8 deletions source/adios2/engine/ssc/SscReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,20 @@ void SscReader::GetDeferredCommon(Variable<T> &variable, T *data)
jref["BufferCount"] = 0;

ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
size_t oldSize = m_AllReceivingWriterRanks.size();
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
size_t newSize = m_AllReceivingWriterRanks.size();
if (oldSize != newSize)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
if (m_ReceivedRanks.find(i.first) == m_ReceivedRanks.end())
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first,
Expand All @@ -184,6 +186,19 @@ void SscReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
if (b.name == variable.m_Name)
{
bool empty = false;
for (const auto c : b.count)
{
if (c == 0)
{
empty = true;
}
}
if (empty)
{
continue;
}

if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
Expand Down
20 changes: 12 additions & 8 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void SscWriter::SyncMpiPattern()

helper::HandshakeComm(m_Name, 'w', m_OpenTimeoutSecs, CommAsMPI(m_Comm),
streamGroup, writerGroup, m_MpiAllReadersGroup,
m_StreamComm, writerComm, readerComm);
m_StreamComm, writerComm, readerComm, m_Verbosity);
}

void SscWriter::SyncWritePattern(bool finalStep)
Expand Down Expand Up @@ -244,6 +244,11 @@ void SscWriter::SyncWritePattern(bool finalStep)

// deserialize variables metadata
ssc::JsonToBlockVecVec(globalJson, m_GlobalWritePattern);

if (m_Verbosity >= 10 && m_WriterRank == 0)
{
ssc::PrintBlockVecVec(m_GlobalWritePattern, "Global Write Pattern");
}
}

void SscWriter::SyncReadPattern()
Expand Down Expand Up @@ -305,13 +310,6 @@ void SscWriter::SyncReadPattern()
m_ReaderSelectionsLocked = patternJson.get<bool>();
}
}

if (m_Verbosity >= 10)
{
ssc::PrintBlockVecVec(m_GlobalWritePattern, "Global Write Pattern");
ssc::PrintBlockVec(m_GlobalWritePattern[m_WriterRank],
"Local Write Pattern");
}
}

void SscWriter::CalculatePosition(ssc::BlockVecVec &writerVecVec,
Expand Down Expand Up @@ -369,6 +367,12 @@ void SscWriter::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER_FUNC();

if (m_Verbosity >= 5)
{
std::cout << "SscWriter::DoClose, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_CurrentStep > 0)
Expand Down
32 changes: 26 additions & 6 deletions source/adios2/helper/adiosMpiHandshake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <chrono>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <sstream>
#include <thread>
#include <unordered_set>

Expand All @@ -25,9 +27,10 @@ void HandshakeComm(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm,
MPI_Group &streamGroup, MPI_Group &writerGroup,
MPI_Group &readerGroup, MPI_Comm &streamComm,
MPI_Comm &writerComm, MPI_Comm &readerComm)
MPI_Comm &writerComm, MPI_Comm &readerComm, int verbosity)
{
auto appRankMaps = HandshakeRank(filename, mode, timeoutSeconds, localComm);
auto appRankMaps =
HandshakeRank(filename, mode, timeoutSeconds, localComm, verbosity);
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, static_cast<int>(appRankMaps[0].size()),
Expand All @@ -47,10 +50,9 @@ void HandshakeComm(const std::string &filename, const char mode,
#endif
}

const std::vector<std::vector<int>> HandshakeRank(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm)
const std::vector<std::vector<int>>
HandshakeRank(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm, int verbosity)
{
std::vector<std::vector<int>> ret(3);

Expand Down Expand Up @@ -211,6 +213,24 @@ const std::vector<std::vector<int>> HandshakeRank(const std::string &filename,
localComm);
}

if (verbosity >= 5)
{
std::stringstream output;
output << "World Rank " << worldRank << ": " << std::endl;
int s = 0;
for (const auto &i : ret)
{
output << " " << s << ": ";
for (const auto &j : i)
{
output << j << ", ";
}
output << std::endl;
++s;
}
std::cout << output.str();
}

return ret;
}

Expand Down
10 changes: 5 additions & 5 deletions source/adios2/helper/adiosMpiHandshake.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ namespace helper
* for stream *filename*. [1] is the vector of all writer ranks for stream
* *filename*. [2] is the vector of all reader ranks for stream *filename*.
*/
const std::vector<std::vector<int>> HandshakeRank(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm);
const std::vector<std::vector<int>>
HandshakeRank(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm, int verbosity = 0);

void HandshakeComm(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm,
MPI_Group &streamGroup, MPI_Group &writerGroup,
MPI_Group &readerGroup, MPI_Comm &streamComm,
MPI_Comm &writerComm, MPI_Comm &readerComm);
MPI_Comm &writerComm, MPI_Comm &readerComm,
int verbosity = 0);

} // end namespace helper
} // end namespace adios2
Expand Down
6 changes: 3 additions & 3 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ endforeach()
#
# Setup tests for InSituMPI engine
#
if(ADIOS2_HAVE_MPI AND ADIOS2_RUN_MPMD_TESTS)
if(ADIOS2_HAVE_MPI AND ADIOS2_RUN_MPI_MPMD_TESTS)
set (SIMPLE_IMPI_TESTS "1x1;TimeoutOnOpen;1x1.Modes;1x1.Attrs;1x1.Local;1x1.SharedNothing;1x1.SharedIO;1x1.SharedVar;1x1.SharedNothingSync;1x1.SharedIOSync;1x1.SharedVarSync;2x1.SharedNothing;2x1.SharedIO;2x1.SharedVar;2x1.SharedNothingSync;2x1.SharedIOSync;2x1.SharedVarSync")
set (INSITU_TESTS ${SIMPLE_IMPI_TESTS} ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_TESTS} ${SIMPLE_ZFP_TESTS})
# Tests that don't work for InSitu
Expand Down Expand Up @@ -194,8 +194,8 @@ if(ADIOS2_HAVE_MPI AND ADIOS2_RUN_MPMD_TESTS)
endforeach()
endif()

if(ADIOS2_HAVE_SSC AND ADIOS2_RUN_MPMD_TESTS)
set (SSC_BASE_TESTS "1x1;1x1.Attrs;2x1;1x2;2x1ZeroDataVar;2x1ZeroDataR64;3x5;5x3;3x5LockGeometry;TimeoutOnOpen")
if(ADIOS2_HAVE_SSC AND ADIOS2_RUN_MPI_MPMD_TESTS)
set (SSC_BASE_TESTS "1x1;1x1.Attrs;2x1;1x2;2x1ZeroDataVar;2x1ZeroDataR64;3x5;5x3;3x5LockGeometry")
set (SSC_TESTS ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_FORTRAN_TESTS} ${SSC_BASE_TESTS})
foreach(test ${SSC_TESTS})
add_common_test(${test} SSC)
Expand Down
Loading

0 comments on commit 994ae04

Please sign in to comment.