From 8dfb30b69cc0a8440e1b0405795922e2a9adea83 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 28 Jul 2020 00:32:54 -0400 Subject: [PATCH] avoid repeated MPI_Get calls for blocks that have already been transferred --- source/adios2/engine/ssc/SscReader.cpp | 1 + source/adios2/engine/ssc/SscReader.h | 2 + source/adios2/engine/ssc/SscReader.tcc | 114 +++++++++++++++---------- 3 files changed, 72 insertions(+), 45 deletions(-) diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index a7c438935b..b19b29a7f1 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -59,6 +59,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false || m_ReaderSelectionsLocked == false) { + m_ReceivedRanks.clear(); m_Buffer.resize(1, 0); m_GlobalWritePattern.clear(); m_GlobalWritePattern.resize(m_StreamSize); diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index 7598ee5e93..b5941dd0e7 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -17,6 +17,7 @@ #include "adios2/toolkit/profiling/taustubs/tautimer.hpp" #include #include +#include namespace adios2 { @@ -53,6 +54,7 @@ class SscReader : public Engine MPI_Comm m_StreamComm; std::string m_MpiMode = "twosided"; std::vector m_MpiRequests; + std::unordered_set m_ReceivedRanks; int m_StreamRank; int m_StreamSize; diff --git a/source/adios2/engine/ssc/SscReader.tcc b/source/adios2/engine/ssc/SscReader.tcc index c32e84c62c..0009128117 100644 --- a/source/adios2/engine/ssc/SscReader.tcc +++ b/source/adios2/engine/ssc/SscReader.tcc @@ -29,58 +29,77 @@ void SscReader::GetDeferredCommon(Variable &variable, { TAU_SCOPED_TIMER_FUNC(); variable.SetData(data); - if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false || - m_ReaderSelectionsLocked == false) + + if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) { - m_LocalReadPattern.emplace_back(); - auto &b = m_LocalReadPattern.back(); - b.name = variable.m_Name; - b.count = variable.m_Count; - b.start = variable.m_Start; - b.shape = variable.m_Shape; - b.type = DataType::String; + if (m_CurrentStep == 0) + { + m_LocalReadPattern.emplace_back(); + auto &b = m_LocalReadPattern.back(); + b.name = variable.m_Name; + b.count = variable.m_Count; + b.start = variable.m_Start; + b.shape = variable.m_Shape; + b.type = DataType::String; - m_LocalReadPatternJson["Variables"].emplace_back(); - auto &jref = m_LocalReadPatternJson["Variables"].back(); - jref["Name"] = b.name; - jref["Type"] = ToString(b.type); - jref["ShapeID"] = variable.m_ShapeID; - jref["Start"] = b.start; - jref["Count"] = b.count; - jref["Shape"] = b.shape; - jref["BufferStart"] = 0; - jref["BufferCount"] = 0; + m_LocalReadPatternJson["Variables"].emplace_back(); + auto &jref = m_LocalReadPatternJson["Variables"].back(); + jref["Name"] = b.name; + jref["Type"] = ToString(b.type); + jref["ShapeID"] = variable.m_ShapeID; + jref["Start"] = b.start; + jref["Count"] = b.count; + jref["Shape"] = b.shape; + jref["BufferStart"] = 0; + jref["BufferCount"] = 0; - ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern); - m_AllReceivingWriterRanks = - ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern); - CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks); - size_t totalDataSize = 0; - for (auto i : m_AllReceivingWriterRanks) - { - totalDataSize += i.second.second; + ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, + m_GlobalWritePattern); + m_AllReceivingWriterRanks = + ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern); + CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks); + size_t totalDataSize = 0; + for (auto i : m_AllReceivingWriterRanks) + { + totalDataSize += i.second.second; + } + m_Buffer.resize(totalDataSize); + for (const auto &i : m_AllReceivingWriterRanks) + { + MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin); + MPI_Get(m_Buffer.data() + i.second.first, i.second.second, + MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR, + m_MpiWin); + MPI_Win_unlock(i.first, m_MpiWin); + } } - m_Buffer.resize(totalDataSize); + for (const auto &i : m_AllReceivingWriterRanks) { - MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin); - MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR, - i.first, 0, i.second.second, MPI_CHAR, m_MpiWin); - MPI_Win_unlock(i.first, m_MpiWin); + const auto &v = m_GlobalWritePattern[i.first]; + for (const auto &b : v) + { + if (b.name == variable.m_Name) + { + std::vector str(b.bufferCount); + std::memcpy(str.data(), m_Buffer.data() + b.bufferStart, + b.bufferCount); + *data = std::string(str.begin(), str.end()); + } + } } } - - for (const auto &i : m_AllReceivingWriterRanks) + else { - const auto &v = m_GlobalWritePattern[i.first]; - for (const auto &b : v) + for (const auto &i : m_AllReceivingWriterRanks) { - if (b.name == variable.m_Name) + const auto &v = m_GlobalWritePattern[i.first]; + for (const auto &b : v) { - std::vector str(b.bufferCount); - std::memcpy(str.data(), m_Buffer.data() + b.bufferStart, - b.bufferCount); - *data = std::string(str.begin(), str.end()); + if (b.name == variable.m_Name) + { + *data = std::string(b.value.begin(), b.value.end()); + } } } } @@ -147,10 +166,15 @@ void SscReader::GetDeferredCommon(Variable &variable, T *data) m_Buffer.resize(totalDataSize); for (const auto &i : m_AllReceivingWriterRanks) { - MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin); - MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR, - i.first, 0, i.second.second, MPI_CHAR, m_MpiWin); - MPI_Win_unlock(i.first, m_MpiWin); + if (m_ReceivedRanks.find(i.first) == m_ReceivedRanks.end()) + { + MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin); + MPI_Get(m_Buffer.data() + i.second.first, i.second.second, + MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR, + m_MpiWin); + MPI_Win_unlock(i.first, m_MpiWin); + m_ReceivedRanks.insert(i.first); + } } }