Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SST more thread safe #2334

Merged
merged 6 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 185 additions & 158 deletions source/adios2/toolkit/sst/cp/cp_common.c

Large diffs are not rendered by default.

41 changes: 26 additions & 15 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,18 @@

#define SSTMAGICV0 "#ADIOS2-SST v0\n"

typedef struct _CP_GlobalInfo
typedef struct StructList
{
int CustomStructCount;
FMStructDescList *CustomStructList;
} CP_StructList;

typedef struct _CP_GlobalCMInfo
{
/* exchange info */
CManager cm;
FFSContext ffs_c;
FMContext fm_c;
FFSTypeHandle PerRankReaderInfoFormat;
FFSTypeHandle CombinedReaderInfoFormat;
CMFormat ReaderRegisterFormat;
FFSTypeHandle PerRankWriterInfoFormat;
FFSTypeHandle CombinedWriterInfoFormat;
CMFormat WriterResponseFormat;
FFSTypeHandle PerRankMetadataFormat;
FFSTypeHandle TimestepDistributionFormat;
FFSTypeHandle ReturnMetadataInfoFormat;
CMFormat DeliverTimestepMetadataFormat;
CMFormat PeerSetupFormat;
CMFormat ReaderActivateFormat;
Expand All @@ -26,11 +23,25 @@ typedef struct _CP_GlobalInfo
CMFormat CommPatternLockedFormat;
CMFormat WriterCloseFormat;
CMFormat ReaderCloseFormat;
int CustomStructCount;
FMStructDescList *CustomStructList;
int LastCallFreeCount;
void **LastCallFreeList;
} * CP_GlobalInfo;
struct StructList CustomStructs;
} * CP_GlobalCMInfo;

typedef struct _CP_Info
{
CP_GlobalCMInfo SharedCM;
FFSContext ffs_c;
FMContext fm_c;
FFSTypeHandle PerRankReaderInfoFormat;
FFSTypeHandle CombinedReaderInfoFormat;
FFSTypeHandle PerRankWriterInfoFormat;
FFSTypeHandle CombinedWriterInfoFormat;
FFSTypeHandle PerRankMetadataFormat;
FFSTypeHandle TimestepDistributionFormat;
FFSTypeHandle ReturnMetadataInfoFormat;
struct StructList CustomStructs;
} * CP_Info;

struct _ReaderRegisterMsg;

Expand Down Expand Up @@ -121,7 +132,7 @@ typedef struct FFSFormatBlock *FFSFormatList;

struct _SstStream
{
CP_GlobalInfo CPInfo;
CP_Info CPInfo;

SMPI_Comm mpiComm;
enum StreamRole Role;
Expand Down Expand Up @@ -462,7 +473,7 @@ typedef struct _MetadataPlusDPInfo *MetadataPlusDPInfo;
extern atom_t CM_TRANSPORT_ATOM;

void CP_validateParams(SstStream stream, SstParams Params, int Writer);
extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule);
extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule);
extern char *CP_GetContactString(SstStream s, attr_list DPAttrs);
extern SstStream CP_newStream();
extern void SstInternalProvideTimestep(
Expand Down
53 changes: 29 additions & 24 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ attr_list ContactWriter(SstStream Stream, char *Filename, SstParams Params,
(globalNetinfoCallback)(2, CMContactString, NULL);
}
WriterRank0Contact = attr_list_from_string(CMContactString);
conn = CMget_conn(Stream->CPInfo->cm, WriterRank0Contact);
conn = CMget_conn(Stream->CPInfo->SharedCM->cm, WriterRank0Contact);
free_attr_list(WriterRank0Contact);
}
if (conn)
Expand Down Expand Up @@ -510,7 +510,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
memset(&ReaderRegister, 0, sizeof(ReaderRegister));
ReaderRegister.WriterFile = WriterFileID;
ReaderRegister.WriterResponseCondition =
CMCondition_get(Stream->CPInfo->cm, rank0_to_rank0_conn);
CMCondition_get(Stream->CPInfo->SharedCM->cm, rank0_to_rank0_conn);
ReaderRegister.ReaderCohortSize = Stream->CohortSize;
switch (Stream->ConfigParams->SpeculativePreloadMode)
{
Expand Down Expand Up @@ -543,11 +543,12 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)

