Skip to content

Commit

Permalink
Do not extend buffer twice during PerformPuts, and maintain deferred …
Browse files Browse the repository at this point in the history
…datasize correctly in case of multiple calls of PerformPuts(). Addresses ornladios#2320
  • Loading branch information
pnorbert committed Jun 12, 2020
1 parent 130c4f1 commit 90ea2df
Show file tree
Hide file tree
Showing 16 changed files with 374 additions and 25 deletions.
11 changes: 11 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ ADIOS2_FOREACH_PRIMITIVE_TYPE_1ARG(declare_template_instantiation)
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

size_t Engine::DebugGetDataBufferSize() const
{
helper::CheckForNullptr(m_Engine,
"in call to Engine::DebugGetDataBufferSize");
if (m_Engine->m_EngineType == "NULL")
{
return 0;
}
return m_Engine->DebugGetDataBufferSize();
}

std::string ToString(const Engine &engine)
{
return std::string("Engine(Name: \"" + engine.Name() + "\", Type: \"" +
Expand Down
3 changes: 3 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ class Engine
*/
void LockReaderSelections();

/* Debug function for adios2 testing framework */
size_t DebugGetDataBufferSize() const;

private:
Engine(core::Engine *engine);
core::Engine *m_Engine = nullptr;
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ void Engine::LockReaderSelections() noexcept
m_ReaderSelectionsLocked = true;
}

size_t Engine::DebugGetDataBufferSize() const
{
ThrowUp("DebugGetDataBufferSize");
return 0;
}

// PROTECTED
void Engine::Init() {}
void Engine::InitParameters() {}
Expand Down Expand Up @@ -171,7 +177,7 @@ ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)

size_t Engine::DoSteps() const
{
ThrowUp("DoPut");
ThrowUp("DoSteps");
return MaxSizeT;
}

Expand Down
3 changes: 3 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ class Engine
*/
void LockReaderSelections() noexcept;

/* for adios2 internal testing */
virtual size_t DebugGetDataBufferSize() const;

protected:
/** from ADIOS class passed to Engine created with Open
* if no communicator is passed */
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void BP3Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP3Serializer.m_DeferredVariables.clear();
m_BP3Serializer.m_DeferredVariablesDataSize = 0;
}

void BP3Writer::EndStep()
Expand Down Expand Up @@ -413,6 +414,10 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP3Writer::DebugGetDataBufferSize() const
{
return m_BP3Serializer.DebugGetDataBufferSize();
}
} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp3/BP3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class BP3Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP3Serializer m_BP3Serializer;
Expand Down Expand Up @@ -85,7 +87,8 @@ class BP3Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
22 changes: 14 additions & 8 deletions source/adios2/engine/bp3/BP3Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,21 @@ void BP3Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP3Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);
format::BP3Base::ResizeResult resizeResult =
format::BP3Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

const format::BP3Base::ResizeResult resizeResult =
m_BP3Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
resizeResult = m_BP3Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}

// if first timestep Write create a new pg index or in time aggregation
if (!m_BP3Serializer.m_MetadataSet.DataPGIsOpen)
Expand Down Expand Up @@ -133,7 +139,7 @@ void BP3Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
16 changes: 11 additions & 5 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void BP4Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP4Serializer.m_DeferredVariables.clear();
m_BP4Serializer.m_DeferredVariablesDataSize = 0;
}

void BP4Writer::EndStep()
Expand Down Expand Up @@ -540,7 +541,7 @@ void BP4Writer::UpdateActiveFlag(const bool active)
m_FileMetadataIndexManager.SeekToFileEnd();
if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWriteAt(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -574,7 +575,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataFileNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(
m_MetadataFileNames[i], m_DrainMetadataFileNames[i],
Expand Down Expand Up @@ -637,7 +638,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWrite(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -678,7 +679,7 @@ void BP4Writer::WriteData(const bool isFinal, const int transportIndex)
m_FileDataManager.FlushFiles(transportIndex);
if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i], dataSize);
Expand Down Expand Up @@ -728,7 +729,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i],
Expand Down Expand Up @@ -756,6 +757,11 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP4Writer::DebugGetDataBufferSize() const
{
return m_BP4Serializer.DebugGetDataBufferSize();
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class BP4Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP4Serializer m_BP4Serializer;
Expand Down Expand Up @@ -115,7 +117,8 @@ class BP4Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
23 changes: 14 additions & 9 deletions source/adios2/engine/bp4/BP4Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,21 @@ void BP4Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP4Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);

const format::BP4Base::ResizeResult resizeResult =
m_BP4Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
format::BP4Base::ResizeResult resizeResult =
format::BP4Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

resizeResult = m_BP4Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}
// if first timestep Write create a new pg index
if (!m_BP4Serializer.m_MetadataSet.DataPGIsOpen)
{
Expand Down Expand Up @@ -133,7 +138,7 @@ void BP4Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,5 +505,7 @@ std::map<size_t, std::shared_ptr<BPOperation>> BPBase::SetBPOperations(
ADIOS2_FOREACH_STDTYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

size_t BPBase::DebugGetDataBufferSize() const { return m_Data.DebugGetSize(); }

} // end namespace format
} // end namespace adios2
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ class BPBase
/** Delete buffer memory manually */
void DeleteBuffers();

size_t DebugGetDataBufferSize() const;

protected:
/** file I/O method type, adios1 legacy, only POSIX and MPI_AGG are used */
enum IO_METHOD
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/buffer/heap/BufferSTL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,7 @@ ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_template_instantiation)

void BufferSTL::Delete() { std::vector<char>().swap(m_Buffer); }

size_t BufferSTL::DebugGetSize() const { return m_Buffer.size(); };

} // end namespace format
} // end namespace adios2
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/buffer/heap/BufferSTL.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class BufferSTL : public Buffer
size_t Align() const noexcept;

void Delete();

size_t DebugGetSize() const;
};

#define declare_template_instantiation(T) \
Expand Down
4 changes: 4 additions & 0 deletions testing/adios2/engine/bp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ bp3_bp4_gtest_add_tests_helper(WriteReadVariableSpan MPI_ALLOW)
bp3_bp4_gtest_add_tests_helper(TimeAggregation MPI_ALLOW)
bp3_bp4_gtest_add_tests_helper(NoXMLRecovery MPI_ALLOW)

if(NOT MSVC)
bp3_bp4_gtest_add_tests_helper(BufferSize MPI_NONE)
endif()

if(ADIOS2_HAVE_MPI)
bp3_bp4_gtest_add_tests_helper(WriteAggregateRead MPI_ONLY)
endif()
Expand Down
Loading

0 comments on commit 90ea2df

Please sign in to comment.