From 807f36caa3445e39f2d0cc75b07092fa82b001c8 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Thu, 22 Oct 2020 15:29:35 -0400 Subject: [PATCH 1/6] Restructure FFS marshalling --- source/adios2/toolkit/sst/cp/cp_common.c | 4 +- source/adios2/toolkit/sst/cp/cp_internal.h | 5 +- source/adios2/toolkit/sst/cp/cp_reader.c | 36 +- source/adios2/toolkit/sst/cp/cp_writer.c | 8 + source/adios2/toolkit/sst/cp/ffs_marshal.c | 327 +++++++++++------- source/adios2/toolkit/sst/cp/ffs_marshal.h | 23 +- .../engine/staging-common/TestThreads.cpp | 36 +- 7 files changed, 297 insertions(+), 142 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 16bb8623d6..baf721f1c9 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -370,11 +370,11 @@ static FMField MetaDataPlusDPInfoList[] = { static FMField FFSFormatBlockList[] = { {"FormatServerRep", "char[FormatServerRepLen]", 1, FMOffset(struct FFSFormatBlock *, FormatServerRep)}, - {"FormatServerRepLen", "integer", sizeof(int), + {"FormatServerRepLen", "integer", sizeof(size_t), FMOffset(struct FFSFormatBlock *, FormatServerRepLen)}, {"FormatIDRep", "char[FormatIDRepLen]", 1, FMOffset(struct FFSFormatBlock *, FormatIDRep)}, - {"FormatIDRepLen", "integer", sizeof(int), + {"FormatIDRepLen", "integer", sizeof(size_t), FMOffset(struct FFSFormatBlock *, FormatIDRepLen)}, {"Next", "*FFSFormatBlock", sizeof(struct FFSFormatBlock), FMOffset(struct FFSFormatBlock *, Next)}, diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index aa1721e1a6..822d7e8143 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -135,6 +135,7 @@ struct _SstStream { CP_Info CPInfo; + struct timeval MarshalSum; SMPI_Comm mpiComm; enum StreamRole Role; @@ -260,9 +261,9 @@ struct _CP_DP_PairInfo struct FFSFormatBlock { char *FormatServerRep; - int FormatServerRepLen; + size_t FormatServerRepLen; char *FormatIDRep; - int FormatIDRepLen; + size_t FormatIDRepLen; struct FFSFormatBlock *Next; }; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 45f57dbe60..41184e4385 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -1746,8 +1746,16 @@ static SstStatusValue SstAdvanceStepPeer(SstStream Stream, SstStepMode mode, if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) { + static int count = 0; TAU_START("FFS marshaling case"); + struct timeval Start, Stop, Diff; + gettimeofday(&Start, NULL); FFSMarshalInstallMetadata(Stream, Entry->MetadataMsg); + gettimeofday(&Stop, NULL); + timersub(&Stop, &Start, &Diff); + timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); + printf("Adding %g secs, count %d\n", + (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); TAU_STOP("FFS marshaling case"); } Stream->ReaderTimestep = Entry->MetadataMsg->Timestep; @@ -1978,10 +1986,18 @@ static SstStatusValue SstAdvanceStepMin(SstStream Stream, SstStepMode mode, if ((Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) && (ReturnData->TSmsg)) { + int count = 0; CP_verbose( Stream, PerRankVerbose, "SstAdvanceStep installing precious metadata before exiting\n"); + struct timeval Start, Stop, Diff; + gettimeofday(&Start, NULL); FFSMarshalInstallPreciousMetadata(Stream, ReturnData->TSmsg); + gettimeofday(&Stop, NULL); + timersub(&Stop, &Start, &Diff); + printf("Adding %g secs, count2 %d\n", + (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); + timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); } free(free_block); @@ -2008,15 +2024,22 @@ static SstStatusValue SstAdvanceStepMin(SstStream Stream, SstStepMode mode, { NotifyDPArrivedMetadata(Stream, MetadataMsg); + Stream->ReaderTimestep = MetadataMsg->Timestep; if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS) { - CP_verbose( - Stream, TraceVerbose, - "Calling install precious metadata from metadata block %p\n", - MetadataMsg); + static int count = 0; + CP_verbose(Stream, TraceVerbose, + "Calling install metadata from metadata block %p\n", + MetadataMsg); + struct timeval Start, Stop, Diff; + gettimeofday(&Start, NULL); FFSMarshalInstallMetadata(Stream, MetadataMsg); + gettimeofday(&Stop, NULL); + timersub(&Stop, &Start, &Diff); + timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); + printf("Adding %g secs, count3 %d\n", + (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); } - Stream->ReaderTimestep = MetadataMsg->Timestep; SstFullMetadata Mdata = malloc(sizeof(struct _SstFullMetadata)); memset(Mdata, 0, sizeof(struct _SstFullMetadata)); Mdata->WriterCohortSize = MetadataMsg->CohortSize; @@ -2099,6 +2122,9 @@ extern void SstReaderClose(SstStream Stream) struct timeval CloseTime, Diff; struct _ReaderCloseMsg Msg; /* wait until each reader rank has done SstReaderClose() */ + printf("FFSMetadataInstall time is %g secs\n", + (double)Stream->MarshalSum.tv_usec / 1e6 + + Stream->MarshalSum.tv_sec); SMPI_Barrier(Stream->mpiComm); gettimeofday(&CloseTime, NULL); timersub(&CloseTime, &Stream->ValidStartTime, &Diff); diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 5d0c5caafb..b819fb29c5 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -2121,6 +2121,14 @@ extern void SstInternalProvideTimestep( Stream->DP_Interface->provideTimestep(&Svcs, Stream->DP_Stream, Data, LocalMetadata, Timestep, &DP_TimestepInfo); + if (Formats) + { + FFSFormatList tmp = Formats; + while (tmp) + { + tmp = tmp->Next; + } + } STREAM_MUTEX_LOCK(Stream); /* Md is the local contribution to MetaData */ diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 397af318e3..8d65d4eba4 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -399,13 +399,14 @@ extern void FFSFreeMarshalData(SstStream Stream) free(Info->DataFieldLists); for (int i = 0; i < Info->VarCount; i++) { - free(Info->VarList[i].VarName); - free(Info->VarList[i].PerWriterMetaFieldDesc); - free(Info->VarList[i].PerWriterDataFieldDesc); - free(Info->VarList[i].PerWriterStart); - free(Info->VarList[i].PerWriterCounts); - free(Info->VarList[i].PerWriterIncomingData); - free(Info->VarList[i].PerWriterIncomingSize); + free(Info->VarList[i]->VarName); + free(Info->VarList[i]->PerWriterMetaFieldOffset); + free(Info->VarList[i]->PerWriterDataFieldDesc); + free(Info->VarList[i]->PerWriterStart); + free(Info->VarList[i]->PerWriterCounts); + free(Info->VarList[i]->PerWriterIncomingData); + free(Info->VarList[i]->PerWriterIncomingSize); + free(Info->VarList[i]); } if (Info->VarList) free(Info->VarList); @@ -602,9 +603,9 @@ static FFSVarRec LookupVarByKey(SstStream Stream, void *Key) for (int i = 0; i < Info->VarCount; i++) { - if (Info->VarList[i].Variable == Key) + if (Info->VarList[i]->Variable == Key) { - return &Info->VarList[i]; + return Info->VarList[i]; } } @@ -617,9 +618,9 @@ static FFSVarRec LookupVarByName(SstStream Stream, const char *Name) for (int i = 0; i < Info->VarCount; i++) { - if (strcmp(Info->VarList[i].VarName, Name) == 0) + if (strcmp(Info->VarList[i]->VarName, Name) == 0) { - return &Info->VarList[i]; + return Info->VarList[i]; } } @@ -631,21 +632,20 @@ static FFSVarRec CreateVarRec(SstStream Stream, const char *ArrayName) struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; Info->VarList = realloc(Info->VarList, sizeof(Info->VarList[0]) * (Info->VarCount + 1)); - memset(&Info->VarList[Info->VarCount], 0, sizeof(Info->VarList[0])); - Info->VarList[Info->VarCount].VarName = strdup(ArrayName); - Info->VarList[Info->VarCount].PerWriterMetaFieldDesc = - calloc(sizeof(FMFieldList), Stream->WriterCohortSize); - Info->VarList[Info->VarCount].PerWriterDataFieldDesc = + FFSVarRec Ret = calloc(1, sizeof(struct FFSVarRec)); + Ret->VarName = strdup(ArrayName); + Ret->PerWriterMetaFieldOffset = + calloc(sizeof(size_t), Stream->WriterCohortSize); + Ret->PerWriterDataFieldDesc = calloc(sizeof(FMFieldList), Stream->WriterCohortSize); - Info->VarList[Info->VarCount].PerWriterStart = - calloc(sizeof(size_t *), Stream->WriterCohortSize); - Info->VarList[Info->VarCount].PerWriterCounts = - calloc(sizeof(size_t *), Stream->WriterCohortSize); - Info->VarList[Info->VarCount].PerWriterIncomingData = + Ret->PerWriterStart = calloc(sizeof(size_t *), Stream->WriterCohortSize); + Ret->PerWriterCounts = calloc(sizeof(size_t *), Stream->WriterCohortSize); + Ret->PerWriterIncomingData = calloc(sizeof(void *), Stream->WriterCohortSize); - Info->VarList[Info->VarCount].PerWriterIncomingSize = + Ret->PerWriterIncomingSize = calloc(sizeof(size_t), Stream->WriterCohortSize); - return &Info->VarList[Info->VarCount++]; + Info->VarList[Info->VarCount++] = Ret; + return Ret; } extern int SstFFSWriterBeginStep(SstStream Stream, int mode, @@ -686,13 +686,15 @@ extern int SstFFSGetDeferred(SstStream Stream, void *Variable, const char *Name, { void *IncomingDataBase = ((char *)Info->MetadataBaseAddrs[GetFromWriter]) + - Var->PerWriterMetaFieldDesc[GetFromWriter]->field_offset; - memcpy(Data, IncomingDataBase, - Var->PerWriterMetaFieldDesc[GetFromWriter]->field_size); + Var->PerWriterMetaFieldOffset[GetFromWriter]; + memcpy(Data, IncomingDataBase, Var->ElementSize); return 0; // No Sync needed } else { + CP_verbose(Stream, TraceVerbose, + "Get request, Name %s, Start %zu, Count %zu\n", Name, + Start[0], Count[0]); // Build request structure and enter it into requests list FFSArrayRequest Req = malloc(sizeof(*Req)); Req->VarRec = Var; @@ -724,9 +726,8 @@ extern int SstFFSGetLocalDeferred(SstStream Stream, void *Variable, { void *IncomingDataBase = ((char *)Info->MetadataBaseAddrs[GetFromWriter]) + - Var->PerWriterMetaFieldDesc[GetFromWriter]->field_offset; - memcpy(Data, IncomingDataBase, - Var->PerWriterMetaFieldDesc[GetFromWriter]->field_size); + Var->PerWriterMetaFieldOffset[GetFromWriter]; + memcpy(Data, IncomingDataBase, Var->ElementSize); return 0; // No Sync needed } else @@ -1425,10 +1426,11 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep) FMFormat Format = register_data_format(Info->LocalFMContext, &struct_list[0]); Info->MetaFormat = Format; - Block->FormatServerRep = - get_server_rep_FMformat(Format, &Block->FormatServerRepLen); - Block->FormatIDRep = - get_server_ID_FMformat(Format, &Block->FormatIDRepLen); + int size; + Block->FormatServerRep = get_server_rep_FMformat(Format, &size); + Block->FormatServerRepLen = size; + Block->FormatIDRep = get_server_ID_FMformat(Format, &size); + Block->FormatIDRepLen = size; Block->Next = NULL; Formats = Block; } @@ -1447,10 +1449,11 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep) FMFormat Format = register_data_format(Info->LocalFMContext, &struct_list[0]); Info->DataFormat = Format; - Block->FormatServerRep = - get_server_rep_FMformat(Format, &Block->FormatServerRepLen); - Block->FormatIDRep = - get_server_ID_FMformat(Format, &Block->FormatIDRepLen); + int size; + Block->FormatServerRep = get_server_rep_FMformat(Format, &size); + Block->FormatServerRepLen = size; + Block->FormatIDRep = get_server_ID_FMformat(Format, &size); + Block->FormatIDRepLen = size; Block->Next = NULL; if (Formats) { @@ -1469,10 +1472,11 @@ extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep) Info->LocalFMContext, "Attributes", Info->AttributeFields, FMstruct_size_field_list(Info->AttributeFields, sizeof(char *))); AttributeFormat = Format; - Block->FormatServerRep = - get_server_rep_FMformat(Format, &Block->FormatServerRepLen); - Block->FormatIDRep = - get_server_ID_FMformat(Format, &Block->FormatIDRepLen); + int size; + Block->FormatServerRep = get_server_rep_FMformat(Format, &size); + Block->FormatServerRepLen = size; + Block->FormatIDRep = get_server_ID_FMformat(Format, &size); + Block->FormatIDRepLen = size; Block->Next = NULL; if (Formats) { @@ -1688,6 +1692,8 @@ static int NameIndicatesArray(const char *Name) return (strcmp("Dims", Name + Len - 4) == 0); } +static void ClearPriorControl(SstStream Stream); + extern void FFSClearTimestepData(SstStream Stream) { @@ -1709,15 +1715,126 @@ extern void FFSClearTimestepData(SstStream Stream) sizeof(Info->DataFieldLists[0]) * Stream->WriterCohortSize); for (int i = 0; i < Info->VarCount; i++) { - free(Info->VarList[i].VarName); - free(Info->VarList[i].PerWriterMetaFieldDesc); - free(Info->VarList[i].PerWriterDataFieldDesc); - free(Info->VarList[i].PerWriterStart); - free(Info->VarList[i].PerWriterCounts); - free(Info->VarList[i].PerWriterIncomingData); - free(Info->VarList[i].PerWriterIncomingSize); + Info->VarList[i]->Variable = NULL; + /* free(Info->VarList[i]->VarName); */ + /* free(Info->VarList[i]->PerWriterDataFieldDesc); */ + /* free(Info->VarList[i]->PerWriterStart); */ + /* free(Info->VarList[i]->PerWriterCounts); */ + /* free(Info->VarList[i]->PerWriterIncomingData); */ + /* free(Info->VarList[i]->PerWriterIncomingSize); */ + /* free(Info->VarList[i]); */ + } + /* Info->VarCount = 0; */ + ClearPriorControl(Stream); +} + +static struct ControlInfo *BuildControl(SstStream Stream, FMFormat Format) +{ + struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; + FMStructDescList FormatList = format_list_of_FMFormat(Format); + FMFieldList FieldList = FormatList[0].field_list; + while (strncmp(FieldList->field_name, "BitField", 8) == 0) + FieldList++; + while (FieldList->field_name && + (strncmp(FieldList->field_name, "DataBlockSize", 8) == 0)) + FieldList++; + int i = 0; + int ControlCount = 0; + struct ControlInfo *ret = malloc(sizeof(*ret)); + ret->Format = Format; + while (FieldList[i].field_name) + { + ret = realloc(ret, + sizeof(*ret) + ControlCount * sizeof(struct ControlInfo)); + struct ControlStruct *C = &(ret->Controls[ControlCount]); + ControlCount++; + + C->FieldIndex = i; + C->FieldOffset = FieldList[i].field_offset; + + if (NameIndicatesArray(FieldList[i].field_name)) + { + char *ArrayName; + int Type; + FFSVarRec VarRec = NULL; + int ElementSize; + C->IsArray = 1; + BreakdownArrayName(FieldList[i].field_name, &ArrayName, &Type, + &ElementSize); + // if (WriterRank != 0) + // { + VarRec = LookupVarByName(Stream, ArrayName); + // } + if (!VarRec) + { + VarRec = CreateVarRec(Stream, ArrayName); + VarRec->Type = Type; + VarRec->ElementSize = ElementSize; + C->ElementSize = ElementSize; + VarRec->VarName = strdup(ArrayName); + } + i += 4; + free(ArrayName); + C->VarRec = VarRec; + } + else + { + /* simple field */ + char *FieldName = strdup(FieldList[i].field_name + 4); // skip SST_ + FFSVarRec VarRec = NULL; + C->IsArray = 0; + VarRec = LookupVarByName(Stream, FieldName); + if (!VarRec) + { + int Type = TranslateFFSType2ADIOS(FieldList[i].field_type, + FieldList[i].field_size); + VarRec = CreateVarRec(Stream, FieldName); + VarRec->DimCount = 0; + C->Type = Type; + VarRec->Type = Type; + } + VarRec->ElementSize = FieldList[i].field_size; + C->ElementSize = FieldList[i].field_size; + C->VarRec = VarRec; + free(FieldName); + i++; + } + } + ret->ControlCount = ControlCount; + ret->Next = Info->ControlBlocks; + Info->ControlBlocks = ret; + return ret; +} + +static struct ControlInfo *GetPriorControl(SstStream Stream, FMFormat Format) +{ + struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; + struct ControlInfo *tmp = Info->ControlBlocks; + while (tmp) + { + if (tmp->Format == Format) + { + return tmp; + } + tmp = tmp->Next; } - Info->VarCount = 0; + return NULL; +} + +static void ClearPriorControl(SstStream Stream) +{ + struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; + struct ControlInfo *tmp = Info->ControlBlocks; + /* while (tmp) */ + /* { */ + /* for (int i = 0; i < tmp->ControlCount; i++) { */ + /* tmp->Controls[i].VarRec = NULL; */ + /* } */ + /* tmp = tmp->Next; */ + /* } */ + /* if (Info->ControlBlocks) */ + /* free(Info->ControlBlocks); */ + /* Info->ControlBlocks = NULL; */ } static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, @@ -1798,9 +1915,8 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, Stream->ReaderFFSContext, MetaData->Metadata[WriterRank].block, MetaData->Metadata[WriterRank].DataSize); BaseData = malloc(DecodedLength); - FFSBuffer decode_buf = create_fixed_FFSBuffer(BaseData, DecodedLength); FFSdecode_to_buffer(Stream->ReaderFFSContext, - MetaData->Metadata[WriterRank].block, decode_buf); + MetaData->Metadata[WriterRank].block, BaseData); } if (DumpMetadata == -1) { @@ -1813,39 +1929,29 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, FMdump_data(FMFormat_of_original(FFSformat), BaseData, 1024000); printf("\n\n"); } + struct ControlInfo *Control; + struct ControlStruct *ControlArray; + Control = GetPriorControl(Stream, FMFormat_of_original(FFSformat)); + if (!Control) + { + Control = BuildControl(Stream, FMFormat_of_original(FFSformat)); + } + ControlArray = &Control->Controls[0]; + Info->MetadataBaseAddrs[WriterRank] = BaseData; - FormatList = format_list_of_FMFormat(FMFormat_of_original(FFSformat)); - FieldList = FormatList[0].field_list; - while (strncmp(FieldList->field_name, "BitField", 8) == 0) - FieldList++; - while (FieldList->field_name && - (strncmp(FieldList->field_name, "DataBlockSize", 8) == 0)) - FieldList++; - int i = 0; - int j = 0; - while (FieldList[i].field_name) + for (int i = 0; i < Control->ControlCount; i++) { - void *field_data = (char *)BaseData + FieldList[i].field_offset; - if (NameIndicatesArray(FieldList[i].field_name)) + int FieldIndex = ControlArray[i].FieldIndex; + int FieldOffset = ControlArray[i].FieldOffset; + FFSVarRec VarRec = ControlArray[i].VarRec; + void *field_data = (char *)BaseData + FieldOffset; + if (!FFSBitfieldTest(BaseData, i)) + { + continue; + } + if (ControlArray[i].IsArray) { MetaArrayRec *meta_base = field_data; - char *ArrayName; - int Type; - FFSVarRec VarRec = NULL; - int ElementSize; - if (!FFSBitfieldTest(BaseData, j)) - { - /* only work with fields that were written */ - i += 4; - j++; - continue; - } - BreakdownArrayName(FieldList[i].field_name, &ArrayName, &Type, - &ElementSize); - if (WriterRank != 0) - { - VarRec = LookupVarByName(Stream, ArrayName); - } if ((meta_base->Dims > 1) && (Stream->WriterConfigParams->IsRowMajor != Stream->ConfigParams->IsRowMajor)) @@ -1856,63 +1962,34 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, ReverseDimensions(meta_base->Count, meta_base->Dims); ReverseDimensions(meta_base->Offsets, meta_base->Dims); } - if (!VarRec) - { - VarRec = CreateVarRec(Stream, ArrayName); - VarRec->DimCount = meta_base->Dims; - VarRec->Type = Type; - VarRec->ElementSize = ElementSize; - VarRec->Variable = Stream->ArraySetupUpcall( - Stream->SetupUpcallReader, ArrayName, Type, meta_base->Dims, - meta_base->Shape, meta_base->Offsets, meta_base->Count); - } if (WriterRank == 0) { VarRec->GlobalDims = meta_base->Shape; } + if (!VarRec->Variable) + { + VarRec->Variable = Stream->ArraySetupUpcall( + Stream->SetupUpcallReader, VarRec->VarName, VarRec->Type, + meta_base->Dims, meta_base->Shape, meta_base->Offsets, + meta_base->Count); + } + VarRec->DimCount = meta_base->Dims; VarRec->PerWriterStart[WriterRank] = meta_base->Offsets; VarRec->PerWriterCounts[WriterRank] = meta_base->Count; - VarRec->PerWriterMetaFieldDesc[WriterRank] = &FieldList[i]; VarRec->PerWriterDataFieldDesc[WriterRank] = NULL; - Stream->ArrayBlocksInfoUpcall(Stream->SetupUpcallReader, - VarRec->Variable, Type, WriterRank, - meta_base->Dims, meta_base->Shape, - meta_base->Offsets, meta_base->Count); - i += 4; - free(ArrayName); + Stream->ArrayBlocksInfoUpcall( + Stream->SetupUpcallReader, VarRec->Variable, VarRec->Type, + WriterRank, meta_base->Dims, meta_base->Shape, + meta_base->Offsets, meta_base->Count); } else { - /* simple field */ - char *FieldName = strdup(FieldList[i].field_name + 4); // skip SST_ - FFSVarRec VarRec = NULL; - if (!FFSBitfieldTest(BaseData, j)) - { - /* only work with fields that were written */ - i++; - j++; - continue; - } - if (WriterRank != 0) - { - VarRec = LookupVarByName(Stream, FieldName); - } - if (!VarRec) - { - int Type = TranslateFFSType2ADIOS(FieldList[i].field_type, - FieldList[i].field_size); - VarRec = CreateVarRec(Stream, FieldName); - VarRec->DimCount = 0; - VarRec->Variable = Stream->VarSetupUpcall( - Stream->SetupUpcallReader, FieldName, Type, field_data); - } - VarRec->PerWriterMetaFieldDesc[WriterRank] = &FieldList[i]; + VarRec->Variable = Stream->VarSetupUpcall(Stream->SetupUpcallReader, + VarRec->VarName, + VarRec->Type, field_data); VarRec->PerWriterDataFieldDesc[WriterRank] = NULL; - free(FieldName); - i++; + VarRec->PerWriterMetaFieldOffset[WriterRank] = FieldOffset; } - /* real variable count is in j, i tracks the entries in the metadata */ - j++; } } diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.h b/source/adios2/toolkit/sst/cp/ffs_marshal.h index 14b1421105..5fb82efe5e 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.h +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.h @@ -52,7 +52,7 @@ typedef struct FFSVarRec { void *Variable; char *VarName; - FMFieldList *PerWriterMetaFieldDesc; + size_t *PerWriterMetaFieldOffset; FMFieldList *PerWriterDataFieldDesc; size_t DimCount; int Type; @@ -96,10 +96,28 @@ typedef struct FFSReaderPerWriterRec DP_CompletionHandle ReadHandle; } FFSReaderPerWriterRec; +struct ControlStruct +{ + int FieldIndex; + int FieldOffset; + FFSVarRec VarRec; + int IsArray; + int Type; + int ElementSize; +}; + +struct ControlInfo +{ + FMFormat Format; + int ControlCount; + struct ControlInfo *Next; + struct ControlStruct Controls[1]; +}; + struct FFSReaderMarshalBase { int VarCount; - FFSVarRec VarList; + FFSVarRec *VarList; FMContext LocalFMContext; FFSArrayRequest PendingVarRequests; @@ -110,6 +128,7 @@ struct FFSReaderMarshalBase FMFieldList *DataFieldLists; FFSReaderPerWriterRec *WriterInfo; + struct ControlInfo *ControlBlocks; }; extern char *FFS_ZFPCompress(SstStream Stream, const size_t DimCount, int Type, diff --git a/testing/adios2/engine/staging-common/TestThreads.cpp b/testing/adios2/engine/staging-common/TestThreads.cpp index 0150dea870..32965b71e4 100644 --- a/testing/adios2/engine/staging-common/TestThreads.cpp +++ b/testing/adios2/engine/staging-common/TestThreads.cpp @@ -16,7 +16,7 @@ int value_errors = 0; std::mutex StdOutMtx; -int Read() +int Read(int ID) { adios2::ADIOS adios; adios2::IO io = adios.DeclareIO("IO"); @@ -35,7 +35,8 @@ int Read() try { - adios2::Engine Reader = io.Open("communicate", adios2::Mode::Read); + std::string FName = "File" + std::to_string(ID); + adios2::Engine Reader = io.Open(FName, adios2::Mode::Read); { std::lock_guard guard(StdOutMtx); std::cout << "Reader: passed Open" << std::endl; @@ -91,7 +92,7 @@ int Read() return true; } -bool Write() +bool Write(int ID) { adios2::ADIOS adios; adios2::IO io = adios.DeclareIO("IO"); @@ -109,7 +110,8 @@ bool Write() try { - adios2::Engine Writer = io.Open("communicate", adios2::Mode::Write); + std::string FName = "File" + std::to_string(ID); + adios2::Engine Writer = io.Open(FName, adios2::Mode::Write); { std::lock_guard guard(StdOutMtx); @@ -143,8 +145,8 @@ class TestThreads : public ::testing::Test TEST_F(TestThreads, Basic) { - auto read_fut = std::async(std::launch::async, Read); - auto write_fut = std::async(std::launch::async, Write); + auto read_fut = std::async(std::launch::async, Read, 0); + auto write_fut = std::async(std::launch::async, Write, 0); bool reader_success = read_fut.get(); bool writer_success = write_fut.get(); EXPECT_TRUE(reader_success); @@ -153,6 +155,28 @@ TEST_F(TestThreads, Basic) << "We got " << value_errors << " erroneous values at the reader"; } +TEST_F(TestThreads, Repeated) +{ + auto high_write_fut = std::async(std::launch::async, Write, 0); + for (int i = 0; i < 1024; i++) { + auto read_fut = std::async(std::launch::async, Read, i+1); + auto write_fut = std::async(std::launch::async, Write, i+1); + bool reader_success = read_fut.get(); + bool writer_success = write_fut.get(); + EXPECT_TRUE(reader_success); + EXPECT_TRUE(writer_success); + EXPECT_EQ(value_errors, 0) + << "We got " << value_errors << " erroneous values at the reader"; + std::cout << "finished pair " << i << std::endl; + } + auto high_read_fut = std::async(std::launch::async, Read, 0); + bool reader_success = high_read_fut.get(); + bool writer_success = high_write_fut.get(); + EXPECT_TRUE(reader_success); + EXPECT_TRUE(writer_success); + EXPECT_EQ(value_errors, 0); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); From 7add3169b149314837c757c85a70a7f36ec472d3 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 27 Oct 2020 12:44:01 -0400 Subject: [PATCH 2/6] Remove temporary timing code --- source/adios2/toolkit/sst/cp/cp_internal.h | 1 - source/adios2/toolkit/sst/cp/cp_reader.c | 24 ---------------------- 2 files changed, 25 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index 822d7e8143..d8d2e9d9ab 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -135,7 +135,6 @@ struct _SstStream { CP_Info CPInfo; - struct timeval MarshalSum; SMPI_Comm mpiComm; enum StreamRole Role; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 41184e4385..5722d45941 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -1748,14 +1748,7 @@ static SstStatusValue SstAdvanceStepPeer(SstStream Stream, SstStepMode mode, { static int count = 0; TAU_START("FFS marshaling case"); - struct timeval Start, Stop, Diff; - gettimeofday(&Start, NULL); FFSMarshalInstallMetadata(Stream, Entry->MetadataMsg); - gettimeofday(&Stop, NULL); - timersub(&Stop, &Start, &Diff); - timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); - printf("Adding %g secs, count %d\n", - (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); TAU_STOP("FFS marshaling case"); } Stream->ReaderTimestep = Entry->MetadataMsg->Timestep; @@ -1990,14 +1983,7 @@ static SstStatusValue SstAdvanceStepMin(SstStream Stream, SstStepMode mode, CP_verbose( Stream, PerRankVerbose, "SstAdvanceStep installing precious metadata before exiting\n"); - struct timeval Start, Stop, Diff; - gettimeofday(&Start, NULL); FFSMarshalInstallPreciousMetadata(Stream, ReturnData->TSmsg); - gettimeofday(&Stop, NULL); - timersub(&Stop, &Start, &Diff); - printf("Adding %g secs, count2 %d\n", - (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); - timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); } free(free_block); @@ -2031,14 +2017,7 @@ static SstStatusValue SstAdvanceStepMin(SstStream Stream, SstStepMode mode, CP_verbose(Stream, TraceVerbose, "Calling install metadata from metadata block %p\n", MetadataMsg); - struct timeval Start, Stop, Diff; - gettimeofday(&Start, NULL); FFSMarshalInstallMetadata(Stream, MetadataMsg); - gettimeofday(&Stop, NULL); - timersub(&Stop, &Start, &Diff); - timeradd(&Stream->MarshalSum, &Diff, &Stream->MarshalSum); - printf("Adding %g secs, count3 %d\n", - (double)Diff.tv_usec / 1e6 + Diff.tv_sec, count++); } SstFullMetadata Mdata = malloc(sizeof(struct _SstFullMetadata)); memset(Mdata, 0, sizeof(struct _SstFullMetadata)); @@ -2122,9 +2101,6 @@ extern void SstReaderClose(SstStream Stream) struct timeval CloseTime, Diff; struct _ReaderCloseMsg Msg; /* wait until each reader rank has done SstReaderClose() */ - printf("FFSMetadataInstall time is %g secs\n", - (double)Stream->MarshalSum.tv_usec / 1e6 + - Stream->MarshalSum.tv_sec); SMPI_Barrier(Stream->mpiComm); gettimeofday(&CloseTime, NULL); timersub(&CloseTime, &Stream->ValidStartTime, &Diff); From da65c9cd1cbab98518a439b333ea1d37cd87ded6 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 27 Oct 2020 13:26:49 -0400 Subject: [PATCH 3/6] Format --- .../engine/staging-common/TestThreads.cpp | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/testing/adios2/engine/staging-common/TestThreads.cpp b/testing/adios2/engine/staging-common/TestThreads.cpp index 32965b71e4..7d9fbdafcb 100644 --- a/testing/adios2/engine/staging-common/TestThreads.cpp +++ b/testing/adios2/engine/staging-common/TestThreads.cpp @@ -35,7 +35,7 @@ int Read(int ID) try { - std::string FName = "File" + std::to_string(ID); + std::string FName = "File" + std::to_string(ID); adios2::Engine Reader = io.Open(FName, adios2::Mode::Read); { std::lock_guard guard(StdOutMtx); @@ -110,7 +110,7 @@ bool Write(int ID) try { - std::string FName = "File" + std::to_string(ID); + std::string FName = "File" + std::to_string(ID); adios2::Engine Writer = io.Open(FName, adios2::Mode::Write); { @@ -158,16 +158,17 @@ TEST_F(TestThreads, Basic) TEST_F(TestThreads, Repeated) { auto high_write_fut = std::async(std::launch::async, Write, 0); - for (int i = 0; i < 1024; i++) { - auto read_fut = std::async(std::launch::async, Read, i+1); - auto write_fut = std::async(std::launch::async, Write, i+1); - bool reader_success = read_fut.get(); - bool writer_success = write_fut.get(); - EXPECT_TRUE(reader_success); - EXPECT_TRUE(writer_success); - EXPECT_EQ(value_errors, 0) - << "We got " << value_errors << " erroneous values at the reader"; - std::cout << "finished pair " << i << std::endl; + for (int i = 0; i < 1024; i++) + { + auto read_fut = std::async(std::launch::async, Read, i + 1); + auto write_fut = std::async(std::launch::async, Write, i + 1); + bool reader_success = read_fut.get(); + bool writer_success = write_fut.get(); + EXPECT_TRUE(reader_success); + EXPECT_TRUE(writer_success); + EXPECT_EQ(value_errors, 0) + << "We got " << value_errors << " erroneous values at the reader"; + std::cout << "finished pair " << i << std::endl; } auto high_read_fut = std::async(std::launch::async, Read, 0); bool reader_success = high_read_fut.get(); From 46ea86f85356f7409d7efd3ab38f3bc60551de9a Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 27 Oct 2020 14:10:15 -0400 Subject: [PATCH 4/6] Don't run Repeated thread test on a daily basis --- .../engine/staging-common/TestThreads.cpp | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/testing/adios2/engine/staging-common/TestThreads.cpp b/testing/adios2/engine/staging-common/TestThreads.cpp index 7d9fbdafcb..d4d4af26a8 100644 --- a/testing/adios2/engine/staging-common/TestThreads.cpp +++ b/testing/adios2/engine/staging-common/TestThreads.cpp @@ -155,28 +155,32 @@ TEST_F(TestThreads, Basic) << "We got " << value_errors << " erroneous values at the reader"; } -TEST_F(TestThreads, Repeated) -{ - auto high_write_fut = std::async(std::launch::async, Write, 0); - for (int i = 0; i < 1024; i++) - { - auto read_fut = std::async(std::launch::async, Read, i + 1); - auto write_fut = std::async(std::launch::async, Write, i + 1); - bool reader_success = read_fut.get(); - bool writer_success = write_fut.get(); - EXPECT_TRUE(reader_success); - EXPECT_TRUE(writer_success); - EXPECT_EQ(value_errors, 0) - << "We got " << value_errors << " erroneous values at the reader"; - std::cout << "finished pair " << i << std::endl; - } - auto high_read_fut = std::async(std::launch::async, Read, 0); - bool reader_success = high_read_fut.get(); - bool writer_success = high_write_fut.get(); - EXPECT_TRUE(reader_success); - EXPECT_TRUE(writer_success); - EXPECT_EQ(value_errors, 0); -} +// This test tries to push up to the limits to see if we're leaking FDs, but it +// runs slowly, commenting it out until needed. +// +// TEST_F(TestThreads, Repeated) +// { +// auto high_write_fut = std::async(std::launch::async, Write, 0); +// for (int i = 0; i < 1024; i++) +// { +// auto read_fut = std::async(std::launch::async, Read, i + 1); +// auto write_fut = std::async(std::launch::async, Write, i + 1); +// bool reader_success = read_fut.get(); +// bool writer_success = write_fut.get(); +// EXPECT_TRUE(reader_success); +// EXPECT_TRUE(writer_success); +// EXPECT_EQ(value_errors, 0) +// << "We got " << value_errors << " erroneous values at the +// reader"; +// std::cout << "finished pair " << i << std::endl; +// } +// auto high_read_fut = std::async(std::launch::async, Read, 0); +// bool reader_success = high_read_fut.get(); +// bool writer_success = high_write_fut.get(); +// EXPECT_TRUE(reader_success); +// EXPECT_TRUE(writer_success); +// EXPECT_EQ(value_errors, 0); +// } int main(int argc, char **argv) { From 95299b7c1ae195cb53d6f2ad96d07d634ed38fc4 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 28 Oct 2020 12:39:33 -0400 Subject: [PATCH 5/6] Leaks --- source/adios2/toolkit/sst/cp/ffs_marshal.c | 41 +++++----------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 8d65d4eba4..1ad23cc0dc 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -411,6 +411,14 @@ extern void FFSFreeMarshalData(SstStream Stream) if (Info->VarList) free(Info->VarList); + struct ControlInfo *tmp = Info->ControlBlocks; + Info->ControlBlocks = NULL; + while (tmp) + { + struct ControlInfo *next = tmp->Next; + free(tmp); + tmp = next; + } free(Info); Stream->ReaderMarshalData = NULL; } @@ -1692,8 +1700,6 @@ static int NameIndicatesArray(const char *Name) return (strcmp("Dims", Name + Len - 4) == 0); } -static void ClearPriorControl(SstStream Stream); - extern void FFSClearTimestepData(SstStream Stream) { @@ -1716,16 +1722,7 @@ extern void FFSClearTimestepData(SstStream Stream) for (int i = 0; i < Info->VarCount; i++) { Info->VarList[i]->Variable = NULL; - /* free(Info->VarList[i]->VarName); */ - /* free(Info->VarList[i]->PerWriterDataFieldDesc); */ - /* free(Info->VarList[i]->PerWriterStart); */ - /* free(Info->VarList[i]->PerWriterCounts); */ - /* free(Info->VarList[i]->PerWriterIncomingData); */ - /* free(Info->VarList[i]->PerWriterIncomingSize); */ - /* free(Info->VarList[i]); */ - } - /* Info->VarCount = 0; */ - ClearPriorControl(Stream); + } } static struct ControlInfo *BuildControl(SstStream Stream, FMFormat Format) @@ -1771,7 +1768,6 @@ static struct ControlInfo *BuildControl(SstStream Stream, FMFormat Format) VarRec->Type = Type; VarRec->ElementSize = ElementSize; C->ElementSize = ElementSize; - VarRec->VarName = strdup(ArrayName); } i += 4; free(ArrayName); @@ -1821,28 +1817,10 @@ static struct ControlInfo *GetPriorControl(SstStream Stream, FMFormat Format) return NULL; } -static void ClearPriorControl(SstStream Stream) -{ - struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; - struct ControlInfo *tmp = Info->ControlBlocks; - /* while (tmp) */ - /* { */ - /* for (int i = 0; i < tmp->ControlCount; i++) { */ - /* tmp->Controls[i].VarRec = NULL; */ - /* } */ - /* tmp = tmp->Next; */ - /* } */ - /* if (Info->ControlBlocks) */ - /* free(Info->ControlBlocks); */ - /* Info->ControlBlocks = NULL; */ -} - static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, int WriterRank) { FFSTypeHandle FFSformat; - FMFieldList FieldList; - FMStructDescList FormatList; void *BaseData; static int DumpMetadata = -1; @@ -1941,7 +1919,6 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, Info->MetadataBaseAddrs[WriterRank] = BaseData; for (int i = 0; i < Control->ControlCount; i++) { - int FieldIndex = ControlArray[i].FieldIndex; int FieldOffset = ControlArray[i].FieldOffset; FFSVarRec VarRec = ControlArray[i].VarRec; void *field_data = (char *)BaseData + FieldOffset; From 8356628e7e00e81f9221cb070ac2d6781cb09a16 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 3 Nov 2020 18:25:13 -0500 Subject: [PATCH 6/6] Don't create var if it exists --- source/adios2/toolkit/sst/cp/ffs_marshal.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 1ad23cc0dc..8d1c3cc49e 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -1961,9 +1961,12 @@ static void BuildVarList(SstStream Stream, TSMetadataMsg MetaData, } else { - VarRec->Variable = Stream->VarSetupUpcall(Stream->SetupUpcallReader, - VarRec->VarName, - VarRec->Type, field_data); + if (!VarRec->Variable) + { + VarRec->Variable = Stream->VarSetupUpcall( + Stream->SetupUpcallReader, VarRec->VarName, VarRec->Type, + field_data); + } VarRec->PerWriterDataFieldDesc[WriterRank] = NULL; VarRec->PerWriterMetaFieldOffset[WriterRank] = FieldOffset; }