From b0f5752b0e67c2b7533973ec2d07cc97f78286ee Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 4 Aug 2020 12:54:55 -0400 Subject: [PATCH] Process metadata really step-by-step in BP4 when StreamReader is on. So far, it was reading in metadata in chunks but it processed the whole chunk (multiple steps) at once. For true semantics, we want Variables and Attributes show up in their proper step and since the IO map holds all entries of all processed steps, the current processing can show variables in later steps too early. Now the chunks are kept in memory but only processed step by step. --- source/adios2/engine/bp4/BP4Reader.cpp | 92 ++++++++++++------- source/adios2/engine/bp4/BP4Reader.h | 22 ++++- .../toolkit/format/bp/bp4/BP4Deserializer.cpp | 12 +-- .../toolkit/format/bp/bp4/BP4Deserializer.h | 5 +- 4 files changed, 89 insertions(+), 42 deletions(-) diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index 0a8cf74fdf..c24bb03105 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -409,7 +409,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds, const Seconds &timeoutSeconds) { - std::vector sizes(2, 0); + size_t newIdxSize = 0; // Put all metadata in buffer if (m_BP4Deserializer.m_RankMPI == 0) { @@ -448,10 +448,9 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, m_MDFileManager.ReadFile( m_BP4Deserializer.m_Metadata.m_Buffer.data(), expectedMinFileSize); - m_MDIndexFileProcessedSize = metadataIndexFileSize; - - sizes[0] = metadataIndexFileSize; - sizes[1] = m_MDFileProcessedSize; + m_MDFileAlreadyReadSize = expectedMinFileSize; + m_MDIndexFileAlreadyReadSize = metadataIndexFileSize; + newIdxSize = metadataIndexFileSize; } else { @@ -471,8 +470,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, } } - m_Comm.BroadcastVector(sizes, 0); - size_t newIdxSize = sizes[0]; + newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0); if (newIdxSize > 0) { @@ -484,7 +482,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, /* Parse metadata index table */ m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex, - 0, true); + 0, true, false); // now we are sure the index header has been parsed, first step parsing // done m_IdxHeaderParsed = true; @@ -509,23 +507,24 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds) { - std::vector sizes(2, 0); + std::vector sizes(3, 0); if (m_BP4Deserializer.m_RankMPI == 0) { const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0); - if (idxFileSize > m_MDIndexFileProcessedSize) + if (idxFileSize > m_MDIndexFileAlreadyReadSize) { - const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize; + const size_t maxIdxSize = + idxFileSize - m_MDIndexFileAlreadyReadSize; std::vector idxbuf(maxIdxSize); m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize, - m_MDIndexFileProcessedSize); + m_MDIndexFileAlreadyReadSize); size_t newIdxSize; size_t expectedMinFileSize; char *buf = idxbuf.data(); MetadataCalculateMinFileSize( m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed, - m_MDFileProcessedSize, newIdxSize, expectedMinFileSize); + m_MDFileAlreadyReadSize, newIdxSize, expectedMinFileSize); // const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize( // m_BP4Deserializer, m_Name, !m_IdxHeaderParsed); @@ -536,7 +535,7 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, newIdxSize, "re-allocating metadata index buffer, in " "call to BP4Reader::BeginStep/UpdateBuffer"); } - m_BP4Deserializer.m_MetadataIndex.m_Position = 0; + m_BP4Deserializer.m_MetadataIndex.Reset(true, false); std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize, m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin()); @@ -564,20 +563,26 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, */ const size_t fileSize = m_MDFileManager.GetFileSize(0); const size_t newMDSize = - expectedMinFileSize - m_MDFileProcessedSize; + expectedMinFileSize - m_MDFileAlreadyReadSize; if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize) { m_BP4Deserializer.m_Metadata.Resize( newMDSize, "allocating metadata buffer, in call to " "BP4Reader Open"); } - m_BP4Deserializer.m_Metadata.m_Position = 0; + m_BP4Deserializer.m_Metadata.Reset(true, false); m_MDFileManager.ReadFile( m_BP4Deserializer.m_Metadata.m_Buffer.data(), newMDSize, - m_MDFileProcessedSize); + m_MDFileAlreadyReadSize); + + m_MDFileAbsolutePos = m_MDFileAlreadyReadSize; + m_MDFileAlreadyReadSize = expectedMinFileSize; + + m_MDIndexFileAlreadyReadSize = idxFileSize; sizes[0] = newIdxSize; - sizes[1] = m_MDFileProcessedSize; + sizes[1] = m_MDFileAlreadyReadSize; + sizes[2] = m_MDFileAbsolutePos; } } } @@ -587,18 +592,21 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, if (newIdxSize > 0) { - // broadcast buffer to all ranks from zero - m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer); - - // broadcast metadata index buffer to all ranks from zero - m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer); - if (m_BP4Deserializer.m_RankMPI != 0) { - m_MDFileProcessedSize = sizes[1]; + m_MDFileAlreadyReadSize = sizes[1]; + m_MDFileAbsolutePos = sizes[2]; + m_BP4Deserializer.m_MetadataIndex.Reset(true, false); + m_BP4Deserializer.m_Metadata.Reset(true, false); // we need this pointer in Metadata buffer on all processes // for parsing it correctly in ProcessMetadataForNewSteps() } + + // broadcast buffer to all ranks from zero + m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer); + + // broadcast metadata index buffer to all ranks from zero + m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer); } return newIdxSize; } @@ -613,8 +621,8 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) size of the already-processed metadata because the memory buffer of new metadata starts from 0 */ m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex, - m_MDFileProcessedSize, - !m_IdxHeaderParsed); + m_MDFileAbsolutePos, + !m_IdxHeaderParsed, true); m_IdxHeaderParsed = true; // fills IO with Variables and Attributes @@ -622,11 +630,11 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) m_BP4Deserializer.m_Metadata, *this, false); // remember current end position in metadata and index table for next round - m_MDFileProcessedSize += newProcessedMDSize; - if (m_BP4Deserializer.m_RankMPI == 0) - { - m_MDIndexFileProcessedSize += newIdxSize; - } + m_MDFileProcessedSize = m_MDFileAbsolutePos + newProcessedMDSize; + // if (m_BP4Deserializer.m_RankMPI == 0) + //{ + // m_MDIndexFileAlreadyReadSize += newIdxSize; + //} } bool BP4Reader::CheckWriterActive() @@ -645,12 +653,32 @@ bool BP4Reader::CheckWriterActive() return m_WriterIsActive; } +bool BP4Reader::ProcessNextStepInMemory() +{ + if (m_MDFileAlreadyReadSize > m_MDFileProcessedSize) + { + // Hack: processing metadata for multiple new steps only works + // when pretending not to be in streaming mode + const bool saveReadStreaming = m_IO.m_ReadStreaming; + m_IO.m_ReadStreaming = false; + ProcessMetadataForNewSteps(0); + m_IO.m_ReadStreaming = saveReadStreaming; + return true; + } + return false; +} + StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds) { /* Do a collective wait for a step within timeout. Make sure every reader comes to the same conclusion */ StepStatus retval = StepStatus::OK; + if (ProcessNextStepInMemory()) + { + return retval; + } + if (timeoutSeconds < Seconds::zero()) { timeoutSeconds = Seconds(999999999); // max 1 billion seconds wait diff --git a/source/adios2/engine/bp4/BP4Reader.h b/source/adios2/engine/bp4/BP4Reader.h index 4e2d430226..d5d546c837 100644 --- a/source/adios2/engine/bp4/BP4Reader.h +++ b/source/adios2/engine/bp4/BP4Reader.h @@ -61,14 +61,26 @@ class BP4Reader : public Engine format::BP4Deserializer m_BP4Deserializer; /* transport manager for metadata file */ transportman::TransportMan m_MDFileManager; + /* How many bytes of metadata have we already read in? */ + size_t m_MDFileAlreadyReadSize = 0; + /* How many bytes of metadata have we already processed? + * It is <= m_MDFileAlreadyReadSize, at = we need to read more */ size_t m_MDFileProcessedSize = 0; + /* The file position of the first byte that is currently + * residing in memory. Needed for skewing positions when + * processing metadata index. + */ + size_t m_MDFileAbsolutePos = 0; + /* m_MDFileAbsolutePos <= m_MDFileProcessedSize <= m_MDFileAlreadyReadSize + */ /* transport manager for managing data file(s) */ transportman::TransportMan m_DataFileManager; /* transport manager for managing the metadata index file */ transportman::TransportMan m_MDIndexFileManager; - size_t m_MDIndexFileProcessedSize = 0; + /* How many bytes of metadata index have we already read in? */ + size_t m_MDIndexFileAlreadyReadSize = 0; /* transport manager for managing the active flag file */ transportman::TransportMan m_ActiveFlagFileManager; @@ -124,6 +136,14 @@ class BP4Reader : public Engine */ bool CheckWriterActive(); + /** Check for a step that is already in memory but haven't + * been processed yet. + * @return true: if new step has been found and processed, false otherwise + * Used by CheckForNewSteps() to get the next step from memory if there is + * one. + */ + bool ProcessNextStepInMemory(); + /** Check for new steps withing timeout and only if writer is active. * @return the status flag * Used by BeginStep() to get new steps from file when it reaches the diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp index f6afb96a7b..e9673a91f2 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp @@ -58,13 +58,13 @@ size_t BP4Deserializer::ParseMetadata(const BufferSTL &bufferSTL, return lastposition; } -void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, +void BP4Deserializer::ParseMetadataIndex(BufferSTL &bufferSTL, const size_t absoluteStartPos, - const bool hasHeader) + const bool hasHeader, + const bool oneStepOnly) { const auto &buffer = bufferSTL.m_Buffer; - const size_t bufferSize = buffer.size(); - size_t position = 0; + size_t &position = bufferSTL.m_Position; if (hasHeader) { @@ -113,7 +113,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, } // Read each record now - while (position < bufferSize) + do { std::vector ptrs; const uint64_t currentStep = helper::ReadValue( @@ -137,7 +137,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, ptrs.push_back(currentTimeStamp); m_MetadataIndexTable[mpiRank][currentStep] = ptrs; position += 8; - } + } while (!oneStepOnly && position < buffer.size()); } const helper::BlockOperationInfo &BP4Deserializer::InitPostOperatorBlockData( diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h index 0c36e12bbc..149e03c694 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h @@ -46,9 +46,8 @@ class BP4Deserializer : virtual public BP4Base ~BP4Deserializer() = default; - void ParseMetadataIndex(const BufferSTL &bufferSTL, - const size_t absoluteStartPos, - const bool hasHeader); + void ParseMetadataIndex(BufferSTL &bufferSTL, const size_t absoluteStartPos, + const bool hasHeader, const bool oneStepOnly); /* Return the position in the buffer where processing ends. The processing * is controlled by the number of records in the Index, which may be less