Skip to content

Commit

Permalink
Merge pull request #3243 from pnorbert/merge-bp5-flush-read-from-281
Browse files Browse the repository at this point in the history
Merge bp5 flush read from 281
  • Loading branch information
pnorbert authored Jun 7, 2022
2 parents c945141 + 464f013 commit 02fb4a5
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 25 deletions.
51 changes: 26 additions & 25 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
const size_t Timestep, const size_t StartOffset,
const size_t Length, char *Destination)
{
/*
* Warning: this function is called by multiple threads
*/
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = static_cast<size_t>(
Expand All @@ -215,42 +218,40 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);

/* Each block is in exactly one flush. The StartOffset was calculated
as if all the flushes were in a single contiguous block in file.
*/
TP startRead = NOW();
size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
size_t RemainingLength = Length;
size_t ThisDataPos;
size_t Offset = StartOffset;
size_t SumDataSize = 0; // count in contiguous space
for (size_t flush = 0; flush < FlushCount; flush++)
{

ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
size_t ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
m_Minifooter.IsLittleEndian);
size_t ThisDataSize =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
m_Minifooter.IsLittleEndian);
if (ThisDataSize > RemainingLength)
ThisDataSize = RemainingLength;
FileManager.ReadFile(Destination, ThisDataSize, ThisDataPos + Offset,
SubfileNum);
Destination += ThisDataSize;
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)

if (StartOffset < SumDataSize + ThisDataSize)
{
break;
// discount offsets of skipped flushes
size_t Offset = StartOffset - SumDataSize;
FileManager.ReadFile(Destination, Length, ThisDataPos + Offset,
SubfileNum);
TP endRead = NOW();
double timeRead = DURATION(startRead, endRead);
return std::make_pair(timeSubfile, timeRead);
}
SumDataSize += ThisDataSize;
}
if (RemainingLength > 0)
{
ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset,
SubfileNum);
}

size_t ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian);
size_t Offset = StartOffset - SumDataSize;
FileManager.ReadFile(Destination, Length, ThisDataPos + Offset, SubfileNum);

TP endRead = NOW();
double timeRead = DURATION(startRead, endRead);
return std::make_pair(timeSubfile, timeRead);
Expand Down
143 changes: 143 additions & 0 deletions testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,149 @@ TEST_F(BPWriteReadMultiblockTest, ADIOS2BPWriteReadMultiblock2D4x2)
}
}

//******************************************************************************
// Test flushing data within the step and that read works properly for all
// blocks across all flushes
//******************************************************************************

TEST_F(BPWriteReadMultiblockTest, MultiblockPerformDataWrite)
{
if (engineName != "BP5")
{
std::cout << "Engine " << engineName
<< " is not tested for this feature." << std::endl;
return;
}
// Each process would write a 1x8 array and all processes would
// form a mpiSize * Nx 1D array
const std::string fname("MultiblockPerformDataWrite.bp");

int mpiRank = 0, mpiSize = 1;
// Number of elements per blocks (blocksize)
const size_t Nx = 8;
// Number of blocks per process (= number of flushes)
const size_t Nblocks = 3;
// Number of steps
const size_t NSteps = 3;

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
#endif

// Write test data using BP

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
/* Write output */
{
adios2::IO io = adios.DeclareIO("TestIO");
const adios2::Dims shape{static_cast<size_t>(mpiSize),
static_cast<size_t>(Nx * Nblocks)};
const adios2::Dims start{static_cast<size_t>(mpiRank), 0};
const adios2::Dims count{1, Nx};

auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();

for (size_t b = 0; b < Nblocks; ++b)
{
// Generate test data for each process / block uniquely
int t = static_cast<int>(step * Nblocks + b);
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
{1, Nx});
var_i32.SetSelection(sel);
bpWriter.Put(var_i32, currentTestData.I32.data());

bpWriter.PerformDataWrite();
}
bpWriter.EndStep();
}

// Close the file
bpWriter.Close();
}

/* Read back each step, each block and check the unique values.
Different blocks in each step are coming from different flushes.
*/
{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}

adios2::Engine bpReader =
io.Open(fname, adios2::Mode::ReadRandomAccess);

auto var_i32 = io.InquireVariable<int32_t>("i32");
EXPECT_TRUE(var_i32);
EXPECT_EQ(var_i32.ShapeID(), adios2::ShapeID::GlobalArray);
EXPECT_EQ(var_i32.Steps(), NSteps);
EXPECT_EQ(var_i32.Shape()[0], mpiSize);
EXPECT_EQ(var_i32.Shape()[1], Nx * Nblocks);

SmallTestData testData;
std::array<int32_t, Nx> I32;

const auto i32AllInfo = bpReader.AllStepsBlocksInfo(var_i32);
EXPECT_EQ(i32AllInfo.size(), NSteps);

for (size_t step = 0; step < NSteps; step++)
{
var_i32.SetStepSelection({step, 1});
for (size_t b = 0; b < Nblocks; ++b)
{
std::cout << "Read step " << step << " block=" << b
<< std::endl;
// Generate test data for each process / block uniquely
int t = static_cast<int>(step * Nblocks + b);
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
{1, Nx});
var_i32.SetSelection(sel);
bpReader.Get(var_i32, I32.data(), adios2::Mode::Sync);

/* check content of a single block */
for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "step=" << step << " block=" << b << " i=" << i
<< " rank=" << mpiRank;
std::string msg = ss.str();
EXPECT_EQ(I32[i], currentTestData.I32[i]) << msg;
}
}
}
bpReader.Close();
}
}

//******************************************************************************
// main
//******************************************************************************
Expand Down

0 comments on commit 02fb4a5

Please sign in to comment.