Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ICD] Server side subscription persistence and resumption #24361

Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
7f8db2f
[ICD] Server side subscription persistence and resumption
jtung-apple Jan 11, 2023
0e003ba
restyled change
jtung-apple Jan 11, 2023
a3b8add
Correct SimpleSubscriptionResumptionStorage TLV format documentation
jtung-apple Jan 11, 2023
c68cf85
Fix ReadHandler resumption
jtung-apple Jan 11, 2023
c6fb866
Correct state size estimate
jtung-apple Jan 11, 2023
a82a9d0
Replace %zu in log format
jtung-apple Jan 11, 2023
57cf32e
Move TLV buffer off stack
jtung-apple Jan 11, 2023
05006c3
restyled
jtung-apple Jan 11, 2023
9f56e48
Replaced vector with ScopedMemoryBufferWithSize and shim structs
jtung-apple Jan 12, 2023
02ebc9c
Fix struct member order
jtung-apple Jan 12, 2023
65b17af
Fix one more struct member order
jtung-apple Jan 12, 2023
6bacbbd
Fixed more stack buffer
jtung-apple Jan 12, 2023
865a169
Fix copy/paste bug
jtung-apple Jan 12, 2023
3d35c8d
Moved SubscriptionList array to unique_ptr and dynamically allocated …
jtung-apple Jan 13, 2023
1d5bed2
Moved SubscriptionIndex array to unique_ptr and dynamically allocated…
jtung-apple Jan 13, 2023
c050d5b
Fixed error condition checks
jtung-apple Jan 13, 2023
843dfb9
Fixed array size check
jtung-apple Jan 13, 2023
e442d5a
Addressed CI issues, and disabled subscription persistence and resump…
jtung-apple Jan 13, 2023
2316be4
Addressed PR review comments, including:
jtung-apple Jan 15, 2023
cd757e1
Restyled and ReadHandler include
jtung-apple Jan 15, 2023
bda0ca9
ReadHandler Callback fix
jtung-apple Jan 16, 2023
8cc51ef
Merge branch 'master' into jtung/icd-server-subscription-persistence
jtung-apple Jan 16, 2023
3660392
Fix ReadHandler Callback const argument
jtung-apple Jan 16, 2023
8b6fe6a
Explicitly disable subscription persistence and address review comments
jtung-apple Jan 17, 2023
dd6903d
Fixed priming reports on resumption
jtung-apple Jan 18, 2023
cd69518
Revamp subscription storage into flat structure and add unit test
jtung-apple Jan 19, 2023
3a509f4
Fix unit test build warning and minor PR comment change
jtung-apple Jan 19, 2023
6960898
Update src/app/SubscriptionResumptionStorage.h
jtung-apple Jan 19, 2023
194361e
Minor changes to address PR comments
jtung-apple Jan 19, 2023
f8d463b
Address PR review comments:
jtung-apple Jan 19, 2023
3506edf
Address PR comments:
jtung-apple Jan 19, 2023
122224c
Changed storage MaxCount mechanics to Init time clean up
jtung-apple Jan 19, 2023
22cb37c
Clean up comments and unused commented-out old code
jtung-apple Jan 19, 2023
2445ab5
Addressed PR comments:
jtung-apple Jan 19, 2023
72e95cd
Update src/app/SimpleSubscriptionResumptionStorage.cpp
jtung-apple Jan 20, 2023
0a26d5a
Leak fix and better loop variable name
jtung-apple Jan 20, 2023
952007f
Update src/app/InteractionModelEngine.cpp
jtung-apple Jan 20, 2023
866ba53
Remove reference to previously removed variable for config that turns…
jtung-apple Jan 20, 2023
cedd227
Addressed PR comments and enabled chip-tool for testing
jtung-apple Jan 20, 2023
b11ca7e
Changed storage of attribute/event paths to proper List/Structure TLV
jtung-apple Jan 20, 2023
29cbb72
Fixed attribute load
jtung-apple Jan 20, 2023
4e121e2
Make Unit Test names more unique and tighten CHIP_CONFIG_PERSIST_SUBS…
jtung-apple Jan 20, 2023
b8ef2e6
Addressed PR comments and CI issues:
jtung-apple Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ static_library("app") {
"ReadHandler.cpp",
"RequiredPrivilege.cpp",
"RequiredPrivilege.h",
"SimpleSubscriptionResumptionStorage.cpp",
"SimpleSubscriptionResumptionStorage.h",
"StatusResponse.cpp",
"StatusResponse.h",
"SubscriptionResumptionStorage.h",
"TimedHandler.cpp",
"TimedHandler.h",
"TimedRequest.cpp",
Expand Down
47 changes: 43 additions & 4 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ InteractionModelEngine * InteractionModelEngine::GetInstance()
}

CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr)
CASESessionManager * apCASESessionMgr,
SubscriptionResumptionStorage * subscriptionResumptionStorage)
{
VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT);

mpExchangeMgr = apExchangeMgr;
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;
mpExchangeMgr = apExchangeMgr;
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;
mpSubscriptionResumptionStorage = subscriptionResumptionStorage;

ReturnErrorOnFailure(mpFabricTable->AddFabricDelegate(this));
ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this));
Expand Down Expand Up @@ -664,6 +666,8 @@ Status InteractionModelEngine::OnUnsolicitedReportData(Messaging::ExchangeContex
return Status::Success;
}

ChipLogDetail(InteractionModel, "Received report with invalid subscriptionId %" PRIu32, subscriptionId);

return Status::InvalidSubscription;
}

Expand Down Expand Up @@ -1578,5 +1582,40 @@ void InteractionModelEngine::OnFabricRemoved(const FabricTable & fabricTable, Fa
// the fabric removal, though, so they will fail when they try to actually send their command response
// and will close at that point.
}

CHIP_ERROR InteractionModelEngine::ResumeSubscriptions()
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably error out if we have some read handlers allocated already, because that would mean we might create read handlers to represent stored subscriptions that are already represented by an allocated read handler.

And the declaration in the header should document that this is expected to be called during startup, after initing the IM engine?

ReturnErrorCodeIf(!mpSubscriptionResumptionStorage, CHIP_NO_ERROR);

SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo;
auto * iterator = mpSubscriptionResumptionStorage->IterateSubscriptions();
while (iterator->Next(subscriptionInfo))
{
auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, requestedEventPathCount))
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
iterator->Release();
return CHIP_ERROR_NO_MEMORY;
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
}

ReadHandler * handler = mReadHandlers.CreateObject(*this);
if (handler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
iterator->Release();
return CHIP_ERROR_NO_MEMORY;
}

handler->ResumeSubscription(*mpCASESessionMgr, subscriptionInfo);
}
iterator->Release();
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

return CHIP_NO_ERROR;
}

} // namespace app
} // namespace chip
9 changes: 8 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*
*/
CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable,
CASESessionManager * apCASESessionMgr = nullptr);
CASESessionManager * apCASESessionMgr = nullptr,
SubscriptionResumptionStorage * subscriptionResumptionStorage = nullptr);

void Shutdown();

Expand Down Expand Up @@ -292,6 +293,10 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
// virtual method from FabricTable::Delegate
void OnFabricRemoved(const FabricTable & fabricTable, FabricIndex fabricIndex) override;

SubscriptionResumptionStorage * GetSubscriptionResumptionStorage() { return mpSubscriptionResumptionStorage; };

CHIP_ERROR ResumeSubscriptions();

#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
//
// Get direct access to the underlying read handler pool
Expand Down Expand Up @@ -596,6 +601,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,

CASESessionManager * mpCASESessionMgr = nullptr;

SubscriptionResumptionStorage * mpSubscriptionResumptionStorage = nullptr;

// A magic number for tracking values between stack Shutdown()-s and Init()-s.
// An ObjectHandle is valid iff. its magic equals to this one.
uint32_t mMagic = 0;
Expand Down
146 changes: 144 additions & 2 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ using Status = Protocols::InteractionModel::Status;
ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext,
InteractionType aInteractionType) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -61,6 +62,54 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Ask IM engine to start CASE session with subscriber
// Start a CASE session with subscriber so we can keep delivering reports.

We're not using the IM engine for this here, right?

ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

