Skip to content

Commit

Permalink
Fix delivery of urgent events to actually work correctly. (project-ch…
Browse files Browse the repository at this point in the history
…ip#23240) (project-chip#23271)

* Fix delivery of urgent events to actually work correctly.

If an urgent event was emitted at a point when the ReadHandler subscribing for
it had already gotten its "min interval has elapsed" callback, we would just
mark the read handler dirty but not schedule a run of the reporting engine.
This would cause us to not report the event until something _did_ trigger such a
run (either the max interval being reached, or some other reading/reporting
activity).

The fix is to make sure ReadHandler always schedules a run when IsReportable()
becomes true.

* Address review comments.

* Make TestSubscribeUrgentWildcardEvent slightly less random-failure-prone.
  • Loading branch information
bzbarsky-apple authored Oct 22, 2022
1 parent 4a3f2d5 commit 2cc09d8
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 59 deletions.
62 changes: 42 additions & 20 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
mLastWrittenEventsBytes = 0;
mTransactionStartGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
mFlags.ClearAll();
mFlags.Set(ReadHandlerFlags::PrimingReports, true);
SetStateFlag(ReadHandlerFlags::PrimingReports);

mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
}
Expand Down Expand Up @@ -115,7 +115,7 @@ void ReadHandler::OnInitialRequest(System::PacketBufferHandle && aPayload)
else
{
// Force us to be in a dirty state so we get processed by the reporting
mFlags.Set(ReadHandlerFlags::ForceDirty);
SetStateFlag(ReadHandlerFlags::ForceDirty);
}
}

Expand All @@ -142,7 +142,7 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
{
err = SendSubscribeResponse();

mFlags.Set(ReadHandlerFlags::ActiveSubscription);
SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
Expand Down Expand Up @@ -216,7 +216,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
{
mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
}
mFlags.Set(ReadHandlerFlags::ChunkedReport, aMoreChunks);
SetStateFlag(ReadHandlerFlags::ChunkedReport, aMoreChunks);
bool noResponseExpected = IsType(InteractionType::Read) && !aMoreChunks;
if (!noResponseExpected)
{
Expand Down Expand Up @@ -351,7 +351,7 @@ CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayloa

bool isFabricFiltered;
ReturnErrorOnFailure(readRequestParser.GetIsFabricFiltered(&isFabricFiltered));
mFlags.Set(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
SetStateFlag(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
ReturnErrorOnFailure(readRequestParser.ExitContainer());
MoveToState(HandlerState::GeneratingReports);

Expand Down Expand Up @@ -493,17 +493,17 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

mState = aTargetState;
ChipLogDetail(DataManagement, "IM RH moving to [%s]", GetStateStr());

//
// If we just unblocked sending reports, let's go ahead and schedule the reporting
// engine to run to kick that off.
//
if (aTargetState == HandlerState::GeneratingReports)
if (aTargetState == HandlerState::GeneratingReports && IsReportable())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}

mState = aTargetState;
ChipLogDetail(DataManagement, "IM RH moving to [%s]", GetStateStr());
}

bool ReadHandler::CheckEventClean(EventManagement & aEventManager)
Expand Down Expand Up @@ -546,7 +546,7 @@ CHIP_ERROR ReadHandler::SendSubscribeResponse()

ReturnErrorOnFailure(RefreshSubscribeSyncTimer());

mFlags.Set(ReadHandlerFlags::PrimingReports, false);
ClearStateFlag(ReadHandlerFlags::PrimingReports);
return mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeResponse, std::move(packet));
}

Expand Down Expand Up @@ -627,7 +627,7 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP

bool isFabricFiltered;
ReturnErrorOnFailure(subscribeRequestParser.GetIsFabricFiltered(&isFabricFiltered));
mFlags.Set(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
SetStateFlag(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
ReturnErrorOnFailure(Crypto::DRBG_get_bytes(reinterpret_cast<uint8_t *>(&mSubscriptionId), sizeof(mSubscriptionId)));
ReturnErrorOnFailure(subscribeRequestParser.ExitContainer());
MoveToState(HandlerState::GeneratingReports);
Expand All @@ -642,11 +642,7 @@ void ReadHandler::OnUnblockHoldReportCallback(System::Layer * apSystemLayer, voi
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
ChipLogDetail(DataManagement, "Unblock report hold after min %d seconds", readHandler->mMinIntervalFloorSeconds);
readHandler->mFlags.Set(ReadHandlerFlags::HoldReport, false);
if (readHandler->IsDirty())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
readHandler->ClearStateFlag(ReadHandlerFlags::HoldReport);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds),
OnRefreshSubscribeTimerSyncCallback, readHandler);
Expand All @@ -656,10 +652,9 @@ void ReadHandler::OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLa
{
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
readHandler->mFlags.Set(ReadHandlerFlags::HoldSync, false);
readHandler->ClearStateFlag(ReadHandlerFlags::HoldSync);
ChipLogProgress(DataManagement, "Refresh subscribe timer sync after %d seconds",
readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds);
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}

CHIP_ERROR ReadHandler::RefreshSubscribeSyncTimer()
Expand All @@ -673,8 +668,8 @@ CHIP_ERROR ReadHandler::RefreshSubscribeSyncTimer()
{
ChipLogProgress(DataManagement, "Refresh Subscribe Sync Timer with min %d seconds and max %d seconds",
mMinIntervalFloorSeconds, mMaxInterval);
mFlags.Set(ReadHandlerFlags::HoldReport);
mFlags.Set(ReadHandlerFlags::HoldSync);
SetStateFlag(ReadHandlerFlags::HoldReport);
SetStateFlag(ReadHandlerFlags::HoldSync);
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), OnUnblockHoldReportCallback, this));
Expand Down Expand Up @@ -717,6 +712,11 @@ void ReadHandler::SetDirty(const AttributePathParams & aAttributeChanged)
mAttributePathExpandIterator.ResetCurrentCluster();
mAttributeEncoderState = AttributeValueEncoder::AttributeEncodeState();
}

