Skip to content

Commit

Permalink
Inject InteractionModelEngine into ReadHandler and reporting/Engine (#…
Browse files Browse the repository at this point in the history
…31494)

* Inject InteractionModelEngine into ReadHandler and reporting/Engine

Pass in a poinger of InteractionModelEngine in constructor or init
function instead of using InteractionMOdelEngine as a singleton.

* Modify tests for the change

* Fix Engine cpp mpImEngine pointer nullptr

* Changes to avoid public interface modifications

* Address comments and fix tests

* Restyled by clang-format

* Move setting ImEngine is Reporting::Engine from Init to constructor

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
yyzhong-g and restyled-commits authored Feb 1, 2024
1 parent 8507f90 commit 2576913
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ using Protocols::InteractionModel::Status;

Global<InteractionModelEngine> sInteractionModelEngine;

InteractionModelEngine::InteractionModelEngine() {}
InteractionModelEngine::InteractionModelEngine() : mReportingEngine(this) {}

InteractionModelEngine * InteractionModelEngine::GetInstance()
{
Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,

ReadHandler::ApplicationCallback * GetAppCallback() override { return mpReadHandlerApplicationCallback; }

InteractionModelEngine * GetInteractionModelEngine() override { return this; }

CHIP_ERROR OnUnsolicitedMessageReceived(const PayloadHeader & payloadHeader, ExchangeDelegate *& newDelegate) override;

/**
Expand Down
50 changes: 27 additions & 23 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

mInteractionType = aInteractionType;
mLastWrittenEventsBytes = 0;
mTransactionStartGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
mTransactionStartGeneration = mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().GetDirtySetGeneration();
mFlags.ClearAll();
SetStateFlag(ReadHandlerFlags::PrimingReports);

Expand Down Expand Up @@ -102,7 +102,7 @@ void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
CHIP_ERROR err = mManagementCallback.GetInteractionModelEngine()->PushFrontAttributePathList(mpAttributePathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
Expand All @@ -112,7 +112,7 @@ void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
CHIP_ERROR err = mManagementCallback.GetInteractionModelEngine()->PushFrontEventPathParamsList(mpEventPathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
Expand All @@ -137,7 +137,7 @@ void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}
Expand All @@ -156,19 +156,19 @@ ReadHandler::~ReadHandler()

if (IsAwaitingReportResponse())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().OnReportConfirm();
}
InteractionModelEngine::GetInstance()->ReleaseAttributePathList(mpAttributePathList);
InteractionModelEngine::GetInstance()->ReleaseEventPathList(mpEventPathList);
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
mManagementCallback.GetInteractionModelEngine()->ReleaseAttributePathList(mpAttributePathList);
mManagementCallback.GetInteractionModelEngine()->ReleaseEventPathList(mpEventPathList);
mManagementCallback.GetInteractionModelEngine()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

void ReadHandler::Close(CloseOptions options)
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
if (IsType(InteractionType::Subscribe) && options == CloseOptions::kDropPersistedSubscription)
{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
auto * subscriptionResumptionStorage = mManagementCallback.GetInteractionModelEngine()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(GetInitiatorNodeId(), GetAccessingFabricIndex(), mSubscriptionId);
Expand Down Expand Up @@ -285,7 +285,8 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt
#if CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
auto exchange = mExchangeMgr->NewContext(mSessionHandle.Get().Value(), this);
#else // CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this);
auto exchange =
mManagementCallback.GetInteractionModelEngine()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this);
#endif // CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE);
mExchangeCtx.Grab(exchange);
Expand All @@ -310,7 +311,8 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
#if CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
auto exchange = mExchangeMgr->NewContext(mSessionHandle.Get().Value(), this);
#else // CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this);
auto exchange =
mManagementCallback.GetInteractionModelEngine()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this);
#endif // CHIP_CONFIG_UNSAFE_SUBSCRIPTION_EXCHANGE_MANAGER_USE
VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE);
mExchangeCtx.Grab(exchange);
Expand All @@ -320,7 +322,8 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b

if (!IsReporting())
{
mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
mCurrentReportsBeginGeneration =
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().GetDirtySetGeneration();
}
SetStateFlag(ReadHandlerFlags::ChunkedReport, aMoreChunks);
bool responseExpected = IsType(InteractionType::Subscribe) || aMoreChunks;
Expand All @@ -339,7 +342,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
{
// Make sure we're not treated as an in-flight report waiting for a
// response by the reporting engine.
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().OnReportConfirm();
}

// If we just finished a non-priming subscription report, notify our observers.
Expand All @@ -353,7 +356,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
{
mPreviousReportsBeginGeneration = mCurrentReportsBeginGeneration;
ClearForceDirtyFlag();
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
mManagementCallback.GetInteractionModelEngine()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

return err;
Expand Down Expand Up @@ -489,12 +492,13 @@ CHIP_ERROR ReadHandler::ProcessAttributePaths(AttributePathIBs::Parser & aAttrib
AttributePathIB::Parser path;
ReturnErrorOnFailure(path.Init(reader));
ReturnErrorOnFailure(path.ParsePath(attribute));
ReturnErrorOnFailure(InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attribute));
ReturnErrorOnFailure(
mManagementCallback.GetInteractionModelEngine()->PushFrontAttributePathList(mpAttributePathList, attribute));
}
// if we have exhausted this container
if (CHIP_END_OF_TLV == err)
{
InteractionModelEngine::GetInstance()->RemoveDuplicateConcreteAttributePath(mpAttributePathList);
mManagementCallback.GetInteractionModelEngine()->RemoveDuplicateConcreteAttributePath(mpAttributePathList);
mAttributePathExpandIterator = AttributePathExpandIterator(mpAttributePathList);
err = CHIP_NO_ERROR;
}
Expand All @@ -521,8 +525,8 @@ CHIP_ERROR ReadHandler::ProcessDataVersionFilterList(DataVersionFilterIBs::Parse
ReturnErrorOnFailure(path.GetEndpoint(&(versionFilter.mEndpointId)));
ReturnErrorOnFailure(path.GetCluster(&(versionFilter.mClusterId)));
VerifyOrReturnError(versionFilter.IsValidDataVersionFilter(), CHIP_ERROR_IM_MALFORMED_DATA_VERSION_FILTER_IB);
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->PushFrontDataVersionFilterList(mpDataVersionFilterList, versionFilter));
ReturnErrorOnFailure(mManagementCallback.GetInteractionModelEngine()->PushFrontDataVersionFilterList(
mpDataVersionFilterList, versionFilter));
}

if (CHIP_END_OF_TLV == err)
Expand All @@ -544,7 +548,7 @@ CHIP_ERROR ReadHandler::ProcessEventPaths(EventPathIBs::Parser & aEventPathsPars
EventPathIB::Parser path;
ReturnErrorOnFailure(path.Init(reader));
ReturnErrorOnFailure(path.ParsePath(event));
ReturnErrorOnFailure(InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, event));
ReturnErrorOnFailure(mManagementCallback.GetInteractionModelEngine()->PushFrontEventPathParamsList(mpEventPathList, event));
}

// if we have exhausted this container
Expand Down Expand Up @@ -604,7 +608,7 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)

if (IsAwaitingReportResponse() && aTargetState != HandlerState::AwaitingReportResponse)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().OnReportConfirm();
}

mState = aTargetState;
Expand All @@ -618,7 +622,7 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)
{
if (ShouldReportUnscheduled())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().ScheduleRun();
}
else
{
Expand Down Expand Up @@ -814,7 +818,7 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP

void ReadHandler::PersistSubscription()
{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
auto * subscriptionResumptionStorage = mManagementCallback.GetInteractionModelEngine()->GetSubscriptionResumptionStorage();
VerifyOrReturn(subscriptionResumptionStorage != nullptr);

SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo = { .mNodeId = GetInitiatorNodeId(),
Expand Down Expand Up @@ -843,7 +847,7 @@ void ReadHandler::AttributePathIsDirty(const AttributePathParams & aAttributeCha
{
ConcreteAttributePath path;

mDirtyGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
mDirtyGeneration = mManagementCallback.GetInteractionModelEngine()->GetReportingEngine().GetDirtySetGeneration();

// We won't reset the path iterator for every AttributePathIsDirty call to reduce the number of full data reports.
// The iterator will be reset after finishing each report session.
Expand Down
5 changes: 5 additions & 0 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class ReadHandler : public Messaging::ExchangeDelegate
* issues w.r.t the ReadHandler itself.
*/
virtual ApplicationCallback * GetAppCallback() = 0;

/*
* Retrieve the InteractionalModelEngine that holds this ReadHandler.
*/
virtual InteractionModelEngine * GetInteractionModelEngine() = 0;
};