/* the response value is set in the handler */
struct _WriterResponseMsg *response = NULL;
CMCondition_set_client_data(Stream->CPInfo->cm,
CMCondition_set_client_data(Stream->CPInfo->SharedCM->cm,
ReaderRegister.WriterResponseCondition,
&response);

if (CMwrite(rank0_to_rank0_conn, Stream->CPInfo->ReaderRegisterFormat,
if (CMwrite(rank0_to_rank0_conn,
Stream->CPInfo->SharedCM->ReaderRegisterFormat,
&ReaderRegister) != 1)
{
CP_verbose(Stream,
Expand All @@ -561,7 +562,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
Stream,
"Waiting for writer response message in SstReadOpen(\"%s\")\n",
Filename, ReaderRegister.WriterResponseCondition);
CMCondition_wait(Stream->CPInfo->cm,
CMCondition_wait(Stream->CPInfo->SharedCM->cm,
ReaderRegister.WriterResponseCondition);
CP_verbose(Stream,
"finished wait writer response message in read_open\n");
Expand Down Expand Up @@ -701,8 +702,9 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
Stream->ConnectionsToWriter, ReturnData->DP_WriterInfo);
CP_verbose(Stream, "Sending Reader Activate messages to writer\n");
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderActivateFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->SharedCM->ReaderActivateFormat,
&Msg, &Msg.WSR_Stream);
CP_verbose(Stream,
"Finish opening Stream \"%s\", starting with Step number %d\n",
Filename, ReturnData->StartingStepNumber);
Expand Down Expand Up @@ -780,9 +782,9 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream,
"Sending ReleaseTimestep message for PRIOR DISCARD "
"timestep %d, one to each writer\n",
tsm->Timestep);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
}
else
{
Expand Down Expand Up @@ -1076,7 +1078,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs)
}

TimeoutTask =
CMadd_delayed_task(Stream->CPInfo->cm, timeout_int_sec,
CMadd_delayed_task(Stream->CPInfo->SharedCM->cm, timeout_int_sec,
timeout_int_usec, triggerDataCondition, Stream);
while (1)
{
Expand Down Expand Up @@ -1155,13 +1157,13 @@ static void releasePriorTimesteps(SstStream Stream, long Latest)
Last->Next = Next;
}
STREAM_MUTEX_UNLOCK(Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
if (This->MetadataMsg == NULL)
printf("READER RETURN_BUFFER, metadatamsg == %p, line %d\n",
This->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, This->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, This->MetadataMsg);
STREAM_MUTEX_LOCK(Stream);
free(This);
}
Expand All @@ -1187,7 +1189,7 @@ static void FreeTimestep(SstStream Stream, long Timestep)
if (List->MetadataMsg == NULL)
printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, line %d\n",
List->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, List->MetadataMsg);

free(List);
}
Expand All @@ -1204,7 +1206,8 @@ static void FreeTimestep(SstStream Stream, long Timestep)
printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, "
"line %d\n",
List->MetadataMsg, __LINE__);
CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm,
List->MetadataMsg);

free(List);
break;
Expand Down Expand Up @@ -1388,8 +1391,9 @@ extern void SstReaderDefinitionLock(SstStream Stream, long EffectiveTimestep)
memset(&Msg, 0, sizeof(Msg));
Msg.Timestep = EffectiveTimestep;

sendOneToEachWriterRank(Stream, Stream->CPInfo->LockReaderDefinitionsFormat,
&Msg, &Msg.WSR_Stream);
sendOneToEachWriterRank(
Stream, Stream->CPInfo->SharedCM->LockReaderDefinitionsFormat, &Msg,
&Msg.WSR_Stream);
}

// SstReleaseStep is only called by the main program thread. It
Expand Down Expand Up @@ -1430,8 +1434,9 @@ extern void SstReleaseStep(SstStream Stream)
Stream,
"Sending ReleaseTimestep message for timestep %d, one to each writer\n",
Timestep);
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream,
Stream->CPInfo->SharedCM->ReleaseTimestepFormat,
&Msg, &Msg.WSR_Stream);

if (Stream->WriterConfigParams->MarshalMethod == SstMarshalFFS)
{
Expand Down Expand Up @@ -1990,12 +1995,12 @@ extern void SstReaderClose(SstStream Stream)
gettimeofday(&CloseTime, NULL);
timersub(&CloseTime, &Stream->ValidStartTime, &Diff);
memset(&Msg, 0, sizeof(Msg));
sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderCloseFormat, &Msg,
&Msg.WSR_Stream);
sendOneToEachWriterRank(Stream, Stream->CPInfo->SharedCM->ReaderCloseFormat,
&Msg, &Msg.WSR_Stream);
if (Stream->Stats)
Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec;

CMusleep(Stream->CPInfo->cm, 100000);
CMusleep(Stream->CPInfo->SharedCM->cm, 100000);
if (Stream->CurrentMetadata != NULL)
{
if (Stream->CurrentMetadata->FreeBlock)
Expand Down
55 changes: 22 additions & 33 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,6 @@ static void RemoveQueueEntries(SstStream Stream)
}
}

static void ReleaseAndDiscardRemainingTimesteps(SstStream Stream)
{
CPTimestepList List = Stream->QueuedTimesteps;

while (List)
{
List->Expired = 1;
List->PreciousTimestep = 0;
List->ReferenceCount = 0;
List = List->Next;
}
RemoveQueueEntries(Stream);
}

