Skip to content

Commit

Permalink
Merge pull request #1984 from eisenhauer/TransportManTrial
Browse files Browse the repository at this point in the history
TransportMan TSAN changes
  • Loading branch information
eisenhauer authored Feb 25, 2020
2 parents 5e19083 + bf72d13 commit e0c14ca
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 44 deletions.
12 changes: 1 addition & 11 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ void BP3Writer::InitTransports()
{
if (m_BP3Serializer.m_Parameters.AsyncTasks)
{
m_FutureOpenFiles = m_FileDataManager.OpenFilesAsync(
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
Expand Down Expand Up @@ -341,11 +341,6 @@ void BP3Writer::WriteData(const bool isFinal, const int transportIndex)
m_BP3Serializer.CloseStream(m_IO);
}

if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}

m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(),
dataSize, transportIndex);

Expand Down Expand Up @@ -374,11 +369,6 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
m_BP3Serializer.m_Aggregator.GetConsumerBuffer(
m_BP3Serializer.m_Data);

if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}

m_FileDataManager.WriteFiles(bufferSTL.Data(), bufferSTL.m_Position,
transportIndex);

Expand Down
3 changes: 0 additions & 3 deletions source/adios2/engine/bp3/BP3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ class BP3Writer : public core::Engine
/** Manage BP data files Transports from IO AddTransport */
transportman::TransportMan m_FileDataManager;

/** future returned by m_FileDataManager at OpenFiles */
std::future<void> m_FutureOpenFiles;

/** Manages the optional collective metadata files */
transportman::TransportMan m_FileMetadataManager;

Expand Down
16 changes: 1 addition & 15 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void BP4Writer::InitTransports()
{
if (m_BP4Serializer.m_Parameters.AsyncTasks)
{
m_FutureOpenFiles = m_FileDataManager.OpenFilesAsync(
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}
Expand Down Expand Up @@ -294,10 +294,6 @@ void BP4Writer::InitBPBuffer()

if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}
m_BP4Serializer.m_PreDataFileLength =
m_FileDataManager.GetFileSize(0);
}
Expand Down Expand Up @@ -577,11 +573,6 @@ void BP4Writer::WriteData(const bool isFinal, const int transportIndex)
dataSize = m_BP4Serializer.CloseStream(m_IO, false);
}

if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}

m_FileDataManager.WriteFiles(m_BP4Serializer.m_Data.m_Buffer.data(),
dataSize, transportIndex);

Expand Down Expand Up @@ -611,11 +602,6 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
m_BP4Serializer.m_Data);
if (bufferSTL.m_Position > 0)
{
if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}

m_FileDataManager.WriteFiles(
bufferSTL.Data(), bufferSTL.m_Position, transportIndex);

Expand Down
3 changes: 0 additions & 3 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ class BP4Writer : public core::Engine
/** Manage BP data files Transports from IO AddTransport */
transportman::TransportMan m_FileDataManager;

/** future returned by m_FileDataManager at OpenFiles */
std::future<void> m_FutureOpenFiles;

/** Manages the optional collective metadata files */
transportman::TransportMan m_FileMetadataManager;

