Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on-demand step delivery #3170

Merged
merged 9 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/user_guide/source/engines/sst.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,19 @@ eager data sending of all data from each writer to all readers.
Currently value is interpreted by only by the SST Reader engine.


17. ``StepDistributionMode``: Default **"StepsAllToAll"**. This value
controls how steps are distributed, particularly when there are
multiple readers. By default, the value is **StepsAllToAll*, which
means that all timesteps are to be delivered to all readers (subject
to discard rules, etc.). In other distribution modes, this is not the
case. For example, in **"StepsRoundRobin"**, each step is delivered
only to a single reader, determined in a round-robin fashion based
upon the number or readers who have opened the stream at the time the
step is submitted. In **"StepsOnDemand"** each step is delivered to a
single reader, but only upon request (with a request being initiated
by the reader doing BeginStep()). Normal reader-side rules (like
BeginStep timeouts) and writer-side rules (like queue limit behavior) apply.

============================= ===================== ================================================
**Key** **Value Format** **Default** and Examples
============================= ===================== ================================================
Expand Down
31 changes: 31 additions & 0 deletions source/adios2/engine/sst/SstParamParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,37 @@ void SstParamParser::ParseParams(IO &io, struct _SstParams &Params)
return false;
};

auto lf_SetStepDistributionModeParameter = [&](const std::string key,
size_t &parameter) {
auto itKey = io.m_Parameters.find(key);
if (itKey != io.m_Parameters.end())
{
std::string method = itKey->second;
std::transform(method.begin(), method.end(), method.begin(),
::tolower);
if (method == "alltoall")
{
parameter = StepsAllToAll;
}
else if (method == "roundrobin")
{
parameter = StepsRoundRobin;
}
else if (method == "ondemand")
{
parameter = StepsOnDemand;
}
else
{
helper::Throw<std::invalid_argument>(
"Engine", "SstParamParser", "ParseParams",
"Unknown Sst StepDistributionMode parameter \"" + method +
"\"");
}
return true;
}
return false;
};
auto lf_SetSpecPreloadModeParameter = [&](const std::string key,
int &parameter) {
auto itKey = io.m_Parameters.find(key);
Expand Down
19 changes: 19 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ static char *SstQueueFullStr[] = {"Block", "Discard"};
static char *SstCompressStr[] = {"None", "ZFP"};
static char *SstCommPatternStr[] = {"Min", "Peer"};
static char *SstPreloadModeStr[] = {"Off", "On", "Auto"};
static char *SstStepDistributionModeStr[] = {"StepsAllToAll", "StepsRoundRobin",
"StepsOnDemand"};

extern void CP_dumpParams(SstStream Stream, struct _SstParams *Params,
int ReaderSide)
Expand All @@ -188,6 +190,8 @@ extern void CP_dumpParams(SstStream Stream, struct _SstParams *Params,
(Params->QueueLimit == 0) ? "(unlimited)" : "");
fprintf(stderr, "Param - QueueFullPolicy=%s\n",
SstQueueFullStr[Params->QueueFullPolicy]);
fprintf(stderr, "Param - StepDistributionMode=%s\n",
SstStepDistributionModeStr[Params->StepDistributionMode]);
}
fprintf(stderr, "Param - DataTransport=%s\n",
Params->DataTransport ? Params->DataTransport : "");
Expand Down Expand Up @@ -532,6 +536,16 @@ static FMStructDescRec ReaderActivateStructs[] = {
NULL},
{NULL, NULL, 0, NULL}};

static FMField ReaderRequestStepList[] = {
{"WSR_Stream", "integer", sizeof(void *),
FMOffset(struct _ReaderRequestStepMsg *, WSR_Stream)},
{NULL, NULL, 0, 0}};

static FMStructDescRec ReaderRequestStepStructs[] = {
{"ReaderRequestStep", ReaderRequestStepList,
sizeof(struct _ReaderRequestStepMsg), NULL},
{NULL, NULL, 0, NULL}};

