Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bp5 append #3020

Merged
merged 8 commits into from
Jan 27, 2022
Merged
9 changes: 4 additions & 5 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class BP5Engine
static constexpr size_t m_IndexHeaderSize = 64;
static constexpr size_t m_EndianFlagPosition = 36;
static constexpr size_t m_BPVersionPosition = 37;
static constexpr size_t m_ActiveFlagPosition = 38;
static constexpr size_t m_BPMinorVersionPosition = 39;
static constexpr size_t m_WriterCountPosition = 40;
static constexpr size_t m_AggregatorCountPosition = 44;
static constexpr size_t m_ColumnMajorFlagPosition = 48;
static constexpr size_t m_BPMinorVersionPosition = 38;
static constexpr size_t m_ActiveFlagPosition = 39;
static constexpr size_t m_ColumnMajorFlagPosition = 40;
static constexpr size_t m_VersionTagPosition = 0;
static constexpr size_t m_VersionTagLength = 32;

Expand Down Expand Up @@ -130,6 +128,7 @@ class BP5Engine
MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(AppendAfterSteps, Int, int, INT_MAX) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
61 changes: 42 additions & 19 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step)
{
size_t pgstart = m_MetadataIndexTable[Step][0];
size_t Position = pgstart + sizeof(uint64_t); // skip total data size
size_t MDPosition = Position + 2 * sizeof(uint64_t) * m_WriterCount;
for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++)
const uint64_t WriterCount =
m_WriterMap[m_WriterMapIndex[Step]].WriterCount;
size_t MDPosition = Position + 2 * sizeof(uint64_t) * WriterCount;
for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++)
{
// variable metadata for timestep
size_t ThisMDSize = helper::ReadValue<uint64_t>(
Expand All @@ -58,7 +60,7 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step)
}
MDPosition += ThisMDSize;
}
for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++)
for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++)
{
// attribute metadata for timestep
size_t ThisADSize = helper::ReadValue<uint64_t>(
Expand Down Expand Up @@ -173,7 +175,8 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
{
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
size_t SubfileNum = static_cast<size_t>(
m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]);

// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
Expand Down Expand Up @@ -572,7 +575,7 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
// done

m_BP5Deserializer = new format::BP5Deserializer(
m_WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor,
m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer->m_Engine = this;

Expand Down Expand Up @@ -647,33 +650,27 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
std::to_string(m_Minifooter.Version) + " version \n");
}

// BP minor version, unused
position = m_BPMinorVersionPosition;

// Writer active flag
position = m_ActiveFlagPosition;
const char activeChar = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsActive = (activeChar == '\1' ? true : false);
position = m_WriterCountPosition;
m_WriterCount = helper::ReadValue<uint32_t>(
buffer, position, m_Minifooter.IsLittleEndian);
position = m_AggregatorCountPosition;
m_AggregatorCount = helper::ReadValue<uint32_t>(
buffer, position, m_Minifooter.IsLittleEndian);

position = m_ColumnMajorFlagPosition;
const uint8_t val = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsRowMajor = val == 'n';
// move position to first row
position = 64;
}

for (uint64_t i = 0; i < m_WriterCount; i++)
{
m_WriterToFileMap.push_back(helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian));
position = m_IndexHeaderSize;
}

// Read each record now
uint64_t currentStep = 0;
uint64_t lastMapStep = 0;
uint64_t lastWriterCount = 0;
do
{
std::vector<uint64_t> ptrs;
Expand All @@ -683,6 +680,31 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

if (hasWriterMap)
{
auto p = m_WriterMap.emplace(currentStep, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.AggregatorCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.SubfileCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
// Get the process -> subfile map
s.RankToSubfile.reserve(s.WriterCount);
for (uint64_t i = 0; i < s.WriterCount; i++)
{
const uint64_t subfileIdx = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.RankToSubfile.push_back(subfileIdx);
}
lastMapStep = currentStep;
lastWriterCount = s.WriterCount;
}
m_WriterMapIndex.push_back(lastMapStep);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
Expand All @@ -708,7 +730,8 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}
#endif

position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1);
// skip over the writer -> data file offset records
position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
} while (!oneStepOnly && position < buffer.size());
Expand Down
20 changes: 17 additions & 3 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>
#include <map>
#include <vector>

namespace adios2
{
Expand Down Expand Up @@ -189,12 +191,11 @@ class BP5Reader : public BP5Engine, public Engine
#undef declare_type

size_t DoSteps() const final;
uint32_t m_WriterCount = 0;
uint32_t m_AggregatorCount = 0;

uint32_t m_WriterColumnMajor = 0;
bool m_ReaderIsRowMajor = true;
bool m_WriterIsRowMajor = true;
std::vector<uint64_t> m_WriterToFileMap;

format::BufferSTL m_MetadataIndex;
format::BufferSTL m_MetaMetadata;
format::BufferSTL m_Metadata;
Expand All @@ -205,6 +206,19 @@ class BP5Reader : public BP5Engine, public Engine
void ReadData(const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination);

struct WriterMapStruct
{
uint32_t WriterCount = 0;
uint32_t AggregatorCount = 0;
uint32_t SubfileCount = 0;
std::vector<uint64_t> RankToSubfile; // size WriterCount
};

// step -> writermap but not for all steps
std::map<uint64_t, WriterMapStruct> m_WriterMap;
// step -> writermap index (for all steps)
std::vector<uint64_t> m_WriterMapIndex;
};

} // end namespace engine
Expand Down
Loading