Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ReadHandler] Synchronized report scheduler #27943

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ static_library("app") {
"reporting/ReportScheduler.h",
"reporting/ReportSchedulerImpl.cpp",
"reporting/ReportSchedulerImpl.h",
"reporting/SynchronizedReportSchedulerImpl.cpp",
"reporting/SynchronizedReportSchedulerImpl.h",
"reporting/reporting.h",
]

Expand Down
2 changes: 1 addition & 1 deletion src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#include <system/SystemPacketBuffer.h>

// https://github.com/CHIP-Specifications/connectedhomeip-spec/blob/61a9d19e6af12fdfb0872bcff26d19de6c680a1a/src/Ch02_Architecture.adoc#1122-subscribe-interaction-limits
constexpr uint16_t kSubscriptionMaxIntervalPublisherLimit = 3600; // 3600 seconds
constexpr uint16_t kSubscriptionMaxIntervalPublisherLimit = 3600;

namespace chip {
namespace app {
Expand Down
43 changes: 32 additions & 11 deletions src/app/reporting/ReportScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ReportScheduler : public ReadHandler::Observer
class ReadHandlerNode : public IntrusiveListNodeBase<>
{
public:
using TimerCompleteCallback = void (*)();
using TimerCompleteCallback = std::function<void()>;
lpbeliveau-silabs marked this conversation as resolved.
Show resolved Hide resolved

ReadHandlerNode(ReadHandler * aReadHandler, TimerDelegate * aTimerDelegate, TimerCompleteCallback aCallback) :
mTimerDelegate(aTimerDelegate), mCallback(aCallback)
Expand All @@ -78,29 +78,49 @@ class ReportScheduler : public ReadHandler::Observer
// the scheduler in the ReadHandler
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
return (mReadHandler->IsGeneratingReports() &&
((now >= mMinTimestamp && mReadHandler->IsDirty()) || now >= mMaxTimestamp));
(now >= mMinTimestamp && (mReadHandler->IsDirty() || now >= mMaxTimestamp || now >= mSyncTimestamp)));
}

bool IsEnginRunScheduled() const { return mEnginRunScheduled; }
void SetEngineRunScheduled(bool aEnginRunScheduled) { mEnginRunScheduled = aEnginRunScheduled; }

void SetIntervalTimeStamps(ReadHandler * aReadHandler)
{
uint16_t minInterval, maxInterval;
aReadHandler->GetReportingIntervals(minInterval, maxInterval);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
mMinTimestamp = now + System::Clock::Seconds16(minInterval);
mMaxTimestamp = now + System::Clock::Seconds16(maxInterval);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
mMinTimestamp = now + System::Clock::Seconds16(minInterval);
mMaxTimestamp = now + System::Clock::Seconds16(maxInterval);
mSyncTimestamp = mMaxTimestamp;
}

void RunCallback()
{
mCallback();
SetEngineRunScheduled(true);
}

void RunCallback() { mCallback(); }
void SetSyncTimestamp(System::Clock::Timestamp aSyncTimestamp)
{
// Prevents the sync timestamp being set to a value lower than the min timestamp
lpbeliveau-silabs marked this conversation as resolved.
Show resolved Hide resolved
VerifyOrReturn(aSyncTimestamp >= mMinTimestamp);
mSyncTimestamp = aSyncTimestamp;
}

Timestamp GetMinTimestamp() const { return mMinTimestamp; }
Timestamp GetMaxTimestamp() const { return mMaxTimestamp; }
System::Clock::Timestamp GetMinTimestamp() const { return mMinTimestamp; }
System::Clock::Timestamp GetMaxTimestamp() const { return mMaxTimestamp; }
System::Clock::Timestamp GetSyncTimestamp() const { return mSyncTimestamp; }

private:
TimerDelegate * mTimerDelegate;
TimerCompleteCallback mCallback;
ReadHandler * mReadHandler;
Timestamp mMinTimestamp;
Timestamp mMaxTimestamp;
Timestamp mSyncTimestamp; // Timestamp at which the read handler will be allowed to emit a report so it can be synced with
// other handlers that have an earlier max timestamp
bool mEnginRunScheduled = false; // Flag to indicate if the engine run is already scheduled so the scheduler can ignore
lpbeliveau-silabs marked this conversation as resolved.
Show resolved Hide resolved
// it when calculating the next run time
};

