From 7a6836275ae3daa14cea90e1c156efeb74aec58d Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 11 Aug 2021 12:00:30 -0400 Subject: [PATCH] Delay Adding large Deferred Puts to output vector until last minute --- source/adios2/engine/bp5/BP5Writer.cpp | 2 - .../toolkit/format/bp5/BP5Serializer.cpp | 107 +++++++++++------- .../adios2/toolkit/format/bp5/BP5Serializer.h | 14 ++- 3 files changed, 77 insertions(+), 46 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 797690be4a..df9ea149b8 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -140,8 +140,6 @@ uint64_t BP5Writer::WriteMetadata( void BP5Writer::WriteData(format::BufferV *Data) { - format::BufferV::BufferV_iovec DataVec = Data->DataVec(); - (void)DataVec; switch (m_Parameters.AggregationType) { case (int)AggregationType::EveryoneWrites: diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index b7edeebc73..4800591949 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -377,14 +377,6 @@ BP5Serializer::CreateWriterRec(void *Variable, const char *Name, DataType Type, free(LocationsName); RecalcMarshalStorageSize(); -#ifdef NDEF - if ((ConfigParams->CompressionMethod == SstCompressZFP) && - ZFPcompressionPossible(Type, DimCount)) - { - Type = Int8; - ElemSize = 1; - } -#endif // To Data, add FMFields for ElemCount and Array matching _ArrayRec char *ElemCountName = ConcatName(Name, "ElemCount"); AddField(&Info.DataFields, &Info.DataFieldCount, ElemCountName, @@ -431,25 +423,61 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals) return Elems; } -void BP5Serializer::PerformPuts() { CurDataBuffer->CopyExternalToInternal(); } +void BP5Serializer::PerformPuts() +{ + // Dump data for externs into iovec + DumpDeferredBlocks(); + + CurDataBuffer->CopyExternalToInternal(); +} + +void BP5Serializer::DumpDeferredBlocks() +{ + for (auto &Def : DeferredExterns) + { + MetaArrayRec *MetaEntry = + (MetaArrayRec *)((char *)(MetadataBuf) + Def.MetaOffset); + size_t DataOffset = m_PriorDataBufferSizeTotal + + CurDataBuffer->AddToVec(Def.DataSize, Def.Data, + Def.AlignReq, false); + MetaEntry->DataLocation[Def.BlockID] = DataOffset; + } + DeferredExterns.clear(); +} void BP5Serializer::Marshal(void *Variable, const char *Name, const DataType Type, size_t ElemSize, size_t DimCount, const size_t *Shape, const size_t *Count, const size_t *Offsets, const void *Data, bool Sync, - BufferV::BufferPos *span) + BufferV::BufferPos *Span) { FFSMetadataInfoStruct *MBase; BP5WriterRec Rec = LookupWriterRec(Variable); + bool DeferAddToVec; + if (!Rec) { Rec = CreateWriterRec(Variable, Name, Type, ElemSize, DimCount); } + if (!Sync && (Rec->DimCount != 0) && !Span) + { + /* + * If this is a big external block, we'll do everything except add it to + * the BufferV now, saving enough information to add it and patch back + * the DataLocation in the metadata in DumpDeferredBlocks() + */ + DeferAddToVec = true; + } + else + { + DeferAddToVec = false; + } + MBase = (struct FFSMetadataInfoStruct *)MetadataBuf; int AlreadyWritten = FFSBitfieldTest(MBase, Rec->FieldID); FFSBitfieldSet(MBase, Rec->FieldID); @@ -471,7 +499,7 @@ void BP5Serializer::Marshal(void *Variable, const char *Name, MetaArrayRec *MetaEntry = (MetaArrayRec *)((char *)(MetadataBuf) + Rec->MetaOffset); size_t ElemCount = CalcSize(DimCount, Count); - size_t DataOffset; + size_t DataOffset = 0; /* handle metadata */ MetaEntry->Dims = DimCount; @@ -481,21 +509,19 @@ void BP5Serializer::Marshal(void *Variable, const char *Name, "BP5Serializer:: Marshall without Prior Init"); } - if (span == nullptr) + if (Span == nullptr) { - DataOffset = m_PriorDataBufferSizeTotal + - CurDataBuffer->AddToVec(ElemCount * ElemSize, Data, - ElemSize, Sync); - if (AlreadyWritten) + if (!DeferAddToVec) { - printf("Marshalling %g at offset %ld\n", *(float *)Data, - DataOffset); + DataOffset = m_PriorDataBufferSizeTotal + + CurDataBuffer->AddToVec(ElemCount * ElemSize, Data, + ElemSize, Sync); } } else { - *span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize); - DataOffset = m_PriorDataBufferSizeTotal + span->globalPos; + *Span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize); + DataOffset = m_PriorDataBufferSizeTotal + Span->globalPos; } if (!AlreadyWritten) @@ -513,6 +539,12 @@ void BP5Serializer::Marshal(void *Variable, const char *Name, MetaEntry->Offsets = CopyDims(DimCount, Offsets); else MetaEntry->Offsets = NULL; + if (DeferAddToVec) + { + DeferredExtern rec = {Rec->MetaOffset, 0, Data, + ElemCount * ElemSize, ElemSize}; + DeferredExterns.push_back(rec); + } } else { @@ -530,26 +562,16 @@ void BP5Serializer::Marshal(void *Variable, const char *Name, (size_t *)realloc(MetaEntry->DataLocation, MetaEntry->BlockCount * sizeof(size_t)); MetaEntry->DataLocation[MetaEntry->BlockCount - 1] = DataOffset; + if (DeferAddToVec) + { + DeferredExterns.push_back({Rec->MetaOffset, + MetaEntry->BlockCount - 1, Data, + ElemCount * ElemSize, ElemSize}); + } if (Offsets) MetaEntry->Offsets = AppendDims( MetaEntry->Offsets, PreviousDBCount, DimCount, Offsets); } - - // if ((Stream->ConfigParams->CompressionMethod == - // SstCompressZFP) && - // ZFPcompressionPossible(Type, DimCount)) - // { -#ifdef ADIOS2_HAVE_ZFP - // /* this should never be true if ZFP is not available - // */ size_t ByteCount; char *Output = - // FFS_ZFPCompress(Stream, Rec->DimCount, Rec->Type, - // (void *)Data, Count, - // &ByteCount); - // DataEntry->ElemCount = ByteCount; - // DataEntry->Array = Output; -#endif - // } - // else } } @@ -610,12 +632,6 @@ void BP5Serializer::MarshalAttribute(const char *Name, const DataType Type, /* free(OffsetsName); */ /* RecalcMarshalStorageSize(Stream); */ - /* if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) && */ - /* ZFPcompressionPossible(Type, DimCount)) */ - /* { */ - /* Type = "char"; */ - /* ElemSize = 1; */ - /* } */ /* // To Data, add FMFields for ElemCount and Array matching _ArrayRec */ /* char *ElemCountName = ConcatName(Name, "ElemCount"); */ @@ -652,6 +668,9 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer) { throw std::logic_error("BP5Serializer:: ReinitStep without prior Init"); } + // Dump data for externs into iovec + DumpDeferredBlocks(); + m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec( 0, NULL, sizeof(max_align_t), true); // output block size aligned @@ -715,6 +734,10 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) throw std::logic_error( "BP5Serializer:: CloseTimestep without Prior Init"); } + + // Dump data for externs into iovec + DumpDeferredBlocks(); + MBase->DataBlockSize = CurDataBuffer->AddToVec( 0, NULL, sizeof(max_align_t), true); // output block size aligned diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.h b/source/adios2/toolkit/format/bp5/BP5Serializer.h index 6cfd300fba..73ee426d83 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.h @@ -131,10 +131,18 @@ class BP5Serializer : virtual public BP5Base FMFormat AttributeFormat = NULL; void *AttributeData = NULL; int AttributeSize = 0; - int CompressZFP = 0; - attr_list ZFPParams = NULL; }; + struct DeferredExtern + { + size_t MetaOffset; + size_t BlockID; + const void *Data; + size_t DataSize; + size_t AlignReq; + }; + std::vector DeferredExterns; + FFSWriterMarshalBase Info; void *MetadataBuf = NULL; bool NewAttribute = false; @@ -176,6 +184,8 @@ class BP5Serializer : virtual public BP5Base size_t *AppendDims(size_t *OldDims, const size_t OldCount, const size_t Count, const size_t *Vals); + void DumpDeferredBlocks(); + typedef struct _ArrayRec { size_t ElemCount;