Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring Buffer options to format components #1605

Merged
merged 2 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ add_library(adios2

engine/nullcore/NullCoreWriter.cpp engine/nullcore/NullCoreWriter.tcc
#toolkit
toolkit/format/BufferSTL.cpp
toolkit/format/buffer/Buffer.cpp
toolkit/format/buffer/heap/BufferSTL.cpp

toolkit/format/bp3/BP3Base.cpp toolkit/format/bp3/BP3Base.tcc
toolkit/format/bp3/BP3Serializer.cpp toolkit/format/bp3/BP3Serializer.tcc
Expand Down Expand Up @@ -104,6 +105,8 @@ if(UNIX)
endif()

if(ADIOS2_HAVE_SysVShMem)
target_sources(adios2 PRIVATE toolkit/format/buffer/ipc/BufferSystemV.cpp)

target_sources(adios2 PRIVATE toolkit/transport/shm/ShmSystemV.cpp)
endif()

Expand Down Expand Up @@ -143,7 +146,6 @@ if(ADIOS2_HAVE_SSC)
target_link_libraries(adios2 PRIVATE nlohmann_json)
endif()


if(ADIOS2_HAVE_SST)
add_subdirectory(toolkit/sst)
target_sources(adios2 PRIVATE
Expand Down
8 changes: 4 additions & 4 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
{
const BufferSTL &bufferSTL =
const format::Buffer &bufferSTL =
m_BP3Serializer.m_Aggregator.GetConsumerBuffer(
m_BP3Serializer.m_Data);

m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position, transportIndex);
m_FileDataManager.WriteFiles(bufferSTL.Data(), bufferSTL.m_Position,
transportIndex);

m_FileDataManager.FlushFiles(transportIndex);
}
Expand All @@ -374,7 +374,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (isFinal) // Write metadata footer
{
BufferSTL &bufferSTL = m_BP3Serializer.m_Data;
format::BufferSTL &bufferSTL = m_BP3Serializer.m_Data;
m_BP3Serializer.ResetBuffer(bufferSTL, false, false);

m_BP3Serializer.AggregateCollectiveMetadata(
Expand Down
13 changes: 6 additions & 7 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void BP4Writer::InitBPBuffer()
// throw std::invalid_argument(
// "ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
BufferSTL preMetadataIndex;
format::BufferSTL preMetadataIndex;
size_t preMetadataIndexFileSize;

if (m_BP4Serializer.m_RankMPI == 0)
Expand Down Expand Up @@ -411,7 +411,7 @@ void BP4Writer::WriteProfilingJSONFile()

/*write the content of metadata index file*/
void BP4Writer::PopulateMetadataIndexFileContent(
BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
format::BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp)
Expand Down Expand Up @@ -486,7 +486,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
m_BP4Serializer.m_Metadata.m_Position +
m_BP4Serializer.m_PreMetadataFileLength;

BufferSTL metadataIndex;
format::BufferSTL metadataIndex;
metadataIndex.Resize(128, "BP4 Index Table Entry");

uint64_t currentStep;
Expand Down Expand Up @@ -575,14 +575,13 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
const BufferSTL &bufferSTL =
const format::Buffer &bufferSTL =
m_BP4Serializer.m_Aggregator.GetConsumerBuffer(
m_BP4Serializer.m_Data);
if (bufferSTL.m_Position > 0)
{
m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position,
transportIndex);
m_FileDataManager.WriteFiles(
bufferSTL.Data(), bufferSTL.m_Position, transportIndex);

m_FileDataManager.FlushFiles(transportIndex);
}
Expand Down
8 changes: 4 additions & 4 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ class BP4Writer : public core::Engine
void UpdateActiveFlag(const bool active);

void PopulateMetadataIndexFileContent(
BufferSTL &buffer, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp);
format::BufferSTL &buffer, const uint64_t currentStep,
const uint64_t mpirank, const uint64_t pgIndexStart,
const uint64_t variablesIndexStart, const uint64_t attributesIndexStart,
const uint64_t currentStepEndPos, const uint64_t currentTimeStamp);

void WriteCollectiveMetadataFile(const bool isFinal = false);

Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ void MPIAggregator::SwapBuffers(const int step) noexcept {}

void MPIAggregator::ResetBuffers() noexcept {}

BufferSTL &MPIAggregator::GetConsumerBuffer(BufferSTL &bufferSTL)
format::Buffer &MPIAggregator::GetConsumerBuffer(format::Buffer &buffer)
{
return bufferSTL;
return buffer;
}

void MPIAggregator::Close()
Expand Down
12 changes: 7 additions & 5 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
#ifndef ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPIAGGREGATOR_H_
#define ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPIAGGREGATOR_H_

#include <memory> //std::unique_ptr

#include "adios2/common/ADIOSMPI.h"
#include "adios2/common/ADIOSTypes.h"
#include "adios2/toolkit/format/BufferSTL.h"
#include "adios2/toolkit/format/buffer/Buffer.h"

namespace adios2
{
Expand Down Expand Up @@ -58,10 +60,10 @@ class MPIAggregator
virtual void Init(const size_t subStreams, MPI_Comm parentComm);

virtual std::vector<std::vector<MPI_Request>>
IExchange(BufferSTL &bufferSTL, const int step) = 0;
IExchange(format::Buffer &buffer, const int step) = 0;

virtual std::vector<std::vector<MPI_Request>>
IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step) = 0;
IExchangeAbsolutePosition(format::Buffer &buffer, const int step) = 0;

virtual void
WaitAbsolutePosition(std::vector<std::vector<MPI_Request>> &requests,
Expand All @@ -74,7 +76,7 @@ class MPIAggregator

virtual void ResetBuffers() noexcept;

virtual BufferSTL &GetConsumerBuffer(BufferSTL &bufferSTL);
virtual format::Buffer &GetConsumerBuffer(format::Buffer &buffer);

/** closes current aggregator, frees m_Comm */
void Close();
Expand All @@ -88,7 +90,7 @@ class MPIAggregator
void HandshakeRank(const int rank = 0);

/** assigning extra buffers for aggregation */
std::vector<BufferSTL> m_Buffers;
std::vector<std::unique_ptr<format::Buffer>> m_Buffers;
};

} // end namespace aggregator
Expand Down
75 changes: 44 additions & 31 deletions source/adios2/toolkit/aggregator/mpi/MPIChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "adios2/common/ADIOSMPI.h"
#include "adios2/helper/adiosFunctions.h" //helper::CheckMPIReturn