ReportScheduler(TimerDelegate * aTimerDelegate) : mTimerDelegate(aTimerDelegate) {}
Expand All @@ -109,11 +129,12 @@ class ReportScheduler : public ReadHandler::Observer
*/
virtual ~ReportScheduler() = default;

/// @brief Check if a ReadHandler is scheduled for reporting
virtual bool IsReportScheduled(ReadHandler * aReadHandler) = 0;
/// @brief Check whether a ReadHandler is reportable right now, taking into account its minimum and maximum intervals.
/// @param aReadHandler read handler to check
bool IsReportableNow(ReadHandler * aReadHandler) { return FindReadHandlerNode(aReadHandler)->IsReportableNow(); };
bool IsReportableNow(ReadHandler * aReadHandler)
{
return FindReadHandlerNode(aReadHandler)->IsReportableNow();
} // TODO: Change the IsReportableNow to IsReportable() for readHandlers
/// @brief Check if a ReadHandler is reportable without considering the timing
bool IsReadHandlerReportable(ReadHandler * aReadHandler) const
{
Expand Down
141 changes: 55 additions & 86 deletions src/app/reporting/ReportSchedulerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@ namespace chip {
namespace app {
namespace reporting {

using Seconds16 = System::Clock::Seconds16;
using Milliseconds32 = System::Clock::Milliseconds32;
using Timeout = System::Clock::Timeout;
using Timestamp = System::Clock::Timestamp;
using namespace System::Clock;
using ReadHandlerNode = ReportScheduler::ReadHandlerNode;

/// @brief Callback called when the report timer expires to schedule an engine run regardless of the state of the ReadHandlers, as
/// the engine already verifies that read handlers are reportable before sending a report
static void ReportTimerCallback()
void ReportSchedulerImpl::ReportTimerCallback()
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
Expand All @@ -43,7 +40,23 @@ ReportSchedulerImpl::ReportSchedulerImpl(TimerDelegate * aTimerDelegate) : Repor
/// @brief When a ReadHandler is added, register it, which will schedule an engine run
void ReportSchedulerImpl::OnReadHandlerCreated(ReadHandler * aReadHandler)
{
RegisterReadHandler(aReadHandler);
ReadHandlerNode * newNode = FindReadHandlerNode(aReadHandler);
// Handler must not be registered yet; it's just being constructed.
VerifyOrDie(nullptr == newNode);
// The NodePool is the same size as the ReadHandler pool from the IM Engine, so we don't need a check for size here since if a
// ReadHandler was created, space should be available.
newNode = mNodesPool.CreateObject(aReadHandler, mTimerDelegate, [this]() { this->ReportTimerCallback(); });
mReadHandlerList.PushBack(newNode);

ChipLogProgress(DataManagement,
"Registered a ReadHandler that will schedule a report between system Timestamp: %" PRIu64
" and system Timestamp %" PRIu64 ".",
newNode->GetMinTimestamp().count(), newNode->GetMaxTimestamp().count());

Milliseconds32 newTimeout;
// No need to check for error here, since the node is already in the list otherwise we would have Died
CalculateNextReportTimeout(newTimeout, newNode);
ScheduleReport(newTimeout, newNode);
}

/// @brief When a ReadHandler becomes reportable, schedule, verifies if the min interval of a handleris elapsed. If not,
Expand All @@ -54,80 +67,39 @@ void ReportSchedulerImpl::OnBecameReportable(ReadHandler * aReadHandler)
VerifyOrReturn(nullptr != node);

Milliseconds32 newTimeout;
if (node->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
newTimeout = Milliseconds32(0);
}
else
{
// If the handler is not reportable now, schedule a report for the min interval
newTimeout = node->GetMinTimestamp() - mTimerDelegate->GetCurrentMonotonicTimestamp();
}

CalculateNextReportTimeout(newTimeout, node);
ScheduleReport(newTimeout, node);
}

void ReportSchedulerImpl::OnSubscriptionAction(ReadHandler * apReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(apReadHandler);
VerifyOrReturn(nullptr != node);
// Schedule callback for max interval by computing the difference between the max timestamp and the current timestamp
node->SetIntervalTimeStamps(apReadHandler);
Milliseconds32 newTimeout = node->GetMaxTimestamp() - mTimerDelegate->GetCurrentMonotonicTimestamp();
Milliseconds32 newTimeout;
CalculateNextReportTimeout(newTimeout, node);
ScheduleReport(newTimeout, node);
node->SetEngineRunScheduled(false);
lpbeliveau-silabs marked this conversation as resolved.
Show resolved Hide resolved
}

/// @brief When a ReadHandler is removed, unregister it, which will cancel any scheduled report
void ReportSchedulerImpl::OnReadHandlerDestroyed(ReadHandler * aReadHandler)
{
UnregisterReadHandler(aReadHandler);
}

CHIP_ERROR ReportSchedulerImpl::RegisterReadHandler(ReadHandler * aReadHandler)
{
ReadHandlerNode * newNode = FindReadHandlerNode(aReadHandler);
// Handler must not be registered yet; it's just being constructed.
VerifyOrDie(nullptr == newNode);
// The NodePool is the same size as the ReadHandler pool from the IM Engine, so we don't need a check for size here since if a
// ReadHandler was created, space should be available.
newNode = mNodesPool.CreateObject(aReadHandler, mTimerDelegate, ReportTimerCallback);
mReadHandlerList.PushBack(newNode);

ChipLogProgress(DataManagement,
"Registered a ReadHandler that will schedule a report between system Timestamp: %" PRIu64
" and system Timestamp %" PRIu64 ".",
newNode->GetMinTimestamp().count(), newNode->GetMaxTimestamp().count());
CancelReport(aReadHandler);

Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();
Milliseconds32 newTimeout;
// If the handler is reportable, schedule a report for the min interval, otherwise schedule a report for the max interval
if (newNode->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
newTimeout = Milliseconds32(0);
}
else if (IsReadHandlerReportable(aReadHandler) && (newNode->GetMinTimestamp() > now))
{
// If the handler is reportable now, but the min interval is not elapsed, schedule a report for the moment the min interval
// has elapsed
newTimeout = newNode->GetMinTimestamp() - now;
}
else
{
// If the handler is not reportable now, schedule a report for the max interval
newTimeout = newNode->GetMaxTimestamp() - now;
}
ReadHandlerNode * removeNode = FindReadHandlerNode(aReadHandler);
// Nothing to remove if the handler is not found in the list
VerifyOrReturn(nullptr != removeNode);

ReturnErrorOnFailure(ScheduleReport(newTimeout, newNode));
return CHIP_NO_ERROR;
mReadHandlerList.Remove(removeNode);
mNodesPool.ReleaseObject(removeNode);
}

CHIP_ERROR ReportSchedulerImpl::ScheduleReport(Timeout timeout, ReadHandlerNode * node)
{
// Cancel Report if it is currently scheduled
CancelSchedulerTimer(node);
StartSchedulerTimer(node, timeout);
mTimerDelegate->CancelTimer(node);
ReturnErrorOnFailure(mTimerDelegate->StartTimer(node, timeout));

return CHIP_NO_ERROR;
}
Expand All @@ -136,51 +108,48 @@ void ReportSchedulerImpl::CancelReport(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
VerifyOrReturn(nullptr != node);
CancelSchedulerTimer(node);
}

void ReportSchedulerImpl::UnregisterReadHandler(ReadHandler * aReadHandler)
{
CancelReport(aReadHandler);

ReadHandlerNode * removeNode = FindReadHandlerNode(aReadHandler);
// Nothing to remove if the handler is not found in the list
VerifyOrReturn(nullptr != removeNode);

mReadHandlerList.Remove(removeNode);
mNodesPool.ReleaseObject(removeNode);
mTimerDelegate->CancelTimer(node);
}

void ReportSchedulerImpl::UnregisterAllHandlers()
{
while (!mReadHandlerList.Empty())
{
ReadHandler * firstReadHandler = mReadHandlerList.begin()->GetReadHandler();
UnregisterReadHandler(firstReadHandler);
OnReadHandlerDestroyed(firstReadHandler);
}
}

bool ReportSchedulerImpl::IsReportScheduled(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
VerifyOrReturnValue(nullptr != node, false);
return CheckSchedulerTimerActive(node);
}

CHIP_ERROR ReportSchedulerImpl::StartSchedulerTimer(ReadHandlerNode * node, System::Clock::Timeout aTimeout)
{
// Schedule Report
return mTimerDelegate->StartTimer(node, aTimeout);
return mTimerDelegate->IsTimerActive(node);
}

void ReportSchedulerImpl::CancelSchedulerTimer(ReadHandlerNode * node)
CHIP_ERROR ReportSchedulerImpl::CalculateNextReportTimeout(Timeout & timeout, ReadHandlerNode * aNode)
{
mTimerDelegate->CancelTimer(node);
}
VerifyOrReturnError(mReadHandlerList.Contains(aNode), CHIP_ERROR_INVALID_ARGUMENT);
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();

bool ReportSchedulerImpl::CheckSchedulerTimerActive(ReadHandlerNode * node)
{
return mTimerDelegate->IsTimerActive(node);
// If the handler is reportable now, just schedule a report immediately
if (aNode->IsReportableNow())
{
// If the handler is reportable now, just schedule a report immediately
timeout = Milliseconds32(0);
}
else if (IsReadHandlerReportable(aNode->GetReadHandler()) && (aNode->GetMinTimestamp() > now))
{
// If the handler is reportable now, but the min interval is not elapsed, schedule a report for the moment the min interval
// has elapsed
timeout = aNode->GetMinTimestamp() - now;
}
else
{
// If the handler is not reportable now, schedule a report for the max interval
timeout = aNode->GetMaxTimestamp() - now;
}
return CHIP_NO_ERROR;
}

} // namespace reporting
Expand Down
29 changes: 12 additions & 17 deletions src/app/reporting/ReportSchedulerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,30 @@ namespace reporting {
class ReportSchedulerImpl : public ReportScheduler
{
public:
using Timeout = System::Clock::Timeout;

ReportSchedulerImpl(TimerDelegate * aTimerDelegate);
~ReportSchedulerImpl() override { UnregisterAllHandlers(); }

// ReadHandlerObserver
void OnReadHandlerCreated(ReadHandler * aReadHandler) override;
void OnBecameReportable(ReadHandler * aReadHandler) override;
void OnSubscriptionAction(ReadHandler * aReadHandler) override;
void OnReadHandlerCreated(ReadHandler * aReadHandler) final;
void OnBecameReportable(ReadHandler * aReadHandler) final;
void OnSubscriptionAction(ReadHandler * aReadHandler) final;
void OnReadHandlerDestroyed(ReadHandler * aReadHandler) override;

bool IsReportScheduled(ReadHandler * aReadHandler);

void ReportTimerCallback();

protected:
virtual CHIP_ERROR RegisterReadHandler(ReadHandler * aReadHandler);
virtual CHIP_ERROR ScheduleReport(System::Clock::Timeout timeout, ReadHandlerNode * node);
virtual void CancelReport(ReadHandler * aReadHandler);
virtual void UnregisterReadHandler(ReadHandler * aReadHandler);
virtual CHIP_ERROR ScheduleReport(Timeout timeout, ReadHandlerNode * node);
void CancelReport(ReadHandler * aReadHandler);
virtual void UnregisterAllHandlers();

private:
friend class chip::app::reporting::TestReportScheduler;

bool IsReportScheduled(ReadHandler * aReadHandler) override;

/// @brief Start a timer for a given ReadHandlerNode, ensures that if a timer is already running for this node, it is cancelled
/// @param node Node of the ReadHandler list to start a timer for
/// @param aTimeout Delay before the timer expires
virtual CHIP_ERROR StartSchedulerTimer(ReadHandlerNode * node, System::Clock::Timeout aTimeout);
/// @brief Cancel the timer for a given ReadHandlerNode
virtual void CancelSchedulerTimer(ReadHandlerNode * node);
/// @brief Check if the timer for a given ReadHandlerNode is active
virtual bool CheckSchedulerTimerActive(ReadHandlerNode * node);
virtual CHIP_ERROR CalculateNextReportTimeout(Timeout & timeout, ReadHandlerNode * aNode);
};

} // namespace reporting
Expand Down
Loading