Skip to content

Commit

Permalink
[ReadHandler] Report Scheduler class (#27553)
Browse files Browse the repository at this point in the history
* Added a new class that will handle the scheduling of reports.

* Restyled by clang-format

* Removed un-necessary define in TestReportScheduler and applied refactor of SetReportingIntervals to SetMaxReportingIntervals to platform code

* Added TimerDelegate and wrapper functions around calls to Timer. Remove unnecessary checks for nullptr

* Added VerifyOrReturn after NL_TEST_ASSERTS for nullptr

* Completed TimerDelegate class and modified ReadHandlerNodes so they carry their own callback

* Modified TimerDelegate to allow to pass different objects as context

* ifdefing out ScheduleRun() to debug failing CI

* Added issue # to TODOs, refactored Min/Max Intervals to Min/Max Timestamp

* Clarified some comments regarding timing

* Restyled by whitespace

* Restyled by clang-format

* Added interface to GetMonotonicTimestamp in the timer delegate

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Completed renaming to eliminate compiling error, moved TestReporScehduler in reporting namespace, addressed some low hanging fruits

* Removed useless objects from tests as well as useless typecasting, and unnecessary check

* Fixed comment about private methods used in ReportScheduler as a friend class

* Changed to SetMinReportInterval to SetMinReportingIntervalForTests, removed the IsChunkedReport from comment about friend class, added a mock timestamp and timer to test to better control time in simulation for specific timing test cases

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Restyled by clang-format

* Removed all calls to ReadHandler States to prevent Engine calls from the Test as it seems to impact the CI

---------

Co-authored-by: Restyled.io <[email protected]>
Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
3 people authored Jul 13, 2023
1 parent b3f8db6 commit edbdf8a
Show file tree
Hide file tree
Showing 13 changed files with 985 additions and 16 deletions.
2 changes: 1 addition & 1 deletion examples/platform/nrfconnect/util/ICDUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ CHIP_ERROR ICDUtil::OnSubscriptionRequested(chip::app::ReadHandler & aReadHandle
agreedMaxInterval = kSubscriptionMaxIntervalPublisherLimit;
}

return aReadHandler.SetReportingIntervals(agreedMaxInterval);
return aReadHandler.SetMaxReportingInterval(agreedMaxInterval);
}
2 changes: 1 addition & 1 deletion examples/platform/silabs/ICDSubscriptionCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ CHIP_ERROR ICDSubscriptionCallback::OnSubscriptionRequested(chip::app::ReadHandl
decidedMaxInterval = maximumMaxInterval;
}

return aReadHandler.SetReportingIntervals(decidedMaxInterval);
return aReadHandler.SetMaxReportingInterval(decidedMaxInterval);
}
3 changes: 3 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ static_library("app") {
"WriteHandler.cpp",
"reporting/Engine.cpp",
"reporting/Engine.h",
"reporting/ReportScheduler.h",
"reporting/ReportSchedulerImpl.cpp",
"reporting/ReportSchedulerImpl.h",
"reporting/reporting.h",
]

Expand Down
77 changes: 74 additions & 3 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace app {
using Status = Protocols::InteractionModel::Status;

ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext,
InteractionType aInteractionType) :
InteractionType aInteractionType, Observer * observer) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand All @@ -63,15 +63,37 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
SetStateFlag(ReadHandlerFlags::PrimingReports);

mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());

// TODO (#27672): Uncomment when the ReportScheduler is implemented
#if 0
if (nullptr != observer)
{
if (CHIP_NO_ERROR == SetObserver(observer))
{
mObserver->OnReadHandlerCreated(this);
}
}
#endif
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback) :
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();

// TODO (#27672): Uncomment when the ReportScheduler is implemented
#if 0
if (nullptr != observer)
{
if (CHIP_NO_ERROR == SetObserver(observer))
{
mObserver->OnReadHandlerCreated(this);
}
}
#endif
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
Expand Down Expand Up @@ -115,6 +137,13 @@ void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,

