From 3c1d8cf8ce5df84be2d046b687dc4cddd71cc326 Mon Sep 17 00:00:00 2001 From: Boris Zbarsky Date: Fri, 16 Dec 2022 14:53:21 -0500 Subject: [PATCH] Fix management of the mNumReportsInFlight count in reporting engine. (#24093) (#24114) * Fix management of the mNumReportsInFlight count in reporting engine. (#24093) If a ReadHandler failed out of SendReportData (e.g. because the session it's on had been marked as defunct), we would increment mNumReportsInFlight and never decrement it. After this happened CHIP_IM_MAX_REPORTS_IN_FLIGHT times (4 by default), we would stop being able to send out any more data reports. This situation is pretty easy to trigger as follows: 1. Use chip-tool to commission a device with node id 17. 2. Start chip-tool interactive mode. 3. Run the following commands in interactive mode: onoff subscribe on-off 0 60 17 1 --keepSubscriptions true onoff subscribe on-off 0 60 17 1 --keepSubscriptions true onoff subscribe on-off 0 60 17 1 --keepSubscriptions true onoff subscribe on-off 0 60 17 1 --keepSubscriptions true onoff subscribe on-off 0 2 17 1 --keepSubscriptions true 4. quit interactive mode (Ctrl-C or quit() command). 5. Wait 60 seconds for all the subscriptions to error out. After this the device will no longer respond with data reports to any read or subscribe requests. * Add a test for a subscription trying to send reports on a defunct session. 
This is testing the situation described in https://github.com/project-chip/connectedhomeip/pull/24093 * add mNumReportsInFlight check test when failing to send report Co-authored-by: yunhanw --- src/app/ReadHandler.cpp | 35 +++-- src/app/ReadHandler.h | 2 + src/app/reporting/Engine.cpp | 4 + src/app/tests/TestReadInteraction.cpp | 142 +++++++++++++++++++ src/transport/raw/tests/NetworkTestHelpers.h | 11 +- 5 files changed, 178 insertions(+), 16 deletions(-) diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index 7fd2436d645f87..68b9af9bb59d5b 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -197,6 +197,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks) { VerifyOrReturnLogError(IsReportable(), CHIP_ERROR_INCORRECT_STATE); + VerifyOrDie(!IsAwaitingReportResponse()); // Should not be reportable! if (IsPriming() || IsChunkedReport()) { mSessionHandle.Grab(mExchangeCtx->GetSessionHandle()); @@ -217,27 +218,31 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration(); } SetStateFlag(ReadHandlerFlags::ChunkedReport, aMoreChunks); - bool noResponseExpected = IsType(InteractionType::Read) && !aMoreChunks; - if (!noResponseExpected) - { - MoveToState(HandlerState::AwaitingReportResponse); - } + bool responseExpected = IsType(InteractionType::Subscribe) || aMoreChunks; mExchangeCtx->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime); - CHIP_ERROR err = - mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload), - Messaging::SendFlags(noResponseExpected ? 
Messaging::SendMessageFlags::kNone - : Messaging::SendMessageFlags::kExpectResponse)); - if (err == CHIP_NO_ERROR && noResponseExpected) - { - InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm(); - } - + CHIP_ERROR err = mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload), + responseExpected ? Messaging::SendMessageFlags::kExpectResponse + : Messaging::SendMessageFlags::kNone); if (err == CHIP_NO_ERROR) { + if (responseExpected) + { + MoveToState(HandlerState::AwaitingReportResponse); + } + else + { + // Make sure we're not treated as an in-flight report waiting for a + // response by the reporting engine. + InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm(); + } + if (IsType(InteractionType::Subscribe) && !IsPriming()) { - err = RefreshSubscribeSyncTimer(); + // Ignore the error from RefreshSubscribeSyncTimer. If we've + // successfully sent the message, we need to return success from + // this method. + RefreshSubscribeSyncTimer(); } } if (!aMoreChunks) diff --git a/src/app/ReadHandler.h b/src/app/ReadHandler.h index cf0ee7ab50b706..cc510e3907a1bc 100644 --- a/src/app/ReadHandler.h +++ b/src/app/ReadHandler.h @@ -252,6 +252,8 @@ class ReadHandler : public Messaging::ExchangeDelegate * @retval #Others If fails to send report data * @retval #CHIP_NO_ERROR On success. * + * If an error is returned, the ReadHandler guarantees that it is not in + * a state where it's waiting for a response. */ CHIP_ERROR SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks); diff --git a/src/app/reporting/Engine.cpp b/src/app/reporting/Engine.cpp index e4dcdfc99b012e..9234297b00aa48 100644 --- a/src/app/reporting/Engine.cpp +++ b/src/app/reporting/Engine.cpp @@ -848,6 +848,10 @@ CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferH // We can only have 1 report in flight for any given read - increment and break out. 
mNumReportsInFlight++; err = apReadHandler->SendReportData(std::move(aPayload), aHasMoreChunks); + if (err != CHIP_NO_ERROR) + { + --mNumReportsInFlight; + } return err; } diff --git a/src/app/tests/TestReadInteraction.cpp b/src/app/tests/TestReadInteraction.cpp index c56131845e2d0a..04aa4a64343081 100644 --- a/src/app/tests/TestReadInteraction.cpp +++ b/src/app/tests/TestReadInteraction.cpp @@ -319,6 +319,7 @@ class TestReadInteraction static void TestSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext); static void TestPostSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext); static void TestReadChunkingStatusReportTimeout(nlTestSuite * apSuite, void * apContext); + static void TestReadReportFailure(nlTestSuite * apSuite, void * apContext); static void TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext); static void TestPostSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext); static void TestPostSubscribeRoundtripChunkReportTimeout(nlTestSuite * apSuite, void * apContext); @@ -338,6 +339,7 @@ class TestReadInteraction static void TestReadHandlerInvalidSubscribeRequest(nlTestSuite * apSuite, void * apContext); static void TestSubscribeInvalidateFabric(nlTestSuite * apSuite, void * apContext); static void TestShutdownSubscription(nlTestSuite * apSuite, void * apContext); + static void TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerMalformedSubscribeRequest(nlTestSuite * apSuite, void * apContext); private: @@ -2536,6 +2538,57 @@ void TestReadInteraction::TestReadChunkingStatusReportTimeout(nlTestSuite * apSu ctx.CreateSessionBobToAlice(); } +// ReadClient sends the read request, but handler fails to send the one report (SendMessage returns an error). 
+// Since this is an un-chunked read, we are not in the AwaitingReportResponse state, so the "reports in flight" +// counter should not increase. +void TestReadInteraction::TestReadReportFailure(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast<TestContext *>(apContext); + CHIP_ERROR err = CHIP_NO_ERROR; + + Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr(); + // Shouldn't have anything in the retransmit table when starting the test. + NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0); + + MockInteractionModelApp delegate; + auto * engine = chip::app::InteractionModelEngine::GetInstance(); + err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable()); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); + NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse); + + chip::app::AttributePathParams attributePathParams[1]; + attributePathParams[0].mEndpointId = Test::kMockEndpoint2; + attributePathParams[0].mClusterId = Test::MockClusterId(3); + attributePathParams[0].mAttributeId = Test::MockAttributeId(1); + + ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); + readPrepareParams.mpEventPathParamsList = nullptr; + readPrepareParams.mEventPathParamsListSize = 0; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = 1; + + { + app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate, + chip::app::ReadClient::InteractionType::Read); + + ctx.GetLoopback().mNumMessagesToAllowBeforeError = 1; + ctx.GetLoopback().mMessageSendError = CHIP_ERROR_INCORRECT_STATE; + err = readClient.SendRequest(readPrepareParams); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); + + ctx.DrainAndServiceIO(); + NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 0); + +
ctx.GetLoopback().mNumMessagesToAllowBeforeError = 0; + ctx.GetLoopback().mMessageSendError = CHIP_NO_ERROR; + } + + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0); + engine->Shutdown(); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); +} + void TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast<TestContext *>(apContext); @@ -4101,6 +4154,93 @@ void TestReadInteraction::TestShutdownSubscription(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } +/** + * Tests what happens when a subscription tries to deliver reports but the + * session it has is defunct. Makes sure we correctly tear down the ReadHandler + * and don't increment the "reports in flight" count. + */ +void TestReadInteraction::TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast<TestContext *>(apContext); + CHIP_ERROR err = CHIP_NO_ERROR; + + Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr(); + // Shouldn't have anything in the retransmit table when starting the test.
+ NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0); + + MockInteractionModelApp delegate; + auto * engine = chip::app::InteractionModelEngine::GetInstance(); + err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable()); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); + + ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); + readPrepareParams.mpAttributePathParamsList = new chip::app::AttributePathParams[1]; + readPrepareParams.mAttributePathParamsListSize = 1; + + AttributePathParams subscribePath(Test::kMockEndpoint3, Test::MockClusterId(2), Test::MockAttributeId(4)); + readPrepareParams.mpAttributePathParamsList[0] = subscribePath; + + readPrepareParams.mMinIntervalFloorSeconds = 0; + readPrepareParams.mMaxIntervalCeilingSeconds = 0; + + { + app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate, + chip::app::ReadClient::InteractionType::Subscribe); + + delegate.mGotReport = false; + + err = readClient.SendSubscribeRequest(std::move(readPrepareParams)); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); + + ctx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, delegate.mGotReport); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0); + NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0); + + NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr); + auto * readHandler = engine->ActiveHandlerAt(0); + + // Verify that the session we will reset later is the one we will mess + // with now. + NL_TEST_ASSERT(apSuite, SessionHandle(*readHandler->GetSession()) == ctx.GetSessionAliceToBob()); + + // Test that we send reports as needed. 
+ readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false); + delegate.mGotReport = false; + engine->GetReportingEngine().SetDirty(subscribePath); + + ctx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, delegate.mGotReport); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0); + NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0); + + // Test that if the session is defunct we don't send reports and clean + // up properly. + readHandler->GetSession()->MarkAsDefunct(); + readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false); + delegate.mGotReport = false; + engine->GetReportingEngine().SetDirty(subscribePath); + + ctx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, !delegate.mGotReport); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 0); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0); + NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0); + } + engine->Shutdown(); + NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + // Get rid of our defunct session. 
+ ctx.ExpireSessionAliceToBob(); + ctx.CreateSessionAliceToBob(); +} + } // namespace app } // namespace chip @@ -4160,10 +4300,12 @@ const nlTest sTests[] = NL_TEST_DEF("TestSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripStatusReportTimeout), NL_TEST_DEF("TestPostSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout), NL_TEST_DEF("TestReadChunkingStatusReportTimeout", chip::app::TestReadInteraction::TestReadChunkingStatusReportTimeout), + NL_TEST_DEF("TestReadReportFailure", chip::app::TestReadInteraction::TestReadReportFailure), NL_TEST_DEF("TestSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout), NL_TEST_DEF("TestPostSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout), NL_TEST_DEF("TestPostSubscribeRoundtripChunkReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout), NL_TEST_DEF("TestReadShutdown", chip::app::TestReadInteraction::TestReadShutdown), + NL_TEST_DEF("TestSubscriptionReportWithDefunctSession", chip::app::TestReadInteraction::TestSubscriptionReportWithDefunctSession), NL_TEST_SENTINEL() }; // clang-format on diff --git a/src/transport/raw/tests/NetworkTestHelpers.h b/src/transport/raw/tests/NetworkTestHelpers.h index de459cbeed9090..46333b7fec594b 100644 --- a/src/transport/raw/tests/NetworkTestHelpers.h +++ b/src/transport/raw/tests/NetworkTestHelpers.h @@ -103,9 +103,16 @@ class LoopbackTransport : public Transport::Base CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override { - ReturnErrorOnFailure(mMessageSendError); + if (mNumMessagesToAllowBeforeError == 0) + { + ReturnErrorOnFailure(mMessageSendError); + } mSentMessageCount++; bool dropMessage = false; + if (mNumMessagesToAllowBeforeError > 0) + 
{ + --mNumMessagesToAllowBeforeError; + } if (mNumMessagesToAllowBeforeDropping > 0) { --mNumMessagesToAllowBeforeDropping; @@ -143,6 +150,7 @@ class LoopbackTransport : public Transport::Base mDroppedMessageCount = 0; mSentMessageCount = 0; mNumMessagesToAllowBeforeDropping = 0; + mNumMessagesToAllowBeforeError = 0; mMessageSendError = CHIP_NO_ERROR; } @@ -163,6 +171,7 @@ class LoopbackTransport : public Transport::Base uint32_t mDroppedMessageCount = 0; uint32_t mSentMessageCount = 0; uint32_t mNumMessagesToAllowBeforeDropping = 0; + uint32_t mNumMessagesToAllowBeforeError = 0; CHIP_ERROR mMessageSendError = CHIP_NO_ERROR; LoopbackTransportDelegate * mDelegate = nullptr; };