Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge bp5 flush read from 281 #3243

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
const size_t Timestep, const size_t StartOffset,
const size_t Length, char *Destination)
{
/*
* Warning: this function is called by multiple threads
*/
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = static_cast<size_t>(
Expand All @@ -215,42 +218,40 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);

/* Each block is in exactly one flush. The StartOffset was calculated
as if all the flushes were in a single contiguous block in file.
*/
TP startRead = NOW();
size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
size_t RemainingLength = Length;
size_t ThisDataPos;
size_t Offset = StartOffset;
size_t SumDataSize = 0; // count in contiguous space
for (size_t flush = 0; flush < FlushCount; flush++)
{

ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
size_t ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
m_Minifooter.IsLittleEndian);
size_t ThisDataSize =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
m_Minifooter.IsLittleEndian);
if (ThisDataSize > RemainingLength)
ThisDataSize = RemainingLength;
FileManager.ReadFile(Destination, ThisDataSize, ThisDataPos + Offset,
SubfileNum);
Destination += ThisDataSize;
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)

if (StartOffset < SumDataSize + ThisDataSize)
{
break;
// discount offsets of skipped flushes
size_t Offset = StartOffset - SumDataSize;
FileManager.ReadFile(Destination, Length, ThisDataPos + Offset,
SubfileNum);
TP endRead = NOW();
double timeRead = DURATION(startRead, endRead);
return std::make_pair(timeSubfile, timeRead);
}
SumDataSize += ThisDataSize;
}
if (RemainingLength > 0)
{
ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset,
SubfileNum);
}

size_t ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian);
size_t Offset = StartOffset - SumDataSize;
FileManager.ReadFile(Destination, Length, ThisDataPos + Offset, SubfileNum);

TP endRead = NOW();
double timeRead = DURATION(startRead, endRead);
return std::make_pair(timeSubfile, timeRead);
Expand Down
143 changes: 143 additions & 0 deletions testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,149 @@ TEST_F(BPWriteReadMultiblockTest, ADIOS2BPWriteReadMultiblock2D4x2)
}
}

//******************************************************************************
// Test flushing data within the step and that read works properly for all
// blocks across all flushes
//******************************************************************************

TEST_F(BPWriteReadMultiblockTest, MultiblockPerformDataWrite)
{
if (engineName != "BP5")
{
std::cout << "Engine " << engineName
<< " is not tested for this feature." << std::endl;
return;
}
// Each process would write a 1x8 array and all processes would
// form a mpiSize * Nx 1D array
const std::string fname("MultiblockPerformDataWrite.bp");

int mpiRank = 0, mpiSize = 1;
// Number of elements per blocks (blocksize)
const size_t Nx = 8;
// Number of blocks per process (= number of flushes)
const size_t Nblocks = 3;
// Number of steps
const size_t NSteps = 3;

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
#endif

// Write test data using BP

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
/* Write output */
{
adios2::IO io = adios.DeclareIO("TestIO");
const adios2::Dims shape{static_cast<size_t>(mpiSize),
static_cast<size_t>(Nx * Nblocks)};
const adios2::Dims start{static_cast<size_t>(mpiRank), 0};
const adios2::Dims count{1, Nx};

auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();

for (size_t b = 0; b < Nblocks; ++b)
{
// Generate test data for each process / block uniquely
int t = static_cast<int>(step * Nblocks + b);
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
{1, Nx});
var_i32.SetSelection(sel);
bpWriter.Put(var_i32, currentTestData.I32.data());

bpWriter.PerformDataWrite();
}
bpWriter.EndStep();
}

// Close the file
bpWriter.Close();
}

/* Read back each step, each block and check the unique values.
Different blocks in each step are coming from different flushes.
*/
{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}

adios2::Engine bpReader =
io.Open(fname, adios2::Mode::ReadRandomAccess);

auto var_i32 = io.InquireVariable<int32_t>("i32");
EXPECT_TRUE(var_i32);
EXPECT_EQ(var_i32.ShapeID(), adios2::ShapeID::GlobalArray);
EXPECT_EQ(var_i32.Steps(), NSteps);
EXPECT_EQ(var_i32.Shape()[0], mpiSize);
EXPECT_EQ(var_i32.Shape()[1], Nx * Nblocks);

SmallTestData testData;
std::array<int32_t, Nx> I32;

const auto i32AllInfo = bpReader.AllStepsBlocksInfo(var_i32);
EXPECT_EQ(i32AllInfo.size(), NSteps);

for (size_t step = 0; step < NSteps; step++)
{
var_i32.SetStepSelection({step, 1});
for (size_t b = 0; b < Nblocks; ++b)
{
std::cout << "Read step " << step << " block=" << b
<< std::endl;
// Generate test data for each process / block uniquely
int t = static_cast<int>(step * Nblocks + b);
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
{1, Nx});
var_i32.SetSelection(sel);
bpReader.Get(var_i32, I32.data(), adios2::Mode::Sync);

/* check content of a single block */
for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "step=" << step << " block=" << b << " i=" << i
<< " rank=" << mpiRank;
std::string msg = ss.str();
EXPECT_EQ(I32[i], currentTestData.I32[i]) << msg;
}
}
}
bpReader.Close();
}
}

//******************************************************************************
// main
//******************************************************************************
Expand Down