Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay adding large Deferred Puts to output vector until last minute #2821

Merged
merged 1 commit into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ uint64_t BP5Writer::WriteMetadata(

void BP5Writer::WriteData(format::BufferV *Data)
{
format::BufferV::BufferV_iovec DataVec = Data->DataVec();
(void)DataVec;
switch (m_Parameters.AggregationType)
{
case (int)AggregationType::EveryoneWrites:
Expand Down
107 changes: 65 additions & 42 deletions source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,6 @@ BP5Serializer::CreateWriterRec(void *Variable, const char *Name, DataType Type,
free(LocationsName);
RecalcMarshalStorageSize();

#ifdef NDEF
if ((ConfigParams->CompressionMethod == SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
{
Type = Int8;
ElemSize = 1;
}
#endif
// To Data, add FMFields for ElemCount and Array matching _ArrayRec
char *ElemCountName = ConcatName(Name, "ElemCount");
AddField(&Info.DataFields, &Info.DataFieldCount, ElemCountName,
Expand Down Expand Up @@ -431,25 +423,61 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals)
return Elems;
}

void BP5Serializer::PerformPuts() { CurDataBuffer->CopyExternalToInternal(); }
void BP5Serializer::PerformPuts()
{
// Dump data for externs into iovec
DumpDeferredBlocks();

CurDataBuffer->CopyExternalToInternal();
}

void BP5Serializer::DumpDeferredBlocks()
{
for (auto &Def : DeferredExterns)
{
MetaArrayRec *MetaEntry =
(MetaArrayRec *)((char *)(MetadataBuf) + Def.MetaOffset);
size_t DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(Def.DataSize, Def.Data,
Def.AlignReq, false);
MetaEntry->DataLocation[Def.BlockID] = DataOffset;
}
DeferredExterns.clear();
}

void BP5Serializer::Marshal(void *Variable, const char *Name,
const DataType Type, size_t ElemSize,
size_t DimCount, const size_t *Shape,
const size_t *Count, const size_t *Offsets,
const void *Data, bool Sync,
BufferV::BufferPos *span)
BufferV::BufferPos *Span)
{

FFSMetadataInfoStruct *MBase;

BP5WriterRec Rec = LookupWriterRec(Variable);

bool DeferAddToVec;

if (!Rec)
{
Rec = CreateWriterRec(Variable, Name, Type, ElemSize, DimCount);
}

if (!Sync && (Rec->DimCount != 0) && !Span)
{
/*
* If this is a big external block, we'll do everything except add it to
* the BufferV now, saving enough information to add it and patch back
* the DataLocation in the metadata in DumpDeferredBlocks()
*/
DeferAddToVec = true;
}
else
{
DeferAddToVec = false;
}

MBase = (struct FFSMetadataInfoStruct *)MetadataBuf;
int AlreadyWritten = FFSBitfieldTest(MBase, Rec->FieldID);
FFSBitfieldSet(MBase, Rec->FieldID);
Expand All @@ -471,7 +499,7 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
MetaArrayRec *MetaEntry =
(MetaArrayRec *)((char *)(MetadataBuf) + Rec->MetaOffset);
size_t ElemCount = CalcSize(DimCount, Count);
size_t DataOffset;
size_t DataOffset = 0;

/* handle metadata */
MetaEntry->Dims = DimCount;
Expand All @@ -481,21 +509,19 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
"BP5Serializer:: Marshall without Prior Init");
}

if (span == nullptr)
if (Span == nullptr)
{
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
if (AlreadyWritten)
if (!DeferAddToVec)
{
printf("Marshalling %g at offset %ld\n", *(float *)Data,
DataOffset);
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
}
}
else
{
*span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = m_PriorDataBufferSizeTotal + span->globalPos;
*Span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = m_PriorDataBufferSizeTotal + Span->globalPos;
}

if (!AlreadyWritten)
Expand All @@ -513,6 +539,12 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
else
MetaEntry->Offsets = NULL;
if (DeferAddToVec)
{
DeferredExtern rec = {Rec->MetaOffset, 0, Data,
ElemCount * ElemSize, ElemSize};
DeferredExterns.push_back(rec);
}
}
else
{
Expand All @@ -530,26 +562,16 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
(size_t *)realloc(MetaEntry->DataLocation,
MetaEntry->BlockCount * sizeof(size_t));
MetaEntry->DataLocation[MetaEntry->BlockCount - 1] = DataOffset;
if (DeferAddToVec)
{
DeferredExterns.push_back({Rec->MetaOffset,
MetaEntry->BlockCount - 1, Data,
ElemCount * ElemSize, ElemSize});
}
if (Offsets)
MetaEntry->Offsets = AppendDims(
MetaEntry->Offsets, PreviousDBCount, DimCount, Offsets);
}

// if ((Stream->ConfigParams->CompressionMethod ==
// SstCompressZFP) &&
// ZFPcompressionPossible(Type, DimCount))
// {
#ifdef ADIOS2_HAVE_ZFP
// /* this should never be true if ZFP is not available
// */ size_t ByteCount; char *Output =
// FFS_ZFPCompress(Stream, Rec->DimCount, Rec->Type,
// (void *)Data, Count,
// &ByteCount);
// DataEntry->ElemCount = ByteCount;
// DataEntry->Array = Output;
#endif
// }
// else
}
}

Expand Down Expand Up @@ -610,12 +632,6 @@ void BP5Serializer::MarshalAttribute(const char *Name, const DataType Type,
/* free(OffsetsName); */
/* RecalcMarshalStorageSize(Stream); */

/* if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) && */
/* ZFPcompressionPossible(Type, DimCount)) */
/* { */
/* Type = "char"; */
/* ElemSize = 1; */
/* } */
/* // To Data, add FMFields for ElemCount and Array matching _ArrayRec
*/
/* char *ElemCountName = ConcatName(Name, "ElemCount"); */
Expand Down Expand Up @@ -652,6 +668,9 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
{
throw std::logic_error("BP5Serializer:: ReinitStep without prior Init");
}
// Dump data for externs into iovec
DumpDeferredBlocks();

m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

Expand Down Expand Up @@ -715,6 +734,10 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
throw std::logic_error(
"BP5Serializer:: CloseTimestep without Prior Init");
}

// Dump data for externs into iovec
DumpDeferredBlocks();

MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

Expand Down
14 changes: 12 additions & 2 deletions source/adios2/toolkit/format/bp5/BP5Serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,18 @@ class BP5Serializer : virtual public BP5Base
FMFormat AttributeFormat = NULL;
void *AttributeData = NULL;
int AttributeSize = 0;
int CompressZFP = 0;
attr_list ZFPParams = NULL;
};

struct DeferredExtern
{
size_t MetaOffset;
size_t BlockID;
const void *Data;
size_t DataSize;
size_t AlignReq;
};
std::vector<DeferredExtern> DeferredExterns;

FFSWriterMarshalBase Info;
void *MetadataBuf = NULL;
bool NewAttribute = false;
Expand Down Expand Up @@ -176,6 +184,8 @@ class BP5Serializer : virtual public BP5Base
size_t *AppendDims(size_t *OldDims, const size_t OldCount,
const size_t Count, const size_t *Vals);

void DumpDeferredBlocks();

typedef struct _ArrayRec
{
size_t ElemCount;
Expand Down