Skip to content

Commit

Permalink
Merge pull request #3247 from pnorbert/bp5-multithreaded-read-autocalc
Browse files Browse the repository at this point in the history
If BP5 parameter Threads is 0 (default), calculate the number of thre…
  • Loading branch information
pnorbert authored Jun 9, 2022
2 parents 1524626 + 27b728b commit 62a3372
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 13 deletions.
6 changes: 4 additions & 2 deletions docs/user_guide/source/engines/bp5.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ This engine allows the user to fine tune the buffering operations through the fo

#. **MaxOpenFilesAtOnce**: Specify how many subfiles a process can keep open at once. Default is unlimited. If a dataset contains more subfiles than how many open file descriptors the system allows (see *ulimit -n*) then one can either try to raise that system limit (set it with *ulimit -n*), or set this parameter to force the reader to close some subfiles to stay within the limits.

#. **Threads**: Read side: Specify how many threads one process can use to speed up reading. The default value is *0*, which lets the engine estimate the number of threads from the number of processes running on the compute node and the number of hardware threads available on that node, up to a maximum of 16 threads. Value *1* forces the engine to read everything within the main thread of the process. Other values specify the exact number of threads the engine can use. Although multithreaded reading works in a single *Get(adios2::Mode::Sync)* call if the read selection spans multiple data blocks in the file, the best parallelization is achieved by using deferred mode and reading everything in *PerformGets()/EndStep()*.

============================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
============================== ===================== ===========================================================
OpenTimeoutSecs float **0** for *ReadRandomAccess* mode, **3600** for *Read* mode, ``10.0``, ``5``
BeginStepPollingFrequencySecs float **1**, ``10.0``
BeginStepPollingFrequencySecs float **1**, 10.0
AggregationType string **TwoLevelShm**, EveryoneWritesSerial, EveryoneWrites
NumAggregators integer >= 1 **0 (one file per compute node)**
AggregatorRatio integer >= 1 not used unless set
Expand All @@ -151,8 +152,9 @@ This engine allows the user to fine tune the buffering operations through the fo
DirectIO string On/Off **Off**, On, true, false
DirectIOAlignOffset integer >= 0 **512**
DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset
StatsLevel integer, 0 or 1 **1**, ``0``
StatsLevel integer, 0 or 1 **1**, 0
MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1
Threads integer >= 0 **0**, 1, 32
============================== ===================== ===========================================================


Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class BP5Engine
MACRO(ReaderShortCircuitReads, Bool, bool, false) \
MACRO(StatsLevel, UInt, unsigned int, 1) \
MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \
MACRO(Threads, UInt, unsigned int, 1) \
MACRO(Threads, UInt, unsigned int, 0) \
MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX)

struct BP5Params
Expand Down
25 changes: 22 additions & 3 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <chrono>
#include <errno.h>
#include <mutex>
#include <thread>

using TP = std::chrono::high_resolution_clock::time_point;
#define NOW() std::chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -331,15 +333,14 @@ void BP5Reader::PerformGets()

// TP startRead = NOW();
// double sortTime = 0.0;
if (m_Parameters.Threads > 1 && nRequest > 1)
if (m_Threads > 1 && nRequest > 1)
{
// TP startSort = NOW();
std::sort(ReadRequests.begin(), ReadRequests.end(),
lf_CompareReqSubfile);
// TP endSort = NOW();
// sortTime = DURATION(startSort, endSort);
size_t nThreads =
(m_Parameters.Threads < nRequest ? m_Parameters.Threads : nRequest);
size_t nThreads = (m_Threads < nRequest ? m_Threads : nRequest);

size_t maxOpenFiles = helper::SetWithinLimit(
(size_t)m_Parameters.MaxOpenFilesAtOnce / nThreads, (size_t)1,
Expand Down Expand Up @@ -471,6 +472,24 @@ void BP5Reader::InitParameters()
m_Parameters.OpenTimeoutSecs = 3600.0f;
}
}

