From bf72d13092f26631666fbda7bfecb32301f7dce0 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Mon, 24 Feb 2020 16:13:28 -0500 Subject: [PATCH] Fix TransportMan TSAN problems --- source/adios2/engine/bp3/BP3Writer.cpp | 12 +----- source/adios2/engine/bp3/BP3Writer.h | 3 -- source/adios2/engine/bp4/BP4Writer.cpp | 16 +------- source/adios2/engine/bp4/BP4Writer.h | 3 -- .../toolkit/transportman/TransportMan.cpp | 38 +++++++++++++++---- .../toolkit/transportman/TransportMan.h | 14 +++++-- 6 files changed, 42 insertions(+), 44 deletions(-) diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index 99fe1eb196..82c52fd3da 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -188,7 +188,7 @@ void BP3Writer::InitTransports() { if (m_BP3Serializer.m_Parameters.AsyncTasks) { - m_FutureOpenFiles = m_FileDataManager.OpenFilesAsync( + m_FileDataManager.OpenFilesAsync( bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters, m_BP3Serializer.m_Profiler.m_IsActive); } @@ -341,11 +341,6 @@ void BP3Writer::WriteData(const bool isFinal, const int transportIndex) m_BP3Serializer.CloseStream(m_IO); } - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } - m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(), dataSize, transportIndex); @@ -374,11 +369,6 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex) m_BP3Serializer.m_Aggregator.GetConsumerBuffer( m_BP3Serializer.m_Data); - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } - m_FileDataManager.WriteFiles(bufferSTL.Data(), bufferSTL.m_Position, transportIndex); diff --git a/source/adios2/engine/bp3/BP3Writer.h b/source/adios2/engine/bp3/BP3Writer.h index d098f7147c..976e207a2e 100644 --- a/source/adios2/engine/bp3/BP3Writer.h +++ b/source/adios2/engine/bp3/BP3Writer.h @@ -53,9 +53,6 @@ class BP3Writer : public core::Engine /** Manage BP data files Transports from IO AddTransport */ transportman::TransportMan m_FileDataManager; - /** future returned by m_FileDataManager at OpenFiles */ - std::future m_FutureOpenFiles; - /** Manages the optional collective metadata files */ transportman::TransportMan m_FileMetadataManager; diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index 5efdfd441d..c531c75f8d 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -190,7 +190,7 @@ void BP4Writer::InitTransports() { if (m_BP4Serializer.m_Parameters.AsyncTasks) { - m_FutureOpenFiles = m_FileDataManager.OpenFilesAsync( + m_FileDataManager.OpenFilesAsync( bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters, m_BP4Serializer.m_Profiler.m_IsActive); } @@ -294,10 +294,6 @@ void BP4Writer::InitBPBuffer() if (m_BP4Serializer.m_Aggregator.m_IsConsumer) { - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } m_BP4Serializer.m_PreDataFileLength = m_FileDataManager.GetFileSize(0); } @@ -577,11 +573,6 @@ void BP4Writer::WriteData(const bool isFinal, const int transportIndex) dataSize = m_BP4Serializer.CloseStream(m_IO, false); } - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } - m_FileDataManager.WriteFiles(m_BP4Serializer.m_Data.m_Buffer.data(), dataSize, transportIndex); @@ -611,11 +602,6 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex) m_BP4Serializer.m_Data); if (bufferSTL.m_Position > 0) { - if (m_FutureOpenFiles.valid()) - { - m_FutureOpenFiles.get(); - } - m_FileDataManager.WriteFiles( bufferSTL.Data(), bufferSTL.m_Position, transportIndex); diff --git a/source/adios2/engine/bp4/BP4Writer.h b/source/adios2/engine/bp4/BP4Writer.h index 99568380e4..713feab6b0 100644 --- a/source/adios2/engine/bp4/BP4Writer.h +++ b/source/adios2/engine/bp4/BP4Writer.h @@ -53,9 +53,6 @@ class BP4Writer : public core::Engine /** Manage BP data files Transports from IO AddTransport */ transportman::TransportMan m_FileDataManager; - /** future returned by m_FileDataManager at OpenFiles */ - std::future m_FutureOpenFiles; - /** Manages the optional collective metadata files */ transportman::TransportMan m_FileMetadataManager; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 91eb52032a..5ebd5e0ae3 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -75,6 +75,7 @@ void TransportMan::OpenFiles(const std::vector &fileNames, const std::vector ¶metersVector, const bool profile) { + WaitForAsync(); for (size_t i = 0; i < fileNames.size(); ++i) { const Params ¶meters = parametersVector[i]; @@ -89,10 +90,12 @@ void TransportMan::OpenFiles(const std::vector &fileNames, } } -std::future TransportMan::OpenFilesAsync( - const std::vector &fileNames, const Mode openMode, - const std::vector ¶metersVector, const bool profile) +void TransportMan::OpenFilesAsync(const std::vector &fileNames, + const Mode openMode, + const std::vector ¶metersVector, + const bool profile) { + WaitForAsync(); auto lf_OpenFiles = [&](const std::vector &fileNames, const Mode openMode, const std::vector ¶metersVector, const bool profile) { @@ -111,14 +114,16 @@ std::future TransportMan::OpenFilesAsync( } }; - return std::async(std::launch::async, lf_OpenFiles, std::move(fileNames), - openMode, std::cref(parametersVector), profile); + m_FutureOpenFiles = + std::async(std::launch::async, lf_OpenFiles, std::move(fileNames), + openMode, std::cref(parametersVector), profile); } void TransportMan::OpenFileID(const std::string &name, const size_t id, const Mode mode, const Params ¶meters, const bool profile) { + WaitForAsync(); std::shared_ptr file = OpenFileTransport(name, mode, parameters, profile); m_Transports.insert({id, file}); @@ -170,6 +175,7 @@ std::vector TransportMan::GetFilesBaseNames( std::vector TransportMan::GetTransportsTypes() noexcept { + WaitForAsync(); std::vector types; types.reserve(m_Transports.size()); @@ -184,6 +190,7 @@ std::vector TransportMan::GetTransportsTypes() noexcept std::vector TransportMan::GetTransportsProfilers() noexcept { + WaitForAsync(); std::vector profilers; profilers.reserve(m_Transports.size()); @@ -198,6 +205,7 @@ TransportMan::GetTransportsProfilers() noexcept void TransportMan::WriteFiles(const char *buffer, const size_t size, const int transportIndex) { + WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -222,7 +230,7 @@ void TransportMan::WriteFiles(const char *buffer, const size_t size, void TransportMan::WriteFileAt(const char *buffer, const size_t size, const size_t start, const int transportIndex) { - + WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to WriteFileAt with index " + std::to_string(transportIndex)); @@ -231,7 +239,7 @@ void TransportMan::WriteFileAt(const char *buffer, const size_t size, void TransportMan::SeekToFileEnd(const int transportIndex) { - + WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileEnd with index " + std::to_string(transportIndex)); @@ -240,7 +248,7 @@ void TransportMan::SeekToFileEnd(const int transportIndex) void TransportMan::SeekToFileBegin(const int transportIndex) { - + WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to SeekToFileBegin with index " + std::to_string(transportIndex)); @@ -249,6 +257,7 @@ void TransportMan::SeekToFileBegin(const int transportIndex) size_t TransportMan::GetFileSize(const size_t transportIndex) const { + WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to GetFileSize with index " + std::to_string(transportIndex)); @@ -258,6 +267,7 @@ size_t TransportMan::GetFileSize(const size_t transportIndex) const void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, const size_t transportIndex) { + WaitForAsync(); auto itTransport = m_Transports.find(transportIndex); CheckFile(itTransport, ", in call to ReadFile with index " + std::to_string(transportIndex)); @@ -266,6 +276,7 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, void TransportMan::FlushFiles(const int transportIndex) { + WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -289,6 +300,7 @@ void TransportMan::FlushFiles(const int transportIndex) void TransportMan::CloseFiles(const int transportIndex) { + WaitForAsync(); if (transportIndex == -1) { for (auto &transportPair : m_Transports) @@ -313,6 +325,7 @@ void TransportMan::CloseFiles(const int transportIndex) bool TransportMan::AllTransportsClosed() const noexcept { bool allClose = true; + WaitForAsync(); for (const auto &transportPair : m_Transports) { const auto &transport = transportPair.second; @@ -421,5 +434,14 @@ void TransportMan::CheckFile( } } +void TransportMan::WaitForAsync() const +{ + // Ensure any prior OpenFilesAsync is complete + if (m_FutureOpenFiles.valid()) + { + m_FutureOpenFiles.get(); + } +} + } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index cb10a8b006..e9e9f66303 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -77,11 +77,14 @@ class TransportMan * @param openMode * @param parametersVector * @param profile - * @return + * + * Opens happen asynchronously, but any future call waits for their + * completion */ - std::future OpenFilesAsync( - const std::vector &fileNames, const Mode openMode, - const std::vector ¶metersVector, const bool profile); + void OpenFilesAsync(const std::vector &fileNames, + const Mode openMode, + const std::vector ¶metersVector, + const bool profile); /** * Used for sub-files defined by index @@ -175,6 +178,7 @@ class TransportMan protected: helper::Comm const &m_Comm; const bool m_DebugMode = false; + mutable std::future m_FutureOpenFiles; std::shared_ptr OpenFileTransport(const std::string &fileName, const Mode openMode, @@ -185,6 +189,8 @@ class TransportMan std::unordered_map>::const_iterator itTransport, const std::string hint) const; + + void WaitForAsync() const; }; } // end namespace transport