Skip to content

Commit

Permalink
Merge pull request #2410 from pnorbert/step-by-step-read
Browse files Browse the repository at this point in the history
Process metadata really step-by-step in BP4 when StreamReader is on. …
  • Loading branch information
pnorbert authored Aug 4, 2020
2 parents bce597a + b0f5752 commit d96750b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 42 deletions.
92 changes: 60 additions & 32 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant,
const Seconds &pollSeconds,
const Seconds &timeoutSeconds)
{
std::vector<size_t> sizes(2, 0);
size_t newIdxSize = 0;
// Put all metadata in buffer
if (m_BP4Deserializer.m_RankMPI == 0)
{
Expand Down Expand Up @@ -448,10 +448,9 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant,
m_MDFileManager.ReadFile(
m_BP4Deserializer.m_Metadata.m_Buffer.data(),
expectedMinFileSize);
m_MDIndexFileProcessedSize = metadataIndexFileSize;

sizes[0] = metadataIndexFileSize;
sizes[1] = m_MDFileProcessedSize;
m_MDFileAlreadyReadSize = expectedMinFileSize;
m_MDIndexFileAlreadyReadSize = metadataIndexFileSize;
newIdxSize = metadataIndexFileSize;
}
else
{
Expand All @@ -471,8 +470,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant,
}
}

m_Comm.BroadcastVector(sizes, 0);
size_t newIdxSize = sizes[0];
newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0);

if (newIdxSize > 0)
{
Expand All @@ -484,7 +482,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant,

/* Parse metadata index table */
m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex,
0, true);
0, true, false);
// now we are sure the index header has been parsed, first step parsing
// done
m_IdxHeaderParsed = true;
Expand All @@ -509,23 +507,24 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant,
size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
const Seconds &pollSeconds)
{
std::vector<size_t> sizes(2, 0);
std::vector<size_t> sizes(3, 0);
if (m_BP4Deserializer.m_RankMPI == 0)
{
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > m_MDIndexFileProcessedSize)
if (idxFileSize > m_MDIndexFileAlreadyReadSize)
{
const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
const size_t maxIdxSize =
idxFileSize - m_MDIndexFileAlreadyReadSize;
std::vector<char> idxbuf(maxIdxSize);
m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize,
m_MDIndexFileProcessedSize);
m_MDIndexFileAlreadyReadSize);
size_t newIdxSize;
size_t expectedMinFileSize;
char *buf = idxbuf.data();

MetadataCalculateMinFileSize(
m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed,
m_MDFileProcessedSize, newIdxSize, expectedMinFileSize);
m_MDFileAlreadyReadSize, newIdxSize, expectedMinFileSize);

// const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize(
// m_BP4Deserializer, m_Name, !m_IdxHeaderParsed);
Expand All @@ -536,7 +535,7 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
newIdxSize, "re-allocating metadata index buffer, in "
"call to BP4Reader::BeginStep/UpdateBuffer");
}
m_BP4Deserializer.m_MetadataIndex.m_Position = 0;
m_BP4Deserializer.m_MetadataIndex.Reset(true, false);
std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize,
m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin());

Expand Down Expand Up @@ -564,20 +563,26 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
*/
const size_t fileSize = m_MDFileManager.GetFileSize(0);
const size_t newMDSize =
expectedMinFileSize - m_MDFileProcessedSize;
expectedMinFileSize - m_MDFileAlreadyReadSize;
if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize)
{
m_BP4Deserializer.m_Metadata.Resize(
newMDSize, "allocating metadata buffer, in call to "
"BP4Reader Open");
}
m_BP4Deserializer.m_Metadata.m_Position = 0;
m_BP4Deserializer.m_Metadata.Reset(true, false);
m_MDFileManager.ReadFile(
m_BP4Deserializer.m_Metadata.m_Buffer.data(), newMDSize,
m_MDFileProcessedSize);
m_MDFileAlreadyReadSize);

m_MDFileAbsolutePos = m_MDFileAlreadyReadSize;
m_MDFileAlreadyReadSize = expectedMinFileSize;

m_MDIndexFileAlreadyReadSize = idxFileSize;

sizes[0] = newIdxSize;
sizes[1] = m_MDFileProcessedSize;
sizes[1] = m_MDFileAlreadyReadSize;
sizes[2] = m_MDFileAbsolutePos;
}
}
}
Expand All @@ -587,18 +592,21 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,

if (newIdxSize > 0)
{
// broadcast buffer to all ranks from zero
m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer);

if (m_BP4Deserializer.m_RankMPI != 0)
{
m_MDFileProcessedSize = sizes[1];
m_MDFileAlreadyReadSize = sizes[1];
m_MDFileAbsolutePos = sizes[2];
m_BP4Deserializer.m_MetadataIndex.Reset(true, false);
m_BP4Deserializer.m_Metadata.Reset(true, false);
// we need this pointer in Metadata buffer on all processes
// for parsing it correctly in ProcessMetadataForNewSteps()
}

// broadcast buffer to all ranks from zero
m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer);
}
return newIdxSize;
}
Expand All @@ -613,20 +621,20 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize)
size of the already-processed metadata because the memory buffer of
new metadata starts from 0 */
m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex,
m_MDFileProcessedSize,
!m_IdxHeaderParsed);
m_MDFileAbsolutePos,
!m_IdxHeaderParsed, true);
m_IdxHeaderParsed = true;

// fills IO with Variables and Attributes
const size_t newProcessedMDSize = m_BP4Deserializer.ParseMetadata(
m_BP4Deserializer.m_Metadata, *this, false);