// Resolve the number of reading threads: a non-zero Threads parameter is
// taken as given, 0 asks for an estimate computed below.
m_Threads = m_Parameters.Threads;
if (m_Threads == 0)
{
// NOTE(review): this local has the same name as, and therefore hides, the
// m_NodeComm member declared in BP5Reader.h; the member is not assigned
// here. Confirm whether the member or a plainly named local was intended.
helper::Comm m_NodeComm =
m_Comm.GroupByShm("creating per-node comm at BP5 Open(read)");
// Size of the per-node communicator created above
unsigned int NodeSize = static_cast<unsigned int>(m_NodeComm.Size());
// May be 0 when the hardware thread count cannot be detected
unsigned int NodeThreadSize = helper::NumHardwareThreadsPerNode();
if (NodeThreadSize > 0)
{
// Divide the detected hardware threads by the per-node communicator
// size; limits passed to SetWithinLimit are 1 and 16
m_Threads =
helper::SetWithinLimit(NodeThreadSize / NodeSize, 1U, 16U);
}
else
{
// Detection failed: use 8 in place of the hardware thread count;
// limits passed to SetWithinLimit are 1 and 8
m_Threads = helper::SetWithinLimit(8U / NodeSize, 1U, 8U);
}
}
}

bool BP5Reader::SleepOrQuit(const TimePoint &timeoutInstant,
Expand Down
6 changes: 5 additions & 1 deletion source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <chrono>
#include <map>
#include <mutex>
#include <vector>

namespace adios2
Expand Down Expand Up @@ -255,6 +254,11 @@ class BP5Reader : public BP5Engine, public Engine
std::map<uint64_t, WriterMapStruct> m_WriterMap;
// step -> writermap index (for all steps)
std::vector<uint64_t> m_WriterMapIndex;

/* Communicator connecting ranks on each Compute Node.
Only used to calculate the number of threads available for reading */
helper::Comm m_NodeComm;
unsigned int m_Threads;
};

} // end namespace engine
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/helper/adiosSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ctime>
#include <stdexcept> // std::runtime_error, std::exception
#include <system_error>
#include <thread>

#include <adios2sys/SystemTools.hxx>

Expand Down Expand Up @@ -181,5 +182,10 @@ char BPVersion(const std::string &name, helper::Comm &comm,
return version;
}

/**
 * Query the number of hardware threads through
 * std::thread::hardware_concurrency().
 * @return the value reported by the standard library, which may be 0 when
 * the count cannot be determined
 */
unsigned int NumHardwareThreadsPerNode()
{
    const unsigned int hardwareThreads = std::thread::hardware_concurrency();
    return hardwareThreads;
}

} // end namespace helper
} // end namespace adios2
6 changes: 6 additions & 0 deletions source/adios2/helper/adiosSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ bool IsHDF5File(const std::string &name, helper::Comm &comm,
const std::vector<Params> &transportsParameters) noexcept;
char BPVersion(const std::string &name, helper::Comm &comm,
const std::vector<Params> &transportsParameters) noexcept;

/** Return the number of available hardware threads on the node.
 * @return number of hardware threads, or 0 if the detection does not work
 */
unsigned int NumHardwareThreadsPerNode();

} // end namespace helper
} // end namespace adios2

Expand Down
11 changes: 7 additions & 4 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1288,10 +1288,13 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr)
->Count[dim + Read.BlockID * writer_meta_base->Dims];
}
decompressBuffer.resize(DestSize);
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataLengths[Read.BlockID],
decompressBuffer.data());
{
std::lock_guard<std::mutex> lockGuard(mutexDecompress);
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataLengths[Read.BlockID],
decompressBuffer.data());
}
IncomingData = decompressBuffer.data();
VirtualIncomingData = IncomingData;
}
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "ffs.h"
#include "fm.h"

#include <mutex>

#ifdef _WIN32
#pragma warning(disable : 4250)
#endif
Expand Down Expand Up @@ -217,6 +219,10 @@ class BP5Deserializer : virtual public BP5Base
void *GetMetadataBase(BP5VarRec *VarRec, size_t Step,
size_t WriterRank) const;
size_t CurTimestep = 0;

/* We assume operators are not thread-safe, call Decompress() one at a time
*/
std::mutex mutexDecompress;
};

} // end namespace format
Expand Down
2 changes: 0 additions & 2 deletions testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4)
// fill in the variable with values from starting index to
// starting index + count
bpWriter.BeginStep();
bpWriter.Put(var_i16, currentTestData.I16.data());
bpWriter.Put(var_iString, currentTestData.S1);
bpWriter.Put(var_i8, currentTestData.I8.data());
bpWriter.Put(var_i16, currentTestData.I16.data());
Expand Down Expand Up @@ -365,7 +364,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4)
// fill in the variable with values from starting index to
// starting index + count
bpAppender.BeginStep();
bpAppender.Put(var_i16, currentTestData.I16.data());
bpAppender.Put(var_iString, currentTestData.S1);
bpAppender.Put(var_i8, currentTestData.I8.data());
bpAppender.Put(var_i16, currentTestData.I16.data());
Expand Down

0 comments on commit 62a3372

Please sign in to comment.