Skip to content

Commit

Permalink
Merge pull request #2789 from pnorbert/bp5-aggregation
Browse files Browse the repository at this point in the history
Bp5 aggregation
  • Loading branch information
pnorbert authored Jul 16, 2021
2 parents 6bb790d + 585a56c commit 8dba298
Show file tree
Hide file tree
Showing 26 changed files with 1,540 additions and 123 deletions.
2 changes: 2 additions & 0 deletions examples/basics/globalArray/globalArray_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ int main(int argc, char *argv[])
// Get io settings from the config file or
// create one with default settings here
adios2::IO io = adios.DeclareIO("Output");
io.SetEngine("BP5");
io.SetParameter("NumAggregators", "1");

/*
* Define global array: type, name, global dimensions
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ add_library(adios2_core

toolkit/aggregator/mpi/MPIAggregator.cpp
toolkit/aggregator/mpi/MPIChain.cpp
toolkit/aggregator/mpi/MPIShmChain.cpp

toolkit/burstbuffer/FileDrainer.cpp
toolkit/burstbuffer/FileDrainerSingleThread.cpp
Expand All @@ -124,7 +125,7 @@ if (ADIOS2_HAVE_BP5)
target_sources(adios2_core PRIVATE
engine/bp5/BP5Engine.cpp
engine/bp5/BP5Reader.cpp engine/bp5/BP5Reader.tcc
engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc
engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc engine/bp5/BP5Writer_TwoLevelShm.cpp
)
endif()

Expand Down
5 changes: 3 additions & 2 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void BP3Writer::Init()
static_cast<unsigned int>(m_BP3Serializer.m_SizeMPI))
{
m_BP3Serializer.m_Aggregator.Init(
m_BP3Serializer.m_Parameters.NumAggregators,
m_BP3Serializer.m_Parameters.NumAggregators, m_Comm);
}
InitTransports();
Expand Down Expand Up @@ -388,10 +389,10 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
// async?
for (int r = 0; r < m_BP3Serializer.m_Aggregator.m_Size; ++r)
{
aggregator::MPIAggregator::ExchangeRequests dataRequests =
aggregator::MPIChain::ExchangeRequests dataRequests =
m_BP3Serializer.m_Aggregator.IExchange(m_BP3Serializer.m_Data, r);

aggregator::MPIAggregator::ExchangeAbsolutePositionRequests
aggregator::MPIChain::ExchangeAbsolutePositionRequests
absolutePositionRequests =
m_BP3Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP3Serializer.m_Data, r);
Expand Down
5 changes: 3 additions & 2 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void BP4Writer::Init()
static_cast<unsigned int>(m_BP4Serializer.m_SizeMPI))
{
m_BP4Serializer.m_Aggregator.Init(
m_BP4Serializer.m_Parameters.NumAggregators,
m_BP4Serializer.m_Parameters.NumAggregators, m_Comm);
}
InitTransports();
Expand Down Expand Up @@ -745,10 +746,10 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
// async?
for (int r = 0; r < m_BP4Serializer.m_Aggregator.m_Size; ++r)
{
aggregator::MPIAggregator::ExchangeRequests dataRequests =
aggregator::MPIChain::ExchangeRequests dataRequests =
m_BP4Serializer.m_Aggregator.IExchange(m_BP4Serializer.m_Data, r);

aggregator::MPIAggregator::ExchangeAbsolutePositionRequests
aggregator::MPIChain::ExchangeAbsolutePositionRequests
absolutePositionRequests =
m_BP4Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP4Serializer.m_Data, r);
Expand Down
33 changes: 31 additions & 2 deletions source/adios2/engine/bp5/BP5Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)
std::string value = itKey->second;
parameter = helper::StringToByteUnits(
value, "for Parameter key=" + key + "in call to Open");
parameter =
helper::StringTo<float>(value, " in Parameter key=" + key);
}
};

Expand Down Expand Up @@ -263,6 +261,37 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)
}
};

auto lf_SetAggregationTypeParameter = [&](const std::string key,
int &parameter, int def) {
auto itKey = io.m_Parameters.find(key);
parameter = def;
if (itKey != io.m_Parameters.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
::tolower);
if (value == "everyonewrites" || value == "auto")
{
parameter = (int)AggregationType::EveryoneWrites;
}
else if (value == "everyonewritesserial")
{
parameter = (int)AggregationType::EveryoneWritesSerial;
}
else if (value == "twolevelshm")
{
parameter = (int)AggregationType::TwoLevelShm;
}
else
{
throw std::invalid_argument(
"ERROR: Unknown BP5 AggregationType parameter \"" + value +
"\" (must be \"auto\", \"everyonewrites\" or "
"\"twolevelshm\"");
}
}
};

#define get_params(Param, Type, Typedecl, Default) \
lf_Set##Type##Parameter(#Param, Params.Param, Default);
BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params);
Expand Down
12 changes: 12 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ class BP5Engine

BufferVType UseBufferV = BufferVType::ChunkVType;

enum class AggregationType
{
EveryoneWrites,
EveryoneWritesSerial,
TwoLevelShm,
Auto
};

#define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \
MACRO(OpenTimeoutSecs, Int, int, 3600) \
MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \
Expand All @@ -103,6 +111,10 @@ class BP5Engine
MACRO(verbose, Int, int, 0) \
MACRO(CollectiveMetadata, Bool, bool, true) \
MACRO(NumAggregators, UInt, unsigned int, 999999) \
MACRO(NumSubFiles, UInt, unsigned int, 999999) \
MACRO(FileSystemPageSize, UInt, unsigned int, 4096) \
MACRO(AggregationType, AggregationType, int, \
(int)AggregationType::TwoLevelShm) \
MACRO(AsyncTasks, Bool, bool, true) \
MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \
MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \
Expand Down
Loading

0 comments on commit 8dba298

Please sign in to comment.