Expand Down
38 changes: 30 additions & 8 deletions source/adios2/toolkit/transportman/TransportMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void TransportMan::OpenFiles(const std::vector<std::string> &fileNames,
const std::vector<Params> &parametersVector,
const bool profile)
{
WaitForAsync();
for (size_t i = 0; i < fileNames.size(); ++i)
{
const Params &parameters = parametersVector[i];
Expand All @@ -89,10 +90,12 @@ void TransportMan::OpenFiles(const std::vector<std::string> &fileNames,
}
}

std::future<void> TransportMan::OpenFilesAsync(
const std::vector<std::string> &fileNames, const Mode openMode,
const std::vector<Params> &parametersVector, const bool profile)
void TransportMan::OpenFilesAsync(const std::vector<std::string> &fileNames,
const Mode openMode,
const std::vector<Params> &parametersVector,
const bool profile)
{
WaitForAsync();
auto lf_OpenFiles =
[&](const std::vector<std::string> &fileNames, const Mode openMode,
const std::vector<Params> &parametersVector, const bool profile) {
Expand All @@ -111,14 +114,16 @@ std::future<void> TransportMan::OpenFilesAsync(
}
};

return std::async(std::launch::async, lf_OpenFiles, std::move(fileNames),
openMode, std::cref(parametersVector), profile);
m_FutureOpenFiles =
std::async(std::launch::async, lf_OpenFiles, std::move(fileNames),
openMode, std::cref(parametersVector), profile);
}

void TransportMan::OpenFileID(const std::string &name, const size_t id,
const Mode mode, const Params &parameters,
const bool profile)
{
WaitForAsync();
std::shared_ptr<Transport> file =
OpenFileTransport(name, mode, parameters, profile);
m_Transports.insert({id, file});
Expand Down Expand Up @@ -170,6 +175,7 @@ std::vector<std::string> TransportMan::GetFilesBaseNames(

std::vector<std::string> TransportMan::GetTransportsTypes() noexcept
{
WaitForAsync();
std::vector<std::string> types;
types.reserve(m_Transports.size());

Expand All @@ -184,6 +190,7 @@ std::vector<std::string> TransportMan::GetTransportsTypes() noexcept
std::vector<profiling::IOChrono *>
TransportMan::GetTransportsProfilers() noexcept
{
WaitForAsync();
std::vector<profiling::IOChrono *> profilers;
profilers.reserve(m_Transports.size());

Expand All @@ -198,6 +205,7 @@ TransportMan::GetTransportsProfilers() noexcept
void TransportMan::WriteFiles(const char *buffer, const size_t size,
const int transportIndex)
{
WaitForAsync();
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
Expand All @@ -222,7 +230,7 @@ void TransportMan::WriteFiles(const char *buffer, const size_t size,
void TransportMan::WriteFileAt(const char *buffer, const size_t size,
const size_t start, const int transportIndex)
{

WaitForAsync();
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport, ", in call to WriteFileAt with index " +
std::to_string(transportIndex));
Expand All @@ -231,7 +239,7 @@ void TransportMan::WriteFileAt(const char *buffer, const size_t size,

void TransportMan::SeekToFileEnd(const int transportIndex)
{

WaitForAsync();
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport, ", in call to SeekToFileEnd with index " +
std::to_string(transportIndex));
Expand All @@ -240,7 +248,7 @@ void TransportMan::SeekToFileEnd(const int transportIndex)

void TransportMan::SeekToFileBegin(const int transportIndex)
{

WaitForAsync();
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport, ", in call to SeekToFileBegin with index " +
std::to_string(transportIndex));
Expand All @@ -249,6 +257,7 @@ void TransportMan::SeekToFileBegin(const int transportIndex)

size_t TransportMan::GetFileSize(const size_t transportIndex) const
{
WaitForAsync();
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport, ", in call to GetFileSize with index " +
std::to_string(transportIndex));
Expand All @@ -258,6 +267,7 @@ size_t TransportMan::GetFileSize(const size_t transportIndex) const
void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,
const size_t transportIndex)
{
WaitForAsync();
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport, ", in call to ReadFile with index " +
std::to_string(transportIndex));
Expand All @@ -266,6 +276,7 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,

void TransportMan::FlushFiles(const int transportIndex)
{
WaitForAsync();
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
Expand All @@ -289,6 +300,7 @@ void TransportMan::FlushFiles(const int transportIndex)

void TransportMan::CloseFiles(const int transportIndex)
{
WaitForAsync();
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
Expand All @@ -313,6 +325,7 @@ void TransportMan::CloseFiles(const int transportIndex)
bool TransportMan::AllTransportsClosed() const noexcept
{
bool allClose = true;
WaitForAsync();
for (const auto &transportPair : m_Transports)
{
const auto &transport = transportPair.second;
Expand Down Expand Up @@ -421,5 +434,14 @@ void TransportMan::CheckFile(
}
}

void TransportMan::WaitForAsync() const
{
// Ensure any prior OpenFilesAsync is complete
if (m_FutureOpenFiles.valid())
{
m_FutureOpenFiles.get();
}
}

} // end namespace transport
} // end namespace adios2
14 changes: 10 additions & 4 deletions source/adios2/toolkit/transportman/TransportMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ class TransportMan
* @param openMode
* @param parametersVector
* @param profile
* @return
*
* Opens happen asynchronously, but any future call waits for their
* completion
*/
std::future<void> OpenFilesAsync(
const std::vector<std::string> &fileNames, const Mode openMode,
const std::vector<Params> &parametersVector, const bool profile);
void OpenFilesAsync(const std::vector<std::string> &fileNames,
const Mode openMode,
const std::vector<Params> &parametersVector,
const bool profile);

/**
* Used for sub-files defined by index
Expand Down Expand Up @@ -175,6 +178,7 @@ class TransportMan
protected:
helper::Comm const &m_Comm;
const bool m_DebugMode = false;
mutable std::future<void> m_FutureOpenFiles;

std::shared_ptr<Transport> OpenFileTransport(const std::string &fileName,
const Mode openMode,
Expand All @@ -185,6 +189,8 @@ class TransportMan
std::unordered_map<size_t, std::shared_ptr<Transport>>::const_iterator
itTransport,
const std::string hint) const;

void WaitForAsync() const;
};

} // end namespace transport
Expand Down

0 comments on commit e0c14ca

Please sign in to comment.