if (IsReportable())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
}

Transport::SecureSession * ReadHandler::GetSession() const
Expand All @@ -727,5 +727,27 @@ Transport::SecureSession * ReadHandler::GetSession() const
}
return mSessionHandle->AsSecureSession();
}

void ReadHandler::UnblockUrgentEventDelivery()
{
SetStateFlag(ReadHandlerFlags::ForceDirty);
}

void ReadHandler::SetStateFlag(ReadHandlerFlags aFlag, bool aValue)
{
bool oldReportable = IsReportable();
mFlags.Set(aFlag, aValue);
// If we became reportable, schedule a reporting run.
if (!oldReportable && IsReportable())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
}

void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
{
SetStateFlag(aFlag, false);
}

} // namespace app
} // namespace chip
11 changes: 9 additions & 2 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ class ReadHandler : public Messaging::ExchangeDelegate
bool IsIdle() const { return mState == HandlerState::Idle; }
bool IsReportable() const
{
// Important: Anything that changes the state IsReportable depends on in
// a way that causes IsReportable to become true must call ScheduleRun
// on the reporting engine.
return mState == HandlerState::GeneratingReports && !mFlags.Has(ReadHandlerFlags::HoldReport) &&
(IsDirty() || !mFlags.Has(ReadHandlerFlags::HoldSync));
}
Expand Down Expand Up @@ -299,7 +302,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
{
return (mDirtyGeneration > mPreviousReportsBeginGeneration) || mFlags.Has(ReadHandlerFlags::ForceDirty);
}
void ClearForceDirtyFlag() { mFlags.Clear(ReadHandlerFlags::ForceDirty); }
void ClearForceDirtyFlag() { ClearStateFlag(ReadHandlerFlags::ForceDirty); }
NodeId GetInitiatorNodeId() const
{
auto session = GetSession();
Expand All @@ -317,7 +320,7 @@ class ReadHandler : public Messaging::ExchangeDelegate

auto GetTransactionStartGeneration() const { return mTransactionStartGeneration; }

void UnblockUrgentEventDelivery() { mFlags.Set(ReadHandlerFlags::ForceDirty); }
void UnblockUrgentEventDelivery();

const AttributeValueEncoder::AttributeEncodeState & GetAttributeEncodeState() const { return mAttributeEncoderState; }
void SetAttributeEncodeState(const AttributeValueEncoder::AttributeEncodeState & aState) { mAttributeEncoderState = aState; }
Expand Down Expand Up @@ -374,6 +377,10 @@ class ReadHandler : public Messaging::ExchangeDelegate

const char * GetStateStr() const;

// Helpers for managing our state flags properly.
void SetStateFlag(ReadHandlerFlags aFlag, bool aValue = true);
void ClearStateFlag(ReadHandlerFlags aFlag);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down
10 changes: 1 addition & 9 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,14 +838,6 @@ CHIP_ERROR Engine::SetDirty(AttributePathParams & aAttributePath)
}
ReturnErrorOnFailure(InsertPathIntoDirtySet(aAttributePath));

// Schedule work to run asynchronously on the CHIP thread. The scheduled
// work won't execute until the current execution context has
// completed. This ensures that we can 'gather up' multiple attribute
// changes that have occurred in the same execution context without
// requiring any explicit 'start' or 'end' change calls into the engine to
// book-end the change.
ScheduleRun();

return CHIP_NO_ERROR;
}

Expand Down Expand Up @@ -938,7 +930,7 @@ CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, uint32_t aBy

if (isUrgentEvent)
{
ChipLogDetail(DataManagement, "urgent event would be sent after min interval");
ChipLogDetail(DataManagement, "Urgent event will be sent once reporting is not blocked by the min interval");
return CHIP_NO_ERROR;
}

Expand Down
4 changes: 4 additions & 0 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

namespace chip {
namespace app {

class TestReadInteraction;

namespace reporting {
/*
* @class Engine
Expand Down Expand Up @@ -138,6 +141,7 @@ class Engine
void Run();

friend class TestReportingEngine;
friend class ::chip::app::TestReadInteraction;

struct AttributePathParamsWithGeneration : public AttributePathParams
{
Expand Down
Loading

0 comments on commit 2cc09d8

Please sign in to comment.