diff --git a/source/adios2/engine/sst/SstReader.tcc b/source/adios2/engine/sst/SstReader.tcc index 3249d3ff47..f2f97f0902 100644 --- a/source/adios2/engine/sst/SstReader.tcc +++ b/source/adios2/engine/sst/SstReader.tcc @@ -124,7 +124,11 @@ void SstReader::ReadVariableBlocks(Variable &variable) // wait for all SstRead requests to finish for (const auto &i : sstReadHandlers) { - SstWaitForCompletion(m_Input, i); + if (SstWaitForCompletion(m_Input, i) != SstSuccess) + { + throw std::runtime_error( + "ERROR: Writer failed before returning data"); + } } size_t iter = 0; diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index c4f762cce2..4d79decf1d 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1178,8 +1178,8 @@ SstStream CP_newStream() static void DP_verbose(SstStream Stream, char *Format, ...); static CManager CP_getCManager(SstStream Stream); -static void CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank, - CMFormat Format, void *data); +static int CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank, + CMFormat Format, void *data); static MPI_Comm CP_getMPIComm(SstStream Stream); struct _CP_Services Svcs = { @@ -1336,8 +1336,12 @@ static CManager CP_getCManager(SstStream Stream) { return Stream->CPInfo->cm; } static MPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; } -static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, - CMFormat Format, void *Data) +extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, + void *client_data); +extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, + void *client_data); +static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, + CMFormat Format, void *Data) { CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort; if (Peers[Rank].CMconn == NULL) @@ -1348,14 +1352,44 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CP_error(s, "Connection failed in CP_sendToPeer! Contact list was:\n"); CP_error(s, attr_list_to_string(Peers[Rank].ContactList)); - return; + return 0; + } + if (s->Role == ReaderRole) + { + CP_verbose( + s, + "Registering reader close handler for peer %d CONNECTION %p\n", + Rank, Peers[Rank].CMconn); + CMconn_register_close_handler(Peers[Rank].CMconn, + ReaderConnCloseHandler, (void *)s); + } + else + { + for (int i = 0; i < s->ReaderCount; i++) + { + if (Peers == s->Readers[i]->Connections) + { + CP_verbose(s, + "Registering writer close handler for peer %d, " + "CONNECTION %p\n", + Rank, Peers[Rank].CMconn); + CMconn_register_close_handler(Peers[Rank].CMconn, + WriterConnCloseHandler, + (void *)s->Readers[i]); + break; + } + } } } if (CMwrite(Peers[Rank].CMconn, Format, Data) != 1) { - CP_verbose(s, "Message failed to send to peer %d in CP_sendToPeer()\n", - Rank); + CP_verbose(s, + "Message failed to send to peer %d CONNECTION %p in " + "CP_sendToPeer()\n", + Rank, Peers[Rank].CMconn); + return 0; } + return 1; } CPNetworkInfoFunc globalNetinfoCallback = NULL; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 5aab1e8a94..38750d404d 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -141,12 +141,13 @@ static char *readContactInfo(const char *Name, SstStream Stream, int Timeout) } } -static void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, +extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data) { TAU_START_FUNC(); SstStream Stream = (SstStream)client_data; int FailedPeerRank = -1; + CP_verbose(Stream, "Reader-side close handler invoked\n"); for (int i = 0; i < Stream->WriterCohortSize; i++) { if (Stream->ConnectionsToWriter[i].CMconn == ClosedConn) diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 0f2bf6347c..9cca461017 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -383,7 +383,7 @@ static void QueueMaintenance(SstStream Stream) QueueMaintenance UNLOCK */ -static void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, +extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, void *client_data) { TAU_START_FUNC(); diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index 1b5ac42f2e..6b50e9848e 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -821,8 +821,12 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, ReadRequestMsg.RS_Stream = Stream; ReadRequestMsg.RequestingRank = Stream->Rank; ReadRequestMsg.NotifyCondition = ret->CMcondition; - Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank, - Stream->ReadRequestFormat, &ReadRequestMsg); + if (!Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank, + Stream->ReadRequestFormat, &ReadRequestMsg)) + { + ret->Failed = 1; + CMCondition_signal(cm, ret->CMcondition); + } return ret; } diff --git a/testing/adios2/engine/staging-common/TestCommonClient.cpp b/testing/adios2/engine/staging-common/TestCommonClient.cpp index e712eed5a0..9bd8a932ba 100644 --- a/testing/adios2/engine/staging-common/TestCommonClient.cpp +++ b/testing/adios2/engine/staging-common/TestCommonClient.cpp @@ -285,6 +285,9 @@ TEST_F(SstReadTest, ADIOS2SstRead) catch (...) { std::cout << "Exception in EndStep, client failed"; + WriterFailed = 1; + std::cout << "Noticed Writer failure" << std::endl; + Continue = false; } ++ExpectedStep; diff --git a/testing/adios2/engine/staging-common/run_test.in b/testing/adios2/engine/staging-common/run_test.in index fd085c7059..f509b5d42f 100755 --- a/testing/adios2/engine/staging-common/run_test.in +++ b/testing/adios2/engine/staging-common/run_test.in @@ -128,20 +128,27 @@ def do_one_client_test(writer_cmd, reader_cmd): def do_kill_writer_test(writer_cmd, reader_cmd, interval): return_code = 0 + print("TestDriver: Starting kill_writer test protocol") + sys.stdout.flush() writer = subprocess.Popen(writer_cmd) reader = subprocess.Popen(reader_cmd) print("TestDriver: Waiting " + str(interval) + " seconds") + sys.stdout.flush() time.sleep(interval) print("TestDriver: Killing Writer") + sys.stdout.flush() writer.terminate() writer.wait() reader.wait() print("TestDriver: Reader exit status was " + str(reader.returncode)) + sys.stdout.flush() if reader.returncode != 0: print("TestDriver: Reader failed, causing test failure") + sys.stdout.flush() return_code = 1 print("TestDriver: Writer exit status was " + - str(writer.returncode)) + " (ignored)" + str(writer.returncode) + " (ignored)") + sys.stdout.flush() return return_code