// TODO (#27675) : Merge existing callback and observer into one class and have an observer pool in the Readhandler to notify
Expand Down
55 changes: 28 additions & 27 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ using namespace chip::Access;
namespace chip {
namespace app {
namespace reporting {

Engine::Engine(InteractionModelEngine * apImEngine) : mpImEngine(apImEngine) {}

CHIP_ERROR Engine::Init()
{
mNumReportsInFlight = 0;
Expand Down Expand Up @@ -616,7 +619,7 @@ CHIP_ERROR Engine::ScheduleRun()
return CHIP_NO_ERROR;
}

Messaging::ExchangeManager * exchangeManager = InteractionModelEngine::GetInstance()->GetExchangeManager();
Messaging::ExchangeManager * exchangeManager = mpImEngine->GetExchangeManager();
if (exchangeManager == nullptr)
{
return CHIP_ERROR_INCORRECT_STATE;
Expand All @@ -640,17 +643,16 @@ void Engine::Run()
{
uint32_t numReadHandled = 0;

InteractionModelEngine * imEngine = InteractionModelEngine::GetInstance();

// We may be deallocating read handlers as we go. Track how many we had
// initially, so we make sure to go through all of them.
size_t initialAllocated = imEngine->mReadHandlers.Allocated();
size_t initialAllocated = mpImEngine->mReadHandlers.Allocated();
while ((mNumReportsInFlight < CHIP_IM_MAX_REPORTS_IN_FLIGHT) && (numReadHandled < initialAllocated))
{
ReadHandler * readHandler = imEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) imEngine->mReadHandlers.Allocated());
ReadHandler * readHandler =
mpImEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) mpImEngine->mReadHandlers.Allocated());
VerifyOrDie(readHandler != nullptr);

if (readHandler->ShouldReportUnscheduled() || imEngine->GetReportScheduler()->IsReportableNow(readHandler))
if (readHandler->ShouldReportUnscheduled() || mpImEngine->GetReportScheduler()->IsReportableNow(readHandler))
{

mRunningReadHandler = readHandler;
Expand All @@ -674,14 +676,14 @@ void Engine::Run()
// This isn't strictly necessary, but does make it easier to debug issues in this code if they
// do arise.
//
if (mCurReadHandlerIdx >= imEngine->mReadHandlers.Allocated())
if (mCurReadHandlerIdx >= mpImEngine->mReadHandlers.Allocated())
{
mCurReadHandlerIdx = 0;
}

bool allReadClean = true;

imEngine->mReadHandlers.ForEachActiveObject([&allReadClean](ReadHandler * handler) {
mpImEngine->mReadHandlers.ForEachActiveObject([&allReadClean](ReadHandler * handler) {
if (handler->IsDirty())
{
allReadClean = false;
Expand Down Expand Up @@ -839,26 +841,25 @@ CHIP_ERROR Engine::SetDirty(AttributePathParams & aAttributePath)
BumpDirtySetGeneration();

bool intersectsInterestPath = false;
InteractionModelEngine::GetInstance()->mReadHandlers.ForEachActiveObject(
[&aAttributePath, &intersectsInterestPath](ReadHandler * handler) {
// We call AttributePathIsDirty for both read interactions and subscribe interactions, since we may send inconsistent
// attribute data between two chunks. AttributePathIsDirty will not schedule a new run for read handlers which are
// waiting for a response to the last message chunk for read interactions.
if (handler->CanStartReporting() || handler->IsAwaitingReportResponse())
mpImEngine->mReadHandlers.ForEachActiveObject([&aAttributePath, &intersectsInterestPath](ReadHandler * handler) {
// We call AttributePathIsDirty for both read interactions and subscribe interactions, since we may send inconsistent
// attribute data between two chunks. AttributePathIsDirty will not schedule a new run for read handlers which are
// waiting for a response to the last message chunk for read interactions.
if (handler->CanStartReporting() || handler->IsAwaitingReportResponse())
{
for (auto object = handler->GetAttributePathList(); object != nullptr; object = object->mpNext)
{
for (auto object = handler->GetAttributePathList(); object != nullptr; object = object->mpNext)
if (object->mValue.Intersects(aAttributePath))
{
if (object->mValue.Intersects(aAttributePath))
{
handler->AttributePathIsDirty(aAttributePath);
intersectsInterestPath = true;
break;
}
handler->AttributePathIsDirty(aAttributePath);
intersectsInterestPath = true;
break;
}
}
}

return Loop::Continue;
});
return Loop::Continue;
});

if (!intersectsInterestPath)
{
Expand Down Expand Up @@ -899,7 +900,7 @@ void Engine::OnReportConfirm()

void Engine::GetMinEventLogPosition(uint32_t & aMinLogPosition)
{
InteractionModelEngine::GetInstance()->mReadHandlers.ForEachActiveObject([&aMinLogPosition](ReadHandler * handler) {
mpImEngine->mReadHandlers.ForEachActiveObject([&aMinLogPosition](ReadHandler * handler) {
if (handler->IsType(ReadHandler::InteractionType::Read))
{
return Loop::Continue;
Expand Down Expand Up @@ -934,13 +935,13 @@ CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, uint32_t aBy
// we don't need to call schedule run for event.
// If schedule run is called, actually we would not delivery events as well.
// Just wanna save one schedule run here
if (InteractionModelEngine::GetInstance()->mEventPathPool.Allocated() == 0)
if (mpImEngine->mEventPathPool.Allocated() == 0)
{
return CHIP_NO_ERROR;
}

bool isUrgentEvent = false;
InteractionModelEngine::GetInstance()->mReadHandlers.ForEachActiveObject([&aPath, &isUrgentEvent](ReadHandler * handler) {
mpImEngine->mReadHandlers.ForEachActiveObject([&aPath, &isUrgentEvent](ReadHandler * handler) {
if (handler->IsType(ReadHandler::InteractionType::Read))
{
return Loop::Continue;
Expand Down Expand Up @@ -971,7 +972,7 @@ CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, uint32_t aBy

void Engine::ScheduleUrgentEventDeliverySync(Optional<FabricIndex> fabricIndex)
{
InteractionModelEngine::GetInstance()->mReadHandlers.ForEachActiveObject([fabricIndex](ReadHandler * handler) {
mpImEngine->mReadHandlers.ForEachActiveObject([fabricIndex](ReadHandler * handler) {
if (handler->IsType(ReadHandler::InteractionType::Read))
{
return Loop::Continue;
Expand Down
8 changes: 8 additions & 0 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#pragma once

#include <access/AccessControl.h>
#include <app/InteractionModelEngine.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/ReadHandler.h>
#include <app/util/basic-types.h>
Expand Down Expand Up @@ -56,6 +57,11 @@ namespace reporting {
class Engine
{
public:
/**
* Constructor Engine with a valid InteractionModelEngine pointer.
*/
Engine(InteractionModelEngine * apImEngine);

/**
* Initializes the reporting engine. Should only be called once.
*
Expand Down Expand Up @@ -279,6 +285,8 @@ class Engine
uint32_t mReservedSize = 0;
uint32_t mMaxAttributesPerChunk = UINT32_MAX;
#endif

InteractionModelEngine * mpImEngine = nullptr;
};

}; // namespace reporting
Expand Down
4 changes: 4 additions & 0 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ class NullReadHandlerCallback : public chip::app::ReadHandler::ManagementCallbac
public:
void OnDone(chip::app::ReadHandler & apReadHandlerObj) override {}
chip::app::ReadHandler::ApplicationCallback * GetAppCallback() override { return nullptr; }
chip::app::InteractionModelEngine * GetInteractionModelEngine() override
{
return chip::app::InteractionModelEngine::GetInstance();
}
};

} // namespace
Expand Down
Loading

0 comments on commit 2576913

Please sign in to comment.