Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MpiHandshake to enable XGC-COUPLER-GENE communication pattern #2019

Merged
merged 9 commits into from
Mar 6, 2020
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ if(ADIOS2_HAVE_MPI)
target_sources(adios2 PRIVATE
core/IOMPI.cpp
helper/adiosCommMPI.h helper/adiosCommMPI.cpp
helper/adiosMpiHandshake.h helper/adiosMpiHandshake.cpp
engine/insitumpi/InSituMPIWriter.cpp engine/insitumpi/InSituMPIWriter.tcc
engine/insitumpi/InSituMPIReader.cpp engine/insitumpi/InSituMPIReader.tcc
engine/insitumpi/InSituMPIFunctions.cpp engine/insitumpi/InSituMPISchedules.cpp
Expand Down
37 changes: 37 additions & 0 deletions source/adios2/engine/ssc/SscHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,43 @@ void PrintMpiInfo(const MpiInfo &writersInfo, const MpiInfo &readersInfo)
std::cout << std::endl;
}

/**
 * Looks up an integer-valued engine parameter.
 *
 * @param params parameter map to search
 * @param key name of the parameter to look up
 * @param value receives the parsed integer; left unchanged when the function
 * returns false
 * @return true when key is present and its value is a well-formed integer,
 * false when key is absent or the value cannot be parsed
 *
 * A malformed value is reported on std::cerr instead of being thrown, so a bad
 * parameter never aborts the caller.
 */
bool GetParameter(const Params &params, const std::string &key, int &value)
{
    const auto it = params.find(key);
    if (it == params.end())
    {
        return false;
    }

    const std::string &text = it->second;
    bool valid = false;
    int parsed = 0;
    try
    {
        std::string::size_type consumed = 0;
        parsed = std::stoi(text, &consumed);
        // std::stoi stops at the first character that is not part of the
        // number, so "5abc" would otherwise be accepted as 5. Only trailing
        // whitespace is tolerated after the digits.
        valid = text.find_first_not_of(" \t\n\r\f\v", consumed) ==
                std::string::npos;
    }
    catch (...)
    {
        // std::stoi throws for non-numeric or out-of-range input
        valid = false;
    }

    if (!valid)
    {
        std::string error =
            "Engine parameter " + key + " can only be integer numbers";
        std::cerr << error << std::endl;
        return false;
    }

    value = parsed;
    return true;
}

/**
 * Looks up a string-valued engine parameter.
 *
 * @param params parameter map to search
 * @param key name of the parameter to look up
 * @param value receives a copy of the stored string when key is present;
 * left unchanged otherwise
 * @return true when key is present in params, false otherwise
 */
bool GetParameter(const Params &params, const std::string &key, std::string &value)
{
    const auto entry = params.find(key);
    const bool found = (entry != params.end());
    if (found)
    {
        value = entry->second;
    }
    return found;
}

} // end namespace ssc
} // end namespace engine
} // end namespace core
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output);

bool AreSameDims(const Dims &a, const Dims &b);

// Looks up key in params and parses its value as an integer into value.
// Returns false if key is absent or its value cannot be parsed as an integer
// (a message is printed to std::cerr in the latter case); returns true otherwise.
bool GetParameter(const Params &params, const std::string &key, int &value);
// Looks up key in params and copies its string value into value.
// Returns false, leaving value unchanged, if key is absent; returns true otherwise.
bool GetParameter(const Params &params, const std::string &key, std::string &value);

} // end namespace ssc
} // end namespace engine
} // end namespace core
Expand Down
268 changes: 22 additions & 246 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "SscReader.tcc"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosCommMPI.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/helper/adiosJSONcomplex.h"
#include "nlohmann/json.hpp"
Expand All @@ -31,24 +32,11 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_ReaderRank = m_Comm.Rank();
m_ReaderSize = m_Comm.Size();

