Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP5 index format is now record-based: #3099

Merged
merged 2 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ class BP5Engine
static constexpr size_t m_VersionTagPosition = 0;
static constexpr size_t m_VersionTagLength = 32;

static constexpr uint8_t m_BP5MinorVersion = 2;

/** Index record types */
enum IndexRecord
{
StepRecord = 's',
WriterMapRecord = 'w',
};

std::vector<std::string>
GetBPSubStreamNames(const std::vector<std::string> &names,
size_t subFileIndex) const noexcept;
Expand Down
148 changes: 93 additions & 55 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,16 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,

// BP minor version, unused
position = m_BPMinorVersionPosition;
const uint8_t minorversion = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (minorversion != m_BP5MinorVersion)
{
helper::Throw<std::runtime_error>(
"Engine", "BP5Reader", "ParseMetadataIndex",
"Current ADIOS2 BP5 Engine only supports version 5." +
std::to_string(m_BP5MinorVersion) + ", found 5." +
std::to_string(minorversion) + " version");
}

// Writer active flag
position = m_ActiveFlagPosition;
Expand Down Expand Up @@ -735,27 +745,22 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
m_FilteredMetadataInfo.clear();
uint64_t minfo_pos = 0;
uint64_t minfo_size = 0;
int n = 0; // a loop counter for current run
int n = 0; // a loop counter for current run4
int nrec = 0; // number of records in current run

while (position < buffer.size() &&
metadataSizeToRead < maxMetadataSizeInMemory)
{
std::vector<uint64_t> ptrs;
const uint64_t MetadataPos = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(

const unsigned char recordID = helper::ReadValue<unsigned char>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
const uint64_t recordLength = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const size_t dbgRecordStartPosition = position;

if (!n)
switch (recordID)
{
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (hasWriterMap)
case IndexRecord::WriterMapRecord:
{
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
Expand All @@ -775,60 +780,93 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}
m_LastMapStep = m_StepsCount;
m_LastWriterCount = s.WriterCount;
break;
}

if (m_SelectedSteps.IsSelected(m_AbsStepsInFile))
case IndexRecord::StepRecord:
{
m_WriterMapIndex.push_back(m_LastMapStep);
std::vector<uint64_t> ptrs;
const uint64_t MetadataPos = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
// absolute pos in file before read
ptrs.push_back(MetadataPos);
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
if (!n)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (m_SelectedSteps.IsSelected(m_AbsStepsInFile))
{
m_WriterMapIndex.push_back(m_LastMapStep);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
// absolute pos in file before read
ptrs.push_back(MetadataPos);
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
std::cout << "loc:" << DataPos << std::endl;
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif
minfo_size += MetadataSize;
metadataSizeToRead += MetadataSize;
m_StepsCount++;
}
else
{
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
minfo_size += MetadataSize;
metadataSizeToRead += MetadataSize;
m_StepsCount++;
}
else
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
}
minfo_pos = MetadataPos;
minfo_size = 0;
}
minfo_pos = MetadataPos;
minfo_size = 0;

// skip over the writer -> data file offset records
position +=
sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1);
++m_AbsStepsInFile;
++n;
break;
}
}
// dbg
if ((position - dbgRecordStartPosition) != (size_t)recordLength)
{
helper::Throw<std::runtime_error>(
"Engine", "BP5Reader", "ParseMetadataIndex",
"Record " + std::to_string(nrec) + " (id = " +
std::to_string(recordID) + ") has invalid length " +
std::to_string(recordLength) + ". We parsed " +
std::to_string(position - dbgRecordStartPosition) +
" bytes for this record"

// skip over the writer -> data file offset records
position +=
sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1);
++m_AbsStepsInFile;
++n;
);
}
++nrec;
}
if (minfo_size > 0)
{
Expand Down
Loading