From 5fdae54777c880b64b1e365af202f6c6f81dad6f Mon Sep 17 00:00:00 2001
From: Norbert Podhorszki
Date: Fri, 27 May 2022 18:12:20 -0400
Subject: [PATCH 1/2] Test writing data with multiple PerformDataWrite and read
 back. This breaks and points out the issue with BP5Reader::ReadData() in the
 2.8.0 release.

---
 .../engine/bp/TestBPWriteReadMultiblock.cpp | 143 ++++++++++++++++++
 1 file changed, 143 insertions(+)

diff --git a/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp b/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp
index f04e472d62..bf92f732ac 100644
--- a/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp
+++ b/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp
@@ -2085,6 +2085,149 @@ TEST_F(BPWriteReadMultiblockTest, ADIOS2BPWriteReadMultiblock2D4x2)
     }
 }
 
+//******************************************************************************
+// Test flushing data within the step and that read works properly for all
+// blocks across all flushes
+//******************************************************************************
+
+TEST_F(BPWriteReadMultiblockTest, MultiblockPerformDataWrite)
+{
+    if (engineName != "BP5")
+    {
+        std::cout << "Engine " << engineName
+                  << " is not tested for this feature." << std::endl;
+        return;
+    }
+    // Each process writes Nblocks blocks of 1 x Nx elements; together the
+    // processes form an mpiSize x (Nx * Nblocks) 2D array
+    const std::string fname("MultiblockPerformDataWrite.bp");
+
+    int mpiRank = 0, mpiSize = 1;
+    // Number of elements per block (blocksize)
+    const size_t Nx = 8;
+    // Number of blocks per process (= number of flushes)
+    const size_t Nblocks = 3;
+    // Number of steps
+    const size_t NSteps = 3;
+
+#if ADIOS2_USE_MPI
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
+#endif
+
+    // Write test data using BP
+
+#if ADIOS2_USE_MPI
+    adios2::ADIOS adios(MPI_COMM_WORLD);
+#else
+    adios2::ADIOS adios;
+#endif
+    /* Write output */
+    {
+        adios2::IO io = adios.DeclareIO("TestIO");
+        const adios2::Dims shape{static_cast<size_t>(mpiSize),
+                                 static_cast<size_t>(Nx * Nblocks)};
+        const adios2::Dims start{static_cast<size_t>(mpiRank), 0};
+        const adios2::Dims count{1, Nx};
+
+        auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);
+
+        if (!engineName.empty())
+        {
+            io.SetEngine(engineName);
+        }
+        else
+        {
+            // Create the BP Engine
+            io.SetEngine("BPFile");
+        }
+
+        adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);
+
+        for (size_t step = 0; step < NSteps; ++step)
+        {
+            bpWriter.BeginStep();
+
+            for (size_t b = 0; b < Nblocks; ++b)
+            {
+                // Generate test data for each process / block uniquely
+                int t = static_cast<int>(step * Nblocks + b);
+                SmallTestData currentTestData =
+                    generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);
+
+                const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
+                                                    {1, Nx});
+                var_i32.SetSelection(sel);
+                bpWriter.Put(var_i32, currentTestData.I32.data());
+
+                bpWriter.PerformDataWrite();
+            }
+            bpWriter.EndStep();
+        }
+
+        // Close the file
+        bpWriter.Close();
+    }
+
+    /* Read back each step, each block and check the unique values.
+       Different blocks in each step come from different flushes.
+    */
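+    /* Layout note (as implied by the reader fix in PATCH 2/2, not verified
+       here): each PerformDataWrite() call flushes the blocks buffered so far
+       into its own contiguous segment of the data subfile, so one writer's
+       data for a step is split across Nblocks segments rather than stored as
+       a single contiguous block. */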
+ */ + { + adios2::IO io = adios.DeclareIO("ReadIO"); + + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + + adios2::Engine bpReader = + io.Open(fname, adios2::Mode::ReadRandomAccess); + + auto var_i32 = io.InquireVariable("i32"); + EXPECT_TRUE(var_i32); + EXPECT_EQ(var_i32.ShapeID(), adios2::ShapeID::GlobalArray); + EXPECT_EQ(var_i32.Steps(), NSteps); + EXPECT_EQ(var_i32.Shape()[0], mpiSize); + EXPECT_EQ(var_i32.Shape()[1], Nx * Nblocks); + + SmallTestData testData; + std::array I32; + + const auto i32AllInfo = bpReader.AllStepsBlocksInfo(var_i32); + EXPECT_EQ(i32AllInfo.size(), NSteps); + + for (size_t step = 0; step < NSteps; step++) + { + var_i32.SetStepSelection({step, 1}); + for (size_t b = 0; b < Nblocks; ++b) + { + std::cout << "Read step " << step << " block=" << b + << std::endl; + // Generate test data for each process / block uniquely + int t = static_cast(step * Nblocks + b); + SmallTestData currentTestData = + generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize); + + const adios2::Box sel({(size_t)mpiRank, b * Nx}, + {1, Nx}); + var_i32.SetSelection(sel); + bpReader.Get(var_i32, I32.data(), adios2::Mode::Sync); + + /* check content of a single block */ + for (size_t i = 0; i < Nx; ++i) + { + std::stringstream ss; + ss << "step=" << step << " block=" << b << " i=" << i + << " rank=" << mpiRank; + std::string msg = ss.str(); + EXPECT_EQ(I32[i], currentTestData.I32[i]) << msg; + } + } + } + bpReader.Close(); + } +} + //****************************************************************************** // main //****************************************************************************** From 464f013b09dcf0342200f79a9146a519239f5fb6 Mon Sep 17 00:00:00 2001 From: Podhorszki Norbert Date: Mon, 6 Jun 2022 09:15:08 -0400 Subject: [PATCH 2/2] Fix the flush/read problem in the master branch, based on how it was fixed in 2.8.1. Could not cherry-pick 95c38c39ac from 2.8.1, so made this fix manually. --- source/adios2/engine/bp5/BP5Reader.cpp | 51 +++++++++++++------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 52bdf1704a..240948c3a0 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -193,6 +193,9 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, const size_t Timestep, const size_t StartOffset, const size_t Length, char *Destination) { + /* + * Warning: this function is called by multiple threads + */ size_t FlushCount = m_MetadataIndexTable[Timestep][2]; size_t DataPosPos = m_MetadataIndexTable[Timestep][3]; size_t SubfileNum = static_cast( @@ -215,42 +218,40 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, TP endSubfile = NOW(); double timeSubfile = DURATION(startSubfile, endSubfile); + /* Each block is in exactly one flush. The StartOffset was calculated + as if all the flushes were in a single contiguous block in file. 
+ */ TP startRead = NOW(); size_t InfoStartPos = DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t)); - size_t ThisFlushInfo = InfoStartPos; - size_t RemainingLength = Length; - size_t ThisDataPos; - size_t Offset = StartOffset; + size_t SumDataSize = 0; // count in contiguous space for (size_t flush = 0; flush < FlushCount; flush++) { - - ThisDataPos = - helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, + size_t ThisDataPos = + helper::ReadValue(m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian); size_t ThisDataSize = - helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, + helper::ReadValue(m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian); - if (ThisDataSize > RemainingLength) - ThisDataSize = RemainingLength; - FileManager.ReadFile(Destination, ThisDataSize, ThisDataPos + Offset, - SubfileNum); - Destination += ThisDataSize; - RemainingLength -= ThisDataSize; - Offset = 0; - if (RemainingLength == 0) + + if (StartOffset < SumDataSize + ThisDataSize) { - break; + // discount offsets of skipped flushes + size_t Offset = StartOffset - SumDataSize; + FileManager.ReadFile(Destination, Length, ThisDataPos + Offset, + SubfileNum); + TP endRead = NOW(); + double timeRead = DURATION(startRead, endRead); + return std::make_pair(timeSubfile, timeRead); } + SumDataSize += ThisDataSize; } - if (RemainingLength > 0) - { - ThisDataPos = - helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, - m_Minifooter.IsLittleEndian); - FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset, - SubfileNum); - } + + size_t ThisDataPos = helper::ReadValue( + m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian); + size_t Offset = StartOffset - SumDataSize; + FileManager.ReadFile(Destination, Length, ThisDataPos + Offset, SubfileNum); + TP endRead = NOW(); double timeRead = DURATION(startRead, endRead); return std::make_pair(timeSubfile, timeRead);