ReadHandler::~ReadHandler()
{
auto * appCallback = mManagementCallback.GetAppCallback();
Expand All @@ -87,8 +136,18 @@ ReadHandler::~ReadHandler()
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

void ReadHandler::Close()
void ReadHandler::Close(CloseOptions options)
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
if (options == CloseOptions::kDropPersistedSubscription)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably check that mInteractionType == InteractionType::Subscribe before using the otherwise-probably-not-meaningful mSubscriptionId.

{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(GetInitiatorNodeId(), GetAccessingFabricIndex(), mSubscriptionId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetInitiatorNodeId() and GetAccessingFabricIndex() might not really be meaningful if GetSession() returns null (which for example it will if this gets closed from one of the error-path calls in ResumeSubscription.

I think we should not use those methods here if GetSession() is null. Separately, should the error paths in ResumeSubscription clear out the stored subscription?

}
}
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
MoveToState(HandlerState::AwaitingDestruction);
mManagementCallback.OnDone(*this);
}
Expand Down Expand Up @@ -306,7 +365,12 @@ void ReadHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeConte
{
ChipLogError(DataManagement, "Time out! failed to receive status response from Exchange: " ChipLogFormatExchange,
ChipLogValueExchange(apExchangeContext));
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// TODO: Have a retry mechanism tied to wake interval for IC devices
Close(CloseOptions::kKeepPersistedSubscription);
#else
Close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the model here is that the stored thing is cleaned up if we hit one of the non-timeout error cases in ReadHandler, right?

It feels like we should also clean it up in InteractionModelEngine::OnReadInitialRequest when keepExistingSubscriptions is false and we ReleaseObject some read handlers to tear down those subscriptions, right?

But we don't want to clean it up in InteractionModelEngine::Shutdown when we drop all the ReadHandlers, because at that point we want to keep the state stored. We should probably document that.

#endif
}

CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -652,9 +716,59 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP

mExchangeCtx->WillSendMessage();

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
PersistSubscription();
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

return CHIP_NO_ERROR;
}

void ReadHandler::PersistSubscription()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method's definition and declaration should probably be conditional on CHIP_CONFIG_PERSIST_SUBSCRIPTIONS.

{
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
VerifyOrReturn(subscriptionResumptionStorage != nullptr);

SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo = { .mNodeId = GetInitiatorNodeId(),
.mFabricIndex = GetAccessingFabricIndex(),
.mSubscriptionId = mSubscriptionId,
.mMinInterval = mMinIntervalFloorSeconds,
.mMaxInterval = mMaxInterval,
.mFabricFiltered = IsFabricFiltered() };
ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
size_t attributePathCount = 0;
while (attributePath)
{
attributePathCount++;
attributePath = attributePath->mpNext;
}
attributePath = mpAttributePathList;
subscriptionInfo.mAttributePaths.Calloc(attributePathCount);
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
VerifyOrReturn(subscriptionInfo.mAttributePaths.Get() != nullptr);
for (size_t i = 0; i < attributePathCount; i++)
{
subscriptionInfo.mAttributePaths[i].SetValues(attributePath->mValue);
attributePath = attributePath->mpNext;
}

ObjectList<EventPathParams> * eventPath = mpEventPathList;
size_t eventPathCount = 0;
while (eventPath)
{
eventPathCount++;
eventPath = eventPath->mpNext;
}
eventPath = mpEventPathList;
subscriptionInfo.mEventPaths.Calloc(eventPathCount);
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
VerifyOrReturn(subscriptionInfo.mEventPaths.Get() != nullptr);
for (size_t i = 0; i < eventPathCount; i++)
{
subscriptionInfo.mEventPaths[i].SetValues(eventPath->mValue);
eventPath = eventPath->mpNext;
}

subscriptionResumptionStorage->Save(subscriptionInfo);
}

void ReadHandler::OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
Expand Down Expand Up @@ -767,5 +881,33 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These HandleDevice* should also be conditioned on CHIP_CONFIG_PERSIST_SUBSCRIPTIONS

const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->MoveToState(HandlerState::GeneratingReports);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to not actually erase out stored subscription, since we have no session. I assume that is in fact the behavior we want here, but we might want to document that... and maybe pass the "don't erase" flag explicitly if that's what we really want.

}

} // namespace app
} // namespace chip
Loading