Skip to content

Commit

Permalink
Guard calls to core::Decompress() with a mutex since Operators are no…
Browse files Browse the repository at this point in the history
…t guaranteed to be thread-safe. Fix Engine.BP.WriteAppendReadADIOS2 test where one variable was put twice, leading to simultaneous memcpy calls on the same target memory at reading.

Note: The current multithreaded read in BP5 leads to concurrent memcpy calls under NdCopy(). If two blocks in the dataset refer to the same cell in space, there could be a data race between the two memcpy() calls that write that cell's value from the two incoming blocks. By definition, which value ends up in the read result is undefined, so as long as the two memcpy calls result in one of the two values being placed, all is good. However, if concurrent memcpy calls result in random unrelated values being placed in memory, then the read result would be invalid.
  • Loading branch information
pnorbert committed Jun 9, 2022
1 parent 2bd2414 commit 27b728b
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 40 deletions.
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <chrono>
#include <errno.h>
#include <mutex>
#include <thread>

using TP = std::chrono::high_resolution_clock::time_point;
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <chrono>
#include <map>
#include <mutex>
#include <vector>

namespace adios2
Expand Down
34 changes: 1 addition & 33 deletions source/adios2/helper/adiosSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,39 +184,7 @@ char BPVersion(const std::string &name, helper::Comm &comm,

// Returns the thread count reported by std::thread::hardware_concurrency();
// when that call reports 0, an OS-specific query is used as a fallback.
// NOTE(review): this listing appears to interleave the pre- and post-commit
// bodies of a diff (the page header for this file reports 1 addition and 33
// deletions), so the final `return` below is unreachable as shown -- confirm
// against the actual file.
unsigned int NumHardwareThreadsPerNode()
{
unsigned int processor_count = std::thread::hardware_concurrency();
// A result of 0 is treated as "unknown" and triggers the platform fallback.
if (processor_count == 0)
{
#ifdef _WIN32
// Windows: take the processor count from GetSystemInfo().
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
processor_count = sysinfo.dwNumberOfProcessors;
#else
// NOTE(review): compilers predefine `__APPLE__`, not `APPLE`; unless the
// build system defines `APPLE`, this branch is never compiled -- confirm.
#ifdef APPLE
int nm[2];
// Byte size of `count`, passed to sysctl as the output buffer length.
size_t len = 4;
// NOTE(review): `count` has no initializer and the sysctl return values are
// ignored, so a failed call leaves the comparisons below reading an
// indeterminate value.
uint32_t count;

// First query: CTL_HW / HW_AVAILCPU.
nm[0] = CTL_HW;
nm[1] = HW_AVAILCPU;
sysctl(nm, 2, &count, &len, NULL, 0);

if (count < 1)
{
// Second query: CTL_HW / HW_NCPU; if that is also below 1, use 1.
nm[1] = HW_NCPU;
sysctl(nm, 2, &count, &len, NULL, 0);
if (count < 1)
{
count = 1;
}
}
processor_count = count;
#else
// Other platforms: sysconf(_SC_NPROCESSORS_ONLN).
// NOTE(review): the result is stored unchecked into an unsigned int;
// presumably sysconf can report -1 on error -- verify and guard if so.
processor_count = sysconf(_SC_NPROCESSORS_ONLN);
#endif
#endif
}
return processor_count;
// Unreachable as rendered here: the function has already returned above.
return std::thread::hardware_concurrency();
}

} // end namespace helper
Expand Down
11 changes: 7 additions & 4 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1288,10 +1288,13 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr)
->Count[dim + Read.BlockID * writer_meta_base->Dims];
}
decompressBuffer.resize(DestSize);
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataLengths[Read.BlockID],
decompressBuffer.data());
{
std::lock_guard<std::mutex> lockGuard(mutexDecompress);
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataLengths[Read.BlockID],
decompressBuffer.data());
}
IncomingData = decompressBuffer.data();
VirtualIncomingData = IncomingData;
}
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/toolkit/format/bp5/BP5Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "ffs.h"
#include "fm.h"

#include <mutex>

#ifdef _WIN32
#pragma warning(disable : 4250)
#endif
Expand Down Expand Up @@ -217,6 +219,10 @@ class BP5Deserializer : virtual public BP5Base
void *GetMetadataBase(BP5VarRec *VarRec, size_t Step,
size_t WriterRank) const;
size_t CurTimestep = 0;

/* We assume operators are not thread-safe, so calls to Decompress() are
 * serialized with this mutex. */
std::mutex mutexDecompress;
};

} // end namespace format
Expand Down
2 changes: 0 additions & 2 deletions testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4)
// fill in the variable with values from starting index to
// starting index + count
bpWriter.BeginStep();
bpWriter.Put(var_i16, currentTestData.I16.data());
bpWriter.Put(var_iString, currentTestData.S1);
bpWriter.Put(var_i8, currentTestData.I8.data());
bpWriter.Put(var_i16, currentTestData.I16.data());
Expand Down Expand Up @@ -365,7 +364,6 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4)
// fill in the variable with values from starting index to
// starting index + count
bpAppender.BeginStep();
bpAppender.Put(var_i16, currentTestData.I16.data());
bpAppender.Put(var_iString, currentTestData.S1);
bpAppender.Put(var_i8, currentTestData.I8.data());
bpAppender.Put(var_i16, currentTestData.I16.data());
Expand Down

0 comments on commit 27b728b

Please sign in to comment.