ReadHandler::~ReadHandler()
{
// TODO (#27672): Enable when the ReportScheduler is implemented and move in Close() after testing
#if 0
if (nullptr != mObserver)
{
mObserver->OnReadHandlerDestroyed(this);
}
#endif
auto * appCallback = mManagementCallback.GetAppCallback();
if (mFlags.Has(ReadHandlerFlags::ActiveSubscription) && appCallback)
{
Expand Down Expand Up @@ -319,6 +348,15 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b

if (IsType(InteractionType::Subscribe) && !IsPriming())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove call to UpdateReportTimer, will be handled by
// the report Scheduler
#if 0
if (nullptr != mObserver)
{
mObserver->OnSubscriptionAction(this);
}
#endif

// Ignore the error from UpdateReportTimer. If we've
// successfully sent the message, we need to return success from
// this method.
Expand Down Expand Up @@ -593,6 +631,13 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)
//
if (aTargetState == HandlerState::GeneratingReports && IsReportableNow())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
}
Expand Down Expand Up @@ -634,6 +679,14 @@ CHIP_ERROR ReadHandler::SendSubscribeResponse()
ReturnErrorOnFailure(writer.Finalize(&packet));
VerifyOrReturnLogError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE);

// TODO (#27672): Uncomment when the ReportScheduler is implemented and remove call to UpdateReportTimer, handled by
// the report Scheduler
#if 0
if (nullptr != mObserver)
{
mObserver->OnSubscriptionAction(this);
}
#endif
ReturnErrorOnFailure(UpdateReportTimer());

ClearStateFlag(ReadHandlerFlags::PrimingReports);
Expand Down Expand Up @@ -753,6 +806,7 @@ void ReadHandler::PersistSubscription()
}
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
void ReadHandler::MinIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
Expand All @@ -764,6 +818,7 @@ void ReadHandler::MinIntervalExpiredCallback(System::Layer * apSystemLayer, void
readHandler);
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
void ReadHandler::MaxIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
Expand All @@ -773,6 +828,7 @@ void ReadHandler::MaxIntervalExpiredCallback(System::Layer * apSystemLayer, void
readHandler->mMaxInterval - readHandler->mMinIntervalFloorSeconds);
}

