diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index 0a8cf74fdf..c24bb03105 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -409,7 +409,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds, const Seconds &timeoutSeconds) { - std::vector sizes(2, 0); + size_t newIdxSize = 0; // Put all metadata in buffer if (m_BP4Deserializer.m_RankMPI == 0) { @@ -448,10 +448,9 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, m_MDFileManager.ReadFile( m_BP4Deserializer.m_Metadata.m_Buffer.data(), expectedMinFileSize); - m_MDIndexFileProcessedSize = metadataIndexFileSize; - - sizes[0] = metadataIndexFileSize; - sizes[1] = m_MDFileProcessedSize; + m_MDFileAlreadyReadSize = expectedMinFileSize; + m_MDIndexFileAlreadyReadSize = metadataIndexFileSize; + newIdxSize = metadataIndexFileSize; } else { @@ -471,8 +470,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, } } - m_Comm.BroadcastVector(sizes, 0); - size_t newIdxSize = sizes[0]; + newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0); if (newIdxSize > 0) { @@ -484,7 +482,7 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, /* Parse metadata index table */ m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex, - 0, true); + 0, true, false); // now we are sure the index header has been parsed, first step parsing // done m_IdxHeaderParsed = true; @@ -509,23 +507,24 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds) { - std::vector sizes(2, 0); + std::vector sizes(3, 0); if (m_BP4Deserializer.m_RankMPI == 0) { const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0); - if (idxFileSize > m_MDIndexFileProcessedSize) + if (idxFileSize > m_MDIndexFileAlreadyReadSize) { - const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize; + const size_t maxIdxSize = + idxFileSize - m_MDIndexFileAlreadyReadSize; std::vector idxbuf(maxIdxSize); m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize, - m_MDIndexFileProcessedSize); + m_MDIndexFileAlreadyReadSize); size_t newIdxSize; size_t expectedMinFileSize; char *buf = idxbuf.data(); MetadataCalculateMinFileSize( m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed, - m_MDFileProcessedSize, newIdxSize, expectedMinFileSize); + m_MDFileAlreadyReadSize, newIdxSize, expectedMinFileSize); // const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize( // m_BP4Deserializer, m_Name, !m_IdxHeaderParsed); @@ -536,7 +535,7 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, newIdxSize, "re-allocating metadata index buffer, in " "call to BP4Reader::BeginStep/UpdateBuffer"); } - m_BP4Deserializer.m_MetadataIndex.m_Position = 0; + m_BP4Deserializer.m_MetadataIndex.Reset(true, false); std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize, m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin()); @@ -564,20 +563,26 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, */ const size_t fileSize = m_MDFileManager.GetFileSize(0); const size_t newMDSize = - expectedMinFileSize - m_MDFileProcessedSize; + expectedMinFileSize - m_MDFileAlreadyReadSize; if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize) { m_BP4Deserializer.m_Metadata.Resize( newMDSize, "allocating metadata buffer, in call to " "BP4Reader Open"); } - m_BP4Deserializer.m_Metadata.m_Position = 0; + m_BP4Deserializer.m_Metadata.Reset(true, false); m_MDFileManager.ReadFile( m_BP4Deserializer.m_Metadata.m_Buffer.data(), newMDSize, - m_MDFileProcessedSize); + m_MDFileAlreadyReadSize); + + m_MDFileAbsolutePos = m_MDFileAlreadyReadSize; + m_MDFileAlreadyReadSize = expectedMinFileSize; + + m_MDIndexFileAlreadyReadSize = idxFileSize; sizes[0] = newIdxSize; - sizes[1] = m_MDFileProcessedSize; + sizes[1] = m_MDFileAlreadyReadSize; + sizes[2] = m_MDFileAbsolutePos; } } } @@ -587,18 +592,21 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, if (newIdxSize > 0) { - // broadcast buffer to all ranks from zero - m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer); - - // broadcast metadata index buffer to all ranks from zero - m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer); - if (m_BP4Deserializer.m_RankMPI != 0) { - m_MDFileProcessedSize = sizes[1]; + m_MDFileAlreadyReadSize = sizes[1]; + m_MDFileAbsolutePos = sizes[2]; + m_BP4Deserializer.m_MetadataIndex.Reset(true, false); + m_BP4Deserializer.m_Metadata.Reset(true, false); // we need this pointer in Metadata buffer on all processes // for parsing it correctly in ProcessMetadataForNewSteps() } + + // broadcast buffer to all ranks from zero + m_Comm.BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer); + + // broadcast metadata index buffer to all ranks from zero + m_Comm.BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer); } return newIdxSize; } @@ -613,8 +621,8 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) size of the already-processed metadata because the memory buffer of new metadata starts from 0 */ m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex, - m_MDFileProcessedSize, - !m_IdxHeaderParsed); + m_MDFileAbsolutePos, + !m_IdxHeaderParsed, true); m_IdxHeaderParsed = true; // fills IO with Variables and Attributes @@ -622,11 +630,11 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) m_BP4Deserializer.m_Metadata, *this, false); // remember current end position in metadata and index table for next round - m_MDFileProcessedSize += newProcessedMDSize; - if (m_BP4Deserializer.m_RankMPI == 0) - { - m_MDIndexFileProcessedSize += newIdxSize; - } + m_MDFileProcessedSize = m_MDFileAbsolutePos + newProcessedMDSize; + // if (m_BP4Deserializer.m_RankMPI == 0) + //{ + // m_MDIndexFileAlreadyReadSize += newIdxSize; + //} } bool BP4Reader::CheckWriterActive() @@ -645,12 +653,32 @@ bool BP4Reader::CheckWriterActive() return m_WriterIsActive; } +bool BP4Reader::ProcessNextStepInMemory() +{ + if (m_MDFileAlreadyReadSize > m_MDFileProcessedSize) + { + // Hack: processing metadata for multiple new steps only works + // when pretending not to be in streaming mode + const bool saveReadStreaming = m_IO.m_ReadStreaming; + m_IO.m_ReadStreaming = false; + ProcessMetadataForNewSteps(0); + m_IO.m_ReadStreaming = saveReadStreaming; + return true; + } + return false; +} + StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds) { /* Do a collective wait for a step within timeout. Make sure every reader comes to the same conclusion */ StepStatus retval = StepStatus::OK; + if (ProcessNextStepInMemory()) + { + return retval; + } + if (timeoutSeconds < Seconds::zero()) { timeoutSeconds = Seconds(999999999); // max 1 billion seconds wait diff --git a/source/adios2/engine/bp4/BP4Reader.h b/source/adios2/engine/bp4/BP4Reader.h index 4e2d430226..d5d546c837 100644 --- a/source/adios2/engine/bp4/BP4Reader.h +++ b/source/adios2/engine/bp4/BP4Reader.h @@ -61,14 +61,26 @@ class BP4Reader : public Engine format::BP4Deserializer m_BP4Deserializer; /* transport manager for metadata file */ transportman::TransportMan m_MDFileManager; + /* How many bytes of metadata have we already read in? */ + size_t m_MDFileAlreadyReadSize = 0; + /* How many bytes of metadata have we already processed? + * It is <= m_MDFileAlreadyReadSize, at = we need to read more */ size_t m_MDFileProcessedSize = 0; + /* The file position of the first byte that is currently + * residing in memory. Needed for skewing positions when + * processing metadata index. + */ + size_t m_MDFileAbsolutePos = 0; + /* m_MDFileAbsolutePos <= m_MDFileProcessedSize <= m_MDFileAlreadyReadSize + */ /* transport manager for managing data file(s) */ transportman::TransportMan m_DataFileManager; /* transport manager for managing the metadata index file */ transportman::TransportMan m_MDIndexFileManager; - size_t m_MDIndexFileProcessedSize = 0; + /* How many bytes of metadata index have we already read in? */ + size_t m_MDIndexFileAlreadyReadSize = 0; /* transport manager for managing the active flag file */ transportman::TransportMan m_ActiveFlagFileManager; @@ -124,6 +136,14 @@ class BP4Reader : public Engine */ bool CheckWriterActive(); + /** Check for a step that is already in memory but haven't + * been processed yet. + * @return true: if new step has been found and processed, false otherwise + * Used by CheckForNewSteps() to get the next step from memory if there is + * one. + */ + bool ProcessNextStepInMemory(); + /** Check for new steps withing timeout and only if writer is active. * @return the status flag * Used by BeginStep() to get new steps from file when it reaches the diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp index f6afb96a7b..e9673a91f2 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp @@ -58,13 +58,13 @@ size_t BP4Deserializer::ParseMetadata(const BufferSTL &bufferSTL, return lastposition; } -void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, +void BP4Deserializer::ParseMetadataIndex(BufferSTL &bufferSTL, const size_t absoluteStartPos, - const bool hasHeader) + const bool hasHeader, + const bool oneStepOnly) { const auto &buffer = bufferSTL.m_Buffer; - const size_t bufferSize = buffer.size(); - size_t position = 0; + size_t &position = bufferSTL.m_Position; if (hasHeader) { @@ -113,7 +113,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, } // Read each record now - while (position < bufferSize) + do { std::vector ptrs; const uint64_t currentStep = helper::ReadValue( @@ -137,7 +137,7 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL, ptrs.push_back(currentTimeStamp); m_MetadataIndexTable[mpiRank][currentStep] = ptrs; position += 8; - } + } while (!oneStepOnly && position < buffer.size()); } const helper::BlockOperationInfo &BP4Deserializer::InitPostOperatorBlockData( diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h index 0c36e12bbc..149e03c694 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h @@ -46,9 +46,8 @@ class BP4Deserializer : virtual public BP4Base ~BP4Deserializer() = default; - void ParseMetadataIndex(const BufferSTL &bufferSTL, - const size_t absoluteStartPos, - const bool hasHeader); + void ParseMetadataIndex(BufferSTL &bufferSTL, const size_t absoluteStartPos, + const bool hasHeader, const bool oneStepOnly); /* Return the position in the buffer where processing ends. The processing * is controlled by the number of records in the Index, which may be less