From 37919a1326373694c4b6b5a36481f4d96a02a1c2 Mon Sep 17 00:00:00 2001 From: Jeff Tung <100387939+jtung-apple@users.noreply.github.com> Date: Fri, 20 Jan 2023 17:58:51 -0800 Subject: [PATCH] [ICD] Server side subscription persistence and resumption (#24361) * [ICD] Server side subscription persistence and resumption * restyled change * Correct SimpleSubscriptionResumptionStorage TLV format documentation * Fix ReadHandler resumption * Correct state size estimate * Replace %zu in log format * Move TLV buffer off stack * restyled * Replaced vector with ScopedMemoryBufferWithSize and shim structs * Fix struct member order * Fix one more struct member order * Fixed more stack buffer * Fix copy/paste bug * Moved SubscriptionList array to unique_ptr and dynamically allocated / off stack * Moved SubscriptionIndex array to unique_ptr and dynamically allocated / off stack * Fixed error condition checks * Fixed array size check * Addressed CI issues, and disabled subscription persistence and resumption for cc13x2_26x2 and CYW30739 * Addressed PR review comments, including: - ReadHandler constructor side effect moved to separate function - SubscriptionList and SubscriptionIndex member initialization moved to runtome - Improved error handling - remove stored info on error - Changed for loop indices to more descriptive names - Disabled feature by default except Linux and Darwin for CI testing - Added operator[] getter with index to access SubscriptionList and SubscriptionIndex elements * Restyled and ReadHandler include * ReadHandler Callback fix * Fix ReadHandler Callback const argument * Explicitly disable subscription persistence and address review comments * Fixed priming reports on resumption * Revamp subscription storage into flat structure and add unit test * Fix unit test build warning and minor PR comment change * Update src/app/SubscriptionResumptionStorage.h Co-authored-by: Michael Sandstedt * Minor changes to address PR comments * Address PR review comments: Unit test structs constructed explicitly in place for clarity IM engine ResumeSubscriptions nullptr check and exit conditions fix * Address PR comments: Nullptr checks Minor refactor Unit test fix * Changed storage MaxCount mechanics to Init time clean up * Clean up comments and unused commented-out old code * Addressed PR comments: Removed AllocatedCount, and made AllocatedSize return count of elements * Update src/app/SimpleSubscriptionResumptionStorage.cpp Co-authored-by: Michael Sandstedt * Update src/app/InteractionModelEngine.cpp Co-authored-by: Michael Sandstedt * Remove reference to previously removed variable for config that turns the feature off * Addressed PR comments and enabled chip-tool for testing Added setters for SubscriptionInfo attribute and event paths Fixed wrong constant Enabled server interactions for chiptool * Changed storage of attribute/event paths to proper List/Structure TLV * Fixed attribute load * Make Unit Test names more unique and tighten CHIP_CONFIG_PERSIST_SUBSCRIPTIONS usage * Addressed PR comments and CI issues: Delete() error return clarification Comment doc cleanup Fix loop variable build warning Revert chip-tool server interactions enablement Co-authored-by: Michael Sandstedt --- src/app/BUILD.gn | 3 + src/app/InteractionModelEngine.cpp | 47 +- src/app/InteractionModelEngine.h | 9 +- src/app/ReadHandler.cpp | 122 ++++- src/app/ReadHandler.h | 50 +- .../SimpleSubscriptionResumptionStorage.cpp | 473 +++++++++++++++++ src/app/SimpleSubscriptionResumptionStorage.h | 145 ++++++ src/app/SubscriptionResumptionStorage.h | 158 ++++++ src/app/server/Server.cpp | 27 +- src/app/server/Server.h | 39 ++ src/app/tests/BUILD.gn | 1 + ...estSimpleSubscriptionResumptionStorage.cpp | 482 ++++++++++++++++++ src/darwin/Framework/CHIP/MTRDevice.mm | 2 +- src/lib/core/CHIPConfig.h | 22 + src/lib/support/DefaultStorageKeyAllocator.h | 7 + src/lib/support/ScopedBuffer.h | 28 +- src/platform/Darwin/CHIPPlatformConfig.h | 3 + src/platform/Linux/CHIPPlatformConfig.h | 3 + src/platform/cc13x2_26x2/CHIPPlatformConfig.h | 2 + 19 files changed, 1596 insertions(+), 27 deletions(-) create mode 100644 src/app/SimpleSubscriptionResumptionStorage.cpp create mode 100644 src/app/SimpleSubscriptionResumptionStorage.h create mode 100644 src/app/SubscriptionResumptionStorage.h create mode 100644 src/app/tests/TestSimpleSubscriptionResumptionStorage.cpp diff --git a/src/app/BUILD.gn b/src/app/BUILD.gn index 83d6c7d354a713..cef97a1328f93e 100644 --- a/src/app/BUILD.gn +++ b/src/app/BUILD.gn @@ -173,8 +173,11 @@ static_library("app") { "ReadHandler.cpp", "RequiredPrivilege.cpp", "RequiredPrivilege.h", + "SimpleSubscriptionResumptionStorage.cpp", + "SimpleSubscriptionResumptionStorage.h", "StatusResponse.cpp", "StatusResponse.h", + "SubscriptionResumptionStorage.h", "TimedHandler.cpp", "TimedHandler.h", "TimedRequest.cpp", diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index bba9cd763c27da..7662ca2d540832 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -50,14 +50,16 @@ InteractionModelEngine * InteractionModelEngine::GetInstance() } CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, - CASESessionManager * apCASESessionMgr) + CASESessionManager * apCASESessionMgr, + SubscriptionResumptionStorage * subscriptionResumptionStorage) { VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT); VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - mpExchangeMgr = apExchangeMgr; - mpFabricTable = apFabricTable; - mpCASESessionMgr = apCASESessionMgr; + mpExchangeMgr = apExchangeMgr; + mpFabricTable = apFabricTable; + mpCASESessionMgr = apCASESessionMgr; + mpSubscriptionResumptionStorage = subscriptionResumptionStorage; ReturnErrorOnFailure(mpFabricTable->AddFabricDelegate(this)); ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this)); @@ -664,6 +666,8 @@ Status InteractionModelEngine::OnUnsolicitedReportData(Messaging::ExchangeContex return Status::Success; } + ChipLogDetail(InteractionModel, "Received report with invalid subscriptionId %" PRIu32, subscriptionId); + return Status::InvalidSubscription; } @@ -1578,5 +1582,40 @@ void InteractionModelEngine::OnFabricRemoved(const FabricTable & fabricTable, Fa // the fabric removal, though, so they will fail when they try to actually send their command response // and will close at that point. } + +CHIP_ERROR InteractionModelEngine::ResumeSubscriptions() +{ +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + ReturnErrorCodeIf(!mpSubscriptionResumptionStorage, CHIP_NO_ERROR); + + SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo; + auto * iterator = mpSubscriptionResumptionStorage->IterateSubscriptions(); + while (iterator->Next(subscriptionInfo)) + { + auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize(); + auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize(); + if (!EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, requestedEventPathCount)) + { + ChipLogProgress(InteractionModel, "no resource for Subscription resumption"); + iterator->Release(); + return CHIP_ERROR_NO_MEMORY; + } + + ReadHandler * handler = mReadHandlers.CreateObject(*this); + if (handler == nullptr) + { + ChipLogProgress(InteractionModel, "no resource for ReadHandler creation"); + iterator->Release(); + return CHIP_ERROR_NO_MEMORY; + } + + handler->ResumeSubscription(*mpCASESessionMgr, subscriptionInfo); + } + iterator->Release(); +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + + return CHIP_NO_ERROR; +} + } // namespace app } // namespace chip diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 6475f8bceda547..c1bb9bd3b85260 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -114,7 +114,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, * */ CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, - CASESessionManager * apCASESessionMgr = nullptr); + CASESessionManager * apCASESessionMgr = nullptr, + SubscriptionResumptionStorage * subscriptionResumptionStorage = nullptr); void Shutdown(); @@ -292,6 +293,10 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, // virtual method from FabricTable::Delegate void OnFabricRemoved(const FabricTable & fabricTable, FabricIndex fabricIndex) override; + SubscriptionResumptionStorage * GetSubscriptionResumptionStorage() { return mpSubscriptionResumptionStorage; }; + + CHIP_ERROR ResumeSubscriptions(); + #if CONFIG_BUILD_FOR_HOST_UNIT_TEST // // Get direct access to the underlying read handler pool @@ -596,6 +601,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, CASESessionManager * mpCASESessionMgr = nullptr; + SubscriptionResumptionStorage * mpSubscriptionResumptionStorage = nullptr; + // A magic number for tracking values between stack Shutdown()-s and Init()-s. // An ObjectHandle is valid iff. its magic equals to this one. uint32_t mMagic = 0; diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index 14b79dda3066a9..b393edad4fc01e 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -42,6 +42,10 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon InteractionType aInteractionType) : mExchangeCtx(*this), mManagementCallback(apCallback) +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + , + mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) +#endif { VerifyOrDie(apExchangeContext != nullptr); @@ -61,6 +65,54 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon mSessionHandle.Grab(mExchangeCtx->GetSessionHandle()); } +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS +ReadHandler::ReadHandler(ManagementCallback & apCallback) : + mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), + mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) +{ + mInteractionType = InteractionType::Subscribe; + mFlags.ClearAll(); +} + +void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager, + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) +{ + mSubscriptionId = subscriptionInfo.mSubscriptionId; + mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval; + mMaxInterval = subscriptionInfo.mMaxInterval; + SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered); + + // Move dynamically allocated attributes and events from the SubscriptionInfo struct into + // the object pool managed by the IM engine + for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++) + { + AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams(); + CHIP_ERROR err = + InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams); + if (err != CHIP_NO_ERROR) + { + Close(); + return; + } + } + for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++) + { + EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams(); + CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams); + if (err != CHIP_NO_ERROR) + { + Close(); + return; + } + } + + // Ask IM engine to start CASE session with subscriber + ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex); + caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback); +} + +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + ReadHandler::~ReadHandler() { auto * appCallback = mManagementCallback.GetAppCallback(); @@ -87,8 +139,18 @@ ReadHandler::~ReadHandler() InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList); } -void ReadHandler::Close() +void ReadHandler::Close(CloseOptions options) { +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + if (options == CloseOptions::kDropPersistedSubscription) + { + auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage(); + if (subscriptionResumptionStorage) + { + subscriptionResumptionStorage->Delete(GetInitiatorNodeId(), GetAccessingFabricIndex(), mSubscriptionId); + } + } +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS MoveToState(HandlerState::AwaitingDestruction); mManagementCallback.OnDone(*this); } @@ -306,7 +368,12 @@ void ReadHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeConte { ChipLogError(DataManagement, "Time out! failed to receive status response from Exchange: " ChipLogFormatExchange, ChipLogValueExchange(apExchangeContext)); +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + // TODO: Have a retry mechanism tied to wake interval for IC devices + Close(CloseOptions::kKeepPersistedSubscription); +#else Close(); +#endif } CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayload) @@ -652,9 +719,34 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP mExchangeCtx->WillSendMessage(); +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + PersistSubscription(); +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + return CHIP_NO_ERROR; } +void ReadHandler::PersistSubscription() +{ + auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage(); + VerifyOrReturn(subscriptionResumptionStorage != nullptr); + + SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo = { .mNodeId = GetInitiatorNodeId(), + .mFabricIndex = GetAccessingFabricIndex(), + .mSubscriptionId = mSubscriptionId, + .mMinInterval = mMinIntervalFloorSeconds, + .mMaxInterval = mMaxInterval, + .mFabricFiltered = IsFabricFiltered() }; + VerifyOrReturn(subscriptionInfo.SetAttributePaths(mpAttributePathList) == CHIP_NO_ERROR); + VerifyOrReturn(subscriptionInfo.SetEventPaths(mpEventPathList) == CHIP_NO_ERROR); + + CHIP_ERROR err = subscriptionResumptionStorage->Save(subscriptionInfo); + if (err != CHIP_NO_ERROR) + { + ChipLogError(DataManagement, "Failed to save subscription info error: '%" CHIP_ERROR_FORMAT, err.Format()); + } +} + void ReadHandler::OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState) { VerifyOrReturn(apAppState != nullptr); @@ -767,5 +859,33 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag) SetStateFlag(aFlag, false); } +void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle) +{ + ReadHandler * const _this = static_cast(context); + + _this->mSessionHandle.Grab(sessionHandle); + + _this->MoveToState(HandlerState::GeneratingReports); + + ObjectList * attributePath = _this->mpAttributePathList; + while (attributePath) + { + InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue); + attributePath = attributePath->mpNext; + } +} + +void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) +{ + ReadHandler * const _this = static_cast(context); + VerifyOrDie(_this != nullptr); + + // TODO: Have a retry mechanism tied to wake interval for IC devices + ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'", + err.Format()); + _this->Close(); +} + } // namespace app } // namespace chip diff --git a/src/app/ReadHandler.h b/src/app/ReadHandler.h index 1e4f55066e9f3b..a933aa76e51389 100644 --- a/src/app/ReadHandler.h +++ b/src/app/ReadHandler.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -165,6 +169,17 @@ class ReadHandler : public Messaging::ExchangeDelegate */ ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType); +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + /** + * + * Constructor in preparation for resuming a persisted subscription + * + * The callback passed in has to outlive this handler object. + * + */ + ReadHandler(ManagementCallback & apCallback); +#endif + const ObjectList * GetAttributePathList() const { return mpAttributePathList; } const ObjectList * GetEventPathList() const { return mpEventPathList; } const ObjectList * GetDataVersionFilterList() const { return mpDataVersionFilterList; } @@ -243,6 +258,18 @@ class ReadHandler : public Messaging::ExchangeDelegate */ void OnInitialRequest(System::PacketBufferHandle && aPayload); +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + /** + * + * @brief Resume a persisted subscription + * + * Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session + * with the subscriber if one doesn't already exist, and send full priming report when connected. + */ + void ResumeSubscription(CASESessionManager & caseSessionManager, + SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo); +#endif + /** * Send ReportData to initiator * @@ -355,11 +382,19 @@ class ReadHandler : public Messaging::ExchangeDelegate AwaitingDestruction, ///< The object has completed its work and is awaiting destruction by the application. }; + enum class CloseOptions + { + kDropPersistedSubscription, + kKeepPersistedSubscription + }; /** * Called internally to signal the completion of all work on this objecta and signal to a registered callback that it's * safe to release this object. + * + * @param options This specifies whether to drop or keep the subscription + * */ - void Close(); + void Close(CloseOptions options = CloseOptions::kDropPersistedSubscription); static void OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState); static void OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState); @@ -379,10 +414,17 @@ class ReadHandler : public Messaging::ExchangeDelegate const char * GetStateStr() const; + void PersistSubscription(); + // Helpers for managing our state flags properly. void SetStateFlag(ReadHandlerFlags aFlag, bool aValue = true); void ClearStateFlag(ReadHandlerFlags aFlag); + // Helpers for continuing the subscription resumption + static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, + const SessionHandle & sessionHandle); + static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); + AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr); // The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was @@ -461,6 +503,12 @@ class ReadHandler : public Messaging::ExchangeDelegate PriorityLevel mCurrentPriority = PriorityLevel::Invalid; BitFlags mFlags; InteractionType mInteractionType = InteractionType::Read; + +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + // Callbacks to handle server-initiated session success/failure + chip::Callback::Callback mOnConnectedCallback; + chip::Callback::Callback mOnConnectionFailureCallback; +#endif }; } // namespace app } // namespace chip diff --git a/src/app/SimpleSubscriptionResumptionStorage.cpp b/src/app/SimpleSubscriptionResumptionStorage.cpp new file mode 100644 index 00000000000000..1f51f751639522 --- /dev/null +++ b/src/app/SimpleSubscriptionResumptionStorage.cpp @@ -0,0 +1,473 @@ +/* + * Copyright (c) 2023 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * This file defines a basic implementation of SubscriptionResumptionStorage that + * persists subscriptions in a flat list in TLV. + */ + +#include + +// TODO: move the conditional compilation into BUILD.gn config options +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + +#include +#include +#include +#include + +namespace chip { +namespace app { + +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kPeerNodeIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kFabricIndexTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kSubscriptionIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kMinIntervalTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kMaxIntervalTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kFabricFilteredTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kAttributePathsListTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventPathsListTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kAttributePathTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventPathTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEndpointIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kClusterIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kAttributeIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventIdTag; +constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventPathTypeTag; + +SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::SimpleSubscriptionInfoIterator( + SimpleSubscriptionResumptionStorage & storage) : + mStorage(storage) +{ + mNextIndex = 0; +} + +size_t SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::Count() +{ + return static_cast(mStorage.Count()); +} + +bool SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::Next(SubscriptionInfo & output) +{ + for (; mNextIndex < CHIP_IM_MAX_NUM_SUBSCRIPTIONS; mNextIndex++) + { + CHIP_ERROR err = mStorage.Load(mNextIndex, output); + if (err == CHIP_NO_ERROR) + { + // increment index for the next call + mNextIndex++; + return true; + } + + if (err != CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND) + { + ChipLogError(DataManagement, "Failed to load subscription at index %u error %" CHIP_ERROR_FORMAT, + static_cast(mNextIndex), err.Format()); + mStorage.Delete(mNextIndex); + } + } + + return false; +} + +void SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::Release() +{ + mStorage.mSubscriptionInfoIterators.ReleaseObject(this); +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Init(PersistentStorageDelegate * storage) +{ + VerifyOrReturnError(storage != nullptr, CHIP_ERROR_INVALID_ARGUMENT); + mStorage = storage; + + uint16_t countMax; + uint16_t len = sizeof(countMax); + CHIP_ERROR err = + mStorage->SyncGetKeyValue(DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName(), &countMax, len); + // If there's a previous countMax and it's larger than CHIP_IM_MAX_NUM_SUBSCRIPTIONS, + // clean up subscriptions beyond the limit + if ((err == CHIP_NO_ERROR) && (countMax != CHIP_IM_MAX_NUM_SUBSCRIPTIONS)) + { + for (uint16_t subscriptionIndex = CHIP_IM_MAX_NUM_SUBSCRIPTIONS; subscriptionIndex < countMax; subscriptionIndex++) + { + Delete(subscriptionIndex); + } + } + + // Always save the current CHIP_IM_MAX_NUM_SUBSCRIPTIONS + uint16_t countMaxToSave = CHIP_IM_MAX_NUM_SUBSCRIPTIONS; + ReturnErrorOnFailure(mStorage->SyncSetKeyValue(DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName(), + &countMaxToSave, sizeof(uint16_t))); + + return CHIP_NO_ERROR; +} + +SubscriptionResumptionStorage::SubscriptionInfoIterator * SimpleSubscriptionResumptionStorage::IterateSubscriptions() +{ + return mSubscriptionInfoIterators.CreateObject(*this); +} + +uint16_t SimpleSubscriptionResumptionStorage::Count() +{ + uint16_t subscriptionCount = 0; + for (uint16_t subscriptionIndex = 0; subscriptionIndex < CHIP_IM_MAX_NUM_SUBSCRIPTIONS; subscriptionIndex++) + { + if (mStorage->SyncDoesKeyExist(DefaultStorageKeyAllocator::SubscriptionResumption(subscriptionIndex).KeyName())) + { + subscriptionCount++; + } + } + + return subscriptionCount; +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Delete(uint16_t subscriptionIndex) +{ + return mStorage->SyncDeleteKeyValue(DefaultStorageKeyAllocator::SubscriptionResumption(subscriptionIndex).KeyName()); +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Load(uint16_t subscriptionIndex, SubscriptionInfo & subscriptionInfo) +{ + Platform::ScopedMemoryBuffer backingBuffer; + backingBuffer.Calloc(MaxSubscriptionSize()); + ReturnErrorCodeIf(backingBuffer.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + + uint16_t len = static_cast(MaxSubscriptionSize()); + ReturnErrorOnFailure(mStorage->SyncGetKeyValue(DefaultStorageKeyAllocator::SubscriptionResumption(subscriptionIndex).KeyName(), + backingBuffer.Get(), len)); + + TLV::ScopedBufferTLVReader reader(std::move(backingBuffer), len); + + ReturnErrorOnFailure(reader.Next(TLV::kTLVType_Structure, TLV::AnonymousTag())); + + TLV::TLVType subscriptionContainerType; + ReturnErrorOnFailure(reader.EnterContainer(subscriptionContainerType)); + + // Node ID + ReturnErrorOnFailure(reader.Next(kPeerNodeIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mNodeId)); + + // Fabric index + ReturnErrorOnFailure(reader.Next(kFabricIndexTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mFabricIndex)); + + // Subscription ID + ReturnErrorOnFailure(reader.Next(kSubscriptionIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mSubscriptionId)); + + // Min interval + ReturnErrorOnFailure(reader.Next(kMinIntervalTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mMinInterval)); + + // Max interval + ReturnErrorOnFailure(reader.Next(kMaxIntervalTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mMaxInterval)); + + // Fabric filtered boolean + ReturnErrorOnFailure(reader.Next(kFabricFilteredTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mFabricFiltered)); + + // Attribute Paths + ReturnErrorOnFailure(reader.Next(TLV::kTLVType_List, kAttributePathsListTag)); + TLV::TLVType attributesListType; + ReturnErrorOnFailure(reader.EnterContainer(attributesListType)); + + size_t pathCount = 0; + ReturnErrorOnFailure(reader.CountRemainingInContainer(&pathCount)); + + // If a stack struct is being reused to iterate, free the previous paths ScopedMemoryBuffer + subscriptionInfo.mAttributePaths.Free(); + if (pathCount) + { + subscriptionInfo.mAttributePaths.Calloc(pathCount); + ReturnErrorCodeIf(subscriptionInfo.mAttributePaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + for (size_t pathIndex = 0; pathIndex < pathCount; pathIndex++) + { + ReturnErrorOnFailure(reader.Next(TLV::kTLVType_Structure, kAttributePathTag)); + TLV::TLVType attributeContainerType; + ReturnErrorOnFailure(reader.EnterContainer(attributeContainerType)); + + ReturnErrorOnFailure(reader.Next(kEndpointIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mAttributePaths[pathIndex].mEndpointId)); + + ReturnErrorOnFailure(reader.Next(kClusterIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mAttributePaths[pathIndex].mClusterId)); + + ReturnErrorOnFailure(reader.Next(kAttributeIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mAttributePaths[pathIndex].mAttributeId)); + + ReturnErrorOnFailure(reader.ExitContainer(attributeContainerType)); + } + } + ReturnErrorOnFailure(reader.ExitContainer(attributesListType)); + + // Event Paths + ReturnErrorOnFailure(reader.Next(TLV::kTLVType_List, kEventPathsListTag)); + TLV::TLVType eventsListType; + ReturnErrorOnFailure(reader.EnterContainer(eventsListType)); + + ReturnErrorOnFailure(reader.CountRemainingInContainer(&pathCount)); + + // If a stack struct is being reused to iterate, free the previous paths ScopedMemoryBuffer + subscriptionInfo.mEventPaths.Free(); + if (pathCount) + { + subscriptionInfo.mEventPaths.Calloc(pathCount); + ReturnErrorCodeIf(subscriptionInfo.mEventPaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + for (size_t pathIndex = 0; pathIndex < pathCount; pathIndex++) + { + ReturnErrorOnFailure(reader.Next(TLV::kTLVType_Structure, kEventPathTag)); + TLV::TLVType eventContainerType; + ReturnErrorOnFailure(reader.EnterContainer(eventContainerType)); + + EventPathType eventPathType; + ReturnErrorOnFailure(reader.Next(kEventPathTypeTag)); + ReturnErrorOnFailure(reader.Get(eventPathType)); + + subscriptionInfo.mEventPaths[pathIndex].mIsUrgentEvent = (eventPathType == EventPathType::kUrgent); + + ReturnErrorOnFailure(reader.Next(kEndpointIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mEventPaths[pathIndex].mEndpointId)); + + ReturnErrorOnFailure(reader.Next(kClusterIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mEventPaths[pathIndex].mClusterId)); + + ReturnErrorOnFailure(reader.Next(kEventIdTag)); + ReturnErrorOnFailure(reader.Get(subscriptionInfo.mEventPaths[pathIndex].mEventId)); + + ReturnErrorOnFailure(reader.ExitContainer(eventContainerType)); + } + } + ReturnErrorOnFailure(reader.ExitContainer(eventsListType)); + + ReturnErrorOnFailure(reader.ExitContainer(subscriptionContainerType)); + + return CHIP_NO_ERROR; +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Save(TLV::TLVWriter & writer, SubscriptionInfo & subscriptionInfo) +{ + TLV::TLVType subscriptionContainerType; + ReturnErrorOnFailure(writer.StartContainer(TLV::AnonymousTag(), TLV::kTLVType_Structure, subscriptionContainerType)); + ReturnErrorOnFailure(writer.Put(kPeerNodeIdTag, subscriptionInfo.mNodeId)); + ReturnErrorOnFailure(writer.Put(kFabricIndexTag, subscriptionInfo.mFabricIndex)); + ReturnErrorOnFailure(writer.Put(kSubscriptionIdTag, subscriptionInfo.mSubscriptionId)); + ReturnErrorOnFailure(writer.Put(kMinIntervalTag, subscriptionInfo.mMinInterval)); + ReturnErrorOnFailure(writer.Put(kMaxIntervalTag, subscriptionInfo.mMaxInterval)); + ReturnErrorOnFailure(writer.Put(kFabricFilteredTag, subscriptionInfo.mFabricFiltered)); + + // Attribute paths + TLV::TLVType attributesListType; + ReturnErrorOnFailure(writer.StartContainer(kAttributePathsListTag, TLV::kTLVType_List, attributesListType)); + for (size_t pathIndex = 0; pathIndex < subscriptionInfo.mAttributePaths.AllocatedSize(); pathIndex++) + { + TLV::TLVType attributeContainerType = TLV::kTLVType_Structure; + ReturnErrorOnFailure(writer.StartContainer(kAttributePathTag, TLV::kTLVType_Structure, attributeContainerType)); + + ReturnErrorOnFailure(writer.Put(kEndpointIdTag, subscriptionInfo.mAttributePaths[pathIndex].mEndpointId)); + ReturnErrorOnFailure(writer.Put(kClusterIdTag, subscriptionInfo.mAttributePaths[pathIndex].mClusterId)); + ReturnErrorOnFailure(writer.Put(kAttributeIdTag, subscriptionInfo.mAttributePaths[pathIndex].mAttributeId)); + + ReturnErrorOnFailure(writer.EndContainer(attributeContainerType)); + } + ReturnErrorOnFailure(writer.EndContainer(attributesListType)); + + // Event paths + TLV::TLVType eventsListType; + ReturnErrorOnFailure(writer.StartContainer(kEventPathsListTag, TLV::kTLVType_List, eventsListType)); + for (size_t pathIndex = 0; pathIndex < subscriptionInfo.mEventPaths.AllocatedSize(); pathIndex++) + { + TLV::TLVType eventContainerType = TLV::kTLVType_Structure; + ReturnErrorOnFailure(writer.StartContainer(kEventPathTag, TLV::kTLVType_Structure, eventContainerType)); + + if (subscriptionInfo.mEventPaths[pathIndex].mIsUrgentEvent) + { + ReturnErrorOnFailure(writer.Put(kEventPathTypeTag, EventPathType::kUrgent)); + } + else + { + ReturnErrorOnFailure(writer.Put(kEventPathTypeTag, EventPathType::kNonUrgent)); + } + ReturnErrorOnFailure(writer.Put(kEndpointIdTag, subscriptionInfo.mEventPaths[pathIndex].mEndpointId)); + ReturnErrorOnFailure(writer.Put(kClusterIdTag, subscriptionInfo.mEventPaths[pathIndex].mClusterId)); + ReturnErrorOnFailure(writer.Put(kEventIdTag, subscriptionInfo.mEventPaths[pathIndex].mEventId)); + + ReturnErrorOnFailure(writer.EndContainer(eventContainerType)); + } + ReturnErrorOnFailure(writer.EndContainer(eventsListType)); + + ReturnErrorOnFailure(writer.EndContainer(subscriptionContainerType)); + + return CHIP_NO_ERROR; +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Save(SubscriptionInfo & subscriptionInfo) +{ + // Find empty index or duplicate if exists + uint16_t subscriptionIndex; + uint16_t firstEmptySubscriptionIndex = CHIP_IM_MAX_NUM_SUBSCRIPTIONS; // initialize to out of bounds as "not set" + for (subscriptionIndex = 0; subscriptionIndex < CHIP_IM_MAX_NUM_SUBSCRIPTIONS; subscriptionIndex++) + { + SubscriptionInfo currentSubscriptionInfo; + CHIP_ERROR err = Load(subscriptionIndex, currentSubscriptionInfo); + + // if empty and firstEmptySubscriptionIndex isn't set yet, then mark empty spot + if ((firstEmptySubscriptionIndex == CHIP_IM_MAX_NUM_SUBSCRIPTIONS) && (err == CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND)) + { + firstEmptySubscriptionIndex = subscriptionIndex; + } + + // delete duplicate + if (err == CHIP_NO_ERROR) + { + if ((subscriptionInfo.mNodeId == currentSubscriptionInfo.mNodeId) && + (subscriptionInfo.mFabricIndex == currentSubscriptionInfo.mFabricIndex) && + (subscriptionInfo.mSubscriptionId == currentSubscriptionInfo.mSubscriptionId)) + { + Delete(subscriptionIndex); + // if duplicate is the first empty spot, then also set it + if (firstEmptySubscriptionIndex == CHIP_IM_MAX_NUM_SUBSCRIPTIONS) + { + firstEmptySubscriptionIndex = subscriptionIndex; + } + } + } + } + + // Fail if no empty space + if (firstEmptySubscriptionIndex == CHIP_IM_MAX_NUM_SUBSCRIPTIONS) + { + return CHIP_ERROR_NO_MEMORY; + } + + // Now construct subscription state and save + Platform::ScopedMemoryBuffer backingBuffer; + backingBuffer.Calloc(MaxSubscriptionSize()); + ReturnErrorCodeIf(backingBuffer.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + + TLV::ScopedBufferTLVWriter writer(std::move(backingBuffer), MaxSubscriptionSize()); + + ReturnErrorOnFailure(Save(writer, subscriptionInfo)); + + const auto len = writer.GetLengthWritten(); + VerifyOrReturnError(CanCastTo(len), CHIP_ERROR_BUFFER_TOO_SMALL); + + writer.Finalize(backingBuffer); + + ReturnErrorOnFailure( + mStorage->SyncSetKeyValue(DefaultStorageKeyAllocator::SubscriptionResumption(firstEmptySubscriptionIndex).KeyName(), + backingBuffer.Get(), static_cast(len))); + + return CHIP_NO_ERROR; +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::Delete(NodeId nodeId, FabricIndex fabricIndex, SubscriptionId subscriptionId) +{ + bool subscriptionFound = false; + CHIP_ERROR lastDeleteErr = CHIP_NO_ERROR; + + uint16_t remainingSubscriptionsCount = 0; + for (uint16_t subscriptionIndex = 0; subscriptionIndex < CHIP_IM_MAX_NUM_SUBSCRIPTIONS; subscriptionIndex++) + { + SubscriptionInfo subscriptionInfo; + CHIP_ERROR err = Load(subscriptionIndex, subscriptionInfo); + + // delete match + if (err == CHIP_NO_ERROR) + { + if ((nodeId == subscriptionInfo.mNodeId) && (fabricIndex == subscriptionInfo.mFabricIndex) && + (subscriptionId == subscriptionInfo.mSubscriptionId)) + { + subscriptionFound = true; + CHIP_ERROR deleteErr = Delete(subscriptionIndex); + if (deleteErr != CHIP_NO_ERROR) + { + lastDeleteErr = deleteErr; + } + } + else + { + remainingSubscriptionsCount++; + } + } + } + + // if there are no persisted subscriptions, the MaxCount can also be deleted + if (remainingSubscriptionsCount == 0) + { + DeleteMaxCount(); + } + + if (lastDeleteErr != CHIP_NO_ERROR) + { + return lastDeleteErr; + } + + return subscriptionFound ? CHIP_NO_ERROR : CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND; +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::DeleteMaxCount() +{ + return mStorage->SyncDeleteKeyValue(DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName()); +} + +CHIP_ERROR SimpleSubscriptionResumptionStorage::DeleteAll(FabricIndex fabricIndex) +{ + CHIP_ERROR deleteErr = CHIP_NO_ERROR; + + uint16_t count = 0; + for (uint16_t subscriptionIndex = 0; subscriptionIndex < CHIP_IM_MAX_NUM_SUBSCRIPTIONS; subscriptionIndex++) + { + SubscriptionInfo subscriptionInfo; + CHIP_ERROR err = Load(subscriptionIndex, subscriptionInfo); + + if (err == CHIP_NO_ERROR) + { + if (fabricIndex == subscriptionInfo.mFabricIndex) + { + err = Delete(subscriptionIndex); + if ((err != CHIP_NO_ERROR) && (err != CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND)) + { + deleteErr = err; + } + } + else + { + count++; + } + } + } + + // if there are no persisted subscriptions, the MaxCount can also be deleted + if (count == 0) + { + CHIP_ERROR err = DeleteMaxCount(); + + if ((err != CHIP_NO_ERROR) && (err != CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND)) + { + deleteErr = err; + } + } + + return deleteErr; +} + +} // namespace app +} // namespace chip + +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS diff --git a/src/app/SimpleSubscriptionResumptionStorage.h b/src/app/SimpleSubscriptionResumptionStorage.h new file mode 100644 index 00000000000000..95efa2d032aac8 --- /dev/null +++ b/src/app/SimpleSubscriptionResumptionStorage.h @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2023 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * This file defines a basic implementation of SubscriptionResumptionStorage that + * persists subscriptions in a flat list in TLV. + */ + +#pragma once + +#include + +// TODO: move the conditional compilation into BUILD.gn config options +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + +#include +#include +#include + +namespace chip { +namespace app { + +/** + * An example SubscriptionResumptionStorage using PersistentStorageDelegate as it backend. + */ +class SimpleSubscriptionResumptionStorage : public SubscriptionResumptionStorage +{ +public: + static constexpr size_t kIteratorsMax = CHIP_CONFIG_MAX_SUBSCRIPTION_RESUMPTION_STORAGE_CONCURRENT_ITERATORS; + + CHIP_ERROR Init(PersistentStorageDelegate * storage); + + SubscriptionInfoIterator * IterateSubscriptions() override; + + CHIP_ERROR Save(SubscriptionInfo & subscriptionInfo) override; + + CHIP_ERROR Delete(NodeId nodeId, FabricIndex fabricIndex, SubscriptionId subscriptionId) override; + + CHIP_ERROR DeleteAll(FabricIndex fabricIndex) override; + +protected: + CHIP_ERROR Save(TLV::TLVWriter & writer, SubscriptionInfo & subscriptionInfo); + CHIP_ERROR Load(uint16_t subscriptionIndex, SubscriptionInfo & subscriptionInfo); + CHIP_ERROR Delete(uint16_t subscriptionIndex); + uint16_t Count(); + CHIP_ERROR DeleteMaxCount(); + + class SimpleSubscriptionInfoIterator : public SubscriptionInfoIterator + { + public: + SimpleSubscriptionInfoIterator(SimpleSubscriptionResumptionStorage & storage); + size_t Count() override; + bool Next(SubscriptionInfo & output) override; + void Release() override; + + private: + SimpleSubscriptionResumptionStorage & mStorage; + uint16_t mNextIndex; + }; + + static constexpr size_t MaxScopedNodeIdSize() { return TLV::EstimateStructOverhead(sizeof(NodeId), sizeof(FabricIndex)); } + + static constexpr size_t MaxSubscriptionPathsSize() + { + // IM engine declares an attribute path pool and an event path pool, and each pool + // includes CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS_FOR_SUBSCRIPTIONS for subscriptions + return 2 * + TLV::EstimateStructOverhead( + TLV::EstimateStructOverhead(sizeof(uint8_t), sizeof(EndpointId), sizeof(ClusterId), sizeof(AttributeId)) * + CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS_FOR_SUBSCRIPTIONS); + } + + static constexpr size_t MaxSubscriptionSize() + { + // All the fields added together + return TLV::EstimateStructOverhead(MaxScopedNodeIdSize(), sizeof(SubscriptionId), sizeof(uint16_t), sizeof(uint16_t), + sizeof(bool), MaxSubscriptionPathsSize()); + } + + enum class EventPathType : uint8_t + { + kUrgent = 0x1, + kNonUrgent = 0x2, + }; + + // Flat list of subscriptions indexed from from 0 to CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS_FOR_SUBSCRIPTIONS-1 + // + // Each entry in list is a Subscription TLV structure: + // Structure of: (Subscription info) + // Node ID + // Fabric Index + // Subscription ID + // Min interval + // Max interval + // Fabric filtered boolean + // List of: + // Structure of: (Attribute path) + // Endpoint ID + // Cluster ID + // Attribute ID + // List of: + // Structure of: (Event path) + // Event subscription type (urgent / non-urgent) + // Endpoint ID + // Cluster ID + // Event ID + + static constexpr TLV::Tag kPeerNodeIdTag = TLV::ContextTag(1); + static constexpr TLV::Tag kFabricIndexTag = TLV::ContextTag(2); + static constexpr TLV::Tag kSubscriptionIdTag = TLV::ContextTag(3); + static constexpr TLV::Tag kMinIntervalTag = TLV::ContextTag(4); + static constexpr TLV::Tag kMaxIntervalTag = TLV::ContextTag(5); + static constexpr TLV::Tag kFabricFilteredTag = TLV::ContextTag(6); + static constexpr TLV::Tag kAttributePathsListTag = TLV::ContextTag(7); + static constexpr TLV::Tag kEventPathsListTag = TLV::ContextTag(8); + static constexpr TLV::Tag kAttributePathTag = TLV::ContextTag(9); + static constexpr TLV::Tag kEventPathTag = TLV::ContextTag(10); + static constexpr TLV::Tag kEndpointIdTag = TLV::ContextTag(11); + static constexpr TLV::Tag kClusterIdTag = TLV::ContextTag(12); + static constexpr TLV::Tag kAttributeIdTag = TLV::ContextTag(13); + static constexpr TLV::Tag kEventIdTag = TLV::ContextTag(14); + static constexpr TLV::Tag kEventPathTypeTag = TLV::ContextTag(16); + + PersistentStorageDelegate * mStorage; + ObjectPool mSubscriptionInfoIterators; +}; +} // namespace app +} // namespace chip + +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS diff --git a/src/app/SubscriptionResumptionStorage.h b/src/app/SubscriptionResumptionStorage.h new file mode 100644 index 00000000000000..38a522fc445add --- /dev/null +++ b/src/app/SubscriptionResumptionStorage.h @@ -0,0 +1,158 @@ +/* + * + * Copyright (c) 2023 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * This file defines the interface to store subscription information. + */ + +#pragma once + +#include +#include +#include + +namespace chip { +namespace app { + +/** + * The SubscriptionResumptionStorage interface is used to persist subscriptions when they are established. + */ +class SubscriptionResumptionStorage +{ +public: + // Structs to hold path param values as is_trivial struct + struct AttributePathParamsValues + { + ClusterId mClusterId; + AttributeId mAttributeId; + EndpointId mEndpointId; + void SetValues(const AttributePathParams & params) + { + mEndpointId = params.mEndpointId; + mClusterId = params.mClusterId; + mAttributeId = params.mAttributeId; + } + AttributePathParams GetParams() { return AttributePathParams(mEndpointId, mClusterId, mAttributeId); } + }; + struct EventPathParamsValues + { + ClusterId mClusterId; + EventId mEventId; + EndpointId mEndpointId; + bool mIsUrgentEvent; + void SetValues(const EventPathParams & params) + { + mEndpointId = params.mEndpointId; + mClusterId = params.mClusterId; + mEventId = params.mEventId; + mIsUrgentEvent = params.mIsUrgentEvent; + } + EventPathParams GetParams() { return EventPathParams(mEndpointId, mClusterId, mEventId, mIsUrgentEvent); } + }; + + /** + * Struct to hold information about subscriptions + */ + struct SubscriptionInfo + { + NodeId mNodeId; + FabricIndex mFabricIndex; + SubscriptionId mSubscriptionId; + uint16_t mMinInterval; + uint16_t mMaxInterval; + bool mFabricFiltered; + Platform::ScopedMemoryBufferWithSize mAttributePaths; + Platform::ScopedMemoryBufferWithSize mEventPaths; + CHIP_ERROR SetAttributePaths(const ObjectList * pAttributePathList) + { + mAttributePaths.Free(); + const ObjectList * attributePath = pAttributePathList; + size_t attributePathCount = 0; + while (attributePath) + { + attributePathCount++; + attributePath = attributePath->mpNext; + } + ReturnErrorCodeIf((attributePathCount * sizeof(AttributePathParamsValues)) > UINT16_MAX, CHIP_ERROR_NO_MEMORY); + mAttributePaths.Calloc(attributePathCount); + ReturnErrorCodeIf(mAttributePaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + attributePath = pAttributePathList; + for (size_t i = 0; i < attributePathCount; i++) + { + mAttributePaths[i].SetValues(attributePath->mValue); + attributePath = attributePath->mpNext; + } + return CHIP_NO_ERROR; + } + CHIP_ERROR SetEventPaths(const ObjectList * pEventPathList) + { + mEventPaths.Free(); + const ObjectList * eventPath = pEventPathList; + size_t eventPathCount = 0; + while (eventPath) + { + eventPathCount++; + eventPath = eventPath->mpNext; + } + ReturnErrorCodeIf((eventPathCount * sizeof(EventPathParamsValues)) > UINT16_MAX, CHIP_ERROR_NO_MEMORY); + mEventPaths.Calloc(eventPathCount); + ReturnErrorCodeIf(mEventPaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY); + eventPath = pEventPathList; + for (size_t i = 0; i < eventPathCount; i++) + { + mEventPaths[i].SetValues(eventPath->mValue); + eventPath = eventPath->mpNext; + } + return CHIP_NO_ERROR; + } + }; + + using SubscriptionInfoIterator = CommonIterator; + + virtual ~SubscriptionResumptionStorage(){}; + + /** + * Iterate through persisted subscriptions + * + * @return A valid iterator on success. Use CommonIterator accessor to retrieve SubscriptionInfo + */ + virtual SubscriptionInfoIterator * IterateSubscriptions() = 0; + + /** + * Save subscription resumption information to storage. + * + * @param subscriptionInfo the subscription information to save - caller should expect the passed in value is consumed + */ + virtual CHIP_ERROR Save(SubscriptionInfo & subscriptionInfo) = 0; + + /** + * Delete subscription resumption information by node ID, fabric index, and subscription ID. + */ + virtual CHIP_ERROR Delete(NodeId nodeId, FabricIndex fabricIndex, SubscriptionId subscriptionId) = 0; + + /** + * Remove all subscription resumption information associated with the specified + * fabric index. If no entries for the fabric index exist, this is a no-op + * and is considered successful. + * + * @param fabricIndex the index of the fabric for which to remove subscription resumption information + */ + virtual CHIP_ERROR DeleteAll(FabricIndex fabricIndex) = 0; +}; +} // namespace app +} // namespace chip diff --git a/src/app/server/Server.cpp b/src/app/server/Server.cpp index b6c0d571d9e727..d7f76abe19a0aa 100644 --- a/src/app/server/Server.cpp +++ b/src/app/server/Server.cpp @@ -126,10 +126,11 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) chip::Platform::MemoryInit(); // Initialize PersistentStorageDelegate-based storage - mDeviceStorage = initParams.persistentStorageDelegate; - mSessionResumptionStorage = initParams.sessionResumptionStorage; - mOperationalKeystore = initParams.operationalKeystore; - mOpCertStore = initParams.opCertStore; + mDeviceStorage = initParams.persistentStorageDelegate; + mSessionResumptionStorage = initParams.sessionResumptionStorage; + mSubscriptionResumptionStorage = initParams.subscriptionResumptionStorage; + mOperationalKeystore = initParams.operationalKeystore; + mOpCertStore = initParams.opCertStore; mCertificateValidityPolicy = initParams.certificateValidityPolicy; @@ -304,7 +305,8 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) mCertificateValidityPolicy, mGroupsProvider); SuccessOrExit(err); - err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable(), &mCASESessionManager); + err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable(), &mCASESessionManager, + mSubscriptionResumptionStorage); SuccessOrExit(err); // This code is necessary to restart listening to existing groups after a reboot @@ -351,6 +353,10 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) } } +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + ResumeSubscriptions(); +#endif + PlatformMgr().HandleServerStarted(); exit: @@ -475,4 +481,15 @@ CHIP_ERROR Server::SendUserDirectedCommissioningRequest(chip::Transport::PeerAdd } #endif // CHIP_DEVICE_CONFIG_ENABLE_COMMISSIONER_DISCOVERY_CLIENT +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS +void Server::ResumeSubscriptions() +{ + CHIP_ERROR err = chip::app::InteractionModelEngine::GetInstance()->ResumeSubscriptions(); + if (err != CHIP_NO_ERROR) + { + ChipLogError(AppServer, "Error when trying to resume subscriptions : %" CHIP_ERROR_FORMAT, err.Format()); + } +} +#endif + } // namespace chip diff --git a/src/app/server/Server.h b/src/app/server/Server.h index 74a5aaedb29f4c..6fcf79f01bf2af 100644 --- a/src/app/server/Server.h +++ b/src/app/server/Server.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -105,6 +106,9 @@ struct ServerInitParams // Session resumption storage: Optional. Support session resumption when provided. // Must be initialized before being provided. SessionResumptionStorage * sessionResumptionStorage = nullptr; + // Session resumption storage: Optional. Support session resumption when provided. + // Must be initialized before being provided. + app::SubscriptionResumptionStorage * subscriptionResumptionStorage = nullptr; // Certificate validity policy: Optional. If none is injected, CHIPCert // enforces a default policy. Credentials::CertificateValidityPolicy * certificateValidityPolicy = nullptr; @@ -220,6 +224,9 @@ struct CommonCaseDeviceServerInitParams : public ServerInitParams #if CHIP_CONFIG_ENABLE_SESSION_RESUMPTION static chip::SimpleSessionResumptionStorage sSessionResumptionStorage; +#endif +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + static chip::app::SimpleSubscriptionResumptionStorage sSubscriptionResumptionStorage; #endif static chip::app::DefaultAclStorage sAclStorage; @@ -273,6 +280,14 @@ struct CommonCaseDeviceServerInitParams : public ServerInitParams // embedded systems. this->certificateValidityPolicy = &sDefaultCertValidityPolicy; +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + ChipLogProgress(AppServer, "Initializing subscription resumption storage..."); + ReturnErrorOnFailure(sSubscriptionResumptionStorage.Init(this->persistentStorageDelegate)); + this->subscriptionResumptionStorage = &sSubscriptionResumptionStorage; +#else + ChipLogProgress(AppServer, "Subscription persistence not supported"); +#endif + return CHIP_NO_ERROR; } }; @@ -320,6 +335,8 @@ class Server SessionResumptionStorage * GetSessionResumptionStorage() { return mSessionResumptionStorage; } + app::SubscriptionResumptionStorage * GetSubscriptionResumptionStorage() { return mSubscriptionResumptionStorage; } + TransportMgrBase & GetTransportManager() { return mTransports; } Credentials::GroupDataProvider * GetGroupDataProvider() { return mGroupsProvider; } @@ -361,6 +378,13 @@ class Server void InitFailSafe(); +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + /** + * @brief Called at Server::Init time to resume persisted subscriptions if the feature flag is enabled + */ + void ResumeSubscriptions(); +#endif + class GroupDataProviderListener final : public Credentials::GroupDataProvider::GroupListener { public: @@ -424,6 +448,7 @@ class Server { (void) fabricTable; ClearCASEResumptionStateOnFabricChange(fabricIndex); + ClearSubscriptionResumptionStateOnFabricChange(fabricIndex); Credentials::GroupDataProvider * groupDataProvider = mServer->GetGroupDataProvider(); if (groupDataProvider != nullptr) @@ -479,6 +504,19 @@ class Server } } + void ClearSubscriptionResumptionStateOnFabricChange(chip::FabricIndex fabricIndex) + { + auto * subscriptionResumptionStorage = mServer->GetSubscriptionResumptionStorage(); + VerifyOrReturn(subscriptionResumptionStorage != nullptr); + CHIP_ERROR err = subscriptionResumptionStorage->DeleteAll(fabricIndex); + if (err != CHIP_NO_ERROR) + { + ChipLogError(AppServer, + "Warning, failed to delete subscription resumption state for fabric index 0x%x: %" CHIP_ERROR_FORMAT, + static_cast(fabricIndex), err.Format()); + } + } + Server * mServer = nullptr; }; @@ -505,6 +543,7 @@ class Server PersistentStorageDelegate * mDeviceStorage; SessionResumptionStorage * mSessionResumptionStorage; + app::SubscriptionResumptionStorage * mSubscriptionResumptionStorage; Credentials::CertificateValidityPolicy * mCertificateValidityPolicy; Credentials::GroupDataProvider * mGroupsProvider; app::DefaultAttributePersistenceProvider mAttributePersister; diff --git a/src/app/tests/BUILD.gn b/src/app/tests/BUILD.gn index 6afc8f8e2615fc..b2f023b3eb2db0 100644 --- a/src/app/tests/BUILD.gn +++ b/src/app/tests/BUILD.gn @@ -105,6 +105,7 @@ chip_test_suite("tests") { "TestPendingNotificationMap.cpp", "TestReadInteraction.cpp", "TestReportingEngine.cpp", + "TestSimpleSubscriptionResumptionStorage.cpp", "TestStatusIB.cpp", "TestStatusResponseMessage.cpp", "TestTimedHandler.cpp", diff --git a/src/app/tests/TestSimpleSubscriptionResumptionStorage.cpp b/src/app/tests/TestSimpleSubscriptionResumptionStorage.cpp new file mode 100644 index 00000000000000..67b1f0757d7b14 --- /dev/null +++ b/src/app/tests/TestSimpleSubscriptionResumptionStorage.cpp @@ -0,0 +1,482 @@ +/* + * Copyright (c) 2021 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + +#include +#include + +#include + +class SimpleSubscriptionResumptionStorageTest : public chip::app::SimpleSubscriptionResumptionStorage +{ +public: + CHIP_ERROR TestSave(chip::TLV::TLVWriter & writer, + chip::app::SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) + { + return Save(writer, subscriptionInfo); + } + static constexpr size_t TestMaxSubscriptionSize() { return MaxSubscriptionSize(); } +}; + +struct TestSubscriptionInfo : public chip::app::SubscriptionResumptionStorage::SubscriptionInfo +{ + bool operator==(const SubscriptionInfo & that) const + { + if ((mNodeId != that.mNodeId) || (mFabricIndex != that.mFabricIndex) || (mSubscriptionId != that.mSubscriptionId) || + (mMinInterval != that.mMinInterval) || (mMaxInterval != that.mMaxInterval) || (mFabricFiltered != that.mFabricFiltered)) + { + return false; + } + if ((mAttributePaths.AllocatedSize() != that.mAttributePaths.AllocatedSize()) || + (mEventPaths.AllocatedSize() != that.mEventPaths.AllocatedSize())) + { + return false; + } + for (size_t i = 0; i < mAttributePaths.AllocatedSize(); i++) + { + if ((mAttributePaths[i].mEndpointId != that.mAttributePaths[i].mEndpointId) || + (mAttributePaths[i].mClusterId != that.mAttributePaths[i].mClusterId) || + (mAttributePaths[i].mAttributeId != that.mAttributePaths[i].mAttributeId)) + { + return false; + } + } + for (size_t i = 0; i < mEventPaths.AllocatedSize(); i++) + { + if ((mEventPaths[i].mEndpointId != that.mEventPaths[i].mEndpointId) || + (mEventPaths[i].mClusterId != that.mEventPaths[i].mClusterId) || + (mEventPaths[i].mEventId != that.mEventPaths[i].mEventId) || + (mEventPaths[i].mIsUrgentEvent != that.mEventPaths[i].mIsUrgentEvent)) + { + return false; + } + } + return true; + } +}; + +void TestSubscriptionCount(nlTestSuite * inSuite, void * inContext) +{ + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + subscriptionStorage.Init(&storage); + + // Write some subscriptions and see the counts are correct + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo = { .mNodeId = 6666, .mFabricIndex = 46 }; + + for (size_t i = 0; i < (CHIP_IM_MAX_NUM_SUBSCRIPTIONS / 2); i++) + { + subscriptionInfo.mSubscriptionId = static_cast(i); + subscriptionStorage.Save(subscriptionInfo); + } + + // Make sure iterator counts correctly + auto * iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == (CHIP_IM_MAX_NUM_SUBSCRIPTIONS / 2)); + + // Verify subscriptions manually count correctly + size_t count = 0; + while (iterator->Next(subscriptionInfo)) + { + count++; + } + iterator->Release(); + NL_TEST_ASSERT(inSuite, count == (CHIP_IM_MAX_NUM_SUBSCRIPTIONS / 2)); + + // Delete all and verify iterator counts 0 + CHIP_ERROR err = subscriptionStorage.DeleteAll(46); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 0); + + // Verify subscriptions manually count correctly + count = 0; + while (iterator->Next(subscriptionInfo)) + { + count++; + } + iterator->Release(); + NL_TEST_ASSERT(inSuite, count == 0); +} + +void TestSubscriptionMaxCount(nlTestSuite * inSuite, void * inContext) +{ + // Force large MacCount value and check that Init resets it properly, and deletes extra subs: + + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + + // First set a large MaxCount before Init + uint16_t countMaxToSave = 2 * CHIP_IM_MAX_NUM_SUBSCRIPTIONS; + CHIP_ERROR err = storage.SyncSetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName(), + &countMaxToSave, sizeof(uint16_t)); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + + // Then write something beyond CHIP_IM_MAX_NUM_SUBSCRIPTIONS + chip::Platform::ScopedMemoryBuffer junkBytes; + junkBytes.Calloc(subscriptionStorage.TestMaxSubscriptionSize() / 2); + NL_TEST_ASSERT(inSuite, junkBytes.Get() != nullptr); + NL_TEST_ASSERT(inSuite, + storage.SyncSetKeyValue( + chip::DefaultStorageKeyAllocator::SubscriptionResumption(CHIP_IM_MAX_NUM_SUBSCRIPTIONS + 1).KeyName(), + junkBytes.Get(), static_cast(subscriptionStorage.TestMaxSubscriptionSize() / 2)) == CHIP_NO_ERROR); + + subscriptionStorage.Init(&storage); + + // First check the MaxCount is reset to CHIP_IM_MAX_NUM_SUBSCRIPTIONS + uint16_t countMax = 0; + uint16_t len = sizeof(countMax); + err = storage.SyncGetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName(), &countMax, len); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + NL_TEST_ASSERT(inSuite, countMax == CHIP_IM_MAX_NUM_SUBSCRIPTIONS); + + // Then check the fake sub is no more + NL_TEST_ASSERT(inSuite, + !storage.SyncDoesKeyExist( + chip::DefaultStorageKeyAllocator::SubscriptionResumption(CHIP_IM_MAX_NUM_SUBSCRIPTIONS + 1).KeyName())); +} + +void TestSubscriptionState(nlTestSuite * inSuite, void * inContext) +{ + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + subscriptionStorage.Init(&storage); + + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo1 = { + .mNodeId = 1111, + .mFabricIndex = 41, + .mSubscriptionId = 1, + .mMinInterval = 1, + .mMaxInterval = 11, + .mFabricFiltered = true, + }; + subscriptionInfo1.mAttributePaths.Calloc(2); + subscriptionInfo1.mAttributePaths[0].mEndpointId = 1; + subscriptionInfo1.mAttributePaths[0].mClusterId = 1; + subscriptionInfo1.mAttributePaths[0].mAttributeId = 1; + subscriptionInfo1.mAttributePaths[1].mEndpointId = 2; + subscriptionInfo1.mAttributePaths[1].mClusterId = 2; + subscriptionInfo1.mAttributePaths[1].mAttributeId = 2; + + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo2 = { + .mNodeId = 2222, + .mFabricIndex = 42, + .mSubscriptionId = 2, + .mMinInterval = 2, + .mMaxInterval = 12, + .mFabricFiltered = false, + }; + subscriptionInfo2.mEventPaths.Calloc(2); + subscriptionInfo2.mEventPaths[0].mEndpointId = 3; + subscriptionInfo2.mEventPaths[0].mClusterId = 3; + subscriptionInfo2.mEventPaths[0].mEventId = 3; + subscriptionInfo2.mEventPaths[0].mIsUrgentEvent = false; + subscriptionInfo2.mEventPaths[1].mEndpointId = 4; + subscriptionInfo2.mEventPaths[1].mClusterId = 4; + subscriptionInfo2.mEventPaths[1].mEventId = 4; + subscriptionInfo2.mEventPaths[1].mIsUrgentEvent = true; + + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo3 = { + .mNodeId = 3333, + .mFabricIndex = 43, + .mSubscriptionId = 3, + .mMinInterval = 3, + .mMaxInterval = 13, + .mFabricFiltered = true, + }; + subscriptionInfo3.mAttributePaths.Calloc(2); + subscriptionInfo3.mAttributePaths[0].mEndpointId = 5; + subscriptionInfo3.mAttributePaths[0].mClusterId = 5; + subscriptionInfo3.mAttributePaths[0].mAttributeId = 5; + subscriptionInfo3.mAttributePaths[1].mEndpointId = 6; + subscriptionInfo3.mAttributePaths[1].mClusterId = 6; + subscriptionInfo3.mAttributePaths[1].mAttributeId = 6; + subscriptionInfo3.mEventPaths.Calloc(2); + subscriptionInfo3.mEventPaths[0].mEndpointId = 7; + subscriptionInfo3.mEventPaths[0].mClusterId = 7; + subscriptionInfo3.mEventPaths[0].mEventId = 7; + subscriptionInfo2.mEventPaths[0].mIsUrgentEvent = true; + subscriptionInfo3.mEventPaths[1].mEndpointId = 8; + subscriptionInfo3.mEventPaths[1].mClusterId = 8; + subscriptionInfo3.mEventPaths[1].mEventId = 8; + subscriptionInfo2.mEventPaths[1].mIsUrgentEvent = false; + + CHIP_ERROR err; + err = subscriptionStorage.Save(subscriptionInfo1); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + err = subscriptionStorage.Save(subscriptionInfo2); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + err = subscriptionStorage.Save(subscriptionInfo3); + NL_TEST_ASSERT(inSuite, err == CHIP_NO_ERROR); + + auto * iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 3); + + // Verify subscriptions manually count correctly + TestSubscriptionInfo subscriptionInfo; + NL_TEST_ASSERT(inSuite, iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, subscriptionInfo == subscriptionInfo1); + NL_TEST_ASSERT(inSuite, iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, subscriptionInfo == subscriptionInfo2); + NL_TEST_ASSERT(inSuite, iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, subscriptionInfo == subscriptionInfo3); + // Verify at end of list + NL_TEST_ASSERT(inSuite, !iterator->Next(subscriptionInfo)); + iterator->Release(); + + // Delete fabric 1 and subscription 2 and check only 3 remains. + subscriptionStorage.Delete(subscriptionInfo1.mNodeId, subscriptionInfo1.mFabricIndex, subscriptionInfo1.mSubscriptionId); + subscriptionStorage.DeleteAll(subscriptionInfo2.mFabricIndex); + + iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 1); + NL_TEST_ASSERT(inSuite, iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, subscriptionInfo == subscriptionInfo3); + // Verify at end of list + NL_TEST_ASSERT(inSuite, !iterator->Next(subscriptionInfo)); + iterator->Release(); + + // Delete 3 also, and see that both count is 0 and MaxCount is removed from storage + subscriptionStorage.DeleteAll(subscriptionInfo3.mFabricIndex); + iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 0); + NL_TEST_ASSERT(inSuite, !iterator->Next(subscriptionInfo)); + iterator->Release(); + + uint16_t countMax = 0; + uint16_t len = sizeof(countMax); + err = storage.SyncGetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumptionMaxCount().KeyName(), &countMax, len); + NL_TEST_ASSERT(inSuite, err == CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND); +} + +static constexpr chip::TLV::Tag kTestValue1Tag = chip::TLV::ContextTag(30); +static constexpr chip::TLV::Tag kTestValue2Tag = chip::TLV::ContextTag(31); + +void TestSubscriptionStateUnexpectedFields(nlTestSuite * inSuite, void * inContext) +{ + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + subscriptionStorage.Init(&storage); + + // Write additional entries at the end of TLV and see it still loads correctly + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo1 = { + .mNodeId = 4444, + .mFabricIndex = 44, + .mSubscriptionId = 4, + .mMinInterval = 4, + .mMaxInterval = 14, + .mFabricFiltered = true, + }; + subscriptionInfo1.mAttributePaths.Calloc(2); + subscriptionInfo1.mAttributePaths[0].mEndpointId = 9; + subscriptionInfo1.mAttributePaths[0].mClusterId = 9; + subscriptionInfo1.mAttributePaths[0].mAttributeId = 9; + subscriptionInfo1.mAttributePaths[1].mEndpointId = 10; + subscriptionInfo1.mAttributePaths[1].mClusterId = 10; + subscriptionInfo1.mAttributePaths[1].mAttributeId = 10; + + chip::Platform::ScopedMemoryBuffer backingBuffer; + backingBuffer.Calloc(subscriptionStorage.TestMaxSubscriptionSize()); + NL_TEST_ASSERT(inSuite, backingBuffer.Get() != nullptr); + chip::TLV::ScopedBufferTLVWriter writer(std::move(backingBuffer), subscriptionStorage.TestMaxSubscriptionSize()); + + NL_TEST_ASSERT(inSuite, subscriptionStorage.TestSave(writer, subscriptionInfo1) == CHIP_NO_ERROR); + + // Additional stuff + chip::TLV::TLVType containerType; + NL_TEST_ASSERT(inSuite, + writer.StartContainer(chip::TLV::AnonymousTag(), chip::TLV::kTLVType_Structure, containerType) == CHIP_NO_ERROR); + uint32_t value1 = 1; + uint32_t value2 = 2; + NL_TEST_ASSERT(inSuite, writer.Put(kTestValue1Tag, value1) == CHIP_NO_ERROR); + NL_TEST_ASSERT(inSuite, writer.Put(kTestValue2Tag, value2) == CHIP_NO_ERROR); + NL_TEST_ASSERT(inSuite, writer.EndContainer(containerType) == CHIP_NO_ERROR); + + const auto len = writer.GetLengthWritten(); + + writer.Finalize(backingBuffer); + + NL_TEST_ASSERT(inSuite, + storage.SyncSetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumption(0).KeyName(), + backingBuffer.Get(), static_cast(len)) == CHIP_NO_ERROR); + + // Now read back and verify + auto * iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 1); + TestSubscriptionInfo subscriptionInfo; + NL_TEST_ASSERT(inSuite, iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, subscriptionInfo == subscriptionInfo1); + iterator->Release(); +} + +void TestSubscriptionStateTooBigToLoad(nlTestSuite * inSuite, void * inContext) +{ + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + subscriptionStorage.Init(&storage); + + // Write additional too-big data at the end of TLV and see it fails to loads and entry deleted + chip::app::SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo1 = { + .mNodeId = 5555, + .mFabricIndex = 45, + .mSubscriptionId = 5, + .mMinInterval = 5, + .mMaxInterval = 15, + .mFabricFiltered = false, + }; + subscriptionInfo1.mAttributePaths.Calloc(2); + subscriptionInfo1.mAttributePaths[0].mEndpointId = 11; + subscriptionInfo1.mAttributePaths[0].mClusterId = 11; + subscriptionInfo1.mAttributePaths[0].mAttributeId = 11; + subscriptionInfo1.mAttributePaths[1].mEndpointId = 12; + subscriptionInfo1.mAttributePaths[1].mClusterId = 12; + subscriptionInfo1.mAttributePaths[1].mAttributeId = 12; + + chip::Platform::ScopedMemoryBuffer backingBuffer; + backingBuffer.Calloc(subscriptionStorage.TestMaxSubscriptionSize() * 2); + NL_TEST_ASSERT(inSuite, backingBuffer.Get() != nullptr); + chip::TLV::ScopedBufferTLVWriter writer(std::move(backingBuffer), subscriptionStorage.TestMaxSubscriptionSize() * 2); + + NL_TEST_ASSERT(inSuite, subscriptionStorage.TestSave(writer, subscriptionInfo1) == CHIP_NO_ERROR); + + // Additional too-many bytes + chip::TLV::TLVType containerType; + NL_TEST_ASSERT(inSuite, + writer.StartContainer(chip::TLV::AnonymousTag(), chip::TLV::kTLVType_Structure, containerType) == CHIP_NO_ERROR); + // Write MaxSubscriptionSize() to guarantee Load failure + chip::Platform::ScopedMemoryBuffer additionalBytes; + additionalBytes.Calloc(subscriptionStorage.TestMaxSubscriptionSize()); + NL_TEST_ASSERT(inSuite, additionalBytes.Get() != nullptr); + NL_TEST_ASSERT(inSuite, + writer.PutBytes(kTestValue1Tag, additionalBytes.Get(), + static_cast(subscriptionStorage.TestMaxSubscriptionSize())) == CHIP_NO_ERROR); + NL_TEST_ASSERT(inSuite, writer.EndContainer(containerType) == CHIP_NO_ERROR); + + const auto len = writer.GetLengthWritten(); + + writer.Finalize(backingBuffer); + + NL_TEST_ASSERT(inSuite, + storage.SyncSetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumption(0).KeyName(), + backingBuffer.Get(), static_cast(len)) == CHIP_NO_ERROR); + + // Now read back and verify + auto * iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 1); + TestSubscriptionInfo subscriptionInfo; + NL_TEST_ASSERT(inSuite, !iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, iterator->Count() == 0); + iterator->Release(); +} + +void TestSubscriptionStateJunkData(nlTestSuite * inSuite, void * inContext) +{ + chip::TestPersistentStorageDelegate storage; + SimpleSubscriptionResumptionStorageTest subscriptionStorage; + subscriptionStorage.Init(&storage); + + chip::Platform::ScopedMemoryBuffer junkBytes; + junkBytes.Calloc(subscriptionStorage.TestMaxSubscriptionSize() / 2); + NL_TEST_ASSERT(inSuite, junkBytes.Get() != nullptr); + NL_TEST_ASSERT(inSuite, + storage.SyncSetKeyValue(chip::DefaultStorageKeyAllocator::SubscriptionResumption(0).KeyName(), junkBytes.Get(), + static_cast(subscriptionStorage.TestMaxSubscriptionSize() / 2)) == + CHIP_NO_ERROR); + + // Now read back and verify + auto * iterator = subscriptionStorage.IterateSubscriptions(); + NL_TEST_ASSERT(inSuite, iterator->Count() == 1); + TestSubscriptionInfo subscriptionInfo; + NL_TEST_ASSERT(inSuite, !iterator->Next(subscriptionInfo)); + NL_TEST_ASSERT(inSuite, iterator->Count() == 0); + iterator->Release(); +} +/** + * Set up the test suite. + */ +int TestSubscription_Setup(void * inContext) +{ + VerifyOrReturnError(CHIP_NO_ERROR == chip::Platform::MemoryInit(), FAILURE); + + return SUCCESS; +} + +/** + * Tear down the test suite. + */ +int TestSubscription_Teardown(void * inContext) +{ + chip::Platform::MemoryShutdown(); + return SUCCESS; +} + +// Test Suite + +/** + * Test Suite that lists all the test functions. + */ +// clang-format off +static const nlTest sTests[] = +{ + NL_TEST_DEF("TestSubscriptionCount", TestSubscriptionCount), + NL_TEST_DEF("TestSubscriptionMaxCount", TestSubscriptionMaxCount), + NL_TEST_DEF("TestSubscriptionState", TestSubscriptionState), + NL_TEST_DEF("TestSubscriptionStateUnexpectedFields", TestSubscriptionStateUnexpectedFields), + NL_TEST_DEF("TestSubscriptionStateTooBigToLoad", TestSubscriptionStateTooBigToLoad), + NL_TEST_DEF("TestSubscriptionStateJunkData", TestSubscriptionStateJunkData), + + NL_TEST_SENTINEL() +}; +// clang-format on + +// clang-format off +static nlTestSuite sSuite = +{ + "Test-CHIP-SimpleSubscriptionResumptionStorage", + &sTests[0], + &TestSubscription_Setup, &TestSubscription_Teardown +}; +// clang-format on + +/** + * Main + */ +int TestSimpleSubscriptionResumptionStorage() +{ + // Run test suit against one context + nlTestRunner(&sSuite, nullptr); + + return (nlTestRunnerStats(&sSuite)); +} + +CHIP_REGISTER_TEST_SUITE(TestSimpleSubscriptionResumptionStorage) + +#else // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + +/** + * Main + */ +int TestSimpleSubscriptionResumptionStorage() +{ + return 0; +} + +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm index f4455ce222993f..3583698a511f6c 100644 --- a/src/darwin/Framework/CHIP/MTRDevice.mm +++ b/src/darwin/Framework/CHIP/MTRDevice.mm @@ -312,7 +312,7 @@ - (void)_handleUnsolicitedMessageFromPublisher }); } - // in case this is called dyring exponential back off of subscription + // in case this is called during exponential back off of subscription // reestablishment, this starts the attempt right away [self _setupSubscription]; diff --git a/src/lib/core/CHIPConfig.h b/src/lib/core/CHIPConfig.h index 18966b90775c0d..d8ed3429d7ce6f 100644 --- a/src/lib/core/CHIPConfig.h +++ b/src/lib/core/CHIPConfig.h @@ -1343,3 +1343,25 @@ extern const char CHIP_NON_PRODUCTION_MARKER[]; /** * @} */ + +/** + * @def CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + * + * @brief + * Enable persistence and resumption of subscriptions on servers. + * + */ +#ifndef CHIP_CONFIG_PERSIST_SUBSCRIPTIONS +#define CHIP_CONFIG_PERSIST_SUBSCRIPTIONS 0 +#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS + +/** + * @def CHIP_CONFIG_MAX_SUBSCRIPTION_RESUMPTION_STORAGE_CONCURRENT_ITERATORS + * + * @brief Defines the number of simultaneous subscription resumption iterators that can be allocated + * + * Number of iterator instances that can be allocated at any one time + */ +#ifndef CHIP_CONFIG_MAX_SUBSCRIPTION_RESUMPTION_STORAGE_CONCURRENT_ITERATORS +#define CHIP_CONFIG_MAX_SUBSCRIPTION_RESUMPTION_STORAGE_CONCURRENT_ITERATORS 2 +#endif diff --git a/src/lib/support/DefaultStorageKeyAllocator.h b/src/lib/support/DefaultStorageKeyAllocator.h index d74a0e75bcbba5..c5a612db7fe8ea 100644 --- a/src/lib/support/DefaultStorageKeyAllocator.h +++ b/src/lib/support/DefaultStorageKeyAllocator.h @@ -190,6 +190,13 @@ class DefaultStorageKeyAllocator // Event number counter. static StorageKeyName IMEventNumber() { return StorageKeyName::FromConst("g/im/ec"); } + + // Subscription resumption + static StorageKeyName SubscriptionResumption(size_t index) + { + return StorageKeyName::Formatted("g/su/%x", static_cast(index)); + } + static StorageKeyName SubscriptionResumptionMaxCount() { return StorageKeyName::Formatted("g/sum"); } }; } // namespace chip diff --git a/src/lib/support/ScopedBuffer.h b/src/lib/support/ScopedBuffer.h index f01816de1cde50..ffba6c08478839 100644 --- a/src/lib/support/ScopedBuffer.h +++ b/src/lib/support/ScopedBuffer.h @@ -153,9 +153,9 @@ class ScopedMemoryBuffer : public Impl::ScopedMemoryBufferBase return *this; } - ScopedMemoryBuffer & Alloc(size_t size) + ScopedMemoryBuffer & Alloc(size_t elementCount) { - Base::Alloc(size * sizeof(T)); + Base::Alloc(elementCount * sizeof(T)); return *this; } }; @@ -177,28 +177,28 @@ class ScopedMemoryBufferWithSize : public ScopedMemoryBuffer { if (this != &other) { - mSize = other.mSize; - other.mSize = 0; + mCount = other.mCount; + other.mCount = 0; } ScopedMemoryBuffer::operator=(std::move(other)); return *this; } - ~ScopedMemoryBufferWithSize() { mSize = 0; } + ~ScopedMemoryBufferWithSize() { mCount = 0; } - // return the size in bytes - inline size_t AllocatedSize() const { return mSize; } + // return the size as count of elements + inline size_t AllocatedSize() const { return mCount; } void Free() { - mSize = 0; + mCount = 0; ScopedMemoryBuffer::Free(); } T * Release() { T * buffer = ScopedMemoryBuffer::Release(); - mSize = 0; + mCount = 0; return buffer; } @@ -207,23 +207,23 @@ class ScopedMemoryBufferWithSize : public ScopedMemoryBuffer ScopedMemoryBuffer::Calloc(elementCount); if (this->Get() != nullptr) { - mSize = elementCount * sizeof(T); + mCount = elementCount; } return *this; } - ScopedMemoryBufferWithSize & Alloc(size_t size) + ScopedMemoryBufferWithSize & Alloc(size_t elementCount) { - ScopedMemoryBuffer::Alloc(size); + ScopedMemoryBuffer::Alloc(elementCount); if (this->Get() != nullptr) { - mSize = size * sizeof(T); + mCount = elementCount; } return *this; } private: - size_t mSize = 0; + size_t mCount = 0; }; } // namespace Platform diff --git a/src/platform/Darwin/CHIPPlatformConfig.h b/src/platform/Darwin/CHIPPlatformConfig.h index e3e1a99aebe5a1..892812acd89fd6 100644 --- a/src/platform/Darwin/CHIPPlatformConfig.h +++ b/src/platform/Darwin/CHIPPlatformConfig.h @@ -62,3 +62,6 @@ #ifndef CHIP_CONFIG_KVS_PATH #define CHIP_CONFIG_KVS_PATH "/tmp/chip_kvs" #endif // CHIP_CONFIG_KVS_PATH + +// Enable subscription persistence and resumption for CI +#define CHIP_CONFIG_PERSIST_SUBSCRIPTIONS 1 diff --git a/src/platform/Linux/CHIPPlatformConfig.h b/src/platform/Linux/CHIPPlatformConfig.h index 788fe9b80c75d9..0e36ed76144cd2 100644 --- a/src/platform/Linux/CHIPPlatformConfig.h +++ b/src/platform/Linux/CHIPPlatformConfig.h @@ -64,6 +64,9 @@ using CHIP_CONFIG_PERSISTED_STORAGE_KEY_TYPE = const char *; #define CHIP_CONFIG_BDX_MAX_NUM_TRANSFERS 1 #endif // CHIP_CONFIG_BDX_MAX_NUM_TRANSFERS +// Enable subscription persistence and resumption for CI +#define CHIP_CONFIG_PERSIST_SUBSCRIPTIONS 1 + // ==================== Security Configuration Overrides ==================== #ifndef CHIP_CONFIG_KVS_PATH diff --git a/src/platform/cc13x2_26x2/CHIPPlatformConfig.h b/src/platform/cc13x2_26x2/CHIPPlatformConfig.h index 2a5981d4bbdea6..b60ebd2db042e9 100644 --- a/src/platform/cc13x2_26x2/CHIPPlatformConfig.h +++ b/src/platform/cc13x2_26x2/CHIPPlatformConfig.h @@ -68,3 +68,5 @@ #ifndef CHIP_CONFIG_MAX_FABRICS #define CHIP_CONFIG_MAX_FABRICS 5 #endif + +#define CHIP_CONFIG_PERSIST_SUBSCRIPTIONS 0