Skip to content

Commit

Permalink
Fix management of the mNumReportsInFlight count in reporting engine. (#…
Browse files Browse the repository at this point in the history
…24093) (#24114)

* Fix management of the mNumReportsInFlight count in reporting engine. (#24093)

If a ReadHandler failed out of SendReportData (e.g. because the session it's on
had been marked as defunct), we would increment mNumReportsInFlight and never
decrement it.  After this happened CHIP_IM_MAX_REPORTS_IN_FLIGHT times (4 by
default), we would stop being able to send out any more data reports.

This situation is pretty easy to trigger as follows:

1. Use chip-tool to commission a device with node id 17.
2. Start chip-tool interactive mode.
3. Run the following commands in interactive mode:

   onoff subscribe on-off 0 60 17 1 --keepSubscriptions true
   onoff subscribe on-off 0 60 17 1 --keepSubscriptions true
   onoff subscribe on-off 0 60 17 1 --keepSubscriptions true
   onoff subscribe on-off 0 60 17 1 --keepSubscriptions true
   onoff subscribe on-off 0 2 17 1 --keepSubscriptions true

4. quit interactive mode (Ctrl-C or quit() command).
5. Wait 60 seconds for all the subscriptions to error out.

After this the device will no longer respond with data reports to any
read or subscribe requests.

* Add a test for a subscription trying to send reports on a defunct session.

This is testing the situation described in
#24093

* add mNumReportsInFlight check test when failing to send report

Co-authored-by: yunhanw <[email protected]>
  • Loading branch information
bzbarsky-apple and yunhanw-google authored Dec 16, 2022
1 parent f6d4766 commit 3c1d8cf
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 16 deletions.
35 changes: 20 additions & 15 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt
CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks)
{
VerifyOrReturnLogError(IsReportable(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrDie(!IsAwaitingReportResponse()); // Should not be reportable!
if (IsPriming() || IsChunkedReport())
{
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
Expand All @@ -217,27 +218,31 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
}
SetStateFlag(ReadHandlerFlags::ChunkedReport, aMoreChunks);
bool noResponseExpected = IsType(InteractionType::Read) && !aMoreChunks;
if (!noResponseExpected)
{
MoveToState(HandlerState::AwaitingReportResponse);
}
bool responseExpected = IsType(InteractionType::Subscribe) || aMoreChunks;

mExchangeCtx->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
CHIP_ERROR err =
mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload),
Messaging::SendFlags(noResponseExpected ? Messaging::SendMessageFlags::kNone
: Messaging::SendMessageFlags::kExpectResponse));
if (err == CHIP_NO_ERROR && noResponseExpected)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

CHIP_ERROR err = mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload),
responseExpected ? Messaging::SendMessageFlags::kExpectResponse
: Messaging::SendMessageFlags::kNone);
if (err == CHIP_NO_ERROR)
{
if (responseExpected)
{
MoveToState(HandlerState::AwaitingReportResponse);
}
else
{
// Make sure we're not treated as an in-flight report waiting for a
// response by the reporting engine.
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

if (IsType(InteractionType::Subscribe) && !IsPriming())
{
err = RefreshSubscribeSyncTimer();
// Ignore the error from RefreshSubscribeSyncTimer. If we've
// successfully sent the message, we need to return success from
// this method.
RefreshSubscribeSyncTimer();
}
}
if (!aMoreChunks)
Expand Down
2 changes: 2 additions & 0 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ class ReadHandler : public Messaging::ExchangeDelegate
* @retval #Others If fails to send report data
* @retval #CHIP_NO_ERROR On success.
*
* If an error is returned, the ReadHandler guarantees that it is not in
* a state where it's waiting for a response.
*/
CHIP_ERROR SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks);

Expand Down
4 changes: 4 additions & 0 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferH
// We can only have 1 report in flight for any given read - increment and break out.
mNumReportsInFlight++;
err = apReadHandler->SendReportData(std::move(aPayload), aHasMoreChunks);
if (err != CHIP_NO_ERROR)
{
--mNumReportsInFlight;
}
return err;
}

Expand Down
142 changes: 142 additions & 0 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class TestReadInteraction
static void TestSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadChunkingStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadReportFailure(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripChunkReportTimeout(nlTestSuite * apSuite, void * apContext);
Expand All @@ -338,6 +339,7 @@ class TestReadInteraction
static void TestReadHandlerInvalidSubscribeRequest(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeInvalidateFabric(nlTestSuite * apSuite, void * apContext);
static void TestShutdownSubscription(nlTestSuite * apSuite, void * apContext);
static void TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext);
static void TestReadHandlerMalformedSubscribeRequest(nlTestSuite * apSuite, void * apContext);

private:
Expand Down Expand Up @@ -2536,6 +2538,57 @@ void TestReadInteraction::TestReadChunkingStatusReportTimeout(nlTestSuite * apSu
ctx.CreateSessionBobToAlice();
}

// ReadClient sends the read request, but handler fails to send the one report (SendMessage returns an error).
// Since this is an un-chunked read, we are not in the AwaitingReportResponse state, so the "reports in flight"
// counter should not increase.
void TestReadInteraction::TestReadReportFailure(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);

chip::app::AttributePathParams attributePathParams[1];
attributePathParams[0].mEndpointId = Test::kMockEndpoint2;
attributePathParams[0].mClusterId = Test::MockClusterId(3);
attributePathParams[0].mAttributeId = Test::MockAttributeId(1);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpEventPathParamsList = nullptr;
readPrepareParams.mEventPathParamsListSize = 0;
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = 1;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Read);

