From a3cf4961657fa81384bea01beb307134c860da9f Mon Sep 17 00:00:00 2001 From: Wang Qixiang <43193572+wqx6@users.noreply.github.com> Date: Fri, 5 Jan 2024 04:46:39 +0800 Subject: [PATCH] IM: Create ReadHandler after Session Establishment for Subscription Resumption (#30491) * IM: Create ReadHandler after Session Establishment for Subscription Resumption * Restyled by clang-format * Make SubscriptionResumptionHelper inherits from SubscriptionInfo * review changes * Rename Helper to SessionEstablisher * Restyled by clang-format * RAII changes * Restyled by clang-format --------- Co-authored-by: Restyled.io --- src/app/BUILD.gn | 7 + src/app/InteractionModelEngine.cpp | 35 ++--- src/app/InteractionModelEngine.h | 2 + src/app/ReadHandler.cpp | 93 +++++-------- src/app/ReadHandler.h | 34 ++--- ...bscriptionResumptionSessionEstablisher.cpp | 124 ++++++++++++++++++ ...SubscriptionResumptionSessionEstablisher.h | 55 ++++++++ 7 files changed, 252 insertions(+), 98 deletions(-) create mode 100644 src/app/SubscriptionResumptionSessionEstablisher.cpp create mode 100644 src/app/SubscriptionResumptionSessionEstablisher.h diff --git a/src/app/BUILD.gn b/src/app/BUILD.gn index a96dc222b2aac3..b8a408faae85bf 100644 --- a/src/app/BUILD.gn +++ b/src/app/BUILD.gn @@ -232,6 +232,13 @@ static_library("app") { ] } + if (chip_persist_subscriptions) { + sources += [ + "SubscriptionResumptionSessionEstablisher.cpp", + "SubscriptionResumptionSessionEstablisher.h", + ] + } + if (chip_enable_read_client) { sources += [ "BufferedReadCallback.cpp", diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index d2c30362944cef..90cc759f69fa14 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -41,6 +41,18 @@ namespace chip { namespace app { +class AutoReleaseSubscriptionInfoIterator +{ +public: + AutoReleaseSubscriptionInfoIterator(SubscriptionResumptionStorage::SubscriptionInfoIterator * iterator) : mIterator(iterator){}; + ~AutoReleaseSubscriptionInfoIterator() { mIterator->Release(); } + + SubscriptionResumptionStorage::SubscriptionInfoIterator * operator->() const { return mIterator; } + +private: + SubscriptionResumptionStorage::SubscriptionInfoIterator * mIterator; +}; + using Protocols::InteractionModel::Status; Global sInteractionModelEngine; @@ -1872,7 +1884,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap bool resumedSubscriptions = false; #endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo; - auto * iterator = imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions(); + AutoReleaseSubscriptionInfoIterator iterator(imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions()); while (iterator->Next(subscriptionInfo)) { // If subscription happens between reboot and this timer callback, it's already live and should skip resumption @@ -1890,31 +1902,24 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap continue; } - auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize(); - auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize(); - if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, - requestedEventPathCount)) + auto subscriptionResumptionSessionEstablisher = Platform::MakeUnique(); + if (subscriptionResumptionSessionEstablisher == nullptr) { - ChipLogProgress(InteractionModel, "no resource for Subscription resumption"); - iterator->Release(); + ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher"); return; } - ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler()); - if (handler == nullptr) + if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) != + CHIP_NO_ERROR) { - ChipLogProgress(InteractionModel, "no resource for ReadHandler creation"); - iterator->Release(); + ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId); return; } - - ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId); - handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo); + subscriptionResumptionSessionEstablisher.release(); #if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION resumedSubscriptions = true; #endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION } - iterator->Release(); #if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION // If no persisted subscriptions needed resumption then all resumption retries are done diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 521de9b8a3ac78..ba09aec63fd3d5 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -380,6 +381,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, friend class reporting::Engine; friend class TestCommandInteraction; friend class TestInteractionModelEngine; + friend class SubscriptionResumptionSessionEstablisher; using Status = Protocols::InteractionModel::Status; void OnDone(CommandHandler & apCommandObj) override; diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index 411756ab22ff9c..c0cfbe4a829e76 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -56,10 +56,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon InteractionType aInteractionType, Observer * observer) : mExchangeCtx(*this), mManagementCallback(apCallback) -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - , - mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) -#endif { VerifyOrDie(apExchangeContext != nullptr); @@ -84,8 +80,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon #if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) : - mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), - mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) + mExchangeCtx(*this), mManagementCallback(apCallback) { mInteractionType = InteractionType::Subscribe; mFlags.ClearAll(); @@ -94,31 +89,30 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) : mObserver = observer; } -void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager, - SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) +void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle, + SubscriptionResumptionSessionEstablisher & resumptionSessionEstablisher) { - mSubscriptionId = subscriptionInfo.mSubscriptionId; - mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval; - mMaxInterval = subscriptionInfo.mMaxInterval; - SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered); + mSubscriptionId = resumptionSessionEstablisher.mSubscriptionInfo.mSubscriptionId; + mMinIntervalFloorSeconds = resumptionSessionEstablisher.mSubscriptionInfo.mMinInterval; + mMaxInterval = resumptionSessionEstablisher.mSubscriptionInfo.mMaxInterval; + SetStateFlag(ReadHandlerFlags::FabricFiltered, resumptionSessionEstablisher.mSubscriptionInfo.mFabricFiltered); // Move dynamically allocated attributes and events from the SubscriptionInfo struct into // the object pool managed by the IM engine - for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++) + for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++) { - AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams(); - CHIP_ERROR err = - InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams); + AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams(); + CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params); if (err != CHIP_NO_ERROR) { Close(); return; } } - for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++) + for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++) { - EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams(); - CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams); + EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams(); + CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params); if (err != CHIP_NO_ERROR) { Close(); @@ -126,9 +120,26 @@ void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager, } } - // Ask IM engine to start CASE session with subscriber - ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex); - caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback); + mSessionHandle.Grab(sessionHandle); + + SetStateFlag(ReadHandlerFlags::ActiveSubscription); + + auto * appCallback = mManagementCallback.GetAppCallback(); + if (appCallback) + { + appCallback->OnSubscriptionEstablished(*this); + } + // Notify the observer that a subscription has been resumed + mObserver->OnSubscriptionEstablished(this); + + MoveToState(HandlerState::CanStartReporting); + + ObjectList * attributePath = mpAttributePathList; + while (attributePath) + { + InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue); + attributePath = attributePath->mpNext; + } } #endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS @@ -893,43 +904,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag) SetStateFlag(aFlag, false); } -void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, - const SessionHandle & sessionHandle) -{ - ReadHandler * const _this = static_cast(context); - - _this->mSessionHandle.Grab(sessionHandle); - - _this->SetStateFlag(ReadHandlerFlags::ActiveSubscription); - - auto * appCallback = _this->mManagementCallback.GetAppCallback(); - if (appCallback) - { - appCallback->OnSubscriptionEstablished(*_this); - } - // Notify the observer that a subscription has been resumed - _this->mObserver->OnSubscriptionEstablished(_this); - - _this->MoveToState(HandlerState::CanStartReporting); - - ObjectList * attributePath = _this->mpAttributePathList; - while (attributePath) - { - InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue); - attributePath = attributePath->mpNext; - } -} - -void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) -{ - ReadHandler * const _this = static_cast(context); - VerifyOrDie(_this != nullptr); - - // TODO: Have a retry mechanism tied to wake interval for IC devices - ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'", - err.Format()); - _this->Close(); -} - } // namespace app } // namespace chip diff --git a/src/app/ReadHandler.h b/src/app/ReadHandler.h index 4c9ad469c782eb..28097868c2f79a 100644 --- a/src/app/ReadHandler.h +++ b/src/app/ReadHandler.h @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -253,6 +254,16 @@ class ReadHandler : public Messaging::ExchangeDelegate return CHIP_NO_ERROR; } +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + /** + * + * @brief Initialize a ReadHandler for a resumed subsciption + * + * Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session + */ + void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher); +#endif + private: PriorityLevel GetCurrentPriority() const { return mCurrentPriority; } EventNumber & GetEventMin() { return mEventMin; } @@ -302,18 +313,6 @@ class ReadHandler : public Messaging::ExchangeDelegate */ void OnInitialRequest(System::PacketBufferHandle && aPayload); -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - /** - * - * @brief Resume a persisted subscription - * - * Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session - * with the subscriber if one doesn't already exist, and send full priming report when connected. - */ - void ResumeSubscription(CASESessionManager & caseSessionManager, - SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo); -#endif - /** * Send ReportData to initiator * @@ -485,11 +484,6 @@ class ReadHandler : public Messaging::ExchangeDelegate /// @param aFlag Flag to clear void ClearStateFlag(ReadHandlerFlags aFlag); - // Helpers for continuing the subscription resumption - static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, - const SessionHandle & sessionHandle); - static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); - AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr); // The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was @@ -571,12 +565,6 @@ class ReadHandler : public Messaging::ExchangeDelegate // TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place. Observer * mObserver = nullptr; - -#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS - // Callbacks to handle server-initiated session success/failure - chip::Callback::Callback mOnConnectedCallback; - chip::Callback::Callback mOnConnectionFailureCallback; -#endif }; } // namespace app } // namespace chip diff --git a/src/app/SubscriptionResumptionSessionEstablisher.cpp b/src/app/SubscriptionResumptionSessionEstablisher.cpp new file mode 100644 index 00000000000000..f0d53566ad6b28 --- /dev/null +++ b/src/app/SubscriptionResumptionSessionEstablisher.cpp @@ -0,0 +1,124 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace chip { +namespace app { + +class AutoDeleteEstablisher +{ +public: + AutoDeleteEstablisher(SubscriptionResumptionSessionEstablisher * sessionEstablisher) : mSessionEstablisher(sessionEstablisher) + {} + ~AutoDeleteEstablisher() { chip::Platform::Delete(mSessionEstablisher); } + + SubscriptionResumptionSessionEstablisher * operator->() const { return mSessionEstablisher; } + + SubscriptionResumptionSessionEstablisher & operator*() const { return *mSessionEstablisher; } + +private: + SubscriptionResumptionSessionEstablisher * mSessionEstablisher; +}; + +SubscriptionResumptionSessionEstablisher::SubscriptionResumptionSessionEstablisher() : + mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) +{} + +CHIP_ERROR +SubscriptionResumptionSessionEstablisher::ResumeSubscription( + CASESessionManager & caseSessionManager, const SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) +{ + mSubscriptionInfo.mNodeId = subscriptionInfo.mNodeId; + mSubscriptionInfo.mFabricIndex = subscriptionInfo.mFabricIndex; + mSubscriptionInfo.mSubscriptionId = subscriptionInfo.mSubscriptionId; + mSubscriptionInfo.mMinInterval = subscriptionInfo.mMinInterval; + mSubscriptionInfo.mMaxInterval = subscriptionInfo.mMaxInterval; + mSubscriptionInfo.mFabricFiltered = subscriptionInfo.mFabricFiltered; + // Copy the Attribute Paths and Event Paths + if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0) + { + mSubscriptionInfo.mAttributePaths.Alloc(subscriptionInfo.mAttributePaths.AllocatedSize()); + if (!mSubscriptionInfo.mAttributePaths.Get()) + { + return CHIP_ERROR_NO_MEMORY; + } + for (size_t i = 0; i < mSubscriptionInfo.mAttributePaths.AllocatedSize(); ++i) + { + mSubscriptionInfo.mAttributePaths[i] = subscriptionInfo.mAttributePaths[i]; + } + } + if (subscriptionInfo.mEventPaths.AllocatedSize() > 0) + { + mSubscriptionInfo.mEventPaths.Alloc(subscriptionInfo.mEventPaths.AllocatedSize()); + if (!mSubscriptionInfo.mEventPaths.Get()) + { + return CHIP_ERROR_NO_MEMORY; + } + for (size_t i = 0; i < mSubscriptionInfo.mEventPaths.AllocatedSize(); ++i) + { + mSubscriptionInfo.mEventPaths[i] = subscriptionInfo.mEventPaths[i]; + } + } + + ScopedNodeId peerNode = ScopedNodeId(mSubscriptionInfo.mNodeId, mSubscriptionInfo.mFabricIndex); + caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback); + return CHIP_NO_ERROR; +} + +void SubscriptionResumptionSessionEstablisher::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle) +{ + AutoDeleteEstablisher establisher(static_cast(context)); + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo; + InteractionModelEngine * imEngine = InteractionModelEngine::GetInstance(); + if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, subscriptionInfo.mAttributePaths.AllocatedSize(), + subscriptionInfo.mEventPaths.AllocatedSize())) + { + ChipLogProgress(InteractionModel, "no resource for subscription resumption"); + return; + } + ReadHandler * readHandler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler()); + if (readHandler == nullptr) + { + ChipLogProgress(InteractionModel, "no resource for ReadHandler creation"); + return; + } + readHandler->OnSubscriptionResumed(sessionHandle, *establisher); +} + +void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, + CHIP_ERROR error) +{ + AutoDeleteEstablisher establisher(static_cast(context)); + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo; + ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'", + error.Format()); + // If the device fails to establish the session, the subscriber might be offline and its subscription read client will + // be deleted when the device reconnect to the subscriber. This subscription will be never used again. So clean up + // the persistent subscription information storage. + auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage(); + if (subscriptionResumptionStorage) + { + subscriptionResumptionStorage->Delete(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex, + subscriptionInfo.mSubscriptionId); + } +} + +} // namespace app +} // namespace chip diff --git a/src/app/SubscriptionResumptionSessionEstablisher.h b/src/app/SubscriptionResumptionSessionEstablisher.h new file mode 100644 index 00000000000000..a66c403c1c11ab --- /dev/null +++ b/src/app/SubscriptionResumptionSessionEstablisher.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace chip { +namespace app { + +/** + * Session Establisher to resume persistent subscription. A CASE session will be established upon invoking + * ResumeSubscription(), followed by the creation and intialization of a ReadHandler. This class helps prevent + * a scenario where all ReadHandlers in the pool grab the invalid session handle. In such scenario, if the device + * receives a new subscription request, it will crash as there is no evictable ReadHandler. + */ + +class SubscriptionResumptionSessionEstablisher +{ +public: + SubscriptionResumptionSessionEstablisher(); + + CHIP_ERROR ResumeSubscription(CASESessionManager & caseSessionManager, + const SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo); + + SubscriptionResumptionStorage::SubscriptionInfo mSubscriptionInfo; + +private: + // Callback funstions for continuing the subscription resumption + static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle); + static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); + + // Callbacks to handle server-initiated session success/failure + chip::Callback::Callback mOnConnectedCallback; + chip::Callback::Callback mOnConnectionFailureCallback; +}; +} // namespace app +} // namespace chip