Skip to content

Commit

Permalink
Merge pull request #2685 from JasonRuonanWang/ssc
Browse files Browse the repository at this point in the history
Removed one-sided MPI options for fixed IO pattern in SSC
  • Loading branch information
JasonRuonanWang authored May 4, 2021
2 parents 75520a9 + 7c2a50b commit af17f86
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 1,234 deletions.
5 changes: 1 addition & 4 deletions docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ The SSC engine takes the following parameters:

1. ``OpenTimeoutSecs``: Default **10**. Timeout in seconds for opening a stream. The SSC engine's open function will block until the RendezvousAppCount is reached, or timeout, whichever comes first. If it reaches the timeout, SSC will throw an exception.

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.
2. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
=============================== ================== ================================================
OpenTimeoutSecs integer **10**, 2, 20, 200
MpiMode string **TwoSided**, OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, OneSidedPostPull
Threading bool **false**, true
=============================== ================== ================================================

Expand Down
74 changes: 8 additions & 66 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
{
TAU_SCOPED_TIMER_FUNC();

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
Expand All @@ -40,28 +39,9 @@ SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }

void SscReader::BeginStepConsequentFixed()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
MPI_Waitall(static_cast<int>(m_MpiRequests.size()), m_MpiRequests.data(),
MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}

void SscReader::BeginStepFlexible(StepStatus &status)
Expand Down Expand Up @@ -278,46 +258,13 @@ void SscReader::EndStepFixed()
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
for (const auto &i : m_AllReceivingWriterRanks)
{
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_post(m_WriterGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_start(m_WriterGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
}

Expand Down Expand Up @@ -567,11 +514,6 @@ void SscReader::DoClose(const int transportIndex)
{
BeginStep();
}

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_free(&m_MpiWin);
}
}

} // end namespace engine
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ class SscReader : public Engine
int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
120 changes: 14 additions & 106 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
{
TAU_SCOPED_TIMER_FUNC();

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
Expand Down Expand Up @@ -108,53 +107,16 @@ void SscWriter::EndStepFirst()
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}

void SscWriter::EndStepConsequentFixed()
{
TAU_SCOPED_TIMER_FUNC();
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_ReaderGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_post(m_ReaderGroup, 0, m_MpiWin);
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()), MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}

Expand Down Expand Up @@ -213,28 +175,9 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::MpiWait()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
MPI_Waitall(static_cast<int>(m_MpiRequests.size()), m_MpiRequests.data(),
MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}

void SscWriter::SyncMpiPattern()
Expand Down Expand Up @@ -418,50 +361,15 @@ void SscWriter::DoClose(const int transportIndex)

m_Buffer[0] = 1;

if (m_MpiMode == "twosided")
{
std::vector<MPI_Request> requests;
for (const auto &i : m_AllSendingReaderRanks)
{
requests.emplace_back();
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0,
m_StreamComm, &requests.back());
}
MPI_Waitall(static_cast<int>(requests.size()), requests.data(),
MPI_STATUS_IGNORE);
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, 1, MPI_CHAR,
m_MpiWin);
}
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_ReaderGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, 1, MPI_CHAR,
m_MpiWin);
}
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
std::vector<MPI_Request> requests;
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Win_post(m_ReaderGroup, 0, m_MpiWin);
MPI_Win_wait(m_MpiWin);
requests.emplace_back();
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, m_StreamComm,
&requests.back());
}

MPI_Win_free(&m_MpiWin);
MPI_Waitall(static_cast<int>(requests.size()), requests.data(),
MPI_STATUS_IGNORE);
}
else
{
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class SscWriter : public Engine
int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
12 changes: 0 additions & 12 deletions testing/adios2/engine/ssc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,6 @@ if(ADIOS2_HAVE_MPI)
gtest_add_tests_helper(OnlyTwoSteps MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyTwoSteps.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePush.MPI "" TRUE)

gtest_add_tests_helper(OneSidedPostPush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedPostPush.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePull MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePull.MPI "" TRUE)

gtest_add_tests_helper(OneSidedPostPull MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedPostPull.MPI "" TRUE)

gtest_add_tests_helper(Unbalanced MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscUnbalanced.MPI "" TRUE)

Expand Down
Loading

0 comments on commit af17f86

Please sign in to comment.