Skip to content

Commit

Permalink
mpi_aggregator: keep requests for actual data exchange internal
Browse files Browse the repository at this point in the history
  • Loading branch information
germasch committed Apr 1, 2019
1 parent dc1730f commit f391c41
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 34 deletions.
5 changes: 2 additions & 3 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
// async?
for (int r = 0; r < m_BP3Serializer.m_Aggregator.m_Size; ++r)
{
std::vector<MPI_Request> dataRequests =
m_BP3Serializer.m_Aggregator.IExchange(m_BP3Serializer.m_Data, r);
m_BP3Serializer.m_Aggregator.IExchange(m_BP3Serializer.m_Data, r);

m_BP3Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP3Serializer.m_Data, r);
Expand All @@ -350,7 +349,7 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

m_BP3Serializer.m_Aggregator.WaitAbsolutePosition(r);

m_BP3Serializer.m_Aggregator.Wait(dataRequests, r);
m_BP3Serializer.m_Aggregator.Wait(r);
m_BP3Serializer.m_Aggregator.SwapBuffers(r);
}

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
// async?
for (int r = 0; r < m_BP4Serializer.m_Aggregator.m_Size; ++r)
{
std::vector<MPI_Request> dataRequests =
m_BP4Serializer.m_Aggregator.IExchange(m_BP4Serializer.m_Data, r);
m_BP4Serializer.m_Aggregator.IExchange(m_BP4Serializer.m_Data, r);

m_BP4Serializer.m_Aggregator.IExchangeAbsolutePosition(
m_BP4Serializer.m_Data, r);
Expand All @@ -550,7 +549,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

m_BP4Serializer.m_Aggregator.WaitAbsolutePosition(r);

m_BP4Serializer.m_Aggregator.Wait(dataRequests, r);
m_BP4Serializer.m_Aggregator.Wait(r);
m_BP4Serializer.m_Aggregator.SwapBuffers(r);
}

Expand Down
10 changes: 2 additions & 8 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ MPIAggregator::~MPIAggregator()

void MPIAggregator::Init(const size_t subStreams, MPI_Comm parentComm) {}

std::vector<MPI_Request> MPIAggregator::IExchange(BufferSTL & /**bufferSTL*/,
const int /** step*/)
void MPIAggregator::IExchange(BufferSTL & /**bufferSTL*/, const int /** step*/)
{
std::vector<MPI_Request> requests;
return requests;
}

void MPIAggregator::IExchangeAbsolutePosition(BufferSTL &bufferSTL,
Expand Down Expand Up @@ -103,10 +100,7 @@ void MPIAggregator::WaitAbsolutePosition(const int step)
}
}

void MPIAggregator::Wait(std::vector<MPI_Request> & /**requests*/,
const int /**step*/)
{
}
void MPIAggregator::Wait(const int /**step*/) {}

void MPIAggregator::SwapBuffers(const int step) noexcept {}

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/toolkit/aggregator/mpi/MPIAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,13 @@ class MPIAggregator

virtual void Init(const size_t subStreams, MPI_Comm parentComm);

virtual std::vector<MPI_Request> IExchange(BufferSTL &bufferSTL,
const int step);
virtual void IExchange(BufferSTL &bufferSTL, const int step);

void IExchangeAbsolutePosition(BufferSTL &bufferSTL, const int step);

void WaitAbsolutePosition(const int step);

virtual void Wait(std::vector<MPI_Request> &requests, const int step);
virtual void Wait(const int step);

virtual void SwapBuffers(const int step) noexcept;

Expand Down
23 changes: 9 additions & 14 deletions source/adios2/toolkit/aggregator/mpi/MPIChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,30 @@ void MPIChain::Init(const size_t subStreams, MPI_Comm parentComm)
}
}

std::vector<MPI_Request> MPIChain::IExchange(BufferSTL &bufferSTL,
const int step)
void MPIChain::IExchange(BufferSTL &bufferSTL, const int step)
{
if (m_Size == 1)
{
return std::vector<MPI_Request>();
return;
}

BufferSTL &sendBuffer = GetSender(bufferSTL);
const int endRank = m_Size - 1 - step;
const bool sender = (m_Rank >= 1 && m_Rank <= endRank) ? true : false;
const bool receiver = (m_Rank < endRank) ? true : false;

std::vector<MPI_Request> requests(3);

if (sender) // sender
{
helper::CheckMPIReturn(MPI_Isend(&sendBuffer.m_Position, 1,
ADIOS2_MPI_SIZE_T, m_Rank - 1, 0,
m_Comm, &requests[0]),
m_Comm, &m_DataRequests[0]),
", aggregation Isend size at iteration " +
std::to_string(step) + "\n");

helper::CheckMPIReturn(
MPI_Isend(sendBuffer.m_Buffer.data(),
static_cast<int>(sendBuffer.m_Position), MPI_CHAR,
m_Rank - 1, 1, m_Comm, &requests[1]),
m_Rank - 1, 1, m_Comm, &m_DataRequests[1]),
", aggregation Isend data at iteration " + std::to_string(step) +
"\n");
}
Expand Down Expand Up @@ -89,15 +86,13 @@ std::vector<MPI_Request> MPIChain::IExchange(BufferSTL &bufferSTL,
helper::CheckMPIReturn(
MPI_Irecv(receiveBuffer.m_Buffer.data(),
static_cast<int>(receiveBuffer.m_Position), MPI_CHAR,
m_Rank + 1, 1, m_Comm, &requests[2]),
m_Rank + 1, 1, m_Comm, &m_DataRequests[2]),
", aggregation Irecv data at iteration " + std::to_string(step) +
"\n");
}

return requests;
}

void MPIChain::Wait(std::vector<MPI_Request> &requests, const int step)
void MPIChain::Wait(const int step)
{
if (m_Size == 1)
{
Expand All @@ -112,20 +107,20 @@ void MPIChain::Wait(std::vector<MPI_Request> &requests, const int step)
if (receiver)
{
helper::CheckMPIReturn(
MPI_Wait(&requests[2], &status),
MPI_Wait(&m_DataRequests[2], &status),
", aggregation waiting for receiver data at iteration " +
std::to_string(step) + "\n");
}

if (sender)
{
helper::CheckMPIReturn(
MPI_Wait(&requests[0], &status),
MPI_Wait(&m_DataRequests[0], &status),
", aggregation waiting for sender size at iteration " +
std::to_string(step) + "\n");

helper::CheckMPIReturn(
MPI_Wait(&requests[1], &status),
MPI_Wait(&m_DataRequests[1], &status),
", aggregation waiting for sender data at iteration " +
std::to_string(step) + "\n");
}
Expand Down
7 changes: 4 additions & 3 deletions source/adios2/toolkit/aggregator/mpi/MPIChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ class MPIChain : public MPIAggregator

void Init(const size_t subStreams, MPI_Comm parentComm) final;

std::vector<MPI_Request> IExchange(BufferSTL &bufferSTL,
const int step) final;
void IExchange(BufferSTL &bufferSTL, const int step) final;

void Wait(std::vector<MPI_Request> &request, const int step) final;
void Wait(const int step) final;

void SwapBuffers(const int step) noexcept final;

Expand Down Expand Up @@ -73,6 +72,8 @@ class MPIChain : public MPIAggregator
*/
void ResizeUpdateBufferSTL(const size_t newSize, BufferSTL &bufferSTL,
const std::string hint);

std::array<MPI_Request, 3> m_DataRequests;
};

} // end namespace aggregator
Expand Down

0 comments on commit f391c41

Please sign in to comment.