Skip to content

Commit

Permalink
[ReadHandler] Synchronized report scheduler (#27943)
Browse files Browse the repository at this point in the history
* Added a Synchronized ReportScheduler along with test to confirm the
behavior on the scheduler with up to 4 ReadHandlers as well logging
mechanism to find out what handler fires at what time.

* Added a quick fix because TestDecoding won't compile otherwise, this doesn't belong here

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Refactored ReportScheduler Impls to better take advantage of inheritance, removed bloat, excluded test for platform in which problems are caused due to unprocessed engine runs

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Applied comment review and refactoed next timeout calculation logic

* Completed unit test and logic

* Passing a ReportSchedulerPointer instead of an std::function to avoid dynamical memory allocation

* undid ReadHandler changes

* Update src/app/reporting/ReportScheduler.h

Co-authored-by: Boris Zbarsky <[email protected]>

* Removed un-necessary nullptr check, addressed comments regarding tests and added doc on unclear behavior

* Addressed redundant test

---------

Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Jul 5, 2024
1 parent b8fd040 commit f615859
Show file tree
Hide file tree
Showing 8 changed files with 814 additions and 138 deletions.
2 changes: 2 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ static_library("app") {
"reporting/ReportScheduler.h",
"reporting/ReportSchedulerImpl.cpp",
"reporting/ReportSchedulerImpl.h",
"reporting/SynchronizedReportSchedulerImpl.cpp",
"reporting/SynchronizedReportSchedulerImpl.h",
"reporting/reporting.h",
]

Expand Down
54 changes: 38 additions & 16 deletions src/app/reporting/ReportScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ class ReportScheduler : public ReadHandler::Observer
class ReadHandlerNode : public IntrusiveListNodeBase<>
{
public:
using TimerCompleteCallback = void (*)();

ReadHandlerNode(ReadHandler * aReadHandler, TimerDelegate * aTimerDelegate, TimerCompleteCallback aCallback) :
mTimerDelegate(aTimerDelegate), mCallback(aCallback)
ReadHandlerNode(ReadHandler * aReadHandler, TimerDelegate * aTimerDelegate, ReportScheduler * aScheduler) :
mTimerDelegate(aTimerDelegate), mScheduler(aScheduler)
{
VerifyOrDie(aReadHandler != nullptr);
VerifyOrDie(aTimerDelegate != nullptr);
VerifyOrDie(aCallback != nullptr);
VerifyOrDie(aScheduler != nullptr);

mReadHandler = aReadHandler;
SetIntervalTimeStamps(aReadHandler);
Expand All @@ -78,29 +76,50 @@ class ReportScheduler : public ReadHandler::Observer
// the scheduler in the ReadHandler
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
return (mReadHandler->IsGeneratingReports() &&
((now >= mMinTimestamp && mReadHandler->IsDirty()) || now >= mMaxTimestamp));
(now >= mMinTimestamp && (mReadHandler->IsDirty() || now >= mMaxTimestamp || now >= mSyncTimestamp)));
}

bool IsEngineRunScheduled() const { return mEngineRunScheduled; }
void SetEngineRunScheduled(bool aEnginRunScheduled) { mEngineRunScheduled = aEnginRunScheduled; }

void SetIntervalTimeStamps(ReadHandler * aReadHandler)
{
uint16_t minInterval, maxInterval;
aReadHandler->GetReportingIntervals(minInterval, maxInterval);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
mMinTimestamp = now + System::Clock::Seconds16(minInterval);
mMaxTimestamp = now + System::Clock::Seconds16(maxInterval);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
mMinTimestamp = now + System::Clock::Seconds16(minInterval);
mMaxTimestamp = now + System::Clock::Seconds16(maxInterval);
mSyncTimestamp = mMaxTimestamp;
}

void RunCallback()
{
mScheduler->ReportTimerCallback();
SetEngineRunScheduled(true);
}

void RunCallback() { mCallback(); }
void SetSyncTimestamp(System::Clock::Timestamp aSyncTimestamp)
{
// Prevents the sync timestamp being set to a value lower than the min timestamp to prevent it to appear as reportable
// on the next timeout calculation and cause the scheduler to run the engine too early
VerifyOrReturn(aSyncTimestamp >= mMinTimestamp);
mSyncTimestamp = aSyncTimestamp;
}

Timestamp GetMinTimestamp() const { return mMinTimestamp; }
Timestamp GetMaxTimestamp() const { return mMaxTimestamp; }
System::Clock::Timestamp GetMinTimestamp() const { return mMinTimestamp; }
System::Clock::Timestamp GetMaxTimestamp() const { return mMaxTimestamp; }
System::Clock::Timestamp GetSyncTimestamp() const { return mSyncTimestamp; }

private:
TimerDelegate * mTimerDelegate;
TimerCompleteCallback mCallback;
ReadHandler * mReadHandler;
ReportScheduler * mScheduler;
Timestamp mMinTimestamp;
Timestamp mMaxTimestamp;
Timestamp mSyncTimestamp; // Timestamp at which the read handler will be allowed to emit a report so it can be synced with
// other handlers that have an earlier max timestamp
bool mEngineRunScheduled = false; // Flag to indicate if the engine run is already scheduled so the scheduler can ignore
// it when calculating the next run time
};

