diff --git a/src/app/BUILD.gn b/src/app/BUILD.gn index 71f6dac3921262..16226ae0622446 100644 --- a/src/app/BUILD.gn +++ b/src/app/BUILD.gn @@ -199,6 +199,13 @@ static_library("app") { ] } + if (chip_persist_subscriptions) { + sources += [ + "SubscriptionResumptionSessionEstablisher.cpp", + "SubscriptionResumptionSessionEstablisher.h", + ] + } + public_deps = [ ":app_config", "${chip_root}/src/access", diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index f25512422faa41..10e44eb9fb33a8 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -38,6 +38,18 @@ extern bool emberAfContainsAttribute(chip::EndpointId endpoint, chip::ClusterId namespace chip { namespace app { +class AutoReleaseSubscriptionInfoIterator +{ +public: + AutoReleaseSubscriptionInfoIterator(SubscriptionResumptionStorage::SubscriptionInfoIterator * iterator) : mIterator(iterator){}; + ~AutoReleaseSubscriptionInfoIterator() { mIterator->Release(); } + + SubscriptionResumptionStorage::SubscriptionInfoIterator * operator->() const { return mIterator; } + +private: + SubscriptionResumptionStorage::SubscriptionInfoIterator * mIterator; +}; + using Protocols::InteractionModel::Status; InteractionModelEngine sInteractionModelEngine; @@ -1649,7 +1661,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap VerifyOrReturn(apAppState != nullptr); InteractionModelEngine * imEngine = static_cast(apAppState); SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo; - auto * iterator = imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions(); + AutoReleaseSubscriptionInfoIterator iterator(imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions()); while (iterator->Next(subscriptionInfo)) { // If subscription happens between reboot and this timer callback, it's already live and should skip resumption @@ -1667,28 +1679,21 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap continue; } - auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize(); - auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize(); - if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, - requestedEventPathCount)) + auto subscriptionResumptionSessionEstablisher = Platform::MakeUnique(); + if (subscriptionResumptionSessionEstablisher == nullptr) { - ChipLogProgress(InteractionModel, "no resource for Subscription resumption"); - iterator->Release(); + ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher"); return; } - ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine); - if (handler == nullptr) + if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) != + CHIP_NO_ERROR) { - ChipLogProgress(InteractionModel, "no resource for ReadHandler creation"); - iterator->Release(); + ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId); return; } - - ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId); - handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo); + subscriptionResumptionSessionEstablisher.release(); } - iterator->Release(); #endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS } diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 5401511899e0d5..e30091b6d4b113 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -365,6 +366,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, private: friend class reporting::Engine; friend class TestCommandInteraction; + friend class SubscriptionResumptionSessionEstablisher; using Status = Protocols::InteractionModel::Status; void OnDone(CommandHandler & apCommandObj) override; diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index ef594e3badd163..4cb7bbe01006f2 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -42,10 +42,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon InteractionType aInteractionType) : mExchangeCtx(*this), mManagementCallback(apCallback) -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - , - mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) -#endif { VerifyOrDie(apExchangeContext != nullptr); @@ -67,38 +63,36 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon #if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS ReadHandler::ReadHandler(ManagementCallback & apCallback) : - mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), - mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) + mExchangeCtx(*this), mManagementCallback(apCallback) { mInteractionType = InteractionType::Subscribe; mFlags.ClearAll(); } -void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager, - SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) +void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle, + SubscriptionResumptionSessionEstablisher & resumptionSessionEstablisher) { - mSubscriptionId = subscriptionInfo.mSubscriptionId; - mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval; - mMaxInterval = subscriptionInfo.mMaxInterval; - SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered); + mSubscriptionId = resumptionSessionEstablisher.mSubscriptionInfo.mSubscriptionId; + mMinIntervalFloorSeconds = resumptionSessionEstablisher.mSubscriptionInfo.mMinInterval; + mMaxInterval = resumptionSessionEstablisher.mSubscriptionInfo.mMaxInterval; + SetStateFlag(ReadHandlerFlags::FabricFiltered, resumptionSessionEstablisher.mSubscriptionInfo.mFabricFiltered); // Move dynamically allocated attributes and events from the SubscriptionInfo struct into // the object pool managed by the IM engine - for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++) + for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++) { - AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams(); - CHIP_ERROR err = - InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams); + AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams(); + CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params); if (err != CHIP_NO_ERROR) { Close(); return; } } - for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++) + for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++) { - EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams(); - CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams); + EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams(); + CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params); if (err != CHIP_NO_ERROR) { Close(); @@ -106,9 +100,16 @@ void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager, } } - // Ask IM engine to start CASE session with subscriber - ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex); - caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback); + mSessionHandle.Grab(sessionHandle); + + MoveToState(HandlerState::GeneratingReports); + + ObjectList * attributePath = mpAttributePathList; + while (attributePath) + { + InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue); + attributePath = attributePath->mpNext; + } } #endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS @@ -866,33 +867,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag) SetStateFlag(aFlag, false); } -void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, - const SessionHandle & sessionHandle) -{ - ReadHandler * const _this = static_cast(context); - - _this->mSessionHandle.Grab(sessionHandle); - - _this->MoveToState(HandlerState::GeneratingReports); - - ObjectList * attributePath = _this->mpAttributePathList; - while (attributePath) - { - InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue); - attributePath = attributePath->mpNext; - } -} - -void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) -{ - ReadHandler * const _this = static_cast(context); - VerifyOrDie(_this != nullptr); - - // TODO: Have a retry mechanism tied to wake interval for IC devices - ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'", - err.Format()); - _this->Close(); -} - } // namespace app } // namespace chip diff --git a/src/app/ReadHandler.h b/src/app/ReadHandler.h index a933aa76e51389..4f4a9aad0ee002 100644 --- a/src/app/ReadHandler.h +++ b/src/app/ReadHandler.h @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -206,6 +207,16 @@ class ReadHandler : public Messaging::ExchangeDelegate return CHIP_NO_ERROR; } +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + /** + * + * @brief Initialize a ReadHandler for a resumed subsciption + * + * Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session + */ + void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher); +#endif + private: PriorityLevel GetCurrentPriority() const { return mCurrentPriority; } EventNumber & GetEventMin() { return mEventMin; } @@ -258,18 +269,6 @@ class ReadHandler : public Messaging::ExchangeDelegate */ void OnInitialRequest(System::PacketBufferHandle && aPayload); -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - /** - * - * @brief Resume a persisted subscription - * - * Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session - * with the subscriber if one doesn't already exist, and send full priming report when connected. - */ - void ResumeSubscription(CASESessionManager & caseSessionManager, - SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo); -#endif - /** * Send ReportData to initiator * @@ -420,11 +419,6 @@ class ReadHandler : public Messaging::ExchangeDelegate void SetStateFlag(ReadHandlerFlags aFlag, bool aValue = true); void ClearStateFlag(ReadHandlerFlags aFlag); - // Helpers for continuing the subscription resumption - static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, - const SessionHandle & sessionHandle); - static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); - AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr); // The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was @@ -503,12 +497,6 @@ class ReadHandler : public Messaging::ExchangeDelegate PriorityLevel mCurrentPriority = PriorityLevel::Invalid; BitFlags mFlags; InteractionType mInteractionType = InteractionType::Read; - -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - // Callbacks to handle server-initiated session success/failure - chip::Callback::Callback mOnConnectedCallback; - chip::Callback::Callback mOnConnectionFailureCallback; -#endif }; } // namespace app } // namespace chip diff --git a/src/app/SubscriptionResumptionSessionEstablisher.cpp b/src/app/SubscriptionResumptionSessionEstablisher.cpp new file mode 100644 index 00000000000000..ac9346dfe33bbc --- /dev/null +++ b/src/app/SubscriptionResumptionSessionEstablisher.cpp @@ -0,0 +1,124 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace chip { +namespace app { + +class AutoDeleteEstablisher +{ +public: + AutoDeleteEstablisher(SubscriptionResumptionSessionEstablisher * sessionEstablisher) : mSessionEstablisher(sessionEstablisher) + {} + ~AutoDeleteEstablisher() { chip::Platform::Delete(mSessionEstablisher); } + + SubscriptionResumptionSessionEstablisher * operator->() const { return mSessionEstablisher; } + + SubscriptionResumptionSessionEstablisher & operator*() const { return *mSessionEstablisher; } + +private: + SubscriptionResumptionSessionEstablisher * mSessionEstablisher; +}; + +SubscriptionResumptionSessionEstablisher::SubscriptionResumptionSessionEstablisher() : + mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) +{} + +CHIP_ERROR +SubscriptionResumptionSessionEstablisher::ResumeSubscription( + CASESessionManager & caseSessionManager, const SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) +{ + mSubscriptionInfo.mNodeId = subscriptionInfo.mNodeId; + mSubscriptionInfo.mFabricIndex = subscriptionInfo.mFabricIndex; + mSubscriptionInfo.mSubscriptionId = subscriptionInfo.mSubscriptionId; + mSubscriptionInfo.mMinInterval = subscriptionInfo.mMinInterval; + mSubscriptionInfo.mMaxInterval = subscriptionInfo.mMaxInterval; + mSubscriptionInfo.mFabricFiltered = subscriptionInfo.mFabricFiltered; + // Copy the Attribute Paths and Event Paths + if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0) + { + mSubscriptionInfo.mAttributePaths.Alloc(subscriptionInfo.mAttributePaths.AllocatedSize()); + if (!mSubscriptionInfo.mAttributePaths.Get()) + { + return CHIP_ERROR_NO_MEMORY; + } + for (size_t i = 0; i < mSubscriptionInfo.mAttributePaths.AllocatedSize(); ++i) + { + mSubscriptionInfo.mAttributePaths[i] = subscriptionInfo.mAttributePaths[i]; + } + } + if (subscriptionInfo.mEventPaths.AllocatedSize() > 0) + { + mSubscriptionInfo.mEventPaths.Alloc(subscriptionInfo.mEventPaths.AllocatedSize()); + if (!mSubscriptionInfo.mEventPaths.Get()) + { + return CHIP_ERROR_NO_MEMORY; + } + for (size_t i = 0; i < mSubscriptionInfo.mEventPaths.AllocatedSize(); ++i) + { + mSubscriptionInfo.mEventPaths[i] = subscriptionInfo.mEventPaths[i]; + } + } + + ScopedNodeId peerNode = ScopedNodeId(mSubscriptionInfo.mNodeId, mSubscriptionInfo.mFabricIndex); + caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback); + return CHIP_NO_ERROR; +} + +void SubscriptionResumptionSessionEstablisher::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle) +{ + AutoDeleteEstablisher establisher(static_cast(context)); + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo; + InteractionModelEngine * imEngine = InteractionModelEngine::GetInstance(); + if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, subscriptionInfo.mAttributePaths.AllocatedSize(), + subscriptionInfo.mEventPaths.AllocatedSize())) + { + ChipLogProgress(InteractionModel, "no resource for subscription resumption"); + return; + } + ReadHandler * readHandler = imEngine->mReadHandlers.CreateObject(*imEngine); + if (readHandler == nullptr) + { + ChipLogProgress(InteractionModel, "no resource for ReadHandler creation"); + return; + } + readHandler->OnSubscriptionResumed(sessionHandle, *establisher); +} + +void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, + CHIP_ERROR error) +{ + AutoDeleteEstablisher establisher(static_cast(context)); + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo; + ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'", + error.Format()); + // If the device fails to establish the session, the subscriber might be offline and its subscription read client will + // be deleted when the device reconnect to the subscriber. This subscription will be never used again. So clean up + // the persistent subscription information storage. + auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage(); + if (subscriptionResumptionStorage) + { + subscriptionResumptionStorage->Delete(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex, + subscriptionInfo.mSubscriptionId); + } +} + +} // namespace app +} // namespace chip diff --git a/src/app/SubscriptionResumptionSessionEstablisher.h b/src/app/SubscriptionResumptionSessionEstablisher.h new file mode 100644 index 00000000000000..a66c403c1c11ab --- /dev/null +++ b/src/app/SubscriptionResumptionSessionEstablisher.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace chip { +namespace app { + +/** + * Session Establisher to resume persistent subscription. A CASE session will be established upon invoking + * ResumeSubscription(), followed by the creation and intialization of a ReadHandler. This class helps prevent + * a scenario where all ReadHandlers in the pool grab the invalid session handle. In such scenario, if the device + * receives a new subscription request, it will crash as there is no evictable ReadHandler. + */ + +class SubscriptionResumptionSessionEstablisher +{ +public: + SubscriptionResumptionSessionEstablisher(); + + CHIP_ERROR ResumeSubscription(CASESessionManager & caseSessionManager, + const SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo); + + SubscriptionResumptionStorage::SubscriptionInfo mSubscriptionInfo; + +private: + // Callback funstions for continuing the subscription resumption + static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle); + static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); + + // Callbacks to handle server-initiated session success/failure + chip::Callback::Callback mOnConnectedCallback; + chip::Callback::Callback mOnConnectionFailureCallback; +}; +} // namespace app +} // namespace chip