Skip to content

Commit

Permalink
Merge pull request #2363 from JasonRuonanWang/ssc-rget
Browse files Browse the repository at this point in the history
Clean up SSC and add more tests to ensure recent changes work correctly
  • Loading branch information
JasonRuonanWang authored Jul 6, 2020
2 parents cc4d398 + 341b7f1 commit 003dc7e
Show file tree
Hide file tree
Showing 7 changed files with 642 additions and 143 deletions.
71 changes: 21 additions & 50 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }

void SscReader::GetOneSidedPostPush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}

void SscReader::GetOneSidedFencePush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
}

void SscReader::GetOneSidedPostPull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}

void SscReader::GetOneSidedFencePull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}

void SscReader::GetTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{
Expand Down Expand Up @@ -217,23 +172,39 @@ void SscReader::EndStep()

if (m_MpiMode == "twosided")
{
GetTwoSided();
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second,
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
GetOneSidedFencePush();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
GetOneSidedPostPush();
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
GetOneSidedFencePull();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpull")
{
GetOneSidedPostPull();
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}
}

Expand Down
5 changes: 0 additions & 5 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ class SscReader : public Engine
void SyncMpiPattern();
void SyncWritePattern();
void SyncReadPattern();
void GetOneSidedFencePush();
void GetOneSidedPostPush();
void GetOneSidedFencePull();
void GetOneSidedPostPull();
void GetTwoSided();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down
131 changes: 49 additions & 82 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();

MpiWait();

if (m_InitialStep)
{
m_InitialStep = false;
Expand All @@ -58,6 +56,11 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
++m_CurrentStep;
}

if (m_CurrentStep > 1)
{
MpiWait();
}

return StepStatus::OK;
}

Expand All @@ -69,56 +72,6 @@ size_t SscWriter::CurrentStep() const

void SscWriter::PerformPuts() { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::PutOneSidedPostPush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
m_NeedWait = true;
}

void SscWriter::PutOneSidedPostPull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
m_NeedWait = true;
}

void SscWriter::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -143,23 +96,38 @@ void SscWriter::EndStep()
{
if (m_MpiMode == "twosided")
{
PutTwoSided();
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
0, m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
PutOneSidedFencePush();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
PutOneSidedPostPush();
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
{
PutOneSidedFencePull();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
PutOneSidedPostPull();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
}
}
}
Expand All @@ -170,31 +138,27 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::MpiWait()
{
if (m_NeedWait)
if (m_MpiMode == "twosided")
{
if (m_MpiMode == "twosided")
{
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
m_NeedWait = false;
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
}

Expand Down Expand Up @@ -369,7 +333,10 @@ void SscWriter::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER_FUNC();

MpiWait();
if (m_CurrentStep > 0)
{
MpiWait();
}

m_Buffer[0] = 1;

Expand Down
6 changes: 0 additions & 6 deletions source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class SscWriter : public Engine
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
bool m_NeedWait = false;
std::vector<MPI_Request> m_MpiRequests;

int m_StreamRank;
Expand All @@ -65,11 +64,6 @@ class SscWriter : public Engine
void SyncMpiPattern();
void SyncWritePattern();
void SyncReadPattern();
void PutOneSidedFencePush();
void PutOneSidedPostPush();
void PutOneSidedFencePull();
void PutOneSidedPostPull();
void PutTwoSided();
void MpiWait();

#define declare_type(T) \
Expand Down
6 changes: 6 additions & 0 deletions testing/adios2/engine/ssc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ if(ADIOS2_HAVE_MPI)
gtest_add_tests_helper(Base MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscBase.MPI "" TRUE)

gtest_add_tests_helper(OnlyOneStep MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyOneStep.MPI "" TRUE)

gtest_add_tests_helper(OnlyTwoSteps MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyTwoSteps.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePush.MPI "" TRUE)

Expand Down
Loading

0 comments on commit 003dc7e

Please sign in to comment.