Skip to content

Commit

Permalink
Merge pull request #1605 from williamfgc/buffer
Browse files Browse the repository at this point in the history
Refactoring Buffer options to format components
  • Loading branch information
williamfgc authored Jul 9, 2019
2 parents 701761f + 3541ef7 commit 7a52c2f
Show file tree
Hide file tree
Showing 18 changed files with 339 additions and 111 deletions.
6 changes: 4 additions & 2 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ add_library(adios2

engine/nullcore/NullCoreWriter.cpp engine/nullcore/NullCoreWriter.tcc
#toolkit
toolkit/format/BufferSTL.cpp
toolkit/format/buffer/Buffer.cpp
toolkit/format/buffer/heap/BufferSTL.cpp

toolkit/format/bp3/BP3Base.cpp toolkit/format/bp3/BP3Base.tcc
toolkit/format/bp3/BP3Serializer.cpp toolkit/format/bp3/BP3Serializer.tcc
Expand Down Expand Up @@ -104,6 +105,8 @@ if(UNIX)
endif()

if(ADIOS2_HAVE_SysVShMem)
target_sources(adios2 PRIVATE toolkit/format/buffer/ipc/BufferSystemV.cpp)

target_sources(adios2 PRIVATE toolkit/transport/shm/ShmSystemV.cpp)
endif()

Expand Down Expand Up @@ -143,7 +146,6 @@ if(ADIOS2_HAVE_SSC)
target_link_libraries(adios2 PRIVATE nlohmann_json)
endif()


if(ADIOS2_HAVE_SST)
add_subdirectory(toolkit/sst)
target_sources(adios2 PRIVATE
Expand Down
8 changes: 4 additions & 4 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
{
const BufferSTL &bufferSTL =
const format::Buffer &bufferSTL =
m_BP3Serializer.m_Aggregator.GetConsumerBuffer(
m_BP3Serializer.m_Data);

m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position, transportIndex);
m_FileDataManager.WriteFiles(bufferSTL.Data(), bufferSTL.m_Position,
transportIndex);

m_FileDataManager.FlushFiles(transportIndex);
}
Expand All @@ -374,7 +374,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (isFinal) // Write metadata footer
{
BufferSTL &bufferSTL = m_BP3Serializer.m_Data;
format::BufferSTL &bufferSTL = m_BP3Serializer.m_Data;
m_BP3Serializer.ResetBuffer(bufferSTL, false, false);

m_BP3Serializer.AggregateCollectiveMetadata(
Expand Down
13 changes: 6 additions & 7 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void BP4Writer::InitBPBuffer()
// throw std::invalid_argument(
// "ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
BufferSTL preMetadataIndex;
format::BufferSTL preMetadataIndex;
size_t preMetadataIndexFileSize;

if (m_BP4Serializer.m_RankMPI == 0)
Expand Down Expand Up @@ -411,7 +411,7 @@ void BP4Writer::WriteProfilingJSONFile()

/*write the content of metadata index file*/
void BP4Writer::PopulateMetadataIndexFileContent(
BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
format::BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp)
Expand Down Expand Up @@ -486,7 +486,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
m_BP4Serializer.m_Metadata.m_Position +
m_BP4Serializer.m_PreMetadataFileLength;

BufferSTL metadataIndex;
format::BufferSTL metadataIndex;
metadataIndex.Resize(128, "BP4 Index Table Entry");

uint64_t currentStep;
Expand Down Expand Up @@ -575,14 +575,13 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
const BufferSTL &bufferSTL =
const format::Buffer &bufferSTL =
m_BP4Serializer.m_Aggregator.GetConsumerBuffer(
m_BP4Serializer.m_Data);
if (bufferSTL.m_Position > 0)
{
m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position,
transportIndex);
m_FileDataManager.WriteFiles(
bufferSTL.Data(), bufferSTL.m_Position, transportIndex);

m_FileDataManager.FlushFiles(transportIndex);
}
Expand Down
8 changes: 4 additions & 4 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class BP4Writer : public core::Engine
void UpdateActiveFlag(const bool active);

void PopulateMetadataIndexFileContent(
BufferSTL &buffer, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp);
format::BufferSTL &buffer, const uint64_t currentStep,
const uint64_t mpirank, const uint64_t pgIndexStart,
const uint64_t variablesIndexStart, const uint64_t attributesIndexStart,
const uint64_t currentStepEndPos, const uint64_t currentTimeStamp);

void WriteCollectiveMetadataFile(const bool isFinal = false);

Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ void MPIAggregator::SwapBuffers(const int step) noexcept {}

void MPIAggregator::ResetBuffers() noexcept {}

BufferSTL &MPIAggregator::GetConsumerBuffer(BufferSTL &bufferSTL)
format::Buffer &MPIAggregator::GetConsumerBuffer(format::Buffer &buffer)
{
return bufferSTL;
return buffer;
}

void MPIAggregator::Close()
Expand Down
12 changes: 7 additions & 5 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
#ifndef ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPIAGGREGATOR_H_
#define ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPIAGGREGATOR_H_

#include <memory> //std::unique_ptr

#include "adios2/common/ADIOSMPI.h"
#include "adios2/common/ADIOSTypes.h"
#include "adios2/toolkit/format/BufferSTL.h"
#include "adios2/toolkit/format/buffer/Buffer.h"

namespace adios2
{
Expand Down Expand Up @@ -58,10 +60,10 @@ class MPIAggregator
virtual void Init(const size_t subStreams, MPI_Comm parentComm);

virtual std::vector<std::vector<MPI_Request>>
IExchange(BufferSTL &bufferSTL, const int step) = 0;
IExchange(format::Buffer &buffer, const int step) = 0;

virtual std::vector<std::vector<MPI_Request>>
IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step) = 0;
IExchangeAbsolutePosition(format::Buffer &buffer, const int step) = 0;

virtual void
WaitAbsolutePosition(std::vector<std::vector<MPI_Request>> &requests,
Expand All @@ -74,7 +76,7 @@ class MPIAggregator

virtual void ResetBuffers() noexcept;

virtual BufferSTL &GetConsumerBuffer(BufferSTL &bufferSTL);
virtual format::Buffer &GetConsumerBuffer(format::Buffer &buffer);

/** closes current aggregator, frees m_Comm */
void Close();
Expand All @@ -88,7 +90,7 @@ class MPIAggregator
void HandshakeRank(const int rank = 0);

/** assigning extra buffers for aggregation */
std::vector<BufferSTL> m_Buffers;
std::vector<std::unique_ptr<format::Buffer>> m_Buffers;
};

} // end namespace aggregator
Expand Down
75 changes: 44 additions & 31 deletions source/adios2/toolkit/aggregator/mpi/MPIChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "adios2/common/ADIOSMPI.h"
#include "adios2/helper/adiosFunctions.h" //helper::CheckMPIReturn

