From 27b728b873298a135334ba4972b93f31b30d9910 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 9 Jun 2022 08:06:59 -0400 Subject: [PATCH] Guard calls to core::Decompress() with a mutex since Operators are not guaranteed to be thread-safe. Fix Engine.BP.WriteAppendReadADIOS2 test where one variable was put twice, leading to simultaneous memcpy calls on the same target memory at reading. Note: The current multithreaded read in BP5 leads to concurrent memcpy calls under NdCopy(). If two blocks in the dataset refer to the same cell in space, there could be data race in two memcpy() calls to put the same cell's value from two incoming blocks. By definition, which value ends up in the read result, is undefined, so as long as the two memcpy calls will result in one of the values placed, it is all good. However, if concurrent memcpy calls result in random unrelated values placed in memory, then the read result would be invalid. --- source/adios2/engine/bp5/BP5Reader.cpp | 1 + source/adios2/engine/bp5/BP5Reader.h | 1 - source/adios2/helper/adiosSystem.cpp | 34 +------------------ .../toolkit/format/bp5/BP5Deserializer.cpp | 11 +++--- .../toolkit/format/bp5/BP5Deserializer.h | 6 ++++ .../engine/bp/TestBPWriteAppendReadADIOS2.cpp | 2 -- 6 files changed, 15 insertions(+), 40 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 2d553933aa..a44f868c59 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -14,6 +14,7 @@ #include #include +#include #include using TP = std::chrono::high_resolution_clock::time_point; diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index 995188079a..d635b3899d 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -22,7 +22,6 @@ #include #include -#include #include namespace adios2 diff --git a/source/adios2/helper/adiosSystem.cpp b/source/adios2/helper/adiosSystem.cpp index 94b60ce640..3629130c80 100644 --- a/source/adios2/helper/adiosSystem.cpp +++ b/source/adios2/helper/adiosSystem.cpp @@ -184,39 +184,7 @@ char BPVersion(const std::string &name, helper::Comm &comm, unsigned int NumHardwareThreadsPerNode() { - unsigned int processor_count = std::thread::hardware_concurrency(); - if (processor_count == 0) - { -#ifdef _WIN32 - SYSTEM_INFO sysinfo; - GetSystemInfo(&sysinfo); - processor_count = sysinfo.dwNumberOfProcessors; -#else -#ifdef APPLE - int nm[2]; - size_t len = 4; - uint32_t count; - - nm[0] = CTL_HW; - nm[1] = HW_AVAILCPU; - sysctl(nm, 2, &count, &len, NULL, 0); - - if (count < 1) - { - nm[1] = HW_NCPU; - sysctl(nm, 2, &count, &len, NULL, 0); - if (count < 1) - { - count = 1; - } - } - processor_count = count; -#else - processor_count = sysconf(_SC_NPROCESSORS_ONLN); -#endif -#endif - } - return processor_count; + return std::thread::hardware_concurrency(); } } // end namespace helper diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index 1411ac5cc2..5a0f4fc7d9 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -1288,10 +1288,13 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr) ->Count[dim + Read.BlockID * writer_meta_base->Dims]; } decompressBuffer.resize(DestSize); - core::Decompress(IncomingData, - ((MetaArrayRecOperator *)writer_meta_base) - ->DataLengths[Read.BlockID], - decompressBuffer.data()); + { + std::lock_guard lockGuard(mutexDecompress); + core::Decompress(IncomingData, + ((MetaArrayRecOperator *)writer_meta_base) + ->DataLengths[Read.BlockID], + decompressBuffer.data()); + } IncomingData = decompressBuffer.data(); VirtualIncomingData = IncomingData; } diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.h b/source/adios2/toolkit/format/bp5/BP5Deserializer.h index d005d05706..528bdc13b6 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.h @@ -18,6 +18,8 @@ #include "ffs.h" #include "fm.h" +#include + #ifdef _WIN32 #pragma warning(disable : 4250) #endif @@ -217,6 +219,10 @@ class BP5Deserializer : virtual public BP5Base void *GetMetadataBase(BP5VarRec *VarRec, size_t Step, size_t WriterRank) const; size_t CurTimestep = 0; + + /* We assume operators are not thread-safe, call Decompress() one at a time + */ + std::mutex mutexDecompress; }; } // end namespace format diff --git a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp index 2d11e69047..fcf1dedde7 100644 --- a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp @@ -212,7 +212,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) // fill in the variable with values from starting index to // starting index + count bpWriter.BeginStep(); - bpWriter.Put(var_i16, currentTestData.I16.data()); bpWriter.Put(var_iString, currentTestData.S1); bpWriter.Put(var_i8, currentTestData.I8.data()); bpWriter.Put(var_i16, currentTestData.I16.data()); @@ -365,7 +364,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) // fill in the variable with values from starting index to // starting index + count bpAppender.BeginStep(); - bpAppender.Put(var_i16, currentTestData.I16.data()); bpAppender.Put(var_iString, currentTestData.S1); bpAppender.Put(var_i8, currentTestData.I8.data()); bpAppender.Put(var_i16, currentTestData.I16.data());