auto it = m_IO.m_Parameters.find("MpiMode");
if (it != m_IO.m_Parameters.end())
{
m_MpiMode = it->second;
}
it = m_IO.m_Parameters.find("Verbose");
if (it != m_IO.m_Parameters.end())
{
try
{
m_Verbosity = std::stoi(it->second);
}
catch (...)
{
std::cerr << "Engine parameter Verbose can only be integer numbers"
<< std::endl;
}
}
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount);

m_Buffer.resize(1);

Expand Down Expand Up @@ -116,11 +104,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
TAU_SCOPED_TIMER_FUNC();

if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}

if (m_InitialStep)
{
Expand Down Expand Up @@ -153,6 +136,12 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
}
}

if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << ", Step "<< m_CurrentStep << std::endl;
}

if (m_Buffer[0] == 1)
{
return StepStatus::EndOfStream;
Expand Down Expand Up @@ -191,239 +180,29 @@ void SscReader::EndStep()

void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();

if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}

TAU_SCOPED_TIMER_FUNC();
if (m_WorldSize == m_ReaderSize)
{
throw(std::runtime_error("no writers are found"));
}

std::vector<int> lrbuf;
std::vector<int> grbuf;

// Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
// find out all the m_WriterRank == 0
if (m_WorldRank == 0)
{
grbuf.resize(m_WorldSize);
}

MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) );
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();

std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}

// Each local rank 0 process send their type (0 for writer, 1 for reader) to
// the world rank 0 process The AppStart are re-ordered to put all writers
// ahead of all the readers.
std::vector<int>
AppType; // Vector to record the type of the local rank 0 process
if (m_ReaderRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 1;
;
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 1; // type 1 for reader
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}

if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;

for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}

// Send the m_AppSize and m_AppID to each local rank 0 process
if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{
MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}

m_Comm.Bcast(&m_AppID, sizeof(int),
0); // Local rank 0 process broadcast the m_AppID within the
// local communicator.

MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize

// In each local communicator, each local rank 0 process gathers the world
// rank of all the rest local processes.
if (m_ReaderRank == 0)
{
lrbuf.resize(m_ReaderSize);
}

m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);

// Send the WorldRank vector of each local communicator to the m_WorldRank
// == 0 process.
int WriterInfoSize = 0;
int ReaderInfoSize = 0;
if (m_ReaderRank == 0)
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
{
if (i == 0)
{
m_WriterGlobalMpiInfo[i] = lrbuf;
++WriterInfoSize;
}
else
{
int j_writersize;
MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++WriterInfoSize;

m_WriterGlobalMpiInfo[i].resize(j_writersize);
MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}

for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
{
if (i == 0)
{
m_ReaderGlobalMpiInfo[i] = lrbuf;
++ReaderInfoSize;
}
else
{
int j_readersize;
MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++ReaderInfoSize;

m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.resize(j_readersize);
MPI_Recv(
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.data(),
j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
}
else
{
MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97,
MPI_COMM_WORLD);
}
}

// Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
// processes.
MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast writerinfo size
MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);

m_WriterGlobalMpiInfo.resize(WriterInfoSize);
m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);

for (int i = 0; i < WriterInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_WriterGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}

for (int i = 0; i < ReaderInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_ReaderGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_ReaderGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}

for (const auto &app : m_WriterGlobalMpiInfo)
{
for (int rank : app)
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
}
}

for (const auto &app : m_ReaderGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
}
Expand All @@ -434,10 +213,6 @@ void SscReader::SyncMpiPattern()
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);

if (m_Verbosity >= 10 and m_WorldRank == 0)
{
ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
}
}

void SscReader::SyncWritePattern()
Expand Down Expand Up @@ -687,6 +462,7 @@ void SscReader::DoClose(const int transportIndex)
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
MPI_Win_free(&m_MpiWin);
m_MpiHandshake.Finalize();
}

} // end namespace engine
Expand Down
Loading