static FMField WriterCloseList[] = {
{"RS_Stream", "integer", sizeof(void *),
FMOffset(struct _WriterCloseMsg *, RS_Stream)},
Expand Down Expand Up @@ -910,6 +924,11 @@ static void doCMFormatRegistration(CP_GlobalCMInfo CPInfo,
CMregister_format(CPInfo->cm, ReaderActivateStructs);
CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler,
NULL);
CPInfo->ReaderRequestStepFormat =
CMregister_format(CPInfo->cm, ReaderRequestStepStructs);
CMregister_handler(CPInfo->ReaderRequestStepFormat,
CP_ReaderRequestStepHandler, NULL);

CPInfo->ReleaseTimestepFormat =
CMregister_format(CPInfo->cm, ReleaseTimestepStructs);
CMregister_handler(CPInfo->ReleaseTimestepFormat, CP_ReleaseTimestepHandler,
Expand Down
32 changes: 27 additions & 5 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ typedef struct _CP_GlobalCMInfo
CMFormat DeliverTimestepMetadataFormat;
CMFormat PeerSetupFormat;
CMFormat ReaderActivateFormat;
CMFormat ReaderRequestStepFormat;
CMFormat ReleaseTimestepFormat;
CMFormat LockReaderDefinitionsFormat;
CMFormat CommPatternLockedFormat;
Expand Down Expand Up @@ -45,12 +46,18 @@ typedef struct _CP_Info

struct _ReaderRegisterMsg;

typedef struct _RequestQueue
typedef struct _RegisterQueue
{
struct _ReaderRegisterMsg *Msg;
CMConnection Conn;
struct _RequestQueue *Next;
} * RequestQueue;
struct _RegisterQueue *Next;
} * RegisterQueue;

typedef struct _StepRequest
{
int RequestingReader;
struct _StepRequest *Next;
} * StepRequest;

typedef struct _CP_PeerConnection
{
Expand Down Expand Up @@ -92,6 +99,7 @@ typedef struct _WS_ReaderInfo
SstPreloadModeType PreloadMode;
long PreloadModeActiveTimestep;
long OldestUnreleasedTimestep;
size_t FormatSentCount;
struct _SentTimestepRec *SentTimestepList;
void *DP_WSR_Stream;
int ReaderCohortSize;
Expand Down Expand Up @@ -170,18 +178,20 @@ struct _SstStream
int QueueLimit;
SstQueueFullPolicy QueueFullPolicy;
int LastProvidedTimestep;
int NewReaderPresent;
int WriterDefinitionsLocked;
size_t NextRRDistribution;
size_t LastDemandTimestep;

/* rendezvous condition */
int FirstReaderCondition;
RequestQueue ReadRequestQueue;
RegisterQueue ReaderRegisterQueue;

int ReaderCount;
WS_ReaderInfo *Readers;
char *Filename;
char *AbsoluteFilename;
int GlobalOpRequired;
StepRequest StepRequestQueue;

/* writer side marshal info */
void *WriterMarshalData;
Expand Down Expand Up @@ -358,6 +368,15 @@ struct _ReaderActivateMsg
void *WSR_Stream;
};

/*
* The ReaderRequestStep message informs the writer that this reader is now
* ready to receive a new step (Used in OnDemand step distribution mode)
*/
struct _ReaderRequestStepMsg
{
void *WSR_Stream;
};

/*
* The timestepMetadata message carries the metadata from all writer ranks.
* One is sent to each reader in peer mode, between rank 0's in min mode.
Expand Down Expand Up @@ -507,6 +526,9 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *msg_v,
extern void CP_ReaderActivateHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_data,
attr_list attrs);
extern void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_data,
attr_list attrs);
extern void CP_TimestepMetadataHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_data,
attr_list attrs);
Expand Down
13 changes: 12 additions & 1 deletion source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream,
}
}

struct _TimestepMetadataList *New = malloc(sizeof(struct _RequestQueue));
struct _TimestepMetadataList *New = malloc(sizeof(struct _RegisterQueue));
New->MetadataMsg = tsm;
New->Next = NULL;
if (Stream->Timesteps)
Expand Down Expand Up @@ -2215,6 +2215,17 @@ extern SstStatusValue SstAdvanceStep(SstStream Stream, const float timeout_sec)
Stream->CurrentMetadata = NULL;
}

if (Stream->WriterConfigParams->StepDistributionMode == StepsOnDemand)
{
struct _ReaderRequestStepMsg Msg;
CP_verbose(Stream, PerRankVerbose,
"Sending Reader Request Step messages to writer\n");
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->ReaderRequestStepFormat, &Msg,
&Msg.WSR_Stream);
}

SstStepMode mode = SstNextAvailable;
if (Stream->ConfigParams->AlwaysProvideLatestTimestep)
{
Expand Down
Loading