// TODO (#27672): Remove when ReportScheduler is enabled as timing will now be handled by the ReportScheduler
CHIP_ERROR ReadHandler::UpdateReportTimer()
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
Expand Down Expand Up @@ -812,7 +868,7 @@ void ReadHandler::AttributePathIsDirty(const AttributePathParams & aAttributeCha
// Here we just reset the iterator to the beginning of the current cluster, if the dirty path affects it.
// This will ensure the reports are consistent within a single cluster generated from a single path in the request.

// TODO (#16699): Currently we can only gurentee the reports generated from a single path in the request are consistent. The
// TODO (#16699): Currently we can only guarantee the reports generated from a single path in the request are consistent. The
// data might be inconsistent if the user send a request with two paths from the same cluster. We need to clearify the behavior
// or make it consistent.
if (mAttributePathExpandIterator.Get(path) &&
Expand All @@ -831,6 +887,13 @@ void ReadHandler::AttributePathIsDirty(const AttributePathParams & aAttributeCha

if (IsReportableNow())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
}
Expand All @@ -853,9 +916,17 @@ void ReadHandler::SetStateFlag(ReadHandlerFlags aFlag, bool aValue)
{
bool oldReportable = IsReportableNow();
mFlags.Set(aFlag, aValue);

// If we became reportable, schedule a reporting run.
if (!oldReportable && IsReportableNow())
{
// TODO (#27672): Enable when the ReportScheduler is implemented and remove the call to ScheduleRun()
#if 0
if(nullptr != mObserver)
{
mObserver->OnBecameReportable(this);
}
#endif
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
}
Expand Down
87 changes: 78 additions & 9 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ namespace app {
namespace reporting {
class Engine;
class TestReportingEngine;
class ReportScheduler;
class TestReportScheduler;
} // namespace reporting

class InteractionModelEngine;
Expand Down Expand Up @@ -152,6 +154,38 @@ class ReadHandler : public Messaging::ExchangeDelegate
virtual ApplicationCallback * GetAppCallback() = 0;
};

// TODO (#27675) : Merge existing callback and observer into one class and have an observer pool in the Readhandler to notify
// every
/*
* Observer class for ReadHandler, meant to allow multiple objects to observe the ReadHandler. Currently only one observer is
* supported but all above callbacks should be merged into observer type and an observer pool should be added to allow multiple
* objects to observe ReadHandler
*/
class Observer
{
public:
virtual ~Observer() = default;

/// @brief Callback invoked to notify a ReadHandler was created and can be registered
/// @param[in] apReadHandler ReadHandler getting added
virtual void OnReadHandlerCreated(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when a ReadHandler went from a non reportable state to a reportable state so a report can be
/// sent immediately if the minimal interval allows it. Otherwise the report should be rescheduled to the earliest time
/// allowed.
/// @param[in] apReadHandler ReadHandler that became dirty
virtual void OnBecameReportable(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when the read handler needs to make sure to send a message to the subscriber within the next
/// maxInterval time period.
/// @param[in] apReadHandler ReadHandler that has generated a report
virtual void OnSubscriptionAction(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when a ReadHandler is getting removed so it can be unregistered
/// @param[in] apReadHandler ReadHandler getting destroyed
virtual void OnReadHandlerDestroyed(ReadHandler * apReadHandler) = 0;
};

/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
Expand All @@ -167,7 +201,8 @@ class ReadHandler : public Messaging::ExchangeDelegate
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType,
Observer * observer = nullptr);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
Expand All @@ -177,7 +212,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(ManagementCallback & apCallback);
ReadHandler(ManagementCallback & apCallback, Observer * observer = nullptr);
#endif

const ObjectList<AttributePathParams> * GetAttributePathList() const { return mpAttributePathList; }
Expand All @@ -190,13 +225,22 @@ class ReadHandler : public Messaging::ExchangeDelegate
aMaxInterval = mMaxInterval;
}

CHIP_ERROR SetMinReportingIntervalForTests(uint16_t aMinInterval)
{
VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(aMinInterval <= mMaxInterval, CHIP_ERROR_INVALID_ARGUMENT);
// Ensures the new min interval is higher than the subscriber established one.
mMinIntervalFloorSeconds = std::max(mMinIntervalFloorSeconds, aMinInterval);
return CHIP_NO_ERROR;
}

/*
* Set the reporting intervals for the subscription. This SHALL only be called
* Set the maximum reporting interval for the subscription. This SHALL only be called
* from the OnSubscriptionRequested callback above. The restriction is as below
* MinIntervalFloor ≤ MaxInterval ≤ MAX(SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT, MaxIntervalCeiling)
* Where SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT is set to 60m in the spec.
*/
CHIP_ERROR SetReportingIntervals(uint16_t aMaxInterval)
CHIP_ERROR SetMaxReportingInterval(uint16_t aMaxInterval)
{
VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mMinIntervalFloorSeconds <= aMaxInterval, CHIP_ERROR_INVALID_ARGUMENT);
Expand All @@ -206,6 +250,18 @@ class ReadHandler : public Messaging::ExchangeDelegate
return CHIP_NO_ERROR;
}

/// @brief Add an observer to the read handler, currently only one observer is supported but all other callbacks should be
/// merged with a general observer type to allow multiple object to observe readhandlers
/// @param aObserver observer to be added
/// @return CHIP_ERROR_INVALID_ARGUMENT if passing in nullptr
CHIP_ERROR SetObserver(Observer * aObserver)
{
VerifyOrReturnError(nullptr != aObserver, CHIP_ERROR_INVALID_ARGUMENT);
// TODO (#27675) : After merging the callbacks and observer, change so the method adds a new observer to an observer pool
mObserver = aObserver;
return CHIP_NO_ERROR;
}

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber & GetEventMin() { return mEventMin; }
Expand All @@ -214,13 +270,13 @@ class ReadHandler : public Messaging::ExchangeDelegate
{
// WaitingUntilMinInterval is used to prevent subscription data delivery while we are
// waiting for the min reporting interval to elapse.
WaitingUntilMinInterval = (1 << 0),
WaitingUntilMinInterval = (1 << 0), // TODO (#27672): Remove once ReportScheduler is implemented or change to test flag

// WaitingUntilMaxInterval is used to prevent subscription empty report delivery while we
// are waiting for the max reporting interval to elaps. When WaitingUntilMaxInterval
// becomes false, we are allowed to send an empty report to keep the
// subscription alive on the client.
WaitingUntilMaxInterval = (1 << 1),
WaitingUntilMaxInterval = (1 << 1), // TODO (#27672): Remove once ReportScheduler is implemented

// The flag indicating we are in the middle of a series of chunked report messages, this flag will be cleared during
// sending last chunked message.
Expand Down Expand Up @@ -291,6 +347,8 @@ class ReadHandler : public Messaging::ExchangeDelegate

bool IsIdle() const { return mState == HandlerState::Idle; }

// TODO (#27672): Change back to IsReportable once ReportScheduler is implemented so this can assess reportability without
// considering timing. The ReporScheduler will handle timing.
/// @brief Returns whether the ReadHandler is in a state where it can immediately send a report. This function
/// is used to determine whether a report generation should be scheduled for the handler.
bool IsReportableNow() const
Expand Down Expand Up @@ -370,6 +428,7 @@ class ReadHandler : public Messaging::ExchangeDelegate

friend class TestReadInteraction;
friend class chip::app::reporting::TestReportingEngine;
friend class chip::app::reporting::TestReportScheduler;

//
// The engine needs to be able to Abort/Close a ReadHandler instance upon completion of work for a given read/subscribe
Expand All @@ -379,6 +438,10 @@ class ReadHandler : public Messaging::ExchangeDelegate
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;

// The report scheduler needs to be able to access StateFlag private functions IsGeneratingReports() and IsDirty() to
// know when to schedule a run so it is declared as a friend class.
friend class chip::app::reporting::ReportScheduler;

enum class HandlerState : uint8_t
{
Idle, ///< The handler has been initialized and is ready
Expand All @@ -404,10 +467,13 @@ class ReadHandler : public Messaging::ExchangeDelegate

/// @brief This function is called when the min interval timer has expired, it restarts the timer on a timeout equal to the
/// difference between the max interval and the min interval.
static void MinIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState);
static void MaxIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState);
static void MinIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState); // TODO (#27672): Remove once
// ReportScheduler is implemented.
static void MaxIntervalExpiredCallback(System::Layer * apSystemLayer, void * apAppState); // TODO (#27672): Remove once
// ReportScheduler is implemented.
/// @brief This function is called when a report is sent and it restarts the min interval timer.
CHIP_ERROR UpdateReportTimer();
CHIP_ERROR UpdateReportTimer(); // TODO (#27672) : Remove once ReportScheduler is implemented.

CHIP_ERROR SendSubscribeResponse();
CHIP_ERROR ProcessSubscribeRequest(System::PacketBufferHandle && aPayload);
CHIP_ERROR ProcessReadRequest(System::PacketBufferHandle && aPayload);
Expand Down Expand Up @@ -520,6 +586,9 @@ class ReadHandler : public Messaging::ExchangeDelegate
BitFlags<ReadHandlerFlags> mFlags;
InteractionType mInteractionType = InteractionType::Read;

// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
Observer * mObserver = nullptr;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
Expand Down
1 change: 1 addition & 0 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ void Engine::Run()
ReadHandler * readHandler = imEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) imEngine->mReadHandlers.Allocated());
VerifyOrDie(readHandler != nullptr);

// TODO (#27672): Replace with check with Report Scheduler if the read handler is reportable
if (readHandler->IsReportableNow())
{
mRunningReadHandler = readHandler;
Expand Down
Loading

0 comments on commit edbdf8a

Please sign in to comment.