Skip to content

Commit

Permalink
Merge pull request #3588 from vicentebolea/fix-mpi-dp
Browse files Browse the repository at this point in the history
Fix MPI Data plane cohort handling

(cherry picked from commit c77df61)
  • Loading branch information
vicentebolea committed Oct 31, 2023
1 parent 133ccc7 commit 4525797
Showing 1 changed file with 70 additions and 19 deletions.
89 changes: 70 additions & 19 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <mpi.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -54,15 +55,15 @@ typedef struct _MpiWriterContactInfo
{
char ContactString[MPI_DP_CONTACT_STRING_LEN];
void *StreamWPR;
int PID;
long taskID;
} * MpiWriterContactInfo;

/* Base Stream class, used implicitly */
typedef struct _MpiStream
{
void *CP_Stream;
int Rank;
int PID;
long taskID;
} MpiStream;

/* Link Stream class, used implicitly */
Expand Down Expand Up @@ -231,7 +232,8 @@ static FMField MpiWriterContactList[] = {
sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)},
{"writer_ID", "integer", sizeof(void *),
FMOffset(MpiWriterContactInfo, StreamWPR)},
{"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)},
{"taskID", "integer", sizeof(long),
FMOffset(MpiWriterContactInfo, taskID)},
{NULL, NULL, 0, 0}};

static FMStructDescRec MpiWriterContactStructs[] = {
Expand All @@ -241,6 +243,16 @@ static FMStructDescRec MpiWriterContactStructs[] = {

/*****Internal functions*****************************************************/

/**
* Return an unique process ID (Task ID) for the current process. We do this by
* combining the PID of the process and the hostid (as return the same output
* as `hostid` or the content of /etc/machine-id in modern UNIX-like systems).
*/
static uint64_t GetUniqueTaskId()
{
return ((uint32_t)getpid() * (1ll << 32ll)) | (uint32_t)gethostid();
}

static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs);

Expand All @@ -256,9 +268,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
* the reader side. It should do whatever is necessary to initialize a new
* reader-side data plane. A pointer to per-reader-rank contact information
* should be placed in *ReaderContactInfoPtr. The structure of that
* information should be described by DPInterface.ReaderContactFormats. (This
* is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* information should be described by DPInterface.ReaderContactFormats.
*/
static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
void **ReaderContactInfoPtr,
Expand All @@ -271,7 +281,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
CMFormat F;

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
Stream->Link.Stats = Stats;

SMPI_Comm_rank(comm, &Stream->Stream.Rank);
Expand Down Expand Up @@ -322,7 +332,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
SMPI_Comm_rank(comm, &Stream->Stream.Rank);

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
STAILQ_INIT(&Stream->TimeSteps);
TAILQ_INIT(&Stream->Readers);

Expand All @@ -347,8 +357,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
* on the connecting peer in InitReader) and should create its own
* per-writer-rank contact information and place it in *writerContactInfoPtr.
* The structure of that information should be described by
* DPInterface.WriterContactFormats. (This is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* DPInterface.WriterContactFormats.
*/
static DP_WSR_Stream
MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
Expand Down Expand Up @@ -392,7 +401,7 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
"Writer Rank %d, test contact", Rank);

StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID;
*WriterContactInfoPtr = &StreamWPR->MyContactInfo;

return StreamWPR;
Expand Down Expand Up @@ -503,9 +512,9 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
ret->cm = cm;
ret->CPStream = Stream->Stream.CP_Stream;
ret->DestinationRank = Rank;
ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL
: MPI_DP_REMOTE;

ret->CommType =
(TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL
: MPI_DP_REMOTE;
if (ret->CommType == MPI_DP_REMOTE)
{
CMCondition_set_client_data(cm, ReadRequestMsg.NotifyCondition, ret);
Expand Down Expand Up @@ -576,7 +585,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
else
{
Svcs->verbose(
Handle->CPStream, DPTraceVerbose,
Handle->CPStream, DPCriticalVerbose,
"Remote memory read to rank %d with condition %d has FAILED"
"because of "
"writer failure\n",
Expand Down Expand Up @@ -615,7 +624,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
if (!RequestedData)
{
PERFSTUBS_TIMER_STOP_FUNC(timer);
Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
"Failed to read TimeStep %ld, not found\n",
ReadRequestMsg->TimeStep);
return;
Expand Down Expand Up @@ -850,11 +859,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
FailedPeerRank);
}

/** MpiDisconnectWriterPerReader.
*
* This is called whenever a reader disconnect from a writer. This function
* simply disconnect the mpi communicator, it does not frees any data
* structure. We must do it in this way since:
*
* - There is the possibility of the failed peer to re-enter in the network.
* - We must disconnect the MPI port for that particular mpi reader task since
* otherwise it the reader task might hung in mpi_finalize, in the case the
* the failure leads to a application graceful exit.
*/
static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n", CohortSize,
StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
{
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
}
}
}

/**
* MpiDestroyWriterPerReader.
*
* This is called whenever a reader disconnect from a writer. This function
* also removes the StreamWPR from its own StreamWR.
* This is called by the MpiDestroyWriter function. This function will free any resource
* allocated to the particulare WriterPerReader instance (StreamWPR).
*/
static void MpiDestroyWriterPerReader(CP_Services Svcs,
DP_WSR_Stream WSR_Stream_v)
Expand All @@ -864,6 +904,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]", CohortSize,
StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
Expand All @@ -889,6 +933,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);

pthread_mutex_lock(&StreamWR->MutexReaders);
while (!TAILQ_EMPTY(&StreamWR->Readers))
{
Expand Down Expand Up @@ -918,6 +965,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;

Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);

const int CohortSize = StreamRS->Link.CohortSize;

for (int i = 0; i < CohortSize; i++)
Expand Down Expand Up @@ -948,7 +999,7 @@ extern CP_DP_Interface LoadMpiDP()
.getPriority = MpiGetPriority,
.destroyReader = MpiDestroyReader,
.destroyWriter = MpiDestroyWriter,
.destroyWriterPerReader = MpiDestroyWriterPerReader,
.destroyWriterPerReader = MpiDisconnectWriterPerReader,
.notifyConnFailure = MpiNotifyConnFailure,
};

Expand Down

0 comments on commit 4525797

Please sign in to comment.