diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt
index c25c5eb390..94b8bf09f8 100644
--- a/source/adios2/CMakeLists.txt
+++ b/source/adios2/CMakeLists.txt
@@ -231,6 +231,7 @@ if(ADIOS2_HAVE_MPI)
   target_sources(adios2 PRIVATE
     core/IOMPI.cpp
     helper/adiosCommMPI.h helper/adiosCommMPI.cpp
+    helper/adiosMpiHandshake.h helper/adiosMpiHandshake.cpp
     engine/insitumpi/InSituMPIWriter.cpp engine/insitumpi/InSituMPIWriter.tcc
     engine/insitumpi/InSituMPIReader.cpp engine/insitumpi/InSituMPIReader.tcc
     engine/insitumpi/InSituMPIFunctions.cpp engine/insitumpi/InSituMPISchedules.cpp
diff --git a/source/adios2/engine/ssc/SscHelper.cpp b/source/adios2/engine/ssc/SscHelper.cpp
index 3228b6a1c6..0192c8931c 100644
--- a/source/adios2/engine/ssc/SscHelper.cpp
+++ b/source/adios2/engine/ssc/SscHelper.cpp
@@ -303,6 +303,45 @@ void PrintMpiInfo(const MpiInfo &writersInfo, const MpiInfo &readersInfo)
     std::cout << std::endl;
 }
 
+bool GetParameter(const Params &params, const std::string &key, int &value)
+{
+    auto it = params.find(key);
+    if (it == params.end())
+    {
+        return false;
+    }
+    else
+    {
+        try
+        {
+            value = std::stoi(it->second);
+        }
+        catch (...)
+        {
+            std::string error =
+                "Engine parameter " + key + " can only be integer numbers";
+            std::cerr << error << std::endl;
+            return false;
+        }
+    }
+    return true;
+}
+
+bool GetParameter(const Params &params, const std::string &key,
+                  std::string &value)
+{
+    auto it = params.find(key);
+    if (it == params.end())
+    {
+        return false;
+    }
+    else
+    {
+        value = it->second;
+    }
+    return true;
+}
+
 } // end namespace ssc
 } // end namespace engine
 } // end namespace core
diff --git a/source/adios2/engine/ssc/SscHelper.h b/source/adios2/engine/ssc/SscHelper.h
index 383e672eb7..682c980ba7 100644
--- a/source/adios2/engine/ssc/SscHelper.h
+++ b/source/adios2/engine/ssc/SscHelper.h
@@ -66,6 +66,10 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output);
 
 bool AreSameDims(const Dims &a, const Dims &b);
 
+bool GetParameter(const Params &params, const std::string &key, int &value);
+bool GetParameter(const Params &params, const std::string &key,
+                  std::string &value);
+
 } // end namespace ssc
 } // end namespace engine
 } // end namespace core
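
The two overloads above give SSC a uniform "try-get" lookup for engine parameters: the output argument keeps its default when the key is absent, or (for the `int` overload) when the value fails to parse. A minimal standalone sketch of the same pattern, with `Params` standing in for `adios2::Params` (illustrative only, not part of the patch):

```cpp
#include <iostream>
#include <map>
#include <string>

using Params = std::map<std::string, std::string>;

// Same logic as the new ssc::GetParameter(Params, key, int&) overload:
// returns false and leaves `value` untouched when the key is missing or
// cannot be parsed as an integer.
bool GetParameter(const Params &params, const std::string &key, int &value)
{
    auto it = params.find(key);
    if (it == params.end())
    {
        return false;
    }
    try
    {
        value = std::stoi(it->second);
    }
    catch (...)
    {
        std::cerr << "Engine parameter " << key
                  << " can only be integer numbers" << std::endl;
        return false;
    }
    return true;
}

int main()
{
    const Params params = {{"Verbose", "5"}};
    int verbosity = 0;    // overwritten: key present and parseable
    int timeoutSecs = 10; // keeps its default: key absent
    GetParameter(params, "Verbose", verbosity);
    GetParameter(params, "OpenTimeoutSecs", timeoutSecs);
    std::cout << verbosity << " " << timeoutSecs << std::endl; // prints: 5 10
    return 0;
}
```
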
diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp
index 837daae56c..5de16fd3fd 100644
--- a/source/adios2/engine/ssc/SscReader.cpp
+++ b/source/adios2/engine/ssc/SscReader.cpp
@@ -10,6 +10,7 @@
 
 #include "SscReader.tcc"
 #include "adios2/helper/adiosComm.h"
+#include "adios2/helper/adiosCommMPI.h"
 #include "adios2/helper/adiosFunctions.h"
 #include "adios2/helper/adiosJSONcomplex.h"
 #include "nlohmann/json.hpp"
@@ -31,24 +32,15 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
     m_ReaderRank = m_Comm.Rank();
     m_ReaderSize = m_Comm.Size();
 
-    auto it = m_IO.m_Parameters.find("MpiMode");
-    if (it != m_IO.m_Parameters.end())
-    {
-        m_MpiMode = it->second;
-    }
-    it = m_IO.m_Parameters.find("Verbose");
-    if (it != m_IO.m_Parameters.end())
-    {
-        try
-        {
-            m_Verbosity = std::stoi(it->second);
-        }
-        catch (...)
-        {
-            std::cerr << "Engine parameter Verbose can only be integer numbers"
-                      << std::endl;
-        }
-    }
+    ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
+    ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
+    ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
+                      m_MaxFilenameLength);
+    ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
+                      m_RendezvousAppCount);
+    ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
+                      m_MaxStreamsPerApp);
+    ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
 
     m_Buffer.resize(1);
 
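
The constructor now routes every engine parameter through `ssc::GetParameter`, with the defaults held in `SscReader.h` (further below). A hypothetical application-side configuration of these parameters (values and stream name illustrative; assumes an MPI-enabled ADIOS2 build):

```cpp
#include <adios2.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    adios2::ADIOS adios(MPI_COMM_WORLD);
    adios2::IO io = adios.DeclareIO("InputStream");
    io.SetEngine("Ssc");
    // All engine parameters arrive as strings; the integer-valued ones are
    // converted by ssc::GetParameter at engine construction.
    io.SetParameters({{"Verbose", "5"},
                      {"OpenTimeoutSecs", "30"},
                      {"RendezvousAppCount", "2"},
                      {"MaxStreamsPerApp", "1"},
                      {"MaxFilenameLength", "128"}});
    // With this patch, Open() blocks in the MPI handshake until
    // RendezvousAppCount apps have joined, or OpenTimeoutSecs expires.
    adios2::Engine reader = io.Open("HandshakeTest", adios2::Mode::Read);
    reader.Close();
    MPI_Finalize();
    return 0;
}
```
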
@@ -116,12 +108,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
 {
     TAU_SCOPED_TIMER_FUNC();
 
-    if (m_Verbosity >= 5)
-    {
-        std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
-                  << ", Reader Rank " << m_ReaderRank << std::endl;
-    }
-
     if (m_InitialStep)
     {
         m_InitialStep = false;
@@ -153,6 +139,13 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
         {
         }
     }
+    if (m_Verbosity >= 5)
+    {
+        std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
+                  << ", Reader Rank " << m_ReaderRank << ", Step "
+                  << m_CurrentStep << std::endl;
+    }
+
     if (m_Buffer[0] == 1)
     {
         return StepStatus::EndOfStream;
@@ -191,239 +184,29 @@ void SscReader::EndStep()
 
 void SscReader::SyncMpiPattern()
 {
+    TAU_SCOPED_TIMER_FUNC();
+
     if (m_Verbosity >= 5)
     {
         std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank
                   << ", Reader Rank " << m_ReaderRank << std::endl;
     }
 
-    TAU_SCOPED_TIMER_FUNC();
-
-    if (m_WorldSize == m_ReaderSize)
-    {
-        throw(std::runtime_error("no writers are found"));
-    }
-
-    std::vector<int> lrbuf;
-    std::vector<int> grbuf;
+    m_MpiHandshake.Handshake(m_Name, 'r', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
+                             m_MaxFilenameLength, m_RendezvousAppCount,
+                             CommAsMPI(m_Comm));
 
-    // Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
-    // find out all the m_WriterRank == 0
-    if (m_WorldRank == 0)
+    for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
     {
-        grbuf.resize(m_WorldSize);
-    }
-
-    MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
-               MPI_COMM_WORLD);
-
-    std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
-    if (m_WorldRank == 0)
-    {
-        for (int i = 0; i < m_WorldSize; ++i)
-        {
-            if (grbuf[i] == 0)
-            {
-                AppStart.push_back(i);
-            }
-        }
-        m_AppSize = AppStart.size();
-    }
-
-    // Each local rank 0 process send their type (0 for writer, 1 for reader) to
-    // the world rank 0 process The AppStart are re-ordered to put all writers
-    // ahead of all the readers.
-    std::vector<int>
-        AppType; // Vector to record the type of the local rank 0 process
-    if (m_ReaderRank == 0) // Send type from each local rank 0 process to the
-                           // world rank 0 process
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            AppType.resize(m_AppSize);
-            for (int i = 0; i < m_AppSize; ++i)
-            {
-                if (i == 0)
-                {
-                    AppType[i] = 1;
-                    ;
-                }
-                else
-                {
-                    int tmp = 1;
-                    MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
-                             MPI_STATUS_IGNORE);
-                    AppType[i] = tmp;
-                }
-            }
-        }
-        else
-        {
-            int tmp = 1; // type 1 for reader
-            MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
-        }
-    }
-
-    if (m_WorldRank == 0)
-    {
-        std::vector<int> AppWriter;
-        std::vector<int> AppReader;
-
-        for (int i = 0; i < m_AppSize; ++i)
-        {
-            if (AppType[i] == 0)
-            {
-                AppWriter.push_back(AppStart[i]);
-            }
-            else
-            {
-                AppReader.push_back(AppStart[i]);
-            }
-        }
-        m_WriterGlobalMpiInfo.resize(AppWriter.size());
-        m_ReaderGlobalMpiInfo.resize(AppReader.size());
-        AppStart = AppWriter;
-        AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
-    }
-
-    // Send the m_AppSize and m_AppID to each local rank 0 process
-    if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            for (int i = 0; i < m_AppSize; ++i)
-            {
-                MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
-            }
-        }
-        else
-        {
-            MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
-                     MPI_STATUS_IGNORE);
-        }
-    }
-
-    m_Comm.Bcast(&m_AppID, sizeof(int),
-                 0); // Local rank 0 process broadcast the m_AppID within the
-                     // local communicator.
-
-    MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize
-
-    // In each local communicator, each local rank 0 process gathers the world
-    // rank of all the rest local processes.
-    if (m_ReaderRank == 0)
-    {
-        lrbuf.resize(m_ReaderSize);
-    }
-
-    m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);
-
-    // Send the WorldRank vector of each local communicator to the m_WorldRank
-    // == 0 process.
-    int WriterInfoSize = 0;
-    int ReaderInfoSize = 0;
-    if (m_ReaderRank == 0)
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
-            {
-                if (i == 0)
-                {
-                    m_WriterGlobalMpiInfo[i] = lrbuf;
-                    ++WriterInfoSize;
-                }
-                else
-                {
-                    int j_writersize;
-                    MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
-                             MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
-                    ++WriterInfoSize;
-
-                    m_WriterGlobalMpiInfo[i].resize(j_writersize);
-                    MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
-                             MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
-                             MPI_STATUS_IGNORE); //
-                }
-            }
-
-            for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
-            {
-                if (i == 0)
-                {
-                    m_ReaderGlobalMpiInfo[i] = lrbuf;
-                    ++ReaderInfoSize;
-                }
-                else
-                {
-                    int j_readersize;
-                    MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
-                             MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
-                    ++ReaderInfoSize;
-
-                    m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
-                        .resize(j_readersize);
-                    MPI_Recv(
-                        m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
-                            .data(),
-                        j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
-                        MPI_STATUS_IGNORE); //
-                }
-            }
-        }
-        else
-        {
-            MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
-            MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97,
-                     MPI_COMM_WORLD);
-        }
-    }
-
-    // Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
-    // processes.
-    MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0,
-              MPI_COMM_WORLD); // Broadcast writerinfo size
-    MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
-
-    m_WriterGlobalMpiInfo.resize(WriterInfoSize);
-    m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);
-
-    for (int i = 0; i < WriterInfoSize; ++i)
-    {
-        int ilen;
-        if (m_WorldRank == 0)
-        {
-            ilen = m_WriterGlobalMpiInfo[i].size();
-        }
-        MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
-        m_WriterGlobalMpiInfo[i].resize(ilen);
-        MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
-                  MPI_COMM_WORLD); // Broadcast readerinfo size
-    }
-
-    for (int i = 0; i < ReaderInfoSize; ++i)
-    {
-        int ilen;
-        if (m_WorldRank == 0)
-        {
-            ilen = m_ReaderGlobalMpiInfo[i].size();
-        }
-        MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
-        m_ReaderGlobalMpiInfo[i].resize(ilen);
-        MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
-                  MPI_COMM_WORLD); // Broadcast readerinfo size
-    }
-
-    for (const auto &app : m_WriterGlobalMpiInfo)
-    {
-        for (int rank : app)
+        for (int rank : app.second)
         {
             m_AllWriterRanks.push_back(rank);
         }
     }
 
-    for (const auto &app : m_ReaderGlobalMpiInfo)
+    for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
     {
-        for (int rank : app)
+        for (int rank : app.second)
         {
             m_AllReaderRanks.push_back(rank);
         }
@@ -433,11 +216,6 @@ void SscReader::SyncMpiPattern()
     MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
     MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
                    &m_MpiAllWritersGroup);
-
-    if (m_Verbosity >= 10 and m_WorldRank == 0)
-    {
-        ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
-    }
 }
 
 void SscReader::SyncWritePattern()
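
Both engines consume the handshake result identically: the per-app rank lists are flattened into one vector, which then seeds an MPI group of all peer ranks. The pattern in isolation, as a standalone sketch (`appMap` stands in for the value returned by `GetWriterMap`/`GetReaderMap`; the function name is illustrative):

```cpp
#include <mpi.h>

#include <map>
#include <vector>

// Flattens a handshake map (app master rank -> world ranks of that app)
// and builds an MPI group containing exactly those world ranks, mirroring
// the new SyncMpiPattern code.
MPI_Group GroupFromMap(const std::map<int, std::vector<int>> &appMap)
{
    std::vector<int> allRanks;
    for (const auto &app : appMap)
    {
        for (int rank : app.second)
        {
            allRanks.push_back(rank);
        }
    }
    MPI_Group worldGroup;
    MPI_Group subGroup;
    MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
    MPI_Group_incl(worldGroup, static_cast<int>(allRanks.size()),
                   allRanks.data(), &subGroup);
    return subGroup;
}
```
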
diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h
index c899ec6f0e..4690bb19c7 100644
--- a/source/adios2/engine/ssc/SscReader.h
+++ b/source/adios2/engine/ssc/SscReader.h
@@ -13,6 +13,7 @@
 
 #include "SscHelper.h"
 #include "adios2/core/Engine.h"
+#include "adios2/helper/adiosMpiHandshake.h"
 #include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
 #include <mpi.h>
 #include <vector>
@@ -56,10 +57,8 @@ class SscReader : public Engine
     int m_WorldSize;
     int m_ReaderRank;
     int m_ReaderSize;
-    int m_AppID = 0;
-    int m_AppSize = 0;
-    std::vector<std::vector<int>> m_WriterGlobalMpiInfo;
-    std::vector<std::vector<int>> m_ReaderGlobalMpiInfo;
+
+    helper::MpiHandshake m_MpiHandshake;
 
     std::vector<int> m_AllWriterRanks;
     std::vector<int> m_AllReaderRanks;
@@ -102,6 +101,10 @@ class SscReader : public Engine
                           ssc::RankPosMap &allOverlapRanks);
 
     int m_Verbosity = 0;
+    int m_MaxFilenameLength = 128;
+    int m_MaxStreamsPerApp = 1;
+    int m_RendezvousAppCount = 2;
+    int m_OpenTimeoutSecs = 10;
 };
 
 } // end namespace engine
diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp
index ad1cdfeeb3..3e4946c249 100644
--- a/source/adios2/engine/ssc/SscWriter.cpp
+++ b/source/adios2/engine/ssc/SscWriter.cpp
@@ -10,11 +10,10 @@
 
 #include "SscWriter.tcc"
 #include "adios2/helper/adiosComm.h"
+#include "adios2/helper/adiosCommMPI.h"
 #include "adios2/helper/adiosJSONcomplex.h"
 #include "nlohmann/json.hpp"
 
-#include "adios2/helper/adiosCommMPI.h"
-
 namespace adios2
 {
 namespace core
@@ -32,24 +31,15 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
     m_WriterRank = m_Comm.Rank();
     m_WriterSize = m_Comm.Size();
 
-    auto it = m_IO.m_Parameters.find("MpiMode");
-    if (it != m_IO.m_Parameters.end())
-    {
-        m_MpiMode = it->second;
-    }
-    it = m_IO.m_Parameters.find("Verbose");
-    if (it != m_IO.m_Parameters.end())
-    {
-        try
-        {
-            m_Verbosity = std::stoi(it->second);
-        }
-        catch (...)
-        {
-            std::cerr << "Engine parameter Verbose can only be integer numbers"
-                      << std::endl;
-        }
-    }
+    ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
+    ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
+    ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
+                      m_MaxFilenameLength);
+    ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
+                      m_RendezvousAppCount);
+    ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
+                      m_MaxStreamsPerApp);
+    ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
 
     m_GlobalWritePattern.resize(m_WorldSize);
     m_GlobalReadPattern.resize(m_WorldSize);
@@ -61,12 +51,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
 {
     TAU_SCOPED_TIMER_FUNC();
 
-    if (m_Verbosity >= 5)
-    {
-        std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
-                  << ", Writer Rank " << m_WriterRank << std::endl;
-    }
-
     if (m_InitialStep)
     {
         m_InitialStep = false;
@@ -75,6 +59,14 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
     }
     else
     {
         ++m_CurrentStep;
     }
+
+    if (m_Verbosity >= 5)
+    {
+        std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
+                  << ", Writer Rank " << m_WriterRank << ", Step "
+                  << m_CurrentStep << std::endl;
+    }
+
     return StepStatus::OK;
 }
@@ -198,229 +190,21 @@ void SscWriter::SyncMpiPattern()
                   << ", Writer Rank " << m_WriterRank << std::endl;
     }
 
-    if (m_WorldSize == m_WriterSize)
-    {
-        throw(std::runtime_error("no readers are found"));
-    }
-
-    std::vector<int> lrbuf;
-    std::vector<int> grbuf;
-
-    // Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
-    // find out all the m_WriterRank == 0
-    if (m_WorldRank == 0)
-    {
-        grbuf.resize(m_WorldSize);
-    }
-
-    MPI_Gather(&m_WriterRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
-               MPI_COMM_WORLD);
+    m_MpiHandshake.Handshake(m_Name, 'w', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
+                             m_MaxFilenameLength, m_RendezvousAppCount,
+                             CommAsMPI(m_Comm));
 
-    std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
-    if (m_WorldRank == 0)
+    for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
     {
-        for (int i = 0; i < m_WorldSize; ++i)
-        {
-            if (grbuf[i] == 0)
-            {
-                AppStart.push_back(i);
-            }
-        }
-        m_AppSize = AppStart.size();
-    }
-
-    // Each local rank 0 process send their type (0 for writer, 1 for reader) to
-    // the world rank 0 process The AppStart are re-ordered to put all writers
-    // ahead of all the readers.
-    std::vector<int>
-        AppType; // Vector to record the type of the local rank 0 process
-    if (m_WriterRank == 0) // Send type from each local rank 0 process to the
-                           // world rank 0 process
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            AppType.resize(m_AppSize);
-            for (int i = 0; i < m_AppSize; ++i)
-            {
-                if (i == 0)
-                {
-                    AppType[i] = 0;
-                }
-                else
-                {
-                    int tmp = 1;
-                    MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
-                             MPI_STATUS_IGNORE);
-                    AppType[i] = tmp;
-                }
-            }
-        }
-        else
-        {
-            int tmp = 0; // type 0 for writer
-            MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
-        }
-    }
-
-    if (m_WorldRank == 0)
-    {
-        std::vector<int> AppWriter;
-        std::vector<int> AppReader;
-
-        for (int i = 0; i < m_AppSize; ++i)
-        {
-            if (AppType[i] == 0)
-            {
-                AppWriter.push_back(AppStart[i]);
-            }
-            else
-            {
-                AppReader.push_back(AppStart[i]);
-            }
-        }
-        m_WriterGlobalMpiInfo.resize(AppWriter.size());
-        m_ReaderGlobalMpiInfo.resize(AppReader.size());
-        AppStart = AppWriter;
-        AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
-    }
-
-    // Send the m_AppSize and m_AppID to each local rank 0 process
-    if (m_WriterRank == 0) // Send m_AppID to each local rank 0 process
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            for (int i = 0; i < m_AppSize; ++i)
-            {
-                MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
-            }
-        }
-        else
-        {
-            MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
-                     MPI_STATUS_IGNORE);
-        }
-    }
-
-    m_Comm.Bcast(&m_AppID, 1, 0); // Local rank 0 process broadcast the m_AppID
-                                  // within the local communicator.
-
-    MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize
-
-    // In each local communicator, each local rank 0 process gathers the world
-    // rank of all the rest local processes.
-    if (m_WriterRank == 0)
-    {
-        lrbuf.resize(m_WriterSize);
-    }
-    m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);
-
-    // Send the WorldRank vector of each local communicator to the m_WorldRank
-    // == 0 process.
-    int WriterInfoSize = 0;
-    int ReaderInfoSize = 0;
-    if (m_WriterRank == 0)
-    {
-        if (m_WorldRank == 0) // App_ID
-        {
-            for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
-            {
-                if (i == 0)
-                {
-                    m_WriterGlobalMpiInfo[i] = lrbuf;
-                    ++WriterInfoSize;
-                }
-                else
-                {
-                    int j_writersize;
-                    MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
-                             MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
-                    ++WriterInfoSize;
-
-                    m_WriterGlobalMpiInfo[i].resize(j_writersize);
-                    MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
-                             MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
-                             MPI_STATUS_IGNORE); //
-                }
-            }
-
-            for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
-            {
-                if (i == 0)
-                {
-                    m_ReaderGlobalMpiInfo[i] = lrbuf;
-                    ++ReaderInfoSize;
-                }
-                else
-                {
-                    int j_readersize;
-                    MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
-                             MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
-                    ++ReaderInfoSize;
-
-                    m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
-                        .resize(j_readersize);
-                    MPI_Recv(
-                        m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
-                            .data(),
-                        j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
-                        MPI_STATUS_IGNORE); //
-                }
-            }
-        }
-        else
-        {
-            MPI_Send(&m_WriterSize, 1, MPI_INT, 0, 96, MPI_COMM_WORLD);
-            MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 98,
-                     MPI_COMM_WORLD);
-        }
-    }
-
-    // Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
-    // processes.
-    MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
-    MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
-
-    m_WriterGlobalMpiInfo.resize(WriterInfoSize);
-    m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);
-
-    for (int i = 0; i < WriterInfoSize; ++i)
-    {
-        int ilen;
-        if (m_WorldRank == 0)
-        {
-            ilen = m_WriterGlobalMpiInfo[i].size();
-        }
-        MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
-
-        m_WriterGlobalMpiInfo[i].resize(ilen);
-        MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
-                  MPI_COMM_WORLD);
-    }
-
-    for (int i = 0; i < ReaderInfoSize; ++i)
-    {
-        int ilen;
-        if (m_WorldRank == 0)
-        {
-            ilen = m_ReaderGlobalMpiInfo[i].size();
-        }
-        MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
-        m_ReaderGlobalMpiInfo[i].resize(ilen);
-        MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
-                  MPI_COMM_WORLD);
-    }
-
-    for (const auto &app : m_WriterGlobalMpiInfo)
-    {
-        for (int rank : app)
+        for (int rank : app.second)
         {
             m_AllWriterRanks.push_back(rank);
         }
     }
 
-    for (const auto &app : m_ReaderGlobalMpiInfo)
+    for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
     {
-        for (int rank : app)
+        for (int rank : app.second)
         {
             m_AllReaderRanks.push_back(rank);
         }
@@ -430,11 +214,6 @@ void SscWriter::SyncMpiPattern()
     MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
     MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(),
                    &m_MpiAllReadersGroup);
-
-    if (m_Verbosity >= 10 and m_WorldRank == 0)
-    {
-        ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
-    }
 }
 
 void SscWriter::SyncWritePattern()
diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h
index 158e065dbc..531ddfba34 100644
--- a/source/adios2/engine/ssc/SscWriter.h
+++ b/source/adios2/engine/ssc/SscWriter.h
@@ -13,6 +13,7 @@
 
 #include "SscHelper.h"
 #include "adios2/core/Engine.h"
+#include "adios2/helper/adiosMpiHandshake.h"
 #include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
 #include <mpi.h>
 #include <vector>
@@ -57,10 +58,8 @@ class SscWriter : public Engine
     int m_WorldSize;
     int m_WriterRank;
     int m_WriterSize;
-    int m_AppID = 0;
-    int m_AppSize = 0;
-    std::vector<std::vector<int>> m_WriterGlobalMpiInfo;
-    std::vector<std::vector<int>> m_ReaderGlobalMpiInfo;
+
+    helper::MpiHandshake m_MpiHandshake;
 
     std::vector<int> m_AllWriterRanks;
     std::vector<int> m_AllReaderRanks;
@@ -92,6 +91,10 @@ class SscWriter : public Engine
                           ssc::RankPosMap &allOverlapRanks);
 
     int m_Verbosity = 0;
+    int m_MaxFilenameLength = 128;
+    int m_MaxStreamsPerApp = 1;
+    int m_RendezvousAppCount = 2;
+    int m_OpenTimeoutSecs = 10;
 };
 
 } // end namespace engine
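
The helper added below replaces the tag-numbered point-to-point choreography removed above with one fixed-size announcement record that every rank posts to every other rank. A sketch of the record layout, reconstructed from the memcpy sequence in `Handshake()` (the function name `PackRecord` is illustrative, not part of the patch):

```cpp
#include <cstring>
#include <string>
#include <vector>

// Packs one handshake record the way Handshake() does:
// [mode char][app master rank][local comm size][filename bytes, zero padded].
std::vector<char> PackRecord(char mode, int appMasterRank, int localSize,
                             const std::string &filename,
                             size_t maxFilenameLength)
{
    const size_t itemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2;
    std::vector<char> buffer(itemSize, 0); // zero padding terminates the name
    size_t offset = 0;
    std::memcpy(buffer.data() + offset, &mode, sizeof(char));
    offset += sizeof(char);
    std::memcpy(buffer.data() + offset, &appMasterRank, sizeof(int));
    offset += sizeof(int);
    std::memcpy(buffer.data() + offset, &localSize, sizeof(int));
    offset += sizeof(int);
    std::memcpy(buffer.data() + offset, filename.data(), filename.size());
    return buffer;
}
```
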
diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp
new file mode 100644
index 0000000000..4e8f5c14c2
--- /dev/null
+++ b/source/adios2/helper/adiosMpiHandshake.cpp
@@ -0,0 +1,326 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ *
+ * adiosMpiHandshake.cpp
+ */
+
+#include "adiosMpiHandshake.h"
+#include <algorithm>
+#include <chrono>
+#include <cstring>
+#include <iostream>
+#include <thread>
+
+namespace adios2
+{
+namespace helper
+{
+
+std::vector<char> MpiHandshake::m_Buffer;
+std::vector<std::vector<MPI_Request>> MpiHandshake::m_SendRequests;
+std::vector<std::vector<MPI_Request>> MpiHandshake::m_RecvRequests;
+size_t MpiHandshake::m_MaxStreamsPerApp;
+size_t MpiHandshake::m_MaxFilenameLength;
+size_t MpiHandshake::m_ItemSize;
+std::map<std::string, size_t> MpiHandshake::m_RendezvousAppCounts;
+size_t MpiHandshake::m_StreamID = 0;
+int MpiHandshake::m_WorldSize;
+int MpiHandshake::m_WorldRank;
+int MpiHandshake::m_LocalSize;
+int MpiHandshake::m_LocalRank;
+int MpiHandshake::m_LocalMasterRank;
+std::map<std::string, std::map<int, std::vector<int>>>
+    MpiHandshake::m_WritersMap;
+std::map<std::string, std::map<int, std::vector<int>>>
+    MpiHandshake::m_ReadersMap;
+std::map<int, int> MpiHandshake::m_AppsSize;
+
+size_t MpiHandshake::PlaceInBuffer(size_t stream, int rank)
+{
+    return rank * m_MaxStreamsPerApp * m_ItemSize + stream * m_ItemSize;
+}
+
+void MpiHandshake::Test()
+{
+    int success;
+    MPI_Status status;
+
+    for (int rank = 0; rank < m_WorldSize; ++rank)
+    {
+        for (size_t stream = 0; stream < m_MaxStreamsPerApp; ++stream)
+        {
+            MPI_Test(&m_RecvRequests[rank][stream], &success, &status);
+            if (success)
+            {
+                size_t offset = PlaceInBuffer(stream, rank);
+                char mode = m_Buffer[offset];
+                offset += sizeof(char);
+                int appMasterRank =
+                    reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
+                offset += sizeof(int);
+                int appSize =
+                    reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
+                offset += sizeof(int);
+                std::string filename = m_Buffer.data() + offset;
+                m_AppsSize[appMasterRank] = appSize;
+                if (mode == 'w')
+                {
+                    auto &ranks = m_WritersMap[filename][appMasterRank];
+                    if (std::find(ranks.begin(), ranks.end(), rank) ==
+                        ranks.end())
+                    {
+                        ranks.push_back(rank);
+                    }
+                }
+                else if (mode == 'r')
+                {
+                    auto &ranks = m_ReadersMap[filename][appMasterRank];
+                    if (std::find(ranks.begin(), ranks.end(), rank) ==
+                        ranks.end())
+                    {
+                        ranks.push_back(rank);
+                    }
+                }
+            }
+        }
+    }
+}
+
+bool MpiHandshake::Check(const std::string &filename)
+{
+    Test();
+
+    // check if RendezvousAppCount reached
+
+    if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() !=
+        m_RendezvousAppCounts[filename])
+    {
+        return false;
+    }
+
+    // check if all ranks' info is received
+
+    for (const auto &app : m_WritersMap[filename])
+    {
+        if (app.second.size() != m_AppsSize[app.first])
+        {
+            return false;
+        }
+    }
+
+    for (const auto &app : m_ReadersMap[filename])
+    {
+        if (app.second.size() != m_AppsSize[app.first])
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+void MpiHandshake::Handshake(const std::string &filename, const char mode,
+                             const int timeoutSeconds,
+                             const size_t maxStreamsPerApp,
+                             const size_t maxFilenameLength,
+                             const size_t rendezvousAppCountForStream,
+                             MPI_Comm localComm)
+{
+
+    // initialize variables
+
+    if (filename.size() > maxFilenameLength)
+    {
+        throw(std::runtime_error("Filename too long"));
+    }
+
+    MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
+    MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
+    MPI_Comm_size(localComm, &m_LocalSize);
+    MPI_Comm_rank(localComm, &m_LocalRank);
+    m_MaxStreamsPerApp = maxStreamsPerApp;
+    m_MaxFilenameLength = maxFilenameLength;
+    m_RendezvousAppCounts[filename] = rendezvousAppCountForStream;
+
+    m_SendRequests.resize(m_WorldSize);
+    m_RecvRequests.resize(m_WorldSize);
+    for (int rank = 0; rank < m_WorldSize; ++rank)
+    {
+        m_SendRequests[rank].resize(maxStreamsPerApp);
+        m_RecvRequests[rank].resize(maxStreamsPerApp);
+    }
+
+    m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2;
+    m_Buffer.resize(m_WorldSize * maxStreamsPerApp * m_ItemSize);
+
+    // broadcast local master rank's world rank to use as app ID
+
+    if (m_LocalRank == 0)
+    {
+        m_LocalMasterRank = m_WorldRank;
+    }
+    MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm);
+
+    // start receiving
+
+    for (int rank = 0; rank < m_WorldSize; ++rank)
+    {
+        for (size_t stream = 0; stream < maxStreamsPerApp; ++stream)
+        {
+            MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank), m_ItemSize,
+                      MPI_CHAR, rank, rank, MPI_COMM_WORLD,
+                      &m_RecvRequests[rank][stream]);
+        }
+    }
+
+    // start sending
+
+    size_t offset = 0;
+    std::vector<char> buffer(m_ItemSize);
+    std::memcpy(buffer.data(), &mode, sizeof(char));
+    offset += sizeof(char);
+    std::memcpy(buffer.data() + offset, &m_LocalMasterRank, sizeof(int));
+    offset += sizeof(int);
+    std::memcpy(buffer.data() + offset, &m_LocalSize, sizeof(int));
+    offset += sizeof(int);
+    std::memcpy(buffer.data() + offset, filename.data(), filename.size());
+
+    for (int rank = 0; rank < m_WorldSize; ++rank)
+    {
+        MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, rank, m_WorldRank,
+                  MPI_COMM_WORLD, &m_SendRequests[rank][m_StreamID]);
+    }
+
+    // wait and check if required RendezvousAppCount reached
+
+    auto startTime = std::chrono::system_clock::now();
+    while (!Check(filename))
+    {
+        std::this_thread::sleep_for(std::chrono::microseconds(100));
+        auto nowTime = std::chrono::system_clock::now();
+        auto duration = std::chrono::duration_cast<std::chrono::seconds>(
+            nowTime - startTime);
+        if (duration.count() > timeoutSeconds)
+        {
+            throw(std::runtime_error("Mpi handshake timeout"));
+        }
+    }
+
+    // clean up MPI requests
+
+    for (auto &rs : m_RecvRequests)
+    {
+        for (auto &r : rs)
+        {
+            MPI_Status status;
+            int success;
+            MPI_Test(&r, &success, &status);
+            if (!success)
+            {
+                MPI_Cancel(&r);
+            }
+        }
+    }
+    m_RecvRequests.clear();
+
+    ++m_StreamID;
+}
+
+const std::map<int, std::vector<int>> &
+MpiHandshake::GetWriterMap(const std::string &filename)
+{
+    return m_WritersMap[filename];
+}
+const std::map<int, std::vector<int>> &
+MpiHandshake::GetReaderMap(const std::string &filename)
+{
+    return m_ReadersMap[filename];
+}
+
+void MpiHandshake::PrintMaps(const int printRank)
+{
+    if (m_WorldRank == printRank)
+    {
+        std::cout << "Writers: " << std::endl;
+        for (const auto &stream : m_WritersMap)
+        {
+            std::cout << "    Stream " << stream.first << std::endl;
+            for (const auto &app : stream.second)
+            {
+                std::cout << "        App Master Rank " << app.first
+                          << std::endl;
+                std::cout << "            ";
+                for (const auto &rank : app.second)
+                {
+                    std::cout << rank << ", ";
+                }
+                std::cout << std::endl;
+            }
+        }
+        std::cout << "Readers: " << std::endl;
+        for (const auto &stream : m_ReadersMap)
+        {
+            std::cout << "    Stream " << stream.first << std::endl;
+            for (const auto &app : stream.second)
+            {
+                std::cout << "        App Master Rank " << app.first
+                          << std::endl;
+                std::cout << "            ";
+                for (const auto &rank : app.second)
+                {
+                    std::cout << rank << ", ";
+                }
+                std::cout << std::endl;
+            }
+        }
+    }
+}
+void MpiHandshake::PrintMaps()
+{
+    for (int printRank = 0; printRank < m_WorldSize; ++printRank)
+    {
+        MPI_Barrier(MPI_COMM_WORLD);
+        if (m_WorldRank == printRank)
+        {
+            std::cout << "For rank " << printRank
+                      << "============================================"
+                      << std::endl;
+            std::cout << "Writers: " << std::endl;
+            for (const auto &stream : m_WritersMap)
+            {
+                std::cout << "    Stream " << stream.first << std::endl;
+                for (const auto &app : stream.second)
+                {
+                    std::cout << "        App Master Rank " << app.first
+                              << std::endl;
+                    std::cout << "            ";
+                    for (const auto &rank : app.second)
+                    {
+                        std::cout << rank << ", ";
", "; + } + std::cout << std::endl; + } + } + std::cout << "Readers: " << std::endl; + for (const auto &stream : m_ReadersMap) + { + std::cout << " Stream " << stream.first << std::endl; + for (const auto &app : stream.second) + { + std::cout << " App Master Rank " << app.first + << std::endl; + std::cout << " "; + for (const auto &rank : app.second) + { + std::cout << rank << ", "; + } + std::cout << std::endl; + } + } + } + } +} + +} // end namespace helper +} // end namespace adios2 diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h new file mode 100644 index 0000000000..9790f60b98 --- /dev/null +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -0,0 +1,103 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * adiosMpiHandshake.h + */ + +#ifndef ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_ +#define ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_ + +#include "adios2/common/ADIOSConfig.h" +#ifndef ADIOS2_HAVE_MPI +#error "Do not include adiosMpiHandshake.h without ADIOS2_HAVE_MPI." +#endif + +#include +#include +#include +#include + +namespace adios2 +{ +namespace helper +{ + +class MpiHandshake +{ +public: + /** + * Start the handshake operations and wait until the rendezvous conditions + * are reached, or timeout. + * + * @param filename: name of the staging stream, must be within the length of + * maxFilenameLength + * + * @param mode: 'r' or 'w', read or write + * + * @param timeoutSeconds: timeout for the handshake, will throw exception + * when reaching this timeout + * + * @param maxStreamsPerApp: the maximum number of streams that all apps + * sharing this MPI_COMM_WORLD can possibly open. It is required that this + * number is consistent across all ranks. This is used for pre-allocating + * the vectors holding MPI requests and must be specified correctly, + * otherwise strange errors could occur. This class does not provide any + * mechanism to check whether this number being passed is actually correct + * or not accross all ranks, because implementing this logic for an + * arbitrary communication pattern is overly expensive, if not impossible. + * + * @param maxFilenameLength: the maximum possible length of filename that + * all apps sharing this MPI_COMM_WORLD could possibly define. It is + * required that this number is consistent across all ranks. This is used + * for pre-allocating the buffer for aggregating the global MPI information. + * An exception will be thrown if any filename on any rank is found to be + * longer than this. + * + * @param rendezvousAppCountForStream: the number of apps, including both + * writers and readers, that will work on this stream. The function will + * block until it receives the MPI handshake information from all these + * apps, or until timeoutSeconds is passed. 
diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h
new file mode 100644
index 0000000000..9790f60b98
--- /dev/null
+++ b/source/adios2/helper/adiosMpiHandshake.h
@@ -0,0 +1,103 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0. See
+ * accompanying file Copyright.txt for details.
+ *
+ * adiosMpiHandshake.h
+ */
+
+#ifndef ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_
+#define ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_
+
+#include "adios2/common/ADIOSConfig.h"
+#ifndef ADIOS2_HAVE_MPI
+#error "Do not include adiosMpiHandshake.h without ADIOS2_HAVE_MPI."
+#endif
+
+#include <map>
+#include <mpi.h>
+#include <string>
+#include <vector>
+
+namespace adios2
+{
+namespace helper
+{
+
+class MpiHandshake
+{
+public:
+    /**
+     * Start the handshake operations and wait until the rendezvous conditions
+     * are reached, or timeout.
+     *
+     * @param filename: name of the staging stream, must be within the length
+     * of maxFilenameLength
+     *
+     * @param mode: 'r' or 'w', read or write
+     *
+     * @param timeoutSeconds: timeout for the handshake; an exception is
+     * thrown when this timeout is reached
+     *
+     * @param maxStreamsPerApp: the maximum number of streams that all apps
+     * sharing this MPI_COMM_WORLD can possibly open. It is required that this
+     * number is consistent across all ranks. This is used for pre-allocating
+     * the vectors holding MPI requests and must be specified correctly,
+     * otherwise strange errors could occur. This class does not provide any
+     * mechanism to check whether the number passed in is actually correct
+     * across all ranks, because implementing this logic for an arbitrary
+     * communication pattern is overly expensive, if not impossible.
+     *
+     * @param maxFilenameLength: the maximum possible length of the filenames
+     * that all apps sharing this MPI_COMM_WORLD could possibly define. It is
+     * required that this number is consistent across all ranks. This is used
+     * for pre-allocating the buffer for aggregating the global MPI
+     * information. An exception will be thrown if any filename on any rank is
+     * found to be longer than this.
+     *
+     * @param rendezvousAppCountForStream: the number of apps, including both
+     * writers and readers, that will work on this stream. The function will
+     * block until it receives the MPI handshake information from all these
+     * apps, or until timeoutSeconds is passed.
+     *
+     * @param localComm: local MPI communicator for the app
+     */
+    static void Handshake(const std::string &filename, const char mode,
+                          const int timeoutSeconds,
+                          const size_t maxStreamsPerApp,
+                          const size_t maxFilenameLength,
+                          const size_t rendezvousAppCountForStream,
+                          MPI_Comm localComm);
+
+    static const std::map<int, std::vector<int>> &
+    GetWriterMap(const std::string &filename);
+    static const std::map<int, std::vector<int>> &
+    GetReaderMap(const std::string &filename);
+    static void PrintMaps();
+    static void PrintMaps(const int printRank);
+
+private:
+    static void Test();
+    static bool Check(const std::string &filename);
+    static size_t PlaceInBuffer(const size_t stream, const int rank);
+    static std::vector<char> m_Buffer;
+    static std::vector<std::vector<MPI_Request>> m_SendRequests;
+    static std::vector<std::vector<MPI_Request>> m_RecvRequests;
+    static size_t m_MaxStreamsPerApp;
+    static size_t m_MaxFilenameLength;
+    static size_t m_ItemSize;
+    static std::map<std::string, size_t> m_RendezvousAppCounts;
+    static size_t m_StreamID;
+    static int m_WorldSize;
+    static int m_WorldRank;
+    static int m_LocalSize;
+    static int m_LocalRank;
+    static int m_LocalMasterRank;
+    static std::map<std::string, std::map<int, std::vector<int>>> m_WritersMap;
+    static std::map<std::string, std::map<int, std::vector<int>>> m_ReadersMap;
+    static std::map<int, int> m_AppsSize;
+};
+
+} // end namespace helper
+} // end namespace adios2
+
+#endif // ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_
diff --git a/testing/adios2/engine/ssc/TestSscMultiApp.cpp b/testing/adios2/engine/ssc/TestSscMultiApp.cpp
index 422e62513b..3101c758d1 100644
--- a/testing/adios2/engine/ssc/TestSscMultiApp.cpp
+++ b/testing/adios2/engine/ssc/TestSscMultiApp.cpp
@@ -500,7 +500,7 @@ void Reader2(const Dims &shape, const Dims &start, const Dims &count,
 TEST_F(SscEngineTest, TestSscMultiApp)
 {
     std::string filename = "TestSscMultiApp";
-    adios2::Params engineParams = {};
+    adios2::Params engineParams = {{"RendezvousAppCount", "4"}};
     int worldRank, worldSize;
     Dims start, count, shape;
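
The test update reflects the new rendezvous logic: with "RendezvousAppCount" set to "4", each participant's handshake blocks until four applications, writers and readers combined, have announced themselves on the stream (the hunk context shows Reader2 as one of them). A sketch of the per-app side of such a setup (function and IO names illustrative, not taken from the test file):

```cpp
#include <adios2.h>
#include <mpi.h>

// One participating app in a multi-app SSC setup: every app, writer or
// reader, must pass the same combined application count (4 here).
void OpenStream(adios2::ADIOS &adios, const adios2::Mode mode)
{
    adios2::IO io = adios.DeclareIO("TestStream");
    io.SetEngine("Ssc");
    io.SetParameters({{"RendezvousAppCount", "4"}});
    // Open blocks until all 4 apps have joined the handshake for this
    // stream name, or throws after OpenTimeoutSecs (default 10 s).
    adios2::Engine engine = io.Open("TestSscMultiApp", mode);
    engine.Close();
}
```
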