Skip to content

Commit

Permalink
Merge pull request #3020 from ornladios/bp5-append
Browse files Browse the repository at this point in the history
Bp5 append
  • Loading branch information
pnorbert authored Jan 27, 2022
2 parents ebc4c8d + 4fbeca5 commit 474570e
Show file tree
Hide file tree
Showing 23 changed files with 623 additions and 280 deletions.
9 changes: 4 additions & 5 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class BP5Engine
static constexpr size_t m_IndexHeaderSize = 64;
static constexpr size_t m_EndianFlagPosition = 36;
static constexpr size_t m_BPVersionPosition = 37;
static constexpr size_t m_ActiveFlagPosition = 38;
static constexpr size_t m_BPMinorVersionPosition = 39;
static constexpr size_t m_WriterCountPosition = 40;
static constexpr size_t m_AggregatorCountPosition = 44;
static constexpr size_t m_ColumnMajorFlagPosition = 48;
static constexpr size_t m_BPMinorVersionPosition = 38;
static constexpr size_t m_ActiveFlagPosition = 39;
static constexpr size_t m_ColumnMajorFlagPosition = 40;
static constexpr size_t m_VersionTagPosition = 0;
static constexpr size_t m_VersionTagLength = 32;

Expand Down Expand Up @@ -130,6 +128,7 @@ class BP5Engine
MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(AppendAfterSteps, Int, int, INT_MAX) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
61 changes: 42 additions & 19 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step)
{
size_t pgstart = m_MetadataIndexTable[Step][0];
size_t Position = pgstart + sizeof(uint64_t); // skip total data size
size_t MDPosition = Position + 2 * sizeof(uint64_t) * m_WriterCount;
for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++)
const uint64_t WriterCount =
m_WriterMap[m_WriterMapIndex[Step]].WriterCount;
size_t MDPosition = Position + 2 * sizeof(uint64_t) * WriterCount;
for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++)
{
// variable metadata for timestep
size_t ThisMDSize = helper::ReadValue<uint64_t>(
Expand All @@ -58,7 +60,7 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step)
}
MDPosition += ThisMDSize;
}
for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++)
for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++)
{
// attribute metadata for timestep
size_t ThisADSize = helper::ReadValue<uint64_t>(
Expand Down Expand Up @@ -173,7 +175,8 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
{
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
size_t SubfileNum = static_cast<size_t>(
m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]);

// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
Expand Down Expand Up @@ -572,7 +575,7 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
// done

m_BP5Deserializer = new format::BP5Deserializer(
m_WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor,
m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer->m_Engine = this;

Expand Down Expand Up @@ -647,33 +650,27 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
std::to_string(m_Minifooter.Version) + " version \n");
}

// BP minor version, unused
position = m_BPMinorVersionPosition;

// Writer active flag
position = m_ActiveFlagPosition;
const char activeChar = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsActive = (activeChar == '\1' ? true : false);
position = m_WriterCountPosition;
m_WriterCount = helper::ReadValue<uint32_t>(
buffer, position, m_Minifooter.IsLittleEndian);
position = m_AggregatorCountPosition;
m_AggregatorCount = helper::ReadValue<uint32_t>(
buffer, position, m_Minifooter.IsLittleEndian);

position = m_ColumnMajorFlagPosition;
const uint8_t val = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsRowMajor = val == 'n';
// move position to first row
position = 64;
}

for (uint64_t i = 0; i < m_WriterCount; i++)
{
m_WriterToFileMap.push_back(helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian));
position = m_IndexHeaderSize;
}

// Read each record now
uint64_t currentStep = 0;
uint64_t lastMapStep = 0;
uint64_t lastWriterCount = 0;
do
{
std::vector<uint64_t> ptrs;
Expand All @@ -683,6 +680,31 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

if (hasWriterMap)
{
auto p = m_WriterMap.emplace(currentStep, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.AggregatorCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.SubfileCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
// Get the process -> subfile map
s.RankToSubfile.reserve(s.WriterCount);
for (uint64_t i = 0; i < s.WriterCount; i++)
{
const uint64_t subfileIdx = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.RankToSubfile.push_back(subfileIdx);
}
lastMapStep = currentStep;
lastWriterCount = s.WriterCount;
}
m_WriterMapIndex.push_back(lastMapStep);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
Expand All @@ -708,7 +730,8 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}
#endif

position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1);
// skip over the writer -> data file offset records
position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
} while (!oneStepOnly && position < buffer.size());
Expand Down
20 changes: 17 additions & 3 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>
#include <map>
#include <vector>

namespace adios2
{
Expand Down Expand Up @@ -189,12 +191,11 @@ class BP5Reader : public BP5Engine, public Engine
#undef declare_type

size_t DoSteps() const final;
uint32_t m_WriterCount = 0;
uint32_t m_AggregatorCount = 0;

uint32_t m_WriterColumnMajor = 0;
bool m_ReaderIsRowMajor = true;
bool m_WriterIsRowMajor = true;
std::vector<uint64_t> m_WriterToFileMap;

format::BufferSTL m_MetadataIndex;
format::BufferSTL m_MetaMetadata;
format::BufferSTL m_Metadata;
Expand All @@ -205,6 +206,19 @@ class BP5Reader : public BP5Engine, public Engine
void ReadData(const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination);

struct WriterMapStruct
{
uint32_t WriterCount = 0;
uint32_t AggregatorCount = 0;
uint32_t SubfileCount = 0;
std::vector<uint64_t> RankToSubfile; // size WriterCount
};

// step -> writermap but not for all steps
std::map<uint64_t, WriterMapStruct> m_WriterMap;
// step -> writermap index (for all steps)
std::vector<uint64_t> m_WriterMapIndex;
};

} // end namespace engine
Expand Down
Loading

0 comments on commit 474570e

Please sign in to comment.