Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added decompression in sst reader for bp marshaling #767

Merged
merged 1 commit into from
Aug 2, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 84 additions & 39 deletions source/adios2/engine/sst/SstReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -32,70 +32,114 @@ void SstReader::ReadVariableBlocks(Variable<T> &variable)
for (typename Variable<T>::Info &blockInfo : variable.m_BlocksInfo)
{
T *originalBlockData = blockInfo.Data;

for (const auto &stepPair : blockInfo.StepBlockSubStreamsInfo)
{
const std::vector<helper::SubStreamBoxInfo> &subStreamsInfo =
stepPair.second;

for (const helper::SubStreamBoxInfo &subStreamInfo : subStreamsInfo)
{
const size_t rank = subStreamInfo.SubStreamID;
const auto &seeks = subStreamInfo.Seeks;
const size_t writerBlockStart = seeks.first;
const size_t writerBlockSize = seeks.second - seeks.first;
size_t elementOffset, dummy;
void *dp_info = NULL;
if (m_CurrentStepMetaData->DP_TimestepInfo)
{
dp_info = m_CurrentStepMetaData->DP_TimestepInfo[rank];
}
if (helper::IsIntersectionContiguousSubarray(
subStreamInfo.BlockBox, subStreamInfo.IntersectionBox,
m_BP3Deserializer->m_IsRowMajor, dummy) &&
helper::IsIntersectionContiguousSubarray(
helper::StartEndBox(
blockInfo.Start, blockInfo.Count,
m_BP3Deserializer->m_ReverseDimensions),
subStreamInfo.IntersectionBox,
m_BP3Deserializer->m_IsRowMajor, elementOffset))
// if remote data buffer is compressed
if (subStreamInfo.OperationsInfo.size() > 0)
{
const bool identity =
m_BP3Deserializer->IdentityOperation<T>(
blockInfo.Operations);
const helper::BlockOperationInfo &blockOperationInfo =
m_BP3Deserializer->InitPostOperatorBlockData(
subStreamInfo.OperationsInfo,
variable.m_RawMemory[1], identity);
// if identity is true, just read the entire block content
char *output =
identity ? reinterpret_cast<char *>(blockInfo.Data)
: variable.m_RawMemory[1].data();
auto ret = SstReadRemoteMemory(
m_Input, rank, CurrentStep(), writerBlockStart,
writerBlockSize, blockInfo.Data + elementOffset,
dp_info);
sstReadHandlers.push_back(ret);
m_Input, rank, CurrentStep(),
blockOperationInfo.PayloadOffset,
blockOperationInfo.PayloadSize, output, dp_info);
SstWaitForCompletion(m_Input, ret);
if (identity)
{
continue;
}
m_BP3Deserializer->GetPreOperatorBlockData(
variable.m_RawMemory[1], blockOperationInfo,
variable.m_RawMemory[0]);
m_BP3Deserializer->ClipContiguousMemory<T>(
blockInfo, variable.m_RawMemory[0],
subStreamInfo.BlockBox, subStreamInfo.IntersectionBox);
}
// if remote data buffer is not compressed
else
{
if (m_BufferNonContiguousVariables)
const auto &seeks = subStreamInfo.Seeks;
const size_t writerBlockStart = seeks.first;
const size_t writerBlockSize = seeks.second - seeks.first;
size_t elementOffset, dummy;
// if both input and output are contiguous memory then
// directly issue SstRead and put data in place
if (helper::IsIntersectionContiguousSubarray(
subStreamInfo.BlockBox,
subStreamInfo.IntersectionBox,
m_BP3Deserializer->m_IsRowMajor, dummy) &&
helper::IsIntersectionContiguousSubarray(
helper::StartEndBox(
blockInfo.Start, blockInfo.Count,
m_BP3Deserializer->m_ReverseDimensions),
subStreamInfo.IntersectionBox,
m_BP3Deserializer->m_IsRowMajor, elementOffset))
{
nonContiguousBpBuffer.emplace_back();
nonContiguousBpBuffer.back().VariableName =
variable.m_Name;
nonContiguousBpBuffer.back().ContiguousMemory.resize(
writerBlockSize);
nonContiguousBpBuffer.back().BlockBox =
subStreamInfo.BlockBox;
nonContiguousBpBuffer.back().IntersectionBox =
subStreamInfo.IntersectionBox;
auto ret = SstReadRemoteMemory(
m_Input, rank, CurrentStep(), writerBlockStart,
writerBlockSize, nonContiguousBpBuffer.back()
.ContiguousMemory.data(),
writerBlockSize, blockInfo.Data + elementOffset,
dp_info);
sstReadHandlers.push_back(ret);
}
// if either input or output is not contiguous memory then
// find all contiguous parts.
else
{
std::vector<char> contiguousMemory(writerBlockSize);
auto ret = SstReadRemoteMemory(
m_Input, rank, CurrentStep(), writerBlockStart,
writerBlockSize, contiguousMemory.data(), dp_info);
SstWaitForCompletion(m_Input, ret);
m_BP3Deserializer->ClipContiguousMemory<T>(
blockInfo, contiguousMemory, subStreamInfo.BlockBox,
subStreamInfo.IntersectionBox);
// if configured to buffer all sst read requests then
// put all read requests in a vector
if (m_BufferNonContiguousVariables)
{
nonContiguousBpBuffer.emplace_back();
nonContiguousBpBuffer.back().VariableName =
variable.m_Name;
nonContiguousBpBuffer.back()
.ContiguousMemory.resize(writerBlockSize);
nonContiguousBpBuffer.back().BlockBox =
subStreamInfo.BlockBox;
nonContiguousBpBuffer.back().IntersectionBox =
subStreamInfo.IntersectionBox;
auto ret = SstReadRemoteMemory(
m_Input, rank, CurrentStep(), writerBlockStart,
writerBlockSize, nonContiguousBpBuffer.back()
.ContiguousMemory.data(),
dp_info);
sstReadHandlers.push_back(ret);
}
// if not configured to buffer all sst read requests
// then immediately issue each SstRead. This is NOT
// recommended as blocking read will harm performance.
else
{
std::vector<char> contiguousMemory(writerBlockSize);
auto ret = SstReadRemoteMemory(
m_Input, rank, CurrentStep(), writerBlockStart,
writerBlockSize, contiguousMemory.data(),
dp_info);
SstWaitForCompletion(m_Input, ret);
m_BP3Deserializer->ClipContiguousMemory<T>(
blockInfo, contiguousMemory,
subStreamInfo.BlockBox,
subStreamInfo.IntersectionBox);
}
}
}
}
Expand All @@ -106,6 +150,7 @@ void SstReader::ReadVariableBlocks(Variable<T> &variable)
blockInfo.Data = originalBlockData;
}

// wait for all SstRead requests to finish
for (const auto &i : sstReadHandlers)
{
SstWaitForCompletion(m_Input, i);
Expand Down