Skip to content

Commit

Permalink
chunked report don't need honor minInterval (#22139)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google authored Aug 29, 2022
1 parent 0abfab0 commit c60a0aa
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 35 deletions.
20 changes: 11 additions & 9 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,25 +755,27 @@ void ReadHandler::OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLa
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
readHandler->mFlags.Set(ReadHandlerFlags::HoldSync, false);
ChipLogDetail(DataManagement, "Refresh subscribe timer sync after %d seconds",
readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds);
ChipLogProgress(DataManagement, "Refresh subscribe timer sync after %d seconds",
readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds);
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}

CHIP_ERROR ReadHandler::RefreshSubscribeSyncTimer()
{
ChipLogDetail(DataManagement, "Refresh Subscribe Sync Timer with max %d seconds", mMaxInterval);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnUnblockHoldReportCallback, this);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnRefreshSubscribeTimerSyncCallback, this);

mFlags.Set(ReadHandlerFlags::HoldReport);
mFlags.Set(ReadHandlerFlags::HoldSync);

ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), OnUnblockHoldReportCallback, this));
if (!IsChunkedReport())
{
ChipLogProgress(DataManagement, "Refresh Subscribe Sync Timer with max %d seconds", mMaxInterval);
mFlags.Set(ReadHandlerFlags::HoldReport);
mFlags.Set(ReadHandlerFlags::HoldSync);
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), OnUnblockHoldReportCallback, this));
}

return CHIP_NO_ERROR;
}
Expand Down
163 changes: 137 additions & 26 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class TestReadInteraction
static void TestReadHandlerInvalidAttributePath(nlTestSuite * apSuite, void * apContext);
static void TestProcessSubscribeRequest(nlTestSuite * apSuite, void * apContext);
static void TestReadRoundtrip(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripChunkReport(nlTestSuite * apSuite, void * apContext);
static void TestReadRoundtripWithDataVersionFilter(nlTestSuite * apSuite, void * apContext);
static void TestReadRoundtripWithNoMatchPathDataVersionFilter(nlTestSuite * apSuite, void * apContext);
static void TestReadRoundtripWithMultiSamePathDifferentDataVersionFilter(nlTestSuite * apSuite, void * apContext);
Expand Down Expand Up @@ -2592,26 +2593,33 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout(nlT
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
dirtyPath1.mAttributeId = Test::MockAttributeId(4);

delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
delegate.mNumAttributeResponse = 0;

err = engine->GetReportingEngine().SetDirty(dirtyPath1);
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldSync, false);
delegate.mGotReport = false;
delegate.mNumAttributeResponse = 0;
ctx.ExpireSessionBobToAlice();

ctx.GetLoopback().mSentMessageCount = 0;
ctx.GetLoopback().mNumMessagesToDrop = 1;
ctx.GetLoopback().mNumMessagesToAllowBeforeDropping = 1;
ctx.GetLoopback().mDroppedMessageCount = 0;

ctx.DrainAndServiceIO();
// Drop status report for the first chunked report, then expire session, handler would be timeout
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 1);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mSentMessageCount == 2);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mDroppedMessageCount == 1);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 1);

ctx.ExpireSessionAliceToBob();
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 0);
ctx.ExpireSessionBobToAlice();
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 0);
ctx.GetLoopback().mSentMessageCount = 0;
ctx.GetLoopback().mNumMessagesToDrop = 0;
ctx.GetLoopback().mNumMessagesToAllowBeforeDropping = 0;
ctx.GetLoopback().mDroppedMessageCount = 0;
}

// By now we should have closed all exchanges and sent all pending acks, so
// there should be no queued-up things in the retransmit table.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
Expand Down Expand Up @@ -2686,36 +2694,138 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout(nlTestSui
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
dirtyPath1.mAttributeId = Test::MockAttributeId(4);

delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
delegate.mNumAttributeResponse = 0;

err = engine->GetReportingEngine().SetDirty(dirtyPath1);
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldSync, false);
delegate.mGotReport = false;
delegate.mNumAttributeResponse = 0;

// Drop second chunked report then expire session, client would be timeout
ctx.GetLoopback().mSentMessageCount = 0;
ctx.GetLoopback().mNumMessagesToDrop = 1;
ctx.GetLoopback().mNumMessagesToAllowBeforeDropping = 2;
ctx.GetLoopback().mDroppedMessageCount = 0;

ctx.DrainAndServiceIO();
ctx.ExpireSessionBobToAlice();
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 1);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mSentMessageCount == 3);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mDroppedMessageCount == 1);

NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
ctx.ExpireSessionAliceToBob();
ctx.ExpireSessionBobToAlice();
NL_TEST_ASSERT(apSuite, delegate.mError == CHIP_ERROR_TIMEOUT);
}

// By now we should have closed all exchanges and sent all pending acks, so
// there should be no queued-up things in the retransmit table.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);
ctx.GetLoopback().mSentMessageCount = 0;
ctx.GetLoopback().mNumMessagesToDrop = 0;
ctx.GetLoopback().mNumMessagesToAllowBeforeDropping = 0;
ctx.GetLoopback().mDroppedMessageCount = 0;
}

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
ctx.CreateSessionAliceToBob();
ctx.CreateSessionBobToAlice();
}

// Engine shutdown seems to trigger some messages to try to be sent. Make
// sure those get flushed out.
ctx.DrainAndServiceIO();
void TestReadInteraction::TestPostSubscribeRoundtripChunkReport(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

ctx.CreateSessionBobToAlice();
Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
chip::app::EventPathParams eventPathParams[2];
readPrepareParams.mpEventPathParamsList = eventPathParams;
readPrepareParams.mpEventPathParamsList[0].mEndpointId = kTestEndpointId;
readPrepareParams.mpEventPathParamsList[0].mClusterId = kTestClusterId;
readPrepareParams.mpEventPathParamsList[0].mEventId = kTestEventIdDebug;

readPrepareParams.mpEventPathParamsList[1].mEndpointId = kTestEndpointId;
readPrepareParams.mpEventPathParamsList[1].mClusterId = kTestClusterId;
readPrepareParams.mpEventPathParamsList[1].mEventId = kTestEventIdCritical;

readPrepareParams.mEventPathParamsListSize = 2;

chip::app::AttributePathParams attributePathParams[1];
// Mock Attribute 4 is a big attribute, with 6 large OCTET_STRING
attributePathParams[0].mEndpointId = Test::kMockEndpoint3;
attributePathParams[0].mClusterId = Test::MockClusterId(2);
attributePathParams[0].mAttributeId = Test::MockAttributeId(4);

readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = 1;

readPrepareParams.mMinIntervalFloorSeconds = 2;
readPrepareParams.mMaxIntervalCeilingSeconds = 5;

delegate.mNumAttributeResponse = 0;
readPrepareParams.mKeepSubscriptions = false;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 1);
NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
delegate.mpReadHandler = engine->ActiveHandlerAt(0);

NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse);
NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = Test::MockClusterId(2);
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
dirtyPath1.mAttributeId = Test::MockAttributeId(4);

err = engine->GetReportingEngine().SetDirty(dirtyPath1);
delegate.mGotReport = false;
delegate.mNumAttributeResponse = 0;

// wait for min interval 2 seconds(in test, we use 1.9second considering the time variation), expect no event is received,
// then wait for 0.5 seconds, then all chunked dirty reports are sent out, which would not honor minInterval
System::Clock::Timestamp startTime = System::SystemClock().GetMonotonicTimestamp();
while (true)
{
if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(1900))
{
break;
}
ctx.GetIOContext().DriveIO();
}
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 0);
startTime = System::SystemClock().GetMonotonicTimestamp();
while (true)
{
if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(500))
{
break;
}
ctx.GetIOContext().DriveIO();
}
ctx.DrainAndServiceIO();
}
// Two chunked reports carry 7 attributeDataIB
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 7);

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
}

namespace {
Expand Down Expand Up @@ -3832,6 +3942,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestReadHandlerInvalidAttributePath", chip::app::TestReadInteraction::TestReadHandlerInvalidAttributePath),
NL_TEST_DEF("TestProcessSubscribeRequest", chip::app::TestReadInteraction::TestProcessSubscribeRequest),
NL_TEST_DEF("TestSubscribeRoundtrip", chip::app::TestReadInteraction::TestSubscribeRoundtrip),
NL_TEST_DEF("TestPostSubscribeRoundtripChunkReport", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkReport),
NL_TEST_DEF("TestReadClientReceiveInvalidMessage", chip::app::TestReadInteraction::TestReadClientReceiveInvalidMessage),
NL_TEST_DEF("TestSubscribeClientReceiveInvalidStatusResponse", chip::app::TestReadInteraction::TestSubscribeClientReceiveInvalidStatusResponse),
NL_TEST_DEF("TestSubscribeClientReceiveWellFormedStatusResponse", chip::app::TestReadInteraction::TestSubscribeClientReceiveWellFormedStatusResponse),
Expand Down

0 comments on commit c60a0aa

Please sign in to comment.