diff --git a/docs/user_guide/source/engines/bp5.rst b/docs/user_guide/source/engines/bp5.rst index 0dfc8ce396..4b12238e45 100644 --- a/docs/user_guide/source/engines/bp5.rst +++ b/docs/user_guide/source/engines/bp5.rst @@ -127,12 +127,13 @@ This engine allows the user to fine tune the buffering operations through the fo #. **MaxOpenFilesAtOnce**: Specify how many subfiles a process can keep open at once. Default is unlimited. If a dataset contains more subfiles than how many open file descriptors the system allows (see *ulimit -n*) then one can either try to raise that system limit (set it with *ulimit -n*), or set this parameter to force the reader to close some subfiles to stay within the limits. + #. **Threads**: Read side: Specify how many threads one process can use to speed up reading. The default value is *0*, which lets the engine estimate the number of threads from the number of processes running on the compute node and the number of hardware threads available on that node, up to a maximum of 16 threads. If the number of hardware threads cannot be detected, the engine assumes 8 hardware threads per node. Value *1* forces the engine to read everything within the main thread of the process. Other values specify the exact number of threads the engine can use. Although multithreaded reading works in a single *Get(adios2::Mode::Sync)* call if the read selection spans multiple data blocks in the file, the best parallelization is achieved by using deferred mode and reading everything in *PerformGets()/EndStep()*. 
============================== ===================== =========================================================== **Key** **Value Format** **Default** and Examples ============================== ===================== =========================================================== OpenTimeoutSecs float **0** for *ReadRandomAccess* mode, **3600** for *Read* mode, ``10.0``, ``5`` - BeginStepPollingFrequencySecs float **1**, ``10.0`` + BeginStepPollingFrequencySecs float **1**, 10.0 AggregationType string **TwoLevelShm**, EveryoneWritesSerial, EveryoneWrites NumAggregators integer >= 1 **0 (one file per compute node)** AggregatorRatio integer >= 1 not used unless set @@ -151,8 +152,9 @@ This engine allows the user to fine tune the buffering operations through the fo DirectIO string On/Off **Off**, On, true, false DirectIOAlignOffset integer >= 0 **512** DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset - StatsLevel integer, 0 or 1 **1**, ``0`` + StatsLevel integer, 0 or 1 **1**, 0 MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1 + Threads integer >= 0 **0**, 1, 32 ============================== ===================== =========================================================== diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 99407e888c..f836a4608f 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -159,7 +159,7 @@ class BP5Engine MACRO(ReaderShortCircuitReads, Bool, bool, false) \ MACRO(StatsLevel, UInt, unsigned int, 1) \ MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \ - MACRO(Threads, UInt, unsigned int, 1) \ + MACRO(Threads, UInt, unsigned int, 0) \ MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX) struct BP5Params diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 240948c3a0..4b8247b68a 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ 
-14,6 +14,8 @@ #include #include +#include +#include using TP = std::chrono::high_resolution_clock::time_point; #define NOW() std::chrono::high_resolution_clock::now(); @@ -331,15 +333,14 @@ void BP5Reader::PerformGets() // TP startRead = NOW(); // double sortTime = 0.0; - if (m_Parameters.Threads > 1 && nRequest > 1) + if (m_Threads > 1 && nRequest > 1) { // TP startSort = NOW(); std::sort(ReadRequests.begin(), ReadRequests.end(), lf_CompareReqSubfile); // TP endSort = NOW(); // sortTime = DURATION(startSort, endSort); - size_t nThreads = - (m_Parameters.Threads < nRequest ? m_Parameters.Threads : nRequest); + size_t nThreads = (m_Threads < nRequest ? m_Threads : nRequest); size_t maxOpenFiles = helper::SetWithinLimit( (size_t)m_Parameters.MaxOpenFilesAtOnce / nThreads, (size_t)1, @@ -471,6 +472,25 @@ void BP5Reader::InitParameters() m_Parameters.OpenTimeoutSecs = 3600.0f; } } + + m_Threads = m_Parameters.Threads; + if (m_Threads == 0) + { + /* Fill the m_NodeComm member; a local of that name would shadow it */ + m_NodeComm = + m_Comm.GroupByShm("creating per-node comm at BP5 Open(read)"); + unsigned int NodeSize = static_cast<unsigned int>(m_NodeComm.Size()); + unsigned int NodeThreadSize = helper::NumHardwareThreadsPerNode(); + if (NodeThreadSize > 0) + { + m_Threads = + helper::SetWithinLimit(NodeThreadSize / NodeSize, 1U, 16U); + } + else + { + m_Threads = helper::SetWithinLimit(8U / NodeSize, 1U, 8U); + } + } } bool BP5Reader::SleepOrQuit(const TimePoint &timeoutInstant, diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index f4fbcd1685..d635b3899d 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -22,7 +22,6 @@ #include #include -#include #include namespace adios2 @@ -255,6 +254,11 @@ class BP5Reader : public BP5Engine, public Engine std::map m_WriterMap; // step -> writermap index (for all steps) std::vector m_WriterMapIndex; + + /* Communicator connecting ranks on each Compute Node. 
+ Only used to calculate the number of threads available for reading */ + helper::Comm m_NodeComm; + unsigned int m_Threads; }; } // end namespace engine diff --git a/source/adios2/helper/adiosSystem.cpp b/source/adios2/helper/adiosSystem.cpp index 814b6d2306..3629130c80 100644 --- a/source/adios2/helper/adiosSystem.cpp +++ b/source/adios2/helper/adiosSystem.cpp @@ -12,6 +12,7 @@ #include #include // std::runtime_error, std::exception #include +#include #include @@ -181,5 +182,10 @@ char BPVersion(const std::string &name, helper::Comm &comm, return version; } +unsigned int NumHardwareThreadsPerNode() +{ + return std::thread::hardware_concurrency(); +} + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/helper/adiosSystem.h b/source/adios2/helper/adiosSystem.h index 5c9c8e68f3..4385428c8b 100644 --- a/source/adios2/helper/adiosSystem.h +++ b/source/adios2/helper/adiosSystem.h @@ -79,6 +79,12 @@ bool IsHDF5File(const std::string &name, helper::Comm &comm, const std::vector &transportsParameters) noexcept; char BPVersion(const std::string &name, helper::Comm &comm, const std::vector &transportsParameters) noexcept; + +/** Return the number of hardware threads on the node, as reported by std::thread::hardware_concurrency(). 
+ * It might return 0 if the detection does not work + */ +unsigned int NumHardwareThreadsPerNode(); + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index 1411ac5cc2..5a0f4fc7d9 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -1288,10 +1288,13 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr) ->Count[dim + Read.BlockID * writer_meta_base->Dims]; } decompressBuffer.resize(DestSize); - core::Decompress(IncomingData, - ((MetaArrayRecOperator *)writer_meta_base) - ->DataLengths[Read.BlockID], - decompressBuffer.data()); + { + std::lock_guard lockGuard(mutexDecompress); + core::Decompress(IncomingData, + ((MetaArrayRecOperator *)writer_meta_base) + ->DataLengths[Read.BlockID], + decompressBuffer.data()); + } IncomingData = decompressBuffer.data(); VirtualIncomingData = IncomingData; } diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.h b/source/adios2/toolkit/format/bp5/BP5Deserializer.h index d005d05706..528bdc13b6 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.h @@ -18,6 +18,8 @@ #include "ffs.h" #include "fm.h" +#include + #ifdef _WIN32 #pragma warning(disable : 4250) #endif @@ -217,6 +219,10 @@ class BP5Deserializer : virtual public BP5Base void *GetMetadataBase(BP5VarRec *VarRec, size_t Step, size_t WriterRank) const; size_t CurTimestep = 0; + + /* Operators are assumed not to be thread-safe: FinalizeGet() holds this mutex around core::Decompress() so only one call runs at a time + */ + std::mutex mutexDecompress; }; } // end namespace format diff --git a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp index 2d11e69047..fcf1dedde7 100644 --- a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp +++ 
b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp @@ -212,7 +212,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) // fill in the variable with values from starting index to // starting index + count bpWriter.BeginStep(); - bpWriter.Put(var_i16, currentTestData.I16.data()); bpWriter.Put(var_iString, currentTestData.S1); bpWriter.Put(var_i8, currentTestData.I8.data()); bpWriter.Put(var_i16, currentTestData.I16.data()); @@ -365,7 +364,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) // fill in the variable with values from starting index to // starting index + count bpAppender.BeginStep(); - bpAppender.Put(var_i16, currentTestData.I16.data()); bpAppender.Put(var_iString, currentTestData.S1); bpAppender.Put(var_i8, currentTestData.I8.data()); bpAppender.Put(var_i16, currentTestData.I16.data());