/*
Queue maintenance: (ASSUME LOCKED)
calculate smallest entry for CurrentTimestep in a reader. Update that
Expand Down Expand Up @@ -517,7 +503,7 @@ static void SendPeerSetupMsg(WS_ReaderInfo reader, int reversePeer, int myRank)
setup.WriterRank = myRank;
setup.WriterCohortSize = Stream->CohortSize;
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->PeerSetupFormat, &setup) != 1)
if (CMwrite(conn, Stream->CPInfo->SharedCM->PeerSetupFormat, &setup) != 1)
{
CP_verbose(Stream,
"Message failed to send to reader in sendPeerSetup in "
Expand Down Expand Up @@ -606,7 +592,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
if (!reader->Connections[peer].CMconn)
{
reader->Connections[peer].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);
}

Expand Down Expand Up @@ -660,7 +646,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
usleep(WriterRank *
reader->ParentStream->ConnectionUsleepMultiplier);
reader->Connections[peer].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[peer].ContactList);

if (!reader->Connections[peer].CMconn)
Expand Down Expand Up @@ -694,7 +680,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize,
if (!reader->Connections[0].CMconn)
{
reader->Connections[0].CMconn =
CMget_conn(reader->ParentStream->CPInfo->cm,
CMget_conn(reader->ParentStream->CPInfo->SharedCM->cm,
reader->Connections[0].ContactList);
}
if (!reader->Connections[0].CMconn)
Expand Down Expand Up @@ -811,7 +797,7 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream)
&free_block);
WriterResponseCondition = Req->Msg->WriterResponseCondition;
conn = Req->Conn;
CMreturn_buffer(Stream->CPInfo->cm, Req->Msg);
CMreturn_buffer(Stream->CPInfo->SharedCM->cm, Req->Msg);
free(Req);
}
else
Expand Down Expand Up @@ -948,7 +934,8 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream)
response.DP_WriterInfo[i] = pointers[i]->DP_Info;
}
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response) != 1)
if (CMwrite(conn, Stream->CPInfo->SharedCM->WriterResponseFormat,
&response) != 1)
{
CP_verbose(
Stream,
Expand Down Expand Up @@ -1167,9 +1154,10 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,
Entry->Msg->PreloadMode = PMode;
STREAM_MUTEX_LOCK(Stream);
if (CP_WSR_Stream->ReaderStatus == Established)
sendOneToWSRCohort(CP_WSR_Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
sendOneToWSRCohort(
CP_WSR_Stream,
Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
}
}

Expand Down Expand Up @@ -1306,7 +1294,7 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, SMPI_Comm comm)
if (Stream->RendezvousReaderCount > 0)
{
Stream->FirstReaderCondition =
CMCondition_get(Stream->CPInfo->cm, NULL);
CMCondition_get(Stream->CPInfo->SharedCM->cm, NULL);
}
else
{
Expand Down Expand Up @@ -1470,7 +1458,7 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
if (NewState == PeerFailed)
{
// move to fully closed state later
CMfree(CMadd_delayed_task(ParentStream->CPInfo->cm, 2, 0,
CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0,
CloseWSRStream, CP_WSR_Stream));
}
}
Expand Down Expand Up @@ -1505,8 +1493,8 @@ void SstWriterClose(SstStream Stream)
"SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
Msg.FinalTimestep);

sendOneToEachReaderRank(Stream, Stream->CPInfo->WriterCloseFormat, &Msg,
&Msg.RS_Stream);
sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
&Msg, &Msg.RS_Stream);

UntagPreciousTimesteps(Stream);
Stream->ConfigParams->ReserveQueueLimit = 0;
Expand Down Expand Up @@ -1857,9 +1845,10 @@ static void ActOnTSLockStatus(SstStream Stream, long Timestep)
}
Msg.Timestep = Timestep;
SomethingSent++;
sendOneToWSRCohort(Stream->Readers[i],
Stream->CPInfo->CommPatternLockedFormat, &Msg,
&Msg.RS_Stream);
sendOneToWSRCohort(
Stream->Readers[i],
Stream->CPInfo->SharedCM->CommPatternLockedFormat, &Msg,
&Msg.RS_Stream);
Stream->Readers[i]->PreloadMode = SstPreloadLearned;
Stream->Readers[i]->PreloadModeActiveTimestep = Timestep;
CP_verbose(Stream,
Expand Down Expand Up @@ -2273,9 +2262,9 @@ extern void SstInternalProvideTimestep(
Timestep);

STREAM_MUTEX_LOCK(Stream);
sendOneToEachReaderRank(Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Msg, &Msg->RS_Stream);
sendOneToEachReaderRank(
Stream, Stream->CPInfo->SharedCM->DeliverTimestepMetadataFormat,
Msg, &Msg->RS_Stream);

Entry->Expired = 1;
Entry->ReferenceCount = 0;
Expand Down
Loading