diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c index 8ddec7b9eb..77ed69dee5 100644 --- a/source/adios2/toolkit/sst/dp/mpi_dp.c +++ b/source/adios2/toolkit/sst/dp/mpi_dp.c @@ -30,6 +30,7 @@ #include +#include #include #include #include @@ -54,7 +55,7 @@ typedef struct _MpiWriterContactInfo { char ContactString[MPI_DP_CONTACT_STRING_LEN]; void *StreamWPR; - int PID; + uint64_t taskID; } * MpiWriterContactInfo; /* Base Stream class, used implicitly */ @@ -62,7 +63,7 @@ typedef struct _MpiStream { void *CP_Stream; int Rank; - int PID; + uint64_t taskID; } MpiStream; /* Link Stream class, used implicitly */ @@ -231,7 +232,8 @@ static FMField MpiWriterContactList[] = { sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)}, {"writer_ID", "integer", sizeof(void *), FMOffset(MpiWriterContactInfo, StreamWPR)}, - {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)}, + {"taskID", "integer", sizeof(uint64_t), + FMOffset(MpiWriterContactInfo, taskID)}, {NULL, NULL, 0, 0}}; static FMStructDescRec MpiWriterContactStructs[] = { @@ -241,6 +243,16 @@ static FMStructDescRec MpiWriterContactStructs[] = { /*****Internal functions*****************************************************/ +/** + * Return a unique process ID (Task ID) for the current process. We do this by + * combining the PID of the process (upper 32 bits) and the host ID returned by + * gethostid() (lower 32 bits; the same value that the `hostid` command prints). + */ +static uint64_t GetUniqueTaskId(void) +{ + return ((uint64_t)(uint32_t)getpid() << 32) | (uint32_t)gethostid(); +} + static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); @@ -256,9 +268,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, * the reader side. It should do whatever is necessary to initialize a new * reader-side data plane. 
A pointer to per-reader-rank contact information * should be placed in *ReaderContactInfoPtr. The structure of that - * information should be described by DPInterface.ReaderContactFormats. (This - * is an FFS format description. See - * https://www.cc.gatech.edu/systems/projects/FFS/.) + * information should be described by DPInterface.ReaderContactFormats. */ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, @@ -271,7 +281,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, CMFormat F; Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.taskID = GetUniqueTaskId(); Stream->Link.Stats = Stats; SMPI_Comm_rank(comm, &Stream->Stream.Rank); @@ -322,7 +332,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, SMPI_Comm_rank(comm, &Stream->Stream.Rank); Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.taskID = GetUniqueTaskId(); STAILQ_INIT(&Stream->TimeSteps); TAILQ_INIT(&Stream->Readers); @@ -347,8 +357,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, * on the connecting peer in InitReader) and should create its own * per-writer-rank contact information and place it in *writerContactInfoPtr. * The structure of that information should be described by - * DPInterface.WriterContactFormats. (This is an FFS format description. See - * https://www.cc.gatech.edu/systems/projects/FFS/.) + * DPInterface.WriterContactFormats. 
*/ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, @@ -392,7 +401,7 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, "Writer Rank %d, test contact", Rank); StreamWPR->MyContactInfo.StreamWPR = StreamWPR; - StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID; + StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID; *WriterContactInfoPtr = &StreamWPR->MyContactInfo; return StreamWPR; @@ -503,9 +512,9 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, ret->cm = cm; ret->CPStream = Stream->Stream.CP_Stream; ret->DestinationRank = Rank; - ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL - : MPI_DP_REMOTE; - + ret->CommType = + (TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL + : MPI_DP_REMOTE; if (ret->CommType == MPI_DP_REMOTE) { CMCondition_set_client_data(cm, ReadRequestMsg.NotifyCondition, ret); @@ -576,7 +585,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v) else { Svcs->verbose( - Handle->CPStream, DPTraceVerbose, + Handle->CPStream, DPCriticalVerbose, "Remote memory read to rank %d with condition %d has FAILED" "because of " "writer failure\n", @@ -615,7 +624,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, if (!RequestedData) { PERFSTUBS_TIMER_STOP_FUNC(timer); - Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose, + Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose, "Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep); return; @@ -850,11 +859,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, FailedPeerRank); } +/** MpiDisconnectWriterPerReader. + * + * This is called whenever a reader disconnects from a writer. This function + * simply disconnects the MPI communicators; it does not free any data + * structure. 
We must do it this way because: + * + * - The failed peer may re-enter the network. + * - We must disconnect the MPI port for that particular MPI reader task since + * otherwise the reader task might hang in MPI_Finalize in case the + * failure leads to a graceful application exit. + */ +static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v) +{ + MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v; + MpiStreamWR StreamWR = StreamWPR->StreamWR; + + const int CohortSize = StreamWPR->Link.CohortSize; + + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n", StreamWR->Stream.Rank, + CohortSize); + + for (int i = 0; i < CohortSize; i++) + { + if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) + { + MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]); + } + } +} + /** * MpiDestroyWriterPerReader. * - * This is called whenever a reader disconnect from a writer. This function - * also removes the StreamWPR from its own StreamWR. + * This is called by the MpiDestroyWriter function. This function will free any resource + * allocated to the particular WriterPerReader instance (StreamWPR). 
*/ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v) @@ -864,6 +904,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, const int CohortSize = StreamWPR->Link.CohortSize; + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]\n", StreamWR->Stream.Rank, + CohortSize); + for (int i = 0; i < CohortSize; i++) { if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) @@ -889,6 +933,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) { MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v; + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank); + pthread_mutex_lock(&StreamWR->MutexReaders); while (!TAILQ_EMPTY(&StreamWR->Readers)) { @@ -918,6 +965,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) { MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v; + + Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank); + const int CohortSize = StreamRS->Link.CohortSize; for (int i = 0; i < CohortSize; i++) @@ -948,7 +999,7 @@ extern CP_DP_Interface LoadMpiDP() .getPriority = MpiGetPriority, .destroyReader = MpiDestroyReader, .destroyWriter = MpiDestroyWriter, - .destroyWriterPerReader = MpiDestroyWriterPerReader, + .destroyWriterPerReader = MpiDisconnectWriterPerReader, .notifyConnFailure = MpiNotifyConnFailure, };