Skip to content

Commit

Permalink
[ReadHandler] Integration of ReportScheduler into the ReadHandler and…
Browse files Browse the repository at this point in the history
… IM engine. (#28104)

* Integration of ReportSchedulers into the ReadHandler and IM engine.

Also added a static report scheduler to use in all test relying on the IM engine, also added test flags in ReportScheduler replace ReadHandler timing flags

Refactored the TestReadInteraction to adopt correct behavior

* Restyled by clang-format

* Restyled by gn

* Moved namespaces into class to avoid namespace pollution

* Fix missing pragma once and copyright

* Update src/app/reporting/ReportScheduler.h

Co-authored-by: mkardous-silabs <[email protected]>

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Removed OnIntervalsChanged, removed TestReadInteraction from ReportScheduler's friend class and refactored the tests to use wrapper functions, refactored tests for the removal of OnIntervalsChanged, removed reportscheduler nullptr check form controllers

* Restyled by clang-format

* Fix for the TimerDelegates

And also added a default Scheduler to controllers

* Restyled by clang-format

* Added a timer Context class to allow interface between timer delegates, ReadHandlerNodes and SynchronisedReport Scheduler

* Fix using InteractionModel... error

* Added Platform::Delete() for allocated TimerDelegate and ReportScheduler on controller

* Update src/app/ReadHandler.cpp

Co-authored-by: mkardous-silabs <[email protected]>

* Update src/app/TimerDelegates.h

Co-authored-by: Boris Zbarsky <[email protected]>

* Addressed PR comments, next step trying to reduce RAM bloat

* Restyled by clang-format

* Removed IntrusiveList from the report Scheduler

* Removed ReadHandlerNode's inheritance of IntrusiveListBase

* Restyled by clang-format

* Fix commented code and comment syntax

* Restyled by clang-format

* Update src/app/tests/TestReportScheduler.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Added extra check on timerDelegate in DeviceController, removed logging causing failure in linux CI

* Restyled by clang-format

---------

Co-authored-by: Restyled.io <[email protected]>
Co-authored-by: mkardous-silabs <[email protected]>
Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
4 people authored and pull[bot] committed Jan 6, 2024
1 parent 1f25004 commit 3902440
Show file tree
Hide file tree
Showing 30 changed files with 711 additions and 504 deletions.
8 changes: 5 additions & 3 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ InteractionModelEngine * InteractionModelEngine::GetInstance()
}

CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr,
reporting::ReportScheduler * reportScheduler, CASESessionManager * apCASESessionMgr,
SubscriptionResumptionStorage * subscriptionResumptionStorage)
{
VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(reportScheduler != nullptr, CHIP_ERROR_INVALID_ARGUMENT);

mpExchangeMgr = apExchangeMgr;
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;
mpSubscriptionResumptionStorage = subscriptionResumptionStorage;
mReportScheduler = reportScheduler;

ReturnErrorOnFailure(mpFabricTable->AddFabricDelegate(this));
ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this));
Expand Down Expand Up @@ -741,7 +743,7 @@ Protocols::InteractionModel::Status InteractionModelEngine::OnReadInitialRequest

// We have already reserved enough resources for read requests, and have granted enough resources for current subscriptions, so
// we should be able to allocate resources requested by this request.
ReadHandler * handler = mReadHandlers.CreateObject(*this, apExchangeContext, aInteractionType);
ReadHandler * handler = mReadHandlers.CreateObject(*this, apExchangeContext, aInteractionType, mReportScheduler);
if (handler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for %s interaction",
Expand Down Expand Up @@ -1845,7 +1847,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine);
ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (handler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
Expand Down
6 changes: 5 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <app/WriteClient.h>
#include <app/WriteHandler.h>
#include <app/reporting/Engine.h>
#include <app/reporting/ReportScheduler.h>
#include <app/util/attribute-metadata.h>
#include <app/util/basic-types.h>

Expand Down Expand Up @@ -115,7 +116,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*
*/
CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr = nullptr,
reporting::ReportScheduler * reportScheduler, CASESessionManager * apCASESessionMgr = nullptr,
SubscriptionResumptionStorage * subscriptionResumptionStorage = nullptr);

