diff --git a/docs/user_guide/source/engines/ssc.rst b/docs/user_guide/source/engines/ssc.rst index d34a2c8250..b5b8a555a4 100644 --- a/docs/user_guide/source/engines/ssc.rst +++ b/docs/user_guide/source/engines/ssc.rst @@ -10,11 +10,14 @@ The SSC engine takes the following parameters: 2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait. +3. ``Threading``: Default **False**. SSC will use threads to hide the time cost of metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. + =============================== ================== ================================================ **Key** **Value Format** **Default** and Examples =============================== ================== ================================================ - OpenTimeoutSecs integer **10**, 2, 20, 200 + OpenTimeoutSecs integer **10**, 2, 20, 200 MpiMode string **TwoSided**, OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, OneSidedPostPull + Threading bool **false**, true =============================== ================== ================================================ diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index d9eaf6cb79..fea5b9bd34 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -28,9 +28,26 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode, helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); + helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading); helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs); + int providedMpiMode; + MPI_Query_thread(&providedMpiMode); + if (providedMpiMode != MPI_THREAD_MULTIPLE) + { + if (m_Threading == true) + { + m_Threading = false; + if (m_WriterRank == 0) + { + std::cout << "SSC Threading disabled as MPI is not initialized " + "with multi-threads" + << std::endl; + } + } + } + SyncMpiPattern(); m_WriterRank = m_Comm.Rank(); m_WriterSize = m_Comm.Size(); @@ -42,6 +59,11 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds) { TAU_SCOPED_TIMER_FUNC(); + if (m_Threading && m_EndStepThread.joinable()) + { + m_EndStepThread.join(); + } + ++m_CurrentStep; if (m_Verbosity >= 5) @@ -80,6 +102,73 @@ size_t SscWriter::CurrentStep() const { return m_CurrentStep; } void SscWriter::PerformPuts() { TAU_SCOPED_TIMER_FUNC(); } +void SscWriter::EndStepFirst() +{ + TAU_SCOPED_TIMER_FUNC(); + + SyncWritePattern(); + MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, + m_StreamComm, &m_MpiWin); + MPI_Win_free(&m_MpiWin); + SyncReadPattern(); + if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) + { + MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, + m_StreamComm, &m_MpiWin); + } +} + +void SscWriter::EndStepConsequentFixed() +{ + TAU_SCOPED_TIMER_FUNC(); + if (m_MpiMode == "twosided") + { + for (const auto &i : m_AllSendingReaderRanks) + { + m_MpiRequests.emplace_back(); + MPI_Isend(m_Buffer.data(), static_cast(m_Buffer.size()), + MPI_CHAR, i.first, 0, m_StreamComm, + &m_MpiRequests.back()); + } + } + else if (m_MpiMode == "onesidedfencepush") + { + MPI_Win_fence(0, m_MpiWin); + for (const auto &i : m_AllSendingReaderRanks) + { + MPI_Put(m_Buffer.data(), static_cast(m_Buffer.size()), + MPI_CHAR, i.first, i.second.first, + static_cast(m_Buffer.size()), MPI_CHAR, m_MpiWin); + } + } + else if (m_MpiMode == "onesidedpostpush") + { + MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin); + for (const auto &i : m_AllSendingReaderRanks) + { + MPI_Put(m_Buffer.data(), static_cast(m_Buffer.size()), + MPI_CHAR, i.first, i.second.first, + static_cast(m_Buffer.size()), MPI_CHAR, m_MpiWin); + } + } + else if (m_MpiMode == "onesidedfencepull") + { + MPI_Win_fence(0, m_MpiWin); + } + else if (m_MpiMode == "onesidedpostpull") + { + MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin); + } +} + +void SscWriter::EndStepConsequentFlexible() +{ + TAU_SCOPED_TIMER_FUNC(); + SyncWritePattern(); + MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, + m_StreamComm, &m_MpiWin); +} + void SscWriter::EndStep() { TAU_SCOPED_TIMER_FUNC(); @@ -93,68 +182,33 @@ void SscWriter::EndStep() if (m_CurrentStep == 0) { - SyncWritePattern(); - MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, - m_StreamComm, &m_MpiWin); - MPI_Win_free(&m_MpiWin); - SyncReadPattern(); - if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) + if (m_Threading) { - MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, - m_StreamComm, &m_MpiWin); + m_EndStepThread = std::thread(&SscWriter::EndStepFirst, this); + } + else + { + EndStepFirst(); } } else { if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) { - if (m_MpiMode == "twosided") - { - for (const auto &i : m_AllSendingReaderRanks) - { - m_MpiRequests.emplace_back(); - MPI_Isend(m_Buffer.data(), - static_cast(m_Buffer.size()), MPI_CHAR, - i.first, 0, m_StreamComm, &m_MpiRequests.back()); - } - } - else if (m_MpiMode == "onesidedfencepush") - { - MPI_Win_fence(0, m_MpiWin); - for (const auto &i : m_AllSendingReaderRanks) - { - MPI_Put(m_Buffer.data(), static_cast(m_Buffer.size()), - MPI_CHAR, i.first, i.second.first, - static_cast(m_Buffer.size()), MPI_CHAR, - m_MpiWin); - } - } - else if (m_MpiMode == "onesidedpostpush") - { - MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin); - for (const auto &i : m_AllSendingReaderRanks) - { - MPI_Put(m_Buffer.data(), static_cast(m_Buffer.size()), - MPI_CHAR, i.first, i.second.first, - static_cast(m_Buffer.size()), MPI_CHAR, - m_MpiWin); - } - } - else if (m_MpiMode == "onesidedfencepull") + EndStepConsequentFixed(); + } + else + { + if (m_Threading) { - MPI_Win_fence(0, m_MpiWin); + m_EndStepThread = + std::thread(&SscWriter::EndStepConsequentFlexible, this); } - else if (m_MpiMode == "onesidedpostpull") + else { - MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin); + EndStepConsequentFlexible(); } } - else - { - SyncWritePattern(); - MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, - m_StreamComm, &m_MpiWin); - } } } @@ -376,6 +430,11 @@ void SscWriter::DoClose(const int transportIndex) << ", Writer Rank " << m_WriterRank << std::endl; } + if (m_Threading && m_EndStepThread.joinable()) + { + m_EndStepThread.join(); + } + if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) { if (m_CurrentStep > 0) diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index 1866f3c0b7..a5e46ef3ce 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -52,8 +52,8 @@ class SscWriter : public Engine MPI_Win m_MpiWin; MPI_Group m_MpiAllReadersGroup; MPI_Comm m_StreamComm; - std::string m_MpiMode = "twosided"; std::vector m_MpiRequests; + std::thread m_EndStepThread; int m_StreamRank; int m_StreamSize; @@ -64,6 +64,9 @@ class SscWriter : public Engine void SyncWritePattern(bool finalStep = false); void SyncReadPattern(); void MpiWait(); + void EndStepFirst(); + void EndStepConsequentFixed(); + void EndStepConsequentFlexible(); #define declare_type(T) \ void DoPutSync(Variable &, const T *) final; \ @@ -82,6 +85,8 @@ class SscWriter : public Engine int m_Verbosity = 0; int m_OpenTimeoutSecs = 10; + bool m_Threading = false; + std::string m_MpiMode = "twosided"; }; } // end namespace engine diff --git a/testing/adios2/engine/ssc/TestSscBaseThreading.cpp b/testing/adios2/engine/ssc/TestSscBaseThreading.cpp new file mode 100644 index 0000000000..7150cc2a96 --- /dev/null +++ b/testing/adios2/engine/ssc/TestSscBaseThreading.cpp @@ -0,0 +1,289 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + */ + +#include "TestSscCommon.h" +#include +#include +#include +#include +#include + +using namespace adios2; +int mpiRank = 0; +int mpiSize = 1; +MPI_Comm mpiComm; + +class SscEngineTest : public ::testing::Test +{ +public: + SscEngineTest() = default; +}; + +void Writer(const Dims &shape, const Dims &start, const Dims &count, + const size_t steps, const adios2::Params &engineParams, + const std::string &name) +{ + size_t datasize = + std::accumulate(count.begin(), count.end(), static_cast(1), + std::multiplies()); + adios2::ADIOS adios(mpiComm); + adios2::IO dataManIO = adios.DeclareIO("WAN"); + dataManIO.SetEngine("ssc"); + dataManIO.SetParameters(engineParams); + std::vector myChars(datasize); + std::vector myUChars(datasize); + std::vector myShorts(datasize); + std::vector myUShorts(datasize); + std::vector myInts(datasize); + std::vector myUInts(datasize); + std::vector myFloats(datasize); + std::vector myDoubles(datasize); + std::vector> myComplexes(datasize); + std::vector> myDComplexes(datasize); + auto bpChars = + dataManIO.DefineVariable("bpChars", shape, start, count); + auto bpUChars = dataManIO.DefineVariable("bpUChars", shape, + start, count); + auto bpShorts = + dataManIO.DefineVariable("bpShorts", shape, start, count); + auto bpUShorts = dataManIO.DefineVariable( + "bpUShorts", shape, start, count); + auto bpInts = dataManIO.DefineVariable("bpInts", shape, start, count); + auto bpUInts = + dataManIO.DefineVariable("bpUInts", shape, start, count); + auto bpFloats = + dataManIO.DefineVariable("bpFloats", shape, start, count); + auto bpDoubles = + dataManIO.DefineVariable("bpDoubles", shape, start, count); + auto bpComplexes = dataManIO.DefineVariable>( + "bpComplexes", shape, start, count); + auto bpDComplexes = dataManIO.DefineVariable>( + "bpDComplexes", shape, start, count); + auto scalarInt = dataManIO.DefineVariable("scalarInt"); + auto stringVar = dataManIO.DefineVariable("stringVar"); + dataManIO.DefineAttribute("AttInt", 110); + adios2::Engine engine = dataManIO.Open(name, adios2::Mode::Write); + for (int i = 0; i < steps; ++i) + { + engine.BeginStep(); + GenData(myChars, i, start, count, shape); + GenData(myUChars, i, start, count, shape); + GenData(myShorts, i, start, count, shape); + GenData(myUShorts, i, start, count, shape); + GenData(myInts, i, start, count, shape); + GenData(myUInts, i, start, count, shape); + GenData(myFloats, i, start, count, shape); + GenData(myDoubles, i, start, count, shape); + GenData(myComplexes, i, start, count, shape); + GenData(myDComplexes, i, start, count, shape); + engine.Put(bpChars, myChars.data(), adios2::Mode::Sync); + engine.Put(bpUChars, myUChars.data(), adios2::Mode::Sync); + engine.Put(bpShorts, myShorts.data(), adios2::Mode::Sync); + engine.Put(bpUShorts, myUShorts.data(), adios2::Mode::Sync); + engine.Put(bpInts, myInts.data(), adios2::Mode::Sync); + engine.Put(bpUInts, myUInts.data(), adios2::Mode::Sync); + engine.Put(bpFloats, myFloats.data(), adios2::Mode::Sync); + engine.Put(bpDoubles, myDoubles.data(), adios2::Mode::Sync); + engine.Put(bpComplexes, myComplexes.data(), adios2::Mode::Sync); + engine.Put(bpDComplexes, myDComplexes.data(), adios2::Mode::Sync); + engine.Put(scalarInt, i); + std::string s = "sample string sample string sample string"; + engine.Put(stringVar, s); + engine.EndStep(); + } + engine.Close(); +} + +void Reader(const Dims &shape, const Dims &start, const Dims &count, + const size_t steps, const adios2::Params &engineParams, + const std::string &name) +{ + adios2::ADIOS adios(mpiComm); + adios2::IO dataManIO = adios.DeclareIO("Test"); + dataManIO.SetEngine("ssc"); + dataManIO.SetParameters(engineParams); + adios2::Engine engine = dataManIO.Open(name, adios2::Mode::Read); + + size_t datasize = + std::accumulate(count.begin(), count.end(), static_cast(1), + std::multiplies()); + std::vector myChars(datasize); + std::vector myUChars(datasize); + std::vector myShorts(datasize); + std::vector myUShorts(datasize); + std::vector myInts(datasize); + std::vector myUInts(datasize); + std::vector myFloats(datasize); + std::vector myDoubles(datasize); + std::vector> myComplexes(datasize); + std::vector> myDComplexes(datasize); + + while (true) + { + adios2::StepStatus status = engine.BeginStep(StepMode::Read, 5); + if (status == adios2::StepStatus::OK) + { + auto scalarInt = dataManIO.InquireVariable("scalarInt"); + auto blocksInfo = + engine.BlocksInfo(scalarInt, engine.CurrentStep()); + + for (const auto &bi : blocksInfo) + { + ASSERT_EQ(bi.IsValue, true); + ASSERT_EQ(bi.Value, engine.CurrentStep()); + ASSERT_EQ(scalarInt.Min(), engine.CurrentStep()); + ASSERT_EQ(scalarInt.Max(), engine.CurrentStep()); + } + + const auto &vars = dataManIO.AvailableVariables(); + ASSERT_EQ(vars.size(), 12); + size_t currentStep = engine.CurrentStep(); + adios2::Variable bpChars = + dataManIO.InquireVariable("bpChars"); + adios2::Variable bpUChars = + dataManIO.InquireVariable("bpUChars"); + adios2::Variable bpShorts = + dataManIO.InquireVariable("bpShorts"); + adios2::Variable bpUShorts = + dataManIO.InquireVariable("bpUShorts"); + adios2::Variable bpInts = + dataManIO.InquireVariable("bpInts"); + adios2::Variable bpUInts = + dataManIO.InquireVariable("bpUInts"); + adios2::Variable bpFloats = + dataManIO.InquireVariable("bpFloats"); + adios2::Variable bpDoubles = + dataManIO.InquireVariable("bpDoubles"); + adios2::Variable> bpComplexes = + dataManIO.InquireVariable>("bpComplexes"); + adios2::Variable> bpDComplexes = + dataManIO.InquireVariable>("bpDComplexes"); + adios2::Variable stringVar = + dataManIO.InquireVariable("stringVar"); + + bpChars.SetSelection({start, count}); + bpUChars.SetSelection({start, count}); + bpShorts.SetSelection({start, count}); + bpUShorts.SetSelection({start, count}); + bpInts.SetSelection({start, count}); + bpUInts.SetSelection({start, count}); + bpFloats.SetSelection({start, count}); + bpDoubles.SetSelection({start, count}); + bpComplexes.SetSelection({start, count}); + bpDComplexes.SetSelection({start, count}); + + engine.Get(bpChars, myChars.data(), adios2::Mode::Sync); + engine.Get(bpUChars, myUChars.data(), adios2::Mode::Sync); + engine.Get(bpShorts, myShorts.data(), adios2::Mode::Sync); + engine.Get(bpUShorts, myUShorts.data(), adios2::Mode::Sync); + engine.Get(bpInts, myInts.data(), adios2::Mode::Sync); + engine.Get(bpUInts, myUInts.data(), adios2::Mode::Sync); + engine.Get(bpFloats, myFloats.data(), adios2::Mode::Sync); + engine.Get(bpDoubles, myDoubles.data(), adios2::Mode::Sync); + engine.Get(bpComplexes, myComplexes.data(), adios2::Mode::Sync); + engine.Get(bpDComplexes, myDComplexes.data(), adios2::Mode::Sync); + std::string s; + engine.Get(stringVar, s, adios2::Mode::Sync); + ASSERT_EQ(s, "sample string sample string sample string"); + ASSERT_EQ(stringVar.Min(), + "sample string sample string sample string"); + ASSERT_EQ(stringVar.Max(), + "sample string sample string sample string"); + + VerifyData(myChars.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myUChars.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myShorts.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myUShorts.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myInts.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myUInts.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myFloats.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myDoubles.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myComplexes.data(), currentStep, start, count, shape, + mpiRank); + VerifyData(myDComplexes.data(), currentStep, start, count, shape, + mpiRank); + engine.EndStep(); + } + else if (status == adios2::StepStatus::EndOfStream) + { + std::cout << "[Rank " + std::to_string(mpiRank) + + "] SscTest reader end of stream!" + << std::endl; + break; + } + } + auto attInt = dataManIO.InquireAttribute("AttInt"); + std::cout << "[Rank " + std::to_string(mpiRank) + "] Attribute received " + << attInt.Data()[0] << ", expected 110" << std::endl; + ASSERT_EQ(110, attInt.Data()[0]); + ASSERT_NE(111, attInt.Data()[0]); + engine.Close(); +} + +TEST_F(SscEngineTest, TestSscBaseThreading) +{ + std::string filename = "TestSscBaseThreading"; + adios2::Params engineParams = {{"Threading", "True"}}; + + int worldRank, worldSize; + MPI_Comm_rank(MPI_COMM_WORLD, &worldRank); + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + int mpiGroup = worldRank / (worldSize / 2); + MPI_Comm_split(MPI_COMM_WORLD, mpiGroup, worldRank, &mpiComm); + + MPI_Comm_rank(mpiComm, &mpiRank); + MPI_Comm_size(mpiComm, &mpiSize); + + Dims shape = {10, (size_t)mpiSize * 2}; + Dims start = {2, (size_t)mpiRank * 2}; + Dims count = {5, 2}; + size_t steps = 10; + + if (mpiGroup == 0) + { + Writer(shape, start, count, steps, engineParams, filename); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + if (mpiGroup == 1) + { + Reader(shape, start, count, steps, engineParams, filename); + } + + MPI_Barrier(MPI_COMM_WORLD); +} + +int main(int argc, char **argv) +{ + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + int result = 0; + if (provided == MPI_THREAD_MULTIPLE) + { + int worldRank, worldSize; + MPI_Comm_rank(MPI_COMM_WORLD, &worldRank); + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + ::testing::InitGoogleTest(&argc, argv); + result = RUN_ALL_TESTS(); + } + else + { + std::cout + << "MPI does not support multi-thread, skipping SSC threading test" + << std::endl; + } + + MPI_Finalize(); + return result; +}