#include "adios2/toolkit/format/buffer/heap/BufferSTL.h"

namespace adios2
{
namespace aggregator
Expand All @@ -28,19 +30,19 @@ void MPIChain::Init(const size_t subStreams, MPI_Comm parentComm)
// add a receiving buffer except for the last rank (only sends)
if (m_Rank < m_Size)
{
m_Buffers.emplace_back(); // just one for now
m_Buffers.emplace_back(new format::BufferSTL()); // just one for now
}
}

std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
const int step)
std::vector<std::vector<MPI_Request>>
MPIChain::IExchange(format::Buffer &buffer, const int step)
{
if (m_Size == 1)
{
return std::vector<std::vector<MPI_Request>>();
}

BufferSTL &sendBuffer = GetSender(bufferSTL);
format::Buffer &sendBuffer = GetSender(buffer);
const int endRank = m_Size - 1 - step;
const bool sender = (m_Rank >= 1 && m_Rank <= endRank) ? true : false;
const bool receiver = (m_Rank < endRank) ? true : false;
Expand All @@ -61,11 +63,10 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
if (sendBuffer.m_Position > 0)
{

const std::vector<MPI_Request> requestsISend64 =
helper::Isend64(sendBuffer.m_Buffer.data(),
sendBuffer.m_Position, m_Rank - 1, 1, m_Comm,
", aggregation Isend64 data at iteration " +
std::to_string(step));
const std::vector<MPI_Request> requestsISend64 = helper::Isend64(
sendBuffer.Data(), sendBuffer.m_Position, m_Rank - 1, 1, m_Comm,
", aggregation Isend64 data at iteration " +
std::to_string(step));

requests[0].insert(requests[0].end(), requestsISend64.begin(),
requestsISend64.end());
Expand All @@ -88,8 +89,8 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
", aggregation waiting for receiver size at iteration " +
std::to_string(step) + "\n");

BufferSTL &receiveBuffer = GetReceiver(bufferSTL);
ResizeUpdateBufferSTL(
format::Buffer &receiveBuffer = GetReceiver(buffer);
ResizeUpdateBuffer(
bufferSize, receiveBuffer,
"in aggregation, when resizing receiving buffer to size " +
std::to_string(bufferSize));
Expand All @@ -98,8 +99,8 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
if (bufferSize > 0)
{
requests[1] =
helper::Irecv64(receiveBuffer.m_Buffer.data(),
receiveBuffer.m_Position, m_Rank + 1, 1, m_Comm,
helper::Irecv64(receiveBuffer.Data(), receiveBuffer.m_Position,
m_Rank + 1, 1, m_Comm,
", aggregation Irecv64 data at iteration " +
std::to_string(step));
}
Expand All @@ -109,7 +110,7 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
}

std::vector<std::vector<MPI_Request>>
MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
MPIChain::IExchangeAbsolutePosition(format::Buffer &buffer, const int step)
{
if (m_Size == 1)
{
Expand All @@ -129,14 +130,13 @@ MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
if (step == 0)
{
m_SizeSend =
(m_Rank == 0) ? bufferSTL.m_AbsolutePosition : bufferSTL.m_Position;
(m_Rank == 0) ? buffer.m_AbsolutePosition : buffer.m_Position;
}

if (m_Rank == step)
{
m_ExchangeAbsolutePosition =
(m_Rank == 0) ? m_SizeSend
: m_SizeSend + bufferSTL.m_AbsolutePosition;
(m_Rank == 0) ? m_SizeSend : m_SizeSend + buffer.m_AbsolutePosition;

helper::CheckMPIReturn(
MPI_Isend(&m_ExchangeAbsolutePosition, 1, ADIOS2_MPI_SIZE_T,
Expand All @@ -147,8 +147,8 @@ MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
else if (m_Rank == destination)
{
helper::CheckMPIReturn(
MPI_Irecv(&bufferSTL.m_AbsolutePosition, 1, ADIOS2_MPI_SIZE_T, step,
0, m_Comm, &requests[1][0]),
MPI_Irecv(&buffer.m_AbsolutePosition, 1, ADIOS2_MPI_SIZE_T, step, 0,
m_Comm, &requests[1][0]),
", aggregation Irecv absolute position at iteration " +
std::to_string(step) + "\n");
}
Expand Down Expand Up @@ -235,9 +235,9 @@ void MPIChain::SwapBuffers(const int /*step*/) noexcept

void MPIChain::ResetBuffers() noexcept { m_CurrentBufferOrder = 0; }

BufferSTL &MPIChain::GetConsumerBuffer(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetConsumerBuffer(format::Buffer &buffer)
{
return GetSender(bufferSTL);
return GetSender(buffer);
}

// PRIVATE
Expand Down Expand Up @@ -276,35 +276,48 @@ void MPIChain::HandshakeLinks()
}
}

BufferSTL &MPIChain::GetSender(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetSender(format::Buffer &buffer)
{
if (m_CurrentBufferOrder == 0)
{
return bufferSTL;
return buffer;
}
else
{
return m_Buffers.front();
return *m_Buffers.front();
}
}

BufferSTL &MPIChain::GetReceiver(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetReceiver(format::Buffer &buffer)
{
if (m_CurrentBufferOrder == 0)
{
return m_Buffers.front();
return *m_Buffers.front();
}
else
{
return bufferSTL;
return buffer;
}
}

void MPIChain::ResizeUpdateBufferSTL(const size_t newSize, BufferSTL &bufferSTL,
const std::string hint)
void MPIChain::ResizeUpdateBuffer(const size_t newSize, format::Buffer &buffer,
const std::string hint)
{
bufferSTL.Resize(newSize, hint);
bufferSTL.m_Position = bufferSTL.m_Buffer.size();
if (buffer.m_FixedSize > 0)
{
if (newSize > buffer.m_FixedSize)
{
throw std::invalid_argument(
"ERROR: requesting new size: " + std::to_string(newSize) +
" bytes, for fixed size buffer " +
std::to_string(buffer.m_FixedSize) + " of type " +
buffer.m_Type + ", allocate more memory\n");
}
return; // do nothing if fixed size is enough
}

buffer.Resize(newSize, hint);
buffer.m_Position = newSize;
}

} // end namespace aggregator
Expand Down
Loading

0 comments on commit 7a52c2f

Please sign in to comment.