Skip to content

Commit

Permalink
Merge pull request #2691 from JasonRuonanWang/ssc
Browse files Browse the repository at this point in the history
Cleaning up SSC code
  • Loading branch information
JasonRuonanWang authored May 7, 2021
2 parents 4c5d74c + d14612c commit 550d6b1
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The SSC engine takes the following parameters:

1. ``OpenTimeoutSecs``: Default **10**. Timeout in seconds for opening a stream. The SSC engine's open function will block until the RendezvousAppCount is reached, or timeout, whichever comes first. If it reaches the timeout, SSC will throw an exception.

2. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.
2. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors. This parameter is only effective when writer definitions and reader selections are NOT locked. For cases definitions and reader selections are locked, SSC has a more optimized way to do data transfers, and thus it will not use this parameter.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
Expand Down
22 changes: 8 additions & 14 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,12 @@ void SscReader::EndStepFirstFlexible()
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
}

void SscReader::EndStepConsequentFlexible() { MPI_Win_free(&m_MpiWin); }

void SscReader::EndBeginStepFirstFlexible()
{
EndStepFirstFlexible();
BeginStepFlexible(m_StepStatus);
}

void SscReader::EndBeginStepConsequentFlexible()
void SscReader::EndStepConsequentFlexible()
{
EndStepConsequentFlexible();
MPI_Win_free(&m_MpiWin);
BeginStepFlexible(m_StepStatus);
}

Expand Down Expand Up @@ -312,23 +305,24 @@ void SscReader::EndStep()
if (m_Threading)
{
m_EndStepThread =
std::thread(&SscReader::EndBeginStepFirstFlexible, this);
std::thread(&SscReader::EndStepFirstFlexible, this);
}
else
{
EndStepFirstFlexible();
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
}
}
else
{
if (m_Threading)
{
m_EndStepThread = std::thread(
&SscReader::EndBeginStepConsequentFlexible, this);
m_EndStepThread =
std::thread(&SscReader::EndStepConsequentFlexible, this);
}
else
{
EndStepConsequentFlexible();
MPI_Win_free(&m_MpiWin);
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class SscReader : public Engine
void EndStepFixed();
void EndStepFirstFlexible();
void EndStepConsequentFlexible();
void EndBeginStepFirstFlexible();
void EndBeginStepConsequentFlexible();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down
30 changes: 12 additions & 18 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,

int providedMpiMode;
MPI_Query_thread(&providedMpiMode);
if (providedMpiMode != MPI_THREAD_MULTIPLE)
if (m_Threading && providedMpiMode != MPI_THREAD_MULTIPLE)
{
if (m_Threading == true)
m_Threading = false;
if (m_WriterRank == 0 && m_Verbosity > 0)
{
m_Threading = false;
if (m_WriterRank == 0)
{
std::cout << "SSC Threading disabled as MPI is not initialized "
"with multi-threads"
<< std::endl;
}
std::cout << "SSC Threading disabled as MPI is not initialized "
"with multi-threads"
<< std::endl;
}
}

Expand Down Expand Up @@ -83,7 +80,9 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MpiWait();
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}
else
{
Expand Down Expand Up @@ -173,13 +172,6 @@ void SscWriter::EndStep()

void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::MpiWait()
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()), m_MpiRequests.data(),
MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}

void SscWriter::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
Expand Down Expand Up @@ -356,7 +348,9 @@ void SscWriter::DoClose(const int transportIndex)
{
if (m_CurrentStep > 0)
{
MpiWait();
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}

m_Buffer[0] = 1;
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class SscWriter : public Engine
void SyncMpiPattern();
void SyncWritePattern(bool finalStep = false);
void SyncReadPattern();
void MpiWait();
void EndStepFirst();
void EndStepConsequentFixed();
void EndStepConsequentFlexible();
Expand Down

0 comments on commit 550d6b1

Please sign in to comment.