diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 7ed67ada31..6d0e6b82f8 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -47,11 +47,9 @@ class BP5Engine static constexpr size_t m_IndexHeaderSize = 64; static constexpr size_t m_EndianFlagPosition = 36; static constexpr size_t m_BPVersionPosition = 37; - static constexpr size_t m_ActiveFlagPosition = 38; - static constexpr size_t m_BPMinorVersionPosition = 39; - static constexpr size_t m_WriterCountPosition = 40; - static constexpr size_t m_AggregatorCountPosition = 44; - static constexpr size_t m_ColumnMajorFlagPosition = 48; + static constexpr size_t m_BPMinorVersionPosition = 38; + static constexpr size_t m_ActiveFlagPosition = 39; + static constexpr size_t m_ColumnMajorFlagPosition = 40; static constexpr size_t m_VersionTagPosition = 0; static constexpr size_t m_VersionTagLength = 32; @@ -130,6 +128,7 @@ class BP5Engine MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \ MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \ MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \ + MACRO(AppendAfterSteps, Int, int, INT_MAX) \ MACRO(ReaderShortCircuitReads, Bool, bool, false) struct BP5Params diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 040e25c8de..96eda6e064 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -40,8 +40,10 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step) { size_t pgstart = m_MetadataIndexTable[Step][0]; size_t Position = pgstart + sizeof(uint64_t); // skip total data size - size_t MDPosition = Position + 2 * sizeof(uint64_t) * m_WriterCount; - for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++) + const uint64_t WriterCount = + m_WriterMap[m_WriterMapIndex[Step]].WriterCount; + size_t MDPosition = Position + 2 * sizeof(uint64_t) * WriterCount; + for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++) { // variable metadata for timestep size_t ThisMDSize = helper::ReadValue( @@ -58,7 +60,7 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step) } MDPosition += ThisMDSize; } - for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++) + for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++) { // attribute metadata for timestep size_t ThisADSize = helper::ReadValue( @@ -173,7 +175,8 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep, { size_t FlushCount = m_MetadataIndexTable[Timestep][2]; size_t DataPosPos = m_MetadataIndexTable[Timestep][3]; - size_t SubfileNum = m_WriterToFileMap[WriterRank]; + size_t SubfileNum = static_cast( + m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]); // check if subfile is already opened if (m_DataFileManager.m_Transports.count(SubfileNum) == 0) @@ -572,7 +575,7 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, // done m_BP5Deserializer = new format::BP5Deserializer( - m_WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor, + m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor, (m_OpenMode == Mode::ReadRandomAccess)); m_BP5Deserializer->m_Engine = this; @@ -647,33 +650,27 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, std::to_string(m_Minifooter.Version) + " version \n"); } + // BP minor version, unused + position = m_BPMinorVersionPosition; + // Writer active flag position = m_ActiveFlagPosition; const char activeChar 
= helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); m_WriterIsActive = (activeChar == '\1' ? true : false); - position = m_WriterCountPosition; - m_WriterCount = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); - position = m_AggregatorCountPosition; - m_AggregatorCount = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); + position = m_ColumnMajorFlagPosition; const uint8_t val = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); m_WriterIsRowMajor = val == 'n'; // move position to first row - position = 64; - } - - for (uint64_t i = 0; i < m_WriterCount; i++) - { - m_WriterToFileMap.push_back(helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian)); + position = m_IndexHeaderSize; } // Read each record now uint64_t currentStep = 0; + uint64_t lastMapStep = 0; + uint64_t lastWriterCount = 0; do { std::vector ptrs; @@ -683,6 +680,31 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, buffer, position, m_Minifooter.IsLittleEndian); const uint64_t FlushCount = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); + const uint64_t hasWriterMap = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + + if (hasWriterMap) + { + auto p = m_WriterMap.emplace(currentStep, WriterMapStruct()); + auto &s = p.first->second; + s.WriterCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.AggregatorCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.SubfileCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + // Get the process -> subfile map + s.RankToSubfile.reserve(s.WriterCount); + for (uint64_t i = 0; i < s.WriterCount; i++) + { + const uint64_t subfileIdx = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.RankToSubfile.push_back(subfileIdx); + } + lastMapStep = currentStep; + lastWriterCount = s.WriterCount; + } + m_WriterMapIndex.push_back(lastMapStep); ptrs.push_back(MetadataPos); ptrs.push_back(MetadataSize); @@ -708,7 +730,8 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, } #endif - position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1); + // skip over the writer -> data file offset records + position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1); m_StepsCount++; currentStep++; } while (!oneStepOnly && position < buffer.size()); diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index a84fb0d70b..e7103910ff 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -20,6 +20,8 @@ #include "adios2/toolkit/transportman/TransportMan.h" #include +#include +#include namespace adios2 { @@ -189,12 +191,11 @@ class BP5Reader : public BP5Engine, public Engine #undef declare_type size_t DoSteps() const final; - uint32_t m_WriterCount = 0; - uint32_t m_AggregatorCount = 0; + uint32_t m_WriterColumnMajor = 0; bool m_ReaderIsRowMajor = true; bool m_WriterIsRowMajor = true; - std::vector m_WriterToFileMap; + format::BufferSTL m_MetadataIndex; format::BufferSTL m_MetaMetadata; format::BufferSTL m_Metadata; @@ -205,6 +206,19 @@ class BP5Reader : public BP5Engine, public Engine void ReadData(const size_t WriterRank, const size_t Timestep, const size_t StartOffset, const size_t Length, char *Destination); + + struct WriterMapStruct + { + uint32_t WriterCount = 0; + uint32_t AggregatorCount = 0; + uint32_t SubfileCount = 0; + std::vector RankToSubfile; // size 
WriterCount + }; + + // step -> writermap but not for all steps + std::map m_WriterMap; + // step -> writermap index (for all steps) + std::vector m_WriterMapIndex; }; } // end namespace engine diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 3fdb5fddbc..726adc1b82 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -311,16 +311,32 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, uint64_t MetaDataSize) { - m_FileMetadataManager.FlushFiles(); - std::vector buf; - buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); + std::vector buf( + 4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size() + 3 + + m_Comm.Size()); + buf.resize(4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); buf[0] = MetaDataPos; buf[1] = MetaDataSize; buf[2] = FlushPosSizeInfo.size(); + buf[3] = static_cast(m_WriterSubfileMap.size() > 0); - uint64_t pos = 3; + uint64_t pos = 4; + + if (!m_WriterSubfileMap.empty()) + { + // Add Writer to Subfiles Map + buf.resize(buf.size() + 3 + m_Comm.Size()); + buf[4] = static_cast(m_Comm.Size()); + buf[5] = static_cast(m_Aggregator->m_NumAggregators); + buf[6] = static_cast(m_Aggregator->m_SubStreams); + pos += 3; + std::copy(m_WriterSubfileMap.begin(), m_WriterSubfileMap.end(), + buf.begin() + pos); + m_WriterSubfileMap.clear(); + pos += m_Comm.Size(); + } for (int writer = 0; writer < m_Comm.Size(); writer++) { @@ -551,43 +567,6 @@ void BP5Writer::Init() InitBPBuffer(); } -#define declare_type(T) \ - void BP5Writer::DoPut(Variable &variable, \ - typename Variable::Span &span, \ - const bool initialize, const T &value) \ - { \ - PERFSTUBS_SCOPED_TIMER("BP5Writer::Put"); \ - PutCommonSpan(variable, span, initialize, value); \ - } - -ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type) -#undef declare_type - -#define declare_type(T) \ - void BP5Writer::DoPutSync(Variable &variable, const T *data) \ - { \ - PutCommon(variable, data, true); \ - } \ - void BP5Writer::DoPutDeferred(Variable &variable, const T *data) \ - { \ - PutCommon(variable, data, false); \ - } - -ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) -#undef declare_type - -#define declare_type(T, L) \ - T *BP5Writer::DoBufferData_##L(const int bufferIdx, \ - const size_t payloadPosition, \ - const size_t bufferID) noexcept \ - { \ - return reinterpret_cast( \ - m_BP5Serializer.GetPtr(bufferIdx, payloadPosition)); \ - } - -ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type) -#undef declare_type - void BP5Writer::InitParameters() { ParseParams(m_IO, m_Parameters); @@ -615,6 +594,205 @@ void BP5Writer::InitParameters() } } +uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) +{ + const auto &buffer = bufferSTL.m_Buffer; + size_t &position = bufferSTL.m_Position; + + if (buffer.size() < m_IndexHeaderSize) + { + m_AppendMetadataPos = 0; + m_AppendMetaMetadataPos = 0; + m_AppendMetadataIndexPos = 0; + m_AppendDataPos.resize(m_Aggregator->m_NumAggregators, + 0ULL); // safe bet + return 0; + } + + // Check endinanness + position = m_EndianFlagPosition; + const uint8_t endianness = helper::ReadValue(buffer, position); + bool IsLittleEndian = (endianness == 0) ? true : false; + if (helper::IsLittleEndian() != IsLittleEndian) + { + std::string m = (IsLittleEndian ? "Little" : "Big"); + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports appending with the same " + "endianness. 
The existing file is " + + m + "Endian\n"); + } + + // BP version + position = m_BPVersionPosition; + uint8_t Version = + helper::ReadValue(buffer, position, IsLittleEndian); + if (Version != 5) + { + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports bp format " + "version 5, found " + + std::to_string(Version) + " version \n"); + } + + position = m_ColumnMajorFlagPosition; + const uint8_t columnMajor = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint8_t NowColumnMajor = + (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; + if (columnMajor != NowColumnMajor) + { + std::string m = (columnMajor == 'y' ? "column" : "row"); + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports appending with the same " + "column/row major settings as it was written." + " Existing file is " + + m + " major\n"); + } + + position = m_IndexHeaderSize; // after the header + // Just count the steps first + unsigned int availableSteps = 0; + uint64_t nDataFiles = 0; + while (position < buffer.size()) + { + position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint64_t hasWriterMap = + helper::ReadValue(buffer, position, IsLittleEndian); + if (hasWriterMap) + { + m_AppendWriterCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendAggregatorCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendSubfileCount = + helper::ReadValue(buffer, position, IsLittleEndian); + if (m_AppendSubfileCount > nDataFiles) + { + nDataFiles = m_AppendSubfileCount; + } + // jump over writermap + position += m_AppendWriterCount * sizeof(uint64_t); + } + + position += + sizeof(uint64_t) * m_AppendWriterCount * ((2 * FlushCount) + 1); + availableSteps++; + } + + unsigned int targetStep = 0; + + if (m_Parameters.AppendAfterSteps < 0) + { + // -1 means append after last step + int s = (int)availableSteps + m_Parameters.AppendAfterSteps + 1; + if (s < 0) + { + s = 0; + } + targetStep = static_cast(s); + } + else + { + targetStep = static_cast(m_Parameters.AppendAfterSteps); + } + if (targetStep > availableSteps) + { + targetStep = availableSteps; + } + + m_AppendDataPos.resize(nDataFiles, 0ULL); + + if (!targetStep) + { + // append at 0 is like writing new file + m_AppendMetadataPos = 0; + m_AppendMetaMetadataPos = 0; + m_AppendMetadataIndexPos = 0; + return 0; + } + + m_AppendMetadataPos = MaxSizeT; // size of header + m_AppendMetaMetadataPos = MaxSizeT; + m_AppendMetadataIndexPos = MaxSizeT; + std::fill(m_AppendDataPos.begin(), m_AppendDataPos.end(), MaxSizeT); + + if (targetStep == availableSteps) + { + // append after existing steps + return targetStep; + } + + // append but not at 0 and not after existing steps + // Read each record now completely to get offsets at step+1 + position = m_IndexHeaderSize; + unsigned int currentStep = 0; + std::vector writerToFileMap; + // reading one step beyond target to get correct offsets + while (currentStep <= targetStep && position < buffer.size()) + { + m_AppendMetadataIndexPos = position; + const uint64_t MetadataPos = + helper::ReadValue(buffer, position, IsLittleEndian); + position += sizeof(uint64_t); // MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint64_t hasWriterMap = + helper::ReadValue(buffer, position, IsLittleEndian); + if (hasWriterMap) + { + m_AppendWriterCount = + helper::ReadValue(buffer, position, IsLittleEndian); + 
m_AppendAggregatorCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendSubfileCount = + helper::ReadValue(buffer, position, IsLittleEndian); + + // Get the process -> subfile map + writerToFileMap.clear(); + for (uint64_t i = 0; i < m_AppendWriterCount; i++) + { + const uint64_t subfileIdx = helper::ReadValue( + buffer, position, IsLittleEndian); + writerToFileMap.push_back(subfileIdx); + } + } + + m_AppendMetadataPos = static_cast(MetadataPos); + + if (currentStep == targetStep) + { + // we need the very first (smallest) write position to each subfile + // Offsets and sizes, 2*FlushCount + 1 per writer + for (uint64_t i = 0; i < m_AppendWriterCount; i++) + { + // first flush/write position will do + const size_t FirstDataPos = + static_cast(helper::ReadValue( + buffer, position, IsLittleEndian)); + position += + sizeof(uint64_t) * 2 * FlushCount; // no need to read + /* std::cout << "Writer " << i << " subfile " << + writerToFileMap[i] << " first data loc:" << FirstDataPos << + std::endl; */ + if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]]) + { + m_AppendDataPos[writerToFileMap[i]] = FirstDataPos; + } + } + } + else + { + // jump over all data offsets in this step + position += + sizeof(uint64_t) * m_AppendWriterCount * (1 + 2 * FlushCount); + } + currentStep++; + } + return targetStep; +} + void BP5Writer::InitAggregator() { // in BP5, aggregation is "always on", but processes may be alone, so @@ -724,6 +902,7 @@ void BP5Writer::InitTransports() m_FileMetadataManager.MkDirsBarrier(m_MetadataFileNames, m_IO.m_TransportsParameters, m_Parameters.NodeLocal || m_WriteToBB); + /* Create the directories on burst buffer if used */ if (m_DrainBB) { /* Create the directories on target anyway by main thread */ @@ -819,9 +998,9 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, " bytes."); } - if (b.GetAvailableSize() < 64) + if (b.GetAvailableSize() < m_IndexHeaderSize) { - b.Resize(position + 64, "BP4Serializer::MakeHeader " + fileType); + b.Resize(m_IndexHeaderSize, "BP4Serializer::MakeHeader " + fileType); } const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR)); @@ -873,7 +1052,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, if (position != m_EndianFlagPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Endian Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. Endian Flag " "position mismatch"); } const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1; @@ -883,41 +1062,40 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, if (position != m_BPVersionPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Active Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. BP Version " "position mismatch"); } const uint8_t version = 5; helper::CopyToBuffer(buffer, position, &version); - // byte 38: Active flag (used in Index Table only) + // byte 38: BP Minor version 1 + if (position != m_BPMinorVersionPosition) + { + throw std::runtime_error( + "ADIOS Coding ERROR in BP5Writer::MakeHeader. BP Minor version " + "position mismatch"); + } + const uint8_t minorversion = 1; + helper::CopyToBuffer(buffer, position, &minorversion); + + // byte 39: Active flag (used in Index Table only) if (position != m_ActiveFlagPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Active Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. 
Active Flag " "position mismatch"); } const uint8_t activeFlag = (isActive ? 1 : 0); helper::CopyToBuffer(buffer, position, &activeFlag); - // byte 39: Minor file version - const uint8_t subversion = 0; - helper::CopyToBuffer(buffer, position, &subversion); - - // bytes 40-43 writer count - const uint32_t WriterCount = m_Comm.Size(); - helper::CopyToBuffer(buffer, position, &WriterCount); - // bytes 44-47 aggregator count - const uint32_t AggregatorCount = - static_cast(m_Aggregator->m_NumAggregators); - helper::CopyToBuffer(buffer, position, &AggregatorCount); - // byte 48 columnMajor + // byte 40 columnMajor // write if data is column major in metadata and data const uint8_t columnMajor = (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; helper::CopyToBuffer(buffer, position, &columnMajor); - // byte 49-63: unused - position += 15; + // byte 41-63: unused + position += 23; absolutePosition = position; } @@ -940,95 +1118,6 @@ void BP5Writer::UpdateActiveFlag(const bool active) } } -uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) -{ - const auto &buffer = bufferSTL.m_Buffer; - size_t &position = bufferSTL.m_Position; - - // Check endinanness - position = m_EndianFlagPosition; - const uint8_t endianness = helper::ReadValue(buffer, position); - bool IsLittleEndian = (endianness == 0) ? true : false; -#ifndef ADIOS2_HAVE_ENDIAN_REVERSE - if (helper::IsLittleEndian() != IsLittleEndian) - { - throw std::runtime_error( - "ERROR: reader found BigEndian bp file, " - "this version of ADIOS2 wasn't compiled " - "with the cmake flag -DADIOS2_USE_Endian_Reverse=ON " - "explicitly, in call to Open\n"); - } -#endif - - // BP version - position = m_BPVersionPosition; - uint8_t Version = - helper::ReadValue(buffer, position, IsLittleEndian); - if (Version != 5) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports bp format " - "version 5, found " + - std::to_string(Version) + " version \n"); - } - - position = m_WriterCountPosition; - uint32_t WriterCount = - helper::ReadValue(buffer, position, IsLittleEndian); - if ((int)WriterCount != m_Comm.Size()) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "number of processes as it was written." - " Number of writers in the existing file = " + - std::to_string(WriterCount) + "\n"); - } - - position = m_AggregatorCountPosition; - uint32_t AggregatorCount = - helper::ReadValue(buffer, position, IsLittleEndian); - if ((size_t)AggregatorCount != m_Aggregator->m_NumAggregators) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "number of aggregators as it was written." - " Number of aggregators in the existing file = " + - std::to_string(AggregatorCount) + - " current number of aggregators = " + - std::to_string(m_Aggregator->m_NumAggregators) + "\n"); - } - - position = m_ColumnMajorFlagPosition; - const uint8_t columnMajor = - helper::ReadValue(buffer, position, IsLittleEndian); - const uint8_t NowColumnMajor = - (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; - if (columnMajor != NowColumnMajor) - { - std::string m = (columnMajor == 'y' ? "column" : "row"); - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "column/row major settings as it was written." 
- " Existing file is " + - m + " major\n"); - } - - // move position to first row - position = 64 + WriterCount * sizeof(uint64_t); - - // Read each record now - uint64_t currentStep = 0; - while (position < buffer.size()) - { - position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize - const uint64_t FlushCount = - helper::ReadValue(buffer, position, IsLittleEndian); - position += sizeof(uint64_t) * WriterCount * ((2 * FlushCount) + 1); - currentStep++; - } - return currentStep; -} - void BP5Writer::InitBPBuffer() { if (m_OpenMode == Mode::Append) @@ -1049,56 +1138,95 @@ void BP5Writer::InitBPBuffer() } m_Comm.BroadcastVector(preMetadataIndex.m_Buffer); m_WriterStep = CountStepsInMetadataIndex(preMetadataIndex); - if (m_WriterStep > 0) + + // truncate and seek + if (m_Aggregator->m_IsAggregator) { - if (m_Aggregator->m_IsAggregator) + const size_t off = m_AppendDataPos[m_Aggregator->m_SubStreamIndex]; + if (off < MaxSizeT) + { + m_FileDataManager.Truncate(off); + // Seek is needed since truncate does not seek. + // SeekTo instead of SeetToFileEnd in case a transport + // does not support actual truncate. + m_FileDataManager.SeekTo(off); + m_DataPos = off; + } + else { m_DataPos = m_FileDataManager.GetFileSize(0); } + } - if (m_Comm.Rank() == 0) + if (m_Comm.Rank() == 0) + { + // Truncate existing metadata file + if (m_AppendMetadataPos < MaxSizeT) + { + m_MetaDataPos = m_AppendMetadataPos; + m_FileMetadataManager.Truncate(m_MetaDataPos); + m_FileMetadataManager.SeekTo(m_MetaDataPos); + } + else { - // Get the size of existing metametadata file - m_BP5Serializer.m_PreMetaMetadataFileLength = - m_FileMetaMetadataManager.GetFileSize(0); - // Get the size of existing metadata file m_MetaDataPos = m_FileMetadataManager.GetFileSize(0); + m_FileMetadataManager.SeekToFileEnd(); + } + + // Truncate existing meta-meta file + if (m_AppendMetaMetadataPos < MaxSizeT) + { + m_FileMetaMetadataManager.Truncate(m_AppendMetaMetadataPos); + m_FileMetaMetadataManager.SeekTo(m_AppendMetaMetadataPos); + } + else + { + m_FileMetadataIndexManager.SeekToFileEnd(); + } + + // Set the flag in the header of metadata index table to 1 again + // to indicate a new run begins + UpdateActiveFlag(true); + + // Truncate existing index file + if (m_AppendMetadataIndexPos < MaxSizeT) + { + m_FileMetadataIndexManager.Truncate(m_AppendMetadataIndexPos); + m_FileMetadataIndexManager.SeekTo(m_AppendMetadataIndexPos); + } + else + { + m_FileMetadataIndexManager.SeekToFileEnd(); } } + m_AppendDataPos.clear(); } if (!m_WriterStep) { - /* This is a new file. 
+ /* This is a new file or append at 0 * Make headers in data buffer and metadata buffer (but do not write * them yet so that Open() can stay free of writing to disk) */ - const uint64_t a = - static_cast(m_Aggregator->m_SubStreamIndex); - std::vector Assignment = m_Comm.GatherValues(a, 0); if (m_Comm.Rank() == 0) { format::BufferSTL b; MakeHeader(b, "Metadata", false); + m_FileMetadataManager.SeekToFileBegin(); m_FileMetadataManager.WriteFiles(b.m_Buffer.data(), b.m_Position); m_MetaDataPos = b.m_Position; format::BufferSTL bi; MakeHeader(bi, "Index Table", true); + m_FileMetadataIndexManager.SeekToFileBegin(); m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(), bi.m_Position); - // where each rank's data will end up - m_FileMetadataIndexManager.WriteFiles((char *)Assignment.data(), - sizeof(Assignment[0]) * - Assignment.size()); + m_FileMetaMetadataManager.SeekToFileBegin(); } - } - else - { - if (m_Comm.Rank() == 0) + // last attempt to clean up datafile if called with append mode, + // data existed but index was missing + if (m_Aggregator->m_IsAggregator) { - // Set the flag in the header of metadata index table to 1 again - // to indicate a new run begins - UpdateActiveFlag(true); + m_FileDataManager.SeekTo(0); } } @@ -1106,6 +1234,19 @@ void BP5Writer::InitBPBuffer() { m_WriterDataPos.resize(m_Comm.Size()); } + + if (!m_WriterStep || + m_AppendWriterCount != static_cast(m_Comm.Size()) || + m_AppendAggregatorCount != + static_cast(m_Aggregator->m_NumAggregators) || + m_AppendSubfileCount != + static_cast(m_Aggregator->m_SubStreams)) + { + // new Writer Map is needed, generate now, write later + const uint64_t a = + static_cast(m_Aggregator->m_SubStreamIndex); + m_WriterSubfileMap = m_Comm.GatherValues(a, 0); + } } void BP5Writer::NotifyEngineAttribute(std::string name, DataType type) noexcept @@ -1357,6 +1498,43 @@ size_t BP5Writer::DebugGetDataBufferSize() const return m_BP5Serializer.DebugGetDataBufferSize(); } +#define declare_type(T) \ + void BP5Writer::DoPut(Variable &variable, \ + typename Variable::Span &span, \ + const bool initialize, const T &value) \ + { \ + PERFSTUBS_SCOPED_TIMER("BP5Writer::Put"); \ + PutCommonSpan(variable, span, initialize, value); \ + } + +ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type) +#undef declare_type + +#define declare_type(T) \ + void BP5Writer::DoPutSync(Variable &variable, const T *data) \ + { \ + PutCommon(variable, data, true); \ + } \ + void BP5Writer::DoPutDeferred(Variable &variable, const T *data) \ + { \ + PutCommon(variable, data, false); \ + } + +ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) +#undef declare_type + +#define declare_type(T, L) \ + T *BP5Writer::DoBufferData_##L(const int bufferIdx, \ + const size_t payloadPosition, \ + const size_t bufferID) noexcept \ + { \ + return reinterpret_cast( \ + m_BP5Serializer.GetPtr(bufferIdx, payloadPosition)); \ + } + +ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type) +#undef declare_type + } // end namespace engine } // end namespace core } // end namespace adios2 diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index de2f0c3465..d452de2c3b 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -106,7 +106,7 @@ class BP5Writer : public BP5Engine, public core::Engine void InitParameters() final; /** Set up the aggregator */ void InitAggregator(); - /** Parses transports and parameters from IO AddTransport */ + /** Complete opening/createing metadata and data files */ void InitTransports() final; /** 
Allocates memory and starts a PG group */
     void InitBPBuffer();
@@ -238,6 +238,21 @@ class BP5Writer : public BP5Engine, public core::Engine
     void MakeHeader(format::BufferSTL &b, const std::string fileType,
                     const bool isActive);
 
+    std::vector<uint64_t> m_WriterSubfileMap; // rank => subfile index
+
+    // Append helper data
+    std::vector<uint64_t> m_AppendDataPos; // each subfile append pos
+    size_t m_AppendMetadataPos;            // metadata file append pos
+    size_t m_AppendMetaMetadataPos;        // meta-metadata file append pos
+    size_t m_AppendMetadataIndexPos;       // index file append pos
+    uint32_t m_AppendWriterCount;          // last active number of writers
+    unsigned int m_AppendAggregatorCount;  // last active number of aggregators
+    unsigned int m_AppendSubfileCount;     // last active number of subfiles
+    /* Process the existing index, fill in the append helper variables above,
+     * and return the step at which appending will continue.
+     * Uses the AppendAfterSteps parameter.
+     * It resets m_Aggregator->m_NumAggregators, so aggregators must be
+     * initialized afterwards. */
     uint64_t CountStepsInMetadataIndex(format::BufferSTL &bufferSTL);
 
     /* Async write's future */
diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h
index 3ccde6be19..dafeb94ac1 100644
--- a/source/adios2/toolkit/transport/Transport.h
+++ b/source/adios2/toolkit/transport/Transport.h
@@ -154,6 +154,8 @@ class Transport
 
     virtual void Seek(const size_t start = MaxSizeT) = 0;
 
+    virtual void Truncate(const size_t length) = 0;
+
     virtual void MkDir(const std::string &fileName) = 0;
 
 protected:
diff --git a/source/adios2/toolkit/transport/file/FileDaos.cpp b/source/adios2/toolkit/transport/file/FileDaos.cpp
index adfed83b40..0ba08b0ee6 100644
--- a/source/adios2/toolkit/transport/file/FileDaos.cpp
+++ b/source/adios2/toolkit/transport/file/FileDaos.cpp
@@ -946,5 +946,10 @@ void FileDaos::Seek(const size_t start)
     }
 }
 
+void FileDaos::Truncate(const size_t length)
+{
+    throw std::ios_base::failure("ERROR: Daos Truncate is not implemented yet");
+}
+
 } // end namespace transport
 } // end namespace adios2
diff --git a/source/adios2/toolkit/transport/file/FileDaos.h b/source/adios2/toolkit/transport/file/FileDaos.h
index d6f533b576..810267a5cc 100644
--- a/source/adios2/toolkit/transport/file/FileDaos.h
+++ b/source/adios2/toolkit/transport/file/FileDaos.h
@@ -57,6 +57,8 @@ class FileDaos : public Transport
 
     void Seek(const size_t start = MaxSizeT) final;
 
+    void Truncate(const size_t length) final;
+
     void MkDir(const std::string &fileName) final;
 
 private:
diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp
index e9da737894..2ccdd931af 100644
--- a/source/adios2/toolkit/transport/file/FileFStream.cpp
+++ b/source/adios2/toolkit/transport/file/FileFStream.cpp
@@ -354,6 +354,24 @@ void FileFStream::Seek(const size_t start)
     }
 }
 
+#if __cplusplus >= 201703L
+#include <filesystem>
+#endif
+
+void FileFStream::Truncate(const size_t length)
+{
+#if __cplusplus >= 201703L
+    // truncate (or grow) the file with std::filesystem, C++17 only
+    WaitForOpen();
+    std::filesystem::path p(m_Name);
+    std::filesystem::resize_file(p, static_cast<std::uintmax_t>(length));
+    CheckFile("couldn't truncate to " + std::to_string(length) + " bytes of " +
+              m_Name + ", in call to FileFStream Truncate");
+#else
+    // Truncation is not supported in a portable manner pre C++17
+#endif
+}
+
 void FileFStream::MkDir(const std::string &fileName) {}
 
 } // end namespace transport
diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h
index bc3b7ac0ef..f9441c52d3 100644
--- a/source/adios2/toolkit/transport/file/FileFStream.h
+++ b/source/adios2/toolkit/transport/file/FileFStream.h
@@ -59,6 +59,8 @@ class FileFStream : public Transport
 
     void Seek(const size_t start = MaxSizeT) final;
 
+    void Truncate(const size_t length) final;
+
     void MkDir(const std::string &fileName) final;
 
 private:
diff --git a/source/adios2/toolkit/transport/file/FileIME.cpp b/source/adios2/toolkit/transport/file/FileIME.cpp
index 5a46b17cc3..7047743731 100644
--- a/source/adios2/toolkit/transport/file/FileIME.cpp
+++ b/source/adios2/toolkit/transport/file/FileIME.cpp
@@ -355,6 +355,11 @@ void FileIME::Seek(const size_t start)
     }
 }
 
+void FileIME::Truncate(const size_t length)
+{
+    throw std::ios_base::failure("ERROR: FileIME Truncate is not implemented yet");
+}
+
 void FileIME::MkDir(const std::string &fileName) {}
 
 } // end namespace transport
diff --git a/source/adios2/toolkit/transport/file/FileIME.h b/source/adios2/toolkit/transport/file/FileIME.h
index 24b9b60fbc..f45b63a9a4 100644
--- a/source/adios2/toolkit/transport/file/FileIME.h
+++ b/source/adios2/toolkit/transport/file/FileIME.h
@@ -59,6 +59,8 @@ class FileIME : public Transport
 
     void Seek(const size_t start = MaxSizeT) final;
 
+    void Truncate(const size_t length) final;
+
     void MkDir(const std::string &fileName) final;
 
 private:
diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp
index 52dc677767..a197541462 100644
--- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp
+++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp
@@ -17,7 +17,7 @@
 #include <sys/stat.h>  // open, fstat
 #include <sys/types.h> // open
 #include <sys/uio.h>   // writev
-#include <unistd.h>    // write, close
+#include <unistd.h>    // write, close, ftruncate
 
 #include <ios>
 
@@ -566,6 +566,21 @@ void FilePOSIX::Seek(const size_t start)
     }
 }
 
+void FilePOSIX::Truncate(const size_t length)
+{
+    WaitForOpen();
+    errno = 0;
+    const int status = ftruncate(m_FileDescriptor, static_cast<off_t>(length));
+    m_Errno = errno;
+    if (status == -1)
+    {
+        throw std::ios_base::failure(
+            "ERROR: couldn't truncate to " + std::to_string(length) +
+            " bytes of file " + m_Name + ", in call to POSIX IO truncate" +
+            SysErrMsg());
+    }
+}
+
 void FilePOSIX::MkDir(const std::string &fileName) {}
 
 } // end namespace transport
diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h
index 2585d4a093..255dbba2b3 100644
--- a/source/adios2/toolkit/transport/file/FilePOSIX.h
+++ b/source/adios2/toolkit/transport/file/FilePOSIX.h
@@ -66,6 +66,8 @@ class FilePOSIX : public Transport
 
     void Seek(const size_t start = MaxSizeT) final;
 
+    void Truncate(const size_t length) final;
+
     void MkDir(const std::string &fileName) final;
 
 private:
diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp
index 0db79d7254..8f213610ca 100644
--- a/source/adios2/toolkit/transport/file/FileStdio.cpp
+++ b/source/adios2/toolkit/transport/file/FileStdio.cpp
@@ -372,6 +372,30 @@ void FileStdio::Seek(const size_t start)
     }
 }
 
+#ifdef _WIN32
+void FileStdio::Truncate(const size_t length)
+{
+    throw std::ios_base::failure(
+        "ERROR: FileStdio::Truncate is not supported in Windows\n");
+}
+#else
+#include <unistd.h> // ftruncate
+void FileStdio::Truncate(const size_t length)
+{
+
+    WaitForOpen();
+    int fd = fileno(m_File);
+    const auto status = ftruncate(fd, length);
+    if (status == -1)
+    {
+        throw std::ios_base::failure("ERROR: couldn't truncate to " +
+                                     std::to_string(length) + " of file " +
+                                     m_Name + ", in call to stdio Truncate\n");
+    }
+} +#endif + void FileStdio::MkDir(const std::string &fileName) {} + } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transport/file/FileStdio.h b/source/adios2/toolkit/transport/file/FileStdio.h index d3b9e9c44a..37d7ded3d4 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.h +++ b/source/adios2/toolkit/transport/file/FileStdio.h @@ -57,6 +57,8 @@ class FileStdio : public Transport void Seek(const size_t start) final; + void Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/null/NullTransport.cpp b/source/adios2/toolkit/transport/null/NullTransport.cpp index 41825fd3df..2a478c36f0 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.cpp +++ b/source/adios2/toolkit/transport/null/NullTransport.cpp @@ -141,6 +141,16 @@ void NullTransport::Seek(const size_t start) Impl->CurPos = start; } +void NullTransport::Truncate(const size_t length) +{ + if (!Impl->IsOpen) + { + throw std::runtime_error( + "ERROR: NullTransport::Truncate: The transport is not open."); + } + Impl->Capacity = length; +} + void NullTransport::MkDir(const std::string &fileName) { return; } void NullTransport::CheckName() const { return; } diff --git a/source/adios2/toolkit/transport/null/NullTransport.h b/source/adios2/toolkit/transport/null/NullTransport.h index f8b95c78a6..28c9f272b0 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.h +++ b/source/adios2/toolkit/transport/null/NullTransport.h @@ -58,6 +58,8 @@ class NullTransport : public Transport void Seek(const size_t start = MaxSizeT) override; + void Truncate(const size_t length) override; + protected: struct NullTransportImpl; std::unique_ptr Impl; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 307416064c..0f6b9eab26 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -376,6 +376,28 @@ void TransportMan::SeekTo(const size_t start, const int transportIndex) } } +void TransportMan::Truncate(const size_t length, const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { + auto &transport = transportPair.second; + if (transport->m_Type == "File") + { + transport->Truncate(length); + } + } + } + else + { + auto itTransport = m_Transports.find(transportIndex); + CheckFile(itTransport, ", in call to Truncate with index " + + std::to_string(transportIndex)); + itTransport->second->Truncate(length); + } +} + size_t TransportMan::GetFileSize(const size_t transportIndex) const { auto itTransport = m_Transports.find(transportIndex); diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index 8d901cc723..ba60c84897 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -203,6 +203,8 @@ class TransportMan void SeekTo(const size_t start, const int transportIndex = -1); + void Truncate(const size_t length, const int transportIndex = -1); + /** * Check if a file exists. 
* @param name diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index 0d68354bf7..cfae81fa4c 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -1,26 +1,35 @@ +from ast import Sub import numpy as np from os import fstat from .utils import * WriterCount = -1 -def ReadWriterArray(f, fileSize, WriterCount): +def ReadWriterMap(bytearray, pos): + data = np.frombuffer(bytearray, dtype=np.uint64, count=3, + offset=pos) + WriterCount = int(data[0]) + AggregatorCount = int(data[1]) + SubfileCount = int(data[2]) + pos = pos + 3 * 8 - print("Writer count is " + str(WriterCount)) - array = f.read(WriterCount * 8) - print("=====================") - print("| Rank | Subfile |") - print("=====================") + print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}" + .format(WriterCount, AggregatorCount, SubfileCount)) + data = np.frombuffer(bytearray, dtype=np.uint64, count=WriterCount, + offset=pos) + print(" =====================") + print(" | Rank | Subfile |") + print(" ---------------------") for r in range(0, WriterCount): - pos = r * 8 - data = np.frombuffer(array, dtype=np.uint64, count=1, offset=pos) rank = str(r).rjust(7) - sub = str(data[0]).rjust(9) - print("|" + rank + " | FlushCount = " + sub + " |") - print("=====================") - return True + sub = str(data[r]).rjust(8) + print(" |" + rank + " | " + sub + " |") + print(" =====================") + + pos = pos + WriterCount * 8 + return pos, WriterCount, AggregatorCount, SubfileCount -def ReadIndex(f, fileSize, WriterCount): +def ReadIndex(f, fileSize): nBytes = fileSize - f.tell() if nBytes <= 0: return True @@ -30,17 +39,25 @@ def ReadIndex(f, fileSize, WriterCount): while pos < nBytes: print("-----------------------------------------------" + "---------------------------------------------------") - data = np.frombuffer(table, dtype=np.uint64, count=3, + data = np.frombuffer(table, dtype=np.uint64, count=4, offset=pos) stepstr = str(step).ljust(6) mdatapos = str(data[0]).ljust(10) mdatasize = str(data[1]).ljust(10) flushcount = str(data[2]).ljust(3) FlushCount = data[2] + haswritermap = data[3] print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + - " | MetadataSize = " + mdatasize + " |" + flushcount + "|") + " | MetadataSize = " + mdatasize + " | FlushCount = " + + flushcount + "| hasWriterMap = " + + str(haswritermap).ljust(3) + "|") + + pos = pos + 4 * 8 + + if (haswritermap > 0): + pos, WriterCount, AggregatorCount, SubfileCount = ReadWriterMap( + table, pos) - pos = pos + 3 * 8 for Writer in range(0, WriterCount): start = " Writer " + str(Writer) + " data " thiswriter = np.frombuffer(table, dtype=np.uint64, @@ -74,12 +91,9 @@ def DumpIndexTable(fileName): fileSize = fstat(f.fileno()).st_size status = ReadHeader(f, fileSize, "Index Table") if isinstance(status, list): - WriterCount = status[1] status = status[0] if status: - status = ReadWriterArray(f, fileSize, WriterCount) - if status: - status = ReadIndex(f, fileSize, WriterCount) + status = ReadIndex(f, fileSize) return status diff --git a/source/utils/bp5dbg/adios2/bp5dbg/utils.py b/source/utils/bp5dbg/adios2/bp5dbg/utils.py index bcc207d490..22dbe45e6b 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/utils.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/utils.py @@ -130,37 +130,35 @@ def ReadHeader(f, fileSize, fileType): status = False bpversion = int(header[37]) - active = int(header[38]) + bpminorversion = int(header[38]) + active = 
int(header[39]) if active == 0: activeStr = ' no' else: activeStr = 'yes' - - # unused = hStr[39] - - WriterCount = int(header[40]) - aggregatorcount = int(header[44]) - iscolumnmajor = header[49] + iscolumnmajor = hStr[40] + if iscolumnmajor == 'n': + clmnStr = ' no' + else: + clmnStr = ' yes' # 45..63 unused - print("-----------------------------------------------------------" - "-----------------------------------------------------------") - print("| Version string | Major | Minor | Patch " - "| unused | Endian | BP version | Active | WriterCount | AggCount" + - " | ColumnMajor | unused |") - print("| 32 bytes | 1B | 1B | 1B " - "| 1B | 1B | 1B | 1B | 4b | 4b " + - "| 1b | 16B |") - print("+----------------------------------------------------------" - "----------------------------------------------------------+") - print("| {0} | {1} | {2} | {3} | | {4} " - "| {5} | {6} | {7:d} | {8:d} | " + - "{9} | |".format( - versionStr, major, minor, micro, endian, bpversion, activeStr, - WriterCount, aggregatorcount, iscolumnmajor)) - print("-----------------------------------------------------------" - "-----------------------------------------------------------") - return [status, WriterCount] + print("---------------------------------------------------------" + "---------------------------------------------------------") + print("| Version string |Major|Minor|Patch" + "|unused|Endian|BP version|BP minor|Active|ColumnMajor|unused|") + print("| 32 bytes | 1B | 1B | 1B " + "| 1B | 1B | 1B | 1B | 1B | 1B | 23B |") + print("+----------------------------------+-----+-----+-----" + "+------+------+----------+--------+------+-----------+------+") + print("| {0} | {1} | {2} | {3} | | {4} " + "| {5} | {6} | {7} | {8} | |".format( + versionStr, str(major).center(3), str(minor).center(3), + str(micro).center(3), endian, str(bpversion).center(3), + str(bpminorversion).center(3), activeStr, clmnStr)) + print("---------------------------------------------------------" + "---------------------------------------------------------") + return status if __name__ == "__main__": diff --git a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp index 4617c8252d..114ec156cb 100644 --- a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp @@ -836,48 +836,35 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendReadVaryingAggregation) /* Write phase II: append */ io.SetParameter("NumAggregators", "1"); - if (engineName == "BP5" && mpiSize > 1) + bpWriter = io.Open(fname, adios2::Mode::Append); + for (size_t step = NSteps; step < 2 * NSteps; ++step) { - EXPECT_THROW(bpWriter = io.Open(fname, adios2::Mode::Append), - std::runtime_error); + // Generate test data for each process uniquely + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), mpiRank, mpiSize); + bpWriter.BeginStep(); + bpWriter.Put(var_i32, currentTestData.I32.data()); + bpWriter.EndStep(); } - else - { - bpWriter = io.Open(fname, adios2::Mode::Append); - for (size_t step = NSteps; step < 2 * NSteps; ++step) - { - // Generate test data for each process uniquely - SmallTestData currentTestData = generateNewSmallTestData( - m_TestData, static_cast(step), mpiRank, mpiSize); - bpWriter.BeginStep(); - bpWriter.Put(var_i32, currentTestData.I32.data()); - bpWriter.EndStep(); - } - bpWriter.Close(); + bpWriter.Close(); - /* Write phase III: append */ - io.SetParameter("NumAggregators", "2"); - 
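+    // Appending with a different aggregator count is now supported: BP5
+    // records a new WriterMap entry in the metadata index for the first step
+    // written after the change, so this Open() in Append mode is expected to
+    // succeed.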
bpWriter = io.Open(fname, adios2::Mode::Append); - for (size_t step = 2 * NSteps; step < 3 * NSteps; ++step) - { - // Generate test data for each process uniquely - SmallTestData currentTestData = generateNewSmallTestData( - m_TestData, static_cast(step), mpiRank, mpiSize); - bpWriter.BeginStep(); - bpWriter.Put(var_i32, currentTestData.I32.data()); - bpWriter.EndStep(); - } - bpWriter.Close(); + /* Write phase III: append */ + io.SetParameter("NumAggregators", "2"); + bpWriter = io.Open(fname, adios2::Mode::Append); + for (size_t step = 2 * NSteps; step < 3 * NSteps; ++step) + { + // Generate test data for each process uniquely + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), mpiRank, mpiSize); + bpWriter.BeginStep(); + bpWriter.Put(var_i32, currentTestData.I32.data()); + bpWriter.EndStep(); } + bpWriter.Close(); } { size_t NumSteps = 3 * NSteps; - if (engineName == "BP5" && mpiSize > 1) - { - NumSteps = NSteps; - } - adios2::IO io = adios.DeclareIO("ReadIO"); io.SetEngine(engineName);
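
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the per-step record layout of
// the BP5 metadata index implied by WriteMetadataFileIndex() on the writer
// side and ParseMetadataIndex() / CountStepsInMetadataIndex() on the reader
// side. The struct and field names below are descriptive only; the code reads
// and writes raw little-/big-endian uint64_t values, not this type.
struct MetadataIndexStepRecordSketch
{
    uint64_t MetadataPos;  // offset of this step's metadata block
    uint64_t MetadataSize; // size of this step's metadata block
    uint64_t FlushCount;   // number of data flushes per writer in this step
    uint64_t HasWriterMap; // 1 if a WriterMap record follows, 0 otherwise
    // Present only when HasWriterMap == 1 (first step, and whenever the
    // writer/aggregator/subfile counts change between append runs):
    //     uint64_t WriterCount;
    //     uint64_t AggregatorCount;
    //     uint64_t SubfileCount;
    //     uint64_t RankToSubfile[WriterCount];
    // Always present, for each writer rank:
    //     uint64_t DataPositions[2 * FlushCount + 1];
};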
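
// Illustrative sketch (not part of the patch): how an application could use
// the new AppendAfterSteps parameter added to BP5Params above. Assumes the
// standard adios2 C++ API; the file name, variable and data are hypothetical.
#include <adios2.h>
#include <cstdint>
#include <vector>

int main()
{
    adios2::ADIOS adios;
    adios2::IO io = adios.DeclareIO("AppendIO");
    io.SetEngine("BP5");
    // Keep the first 3 steps of the existing file and continue writing at
    // step 3; negative values count from the end ("-1" appends after the
    // last step) and the default (INT_MAX) appends after all existing steps.
    io.SetParameter("AppendAfterSteps", "3");

    std::vector<int32_t> data(100, 0);
    adios2::Variable<int32_t> var =
        io.DefineVariable<int32_t>("i32", {100}, {0}, {100});

    adios2::Engine writer = io.Open("output.bp", adios2::Mode::Append);
    writer.BeginStep();
    writer.Put(var, data.data());
    writer.EndStep();
    writer.Close();
    return 0;
}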