Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up SSC and add more tests to ensure recent changes work correctly #2363

Merged
merged 3 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 21 additions & 50 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }

void SscReader::GetOneSidedPostPush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}

void SscReader::GetOneSidedFencePush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
}

void SscReader::GetOneSidedPostPull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}

void SscReader::GetOneSidedFencePull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}

void SscReader::GetTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{
Expand Down Expand Up @@ -217,23 +172,39 @@ void SscReader::EndStep()

if (m_MpiMode == "twosided")
{
GetTwoSided();
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second,
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
GetOneSidedFencePush();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
GetOneSidedPostPush();
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
GetOneSidedFencePull();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpull")
{
GetOneSidedPostPull();
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
}
}

Expand Down
5 changes: 0 additions & 5 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ class SscReader : public Engine
void SyncMpiPattern();
void SyncWritePattern();
void SyncReadPattern();
void GetOneSidedFencePush();
void GetOneSidedPostPush();
void GetOneSidedFencePull();
void GetOneSidedPostPull();
void GetTwoSided();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down
131 changes: 49 additions & 82 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();

MpiWait();

if (m_InitialStep)
{
m_InitialStep = false;
Expand All @@ -58,6 +56,11 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
++m_CurrentStep;
}

if (m_CurrentStep > 1)
{
MpiWait();
}

return StepStatus::OK;
}

Expand All @@ -69,56 +72,6 @@ size_t SscWriter::CurrentStep() const

void SscWriter::PerformPuts() { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::PutOneSidedPostPush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
m_NeedWait = true;
}

void SscWriter::PutOneSidedPostPull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
m_NeedWait = true;
}

void SscWriter::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -143,23 +96,38 @@ void SscWriter::EndStep()
{
if (m_MpiMode == "twosided")
{
PutTwoSided();
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
0, m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
PutOneSidedFencePush();
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
PutOneSidedPostPush();
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
{
PutOneSidedFencePull();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
PutOneSidedPostPull();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
}
}
}
Expand All @@ -170,31 +138,27 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::MpiWait()
{
if (m_NeedWait)
if (m_MpiMode == "twosided")
{
if (m_MpiMode == "twosided")
{
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
m_NeedWait = false;
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
}

Expand Down Expand Up @@ -369,7 +333,10 @@ void SscWriter::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER_FUNC();

MpiWait();
if (m_CurrentStep > 0)
{
MpiWait();
}

m_Buffer[0] = 1;

Expand Down
6 changes: 0 additions & 6 deletions source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class SscWriter : public Engine
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
bool m_NeedWait = false;
std::vector<MPI_Request> m_MpiRequests;

int m_StreamRank;
Expand All @@ -65,11 +64,6 @@ class SscWriter : public Engine
void SyncMpiPattern();
void SyncWritePattern();
void SyncReadPattern();
void PutOneSidedFencePush();
void PutOneSidedPostPush();
void PutOneSidedFencePull();
void PutOneSidedPostPull();
void PutTwoSided();
void MpiWait();

#define declare_type(T) \
Expand Down
6 changes: 6 additions & 0 deletions testing/adios2/engine/ssc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ if(ADIOS2_HAVE_MPI)
gtest_add_tests_helper(Base MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscBase.MPI "" TRUE)

gtest_add_tests_helper(OnlyOneStep MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyOneStep.MPI "" TRUE)

gtest_add_tests_helper(OnlyTwoSteps MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyTwoSteps.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePush.MPI "" TRUE)

Expand Down
Loading