diff --git a/bindings/C/adios2/c/adios2_c_adios.cpp b/bindings/C/adios2/c/adios2_c_adios.cpp index 79086ffdf4..856dc7853f 100644 --- a/bindings/C/adios2/c/adios2_c_adios.cpp +++ b/bindings/C/adios2/c/adios2_c_adios.cpp @@ -233,6 +233,44 @@ adios2_error adios2_finalize(adios2_adios *adios) } } +/** Inform ADIOS about entering communication-free computation block + * in main thread. Useful when using Async IO */ +adios2_error adios2_enter_computation_block(adios2_adios *adios) +{ + try + { + adios2::helper::CheckForNullptr( + adios, + "for adios2_adios, in call to adios2_enter_computation_block"); + reinterpret_cast(adios)->EnterComputationBlock(); + return adios2_error_none; + } + catch (...) + { + return static_cast( + adios2::helper::ExceptionToError("adios2_enter_computation_block")); + } +} + +/** Inform ADIOS about exiting communication-free computation block + * in main thread. Useful when using Async IO */ +adios2_error adios2_exit_computation_block(adios2_adios *adios) +{ + try + { + adios2::helper::CheckForNullptr( + adios, + "for adios2_adios, in call to adios2_exit_computation_block"); + reinterpret_cast(adios)->ExitComputationBlock(); + return adios2_error_none; + } + catch (...) + { + return static_cast( + adios2::helper::ExceptionToError("adios2_exit_computation_block")); + } +} + #ifdef __cplusplus } // end extern C diff --git a/bindings/C/adios2/c/adios2_c_adios.h b/bindings/C/adios2/c/adios2_c_adios.h index 1f135b9130..3be6fd2a6f 100644 --- a/bindings/C/adios2/c/adios2_c_adios.h +++ b/bindings/C/adios2/c/adios2_c_adios.h @@ -147,6 +147,14 @@ adios2_error adios2_remove_io(adios2_bool *result, adios2_adios *adios, */ adios2_error adios2_remove_all_ios(adios2_adios *adios); +/** Inform ADIOS about entering communication-free computation block + * in main thread. Useful when using Async IO */ +adios2_error adios2_enter_computation_block(adios2_adios *adios); + +/** Inform ADIOS about exiting communication-free computation block + * in main thread. Useful when using Async IO */ +adios2_error adios2_exit_computation_block(adios2_adios *adios); + #ifdef __cplusplus } // end extern C #endif diff --git a/bindings/CXX11/adios2/cxx11/ADIOS.cpp b/bindings/CXX11/adios2/cxx11/ADIOS.cpp index 81da0ec034..b88d4457be 100644 --- a/bindings/CXX11/adios2/cxx11/ADIOS.cpp +++ b/bindings/CXX11/adios2/cxx11/ADIOS.cpp @@ -49,6 +49,18 @@ void ADIOS::FlushAll() m_ADIOS->FlushAll(); } +void ADIOS::EnterComputationBlock() noexcept +{ + CheckPointer("in call to ADIOS::EnterComputationBlock()"); + m_ADIOS->EnterComputationBlock(); +} + +void ADIOS::ExitComputationBlock() noexcept +{ + CheckPointer("in call to ADIOS::ExitComputationBlock()"); + m_ADIOS->ExitComputationBlock(); +} + Operator ADIOS::DefineOperator(const std::string name, const std::string type, const Params ¶meters) { diff --git a/bindings/CXX11/adios2/cxx11/ADIOS.h b/bindings/CXX11/adios2/cxx11/ADIOS.h index fdb147cd82..689939eebc 100644 --- a/bindings/CXX11/adios2/cxx11/ADIOS.h +++ b/bindings/CXX11/adios2/cxx11/ADIOS.h @@ -235,6 +235,14 @@ class ADIOS */ void RemoveAllIOs() noexcept; + /** Inform ADIOS about entering communication-free computation block + * in main thread. Useful when using Async IO */ + void EnterComputationBlock() noexcept; + + /** Inform ADIOS about exiting communication-free computation block + * in main thread. Useful when using Async IO */ + void ExitComputationBlock() noexcept; + protected: std::shared_ptr m_ADIOS; diff --git a/bindings/Fortran/f2c/adios2_f2c_adios.cpp b/bindings/Fortran/f2c/adios2_f2c_adios.cpp index f6511369f1..6d777802a5 100644 --- a/bindings/Fortran/f2c/adios2_f2c_adios.cpp +++ b/bindings/Fortran/f2c/adios2_f2c_adios.cpp @@ -109,6 +109,20 @@ void FC_GLOBAL(adios2_finalize_f2c, ADIOS2_FINALIZE_F2C)(adios2_adios **adios, *ierr = static_cast(adios2_finalize(*adios)); } +void FC_GLOBAL(adios2_enter_computation_block_f2c, + ADIOS2_ENTER_COMPUTATION_BLOCK_F2C)(adios2_adios **adios, + int *ierr) +{ + *ierr = static_cast(adios2_enter_computation_block(*adios)); +} + +void FC_GLOBAL(adios2_exit_computation_block_f2c, + ADIOS2_EXIT_COMPUTATION_BLOCK_F2C)(adios2_adios **adios, + int *ierr) +{ + *ierr = static_cast(adios2_exit_computation_block(*adios)); +} + #ifdef __cplusplus } #endif diff --git a/bindings/Fortran/modules/adios2_adios_mod.f90 b/bindings/Fortran/modules/adios2_adios_mod.f90 index cc70f031b7..51676978f3 100644 --- a/bindings/Fortran/modules/adios2_adios_mod.f90 +++ b/bindings/Fortran/modules/adios2_adios_mod.f90 @@ -24,6 +24,8 @@ module adios2_adios_mod external adios2_remove_io_f2c external adios2_remove_all_ios_f2c external adios2_finalize_f2c + external adios2_enter_computation_block_f2c + external adios2_exit_computation_block_f2c contains subroutine adios2_declare_io(io, adios, io_name, ierr) @@ -154,4 +156,18 @@ subroutine adios2_finalize(adios, ierr) end subroutine + + subroutine adios2_enter_computation_block(adios, ierr) + type(adios2_adios), intent(in) :: adios + integer, intent(out) :: ierr + call adios2_enter_computation_block_f2c(adios, ierr) + end subroutine + + subroutine adios2_exit_computation_block(adios, ierr) + type(adios2_adios), intent(in) :: adios + integer, intent(out) :: ierr + call adios2_exit_computation_block_f2c(adios, ierr) + end subroutine + + end module diff --git a/examples/basics/globalArray/globalArray_write.cpp b/examples/basics/globalArray/globalArray_write.cpp index b53d01d9ae..33573a079c 100644 --- a/examples/basics/globalArray/globalArray_write.cpp +++ b/examples/basics/globalArray/globalArray_write.cpp @@ -28,7 +28,9 @@ * Author: pnorbert */ +#include #include +#include #include #include @@ -64,7 +66,10 @@ int main(int argc, char *argv[]) // create one with default settings here adios2::IO io = adios.DeclareIO("Output"); io.SetEngine("BP5"); + io.SetParameter("AggregationType", "TwoLevelShm"); io.SetParameter("NumAggregators", "1"); + io.SetParameter("NumSubFiles", "1"); + io.SetParameter("AsyncWrite", "Guided"); /* * Define global array: type, name, global dimensions @@ -98,6 +103,9 @@ int main(int argc, char *argv[]) // Disk I/O will be performed during this call unless // time aggregation postpones all of that to some later step writer.EndStep(); + adios.EnterComputationBlock(); + std::this_thread::sleep_for(std::chrono::duration(1.0)); + adios.ExitComputationBlock(); } // Called once: indicate that we are done with this output for the run diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 9784366f11..f3fb058cd3 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -91,6 +91,10 @@ add_library(adios2_core toolkit/transportman/TransportMan.cpp + toolkit/shm/Spinlock.cpp + toolkit/shm/SerializeProcesses.cpp + toolkit/shm/TokenChain.h + toolkit/aggregator/mpi/MPIAggregator.cpp toolkit/aggregator/mpi/MPIChain.cpp toolkit/aggregator/mpi/MPIShmChain.cpp @@ -125,7 +129,7 @@ if (ADIOS2_HAVE_BP5) target_sources(adios2_core PRIVATE engine/bp5/BP5Engine.cpp engine/bp5/BP5Reader.cpp engine/bp5/BP5Reader.tcc - engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc engine/bp5/BP5Writer_TwoLevelShm.cpp + engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc engine/bp5/BP5Writer_TwoLevelShm.cpp engine/bp5/BP5Writer_TwoLevelShm_Async.cpp engine/bp5/BP5Writer_EveryoneWrites_Async.cpp ) endif() diff --git a/source/adios2/common/ADIOSTypes.h b/source/adios2/common/ADIOSTypes.h index 7b35445c14..69d7dddc98 100644 --- a/source/adios2/common/ADIOSTypes.h +++ b/source/adios2/common/ADIOSTypes.h @@ -214,8 +214,8 @@ constexpr size_t DefaultMinDeferredSize = 4 * 1024 * 1024; constexpr size_t DefaultMaxFileBatchSize = 2147381248; /** default maximum shared memory segment size - * 128Mb */ -constexpr uint64_t DefaultMaxShmSize = 128 * 1024 * 1024; + * 2 blocks of MaxFileBatchSize */ +constexpr uint64_t DefaultMaxShmSize = 2 * DefaultMaxFileBatchSize; constexpr char PathSeparator = #ifdef _WIN32 diff --git a/source/adios2/core/ADIOS.cpp b/source/adios2/core/ADIOS.cpp index fcc3d678c9..643cc9c105 100644 --- a/source/adios2/core/ADIOS.cpp +++ b/source/adios2/core/ADIOS.cpp @@ -151,6 +151,27 @@ void ADIOS::FlushAll() } } +void ADIOS::EnterComputationBlock() noexcept +{ + enteredComputationBlock = true; + for (auto &ioPair : m_IOs) + { + ioPair.second.EnterComputationBlock(); + } +} + +void ADIOS::ExitComputationBlock() noexcept +{ + if (enteredComputationBlock) + { + enteredComputationBlock = false; + for (auto &ioPair : m_IOs) + { + ioPair.second.ExitComputationBlock(); + } + } +} + std::pair &ADIOS::DefineOperator(const std::string &name, const std::string type, const Params ¶meters) diff --git a/source/adios2/core/ADIOS.h b/source/adios2/core/ADIOS.h index 660e3ef43b..cc59d08c66 100644 --- a/source/adios2/core/ADIOS.h +++ b/source/adios2/core/ADIOS.h @@ -20,6 +20,7 @@ #include "adios2/common/ADIOSConfig.h" #include "adios2/common/ADIOSTypes.h" +#include "adios2/core/CoreTypes.h" #include "adios2/core/Operator.h" #include "adios2/helper/adiosComm.h" @@ -145,6 +146,14 @@ class ADIOS */ void RemoveAllIOs() noexcept; + /** Inform ADIOS about entering communication-free computation block + * in main thread. Useful when using Async IO */ + void EnterComputationBlock() noexcept; + + /** Inform ADIOS about exiting communication-free computation block + * in main thread. Useful when using Async IO */ + void ExitComputationBlock() noexcept; + private: /** Communicator given to parallel constructor. */ helper::Comm m_Comm; @@ -166,6 +175,9 @@ class ADIOS /** operators created with DefineOperator */ std::unordered_map> m_Operators; + /** Flag for Enter/ExitComputationBlock */ + bool enteredComputationBlock = false; + void CheckOperator(const std::string name) const; void XMLInit(const std::string &configFileXML); diff --git a/source/adios2/core/Engine.cpp b/source/adios2/core/Engine.cpp index d7da3d8f2e..b404af7439 100644 --- a/source/adios2/core/Engine.cpp +++ b/source/adios2/core/Engine.cpp @@ -95,6 +95,9 @@ size_t Engine::DebugGetDataBufferSize() const return 0; } +void Engine::EnterComputationBlock() noexcept {} +void Engine::ExitComputationBlock() noexcept {} + // PROTECTED void Engine::Init() {} void Engine::InitParameters() {} diff --git a/source/adios2/core/Engine.h b/source/adios2/core/Engine.h index 87afba7e40..25141ebe4f 100644 --- a/source/adios2/core/Engine.h +++ b/source/adios2/core/Engine.h @@ -511,6 +511,11 @@ class Engine virtual void NotifyEngineAttribute(std::string name, DataType type) noexcept; + /** Inform about computation block through User->ADIOS->IO */ + virtual void EnterComputationBlock() noexcept; + /** Inform about computation block through User->ADIOS->IO */ + virtual void ExitComputationBlock() noexcept; + protected: /** from ADIOS class passed to Engine created with Open * if no communicator is passed */ diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index 159017bfba..7883f1f47f 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -738,6 +738,30 @@ void IO::RemoveEngine(const std::string &name) } } +void IO::EnterComputationBlock() noexcept +{ + for (auto &enginePair : m_Engines) + { + auto &engine = enginePair.second; + if (engine->OpenMode() != Mode::Read) + { + enginePair.second->EnterComputationBlock(); + } + } +} + +void IO::ExitComputationBlock() noexcept +{ + for (auto &enginePair : m_Engines) + { + auto &engine = enginePair.second; + if (engine->OpenMode() != Mode::Read) + { + enginePair.second->ExitComputationBlock(); + } + } +} + void IO::FlushAll() { PERFSTUBS_SCOPED_TIMER("IO::FlushAll"); diff --git a/source/adios2/core/IO.h b/source/adios2/core/IO.h index 99f13bf657..c85796c4eb 100644 --- a/source/adios2/core/IO.h +++ b/source/adios2/core/IO.h @@ -25,6 +25,7 @@ #include "adios2/common/ADIOSTypes.h" #include "adios2/core/ADIOS.h" #include "adios2/core/Attribute.h" +#include "adios2/core/CoreTypes.h" #include "adios2/core/Group.h" #include "adios2/core/Variable.h" #include "adios2/core/VariableCompound.h" @@ -487,6 +488,11 @@ class IO return m_Engines; } + /** Inform about computation block through User->ADIOS */ + void EnterComputationBlock() noexcept; + /** Inform about computation block through User->ADIOS */ + void ExitComputationBlock() noexcept; + private: /** true: exist in config file (XML) */ const bool m_InConfigFile = false; diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index d27f854a9c..b615e90b21 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -196,11 +196,11 @@ void BP3Writer::InitTransports() if (m_BP3Serializer.m_Aggregator.m_IsAggregator) { - if (m_BP3Serializer.m_Parameters.AsyncTasks) + if (m_BP3Serializer.m_Parameters.AsyncOpen) { for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) { - m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; + m_IO.m_TransportsParameters[i]["asyncopen"] = "true"; } } m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode, diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index bbb9b2186a..d24de5f143 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -248,11 +248,11 @@ void BP4Writer::InitTransports() if (m_BP4Serializer.m_Aggregator.m_IsAggregator) { - if (m_BP4Serializer.m_Parameters.AsyncTasks) + if (m_BP4Serializer.m_Parameters.AsyncOpen) { for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) { - m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; + m_IO.m_TransportsParameters[i]["asyncopen"] = "true"; } } for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) diff --git a/source/adios2/engine/bp5/BP5Engine.cpp b/source/adios2/engine/bp5/BP5Engine.cpp index 3be352b204..fb43dde162 100644 --- a/source/adios2/engine/bp5/BP5Engine.cpp +++ b/source/adios2/engine/bp5/BP5Engine.cpp @@ -292,6 +292,38 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) } }; + auto lf_SetAsyncWriteParameter = [&](const std::string key, int ¶meter, + int def) { + auto itKey = io.m_Parameters.find(key); + parameter = def; + if (itKey != io.m_Parameters.end()) + { + std::string value = itKey->second; + std::transform(value.begin(), value.end(), value.begin(), + ::tolower); + if (value == "guided" || value == "auto" || value == "on" || + value == "true") + { + parameter = (int)AsyncWrite::Guided; + } + else if (value == "sync" || value == "off" || value == "false") + { + parameter = (int)AsyncWrite::Sync; + } + else if (value == "naive") + { + parameter = (int)AsyncWrite::Naive; + } + else + { + throw std::invalid_argument( + "ERROR: Unknown BP5 AsyncWriteMode parameter \"" + value + + "\" (must be \"auto\", \"sync\", \"naive\", \"throttled\" " + "or \"guided\""); + } + } + }; + #define get_params(Param, Type, Typedecl, Default) \ lf_Set##Type##Parameter(#Param, Params.Param, Default); BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params); diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 2a1f39deca..7ed67ada31 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -101,6 +101,13 @@ class BP5Engine Auto }; + enum class AsyncWrite + { + Sync = 0, // enable using AsyncWriteMode as bool expression + Naive, + Guided + }; + #define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \ MACRO(OpenTimeoutSecs, Int, int, 3600) \ MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \ @@ -115,7 +122,8 @@ class BP5Engine MACRO(FileSystemPageSize, UInt, unsigned int, 4096) \ MACRO(AggregationType, AggregationType, int, \ (int)AggregationType::TwoLevelShm) \ - MACRO(AsyncTasks, Bool, bool, true) \ + MACRO(AsyncOpen, Bool, bool, true) \ + MACRO(AsyncWrite, AsyncWrite, int, (int)AsyncWrite::Sync) \ MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \ MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \ MACRO(MinDeferredSize, SizeBytes, size_t, DefaultMinDeferredSize) \ diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 10ca127dd9..453e277fd4 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -36,6 +36,7 @@ BP5Writer::BP5Writer(IO &io, const std::string &name, const Mode mode, m_FileMetadataIndexManager(m_Comm), m_FileMetaMetadataManager(m_Comm), m_Profiler(m_Comm) { + m_EngineStart = Now(); PERFSTUBS_SCOPED_TIMER("BP5Writer::Open"); m_IO.m_ReadStreaming = false; @@ -50,7 +51,47 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) "without an intervening EndStep()"); } + Seconds ts = Now() - m_EngineStart; + // std::cout << "BEGIN STEP starts at: " << ts.count() << std::endl; m_BetweenStepPairs = true; + + if (m_WriterStep > 0) + { + m_LastTimeBetweenSteps = Now() - m_EndStepEnd; + m_TotalTimeBetweenSteps += m_LastTimeBetweenSteps; + m_AvgTimeBetweenSteps = m_TotalTimeBetweenSteps / m_WriterStep; + m_ExpectedTimeBetweenSteps = m_LastTimeBetweenSteps; + if (m_ExpectedTimeBetweenSteps > m_AvgTimeBetweenSteps) + { + m_ExpectedTimeBetweenSteps = m_AvgTimeBetweenSteps; + } + } + + if (m_Parameters.AsyncWrite) + { + m_AsyncWriteLock.lock(); + m_flagRush = true; + m_AsyncWriteLock.unlock(); + TimePoint wait_start = Now(); + if (m_WriteFuture.valid()) + { + m_WriteFuture.get(); + m_Comm.Barrier(); + AsyncWriteDataCleanup(); + Seconds wait = Now() - wait_start; + if (m_Comm.Rank() == 0) + { + WriteMetadataFileIndex(m_LatestMetaDataPos, + m_LatestMetaDataSize); + std::cout << "BeginStep, wait on async write was = " + << wait.count() << " time since EndStep was = " + << m_LastTimeBetweenSteps.count() + << " expect next one to be = " + << m_ExpectedTimeBetweenSteps.count() << std::endl; + } + } + } + if (m_Parameters.BufferVType == (int)BufferVType::MallocVType) { m_BP5Serializer.InitStep(new MallocV("BP5Writer", false, @@ -65,6 +106,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) } m_ThisTimestepDataSize = 0; + ts = Now() - m_EngineStart; + // std::cout << "BEGIN STEP ended at: " << ts.count() << std::endl; return StepStatus::OK; } @@ -143,24 +186,67 @@ BP5Writer::WriteMetadata(const std::vector &MetaDataBlocks, return MetaDataSize; } +void BP5Writer::AsyncWriteDataCleanup() +{ + if (m_Parameters.AsyncWrite) + { + switch (m_Parameters.AggregationType) + { + case (int)AggregationType::EveryoneWrites: + case (int)AggregationType::EveryoneWritesSerial: + AsyncWriteDataCleanup_EveryoneWrites(); + break; + case (int)AggregationType::TwoLevelShm: + AsyncWriteDataCleanup_TwoLevelShm(); + break; + default: + break; + } + } +} + void BP5Writer::WriteData(format::BufferV *Data) { - switch (m_Parameters.AggregationType) - { - case (int)AggregationType::EveryoneWrites: - WriteData_EveryoneWrites(Data, false); - break; - case (int)AggregationType::EveryoneWritesSerial: - WriteData_EveryoneWrites(Data, true); - break; - case (int)AggregationType::TwoLevelShm: - WriteData_TwoLevelShm(Data); - break; - default: - throw std::invalid_argument( - "Aggregation method " + - std::to_string(m_Parameters.AggregationType) + - "is not supported in BP5"); + if (m_Parameters.AsyncWrite) + { + switch (m_Parameters.AggregationType) + { + case (int)AggregationType::EveryoneWrites: + WriteData_EveryoneWrites_Async(Data, false); + break; + case (int)AggregationType::EveryoneWritesSerial: + WriteData_EveryoneWrites_Async(Data, true); + break; + case (int)AggregationType::TwoLevelShm: + WriteData_TwoLevelShm_Async(Data); + break; + default: + throw std::invalid_argument( + "Aggregation method " + + std::to_string(m_Parameters.AggregationType) + + "is not supported in BP5"); + } + } + else + { + switch (m_Parameters.AggregationType) + { + case (int)AggregationType::EveryoneWrites: + WriteData_EveryoneWrites(Data, false); + break; + case (int)AggregationType::EveryoneWritesSerial: + WriteData_EveryoneWrites(Data, true); + break; + case (int)AggregationType::TwoLevelShm: + WriteData_TwoLevelShm(Data); + break; + default: + throw std::invalid_argument( + "Aggregation method " + + std::to_string(m_Parameters.AggregationType) + + "is not supported in BP5"); + } + delete Data; } } @@ -170,8 +256,6 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, const aggregator::MPIChain *a = dynamic_cast(m_Aggregator); - std::vector DataVec = Data->DataVec(); - // new step writing starts at offset m_DataPos on aggregator // others will wait for the position to arrive from the rank below @@ -194,9 +278,10 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, "Chain token in BP5Writer::WriteData"); } + m_DataPos += Data->Size(); + std::vector DataVec = Data->DataVec(); m_FileDataManager.WriteFileAt(DataVec.data(), DataVec.size(), m_StartDataPos); - m_DataPos += Data->Size(); if (SerializedWriters && a->m_Comm.Rank() < a->m_Comm.Size() - 1) { @@ -352,24 +437,34 @@ void BP5Writer::MarshalAttributes() void BP5Writer::EndStep() { + /* Seconds ts = Now() - m_EngineStart; + std::cout << "END STEP starts at: " << ts.count() << std::endl; */ m_BetweenStepPairs = false; PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep"); m_Profiler.Start("endstep"); MarshalAttributes(); // true: advances step - auto TSInfo = m_BP5Serializer.CloseTimestep(m_WriterStep); + auto TSInfo = + m_BP5Serializer.CloseTimestep(m_WriterStep, m_Parameters.AsyncWrite); /* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the * AttributeEncodeBuffer and the data encode Vector */ /* the first */ + m_ThisTimestepDataSize += TSInfo.DataBuffer->Size(); + m_Profiler.Start("AWD"); - WriteData(TSInfo.DataBuffer); + // TSInfo destructor would delete the DataBuffer so we need to save it + // for async IO and let the writer free it up when not needed anymore + adios2::format::BufferV *databuf = TSInfo.DataBuffer; + TSInfo.DataBuffer = NULL; + m_AsyncWriteLock.lock(); + m_flagRush = false; + m_AsyncWriteLock.unlock(); + WriteData(databuf); m_Profiler.Stop("AWD"); - m_ThisTimestepDataSize += TSInfo.DataBuffer->Size(); - std::vector MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous( TSInfo.NewMetaMetaBlocks, TSInfo.MetaEncodeBuffer, TSInfo.AttributeEncodeBuffer, m_ThisTimestepDataSize, m_StartDataPos); @@ -416,13 +511,33 @@ void BP5Writer::EndStep() m_Assignment.size()); } WriteMetaMetadata(UniqueMetaMetaBlocks); - uint64_t ThisMetaDataPos = m_MetaDataPos; - uint64_t ThisMetaDataSize = WriteMetadata(Metadata, AttributeBlocks); - WriteMetadataFileIndex(ThisMetaDataPos, ThisMetaDataSize); + m_LatestMetaDataPos = m_MetaDataPos; + m_LatestMetaDataSize = WriteMetadata(Metadata, AttributeBlocks); + if (!m_Parameters.AsyncWrite) + { + WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); + } } delete RecvBuffer; + + if (m_Parameters.AsyncWrite) + { + /* Start counting computation blocks between EndStep and next BeginStep + * each time */ + { + m_AsyncWriteLock.lock(); + m_ComputationBlockTimes.clear(); + m_ComputationBlocksLength = 0.0; + m_ComputationBlockID = 0; + m_AsyncWriteLock.unlock(); + } + } + m_Profiler.Stop("endstep"); m_WriterStep++; + m_EndStepEnd = Now(); + /* Seconds ts2 = Now() - m_EngineStart; + std::cout << "END STEP ended at: " << ts2.count() << std::endl;*/ } // PRIVATE @@ -506,6 +621,7 @@ void BP5Writer::InitAggregator() // m_Aggregator.m_IsActive is always true // m_Aggregator.m_Comm.Rank() will always succeed (not abort) // m_Aggregator.m_SubFileIndex is always set + if (m_Parameters.AggregationType == (int)AggregationType::EveryoneWrites || m_Parameters.AggregationType == (int)AggregationType::EveryoneWritesSerial) @@ -618,11 +734,11 @@ void BP5Writer::InitTransports() /* Everyone opens its data file. Each aggregation chain opens one data file and does so in chain, not everyone at once */ - if (m_Parameters.AsyncTasks) + if (m_Parameters.AsyncOpen) { for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) { - m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; + m_IO.m_TransportsParameters[i]["asyncopen"] = "true"; } } @@ -858,6 +974,38 @@ void BP5Writer::NotifyEngineAttribute(std::string name, DataType type) noexcept m_MarshaledAttributesCount = 0; } +void BP5Writer::EnterComputationBlock() noexcept +{ + if (m_Parameters.AsyncWrite && !m_BetweenStepPairs) + { + m_ComputationBlockStart = Now(); + { + m_AsyncWriteLock.lock(); + m_InComputationBlock = true; + m_AsyncWriteLock.unlock(); + } + } +} + +void BP5Writer::ExitComputationBlock() noexcept +{ + if (m_Parameters.AsyncWrite && m_InComputationBlock) + { + double t = Seconds(Now() - m_ComputationBlockStart).count(); + { + m_AsyncWriteLock.lock(); + if (t > 0.1) // only register long enough intervals + { + m_ComputationBlockTimes.emplace_back(m_ComputationBlockID, t); + m_ComputationBlocksLength += t; + } + m_InComputationBlock = false; + ++m_ComputationBlockID; + m_AsyncWriteLock.unlock(); + } + } +} + void BP5Writer::FlushData(const bool isFinal) { BufferV *DataBuf; @@ -874,16 +1022,19 @@ void BP5Writer::FlushData(const bool isFinal) m_Parameters.BufferChunkSize)); } + auto databufsize = DataBuf->Size(); WriteData(DataBuf); + /* DataBuf is deleted in WriteData() */ + DataBuf = nullptr; - m_ThisTimestepDataSize += DataBuf->Size(); + m_ThisTimestepDataSize += databufsize; if (!isFinal) { size_t tmp[2]; // aggregate start pos and data size to rank 0 tmp[0] = m_StartDataPos; - tmp[1] = DataBuf->Size(); + tmp[1] = databufsize; std::vector RecvBuffer; if (m_Comm.Rank() == 0) @@ -896,7 +1047,6 @@ void BP5Writer::FlushData(const bool isFinal) FlushPosSizeInfo.push_back(RecvBuffer); } } - delete DataBuf; } void BP5Writer::Flush(const int transportIndex) { FlushData(false); } @@ -914,6 +1064,18 @@ void BP5Writer::DoClose(const int transportIndex) { EndStep(); } + + TimePoint wait_start = Now(); + Seconds wait(0.0); + if (m_WriteFuture.valid()) + { + m_AsyncWriteLock.lock(); + m_flagRush = true; + m_AsyncWriteLock.unlock(); + m_WriteFuture.get(); + wait += Now() - wait_start; + } + m_FileDataManager.CloseFiles(transportIndex); // Delete files from temporary storage if draining was on @@ -924,7 +1086,28 @@ void BP5Writer::DoClose(const int transportIndex) // close metametadata file m_FileMetaMetadataManager.CloseFiles(); + } + if (m_Parameters.AsyncWrite) + { + // wait until all process' writing thread completes + wait_start = Now(); + m_Comm.Barrier(); + AsyncWriteDataCleanup(); + wait += Now() - wait_start; + if (m_Comm.Rank() == 0) + { + std::cout << "Close waited " << wait.count() + << " seconds on async threads" << std::endl; + } + } + + if (m_Comm.Rank() == 0) + { + if (m_Parameters.AsyncWrite) + { + WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); + } // close metadata index file m_FileMetadataIndexManager.CloseFiles(); } diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index 4c257763b6..96402d348d 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -20,6 +20,8 @@ #include "adios2/toolkit/burstbuffer/FileDrainerSingleThread.h" #include "adios2/toolkit/format/bp5/BP5Serializer.h" #include "adios2/toolkit/format/buffer/BufferV.h" +#include "adios2/toolkit/shm/Spinlock.h" +#include "adios2/toolkit/shm/TokenChain.h" #include "adios2/toolkit/transportman/TransportMan.h" namespace adios2 @@ -108,6 +110,10 @@ class BP5Writer : public BP5Engine, public core::Engine void InitBPBuffer(); void NotifyEngineAttribute(std::string name, DataType type) noexcept; + void EnterComputationBlock() noexcept; + /** Inform about computation block through User->ADIOS->IO */ + void ExitComputationBlock() noexcept; + #define declare_type(T) \ void DoPut(Variable &variable, typename Variable::Span &span, \ const bool initialize, const T &value) final; @@ -156,7 +162,10 @@ class BP5Writer : public BP5Engine, public core::Engine void WriteData(format::BufferV *Data); void WriteData_EveryoneWrites(format::BufferV *Data, bool SerializedWriters); + void WriteData_EveryoneWrites_Async(format::BufferV *Data, + bool SerializedWriters); void WriteData_TwoLevelShm(format::BufferV *Data); + void WriteData_TwoLevelShm_Async(format::BufferV *Data); void PopulateMetadataIndexFileContent( format::BufferSTL &buffer, const uint64_t currentStep, @@ -226,6 +235,100 @@ class BP5Writer : public BP5Engine, public core::Engine void MakeHeader(format::BufferSTL &b, const std::string fileType, const bool isActive); + + /* Async write's future */ + std::future m_WriteFuture; + // variables to delay writing to index file + uint64_t m_LatestMetaDataPos; + uint64_t m_LatestMetaDataSize; + Seconds m_LastTimeBetweenSteps = Seconds(0.0); + Seconds m_TotalTimeBetweenSteps = Seconds(0.0); + Seconds m_AvgTimeBetweenSteps = Seconds(0.0); + Seconds m_ExpectedTimeBetweenSteps = Seconds(0.0); + TimePoint m_EndStepEnd; + TimePoint m_EngineStart; + TimePoint m_BeginStepStart; + bool m_flagRush; // main thread flips this in Close, async thread watches it + bool m_InComputationBlock = false; // main thread flips this in Clos + TimePoint m_ComputationBlockStart; + /* block counter and length in seconds */ + size_t m_ComputationBlockID = 0; + + struct ComputationBlockInfo + { + size_t blockID; + double length; // seconds + ComputationBlockInfo(const size_t id, const double len) + : blockID(id), length(len){}; + }; + + std::vector m_ComputationBlockTimes; + /* sum of computationBlockTimes at start of async IO; */ + double m_ComputationBlocksLength = 0.0; + + /* struct of data passed from main thread to async write thread at launch */ + struct AsyncWriteInfo + { + adios2::aggregator::MPIAggregator *aggregator; + int rank_global; + helper::Comm comm_chain; + int rank_chain; + int nproc_chain; + TimePoint tstart; + adios2::shm::TokenChain *tokenChain; + transportman::TransportMan *tm; + adios2::format::BufferV *Data; + uint64_t startPos; + uint64_t totalSize; + double deadline; // wall-clock time available in seconds + bool *flagRush; // flipped from false to true by main thread + bool *inComputationBlock; // flipped back and forth by main thread + // comm-free time within deadline in seconds + double computationBlocksLength; + std::vector expectedComputationBlocks; // a copy + std::vector + *currentComputationBlocks; // extended by main thread + size_t *currentComputationBlockID; // increased by main thread + shm::Spinlock *lock; // race condition over currentComp* variables + }; + + AsyncWriteInfo *m_AsyncWriteInfo; + /* lock to handle race condition over the following currentComp* variables + m_InComputationBlock / AsyncWriteInfo::inComputationBlock + m_ComputationBlockID / AsyncWriteInfo::currentComputationBlockID + m_flagRush / AsyncWriteInfo::flagRush + Currently not used + m_ComputationBlockTimes / AsyncWriteInfo::currentComputationBlocks + Note: The rush flag does not need protection but CI TSAN sanitizer + screams data race if not protected. + */ + shm::Spinlock m_AsyncWriteLock; + + /* Static functions that will run in another thread */ + static int AsyncWriteThread_EveryoneWrites(AsyncWriteInfo *info); + static int AsyncWriteThread_TwoLevelShm(AsyncWriteInfo *info); + static void AsyncWriteThread_TwoLevelShm_Aggregator(AsyncWriteInfo *info); + static void AsyncWriteThread_TwoLevelShm_SendDataToAggregator( + aggregator::MPIShmChain *a, format::BufferV *Data); + + /* write own data used by both + EveryoneWrites and TwoLevelShm async threads */ + static void AsyncWriteOwnData(AsyncWriteInfo *info, + std::vector &DataVec, + const size_t totalsize, + const bool seekOnFirstWrite); + enum class ComputationStatus + { + InComp, + NotInComp_ExpectMore, + NoMoreComp + }; + static ComputationStatus IsInComputationBlock(AsyncWriteInfo *info, + size_t &compBlockIdx); + + void AsyncWriteDataCleanup(); + void AsyncWriteDataCleanup_EveryoneWrites(); + void AsyncWriteDataCleanup_TwoLevelShm(); }; } // end namespace engine diff --git a/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp b/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp new file mode 100644 index 0000000000..fdf9985315 --- /dev/null +++ b/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp @@ -0,0 +1,357 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * BP5Writer.cpp + * + */ + +#include "BP5Writer.h" +#include "BP5Writer.tcc" + +#include "adios2/common/ADIOSMacros.h" +#include "adios2/core/IO.h" +#include "adios2/helper/adiosFunctions.h" //CheckIndexRange +#include "adios2/toolkit/format/buffer/chunk/ChunkV.h" +#include "adios2/toolkit/format/buffer/malloc/MallocV.h" +#include "adios2/toolkit/transport/file/FileFStream.h" +#include + +#include // max +#include +#include + +namespace adios2 +{ +namespace core +{ +namespace engine +{ + +using namespace adios2::format; + +BP5Writer::ComputationStatus +BP5Writer::IsInComputationBlock(AsyncWriteInfo *info, size_t &compBlockIdx) +{ + ComputationStatus compStatus = ComputationStatus::NotInComp_ExpectMore; + size_t nExpectedBlocks = info->expectedComputationBlocks.size(); + + if (compBlockIdx >= nExpectedBlocks) + { + compStatus = ComputationStatus::NoMoreComp; + } + else + { + bool inComp = false; + size_t compBlockID = 0; + // access variables modified by main thread to avoid data race + info->lock->lock(); + compBlockID = *info->currentComputationBlockID; + inComp = *info->inComputationBlock; + info->lock->unlock(); + + /* Track which computation block we are in */ + if (inComp) + { + while (compBlockIdx < nExpectedBlocks && + info->expectedComputationBlocks[compBlockIdx].blockID < + compBlockID) + { + ++compBlockIdx; + } + if (info->expectedComputationBlocks[compBlockIdx].blockID > + compBlockID) + { + // the current computation block is a short one that was not + // recorded + compStatus = ComputationStatus::NotInComp_ExpectMore; + } + else + { + compStatus = ComputationStatus::InComp; + } + } + } + return compStatus; +} + +void BP5Writer::AsyncWriteOwnData(AsyncWriteInfo *info, + std::vector &DataVec, + const size_t totalsize, + const bool seekOnFirstWrite) +{ + /* local variables to track variables modified by main thread */ + size_t compBlockIdx = 0; /* position in vector to get length */ + + /* In a loop, write the data in smaller blocks */ + size_t nBlocks = DataVec.size(); + size_t wrote = 0; + size_t block = 0; + size_t temp_offset = 0; + size_t max_size = std::max(1024 * 1024UL, totalsize / 100UL); + + bool firstWrite = seekOnFirstWrite; + while (block < nBlocks) + { + bool doRush = false; + bool doSleep = false; + + info->lock->lock(); + doRush = *info->flagRush; + info->lock->unlock(); + + if (!doRush) + { + ComputationStatus compStatus = + IsInComputationBlock(info, compBlockIdx); + + /* Scheduling decisions: + Cases: + 1. Not in a computation block AND we still expect more + computation blocks down the line ==> Sleep + 2. In computation block ==> Write + 3. We are at the end of a computation block (how close??) AND we + still expect more computation blocks down the line 3. ==> Sleep + 4. We are at the end of the LAST computation block ==> Write + 5. No more computation blocks expected ==> Write all at once + 6. Main thread set flagRush ==> Write all at once + -- case 3 not handled yet properly + */ + + switch (compStatus) + { + case ComputationStatus::NotInComp_ExpectMore: + // case 1 + doSleep = true; + break; + case ComputationStatus::NoMoreComp: + // case 5 + doRush = true; + break; + default: + // cases 2, 3, 4 + break; + } + } + + if (doRush) + { + auto vec = std::vector(DataVec.begin() + block, + DataVec.end()); + vec[0].iov_base = + (const char *)DataVec[block].iov_base + temp_offset; + vec[0].iov_len = DataVec[block].iov_len - temp_offset; + size_t pos = MaxSizeT; // <==> no seek inside WriteFileAt + if (firstWrite) + { + pos = info->startPos + wrote; // seek to pos + } + /*std::cout << "Async write on Rank " << info->rank_global + << " write the rest of " << totalsize - wrote + << " bytes at pos " << pos << std::endl;*/ + + info->tm->WriteFileAt(vec.data(), vec.size(), pos); + + break; /* Exit loop after this final write */ + } + + if (doSleep) + { + std::this_thread::sleep_for(core::Seconds(0.01)); + continue; + } + + /* Write next batch of data */ + + /* Get the next n bytes from the current block, current offset */ + size_t n = DataVec[block].iov_len - temp_offset; + if (n > max_size) + { + n = max_size; + } + + if (firstWrite) + { + info->tm->WriteFileAt((const char *)DataVec[block].iov_base + + temp_offset, + n, info->startPos); + firstWrite = false; + } + else + { + info->tm->WriteFiles( + (const char *)DataVec[block].iov_base + temp_offset, n); + } + + /* Have we processed the entire block or staying with it? */ + if (n + temp_offset < DataVec[block].iov_len) + { + temp_offset += n; + } + else + { + temp_offset = 0; + ++block; + } + wrote += n; + } +}; + +int BP5Writer::AsyncWriteThread_EveryoneWrites(AsyncWriteInfo *info) +{ + if (info->tokenChain) + { + if (info->rank_chain > 0) + { + info->tokenChain->RecvToken(); + } + } + + std::vector DataVec = info->Data->DataVec(); + const uint64_t mysize = info->Data->Size(); + AsyncWriteOwnData(info, DataVec, mysize, true); + + if (info->tokenChain) + { + uint64_t t = 1; + info->tokenChain->SendToken(t); + if (!info->rank_chain) + { + info->tokenChain->RecvToken(); + } + } + delete info->Data; + return 1; +}; + +void BP5Writer::WriteData_EveryoneWrites_Async(format::BufferV *Data, + bool SerializedWriters) +{ + + const aggregator::MPIChain *a = + dynamic_cast(m_Aggregator); + + // new step writing starts at offset m_DataPos on aggregator + // others will wait for the position to arrive from the rank below + + if (a->m_Comm.Rank() > 0) + { + a->m_Comm.Recv( + &m_DataPos, 1, a->m_Comm.Rank() - 1, 0, + "Chain token in BP5Writer::WriteData_EveryoneWrites_Async"); + } + + // align to PAGE_SIZE + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.FileSystemPageSize); + m_StartDataPos = m_DataPos; + + if (a->m_Comm.Rank() < a->m_Comm.Size() - 1) + { + uint64_t nextWriterPos = m_DataPos + Data->Size(); + a->m_Comm.Isend( + &nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, + "Chain token in BP5Writer::WriteData_EveryoneWrites_Async"); + } + + m_DataPos += Data->Size(); + + /* a->comm can span multiple nodes but we need comm inside a node + when doing serialized aggregation */ + m_AsyncWriteInfo = new AsyncWriteInfo(); + m_AsyncWriteInfo->aggregator = nullptr; + m_AsyncWriteInfo->rank_global = m_Comm.Rank(); + if (SerializedWriters) + { + m_AsyncWriteInfo->comm_chain = a->m_Comm.GroupByShm(); + m_AsyncWriteInfo->rank_chain = m_AsyncWriteInfo->comm_chain.Rank(); + m_AsyncWriteInfo->nproc_chain = m_AsyncWriteInfo->comm_chain.Size(); + m_AsyncWriteInfo->tokenChain = + new shm::TokenChain(&m_AsyncWriteInfo->comm_chain); + } + else + { + m_AsyncWriteInfo->comm_chain = helper::Comm(); // not needed + m_AsyncWriteInfo->rank_chain = a->m_Comm.Rank(); + m_AsyncWriteInfo->nproc_chain = a->m_Comm.Size(); + m_AsyncWriteInfo->tokenChain = nullptr; + } + m_AsyncWriteInfo->tstart = m_EngineStart; + m_AsyncWriteInfo->tm = &m_FileDataManager; + m_AsyncWriteInfo->Data = Data; + m_AsyncWriteInfo->startPos = m_StartDataPos; + m_AsyncWriteInfo->totalSize = Data->Size(); + m_AsyncWriteInfo->deadline = m_ExpectedTimeBetweenSteps.count(); + m_AsyncWriteInfo->flagRush = &m_flagRush; + m_AsyncWriteInfo->lock = &m_AsyncWriteLock; + + if (m_ComputationBlocksLength > 0.0 && + m_Parameters.AsyncWrite == (int)AsyncWrite::Guided) + { + m_AsyncWriteInfo->inComputationBlock = &m_InComputationBlock; + m_AsyncWriteInfo->computationBlocksLength = m_ComputationBlocksLength; + if (m_AsyncWriteInfo->deadline < m_ComputationBlocksLength) + { + m_AsyncWriteInfo->deadline = m_ComputationBlocksLength; + } + m_AsyncWriteInfo->expectedComputationBlocks = + m_ComputationBlockTimes; // copy! + m_AsyncWriteInfo->currentComputationBlocks = + &m_ComputationBlockTimes; // ptr! + m_AsyncWriteInfo->currentComputationBlockID = &m_ComputationBlockID; + + /* Clear current block tracker now so that async thread does not get + confused with the past info */ + m_ComputationBlockTimes.clear(); + m_ComputationBlocksLength = 0.0; + m_ComputationBlockID = 0; + } + else + { + if (m_Parameters.AsyncWrite == (int)AsyncWrite::Naive) + { + m_AsyncWriteInfo->deadline = 0; + } + m_AsyncWriteInfo->inComputationBlock = nullptr; + m_AsyncWriteInfo->computationBlocksLength = 0.0; + m_AsyncWriteInfo->currentComputationBlocks = nullptr; + m_AsyncWriteInfo->currentComputationBlockID = nullptr; + } + + m_WriteFuture = std::async( + std::launch::async, AsyncWriteThread_EveryoneWrites, m_AsyncWriteInfo); + + // At this point modifying Data in main thread is prohibited !!! + + if (a->m_Comm.Size() > 1) + { + // at the end, last rank sends back the final data pos to first rank + // so it can update its data pos + if (a->m_Comm.Rank() == a->m_Comm.Size() - 1) + { + a->m_Comm.Isend(&m_DataPos, 1, 0, 0, + "Final chain token in " + "BP5Writer::WriteData_EveryoneWrites_Async"); + } + if (a->m_Comm.Rank() == 0) + { + a->m_Comm.Recv( + &m_DataPos, 1, a->m_Comm.Size() - 1, 0, + "Chain token in BP5Writer::WriteData_EveryoneWrites_Async"); + } + } +} + +void BP5Writer::AsyncWriteDataCleanup_EveryoneWrites() +{ + if (m_AsyncWriteInfo->tokenChain) + { + delete m_AsyncWriteInfo->tokenChain; + } + delete m_AsyncWriteInfo; + m_AsyncWriteInfo = nullptr; +} + +} // end namespace engine +} // end namespace core +} // end namespace adios2 diff --git a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp index 5518861754..9bf3473899 100644 --- a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp +++ b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp @@ -13,6 +13,7 @@ #include "adios2/helper/adiosFunctions.h" //CheckIndexRange, PaddingToAlignOffset #include "adios2/toolkit/format/buffer/chunk/ChunkV.h" #include "adios2/toolkit/format/buffer/malloc/MallocV.h" +#include "adios2/toolkit/shm/TokenChain.h" #include "adios2/toolkit/transport/file/FileFStream.h" #include @@ -42,22 +43,6 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) m_DataPos += helper::PaddingToAlignOffset(m_DataPos, m_Parameters.FileSystemPageSize); - /* - // Each aggregator needs to know the total size they write - // including alignment to page size - // This calculation is valid on aggregators only - std::vector mySizes = a->m_Comm.GatherValues(Data->Size()); - uint64_t myTotalSize = 0; - uint64_t pos = m_DataPos; - for (auto s : mySizes) - { - uint64_t alignment = - helper::PaddingToAlignOffset(pos, m_Parameters.FileSystemPageSize); - myTotalSize += alignment + s; - pos += alignment + s; - } - */ - // Each aggregator needs to know the total size they write // This calculation is valid on aggregators only std::vector mySizes = a->m_Comm.GatherValues(Data->Size()); @@ -77,6 +62,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize); } + shm::TokenChain tokenChain(&a->m_Comm); + if (a->m_IsAggregator) { // In each aggregator chain, send from master down the line @@ -115,12 +102,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) // Send token to first non-aggregator to start filling shm // Also informs next process its starting offset (for correct metadata) - if (a->m_Comm.Size() > 1) - { - uint64_t nextWriterPos = m_DataPos + Data->Size(); - a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, - "Shm token in BP5Writer::WriteData_TwoLevelShm"); - } + uint64_t nextWriterPos = m_DataPos + Data->Size(); + tokenChain.SendToken(nextWriterPos); WriteMyOwnData(Data); @@ -145,8 +128,7 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) { // non-aggregators fill shared buffer in marching order // they also receive their starting offset this way - a->m_Comm.Recv(&m_StartDataPos, 1, a->m_Comm.Rank() - 1, 0, - "Shm token in BP5Writer::WriteData_TwoLevelShm"); + m_StartDataPos = tokenChain.RecvToken(); /*std::cout << "Rank " << m_Comm.Rank() << " non-aggregator recv token to fill shm = " @@ -154,12 +136,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) SendDataToAggregator(Data); - if (a->m_Comm.Rank() < a->m_Comm.Size() - 1) - { - uint64_t nextWriterPos = m_StartDataPos + Data->Size(); - a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, - "Shm token in BP5Writer::WriteData_TwoLevelShm"); - } + uint64_t nextWriterPos = m_StartDataPos + Data->Size(); + tokenChain.SendToken(nextWriterPos); } if (a->m_Comm.Size() > 1) diff --git a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp new file mode 100644 index 0000000000..25d5cf991d --- /dev/null +++ b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp @@ -0,0 +1,350 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * BP5Writer.cpp + * + */ + +#include "BP5Writer.h" + +#include "adios2/common/ADIOSMacros.h" +#include "adios2/core/CoreTypes.h" +#include "adios2/core/IO.h" +#include "adios2/helper/adiosFunctions.h" //CheckIndexRange, PaddingToAlignOffset +#include "adios2/toolkit/format/buffer/chunk/ChunkV.h" +#include "adios2/toolkit/format/buffer/malloc/MallocV.h" +#include "adios2/toolkit/transport/file/FileFStream.h" +#include + +#include +#include +#include +#include + +namespace adios2 +{ +namespace core +{ +namespace engine +{ + +using namespace adios2::format; + +/* Aggregator part of the async two level aggregation Guided version + This process is the one writing to disk +*/ +void BP5Writer::AsyncWriteThread_TwoLevelShm_Aggregator(AsyncWriteInfo *info) +{ + aggregator::MPIShmChain *a = + dynamic_cast(info->aggregator); + uint64_t totalSize = info->totalSize; + + /* Write own data first */ + { + std::vector DataVec = info->Data->DataVec(); + const uint64_t mysize = info->Data->Size(); + info->tm->SeekTo(info->startPos); + AsyncWriteOwnData(info, DataVec, mysize, false); + totalSize -= mysize; + } + + /* Write from shm until every non-aggr sent all data */ + std::vector DataVec(1); + size_t wrote = 0; + while (wrote < totalSize) + { + /* Write the next shm block now */ + // potentially blocking call waiting on some non-aggr process + aggregator::MPIShmChain::ShmDataBuffer *b = a->LockConsumerBuffer(); + // b->actual_size: how much we need to write + DataVec[0].iov_base = b->buf; + DataVec[0].iov_len = b->actual_size; + AsyncWriteOwnData(info, DataVec, b->actual_size, false); + wrote += b->actual_size; + a->UnlockConsumerBuffer(); + } +} + +/* Non-aggregator part of the async two level aggregation. + This process passes data to Aggregator through SHM segment. + tokenChain in caller ensures only one process (per aggregator chain) + is running this function at a time +*/ +void BP5Writer::AsyncWriteThread_TwoLevelShm_SendDataToAggregator( + aggregator::MPIShmChain *a, format::BufferV *Data) +{ + /* In a loop, copy the local data into the shared memory, alternating + between the two segments. + */ + + std::vector DataVec = Data->DataVec(); + size_t nBlocks = DataVec.size(); + + size_t sent = 0; + size_t block = 0; + size_t temp_offset = 0; + while (block < nBlocks) + { + // potentially blocking call waiting on Aggregator + aggregator::MPIShmChain::ShmDataBuffer *b = a->LockProducerBuffer(); + // b->max_size: how much we can copy + // b->actual_size: how much we actually copy + b->actual_size = 0; + while (true) + { + /* Copy n bytes from the current block, current offset to shm + making sure to use up to shm_size bytes + */ + size_t n = DataVec[block].iov_len - temp_offset; + if (n > (b->max_size - b->actual_size)) + { + n = b->max_size - b->actual_size; + } + std::memcpy(&b->buf[b->actual_size], + (const char *)DataVec[block].iov_base + temp_offset, n); + b->actual_size += n; + + /* Have we processed the entire block or staying with it? */ + if (n + temp_offset < DataVec[block].iov_len) + { + temp_offset += n; + } + else + { + temp_offset = 0; + ++block; + } + + /* Have we reached the max allowed shm size ?*/ + if (b->actual_size >= b->max_size) + { + break; + } + if (block >= nBlocks) + { + break; + } + } + sent += b->actual_size; + a->UnlockProducerBuffer(); + } +} + +int BP5Writer::AsyncWriteThread_TwoLevelShm(AsyncWriteInfo *info) +{ + /* DO NOT use MPI in this separate thread, including destroying + shm segments explicitely (a->DestroyShm) or implicitely (tokenChain) */ + Seconds ts = Now() - info->tstart; + // std::cout << "ASYNC rank " << info->rank_global + // << " starts at: " << ts.count() << std::endl; + aggregator::MPIShmChain *a = + dynamic_cast(info->aggregator); + if (a->m_IsAggregator) + { + // Send token to first non-aggregator to start filling shm + // Also informs next process its starting offset (for correct + // metadata) + uint64_t nextWriterPos = info->startPos + info->Data->Size(); + info->tokenChain->SendToken(nextWriterPos); + AsyncWriteThread_TwoLevelShm_Aggregator(info); + info->tokenChain->RecvToken(); + } + else + { + // non-aggregators fill shared buffer in marching order + // they also receive their starting offset this way + uint64_t startPos = info->tokenChain->RecvToken(); + AsyncWriteThread_TwoLevelShm_SendDataToAggregator(a, info->Data); + uint64_t nextWriterPos = startPos + info->Data->Size(); + info->tokenChain->SendToken(nextWriterPos); + } + delete info->Data; + + ts = Now() - info->tstart; + /*std::cout << "ASYNC " << info->rank_global << " ended at: " << ts.count() + << std::endl;*/ + return 1; +}; + +void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data) +{ + aggregator::MPIShmChain *a = + dynamic_cast(m_Aggregator); + + // new step writing starts at offset m_DataPos on master aggregator + // other aggregators to the same file will need to wait for the position + // to arrive from the rank below + + // align to PAGE_SIZE (only valid on master aggregator at this point) + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.FileSystemPageSize); + + // Each aggregator needs to know the total size they write + // This calculation is valid on aggregators only + std::vector mySizes = a->m_Comm.GatherValues(Data->Size()); + uint64_t myTotalSize = 0; + uint64_t maxSize = 0; + for (auto s : mySizes) + { + myTotalSize += s; + if (s > maxSize) + { + maxSize = s; + } + } + + if (a->m_Comm.Size() > 1) + { + a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize); + } + + if (a->m_IsAggregator) + { + // In each aggregator chain, send from master down the line + // these total sizes, so every aggregator knows where to start + if (a->m_AggregatorChainComm.Rank() > 0) + { + a->m_AggregatorChainComm.Recv( + &m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0, + "AggregatorChain token in BP5Writer::WriteData_TwoLevelShm"); + // align to PAGE_SIZE + m_DataPos += helper::PaddingToAlignOffset( + m_DataPos, m_Parameters.FileSystemPageSize); + } + m_StartDataPos = m_DataPos; // metadata needs this info + if (a->m_AggregatorChainComm.Rank() < + a->m_AggregatorChainComm.Size() - 1) + { + uint64_t nextWriterPos = m_DataPos + myTotalSize; + a->m_AggregatorChainComm.Isend( + &nextWriterPos, 1, a->m_AggregatorChainComm.Rank() + 1, 0, + "Chain token in BP5Writer::WriteData"); + } + else if (a->m_AggregatorChainComm.Size() > 1) + { + // send back final position from last aggregator in file to master + // aggregator + uint64_t nextWriterPos = m_DataPos + myTotalSize; + a->m_AggregatorChainComm.Isend( + &nextWriterPos, 1, 0, 0, "Chain token in BP5Writer::WriteData"); + } + + // Master aggregator needs to know where the last writing ended by the + // last aggregator in the chain, so that it can start from the correct + // position at the next output step + if (!a->m_AggregatorChainComm.Rank()) + { + if (a->m_AggregatorChainComm.Size() > 1) + { + a->m_AggregatorChainComm.Recv( + &m_DataPos, 1, a->m_AggregatorChainComm.Size() - 1, 0, + "Chain token in BP5Writer::WriteData"); + } + else + { + m_DataPos = m_StartDataPos + myTotalSize; + } + } + } + + /*std::cout << "Rank " << m_Comm.Rank() << " start data async " + << " to subfile " << a->m_SubStreamIndex << " at pos " + << m_StartDataPos << std::endl;*/ + + m_AsyncWriteInfo = new AsyncWriteInfo(); + m_AsyncWriteInfo->aggregator = m_Aggregator; + m_AsyncWriteInfo->rank_global = m_Comm.Rank(); + m_AsyncWriteInfo->rank_chain = a->m_Comm.Rank(); + m_AsyncWriteInfo->nproc_chain = a->m_Comm.Size(); + m_AsyncWriteInfo->comm_chain = helper::Comm(); // unused in this aggregation + m_AsyncWriteInfo->tstart = m_EngineStart; + m_AsyncWriteInfo->tokenChain = new shm::TokenChain(&a->m_Comm); + m_AsyncWriteInfo->tm = &m_FileDataManager; + m_AsyncWriteInfo->Data = Data; + m_AsyncWriteInfo->flagRush = &m_flagRush; + m_AsyncWriteInfo->lock = &m_AsyncWriteLock; + + // Metadata collection needs m_StartDataPos correctly set on + // every process before we call the async writing thread + if (a->m_IsAggregator) + { + // Informs next process its starting offset (for correct metadata) + uint64_t nextWriterPos = m_StartDataPos + Data->Size(); + m_AsyncWriteInfo->tokenChain->SendToken(nextWriterPos); + m_AsyncWriteInfo->tokenChain->RecvToken(); + } + else + { + // non-aggregators fill shared buffer in marching order + // they also receive their starting offset this way + m_StartDataPos = m_AsyncWriteInfo->tokenChain->RecvToken(); + uint64_t nextWriterPos = m_StartDataPos + Data->Size(); + m_AsyncWriteInfo->tokenChain->SendToken(nextWriterPos); + } + + // Launch data writing thread, m_StartDataPos is valid + // m_DataPos is already pointing to the end of the write, do not use here. + m_AsyncWriteInfo->startPos = m_StartDataPos; + m_AsyncWriteInfo->totalSize = myTotalSize; + m_AsyncWriteInfo->deadline = m_ExpectedTimeBetweenSteps.count(); + + if (m_ComputationBlocksLength > 0.0 && + m_Parameters.AsyncWrite == (int)AsyncWrite::Guided) + { + m_AsyncWriteInfo->inComputationBlock = &m_InComputationBlock; + m_AsyncWriteInfo->computationBlocksLength = m_ComputationBlocksLength; + if (m_AsyncWriteInfo->deadline < m_ComputationBlocksLength) + { + m_AsyncWriteInfo->deadline = m_ComputationBlocksLength; + } + m_AsyncWriteInfo->expectedComputationBlocks = + m_ComputationBlockTimes; // copy! + m_AsyncWriteInfo->currentComputationBlocks = + &m_ComputationBlockTimes; // ptr! + m_AsyncWriteInfo->currentComputationBlockID = &m_ComputationBlockID; + + /* Clear current block tracker now so that async thread does not get + confused with the past info */ + m_ComputationBlockTimes.clear(); + m_ComputationBlocksLength = 0.0; + m_ComputationBlockID = 0; + } + else + { + if (m_Parameters.AsyncWrite == (int)AsyncWrite::Naive) + { + m_AsyncWriteInfo->deadline = 0; + } + m_AsyncWriteInfo->inComputationBlock = nullptr; + m_AsyncWriteInfo->computationBlocksLength = 0.0; + m_AsyncWriteInfo->currentComputationBlocks = nullptr; + m_AsyncWriteInfo->currentComputationBlockID = nullptr; + } + + m_WriteFuture = std::async(std::launch::async, AsyncWriteThread_TwoLevelShm, + m_AsyncWriteInfo); + + /* At this point it is prohibited in the main thread + - to modify Data, which will be deleted in the async thread any tiume + - to use m_FileDataManager until next BeginStep, which is being used + in the async thread to write data + */ +} + +void BP5Writer::AsyncWriteDataCleanup_TwoLevelShm() +{ + aggregator::MPIShmChain *a = + dynamic_cast(m_AsyncWriteInfo->aggregator); + if (a->m_Comm.Size() > 1) + { + a->DestroyShm(); + } + delete m_AsyncWriteInfo->tokenChain; + delete m_AsyncWriteInfo; + m_AsyncWriteInfo = nullptr; +} + +} // end namespace engine +} // end namespace core +} // end namespace adios2 diff --git a/source/adios2/helper/adiosComm.cpp b/source/adios2/helper/adiosComm.cpp index 4c119f65cc..679ead0b8a 100644 --- a/source/adios2/helper/adiosComm.cpp +++ b/source/adios2/helper/adiosComm.cpp @@ -129,14 +129,23 @@ int Comm::Win_free(Win &win, const std::string &hint) { return m_Impl->Win_free(win, hint); } -int Comm::Win_Lock(LockType lock_type, int rank, int assert, Win &win, +int Comm::Win_lock(LockType lock_type, int rank, int assert, Win &win, const std::string &hint) { - return m_Impl->Win_Lock(lock_type, rank, assert, win, hint); + return m_Impl->Win_lock(lock_type, rank, assert, win, hint); } -int Comm::Win_Unlock(int rank, Win &win, const std::string &hint) +int Comm::Win_unlock(int rank, Win &win, const std::string &hint) { - return m_Impl->Win_Unlock(rank, win, hint); + return m_Impl->Win_unlock(rank, win, hint); +} + +int Comm::Win_lock_all(int assert, Win &win, const std::string &hint) +{ + return m_Impl->Win_lock_all(assert, win, hint); +} +int Comm::Win_unlock_all(Win &win, const std::string &hint) +{ + return m_Impl->Win_unlock_all(win, hint); } Comm::Req::Req() = default; diff --git a/source/adios2/helper/adiosComm.h b/source/adios2/helper/adiosComm.h index 4f4ed2fc54..ac9293a56f 100644 --- a/source/adios2/helper/adiosComm.h +++ b/source/adios2/helper/adiosComm.h @@ -271,9 +271,12 @@ class Comm void *baseptr, const std::string &hint = std::string()); int Win_free(Win &win, const std::string &hint = std::string()); - int Win_Lock(LockType lock_type, int rank, int assert, Win &win, + int Win_lock(LockType lock_type, int rank, int assert, Win &win, const std::string &hint = std::string()); - int Win_Unlock(int rank, Win &win, const std::string &hint = std::string()); + int Win_unlock(int rank, Win &win, const std::string &hint = std::string()); + int Win_lock_all(int assert, Win &win, + const std::string &hint = std::string()); + int Win_unlock_all(Win &win, const std::string &hint = std::string()); private: friend class CommImpl; @@ -518,11 +521,14 @@ class CommImpl int *disp_unit, void *baseptr, const std::string &hint) const = 0; virtual int Win_free(Comm::Win &win, const std::string &hint) const = 0; - virtual int Win_Lock(Comm::LockType lock_type, int rank, int assert, + virtual int Win_lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, const std::string &hint) const = 0; - virtual int Win_Unlock(int rank, Comm::Win &win, + virtual int Win_unlock(int rank, Comm::Win &win, const std::string &hint) const = 0; - + virtual int Win_lock_all(int assert, Comm::Win &win, + const std::string &hint) const = 0; + virtual int Win_unlock_all(Comm::Win &win, + const std::string &hint) const = 0; static size_t SizeOf(Datatype datatype); static Comm MakeComm(std::unique_ptr impl); diff --git a/source/adios2/helper/adiosCommDummy.cpp b/source/adios2/helper/adiosCommDummy.cpp index 7c4ae8fd00..f3c64ccd67 100644 --- a/source/adios2/helper/adiosCommDummy.cpp +++ b/source/adios2/helper/adiosCommDummy.cpp @@ -121,10 +121,13 @@ class CommImplDummy : public CommImpl int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, void *baseptr, const std::string &hint) const override; int Win_free(Comm::Win &win, const std::string &hint) const override; - int Win_Lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, + int Win_lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, const std::string &hint) const override; - int Win_Unlock(int rank, Comm::Win &win, + int Win_unlock(int rank, Comm::Win &win, const std::string &hint) const override; + int Win_lock_all(int assert, Comm::Win &win, + const std::string &hint) const override; + int Win_unlock_all(Comm::Win &win, const std::string &hint) const override; }; CommImplDummy::~CommImplDummy() = default; @@ -331,17 +334,27 @@ int CommImplDummy::Win_free(Comm::Win &win, const std::string &) const return 0; } -int CommImplDummy::Win_Lock(Comm::LockType lock_type, int rank, int assert, +int CommImplDummy::Win_lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, const std::string &) const { return 0; } -int CommImplDummy::Win_Unlock(int rank, Comm::Win &win, +int CommImplDummy::Win_unlock(int rank, Comm::Win &win, const std::string &) const { return 0; } +int CommImplDummy::Win_lock_all(int assert, Comm::Win &win, + const std::string &) const +{ + return 0; +} +int CommImplDummy::Win_unlock_all(Comm::Win &win, const std::string &) const +{ + return 0; +} + Comm::Status CommReqImplDummy::Wait(const std::string &hint) { Comm::Status status; diff --git a/source/adios2/helper/adiosCommMPI.cpp b/source/adios2/helper/adiosCommMPI.cpp index a4ae9e0893..15787df50b 100644 --- a/source/adios2/helper/adiosCommMPI.cpp +++ b/source/adios2/helper/adiosCommMPI.cpp @@ -201,10 +201,13 @@ class CommImplMPI : public CommImpl int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, void *baseptr, const std::string &hint) const override; int Win_free(Comm::Win &win, const std::string &hint) const override; - int Win_Lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, + int Win_lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, const std::string &hint) const override; - int Win_Unlock(int rank, Comm::Win &win, + int Win_unlock(int rank, Comm::Win &win, const std::string &hint) const override; + int Win_lock_all(int assert, Comm::Win &win, + const std::string &hint) const override; + int Win_unlock_all(Comm::Win &win, const std::string &hint) const override; }; CommImplMPI::~CommImplMPI() @@ -579,7 +582,7 @@ int CommImplMPI::Win_free(Comm::Win &win, const std::string &hint) const return ret; } -int CommImplMPI::Win_Lock(Comm::LockType lock_type, int rank, int assert, +int CommImplMPI::Win_lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, const std::string &hint) const { CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); @@ -588,7 +591,7 @@ int CommImplMPI::Win_Lock(Comm::LockType lock_type, int rank, int assert, CheckMPIReturn(ret, "in call to Win_Lock " + hint + "\n"); return ret; } -int CommImplMPI::Win_Unlock(int rank, Comm::Win &win, +int CommImplMPI::Win_unlock(int rank, Comm::Win &win, const std::string &hint) const { CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); @@ -597,6 +600,22 @@ int CommImplMPI::Win_Unlock(int rank, Comm::Win &win, return ret; } +int CommImplMPI::Win_lock_all(int assert, Comm::Win &win, + const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + int ret = MPI_Win_lock_all(assert, w->m_Win); + CheckMPIReturn(ret, "in call to Win_Lock_all " + hint + "\n"); + return ret; +} +int CommImplMPI::Win_unlock_all(Comm::Win &win, const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + int ret = MPI_Win_unlock_all(w->m_Win); + CheckMPIReturn(ret, "in call to Win_Lock " + hint + "\n"); + return ret; +} + Comm::Status CommReqImplMPI::Wait(const std::string &hint) { Comm::Status status; diff --git a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h index 50ed905c31..afd7ee5eaf 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h +++ b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h @@ -14,6 +14,7 @@ #include "adios2/common/ADIOSConfig.h" #include "adios2/toolkit/aggregator/mpi/MPIAggregator.h" +#include "adios2/toolkit/shm/Spinlock.h" #include #include @@ -24,27 +25,6 @@ namespace adios2 namespace aggregator { -class Spinlock -{ - /* from - * https://wang-yimu.com/a-tutorial-on-shared-memory-inter-process-communication - */ -public: - Spinlock() { flag_.clear(); } - void lock() - { - while (!try_lock()) - { - std::this_thread::sleep_for(std::chrono::duration(0.00001)); - } - } - void unlock() { flag_.clear(); } - -private: - inline bool try_lock() { return !flag_.test_and_set(); } - std::atomic_flag flag_; //{ATOMIC_FLAG_INIT}; -}; - // constexpr size_t SHM_BUF_SIZE = 4194304; // 4MB // we allocate 2x this size + a bit for shared memory segment @@ -162,10 +142,10 @@ class MPIShmChain : public MPIAggregator // user facing structs ShmDataBuffer sdbA; ShmDataBuffer sdbB; - aggregator::Spinlock lockSegment; + shm::Spinlock lockSegment; // locks for individual buffers (sdb and buf) - aggregator::Spinlock lockA; - aggregator::Spinlock lockB; + shm::Spinlock lockA; + shm::Spinlock lockB; // the actual data buffers // char bufA[SHM_BUF_SIZE]; // char bufB[SHM_BUF_SIZE]; diff --git a/source/adios2/toolkit/format/bp/BPBase.cpp b/source/adios2/toolkit/format/bp/BPBase.cpp index 6b669b5b75..839302d9cc 100644 --- a/source/adios2/toolkit/format/bp/BPBase.cpp +++ b/source/adios2/toolkit/format/bp/BPBase.cpp @@ -113,10 +113,10 @@ void BPBase::Init(const Params ¶meters, const std::string hint, static_cast(helper::StringTo( value, " in Parameter key=Threads " + hint)); } - else if (key == "asynctasks") + else if (key == "asyncopen") { - parsedParameters.AsyncTasks = helper::StringTo( - value, " in Parameter key=AsyncTasks " + hint); + parsedParameters.AsyncOpen = helper::StringTo( + value, " in Parameter key=AsyncOpen " + hint); } else if (key == "statslevel") { @@ -231,7 +231,7 @@ void BPBase::Init(const Params ¶meters, const std::string hint, parsedParameters.StatsLevel; // shouldn't hurt m_Parameters.Threads = parsedParameters.Threads; m_Parameters.ProfileUnit = parsedParameters.ProfileUnit; - // AsyncTasks has no impact on SST + // AsyncOpen has no impact on SST // CollectiveMetadata might break SST // NodeLocal has an unknown effect on SST // SubStreams break SST diff --git a/source/adios2/toolkit/format/bp/BPBase.h b/source/adios2/toolkit/format/bp/BPBase.h index c8fca4db3e..5ebb485a4e 100644 --- a/source/adios2/toolkit/format/bp/BPBase.h +++ b/source/adios2/toolkit/format/bp/BPBase.h @@ -189,9 +189,9 @@ class BPBase /** default time unit in m_Profiler */ TimeUnit ProfileUnit = DefaultTimeUnitEnum; - /** true: run as much local tasks in the background, + /** true: open files for write asynchronously, * false: all serial operations */ - bool AsyncTasks = true; + bool AsyncOpen = true; /** true: write collective metadata, false: skip */ bool CollectiveMetadata = true; diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index a9039dfb29..4e05f6f903 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -519,15 +519,16 @@ void BP5Serializer::PerformPuts() CurDataBuffer->CopyExternalToInternal(); } -void BP5Serializer::DumpDeferredBlocks() +void BP5Serializer::DumpDeferredBlocks(bool forceCopyDeferred) { for (auto &Def : DeferredExterns) { MetaArrayRec *MetaEntry = (MetaArrayRec *)((char *)(MetadataBuf) + Def.MetaOffset); - size_t DataOffset = m_PriorDataBufferSizeTotal + - CurDataBuffer->AddToVec(Def.DataSize, Def.Data, - Def.AlignReq, false); + size_t DataOffset = + m_PriorDataBufferSizeTotal + + CurDataBuffer->AddToVec(Def.DataSize, Def.Data, Def.AlignReq, + forceCopyDeferred); MetaEntry->DataLocation[Def.BlockID] = DataOffset; } DeferredExterns.clear(); @@ -798,7 +799,8 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer) return tmp; } -BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) +BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep, + bool forceCopyDeferred) { std::vector Formats; if (!Info.MetaFormat && Info.MetaFieldCount) @@ -862,7 +864,7 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) } // Dump data for externs into iovec - DumpDeferredBlocks(); + DumpDeferredBlocks(forceCopyDeferred); MBase->DataBlockSize = CurDataBuffer->AddToVec( 0, NULL, sizeof(max_align_t), true); // output block size aligned diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.h b/source/adios2/toolkit/format/bp5/BP5Serializer.h index 3789977690..291a77744c 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.h @@ -83,7 +83,7 @@ class BP5Serializer : virtual public BP5Base */ BufferV *ReinitStepData(BufferV *DataBuffer); - TimestepInfo CloseTimestep(int timestep); + TimestepInfo CloseTimestep(int timestep, bool forceCopyDeferred = false); void PerformPuts(); core::Engine *m_Engine = NULL; @@ -183,7 +183,7 @@ class BP5Serializer : virtual public BP5Base size_t *AppendDims(size_t *OldDims, const size_t OldCount, const size_t Count, const size_t *Vals); - void DumpDeferredBlocks(); + void DumpDeferredBlocks(bool forceCopyDeferred = false); typedef struct _ArrayRec { diff --git a/source/adios2/toolkit/shm/SerializeProcesses.cpp b/source/adios2/toolkit/shm/SerializeProcesses.cpp new file mode 100644 index 0000000000..146dc8046e --- /dev/null +++ b/source/adios2/toolkit/shm/SerializeProcesses.cpp @@ -0,0 +1,85 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * SerializeProcesses.cpp + * + * Created on: Oct 12, 2021 + * Author: Norbert Podhorszki pnorbert@ornl.gov + */ + +#include "SerializeProcesses.h" +#include +#include + +namespace adios2 +{ +namespace shm +{ + +SerializeProcesses::SerializeProcesses(helper::Comm *comm) +: m_NodeComm(comm), m_Rank(comm->Rank()), m_nProc(comm->Size()) +{ + if (m_nProc > 1) + { + char *ptr; + if (!m_Rank) + { + m_Win = m_NodeComm->Win_allocate_shared(sizeof(int), 1, &ptr); + } + else + { + m_Win = m_NodeComm->Win_allocate_shared(0, 1, &ptr); + size_t shmsize; + int disp_unit; + m_NodeComm->Win_shared_query(m_Win, 0, &shmsize, &disp_unit, &ptr); + } + m_ShmValue = reinterpret_cast(ptr); + + if (!m_Rank) + { + *m_ShmValue = 0; + } + } + else + { + m_ShmValue = new int; + } +}; + +SerializeProcesses::~SerializeProcesses() +{ + if (m_nProc > 1) + { + m_NodeComm->Win_free(m_Win); + } + else + { + delete m_ShmValue; + } +} + +void SerializeProcesses::Wait() +{ + while (*m_ShmValue != m_Rank) + { + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } +} + +bool SerializeProcesses::IsMyTurn() { return (*m_ShmValue == m_Rank); } + +void SerializeProcesses::Done() +{ + if (m_Rank < m_NodeComm->Size() - 1) + { + ++(*m_ShmValue); + } + else + { + *m_ShmValue = 0; + } +} + +} // end namespace shm +} // end namespace adios2 diff --git a/source/adios2/toolkit/shm/SerializeProcesses.h b/source/adios2/toolkit/shm/SerializeProcesses.h new file mode 100644 index 0000000000..2f8748c2b6 --- /dev/null +++ b/source/adios2/toolkit/shm/SerializeProcesses.h @@ -0,0 +1,61 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * SerializeProcesses.h + * + * Created on: Oct 12, 2021 + * Author: Norbert Podhorszki pnorbert@ornl.gov + * + * Tiny shared memory segment for the purpose of serializing a bunch of + * processes. The ranks of the communicator is used for serializing. It requires + * a communicator that connects processes on the same node only. Use + * adios2::helper::Comm::GroupByShm() to create one if needed. + * + * Rank 0 does not need to Wait() first, but must call Done() to pass control to + * Rank. Rank 0 can call Wait() after Done() and get back control when the last + * process is Done(). Other processes can Wait() or check on IsMyTurn() + * regularly. Nothing prevents processes to do anything they want unless they + * enter the blocking Wait(). + */ + +#ifndef ADIOS2_TOOLKIT_SHM_SERIALIZEPROCESSES_H_ +#define ADIOS2_TOOLKIT_SHM_SERIALIZEPROCESSES_H_ + +#include "adios2/common/ADIOSConfig.h" +#include "adios2/helper/adiosComm.h" + +#include +#include +#include + +namespace adios2 +{ +namespace shm +{ + +class SerializeProcesses +{ + +public: + SerializeProcesses(helper::Comm *comm); + + ~SerializeProcesses(); + + void Wait(); // blocking wait until it's my turn + bool IsMyTurn(); // non-blocking check if it's my turn + void Done(); // this process is done, next please + +private: + helper::Comm *m_NodeComm; + const int m_Rank; + const int m_nProc; + + helper::Comm::Win m_Win; + int *m_ShmValue; // single integer is the whole shm +}; + +} // end namespace shm +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_SHM_SERIALIZEPROCESSES_H_ */ diff --git a/source/adios2/toolkit/shm/Spinlock.cpp b/source/adios2/toolkit/shm/Spinlock.cpp new file mode 100644 index 0000000000..1595093022 --- /dev/null +++ b/source/adios2/toolkit/shm/Spinlock.cpp @@ -0,0 +1,36 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * Spinlock.cpp + * + * Created on: Oct 12, 2021 + * Moved out from adios2/toolkit/aggregator/mpi/MPIShmChain.h + * Author: Norbert Podhorszki pnorbert@ornl.gov + * + */ + +#include "Spinlock.h" + +#include +#include + +namespace adios2 +{ +namespace shm +{ + +Spinlock::Spinlock() { flag_.clear(); } +void Spinlock::lock() +{ + while (!try_lock()) + { + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } +} +void Spinlock::unlock() { flag_.clear(); } + +inline bool Spinlock::try_lock() { return !flag_.test_and_set(); } + +} // end namespace shm +} // end namespace adios2 diff --git a/source/adios2/toolkit/shm/Spinlock.h b/source/adios2/toolkit/shm/Spinlock.h new file mode 100644 index 0000000000..3d2537a11d --- /dev/null +++ b/source/adios2/toolkit/shm/Spinlock.h @@ -0,0 +1,42 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * Spinlock.h + * + * Created on: Oct 12, 2021 + * Moved out from adios2/toolkit/aggregator/mpi/MPIShmChain.h + * Author: Norbert Podhorszki pnorbert@ornl.gov + * + */ + +#ifndef ADIOS2_TOOLKIT_SHM_SPINLOCK_H_ +#define ADIOS2_TOOLKIT_SHM_SPINLOCK_H_ + +#include + +namespace adios2 +{ +namespace shm +{ + +class Spinlock +{ + /* from + * https://wang-yimu.com/a-tutorial-on-shared-memory-inter-process-communication + */ +public: + Spinlock(); + virtual ~Spinlock() = default; + void lock(); + void unlock(); + +private: + inline bool try_lock(); + std::atomic_flag flag_; //{ATOMIC_FLAG_INIT}; +}; + +} // end namespace shm +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_SHM_SPINLOCK_H_ */ diff --git a/source/adios2/toolkit/shm/TokenChain.h b/source/adios2/toolkit/shm/TokenChain.h new file mode 100644 index 0000000000..58aa54e727 --- /dev/null +++ b/source/adios2/toolkit/shm/TokenChain.h @@ -0,0 +1,176 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * TokenChain.h + * + * Created on: Oct 12, 2021 + * Author: Norbert Podhorszki pnorbert@ornl.gov + * + * A shared memory segment for the purpose of serializing a bunch of + * processes. The ranks of the communicator is used for serializing. It requires + * a communicator that connects processes on the same node only. Use + * adios2::helper::Comm::GroupByShm() to create one if needed. + * + * Rank 0 must call SendToken() to pass a token to Rank 1. + * Rank 0 MAY call RecvToken() to wait for the last process and get back a token + * from it. It MUST call RecvToken() however if it ever wants to start a second + * round so that it is synced with ending the first round. + * + * Other processes can blocking wait on RecvToken() or check status on + * CheckToken() regularly. Nothing prevents processes to do anything they want + * unless they enter the blocking wait. + * + * Example: + + helper:Comm comm = m_Comm.GroupByShm(); + shm::TokenChain tokenChain(&comm); + if (!rank) + { + int token = m_Comm.Size(); + tokenChain.SendToken(token); + ... + token = tokenChain.RecvToken(); + } + else + { + int token = tokenChain.RecvToken(); + ... + --token; + tokenChain.SendToken(token); + } + + */ + +#ifndef ADIOS2_TOOLKIT_SHM_TOKENCHAIN_H_ +#define ADIOS2_TOOLKIT_SHM_TOKENCHAIN_H_ + +#include "adios2/common/ADIOSConfig.h" +#include "adios2/helper/adiosComm.h" + +#include +#include +#include +#include +#include + +namespace adios2 +{ +namespace shm +{ + +template +class TokenChain +{ + + struct segment_ + { + int currentRank; + T token; + }; + +public: + TokenChain(helper::Comm *comm) + : m_NodeComm(comm), m_Rank(comm->Rank()), m_nProc(comm->Size()) + { + if (m_nProc > 1) + { + char *ptr; + if (!m_Rank) + { + m_Win = + m_NodeComm->Win_allocate_shared(sizeof(segment_), 1, &ptr); + } + else + { + m_Win = m_NodeComm->Win_allocate_shared(0, 1, &ptr); + size_t shmsize; + int disp_unit; + m_NodeComm->Win_shared_query(m_Win, 0, &shmsize, &disp_unit, + &ptr); + } + m_Shm = reinterpret_cast(ptr); + + if (!m_Rank) + { + m_Shm->currentRank = 0; + m_Shm->token = T(); + } + } + else + { + m_Shm = new segment_; + m_Shm->currentRank = 0; + m_Shm->token = T(); + } + } + + ~TokenChain() + { + if (m_nProc > 1) + { + m_NodeComm->Win_free(m_Win); + } + else + { + delete m_Shm; + } + } + + /** blocking wait until it's my turn, returns token */ + T &RecvToken() + { + while (m_Shm->currentRank != m_Rank) + { + assert(0 <= m_Shm->currentRank && m_Shm->currentRank < m_nProc); + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } + return m_Shm->token; + } + + /** non-blocking check if it's my turn */ + bool CheckToken() + { + assert(0 <= m_Shm->currentRank && m_Shm->currentRank < m_nProc); + return (m_Shm->currentRank == m_Rank); + } + + /** this process is done, pass token to next */ + void SendToken(T &token) + { + if (m_Rank != m_Shm->currentRank) + { + throw std::runtime_error( + "ADIOS2 Programming error: TokenChain::SendToken can only be " + "called by the Rank who last called " + "RecvToken, rank = " + + std::to_string(m_Rank)); + } + assert(0 <= m_Shm->currentRank && m_Shm->currentRank < m_nProc); + m_Shm->token = token; + /* Warning: flipping the currentRank may activate other process' + * RecvToken which in turn may call SendToken and change m_Shm + * immediately. So this action must be the very last action here */ + if (m_Rank < m_nProc - 1) + { + ++(m_Shm->currentRank); + } + else + { + m_Shm->currentRank = 0; + } + } + +private: + helper::Comm *m_NodeComm; + const int m_Rank; + const int m_nProc; + + helper::Comm::Win m_Win; + segment_ *m_Shm; +}; + +} // end namespace shm +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_SHM_TOKENCHAIN_H_ */ diff --git a/source/adios2/toolkit/transport/Transport.cpp b/source/adios2/toolkit/transport/Transport.cpp index b7d8848c25..9794583b2c 100644 --- a/source/adios2/toolkit/transport/Transport.cpp +++ b/source/adios2/toolkit/transport/Transport.cpp @@ -9,6 +9,8 @@ */ #include "Transport.h" +#include "adios2/core/CoreTypes.h" +#include // max #include "adios2/helper/adiosFunctions.h" //CreateDirectory diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index 2a1f265e37..52dc677767 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -460,7 +460,18 @@ size_t FilePOSIX::GetSize() return static_cast(fileStat.st_size); } -void FilePOSIX::Flush() {} +void FilePOSIX::Flush() +{ + /* Turn this off now because BP3/BP4 calls manager Flush and this syncing + * slows down IO performance */ +#if 0 +#if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) + fdatasync(m_FileDescriptor); +#else + fsync(m_FileDescriptor) +#endif +#endif +} void FilePOSIX::Close() { diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 5afb3df0a0..307416064c 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -228,7 +228,6 @@ void TransportMan::WriteFiles(const char *buffer, const size_t size, auto &transport = transportPair.second; if (transport->m_Type == "File") { - // make this truly asynch? transport->Write(buffer, size); } } @@ -275,7 +274,6 @@ void TransportMan::WriteFiles(const core::iovec *iov, const size_t iovcnt, auto &transport = transportPair.second; if (transport->m_Type == "File") { - // make this truly asynch? transport->WriteV(iov, static_cast(iovcnt)); } } @@ -356,6 +354,28 @@ void TransportMan::SeekToFileBegin(const int transportIndex) } } +void TransportMan::SeekTo(const size_t start, const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { + auto &transport = transportPair.second; + if (transport->m_Type == "File") + { + transport->Seek(start); + } + } + } + else + { + auto itTransport = m_Transports.find(transportIndex); + CheckFile(itTransport, ", in call to SeekTo with index " + + std::to_string(transportIndex)); + itTransport->second->Seek(start); + } +} + size_t TransportMan::GetFileSize(const size_t transportIndex) const { auto itTransport = m_Transports.find(transportIndex); @@ -579,12 +599,12 @@ std::shared_ptr TransportMan::OpenFileTransport( return helper::StringToTimeUnit(profileUnits); }; - auto lf_GetAsync = [&](const std::string defaultAsync, - const Params ¶meters) -> bool { - std::string Async = defaultAsync; - helper::SetParameterValue("AsyncTasks", parameters, Async); - helper::SetParameterValue("asynctasks", parameters, Async); - return helper::StringTo(Async, ""); + auto lf_GetAsyncOpen = [&](const std::string defaultAsync, + const Params ¶meters) -> bool { + std::string AsyncOpen = defaultAsync; + helper::SetParameterValue("AsyncOpen", parameters, AsyncOpen); + helper::SetParameterValue("asyncopen", parameters, AsyncOpen); + return helper::StringTo(AsyncOpen, ""); }; // BODY OF FUNCTION starts here @@ -605,11 +625,12 @@ std::shared_ptr TransportMan::OpenFileTransport( if (useComm) { transport->OpenChain(fileName, openMode, chainComm, - lf_GetAsync("true", parameters)); + lf_GetAsyncOpen("true", parameters)); } else { - transport->Open(fileName, openMode, lf_GetAsync("false", parameters)); + transport->Open(fileName, openMode, + lf_GetAsyncOpen("false", parameters)); } return transport; } diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index 29e8903678..8d901cc723 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -201,6 +201,8 @@ class TransportMan void SeekToFileBegin(const int transportIndex = -1); + void SeekTo(const size_t start, const int transportIndex = -1); + /** * Check if a file exists. * @param name diff --git a/source/utils/adios_iotest/adios_iotest.cpp b/source/utils/adios_iotest/adios_iotest.cpp index 1806be5080..415487e5fa 100644 --- a/source/utils/adios_iotest/adios_iotest.cpp +++ b/source/utils/adios_iotest/adios_iotest.cpp @@ -80,6 +80,8 @@ int main(int argc, char *argv[]) MPI_Barrier(settings.appComm); timeStart = MPI_Wtime(); + uint64_t actualBusyTime_usec = 0; + try { /* writing to one stream using two groups is not supported. @@ -217,16 +219,28 @@ int main(int argc, char *argv[]) { case Operation::Sleep: { + std::chrono::high_resolution_clock::time_point start = + std::chrono::high_resolution_clock::now(); + adios.EnterComputationBlock(); auto cmdS = dynamic_cast(cmd.get()); if (!settings.myRank && settings.verbose) { double t = static_cast(cmdS->sleepTime_us) / 1000000.0; - std::cout << " Sleep for " << t << " seconds " - << std::endl; + std::cout << " Sleep for " << t << " seconds "; } std::this_thread::sleep_for( std::chrono::microseconds(cmdS->sleepTime_us)); + adios.ExitComputationBlock(); + if (!settings.myRank && settings.verbose) + { + std::chrono::high_resolution_clock::time_point end = + std::chrono::high_resolution_clock::now(); + double t = static_cast((end - start).count()) / + 1000000000.0; + std::cout << " -> Slept for " << t << " seconds " + << std::endl; + } break; } case Operation::Busy: @@ -234,16 +248,51 @@ int main(int argc, char *argv[]) auto cmdS = dynamic_cast(cmd.get()); std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); + auto sleeptime = + std::chrono::microseconds(cmdS->busyTime_us); if (!settings.myRank && settings.verbose) { double t = static_cast(cmdS->busyTime_us) / 1000000.0; - std::cout << " Be busy for " << t << " seconds " + std::cout << " Busy for " << cmdS->cycles + << " cycles with " << t + << " seconds work in each cycle"; + } + const size_t N = 1048576; + double *f = (double *)calloc(N, sizeof(double)); + double *g = (double *)malloc(N * sizeof(double)); + for (size_t c = 0; c < cmdS->cycles; ++c) + { + auto end = std::chrono::high_resolution_clock::now() + + std::chrono::microseconds(cmdS->busyTime_us); + if (cmdS->busyTime_us > 0.1) + { + adios.EnterComputationBlock(); + } + while (std::chrono::high_resolution_clock::now() < end) + { + for (size_t i = 0; i < N; ++i) + { + f[i] = f[i] * 2.0 + 0.000001; + } + } + if (cmdS->busyTime_us > 0.1) + { + adios.ExitComputationBlock(); + } + MPI_Allreduce(f, g, N, MPI_DOUBLE, MPI_SUM, + settings.appComm); + } + std::chrono::high_resolution_clock::time_point end = + std::chrono::high_resolution_clock::now(); + actualBusyTime_usec += (end - start).count() / 1000; + if (!settings.myRank && settings.verbose) + { + double t = static_cast((end - start).count()) / + 1000000000.0; + std::cout << " -> Was busy for " << t << " seconds " << std::endl; } - while (std::chrono::high_resolution_clock::now() < - start + std::chrono::microseconds(cmdS->busyTime_us)) - ; break; } case Operation::Write: @@ -347,6 +396,12 @@ int main(int argc, char *argv[]) timeEnd = MPI_Wtime(); if (!settings.myRank) { + if (actualBusyTime_usec > 0) + { + std::cout << " Total Busy time on Rank 0 was " + << (double)actualBusyTime_usec / 1000000.0 << " seconds " + << std::endl; + } std::cout << "ADIOS IOTEST App " << settings.appId << " total time " << timeEnd - timeStart << " seconds " << std::endl; } diff --git a/source/utils/adios_iotest/processConfig.cpp b/source/utils/adios_iotest/processConfig.cpp index f3e69ae3c2..e6a73f3d93 100644 --- a/source/utils/adios_iotest/processConfig.cpp +++ b/source/utils/adios_iotest/processConfig.cpp @@ -6,6 +6,7 @@ */ #include +#include #include #include // FLT_MAX #include @@ -13,6 +14,7 @@ #include #include #include +#include #include "decomp.h" #include "processConfig.h" @@ -26,8 +28,8 @@ CommandSleep::CommandSleep(size_t time) } CommandSleep::~CommandSleep() {} -CommandBusy::CommandBusy(size_t time) -: Command(Operation::Busy), busyTime_us(time) +CommandBusy::CommandBusy(size_t cycles, size_t time) +: Command(Operation::Busy), cycles(cycles), busyTime_us(time) { } CommandBusy::~CommandBusy() {} @@ -322,8 +324,9 @@ void printConfig(const Config &cfg) case Operation::Busy: { auto cmdS = dynamic_cast(cmd.get()); - std::cout << " Be busy for " << cmdS->busyTime_us - << " microseconds " << std::endl; + std::cout << " Be busy for " << cmdS->cycles + << " compute cycles with " << cmdS->busyTime_us + << " microseconds of computation each " << std::endl; break; } case Operation::Write: @@ -561,15 +564,18 @@ Config processConfig(const Settings &settings, size_t *currentConfigLineNumber) { if (currentAppId == static_cast(settings.appId)) { - double d = stringToDouble(words, 1, "busy"); + size_t cycles = + static_cast(stringToSizet(words, 1, "busy")); + double d = stringToDouble(words, 2, "busy"); if (verbose0) { - std::cout - << "--> Command Busy for: " << std::setprecision(7) - << d << " seconds" << std::endl; + std::cout << "--> Command Busy for: " << cycles + << " cycles with " << std::setprecision(7) + << d << " seconds computation each" + << std::endl; } size_t t_us = static_cast(d * 1000000); - auto cmd = std::make_shared(t_us); + auto cmd = std::make_shared(cycles, t_us); cmd->conditionalStream = conditionalStream; cfg.commands.push_back(cmd); } diff --git a/source/utils/adios_iotest/processConfig.h b/source/utils/adios_iotest/processConfig.h index acf818fa92..b4018f9f84 100644 --- a/source/utils/adios_iotest/processConfig.h +++ b/source/utils/adios_iotest/processConfig.h @@ -60,8 +60,9 @@ class CommandSleep : public Command class CommandBusy : public Command { public: - const size_t busyTime_us = 0; // in microseconds - CommandBusy(size_t time); + const size_t cycles = 1; + const size_t busyTime_us = 0; // in microseconds per cycle + CommandBusy(size_t cycles, size_t time); ~CommandBusy(); }; diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index 7240334f24..b9df63ac41 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -12,6 +12,12 @@ file(MAKE_DIRECTORY ${BP4_DIR}) file(MAKE_DIRECTORY ${BP5_DIR}) file(MAKE_DIRECTORY ${FS_DIR}) +set(BP5_ASYNC_DIR ${BP5_DIR}/async) +file(MAKE_DIRECTORY ${BP5_ASYNC_DIR}/tls-guided) +file(MAKE_DIRECTORY ${BP5_ASYNC_DIR}/tls-naive) +file(MAKE_DIRECTORY ${BP5_ASYNC_DIR}/ews-guided) +file(MAKE_DIRECTORY ${BP5_ASYNC_DIR}/ews-naive) + macro(bp3_bp4_gtest_add_tests_helper testname mpi) gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .BP3 WORKING_DIRECTORY ${BP3_DIR} EXTRA_ARGS "BP3" @@ -35,12 +41,31 @@ macro(bp_gtest_add_tests_helper testname mpi) endif() endmacro() +macro(async_gtest_add_tests_helper testname mpi) + if(ADIOS2_HAVE_BP5) + gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .Async.BP5.TLS.Guided + WORKING_DIRECTORY ${BP5_ASYNC_DIR}/tls-guided EXTRA_ARGS "BP5" "AggregationType=TwoLevelShm,AsyncWrite=Guided" + ) + gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .Async.BP5.TLS.Naive + WORKING_DIRECTORY ${BP5_ASYNC_DIR}/tls-naive EXTRA_ARGS "BP5" "AggregationType=TwoLevelShm,AsyncWrite=Naive" + ) + gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .Async.BP5.EWS.Guided + WORKING_DIRECTORY ${BP5_ASYNC_DIR}/ews-guided EXTRA_ARGS "BP5" "AggregationType=EveryoneWritesSerial,AsyncWrite=Guided" + ) + gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .Async.BP5.EWS.Naive + WORKING_DIRECTORY ${BP5_ASYNC_DIR}/ews-naive EXTRA_ARGS "BP5" "AggregationType=EveryoneWritesSerial,AsyncWrite=Naive" + ) + endif() +endmacro() + add_subdirectory(operations) # These tests should be *very* fast set(CTEST_TEST_TIMEOUT 10) bp_gtest_add_tests_helper(WriteReadADIOS2 MPI_ALLOW) +async_gtest_add_tests_helper(WriteReadADIOS2 MPI_ALLOW) + bp_gtest_add_tests_helper(WriteReadADIOS2fstream MPI_ALLOW) bp3_bp4_gtest_add_tests_helper(WriteReadADIOS2stdio MPI_ALLOW) bp_gtest_add_tests_helper(WriteReadAsStreamADIOS2 MPI_ALLOW) diff --git a/testing/adios2/engine/bp/TestBPWriteReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteReadADIOS2.cpp index e43cb3e919..8d4bdeedfd 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadADIOS2.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadADIOS2.cpp @@ -15,7 +15,8 @@ #include "../SmallTestData.h" -std::string engineName; // comes from command line +std::string engineName; // comes from command line +std::string engineParameters; // comes from command line class BPWriteReadTestADIOS2 : public ::testing::Test { @@ -136,8 +137,11 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead1D8) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } - io.SetParameter("AsyncThreads", "0"); io.AddTransport("file"); // QUESTION: It seems that BPFilterWriter cannot overwrite existing @@ -223,6 +227,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead1D8) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -496,6 +504,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D2x4) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.AddTransport("file"); adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); @@ -565,6 +577,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D2x4) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -817,6 +833,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.AddTransport("file"); @@ -883,6 +903,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -1100,6 +1124,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead10D2x2) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.AddTransport("file"); @@ -1149,6 +1177,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead10D2x2) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -1302,6 +1334,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2_ReadMultiSteps) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.AddTransport("file"); @@ -1368,6 +1404,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2_ReadMultiSteps) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -1610,6 +1650,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2_MultiStepsOverflow) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.AddTransport("file"); @@ -1676,6 +1720,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteRead2D4x2_MultiStepsOverflow) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -1775,6 +1823,10 @@ TEST_F(BPWriteReadTestADIOS2, OpenEngineTwice) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); @@ -1818,6 +1870,10 @@ TEST_F(BPWriteReadTestADIOS2, ReadStartCount) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } io.DefineVariable( "range", {static_cast(Nx * mpiSize)}, @@ -1835,6 +1891,10 @@ TEST_F(BPWriteReadTestADIOS2, ReadStartCount) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); @@ -1916,6 +1976,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteReadEmptyProcess) // Create the BP Engine io.SetEngine("BPFile"); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); @@ -1960,6 +2024,10 @@ TEST_F(BPWriteReadTestADIOS2, ADIOS2BPWriteReadEmptyProcess) { io.SetEngine(engineName); } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read); @@ -2031,6 +2099,10 @@ int main(int argc, char **argv) { engineName = std::string(argv[1]); } + if (argc > 2) + { + engineParameters = std::string(argv[2]); + } result = RUN_ALL_TESTS(); #if ADIOS2_USE_MPI diff --git a/testing/adios2/performance/query/TestBPQuery.cpp b/testing/adios2/performance/query/TestBPQuery.cpp index a1b348a487..31e5ee48ed 100644 --- a/testing/adios2/performance/query/TestBPQuery.cpp +++ b/testing/adios2/performance/query/TestBPQuery.cpp @@ -195,8 +195,6 @@ void BPQueryTest::WriteFile(const std::string &fname, adios2::ADIOS &adios, io.SetEngine("BPFile"); } - io.SetParameter("AsyncThreads", "0"); - if (engineName.compare("BP4") == 0) { io.SetParameters("statslevel=1");