From c6fb0cb63648bbd9f8873b1a63d29786de4db817 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 07:20:15 -0400 Subject: [PATCH 01/12] Add time to KillWriter delay for stability --- testing/adios2/engine/staging-common/TestSupp.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/adios2/engine/staging-common/TestSupp.cmake b/testing/adios2/engine/staging-common/TestSupp.cmake index 7bac6a9105..27b3a7f798 100644 --- a/testing/adios2/engine/staging-common/TestSupp.cmake +++ b/testing/adios2/engine/staging-common/TestSupp.cmake @@ -118,8 +118,8 @@ set (KillReaders3Max_CMD "run_test.py --test_protocol kill_readers -nw 3 -nr 2 set (KillReaders3Max_TIMEOUT "300") set (KillReaders3Max_PROPERTIES "RUN_SERIAL;1") -set (KillWriter_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 2 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000") -set (KillWriterTimeout_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 2 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000 --rarg=--non_blocking") +set (KillWriter_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 5 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000") +set (KillWriterTimeout_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 5 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000 --rarg=--non_blocking") set (PreciousTimestep_CMD "run_test.py --test_protocol kill_readers -nw 3 -nr 2 --max_readers 2 --warg=FirstTimestepPrecious=true,RendezvousReaderCount=0,WENGINE_PARAMS --rarg=--ignore_time_gap --rarg=--precious_first") From f71f569950bca14fc41589a99f6d264707b37269 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 09:12:05 -0400 Subject: [PATCH 02/12] Flush stdout in kill_writer for more info --- testing/adios2/engine/staging-common/run_test.in | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/testing/adios2/engine/staging-common/run_test.in b/testing/adios2/engine/staging-common/run_test.in index fd085c7059..f509b5d42f 100755 --- a/testing/adios2/engine/staging-common/run_test.in +++ b/testing/adios2/engine/staging-common/run_test.in @@ -128,20 +128,27 @@ def do_one_client_test(writer_cmd, reader_cmd): def do_kill_writer_test(writer_cmd, reader_cmd, interval): return_code = 0 + print("TestDriver: Starting kill_writer test protocol") + sys.stdout.flush() writer = subprocess.Popen(writer_cmd) reader = subprocess.Popen(reader_cmd) print("TestDriver: Waiting " + str(interval) + " seconds") + sys.stdout.flush() time.sleep(interval) print("TestDriver: Killing Writer") + sys.stdout.flush() writer.terminate() writer.wait() reader.wait() print("TestDriver: Reader exit status was " + str(reader.returncode)) + sys.stdout.flush() if reader.returncode != 0: print("TestDriver: Reader failed, causing test failure") + sys.stdout.flush() return_code = 1 print("TestDriver: Writer exit status was " + - str(writer.returncode)) + " (ignored)" + str(writer.returncode) + " (ignored)") + sys.stdout.flush() return return_code From bebe6b2a0527f34fbe94502bff543417ad932ad8 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 11:03:59 -0400 Subject: [PATCH 03/12] Turn on SstCPVerbose for kill_writer only --- testing/adios2/engine/staging-common/run_test.in | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/adios2/engine/staging-common/run_test.in b/testing/adios2/engine/staging-common/run_test.in index f509b5d42f..90574bc6b4 100755 --- a/testing/adios2/engine/staging-common/run_test.in +++ b/testing/adios2/engine/staging-common/run_test.in @@ -128,6 +128,7 @@ def do_one_client_test(writer_cmd, reader_cmd): def do_kill_writer_test(writer_cmd, reader_cmd, interval): return_code = 0 + os.environ['SstCPVerbose'] = "1" # visible in this process + all children print("TestDriver: Starting kill_writer test protocol") sys.stdout.flush() writer = subprocess.Popen(writer_cmd) From 22f931fa7fc33d06e030032693f162d18ee7e832 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 13:43:16 -0400 Subject: [PATCH 04/12] handle failures of DP-created connections so that we get peer close notifications --- source/adios2/toolkit/sst/cp/cp_common.c | 22 ++++++++++++++++++++++ source/adios2/toolkit/sst/cp/cp_reader.c | 2 +- source/adios2/toolkit/sst/cp/cp_writer.c | 2 +- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index c4f762cce2..df6ab9fb1d 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1336,6 +1336,10 @@ static CManager CP_getCManager(SstStream Stream) { return Stream->CPInfo->cm; } static MPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; } +extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, + void *client_data); +extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, + void *client_data); static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CMFormat Format, void *Data) { @@ -1350,6 +1354,24 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CP_error(s, attr_list_to_string(Peers[Rank].ContactList)); return; } + if (s->Role == ReaderRole) + { + CMconn_register_close_handler(Peers[Rank].CMconn, + ReaderConnCloseHandler, (void *)s); + } + else + { + for (int i = 0; i < s->ReaderCount; i++) + { + if (Peers == s->Readers[i]->Connections) + { + CMconn_register_close_handler(Peers[Rank].CMconn, + WriterConnCloseHandler, + (void *)s->Readers[i]); + break; + } + } + } } if (CMwrite(Peers[Rank].CMconn, Format, Data) != 1) { diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 5aab1e8a94..3bc38af31f 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -141,7 +141,7 @@ static char *readContactInfo(const char *Name, SstStream Stream, int Timeout) } } -static void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, +extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data) { TAU_START_FUNC(); diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 0f2bf6347c..9cca461017 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -383,7 +383,7 @@ static void QueueMaintenance(SstStream Stream) QueueMaintenance UNLOCK */ -static void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, +extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, void *client_data) { TAU_START_FUNC(); From c5ab6608e3bdf67c553bb6910392fb60504984a9 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 14:50:44 -0400 Subject: [PATCH 05/12] handle write failure --- source/adios2/toolkit/sst/cp/cp_common.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index df6ab9fb1d..d3d2a2321b 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1356,6 +1356,8 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, } if (s->Role == ReaderRole) { + CP_verbose(s, "Registering reader close handler for peer %d\n", + Rank); CMconn_register_close_handler(Peers[Rank].CMconn, ReaderConnCloseHandler, (void *)s); } @@ -1365,6 +1367,9 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, { if (Peers == s->Readers[i]->Connections) { + CP_verbose(s, + "Registering writer close handler for peer %d\n", + Rank); CMconn_register_close_handler(Peers[Rank].CMconn, WriterConnCloseHandler, (void *)s->Readers[i]); @@ -1377,6 +1382,8 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, { CP_verbose(s, "Message failed to send to peer %d in CP_sendToPeer()\n", Rank); + CMConnection_close(Peers[Rank].CMconn); + Peers[Rank].CMconn = NULL; } } From 3ab836d7c12fd458a0edc265b4df900ef40ae3f2 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 15:51:38 -0400 Subject: [PATCH 06/12] More verbosity --- source/adios2/toolkit/sst/cp/cp_common.c | 19 +++++++++++-------- source/adios2/toolkit/sst/cp/cp_reader.c | 1 + 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index d3d2a2321b..85bf864dea 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1356,8 +1356,10 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, } if (s->Role == ReaderRole) { - CP_verbose(s, "Registering reader close handler for peer %d\n", - Rank); + CP_verbose( + s, + "Registering reader close handler for peer %d CONNECTION %p\n", + Rank, Peers[Rank].CMconn); CMconn_register_close_handler(Peers[Rank].CMconn, ReaderConnCloseHandler, (void *)s); } @@ -1368,8 +1370,9 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, if (Peers == s->Readers[i]->Connections) { CP_verbose(s, - "Registering writer close handler for peer %d\n", - Rank); + "Registering writer close handler for peer %d, " + "CONNECTION %p\n", + Rank, Peers[Rank].CMconn); CMconn_register_close_handler(Peers[Rank].CMconn, WriterConnCloseHandler, (void *)s->Readers[i]); @@ -1380,10 +1383,10 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, } if (CMwrite(Peers[Rank].CMconn, Format, Data) != 1) { - CP_verbose(s, "Message failed to send to peer %d in CP_sendToPeer()\n", - Rank); - CMConnection_close(Peers[Rank].CMconn); - Peers[Rank].CMconn = NULL; + CP_verbose(s, + "Message failed to send to peer %d CONNECTION %p in " + "CP_sendToPeer()\n", + Rank, Peers[Rank].CMconn, ); } } diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 3bc38af31f..38750d404d 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -147,6 +147,7 @@ extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, TAU_START_FUNC(); SstStream Stream = (SstStream)client_data; int FailedPeerRank = -1; + CP_verbose(Stream, "Reader-side close handler invoked\n"); for (int i = 0; i < Stream->WriterCohortSize; i++) { if (Stream->ConnectionsToWriter[i].CMconn == ClosedConn) From 5cbe659bdd5973b3c5e5c2e7843ed8cfee2f372d Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 6 Aug 2019 16:04:16 -0400 Subject: [PATCH 07/12] fix --- source/adios2/toolkit/sst/cp/cp_common.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 85bf864dea..740cd5f704 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1386,7 +1386,7 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CP_verbose(s, "Message failed to send to peer %d CONNECTION %p in " "CP_sendToPeer()\n", - Rank, Peers[Rank].CMconn, ); + Rank, Peers[Rank].CMconn); } } From 594d25db9fd541a5a060006c08593dbaeafb3ff6 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 7 Aug 2019 07:45:33 -0400 Subject: [PATCH 08/12] return status from sendToPeer and use that status --- source/adios2/toolkit/sst/cp/cp_common.c | 12 +++++++----- source/adios2/toolkit/sst/dp/evpath_dp.c | 8 ++++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 740cd5f704..4d79decf1d 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1178,8 +1178,8 @@ SstStream CP_newStream() static void DP_verbose(SstStream Stream, char *Format, ...); static CManager CP_getCManager(SstStream Stream); -static void CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank, - CMFormat Format, void *data); +static int CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank, + CMFormat Format, void *data); static MPI_Comm CP_getMPIComm(SstStream Stream); struct _CP_Services Svcs = { @@ -1340,8 +1340,8 @@ extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn, void *client_data); extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data); -static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, - CMFormat Format, void *Data) +static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, + CMFormat Format, void *Data) { CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort; if (Peers[Rank].CMconn == NULL) @@ -1352,7 +1352,7 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, CP_error(s, "Connection failed in CP_sendToPeer! Contact list was:\n"); CP_error(s, attr_list_to_string(Peers[Rank].ContactList)); - return; + return 0; } if (s->Role == ReaderRole) { @@ -1387,7 +1387,9 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, "Message failed to send to peer %d CONNECTION %p in " "CP_sendToPeer()\n", Rank, Peers[Rank].CMconn); + return 0; } + return 1; } CPNetworkInfoFunc globalNetinfoCallback = NULL; diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index 36cd75f31b..9c69dd7783 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -811,8 +811,12 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, ReadRequestMsg.RS_Stream = Stream; ReadRequestMsg.RequestingRank = Stream->Rank; ReadRequestMsg.NotifyCondition = ret->CMcondition; - Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank, - Stream->ReadRequestFormat, &ReadRequestMsg); + if (!Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank, + Stream->ReadRequestFormat, &ReadRequestMsg)) + { + ret->Failed = 1; + CMCondition_signal(cm, ret->CMcondition); + } return ret; } From d2d82e29347f4981796618be5be8aaf5ef163931 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 7 Aug 2019 07:46:05 -0400 Subject: [PATCH 09/12] return to prior interval for test spec --- testing/adios2/engine/staging-common/TestSupp.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/adios2/engine/staging-common/TestSupp.cmake b/testing/adios2/engine/staging-common/TestSupp.cmake index 27b3a7f798..7bac6a9105 100644 --- a/testing/adios2/engine/staging-common/TestSupp.cmake +++ b/testing/adios2/engine/staging-common/TestSupp.cmake @@ -118,8 +118,8 @@ set (KillReaders3Max_CMD "run_test.py --test_protocol kill_readers -nw 3 -nr 2 set (KillReaders3Max_TIMEOUT "300") set (KillReaders3Max_PROPERTIES "RUN_SERIAL;1") -set (KillWriter_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 5 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000") -set (KillWriterTimeout_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 5 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000 --rarg=--non_blocking") +set (KillWriter_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 2 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000") +set (KillWriterTimeout_2x2_CMD "run_test.py --test_protocol kill_writer -nw 2 -nr 2 --interval 2 --warg=RendezvousReaderCount=1,WENGINE_PARAMS --rarg=--expect_writer_failure --rarg=--num_steps --rarg=1000 --rarg=--non_blocking") set (PreciousTimestep_CMD "run_test.py --test_protocol kill_readers -nw 3 -nr 2 --max_readers 2 --warg=FirstTimestepPrecious=true,RendezvousReaderCount=0,WENGINE_PARAMS --rarg=--ignore_time_gap --rarg=--precious_first") From f8b854beb4b05183f5eb884a2a4430ff51e193ef Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 7 Aug 2019 07:58:56 -0400 Subject: [PATCH 10/12] Mod common client to mark failure when catching EndStep exception, not done earlier --- testing/adios2/engine/staging-common/TestCommonClient.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/testing/adios2/engine/staging-common/TestCommonClient.cpp b/testing/adios2/engine/staging-common/TestCommonClient.cpp index e712eed5a0..9bd8a932ba 100644 --- a/testing/adios2/engine/staging-common/TestCommonClient.cpp +++ b/testing/adios2/engine/staging-common/TestCommonClient.cpp @@ -285,6 +285,9 @@ TEST_F(SstReadTest, ADIOS2SstRead) catch (...) { std::cout << "Exception in EndStep, client failed"; + WriterFailed = 1; + std::cout << "Noticed Writer failure" << std::endl; + Continue = false; } ++ExpectedStep; From 2a6eccd4a67a48fcc5b2c316d243b2786d3bcf90 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 7 Aug 2019 08:29:31 -0400 Subject: [PATCH 11/12] Check result status from waitForCompletion in BP marshaling case --- source/adios2/engine/sst/SstReader.tcc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/adios2/engine/sst/SstReader.tcc b/source/adios2/engine/sst/SstReader.tcc index 3249d3ff47..f2f97f0902 100644 --- a/source/adios2/engine/sst/SstReader.tcc +++ b/source/adios2/engine/sst/SstReader.tcc @@ -124,7 +124,11 @@ void SstReader::ReadVariableBlocks(Variable &variable) // wait for all SstRead requests to finish for (const auto &i : sstReadHandlers) { - SstWaitForCompletion(m_Input, i); + if (SstWaitForCompletion(m_Input, i) != SstSuccess) + { + throw std::runtime_error( + "ERROR: Writer failed before returning data"); + } } size_t iter = 0; From 7fdf9baae5006fd2779227a076fa5977d13003d9 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Wed, 7 Aug 2019 10:58:53 -0400 Subject: [PATCH 12/12] Kill explicit SstVerbose on kill_writer test --- testing/adios2/engine/staging-common/run_test.in | 1 - 1 file changed, 1 deletion(-) diff --git a/testing/adios2/engine/staging-common/run_test.in b/testing/adios2/engine/staging-common/run_test.in index 90574bc6b4..f509b5d42f 100755 --- a/testing/adios2/engine/staging-common/run_test.in +++ b/testing/adios2/engine/staging-common/run_test.in @@ -128,7 +128,6 @@ def do_one_client_test(writer_cmd, reader_cmd): def do_kill_writer_test(writer_cmd, reader_cmd, interval): return_code = 0 - os.environ['SstCPVerbose'] = "1" # visible in this process + all children print("TestDriver: Starting kill_writer test protocol") sys.stdout.flush() writer = subprocess.Popen(writer_cmd)