void Shutdown();
Expand Down Expand Up @@ -178,6 +179,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,

reporting::Engine & GetReportingEngine() { return mReportingEngine; }

reporting::ReportScheduler * GetReportScheduler() { return mReportScheduler; }

void ReleaseAttributePathList(ObjectList<AttributePathParams> *& aAttributePathList);

CHIP_ERROR PushFrontAttributePathList(ObjectList<AttributePathParams> *& aAttributePathList,
Expand Down Expand Up @@ -566,6 +569,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
ObjectPool<TimedHandler, CHIP_IM_MAX_NUM_TIMED_HANDLER> mTimedHandlers;
WriteHandler mWriteHandlers[CHIP_IM_MAX_NUM_WRITE_HANDLER];
reporting::Engine mReportingEngine;
reporting::ReportScheduler * mReportScheduler = nullptr;

static constexpr size_t kReservedHandlersForReads = kMinSupportedReadRequestsPerFabric * (CHIP_CONFIG_MAX_FABRICS);
static constexpr size_t kReservedPathsForReads = kMinSupportedPathsPerReadRequest * kReservedHandlersForReads;
Expand Down
159 changes: 24 additions & 135 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,9 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());

// TODO (#27672): Uncomment when the ReportScheduler is implemented
#if 0
if (nullptr != observer)
{
if (CHIP_NO_ERROR == SetObserver(observer))
{
mObserver->OnReadHandlerCreated(this);
}
}
#endif
VerifyOrDie(observer != nullptr);
mObserver = observer;
mObserver->OnReadHandlerCreated(this);
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand All @@ -97,16 +90,9 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();

// TODO (#27672): Uncomment when the ReportScheduler is implemented
#if 0
if (nullptr != observer)
{
if (CHIP_NO_ERROR == SetObserver(observer))
{
mObserver->OnReadHandlerCreated(this);
}
}
#endif
VerifyOrDie(observer != nullptr);
mObserver = observer;
mObserver->OnReadHandlerCreated(this);
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
Expand Down Expand Up @@ -150,28 +136,14 @@ void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,

ReadHandler::~ReadHandler()
{
// TODO (#27672): Enable when the ReportScheduler is implemented and move in Close() after testing
#if 0
if (nullptr != mObserver)
{
mObserver->OnReadHandlerDestroyed(this);
}
#endif
mObserver->OnReadHandlerDestroyed(this);

auto * appCallback = mManagementCallback.GetAppCallback();
if (mFlags.Has(ReadHandlerFlags::ActiveSubscription) && appCallback)
{
appCallback->OnSubscriptionTerminated(*this);
}

if (IsType(InteractionType::Subscribe))
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
MinIntervalExpiredCallback, this);

InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
MaxIntervalExpiredCallback, this);
}

if (IsAwaitingReportResponse())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
Expand Down Expand Up @@ -290,7 +262,7 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange

CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aStatus)
{
VerifyOrReturnLogError(IsReportableNow(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnLogError(mState == HandlerState::GeneratingReports, CHIP_ERROR_INCORRECT_STATE);
if (IsPriming() || IsChunkedReport())
{
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
Expand All @@ -314,7 +286,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt

CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks)
{
VerifyOrReturnLogError(IsReportableNow(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnLogError(mState == HandlerState::GeneratingReports, CHIP_ERROR_INCORRECT_STATE);
VerifyOrDie(!IsAwaitingReportResponse()); // Should not be reportable!
if (IsPriming() || IsChunkedReport())
{
Expand Down Expand Up @@ -359,21 +331,11 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

if (IsType(InteractionType::Subscribe) && !IsPriming())
// If we just finished a non-priming subscription report, notify our observers.
// Priming reports are handled when we send a SubscribeResponse.
if (IsType(InteractionType::Subscribe) && !IsPriming() && !IsChunkedReport())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove call to UpdateReportTimer, will be handled by
// the report Scheduler
#if 0
if (nullptr != mObserver)
{
mObserver->OnSubscriptionAction(this);
}
#endif

// Ignore the error from UpdateReportTimer. If we've
// successfully sent the message, we need to return success from
// this method.
UpdateReportTimer();
mObserver->OnSubscriptionAction(this);
}
}
if (!aMoreChunks)
Expand Down Expand Up @@ -641,16 +603,10 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)
// If we just unblocked sending reports, let's go ahead and schedule the reporting
// engine to run to kick that off.
//
if (aTargetState == HandlerState::GeneratingReports && IsReportableNow())
if (aTargetState == HandlerState::GeneratingReports)
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
// mObserver will take care of scheduling the report as soon as allowed
mObserver->OnBecameReportable(this);
}
}

Expand Down Expand Up @@ -691,15 +647,7 @@ CHIP_ERROR ReadHandler::SendSubscribeResponse()
ReturnErrorOnFailure(writer.Finalize(&packet));
VerifyOrReturnLogError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE);

// TODO (#27672): Uncomment when the ReportScheduler is implemented and remove call to UpdateReportTimer, handled by
// the report Scheduler
#if 0
if (nullptr != mObserver)
{
mObserver->OnSubscriptionAction(this);
}
#endif
ReturnErrorOnFailure(UpdateReportTimer());
mObserver->OnSubscriptionAction(this);

ClearStateFlag(ReadHandlerFlags::PrimingReports);
return mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeResponse, std::move(packet));
Expand Down Expand Up @@ -818,50 +766,6 @@ void ReadHandler::PersistSubscription()
}
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
void ReadHandler::MinIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
ChipLogDetail(DataManagement, "Unblock report hold after min %d seconds", readHandler->mMinIntervalFloorSeconds);
readHandler->ClearStateFlag(ReadHandlerFlags::WaitingUntilMinInterval);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds), MaxIntervalExpiredCallback,
readHandler);
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
void ReadHandler::MaxIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
readHandler->ClearStateFlag(ReadHandlerFlags::WaitingUntilMaxInterval);
ChipLogProgress(DataManagement, "Refresh subscribe timer sync after %d seconds",
readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds);
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
CHIP_ERROR ReadHandler::UpdateReportTimer()
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
MinIntervalExpiredCallback, this);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
MaxIntervalExpiredCallback, this);

if (!IsChunkedReport())
{
ChipLogProgress(DataManagement, "Refresh Subscribe Sync Timer with min %d seconds and max %d seconds",
mMinIntervalFloorSeconds, mMaxInterval);
SetStateFlag(ReadHandlerFlags::WaitingUntilMinInterval);
SetStateFlag(ReadHandlerFlags::WaitingUntilMaxInterval);
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), MinIntervalExpiredCallback, this));
}

return CHIP_NO_ERROR;
}

void ReadHandler::ResetPathIterator()
{
mAttributePathExpandIterator = AttributePathExpandIterator(mpAttributePathList);
Expand Down Expand Up @@ -897,17 +801,8 @@ void ReadHandler::AttributePathIsDirty(const AttributePathParams & aAttributeCha
mAttributeEncoderState = AttributeValueEncoder::AttributeEncodeState();
}

if (IsReportableNow())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
// ReportScheduler will take care of verifying the reportability of the handler and schedule the run
mObserver->OnBecameReportable(this);
}

Transport::SecureSession * ReadHandler::GetSession() const
Expand All @@ -926,20 +821,14 @@ void ReadHandler::ForceDirtyState()

void ReadHandler::SetStateFlag(ReadHandlerFlags aFlag, bool aValue)
{
bool oldReportable = IsReportableNow();
bool oldReportable = IsReportable();
mFlags.Set(aFlag, aValue);

// If we became reportable, schedule a reporting run.
if (!oldReportable && IsReportableNow())
if (!oldReportable && IsReportable())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
// If we became reportable, the scheduler will schedule a run as soon as allowed
mObserver->OnBecameReportable(this);
}
}

Expand Down
Loading

0 comments on commit 3902440

Please sign in to comment.