diff --git a/docs/user_guide/source/engines/sst.rst b/docs/user_guide/source/engines/sst.rst index 39a1f4cbd4..c2831afb61 100644 --- a/docs/user_guide/source/engines/sst.rst +++ b/docs/user_guide/source/engines/sst.rst @@ -267,6 +267,19 @@ eager data sending of all data from each writer to all readers. Currently value is interpreted by only by the SST Reader engine. +17. ``StepDistributionMode``: Default **"AllToAll"**. This value +controls how steps are distributed, particularly when there are +multiple readers. By default, the value is **"AllToAll"**, which +means that all timesteps are to be delivered to all readers (subject +to discard rules, etc.). In other distribution modes, this is not the +case. For example, in **"RoundRobin"**, each step is delivered +only to a single reader, determined in a round-robin fashion based +upon the number of readers who have opened the stream at the time the +step is submitted. In **"OnDemand"**, each step is delivered to a +single reader, but only upon request (with a request being initiated +by the reader doing BeginStep()). Normal reader-side rules (like +BeginStep timeouts) and writer-side rules (like queue limit behavior) apply. 
+ ============================= ===================== ================================================ **Key** **Value Format** **Default** and Examples ============================= ===================== ================================================ diff --git a/source/adios2/engine/sst/SstParamParser.cpp b/source/adios2/engine/sst/SstParamParser.cpp index a8c94a01bc..190551ad3a 100644 --- a/source/adios2/engine/sst/SstParamParser.cpp +++ b/source/adios2/engine/sst/SstParamParser.cpp @@ -212,6 +212,37 @@ void SstParamParser::ParseParams(IO &io, struct _SstParams &Params) return false; }; + auto lf_SetStepDistributionModeParameter = [&](const std::string key, + size_t ¶meter) { + auto itKey = io.m_Parameters.find(key); + if (itKey != io.m_Parameters.end()) + { + std::string method = itKey->second; + std::transform(method.begin(), method.end(), method.begin(), + ::tolower); + if (method == "alltoall") + { + parameter = StepsAllToAll; + } + else if (method == "roundrobin") + { + parameter = StepsRoundRobin; + } + else if (method == "ondemand") + { + parameter = StepsOnDemand; + } + else + { + helper::Throw( + "Engine", "SstParamParser", "ParseParams", + "Unknown Sst StepDistributionMode parameter \"" + method + + "\""); + } + return true; + } + return false; + }; auto lf_SetSpecPreloadModeParameter = [&](const std::string key, int ¶meter) { auto itKey = io.m_Parameters.find(key); diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 86e12435ea..197ebb6a63 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -171,6 +171,8 @@ static char *SstQueueFullStr[] = {"Block", "Discard"}; static char *SstCompressStr[] = {"None", "ZFP"}; static char *SstCommPatternStr[] = {"Min", "Peer"}; static char *SstPreloadModeStr[] = {"Off", "On", "Auto"}; +static char *SstStepDistributionModeStr[] = {"StepsAllToAll", "StepsRoundRobin", + "StepsOnDemand"}; extern void 
CP_dumpParams(SstStream Stream, struct _SstParams *Params, int ReaderSide) @@ -188,6 +190,8 @@ extern void CP_dumpParams(SstStream Stream, struct _SstParams *Params, (Params->QueueLimit == 0) ? "(unlimited)" : ""); fprintf(stderr, "Param - QueueFullPolicy=%s\n", SstQueueFullStr[Params->QueueFullPolicy]); + fprintf(stderr, "Param - StepDistributionMode=%s\n", + SstStepDistributionModeStr[Params->StepDistributionMode]); } fprintf(stderr, "Param - DataTransport=%s\n", Params->DataTransport ? Params->DataTransport : ""); @@ -532,6 +536,16 @@ static FMStructDescRec ReaderActivateStructs[] = { NULL}, {NULL, NULL, 0, NULL}}; +static FMField ReaderRequestStepList[] = { + {"WSR_Stream", "integer", sizeof(void *), + FMOffset(struct _ReaderRequestStepMsg *, WSR_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec ReaderRequestStepStructs[] = { + {"ReaderRequestStep", ReaderRequestStepList, + sizeof(struct _ReaderRequestStepMsg), NULL}, + {NULL, NULL, 0, NULL}}; + static FMField WriterCloseList[] = { {"RS_Stream", "integer", sizeof(void *), FMOffset(struct _WriterCloseMsg *, RS_Stream)}, @@ -910,6 +924,11 @@ static void doCMFormatRegistration(CP_GlobalCMInfo CPInfo, CMregister_format(CPInfo->cm, ReaderActivateStructs); CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler, NULL); + CPInfo->ReaderRequestStepFormat = + CMregister_format(CPInfo->cm, ReaderRequestStepStructs); + CMregister_handler(CPInfo->ReaderRequestStepFormat, + CP_ReaderRequestStepHandler, NULL); + CPInfo->ReleaseTimestepFormat = CMregister_format(CPInfo->cm, ReleaseTimestepStructs); CMregister_handler(CPInfo->ReleaseTimestepFormat, CP_ReleaseTimestepHandler, diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index d3addea00d..1777b700a1 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -18,6 +18,7 @@ typedef struct _CP_GlobalCMInfo CMFormat DeliverTimestepMetadataFormat; 
CMFormat PeerSetupFormat; CMFormat ReaderActivateFormat; + CMFormat ReaderRequestStepFormat; CMFormat ReleaseTimestepFormat; CMFormat LockReaderDefinitionsFormat; CMFormat CommPatternLockedFormat; @@ -45,12 +46,18 @@ typedef struct _CP_Info struct _ReaderRegisterMsg; -typedef struct _RequestQueue +typedef struct _RegisterQueue { struct _ReaderRegisterMsg *Msg; CMConnection Conn; - struct _RequestQueue *Next; -} * RequestQueue; + struct _RegisterQueue *Next; +} * RegisterQueue; + +typedef struct _StepRequest +{ + int RequestingReader; + struct _StepRequest *Next; +} * StepRequest; typedef struct _CP_PeerConnection { @@ -92,6 +99,7 @@ typedef struct _WS_ReaderInfo SstPreloadModeType PreloadMode; long PreloadModeActiveTimestep; long OldestUnreleasedTimestep; + size_t FormatSentCount; struct _SentTimestepRec *SentTimestepList; void *DP_WSR_Stream; int ReaderCohortSize; @@ -170,18 +178,20 @@ struct _SstStream int QueueLimit; SstQueueFullPolicy QueueFullPolicy; int LastProvidedTimestep; - int NewReaderPresent; int WriterDefinitionsLocked; + size_t NextRRDistribution; + size_t LastDemandTimestep; /* rendezvous condition */ int FirstReaderCondition; - RequestQueue ReadRequestQueue; + RegisterQueue ReaderRegisterQueue; int ReaderCount; WS_ReaderInfo *Readers; char *Filename; char *AbsoluteFilename; int GlobalOpRequired; + StepRequest StepRequestQueue; /* writer side marshal info */ void *WriterMarshalData; @@ -358,6 +368,15 @@ struct _ReaderActivateMsg void *WSR_Stream; }; +/* + * The ReaderRequestStep message informs the writer that this reader is now + * ready to receive a new step (Used in OnDemand step distribution mode) + */ +struct _ReaderRequestStepMsg +{ + void *WSR_Stream; +}; + /* * The timestepMetadata message carries the metadata from all writer ranks. * One is sent to each reader in peer mode, between rank 0's in min mode. 
@@ -507,6 +526,9 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *msg_v, extern void CP_ReaderActivateHandler(CManager cm, CMConnection conn, void *msg_v, void *client_data, attr_list attrs); +extern void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); extern void CP_TimestepMetadataHandler(CManager cm, CMConnection conn, void *msg_v, void *client_data, attr_list attrs); diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 306c5866e9..dc22361482 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -810,7 +810,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, } } - struct _TimestepMetadataList *New = malloc(sizeof(struct _RequestQueue)); + struct _TimestepMetadataList *New = malloc(sizeof(struct _RegisterQueue)); New->MetadataMsg = tsm; New->Next = NULL; if (Stream->Timesteps) @@ -2215,6 +2215,17 @@ extern SstStatusValue SstAdvanceStep(SstStream Stream, const float timeout_sec) Stream->CurrentMetadata = NULL; } + if (Stream->WriterConfigParams->StepDistributionMode == StepsOnDemand) + { + struct _ReaderRequestStepMsg Msg; + CP_verbose(Stream, PerRankVerbose, + "Sending Reader Request Step messages to writer\n"); + memset(&Msg, 0, sizeof(Msg)); + sendOneToEachWriterRank( + Stream, Stream->CPInfo->SharedCM->ReaderRequestStepFormat, &Msg, + &Msg.WSR_Stream); + } + SstStepMode mode = SstNextAvailable; if (Stream->ConfigParams->AlwaysProvideLatestTimestep) { diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 2ec5765383..62b1463bed 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -772,7 +772,7 @@ static void SubRefTimestep(SstStream Stream, long Timestep, int SetLast) WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) { - RequestQueue Req; + RegisterQueue 
Req; reader_data_t ReturnData; void *free_block = NULL; int WriterResponseCondition = -1; @@ -785,9 +785,9 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) if (Stream->Rank == 0) { STREAM_MUTEX_LOCK(Stream); - assert((Stream->ReadRequestQueue)); - Req = Stream->ReadRequestQueue; - Stream->ReadRequestQueue = Req->Next; + assert((Stream->ReaderRegisterQueue)); + Req = Stream->ReaderRegisterQueue; + Stream->ReaderRegisterQueue = Req->Next; Req->Next = NULL; STREAM_MUTEX_UNLOCK(Stream); struct _CombinedReaderInfo reader_data; @@ -956,7 +956,6 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) free(ret_data_block); if (pointers) free(pointers); - Stream->NewReaderPresent = 1; CP_verbose(Stream, PerStepVerbose, "Finish writer-side reader open protocol for reader %p, " "reader ready response pending\n", @@ -1121,6 +1120,9 @@ static void DerefAllSentTimesteps(SstStream Stream, WS_ReaderInfo Reader) CP_verbose(Stream, PerRankVerbose, "DONE DEREFERENCING\n"); } +static FFSFormatList ReturnNthListEntry(FFSFormatList List, size_t Count); +static size_t FormatListCount(FFSFormatList List); + static void SendTimestepEntryToSingleReader(SstStream Stream, CPTimestepList Entry, WS_ReaderInfo CP_WSR_Stream, @@ -1129,7 +1131,12 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, STREAM_ASSERT_LOCKED(Stream); if (CP_WSR_Stream->ReaderStatus == Established) { + size_t PriorSent = CP_WSR_Stream->FormatSentCount; CP_WSR_Stream->LastSentTimestep = Entry->Timestep; + FFSFormatList ToSend = + ReturnNthListEntry(Stream->PreviousFormats, PriorSent); + Entry->Msg->Formats = ToSend; + if (rank != -1) { CP_verbose(Stream, PerRankVerbose, @@ -1163,6 +1170,7 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, } Entry->Msg->PreloadMode = PMode; + CP_WSR_Stream->FormatSentCount += FormatListCount(ToSend); STREAM_MUTEX_LOCK(Stream); if (CP_WSR_Stream->ReaderStatus == Established) sendOneToWSRCohort( @@ -1175,10 +1183,58 @@ static void 
SendTimestepEntryToSingleReader(SstStream Stream, static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry) { STREAM_ASSERT_LOCKED(Stream); - for (int i = 0; i < Stream->ReaderCount; i++) + switch (Stream->ConfigParams->StepDistributionMode) { - WS_ReaderInfo CP_WSR_Stream = Stream->Readers[i]; - SendTimestepEntryToSingleReader(Stream, Entry, CP_WSR_Stream, i); + case StepsAllToAll: + { + for (int i = 0; i < Stream->ReaderCount; i++) + { + WS_ReaderInfo CP_WSR_Stream = Stream->Readers[i]; + SendTimestepEntryToSingleReader(Stream, Entry, CP_WSR_Stream, i); + } + break; + } + case StepsRoundRobin: + { + if (Stream->ReaderCount == 0) + return; + if (Stream->NextRRDistribution >= Stream->ReaderCount) + Stream->NextRRDistribution = 0; + CP_verbose(Stream, PerRankVerbose, + "Round Robin Distribution, step sent to reader %d\n", + Stream->NextRRDistribution); + WS_ReaderInfo CP_WSR_Stream = + Stream->Readers[Stream->NextRRDistribution]; + SendTimestepEntryToSingleReader(Stream, Entry, CP_WSR_Stream, + Stream->NextRRDistribution); + Stream->NextRRDistribution++; + } + case StepsOnDemand: + { + if (Stream->ReaderCount == 0) + return; + retry: + /* send this entry to the first queued request and delete that request + */ + if (Stream->StepRequestQueue) + { + StepRequest Request = Stream->StepRequestQueue; + Stream->StepRequestQueue = Request->Next; + int RequestingReader = Request->RequestingReader; + free(Request); + if (Stream->Readers[RequestingReader]->ReaderStatus == Established) + { + SendTimestepEntryToSingleReader( + Stream, Entry, Stream->Readers[RequestingReader], + RequestingReader); + Stream->LastDemandTimestep = Entry->Timestep; + } + else + { + goto retry; + } + } + } } } @@ -1221,52 +1277,47 @@ static void waitForReaderResponseAndSendQueued(WS_ReaderInfo Reader) "Reader ready on WSR %p, Stream established, Starting %d " "LastProvided %d.\n", Reader, Reader->StartingTimestep, Stream->LastProvidedTimestep); - for (long TS = 
Reader->StartingTimestep; TS <= Stream->LastProvidedTimestep; - TS++) + if (Stream->ConfigParams->StepDistributionMode == StepsAllToAll) { - CPTimestepList List = Stream->QueuedTimesteps; - while (List) + for (long TS = Reader->StartingTimestep; + TS <= Stream->LastProvidedTimestep; TS++) { - CP_verbose( - Stream, TraceVerbose, - "In send queued, trying to send TS %ld, examining TS %ld\n", TS, - List->Timestep); - if (Reader->ReaderStatus != Established) + CPTimestepList List = Stream->QueuedTimesteps; + while (List) { - break; /* break out of while if we've fallen out of established - */ - } - if (List->Timestep == TS) - { - FFSFormatList SavedFormats = List->Msg->Formats; - if (List->Expired && !List->PreciousTimestep) + CP_verbose( + Stream, TraceVerbose, + "In send queued, trying to send TS %ld, examining TS %ld\n", + TS, List->Timestep); + if (Reader->ReaderStatus != Established) { - CP_verbose(Stream, TraceVerbose, - "Reader send queued skipping TS %d, expired " - "and not precious\n", - List->Timestep, TS); - List = List->Next; - continue; /* skip timestep is expired, but not - precious */ + break; /* break out of while if we've fallen out of + * established + */ } - if (TS == Reader->StartingTimestep) + if (List->Timestep == TS) { - /* For first Msg, send all previous formats */ - List->Msg->Formats = Stream->PreviousFormats; - } - CP_verbose(Stream, PerStepVerbose, - "Sending Queued TimestepMetadata for timestep %d, " - "reference count = %d\n", - TS, List->ReferenceCount); + if (List->Expired && !List->PreciousTimestep) + { + CP_verbose( + Stream, TraceVerbose, + "Reader send queued skipping TS %d, expired " + "and not precious\n", + List->Timestep, TS); + List = List->Next; + continue; /* skip timestep is expired, but not + precious */ + } + CP_verbose( + Stream, PerStepVerbose, + "Sending Queued TimestepMetadata for timestep %d, " + "reference count = %d\n", + TS, List->ReferenceCount); - SendTimestepEntryToSingleReader(Stream, List, Reader, -1); - if 
(TS == Reader->StartingTimestep) - { - /* restore Msg format list */ - List->Msg->Formats = SavedFormats; + SendTimestepEntryToSingleReader(Stream, List, Reader, -1); } + List = List->Next; } - List = List->Next; } } STREAM_MUTEX_UNLOCK(Stream); @@ -1346,11 +1397,10 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm) if (Stream->Rank == 0) { STREAM_MUTEX_LOCK(Stream); - if (Stream->ReadRequestQueue == NULL) + while (Stream->ReaderRegisterQueue == NULL) { STREAM_CONDITION_WAIT(Stream); } - assert(Stream->ReadRequestQueue); STREAM_MUTEX_UNLOCK(Stream); } SMPI_Barrier(Stream->mpiComm); @@ -1660,88 +1710,85 @@ void SstWriterClose(SstStream Stream) } } -#ifdef NOTDEF +static FFSFormatList ReturnNthListEntry(FFSFormatList List, size_t Count) +{ + while (List && Count) + { + Count--; + List = List->Next; + } + return List; +} + +static size_t FormatListCount(FFSFormatList List) +{ + FFSFormatList tmp = List; + size_t count = 0; + while (tmp) + { + count++; + tmp = tmp->Next; + } + return count; +} + static FFSFormatList AddUniqueFormats(FFSFormatList List, - FFSFormatList Candidates) + FFSFormatList Candidates, int copy) { while (Candidates) { + FFSFormatList Last = NULL; FFSFormatList Tmp = List; - int found = 0; + int Found = 0; + FFSFormatList ThisCandidate = Candidates; while (Tmp) { - if ((Tmp->FormatIDRepLen == Candidates->FormatIDRepLen) && - (memcmp(Tmp->FormatIDRep, Candidates->FormatIDRep, + if ((Tmp->FormatIDRepLen == ThisCandidate->FormatIDRepLen) && + (memcmp(Tmp->FormatIDRep, ThisCandidate->FormatIDRep, Tmp->FormatIDRepLen) == 0)) { - found++; - break; + // Identical format already in List, don't add this one + Found++; } + Last = Tmp; Tmp = Tmp->Next; } - if (!found) - { - FFSFormatList New = malloc(sizeof(*New)); - memset(New, 0, sizeof(*New)); - New->FormatServerRep = malloc(Candidates->FormatServerRepLen); - memcpy(New->FormatServerRep, Candidates->FormatServerRep, - Candidates->FormatServerRepLen); - New->FormatServerRepLen = 
Candidates->FormatServerRepLen; - New->FormatIDRep = malloc(Candidates->FormatIDRepLen); - memcpy(New->FormatIDRep, Candidates->FormatIDRep, - Candidates->FormatIDRepLen); - New->FormatIDRepLen = Candidates->FormatIDRepLen; - New->Next = List; - List = New; - } Candidates = Candidates->Next; - } - return List; -} -#endif - -static FFSFormatList AddUniqueFormats(FFSFormatList List, - FFSFormatList Candidates, int copy) -{ - FFSFormatList Tmp = List; - FFSFormatList Ret = List; - - // If nothing to add, return original - if (!Candidates) - return Ret; - - // Add tail of candidates list first - Ret = AddUniqueFormats(List, Candidates->Next, copy); - - while (Tmp) - { - if ((Tmp->FormatIDRepLen == Candidates->FormatIDRepLen) && - (memcmp(Tmp->FormatIDRep, Candidates->FormatIDRep, - Tmp->FormatIDRepLen) == 0)) + if (!Found) { - // Identical format already in List, don't add this one - return Ret; + // New format not in list, add him to tail. + if (copy) + { + // Copy top Candidates entry before return + FFSFormatList Tmp = malloc(sizeof(*Tmp)); + memset(Tmp, 0, sizeof(*Tmp)); + Tmp->FormatServerRep = + malloc(ThisCandidate->FormatServerRepLen); + memcpy(Tmp->FormatServerRep, ThisCandidate->FormatServerRep, + ThisCandidate->FormatServerRepLen); + Tmp->FormatServerRepLen = ThisCandidate->FormatServerRepLen; + Tmp->FormatIDRep = malloc(ThisCandidate->FormatIDRepLen); + memcpy(Tmp->FormatIDRep, ThisCandidate->FormatIDRep, + ThisCandidate->FormatIDRepLen); + Tmp->FormatIDRepLen = ThisCandidate->FormatIDRepLen; + ThisCandidate = Tmp; + } + else + { + // disconnect this guy so that he can become list end + ThisCandidate->Next = NULL; + } + if (Last) + { + Last->Next = ThisCandidate; + } + else + { + List = ThisCandidate; + } } - Tmp = Tmp->Next; - } - // New format not in list, add him to head and return. 
- if (copy) - { - // Copy top Candidates entry before return - FFSFormatList Tmp = malloc(sizeof(*Tmp)); - memset(Tmp, 0, sizeof(*Tmp)); - Tmp->FormatServerRep = malloc(Candidates->FormatServerRepLen); - memcpy(Tmp->FormatServerRep, Candidates->FormatServerRep, - Candidates->FormatServerRepLen); - Tmp->FormatServerRepLen = Candidates->FormatServerRepLen; - Tmp->FormatIDRep = malloc(Candidates->FormatIDRepLen); - memcpy(Tmp->FormatIDRep, Candidates->FormatIDRep, - Candidates->FormatIDRepLen); - Tmp->FormatIDRepLen = Candidates->FormatIDRepLen; - Candidates = Tmp; } - Candidates->Next = Ret; - return Candidates; + return List; } static void *FillMetadataMsg(SstStream Stream, struct _TimestepMetadataMsg *Msg, @@ -1812,20 +1859,6 @@ static void *FillMetadataMsg(SstStream Stream, struct _TimestepMetadataMsg *Msg, Stream->PreviousFormats = AddUniqueFormats(Stream->PreviousFormats, XmitFormats, /*copy*/ 1); - if (Stream->NewReaderPresent) - { - /* - * If there is a new reader cohort, those ranks will need all prior - * FFS - * Format info. 
- */ - Msg->Formats = Stream->PreviousFormats; - Stream->NewReaderPresent = 0; - } - else - { - Msg->Formats = XmitFormats; - } return MetadataFreeValue; } @@ -2179,10 +2212,10 @@ extern void SstInternalProvideTimestep( { int DiscardThisTimestep = 0; struct _ReturnMetadataInfo TimestepMetaData; - RequestQueue ArrivingReader; + RegisterQueue ArrivingReader; void *MetadataFreeValue; STREAM_MUTEX_LOCK(Stream); - ArrivingReader = Stream->ReadRequestQueue; + ArrivingReader = Stream->ReaderRegisterQueue; QueueMaintenance(Stream); if (Stream->QueueFullPolicy == SstQueueFullDiscard) { @@ -2447,13 +2480,13 @@ void queueReaderRegisterMsgAndNotify(SstStream Stream, CMConnection conn) { STREAM_MUTEX_LOCK(Stream); - RequestQueue New = malloc(sizeof(struct _RequestQueue)); + RegisterQueue New = malloc(sizeof(struct _RegisterQueue)); New->Msg = Req; New->Conn = conn; New->Next = NULL; - if (Stream->ReadRequestQueue) + if (Stream->ReaderRegisterQueue) { - RequestQueue Last = Stream->ReadRequestQueue; + RegisterQueue Last = Stream->ReaderRegisterQueue; while (Last->Next) { Last = Last->Next; @@ -2462,7 +2495,7 @@ void queueReaderRegisterMsgAndNotify(SstStream Stream, } else { - Stream->ReadRequestQueue = New; + Stream->ReaderRegisterQueue = New; } STREAM_CONDITION_SIGNAL(Stream); STREAM_MUTEX_UNLOCK(Stream); @@ -2550,6 +2583,89 @@ void CP_ReaderActivateHandler(CManager cm, CMConnection conn, void *Msg_v, PERFSTUBS_TIMER_STOP_FUNC(timer); } +void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + struct _ReaderRequestStepMsg *Msg = (struct _ReaderRequestStepMsg *)Msg_v; + + WS_ReaderInfo CP_WSR_Stream = Msg->WSR_Stream; + SstStream Stream = CP_WSR_Stream->ParentStream; + CP_verbose(CP_WSR_Stream->ParentStream, PerStepVerbose, + "Reader Request Step message received " + "for Stream %p.\n", + CP_WSR_Stream); + if (CP_WSR_Stream->ParentStream->ConfigParams->CPCommPattern == + SstCPCommPeer) + { + assert(0); + } + + 
STREAM_MUTEX_LOCK(CP_WSR_Stream->ParentStream); + CPTimestepList List = Stream->QueuedTimesteps; + int RequestingReader = -1; + for (int i = 0; i < Stream->ReaderCount; i++) + { + if (CP_WSR_Stream == Stream->Readers[i]) + { + RequestingReader = i; + } + } + while (List) + { + size_t NextTS = Stream->LastDemandTimestep + 1; + CP_verbose( + Stream, TraceVerbose, + "In RequestStepHandler, trying to send TS %ld, examining TS %ld\n", + NextTS, List->Timestep); + if (CP_WSR_Stream->ReaderStatus != Established) + { + break; /* break out of while if we've fallen out of established + */ + } + if (List->Timestep == NextTS) + { + if (List->Expired && !List->PreciousTimestep) + { + CP_verbose(Stream, TraceVerbose, + "Reader send queued skipping TS %d, expired " + "and not precious\n", + List->Timestep, NextTS); + List = List->Next; + continue; /* skip timestep is expired, but not + precious */ + } + CP_verbose(Stream, PerStepVerbose, + "Sending Queued TimestepMetadata for timestep %d, " + "reference count = %d\n", + NextTS, List->ReferenceCount); + + SendTimestepEntryToSingleReader(Stream, List, CP_WSR_Stream, + RequestingReader); + STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream); + return; + } + List = List->Next; + } + + CP_verbose(Stream, TraceVerbose, + "In RequestStepHandler, queueing request\n"); + assert(RequestingReader != -1); + StepRequest Request = calloc(sizeof(*Request), 1); + Request->RequestingReader = RequestingReader; + if (!Stream->StepRequestQueue) + { + Stream->StepRequestQueue = Request; + } + else + { + StepRequest Last = Stream->StepRequestQueue; + while (Last->Next) + Last = Last->Next; + Last->Next = Request; + } + STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream); +} + extern void CP_ReleaseTimestepHandler(CManager cm, CMConnection conn, void *Msg_v, void *client_data, attr_list attrs) diff --git a/source/adios2/toolkit/sst/sst_data.h b/source/adios2/toolkit/sst/sst_data.h index a223a84237..2494fa35f0 100644 --- a/source/adios2/toolkit/sst/sst_data.h 
+++ b/source/adios2/toolkit/sst/sst_data.h @@ -57,6 +57,7 @@ typedef struct _SstStats MACRO(MarshalMethod, MarshalMethod, size_t, SstMarshalBP) \ MACRO(verbose, Int, int, 0) \ MACRO(RegistrationMethod, RegMethod, size_t, 0) \ + MACRO(StepDistributionMode, StepDistributionMode, size_t, StepsAllToAll) \ MACRO(DataTransport, String, char *, NULL) \ MACRO(WANDataTransport, String, char *, NULL) \ MACRO(OpenTimeoutSecs, Int, int, 60) \ @@ -92,6 +93,13 @@ typedef enum SpecPreloadAuto } SpeculativePreloadMode; +typedef enum +{ + StepsAllToAll, + StepsRoundRobin, + StepsOnDemand +} StepDistributionMode; + struct _SstParams { #define declare_struct(Param, Type, Typedecl, Default) Typedecl Param; diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index d8f972ae78..ebed92a915 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -20,6 +20,7 @@ foreach(helper TestCommonWriteAttrs TestCommonWriteLocal TestCommonWriteShared + TestDistributionWrite TestDefSyncWrite TestCommonRead TestCommonReadR64 @@ -28,6 +29,7 @@ foreach(helper TestCommonReadAttrs TestCommonServer TestCommonClient + TestDistributionRead ) add_executable(${helper} ${helper}.cpp) @@ -136,7 +138,7 @@ set (ALL_SIMPLE_TESTS "") list (APPEND ALL_SIMPLE_TESTS ${SIMPLE_TESTS} ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_TESTS} ${SIMPLE_ZFP_TESTS}) set (SST_SPECIFIC_TESTS "") -list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock") +list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3;OnDemandDistribution.1x1x3") if (ADIOS2_HAVE_MPI) list (APPEND SST_SPECIFIC_TESTS "2x3.SstRUDP;2x1.LocalMultiblock;5x3.LocalMultiblock;") endif() diff --git a/testing/adios2/engine/staging-common/ParseArgs.h b/testing/adios2/engine/staging-common/ParseArgs.h index 6063dd8735..a0057010f8 100644 --- 
a/testing/adios2/engine/staging-common/ParseArgs.h +++ b/testing/adios2/engine/staging-common/ParseArgs.h @@ -41,6 +41,8 @@ int EarlyExit = 0; int LocalCount = 1; int DataSize = 5 * 1024 * 1024 / 8; /* DefaultMinDeferredSize is 4*1024*1024 This should be more than that. */ +bool RoundRobin = false; +bool OnDemand = false; std::string shutdown_name = "DieTest"; adios2::Mode GlobalWriteMode = adios2::Mode::Deferred; @@ -262,9 +264,19 @@ void ParseArgs(int argc, char **argv) { ZeroDataVar++; } - else if (std::string(argv[1]) == "--zero_data_rank") + else if (std::string(argv[1]) == "--round_robin") { - ZeroDataRank++; + if (OnDemand) + std::cerr << "OnDemand already specified, round robin ignored" + << std::endl; + RoundRobin = true; + } + else if (std::string(argv[1]) == "--on_demand") + { + if (RoundRobin) + std::cerr << "RoundRobin already specified, on_demand ignored" + << std::endl; + OnDemand = true; } else if (std::string(argv[1]) == "--no_data") { diff --git a/testing/adios2/engine/staging-common/TestData.h b/testing/adios2/engine/staging-common/TestData.h index 6a30c5b17b..37514cf710 100644 --- a/testing/adios2/engine/staging-common/TestData.h +++ b/testing/adios2/engine/staging-common/TestData.h @@ -88,8 +88,6 @@ int validateSimpleForwardData(std::vector &data_forward, int step, int ret = 0; int64_t j = 100 * step + start; - std::cout << "Calling validate Simple data forward with step " << step - << ", start = " << start << ", count = " << count << std::endl; for (int i = 0; i < count; i++) { if (data_forward[i] != (double)j + i) diff --git a/testing/adios2/engine/staging-common/TestDistributionRead.cpp b/testing/adios2/engine/staging-common/TestDistributionRead.cpp new file mode 100644 index 0000000000..530376218b --- /dev/null +++ b/testing/adios2/engine/staging-common/TestDistributionRead.cpp @@ -0,0 +1,227 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ */ +#include +#include +#include +#include /* SIZE_MAX */ +#include + +#include +#include + +#include + +#include + +#include "TestData.h" + +#include "ParseArgs.h" + +class CommonReadTest : public ::testing::Test +{ +public: + CommonReadTest() = default; +}; + +#if ADIOS2_USE_MPI +MPI_Comm testComm; +#endif + +// ADIOS2 Common read +TEST_F(CommonReadTest, ADIOS2CommonRead1D8) +{ + // Each process would write a 1x8 array and all processes would + // form a mpiSize * Nx 1D array + int mpiRank = 0, mpiSize = 1; + +#if ADIOS2_USE_MPI + MPI_Comm_rank(testComm, &mpiRank); + MPI_Comm_size(testComm, &mpiSize); +#endif + + // Write test data using ADIOS2 + +#if ADIOS2_USE_MPI + adios2::ADIOS adios(testComm); +#else + adios2::ADIOS adios; +#endif + /* we won't share IOs on the reader side */ + adios2::IO io1 = adios.DeclareIO("TestIO"); + adios2::IO io2 = adios.DeclareIO("TestIO2"); + + // Create the Engines + io1.SetEngine(engine); + io1.SetParameters(engineParams); + io2.SetEngine(engine); + io2.SetParameters(engineParams); + + adios2::Engine engine1 = io1.Open(fname, adios2::Mode::Read); + + std::vector write_times; + + std::string varname1 = "r64"; + + size_t first_step = SIZE_MAX; + size_t total_steps = 0; + while (engine1.BeginStep() == adios2::StepStatus::OK) + { + size_t writerSize; + size_t step; + auto var1 = io1.InquireVariable(varname1); + + EXPECT_TRUE(var1); + ASSERT_EQ(var1.ShapeID(), adios2::ShapeID::GlobalArray); + + auto step_var = io1.InquireVariable("Step"); + + EXPECT_TRUE(step_var); + ASSERT_EQ(step_var.ShapeID(), adios2::ShapeID::GlobalValue); + + /* take the first size as something that gives us writer size */ + writerSize = var1.Shape()[0] / 10; + + long unsigned int myStart = + (long unsigned int)(writerSize * Nx / mpiSize) * mpiRank; + long unsigned int myLength = + (long unsigned int)((writerSize * Nx + mpiSize - 1) / mpiSize); + + if (myStart + myLength > writerSize * Nx) + { + myLength = (long unsigned int)(writerSize + 1) * (int)Nx - 
myStart; + } + const adios2::Dims start{myStart}; + const adios2::Dims count{myLength}; + std::vector in_R64_1; + + const adios2::Box sel(start, count); + + var1.SetSelection(sel); + + in_R64_1.resize(myLength); + engine1.Get(var1, in_R64_1.data()); + engine1.Get(step_var, &step); + engine1.EndStep(); + + int result = validateSimpleForwardData(in_R64_1, (int)step, myStart, + myLength, writerSize * Nx); + if (first_step == SIZE_MAX) + { + std::cout << "My first step was step " << step << std::endl; + first_step = step; + } + if (result != 0) + { + std::cout << "Read Data Validation failed on node " << mpiRank + << " timestep " << step << std::endl; + } + EXPECT_EQ(result, 0); + if (OnDemand) + { + std::cout << "Reader " << first_step << " Got Step " << step + << std::endl; + switch (first_step) + { + case 0: + { + int StepDelay[] = {1, 3, 5, 0, 0, 20}; + int ExpectedStep[] = {0, 4, 7, 10, 12, 15}; + EXPECT_EQ(ExpectedStep[total_steps], step); + std::cout << "Reader " << first_step << " Sleeping for " + << StepDelay[total_steps] << std::endl; + std::this_thread::sleep_for( + std::chrono::seconds(StepDelay[total_steps])); + break; + } + case 1: + { + int StepDelay[] = {0, 0, 1, 5, 0, 0, 1, 10}; + int ExpectedStep[] = {1, 3, 5, 8, 11, 14, 17, 19}; + EXPECT_EQ(ExpectedStep[total_steps], step); + std::cout << "Reader " << first_step << " Sleeping for " + << StepDelay[total_steps] << std::endl; + std::this_thread::sleep_for( + std::chrono::seconds(StepDelay[total_steps])); + break; + } + case 2: + { + int StepDelay[] = {3, 2, 4, 0, 0, 0, 0}; + int ExpectedStep[] = {2, 6, 9, 13, 16, 18}; + EXPECT_EQ(ExpectedStep[total_steps], step); + std::cout << "Reader " << first_step << " Sleeping for " + << StepDelay[total_steps] << std::endl; + std::this_thread::sleep_for( + std::chrono::seconds(StepDelay[total_steps])); + break; + } + } + } + total_steps++; + } + if (RoundRobin) + { + if (first_step == 0) + { + EXPECT_EQ(total_steps, 4); + } + else + { + EXPECT_EQ(total_steps, 3); + 
} + } + else if (OnDemand) + { + switch (first_step) + { + case 0: + EXPECT_EQ(total_steps, 6); + break; + case 1: + EXPECT_EQ(total_steps, 8); + break; + case 2: + EXPECT_EQ(total_steps, 6); + break; + } + } + else + { + EXPECT_EQ(first_step, 0); + EXPECT_EQ(total_steps, 10); + } + // Close the file + engine1.Close(); +} + +//****************************************************************************** +// main +//****************************************************************************** + +int main(int argc, char **argv) +{ +#if ADIOS2_USE_MPI + MPI_Init(nullptr, nullptr); + + int key; + MPI_Comm_rank(MPI_COMM_WORLD, &key); + + const unsigned int color = 2; + MPI_Comm_split(MPI_COMM_WORLD, color, key, &testComm); +#endif + + int result; + ::testing::InitGoogleTest(&argc, argv); + + ParseArgs(argc, argv); + + result = RUN_ALL_TESTS(); + +#if ADIOS2_USE_MPI + MPI_Finalize(); +#endif + + return result; +} diff --git a/testing/adios2/engine/staging-common/TestDistributionWrite.cpp b/testing/adios2/engine/staging-common/TestDistributionWrite.cpp new file mode 100644 index 0000000000..e23449950a --- /dev/null +++ b/testing/adios2/engine/staging-common/TestDistributionWrite.cpp @@ -0,0 +1,161 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ */ +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include + +#include "TestData.h" + +#include "ParseArgs.h" + +class CommonWriteTest : public ::testing::Test +{ +public: + CommonWriteTest() = default; +}; + +#if ADIOS2_USE_MPI +MPI_Comm testComm; +#endif + +// ADIOS2 COMMON write +TEST_F(CommonWriteTest, ADIOS2CommonWrite) +{ + // form a mpiSize * Nx 1D array + int mpiRank = 0, mpiSize = 1; + +#if ADIOS2_USE_MPI + MPI_Comm_rank(testComm, &mpiRank); + MPI_Comm_size(testComm, &mpiSize); +#endif + + // Write test data using ADIOS2 + +#if ADIOS2_USE_MPI + adios2::ADIOS adios(testComm); +#else + adios2::ADIOS adios; +#endif + adios2::IO io1 = adios.DeclareIO("TestIO"); + + std::string varname1 = "r64"; + std::string varname2 = "r64_2"; + + // Declare 1D variables (NumOfProcesses * Nx) + // The local process' part (start, count) can be defined now or later + // before Write(). + unsigned int myStart1 = (int)Nx * mpiRank; + unsigned int myCount1 = (int)Nx; + adios2::Dims shape1{static_cast(Nx * mpiSize)}; + adios2::Dims start1{static_cast(myStart1)}; + adios2::Dims count1{static_cast(myCount1)}; + + { + // auto var1 = + (void)io1.DefineVariable(varname1, shape1, start1, count1); + (void)io1.DefineVariable(varname2, shape1, start1, count1); + (void)io1.DefineVariable("Step"); + } + + // Create the Engine + io1.SetEngine(engine); + if (RoundRobin) + { + engineParams["StepDistributionMode"] = "RoundRobin"; + } + else if (OnDemand) + { + engineParams["StepDistributionMode"] = "OnDemand"; + } + else + { + // default + } + io1.SetParameters(engineParams); + + adios2::Engine engine1 = io1.Open(fname, adios2::Mode::Write); + + for (size_t step = 0; step < (size_t)NSteps; step++) + { + + // Generate test data for each process uniquely + std::vector data_forward; + + generateSimpleForwardData(data_forward, (int)step, myStart1, myCount1, + (int)Nx * mpiSize); + + engine1.BeginStep(); + auto var1 = io1.InquireVariable(varname1); + auto 
step_var = io1.InquireVariable("Step"); + + // Make a 1D selection to describe the local dimensions of the + // variable we write and its offsets in the global spaces + adios2::Box sel1({myStart1}, {myCount1}); + + // Write each one + // fill in the variable with values from starting index to + // starting index + count + const adios2::Mode sync = GlobalWriteMode; + + var1.SetSelection(sel1); + engine1.Put(var1, data_forward.data(), sync); + if (step > 5) + { + auto var2 = io1.InquireVariable(varname2); + var2.SetSelection(sel1); + engine1.Put(var2, data_forward.data(), sync); + } + engine1.Put(step_var, step); + engine1.EndStep(); + if (OnDemand) + { + if (step >= 2) + { + // send out first three quickly, so those (assume 3) guys find + // out who they are, then one every 2 sec + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + } + if (step == 8) + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + } + } + // Close the file + engine1.Close(); +} + +int main(int argc, char **argv) +{ +#if ADIOS2_USE_MPI + MPI_Init(nullptr, nullptr); + + int key; + MPI_Comm_rank(MPI_COMM_WORLD, &key); + + const unsigned int color = 1; + MPI_Comm_split(MPI_COMM_WORLD, color, key, &testComm); +#endif + + int result; + ::testing::InitGoogleTest(&argc, argv); + + ParseArgs(argc, argv); + + result = RUN_ALL_TESTS(); + +#if ADIOS2_USE_MPI + MPI_Finalize(); +#endif + + return result; +} diff --git a/testing/adios2/engine/staging-common/TestSupp.cmake b/testing/adios2/engine/staging-common/TestSupp.cmake index dd27751966..19186ef72c 100644 --- a/testing/adios2/engine/staging-common/TestSupp.cmake +++ b/testing/adios2/engine/staging-common/TestSupp.cmake @@ -157,6 +157,11 @@ set (PreciousTimestep.3x2_TIMEOUT "300") set (PreciousTimestepDiscard.3x2_CMD "run_test.py.$ --test_protocol kill_readers -nw 3 -nr 2 --max_readers 2 --warg=FirstTimestepPrecious=On,RendezvousReaderCount=0,QueueLimit=3,QueueFullPolicy=discard,WENGINE_PARAMS --rarg=--ignore_time_gap 
def do_multi_client_test(writer_cmd, reader_cmd, num_readers=3):
    """Run one writer alongside several independently launched readers.

    The writer is started first, then ``num_readers`` copies of the reader
    command. Every reader is waited on in launch order, then the writer.

    Args:
        writer_cmd: argument list passed to ``subprocess.Popen`` for the writer.
        reader_cmd: argument list passed to ``subprocess.Popen`` for each reader.
        num_readers: how many reader processes to launch (default 3).

    Returns:
        0 when the writer and every reader exited with status 0, otherwise 1.
    """
    return_code = 0
    writer = subprocess.Popen(writer_cmd)
    readers = [subprocess.Popen(reader_cmd) for _ in range(num_readers)]

    reader_failed = False
    for index, reader in enumerate(readers, start=1):
        label = "Reader" + str(index)
        print("TestDriver: Waiting for " + label)
        reader.wait()
        print("TestDriver: " + label + " exit status was " +
              str(reader.returncode))
        # Check each status on its own: a reader killed by a signal reports a
        # negative code, which could cancel a positive one if they were summed.
        if reader.returncode != 0:
            reader_failed = True

    if reader_failed:
        print("TestDriver: A reader failed, causing test failure")
        return_code = 1

    writer.wait()
    print("TestDriver: Writer exit status was " + str(writer.returncode))
    if writer.returncode != 0:
        print("TestDriver: Writer failed, causing test failure")
        return_code = 1
    return return_code