#include "adios2/toolkit/format/buffer/heap/BufferSTL.h"

namespace adios2
{
namespace aggregator
Expand All @@ -28,19 +30,19 @@ void MPIChain::Init(const size_t subStreams, MPI_Comm parentComm)
// add a receiving buffer except for the last rank (only sends)
if (m_Rank < m_Size)
{
m_Buffers.emplace_back(); // just one for now
m_Buffers.emplace_back(new format::BufferSTL()); // just one for now
}
}

std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
const int step)
std::vector<std::vector<MPI_Request>>
MPIChain::IExchange(format::Buffer &buffer, const int step)
{
if (m_Size == 1)
{
return std::vector<std::vector<MPI_Request>>();
}

BufferSTL &sendBuffer = GetSender(bufferSTL);
format::Buffer &sendBuffer = GetSender(buffer);
const int endRank = m_Size - 1 - step;
const bool sender = (m_Rank >= 1 && m_Rank <= endRank) ? true : false;
const bool receiver = (m_Rank < endRank) ? true : false;
Expand All @@ -61,11 +63,10 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
if (sendBuffer.m_Position > 0)
{

const std::vector<MPI_Request> requestsISend64 =
helper::Isend64(sendBuffer.m_Buffer.data(),
sendBuffer.m_Position, m_Rank - 1, 1, m_Comm,
", aggregation Isend64 data at iteration " +
std::to_string(step));
const std::vector<MPI_Request> requestsISend64 = helper::Isend64(
sendBuffer.Data(), sendBuffer.m_Position, m_Rank - 1, 1, m_Comm,
", aggregation Isend64 data at iteration " +
std::to_string(step));

requests[0].insert(requests[0].end(), requestsISend64.begin(),
requestsISend64.end());
Expand All @@ -88,8 +89,8 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
", aggregation waiting for receiver size at iteration " +
std::to_string(step) + "\n");

BufferSTL &receiveBuffer = GetReceiver(bufferSTL);
ResizeUpdateBufferSTL(
format::Buffer &receiveBuffer = GetReceiver(buffer);
ResizeUpdateBuffer(
bufferSize, receiveBuffer,
"in aggregation, when resizing receiving buffer to size " +
std::to_string(bufferSize));
Expand All @@ -98,8 +99,8 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
if (bufferSize > 0)
{
requests[1] =
helper::Irecv64(receiveBuffer.m_Buffer.data(),
receiveBuffer.m_Position, m_Rank + 1, 1, m_Comm,
helper::Irecv64(receiveBuffer.Data(), receiveBuffer.m_Position,
m_Rank + 1, 1, m_Comm,
", aggregation Irecv64 data at iteration " +
std::to_string(step));
}
Expand All @@ -109,7 +110,7 @@ std::vector<std::vector<MPI_Request>> MPIChain::IExchange(BufferSTL &bufferSTL,
}

std::vector<std::vector<MPI_Request>>
MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
MPIChain::IExchangeAbsolutePosition(format::Buffer &buffer, const int step)
{
if (m_Size == 1)
{
Expand All @@ -129,14 +130,13 @@ MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
if (step == 0)
{
m_SizeSend =
(m_Rank == 0) ? bufferSTL.m_AbsolutePosition : bufferSTL.m_Position;
(m_Rank == 0) ? buffer.m_AbsolutePosition : buffer.m_Position;
}

if (m_Rank == step)
{
m_ExchangeAbsolutePosition =
(m_Rank == 0) ? m_SizeSend
: m_SizeSend + bufferSTL.m_AbsolutePosition;
(m_Rank == 0) ? m_SizeSend : m_SizeSend + buffer.m_AbsolutePosition;

helper::CheckMPIReturn(
MPI_Isend(&m_ExchangeAbsolutePosition, 1, ADIOS2_MPI_SIZE_T,
Expand All @@ -147,8 +147,8 @@ MPIChain::IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step)
else if (m_Rank == destination)
{
helper::CheckMPIReturn(
MPI_Irecv(&bufferSTL.m_AbsolutePosition, 1, ADIOS2_MPI_SIZE_T, step,
0, m_Comm, &requests[1][0]),
MPI_Irecv(&buffer.m_AbsolutePosition, 1, ADIOS2_MPI_SIZE_T, step, 0,
m_Comm, &requests[1][0]),
", aggregation Irecv absolute position at iteration " +
std::to_string(step) + "\n");
}
Expand Down Expand Up @@ -235,9 +235,9 @@ void MPIChain::SwapBuffers(const int /*step*/) noexcept

void MPIChain::ResetBuffers() noexcept { m_CurrentBufferOrder = 0; }

BufferSTL &MPIChain::GetConsumerBuffer(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetConsumerBuffer(format::Buffer &buffer)
{
return GetSender(bufferSTL);
return GetSender(buffer);
}

// PRIVATE
Expand Down Expand Up @@ -276,35 +276,48 @@ void MPIChain::HandshakeLinks()
}
}

BufferSTL &MPIChain::GetSender(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetSender(format::Buffer &buffer)
{
if (m_CurrentBufferOrder == 0)
{
return bufferSTL;
return buffer;
}
else
{
return m_Buffers.front();
return *m_Buffers.front();
}
}

BufferSTL &MPIChain::GetReceiver(BufferSTL &bufferSTL)
format::Buffer &MPIChain::GetReceiver(format::Buffer &buffer)
{
if (m_CurrentBufferOrder == 0)
{
return m_Buffers.front();
return *m_Buffers.front();
}
else
{
return bufferSTL;
return buffer;
}
}

void MPIChain::ResizeUpdateBufferSTL(const size_t newSize, BufferSTL &bufferSTL,
const std::string hint)
void MPIChain::ResizeUpdateBuffer(const size_t newSize, format::Buffer &buffer,
const std::string hint)
{
bufferSTL.Resize(newSize, hint);
bufferSTL.m_Position = bufferSTL.m_Buffer.size();
if (buffer.m_FixedSize > 0)
{
if (newSize > buffer.m_FixedSize)
{
throw std::invalid_argument(
"ERROR: requesting new size: " + std::to_string(newSize) +
" bytes, for fixed size buffer " +
std::to_string(buffer.m_FixedSize) + " of type " +
buffer.m_Type + ", allocate more memory\n");
}
return; // do nothing if fixed size is enough
}

buffer.Resize(newSize, hint);
buffer.m_Position = newSize;
}

} // end namespace aggregator
Expand Down
Loading