ReportScheduler(TimerDelegate * aTimerDelegate) : mTimerDelegate(aTimerDelegate) {}
Expand All @@ -109,11 +128,14 @@ class ReportScheduler : public ReadHandler::Observer
*/
virtual ~ReportScheduler() = default;

/// @brief Check if a ReadHandler is scheduled for reporting
virtual bool IsReportScheduled(ReadHandler * aReadHandler) = 0;
virtual void ReportTimerCallback() = 0;

/// @brief Check whether a ReadHandler is reportable right now, taking into account its minimum and maximum intervals.
/// @param aReadHandler read handler to check
bool IsReportableNow(ReadHandler * aReadHandler) { return FindReadHandlerNode(aReadHandler)->IsReportableNow(); };
bool IsReportableNow(ReadHandler * aReadHandler)
{
return FindReadHandlerNode(aReadHandler)->IsReportableNow();
} // TODO: Change the IsReportableNow to IsReportable() for readHandlers
/// @brief Check if a ReadHandler is reportable without considering the timing
bool IsReadHandlerReportable(ReadHandler * aReadHandler) const
{
Expand Down
141 changes: 55 additions & 86 deletions src/app/reporting/ReportSchedulerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@ namespace chip {
namespace app {
namespace reporting {

using Seconds16 = System::Clock::Seconds16;
using Milliseconds32 = System::Clock::Milliseconds32;
using Timeout = System::Clock::Timeout;
using Timestamp = System::Clock::Timestamp;
using namespace System::Clock;
using ReadHandlerNode = ReportScheduler::ReadHandlerNode;

/// @brief Callback called when the report timer expires to schedule an engine run regardless of the state of the ReadHandlers, as
/// the engine already verifies that read handlers are reportable before sending a report
static void ReportTimerCallback()
void ReportSchedulerImpl::ReportTimerCallback()
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
Expand All @@ -43,7 +40,23 @@ ReportSchedulerImpl::ReportSchedulerImpl(TimerDelegate * aTimerDelegate) : Repor
/// @brief When a ReadHandler is added, register it, which will schedule an engine run
void ReportSchedulerImpl::OnReadHandlerCreated(ReadHandler * aReadHandler)
{
RegisterReadHandler(aReadHandler);
ReadHandlerNode * newNode = FindReadHandlerNode(aReadHandler);
// Handler must not be registered yet; it's just being constructed.
VerifyOrDie(nullptr == newNode);
// The NodePool is the same size as the ReadHandler pool from the IM Engine, so we don't need a check for size here since if a
// ReadHandler was created, space should be available.
newNode = mNodesPool.CreateObject(aReadHandler, mTimerDelegate, this);
mReadHandlerList.PushBack(newNode);

ChipLogProgress(DataManagement,
"Registered a ReadHandler that will schedule a report between system Timestamp: %" PRIu64
" and system Timestamp %" PRIu64 ".",
newNode->GetMinTimestamp().count(), newNode->GetMaxTimestamp().count());

Milliseconds32 newTimeout;
// No need to check for error here, since the node is already in the list otherwise we would have Died
CalculateNextReportTimeout(newTimeout, newNode);
ScheduleReport(newTimeout, newNode);
}

/// @brief When a ReadHandler becomes reportable, schedule, verifies if the min interval of a handleris elapsed. If not,
Expand All @@ -54,80 +67,39 @@ void ReportSchedulerImpl::OnBecameReportable(ReadHandler * aReadHandler)
VerifyOrReturn(nullptr != node);

Milliseconds32 newTimeout;
if (node->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
newTimeout = Milliseconds32(0);
}
else
{
// If the handler is not reportable now, schedule a report for the min interval
newTimeout = node->GetMinTimestamp() - mTimerDelegate->GetCurrentMonotonicTimestamp();
}

CalculateNextReportTimeout(newTimeout, node);
ScheduleReport(newTimeout, node);
}

void ReportSchedulerImpl::OnSubscriptionAction(ReadHandler * apReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(apReadHandler);
VerifyOrReturn(nullptr != node);
// Schedule callback for max interval by computing the difference between the max timestamp and the current timestamp
node->SetIntervalTimeStamps(apReadHandler);
Milliseconds32 newTimeout = node->GetMaxTimestamp() - mTimerDelegate->GetCurrentMonotonicTimestamp();
Milliseconds32 newTimeout;
CalculateNextReportTimeout(newTimeout, node);
ScheduleReport(newTimeout, node);
node->SetEngineRunScheduled(false);
}

/// @brief When a ReadHandler is removed, unregister it, which will cancel any scheduled report
void ReportSchedulerImpl::OnReadHandlerDestroyed(ReadHandler * aReadHandler)
{
UnregisterReadHandler(aReadHandler);
}

CHIP_ERROR ReportSchedulerImpl::RegisterReadHandler(ReadHandler * aReadHandler)
{
ReadHandlerNode * newNode = FindReadHandlerNode(aReadHandler);
// Handler must not be registered yet; it's just being constructed.
VerifyOrDie(nullptr == newNode);
// The NodePool is the same size as the ReadHandler pool from the IM Engine, so we don't need a check for size here since if a
// ReadHandler was created, space should be available.
newNode = mNodesPool.CreateObject(aReadHandler, mTimerDelegate, ReportTimerCallback);
mReadHandlerList.PushBack(newNode);

ChipLogProgress(DataManagement,
"Registered a ReadHandler that will schedule a report between system Timestamp: %" PRIu64
" and system Timestamp %" PRIu64 ".",
newNode->GetMinTimestamp().count(), newNode->GetMaxTimestamp().count());
CancelReport(aReadHandler);

Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
Milliseconds32 newTimeout;
// If the handler is reportable, schedule a report for the min interval, otherwise schedule a report for the max interval
if (newNode->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
newTimeout = Milliseconds32(0);
}
else if (IsReadHandlerReportable(aReadHandler) && (newNode->GetMinTimestamp() > now))
{
// If the handler is reportable now, but the min interval is not elapsed, schedule a report for the moment the min interval
// has elapsed
newTimeout = newNode->GetMinTimestamp() - now;
}
else
{
// If the handler is not reportable now, schedule a report for the max interval
newTimeout = newNode->GetMaxTimestamp() - now;
}
ReadHandlerNode * removeNode = FindReadHandlerNode(aReadHandler);
// Nothing to remove if the handler is not found in the list
VerifyOrReturn(nullptr != removeNode);

ReturnErrorOnFailure(ScheduleReport(newTimeout, newNode));
return CHIP_NO_ERROR;
mReadHandlerList.Remove(removeNode);
mNodesPool.ReleaseObject(removeNode);
}

CHIP_ERROR ReportSchedulerImpl::ScheduleReport(Timeout timeout, ReadHandlerNode * node)
{
// Cancel Report if it is currently scheduled
CancelSchedulerTimer(node);
StartSchedulerTimer(node, timeout);
mTimerDelegate->CancelTimer(node);
ReturnErrorOnFailure(mTimerDelegate->StartTimer(node, timeout));

return CHIP_NO_ERROR;
}
Expand All @@ -136,51 +108,48 @@ void ReportSchedulerImpl::CancelReport(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
VerifyOrReturn(nullptr != node);
CancelSchedulerTimer(node);
}

void ReportSchedulerImpl::UnregisterReadHandler(ReadHandler * aReadHandler)
{
CancelReport(aReadHandler);

ReadHandlerNode * removeNode = FindReadHandlerNode(aReadHandler);
// Nothing to remove if the handler is not found in the list
VerifyOrReturn(nullptr != removeNode);

mReadHandlerList.Remove(removeNode);
mNodesPool.ReleaseObject(removeNode);
mTimerDelegate->CancelTimer(node);
}

void ReportSchedulerImpl::UnregisterAllHandlers()
{
while (!mReadHandlerList.Empty())
{
ReadHandler * firstReadHandler = mReadHandlerList.begin()->GetReadHandler();
UnregisterReadHandler(firstReadHandler);
OnReadHandlerDestroyed(firstReadHandler);
}
}

bool ReportSchedulerImpl::IsReportScheduled(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
VerifyOrReturnValue(nullptr != node, false);
return CheckSchedulerTimerActive(node);
}

CHIP_ERROR ReportSchedulerImpl::StartSchedulerTimer(ReadHandlerNode * node, System::Clock::Timeout aTimeout)
{
// Schedule Report
return mTimerDelegate->StartTimer(node, aTimeout);
return mTimerDelegate->IsTimerActive(node);
}

void ReportSchedulerImpl::CancelSchedulerTimer(ReadHandlerNode * node)
CHIP_ERROR ReportSchedulerImpl::CalculateNextReportTimeout(Timeout & timeout, ReadHandlerNode * aNode)
{
mTimerDelegate->CancelTimer(node);
}
VerifyOrReturnError(mReadHandlerList.Contains(aNode), CHIP_ERROR_INVALID_ARGUMENT);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();

bool ReportSchedulerImpl::CheckSchedulerTimerActive(ReadHandlerNode * node)
{
return mTimerDelegate->IsTimerActive(node);
// If the handler is reportable now, just schedule a report immediately
if (aNode->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
timeout = Milliseconds32(0);
}
else if (IsReadHandlerReportable(aNode->GetReadHandler()) && (aNode->GetMinTimestamp() > now))
{
// If the handler is reportable now, but the min interval is not elapsed, schedule a report for the moment the min interval
// has elapsed
timeout = aNode->GetMinTimestamp() - now;
}
else
{
// If the handler is not reportable now, schedule a report for the max interval
timeout = aNode->GetMaxTimestamp() - now;
}
return CHIP_NO_ERROR;
}

} // namespace reporting
Expand Down
29 changes: 12 additions & 17 deletions src/app/reporting/ReportSchedulerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,30 @@ namespace reporting {
class ReportSchedulerImpl : public ReportScheduler
{
public:
using Timeout = System::Clock::Timeout;

ReportSchedulerImpl(TimerDelegate * aTimerDelegate);
~ReportSchedulerImpl() override { UnregisterAllHandlers(); }

// ReadHandlerObserver
void OnReadHandlerCreated(ReadHandler * aReadHandler) override;
void OnBecameReportable(ReadHandler * aReadHandler) override;
void OnSubscriptionAction(ReadHandler * aReadHandler) override;
void OnReadHandlerCreated(ReadHandler * aReadHandler) final;
void OnBecameReportable(ReadHandler * aReadHandler) final;
void OnSubscriptionAction(ReadHandler * aReadHandler) final;
void OnReadHandlerDestroyed(ReadHandler * aReadHandler) override;

bool IsReportScheduled(ReadHandler * aReadHandler);

void ReportTimerCallback() override;

protected:
virtual CHIP_ERROR RegisterReadHandler(ReadHandler * aReadHandler);
virtual CHIP_ERROR ScheduleReport(System::Clock::Timeout timeout, ReadHandlerNode * node);
virtual void CancelReport(ReadHandler * aReadHandler);
virtual void UnregisterReadHandler(ReadHandler * aReadHandler);
virtual CHIP_ERROR ScheduleReport(Timeout timeout, ReadHandlerNode * node);
void CancelReport(ReadHandler * aReadHandler);
virtual void UnregisterAllHandlers();

private:
friend class chip::app::reporting::TestReportScheduler;

bool IsReportScheduled(ReadHandler * aReadHandler) override;

/// @brief Start a timer for a given ReadHandlerNode, ensures that if a timer is already running for this node, it is cancelled
/// @param node Node of the ReadHandler list to start a timer for
/// @param aTimeout Delay before the timer expires
virtual CHIP_ERROR StartSchedulerTimer(ReadHandlerNode * node, System::Clock::Timeout aTimeout);
/// @brief Cancel the timer for a given ReadHandlerNode
virtual void CancelSchedulerTimer(ReadHandlerNode * node);
/// @brief Check if the timer for a given ReadHandlerNode is active
virtual bool CheckSchedulerTimerActive(ReadHandlerNode * node);
virtual CHIP_ERROR CalculateNextReportTimeout(Timeout & timeout, ReadHandlerNode * aNode);
};

} // namespace reporting
Expand Down
Loading

0 comments on commit f615859

Please sign in to comment.