ctx.GetLoopback().mNumMessagesToAllowBeforeError = 1;
ctx.GetLoopback().mMessageSendError = CHIP_ERROR_INCORRECT_STATE;
err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 0);

ctx.GetLoopback().mNumMessagesToAllowBeforeError = 0;
ctx.GetLoopback().mMessageSendError = CHIP_NO_ERROR;
}

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

void TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
Expand Down Expand Up @@ -4101,6 +4154,93 @@ void TestReadInteraction::TestShutdownSubscription(nlTestSuite * apSuite, void *
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

/**
* Tests what happens when a subscription tries to deliver reports but the
* session it has is defunct. Makes sure we correctly tear down the ReadHandler
* and don't increment the "reports in flight" count.
*/
void TestReadInteraction::TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpAttributePathParamsList = new chip::app::AttributePathParams[1];
readPrepareParams.mAttributePathParamsListSize = 1;

AttributePathParams subscribePath(Test::kMockEndpoint3, Test::MockClusterId(2), Test::MockAttributeId(4));
readPrepareParams.mpAttributePathParamsList[0] = subscribePath;

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);

delegate.mGotReport = false;

err = readClient.SendSubscribeRequest(std::move(readPrepareParams));
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);

NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
auto * readHandler = engine->ActiveHandlerAt(0);

// Verify that the session we will reset later is the one we will mess
// with now.
NL_TEST_ASSERT(apSuite, SessionHandle(*readHandler->GetSession()) == ctx.GetSessionAliceToBob());

// Test that we send reports as needed.
readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
engine->GetReportingEngine().SetDirty(subscribePath);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);

// Test that if the session is defunct we don't send reports and clean
// up properly.
readHandler->GetSession()->MarkAsDefunct();
readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
engine->GetReportingEngine().SetDirty(subscribePath);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, !delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 0);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
}
engine->Shutdown();
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);

// Get rid of our defunct session.
ctx.ExpireSessionAliceToBob();
ctx.CreateSessionAliceToBob();
}

} // namespace app
} // namespace chip

Expand Down Expand Up @@ -4160,10 +4300,12 @@ const nlTest sTests[] =
NL_TEST_DEF("TestSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout),
NL_TEST_DEF("TestReadChunkingStatusReportTimeout", chip::app::TestReadInteraction::TestReadChunkingStatusReportTimeout),
NL_TEST_DEF("TestReadReportFailure", chip::app::TestReadInteraction::TestReadReportFailure),
NL_TEST_DEF("TestSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripChunkReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout),
NL_TEST_DEF("TestReadShutdown", chip::app::TestReadInteraction::TestReadShutdown),
NL_TEST_DEF("TestSubscriptionReportWithDefunctSession", chip::app::TestReadInteraction::TestSubscriptionReportWithDefunctSession),
NL_TEST_SENTINEL()
};
// clang-format on
Expand Down
11 changes: 10 additions & 1 deletion src/transport/raw/tests/NetworkTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ class LoopbackTransport : public Transport::Base

CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override
{
ReturnErrorOnFailure(mMessageSendError);
if (mNumMessagesToAllowBeforeError == 0)
{
ReturnErrorOnFailure(mMessageSendError);
}
mSentMessageCount++;
bool dropMessage = false;
if (mNumMessagesToAllowBeforeError > 0)
{
--mNumMessagesToAllowBeforeError;
}
if (mNumMessagesToAllowBeforeDropping > 0)
{
--mNumMessagesToAllowBeforeDropping;
Expand Down Expand Up @@ -143,6 +150,7 @@ class LoopbackTransport : public Transport::Base
mDroppedMessageCount = 0;
mSentMessageCount = 0;
mNumMessagesToAllowBeforeDropping = 0;
mNumMessagesToAllowBeforeError = 0;
mMessageSendError = CHIP_NO_ERROR;
}

Expand All @@ -163,6 +171,7 @@ class LoopbackTransport : public Transport::Base
uint32_t mDroppedMessageCount = 0;
uint32_t mSentMessageCount = 0;
uint32_t mNumMessagesToAllowBeforeDropping = 0;
uint32_t mNumMessagesToAllowBeforeError = 0;
CHIP_ERROR mMessageSendError = CHIP_NO_ERROR;
LoopbackTransportDelegate * mDelegate = nullptr;
};
Expand Down

0 comments on commit 3c1d8cf

Please sign in to comment.