// remember current end position in metadata and index table for next round
m_MDFileProcessedSize += newProcessedMDSize;
if (m_BP4Deserializer.m_RankMPI == 0)
{
m_MDIndexFileProcessedSize += newIdxSize;
}
m_MDFileProcessedSize = m_MDFileAbsolutePos + newProcessedMDSize;
// if (m_BP4Deserializer.m_RankMPI == 0)
//{
// m_MDIndexFileAlreadyReadSize += newIdxSize;
//}
}

bool BP4Reader::CheckWriterActive()
Expand All @@ -645,12 +653,32 @@ bool BP4Reader::CheckWriterActive()
return m_WriterIsActive;
}

/** Process metadata that has already been read into memory but has not
 * been processed yet.
 *
 * Pending metadata exists when m_MDFileAlreadyReadSize is greater than
 * m_MDFileProcessedSize. In that case ProcessMetadataForNewSteps(0) is
 * called with m_IO.m_ReadStreaming temporarily forced to false; the
 * previous value of the flag is restored afterwards, including when the
 * processing call throws.
 *
 * @return true if pending metadata was found and processed, false if
 * everything read so far has already been processed
 */
bool BP4Reader::ProcessNextStepInMemory()
{
    // Nothing is pending when all the metadata read so far is processed
    if (m_MDFileAlreadyReadSize <= m_MDFileProcessedSize)
    {
        return false;
    }

    // Hack: processing metadata for multiple new steps only works
    // when pretending not to be in streaming mode
    const bool saveReadStreaming = m_IO.m_ReadStreaming;
    m_IO.m_ReadStreaming = false;
    try
    {
        ProcessMetadataForNewSteps(0);
    }
    catch (...)
    {
        // Do not leave the IO object stuck in non-streaming mode:
        // put the saved flag back before propagating the error
        m_IO.m_ReadStreaming = saveReadStreaming;
        throw;
    }
    m_IO.m_ReadStreaming = saveReadStreaming;
    return true;
}

StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
{
/* Do a collective wait for a step within timeout.
Make sure every reader comes to the same conclusion */
StepStatus retval = StepStatus::OK;

if (ProcessNextStepInMemory())
{
return retval;
}

if (timeoutSeconds < Seconds::zero())
{
timeoutSeconds = Seconds(999999999); // max 1 billion seconds wait
Expand Down
22 changes: 21 additions & 1 deletion source/adios2/engine/bp4/BP4Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,26 @@ class BP4Reader : public Engine
format::BP4Deserializer m_BP4Deserializer;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
/* How many bytes of metadata have we already read in? */
size_t m_MDFileAlreadyReadSize = 0;
/* How many bytes of metadata have we already processed?
* It is <= m_MDFileAlreadyReadSize; when the two are equal we need to read
* more */
size_t m_MDFileProcessedSize = 0;
/* The file position of the first byte that is currently
* residing in memory. Needed for skewing positions when
* processing metadata index.
*/
size_t m_MDFileAbsolutePos = 0;
/* m_MDFileAbsolutePos <= m_MDFileProcessedSize <= m_MDFileAlreadyReadSize
*/

/* transport manager for managing data file(s) */
transportman::TransportMan m_DataFileManager;

/* transport manager for managing the metadata index file */
transportman::TransportMan m_MDIndexFileManager;
size_t m_MDIndexFileProcessedSize = 0;
/* How many bytes of metadata index have we already read in? */
size_t m_MDIndexFileAlreadyReadSize = 0;

/* transport manager for managing the active flag file */
transportman::TransportMan m_ActiveFlagFileManager;
Expand Down Expand Up @@ -124,6 +136,14 @@ class BP4Reader : public Engine
*/
bool CheckWriterActive();

/** Check for a step that is already in memory but has not
* been processed yet.
* @return true: if new step has been found and processed, false otherwise
* Used by CheckForNewSteps() to get the next step from memory if there is
* one.
*/
bool ProcessNextStepInMemory();

/** Check for new steps within timeout and only if writer is active.
* @return the status flag
* Used by BeginStep() to get new steps from file when it reaches the
Expand Down
12 changes: 6 additions & 6 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ size_t BP4Deserializer::ParseMetadata(const BufferSTL &bufferSTL,
return lastposition;
}

void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL,
void BP4Deserializer::ParseMetadataIndex(BufferSTL &bufferSTL,
const size_t absoluteStartPos,
const bool hasHeader)
const bool hasHeader,
const bool oneStepOnly)
{
const auto &buffer = bufferSTL.m_Buffer;
const size_t bufferSize = buffer.size();
size_t position = 0;
size_t &position = bufferSTL.m_Position;

if (hasHeader)
{
Expand Down Expand Up @@ -113,7 +113,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL,
}

// Read each record now
while (position < bufferSize)
do
{
std::vector<uint64_t> ptrs;
const uint64_t currentStep = helper::ReadValue<uint64_t>(
Expand All @@ -137,7 +137,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL,
ptrs.push_back(currentTimeStamp);
m_MetadataIndexTable[mpiRank][currentStep] = ptrs;
position += 8;
}
} while (!oneStepOnly && position < buffer.size());
}

const helper::BlockOperationInfo &BP4Deserializer::InitPostOperatorBlockData(
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ class BP4Deserializer : virtual public BP4Base

~BP4Deserializer() = default;

void ParseMetadataIndex(const BufferSTL &bufferSTL,
const size_t absoluteStartPos,
const bool hasHeader);
void ParseMetadataIndex(BufferSTL &bufferSTL, const size_t absoluteStartPos,
const bool hasHeader, const bool oneStepOnly);

/* Return the position in the buffer where processing ends. The processing
* is controlled by the number of records in the Index, which may be less
Expand Down

0 comments on commit d96750b

Please sign in to comment.