Skip to content

Commit

Permalink
urgent event needs to honor min interval for subscription. (#21938)
Browse files Browse the repository at this point in the history
* Improve urgent event

-- In latest spec, urgent event needs to honor min interval for
subscription.
-- Add new fflag, urgentEvent, and remove unused flag, SuppressResponse
-- ForceDirty would be used when recieving the initial request, where it
mark the corresponding handler with dirtiness.
-- When there is urgent event, we would set new flag, UrgentEvent, in
readHandler instead of setting ForceDirty, after minInterval elapsed,
the urgent event would be sent out. Modify the existing urgent event
test and check if event can be generated after minInterval elapsed.

* address comments

* address comment
  • Loading branch information
yunhanw-google authored Aug 23, 2022
1 parent c3fcbc1 commit f740b75
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
if (!aMoreChunks)
{
mPreviousReportsBeginGeneration = mCurrentReportsBeginGeneration;
ClearDirty();
ClearForceDirtyFlag();
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

Expand Down
16 changes: 5 additions & 11 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
enum class ReadHandlerFlags : uint8_t
{
// mHoldReport is used to prevent subscription data delivery while we are
// waiting for the min reporting interval to elapse. If we have to send a
// report immediately due to an urgent event being queued,
// UnblockUrgentEventDelivery can be used to force mHoldReport to false.
// waiting for the min reporting interval to elapse.
HoldReport = (1 << 0),

// mHoldSync is used to prevent subscription empty report delivery while we
Expand All @@ -219,14 +217,15 @@ class ReadHandler : public Messaging::ExchangeDelegate
PrimingReports = (1 << 3),
ActiveSubscription = (1 << 4),
FabricFiltered = (1 << 5),

// For subscriptions, we record the dirty set generation when we started to generate the last report.
// The mCurrentReportsBeginGeneration records the generation at the start of the current report. This only/
// has a meaningful value while IsReporting() is true.
//
// mPreviousReportsBeginGeneration will be set to mCurrentReportsBeginGeneration after we send the last
// chunk of the current report. Anything that was dirty with a generation earlier than
// mPreviousReportsBeginGeneration has had its value sent to the client.
// when receiving initial request, it needs mark current handler as dirty.
// when there is urgent event, it needs mark current handler as dirty.
ForceDirty = (1 << 6),

// Don't need the response for report data if true
Expand Down Expand Up @@ -300,8 +299,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
{
return (mDirtyGeneration > mPreviousReportsBeginGeneration) || mFlags.Has(ReadHandlerFlags::ForceDirty);
}
void ClearDirty() { mFlags.Clear(ReadHandlerFlags::ForceDirty); }

void ClearForceDirtyFlag() { mFlags.Clear(ReadHandlerFlags::ForceDirty); }
NodeId GetInitiatorNodeId() const
{
auto session = GetSession();
Expand All @@ -319,11 +317,7 @@ class ReadHandler : public Messaging::ExchangeDelegate

auto GetTransactionStartGeneration() const { return mTransactionStartGeneration; }

void UnblockUrgentEventDelivery()
{
mFlags.Clear(ReadHandlerFlags::HoldReport);
mFlags.Set(ReadHandlerFlags::ForceDirty);
}
void UnblockUrgentEventDelivery() { mFlags.Set(ReadHandlerFlags::ForceDirty); }

const AttributeValueEncoder::AttributeEncodeState & GetAttributeEncodeState() const { return mAttributeEncoderState; }
void SetAttributeEncodeState(const AttributeValueEncoder::AttributeEncodeState & aState) { mAttributeEncoderState = aState; }
Expand Down
42 changes: 3 additions & 39 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,7 @@ void Engine::Run()

bool allReadClean = true;

imEngine->mReadHandlers.ForEachActiveObject([this, &allReadClean](ReadHandler * handler) {
UpdateReadHandlerDirty(*handler);
imEngine->mReadHandlers.ForEachActiveObject([&allReadClean](ReadHandler * handler) {
if (handler->IsDirty())
{
allReadClean = false;
Expand Down Expand Up @@ -850,41 +849,6 @@ CHIP_ERROR Engine::SetDirty(AttributePathParams & aAttributePath)
return CHIP_NO_ERROR;
}

void Engine::UpdateReadHandlerDirty(ReadHandler & aReadHandler)
{
if (!aReadHandler.IsDirty())
{
return;
}

if (!aReadHandler.IsType(ReadHandler::InteractionType::Subscribe))
{
return;
}

bool intersected = false;
for (auto object = aReadHandler.GetAttributePathList(); object != nullptr; object = object->mpNext)
{
mGlobalDirtySet.ForEachActiveObject([&](auto * path) {
if (path->Intersects(object->mValue) && path->mGeneration > aReadHandler.mPreviousReportsBeginGeneration)
{
intersected = true;
return Loop::Break;
}
return Loop::Continue;
});
if (intersected)
{
break;
}
}
if (!intersected)
{
aReadHandler.ClearDirty();
ChipLogDetail(InteractionModel, "clear read handler dirty in UpdateReadHandlerDirty!");
}
}

CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload, bool aHasMoreChunks)
{
CHIP_ERROR err = CHIP_NO_ERROR;
Expand Down Expand Up @@ -974,8 +938,8 @@ CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, uint32_t aBy

if (isUrgentEvent)
{
ChipLogDetail(DataManagement, "urgent event schedule run");
return ScheduleRun();
ChipLogDetail(DataManagement, "urgent event would be sent after min interval");
return CHIP_NO_ERROR;
}

return ScheduleBufferPressureEventDelivery(aBytesWritten);
Expand Down
6 changes: 0 additions & 6 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ class Engine
bool IsClusterDataVersionMatch(const ObjectList<DataVersionFilter> * aDataVersionFilterList,
const ConcreteReadAttributePath & aPath);

/**
* Check all active subscription, if the subscription has no paths that intersect with global dirty set,
* it would clear dirty flag for that subscription
*
*/
void UpdateReadHandlerDirty(ReadHandler & aReadHandler);
/**
* Send Report via ReadHandler
*
Expand Down
51 changes: 34 additions & 17 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1515,9 +1515,8 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();
Expand All @@ -1532,8 +1531,6 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = kTestClusterId;
dirtyPath1.mEndpointId = kTestEndpointId;
Expand Down Expand Up @@ -1563,6 +1560,7 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
// Test report with 2 different path
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
delegate.mGotEventResponse = false;
delegate.mNumAttributeResponse = 0;

printf("HereHere\n");
Expand All @@ -1575,6 +1573,7 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse == true);
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 2);

// Test report with 2 different path, and 1 same path
Expand Down Expand Up @@ -1701,6 +1700,8 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite

ctx.DrainAndServiceIO();

System::Clock::Timestamp startTime = System::SystemClock().GetMonotonicTimestamp();

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 1);
NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
delegate.mpReadHandler = engine->ActiveHandlerAt(0);
Expand All @@ -1711,12 +1712,37 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty() == true);
delegate.mGotEventResponse = false;
delegate.mGotReport = false;
ctx.DrainAndServiceIO();
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse);

// wait for min interval 2 seconds(in test, we use 1.9second considering the time variation), expect no event is received,
// then wait for 0.5 seconds, then the urgent event would be sent out
// currently DriveIOUntil will call `DriveIO` at least once, which means that if there is any CPU scheduling issues,
// there's a chance 1.9s will already have elapsed by the time we get there, which will result in DriveIO being called when
// it shouldn't. Better fix could happen inside DriveIOUntil, not sure the sideeffect there.
while (true)
{
if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(1900))
{
break;
}
ctx.GetIOContext().DriveIO(); // at least one IO loop is guaranteed
}

NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse != true);

startTime = System::SystemClock().GetMonotonicTimestamp();
while (true)
{
if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(500))
{
break;
}
ctx.GetIOContext().DriveIO(); // at least one IO loop is guaranteed
}
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse == true);
}

// By now we should have closed all exchanges and sent all pending acks, so
Expand Down Expand Up @@ -2228,7 +2254,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout(nlTestSu
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2246,8 +2271,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout(nlTestSu
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = kTestClusterId;
dirtyPath1.mEndpointId = kTestEndpointId;
Expand Down Expand Up @@ -2548,7 +2571,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout(nlT
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2565,8 +2587,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout(nlT
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = Test::MockClusterId(2);
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
Expand Down Expand Up @@ -2645,7 +2665,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout(nlTestSui
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2662,8 +2681,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout(nlTestSui
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = Test::MockClusterId(2);
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
Expand Down

0 comments on commit f740b75

Please sign in to comment.