From 190b4913f91b2b7229d716807c33a8d90d35bd3a Mon Sep 17 00:00:00 2001 From: Yunhan Wang Date: Mon, 27 Dec 2021 11:33:27 -0800 Subject: [PATCH] Add initial resubscribe capability --- src/app/AttributeCache.h | 6 + src/app/BufferedReadCallback.h | 5 + src/app/ReadClient.cpp | 145 ++++++++++++++++-- src/app/ReadClient.h | 32 +++- src/app/ReadHandler.cpp | 5 + src/app/ReadPrepareParams.h | 15 +- src/app/tests/TestReadInteraction.cpp | 18 ++- .../tests/integration/chip_im_initiator.cpp | 22 ++- src/controller/ReadInteraction.h | 43 +++++- src/controller/TypedReadCallback.h | 24 +++ .../python/chip/clusters/attribute.cpp | 37 +++-- src/controller/tests/data_model/TestRead.cpp | 3 +- src/lib/core/CHIPConfig.h | 58 +++++++ src/lib/support/FibonacciUtils.h | 2 +- 14 files changed, 378 insertions(+), 37 deletions(-) diff --git a/src/app/AttributeCache.h b/src/app/AttributeCache.h index 37fb41ba759e04..d0175baafc4cc7 100644 --- a/src/app/AttributeCache.h +++ b/src/app/AttributeCache.h @@ -349,9 +349,15 @@ class AttributeCache : protected ReadClient::Callback return mCallback.OnEventData(aEventHeader, apData, apStatus); } + void OnDone() override { return mCallback.OnDone(); } void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { mCallback.OnSubscriptionEstablished(aSubscriptionId); } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams)); + } + private: Callback & mCallback; NodeState mCache; diff --git a/src/app/BufferedReadCallback.h b/src/app/BufferedReadCallback.h index 039316e06f0650..4c96cc46290c90 100644 --- a/src/app/BufferedReadCallback.h +++ b/src/app/BufferedReadCallback.h @@ -79,6 +79,11 @@ class BufferedReadCallback : public ReadClient::Callback void OnDone() override { return mCallback.OnDone(); } void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { mCallback.OnSubscriptionEstablished(aSubscriptionId); } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams)); + } + private: /* * Given a reader positioned at a list element, allocate a packet buffer, copy the list item where diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 1da2997055d9d0..eb7586d8878ea9 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -22,15 +22,55 @@ * */ -#include "lib/core/CHIPTLVTypes.h" #include #include #include #include +#include +#include namespace chip { namespace app { +/** + * @brief The default resubscribe policy will pick a random timeslot + * with millisecond resolution over an ever increasing window, + * following a fibonacci sequence up to CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX, + * Average of the randomized wait time past the CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX + * will be around one hour. + * When the retry count resets to 0, the sequence starts from the beginning again. + */ +static void DefaultResubscribePolicy(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, + bool & aShouldResubscribe) +{ + uint32_t maxWaitTimeInMsec = 0; + uint32_t waitTimeInMsec = 0; + uint32_t minWaitTimeInMsec = 0; + + if (aNumCumulativeRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) + { + maxWaitTimeInMsec = GetFibonacciForIndex(aNumCumulativeRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; + } + else + { + maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; + } + + if (maxWaitTimeInMsec != 0) + { + minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; + waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); + } + + aNextSubscriptionIntervalMsec = waitTimeInMsec; + aShouldResubscribe = true; + ChipLogProgress(DataManagement, + "Computing Resubscribe policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32 + " ms", + aNumCumulativeRetries, maxWaitTimeInMsec, waitTimeInMsec); + return; +} + ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback, InteractionType aInteractionType) : mpCallback(apCallback) @@ -48,6 +88,25 @@ ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeM } } +void ReadClient::ClearActiveSubscriptionState() +{ + mIsInitialReport = true; + mIsPrimingReports = true; + mPendingMoreChunks = false; + mMinIntervalFloorSeconds = 0; + mMaxIntervalCeilingSeconds = 0; + mSubscriptionId = 0; + MoveToState(ClientState::Idle); +} + +void ReadClient::StopResubscription() +{ + ClearActiveSubscriptionState(); + CancelLivenessCheckTimer(); + CancelResubscribeTimer(); + mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams)); +} + ReadClient::~ReadClient() { Abort(); @@ -55,7 +114,6 @@ ReadClient::~ReadClient() if (IsSubscriptionType()) { CancelLivenessCheckTimer(); - // // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it. // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine @@ -84,9 +142,18 @@ void ReadClient::Close(CHIP_ERROR aError) if (aError != CHIP_NO_ERROR) { + if (ResubscribeIfNeeded()) + { + ClearActiveSubscriptionState(); + return; + } mpCallback.OnError(aError); } + if (mReadPrepareParams.mResubscribePolicy != nullptr) + { + StopResubscription(); + } mpCallback.OnDone(); } @@ -598,22 +665,29 @@ void ReadClient::CancelLivenessCheckTimer() OnLivenessTimeoutCallback, this); } +void ReadClient::CancelResubscribeTimer() +{ + InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer( + OnResubscribeTimerCallback, this); +} + void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState) { - ReadClient * const client = reinterpret_cast(apAppState); + ReadClient * const _this = reinterpret_cast(apAppState); // // Might as well try to see if this instance exists in the tracked list in the IM. // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since // been shutdown at which point the client wouldn't exist in the active read client list. // - VerifyOrDie(client->mpImEngine->InActiveReadClientList(client)); + VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this)); - ChipLogError(DataManagement, "Subscription Liveness timeout with peer node 0x%" PRIx64 ", shutting down ", client->mPeerNodeId); + ChipLogError(DataManagement, "Subscription Liveness timeout with subscription id 0x%" PRIx64 " peer node 0x%" PRIx64, + _this->mSubscriptionId, _this->mPeerNodeId); // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e // response timeouts). - client->Close(CHIP_ERROR_TIMEOUT); + _this->Close(CHIP_ERROR_TIMEOUT); } CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload) @@ -644,6 +718,22 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP return CHIP_NO_ERROR; } +CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams) +{ + mReadPrepareParams = std::move(aReadPrepareParams); + if (mReadPrepareParams.mResubscribePolicy == nullptr) + { + mReadPrepareParams.mResubscribePolicy = DefaultResubscribePolicy; + } + + CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); + if (err != CHIP_NO_ERROR) + { + StopResubscription(); + } + return err; +} + CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPrepareParams) { CHIP_ERROR err = CHIP_NO_ERROR; @@ -658,7 +748,6 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds, err = CHIP_ERROR_INVALID_ARGUMENT); - writer.Init(std::move(msgBuf)); ReturnErrorOnFailure(request.Init(&writer)); @@ -717,5 +806,43 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara return CHIP_NO_ERROR; } -}; // namespace app -}; // namespace chip +void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) +{ + ReadClient * const _this = reinterpret_cast(apAppState); + assert(_this != nullptr); + _this->SendSubscribeRequest(_this->mReadPrepareParams); + _this->mNumRetries++; +} + +bool ReadClient::ResubscribeIfNeeded() +{ + bool shouldResubscribe = true; + uint32_t intervalMsec = 0; + if (mReadPrepareParams.mResubscribePolicy == nullptr) + { + ChipLogProgress(DataManagement, "mResubscribePolicy is null"); + return false; + } + mReadPrepareParams.mResubscribePolicy(mNumRetries, intervalMsec, shouldResubscribe); + if (!shouldResubscribe) + { + ChipLogProgress(DataManagement, "Resubscribe has been stopped"); + return false; + } + CHIP_ERROR err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( + System::Clock::Milliseconds32(intervalMsec), OnResubscribeTimerCallback, this); + if (err != CHIP_NO_ERROR) + { + ChipLogProgress(DataManagement, "Fail to resubscribe with error %" CHIP_ERROR_FORMAT, err.Format()); + return false; + } + else + { + ChipLogProgress(DataManagement, "Will try to Resubscribe at retry index %" PRIu32 " after %" PRIu32 "ms", mNumRetries, + intervalMsec); + } + return true; +} + +} // namespace app +} // namespace chip diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 653a9cb1d0b716..be67b4fd5d5f6e 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -156,6 +156,15 @@ class ReadClient : public Messaging::ExchangeDelegate * */ virtual void OnDone() = 0; + + /** + * This function is invoked when using SendAutoResubscribeRequest, where the ReadClient was configured to auto re-subscribe and the ReadPrepareParams was + * moved into this client for management. This will have to be free'ed appropriately by the application. + * If SendAutoResubscribeRequest fails, this function will be called before it returns the failure. + * If SendAutoResubscribeRequest succeeds, this function will be called immediately before calling OnDone. + * If SendAutoResubscribeRequest is not called, this function will not be called. + */ + virtual void OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams) {} }; enum class InteractionType : uint8_t @@ -232,6 +241,19 @@ class ReadClient : public Messaging::ExchangeDelegate ReadClient * GetNextClient() { return mpNext; } void SetNextClient(ReadClient * apClient) { mpNext = apClient; } + // Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if + // we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled + // by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be used. + // + // The application has to know to + // a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList with + // lifetimes as long as the ReadClient itself and b) free those up later in the call to OnDeallocatePaths. Note: At a given + // time in the system, you can either have a single subscription with re-sub enabled that that has mKeepSubscriptions = false, + // OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not have a mix of both simultaneously. + // If SendAutoResubscribeRequest is called at all, it guarantees that it will call OnDeallocatePaths when OnDone is called. + // SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since that's the only case when the consumer moved a ReadParams into the client. + CHIP_ERROR SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams); + private: friend class TestReadInteraction; friend class InteractionModelEngine; @@ -274,15 +296,18 @@ class ReadClient : public Messaging::ExchangeDelegate CHIP_ERROR ProcessSubscribeResponse(System::PacketBufferHandle && aPayload); CHIP_ERROR RefreshLivenessCheckTimer(); void CancelLivenessCheckTimer(); + void CancelResubscribeTimer(); void MoveToState(const ClientState aTargetState); CHIP_ERROR ProcessAttributePath(AttributePathIB::Parser & aAttributePath, ConcreteDataAttributePath & aClusterInfo); CHIP_ERROR ProcessReportData(System::PacketBufferHandle && aPayload); const char * GetStateStr() const; - + bool ResubscribeIfNeeded(); // Specialized request-sending functions. CHIP_ERROR SendReadRequest(ReadPrepareParams & aReadPrepareParams); CHIP_ERROR SendSubscribeRequest(ReadPrepareParams & aSubscribePrepareParams); + static void OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState); + /* * Called internally to signal the completion of all work on this object, gracefully close the * exchange and finally, signal to the application that it's @@ -293,6 +318,9 @@ class ReadClient : public Messaging::ExchangeDelegate */ void Close(CHIP_ERROR aError); + void StopResubscription(); + void ClearActiveSubscriptionState(); + Messaging::ExchangeManager * mpExchangeMgr = nullptr; Messaging::ExchangeContext * mpExchangeCtx = nullptr; Callback & mpCallback; @@ -311,6 +339,8 @@ class ReadClient : public Messaging::ExchangeDelegate ReadClient * mpNext = nullptr; InteractionModelEngine * mpImEngine = nullptr; + ReadPrepareParams mReadPrepareParams; + uint32_t mNumRetries = 0; }; }; // namespace app diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index b84a0286cc81be..4b08abe8e410ea 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -130,6 +130,11 @@ CHIP_ERROR ReadHandler::OnInitialRequest(System::PacketBufferHandle && aPayload) { Close(); } + else + { + // Mark read handler dirty for read/subscribe priming stage + mDirty = true; + } return err; } diff --git a/src/app/ReadPrepareParams.h b/src/app/ReadPrepareParams.h index 9cdaa5240a0090..86e8c182af3893 100644 --- a/src/app/ReadPrepareParams.h +++ b/src/app/ReadPrepareParams.h @@ -27,6 +27,16 @@ namespace chip { namespace app { +/** + * @brief Used to specify the re-subscription policy. Namely, the method is invoked and provided the number of + * retries that have occurred so far. + * + * aShouldResubscribe and aNextSubscriptionIntervalMsec are outparams indicating whether and how long into + * the future a re-subscription should happen. + */ +typedef void (*OnResubscribePolicyCB)(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, + bool & aShouldResubscribe); + struct ReadPrepareParams { SessionHolder mSessionHolder; @@ -40,7 +50,9 @@ struct ReadPrepareParams uint16_t mMaxIntervalCeilingSeconds = 0; bool mKeepSubscriptions = true; bool mIsFabricFiltered = false; + OnResubscribePolicyCB mResubscribePolicy = nullptr; + ReadPrepareParams() {} ReadPrepareParams(const SessionHandle & sessionHandle) { mSessionHolder.Grab(sessionHandle); } ReadPrepareParams(ReadPrepareParams && other) : mSessionHolder(other.mSessionHolder) { @@ -58,6 +70,7 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; + mResubscribePolicy = other.mResubscribePolicy; } ReadPrepareParams & operator=(ReadPrepareParams && other) @@ -80,7 +93,7 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; - + mResubscribePolicy = other.mResubscribePolicy; return *this; } }; diff --git a/src/app/tests/TestReadInteraction.cpp b/src/app/tests/TestReadInteraction.cpp index ab6f49bc225597..056ce6aad126f4 100644 --- a/src/app/tests/TestReadInteraction.cpp +++ b/src/app/tests/TestReadInteraction.cpp @@ -157,6 +157,19 @@ class MockInteractionModelApp : public chip::app::ReadClient::Callback void OnDone() override {} + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + if (aReadPrepareParams.mpAttributePathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpAttributePathParamsList; + } + + if (aReadPrepareParams.mpEventPathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpEventPathParamsList; + } + } + int mNumDataElementIndex = 0; bool mGotEventResponse = false; int mNumAttributeResponse = 0; @@ -240,6 +253,7 @@ class TestReadInteraction static void TestReadInvalidAttributePathRoundtrip(nlTestSuite * apSuite, void * apContext); static void TestSubscribeInvalidIterval(nlTestSuite * apSuite, void * apContext); static void TestReadShutdown(nlTestSuite * apSuite, void * apContext); + static void TestResubscribeRoundtrip(nlTestSuite * apSuite, void * apContext); private: static void GenerateReportData(nlTestSuite * apSuite, void * apContext, System::PacketBufferHandle & aPayload, @@ -1195,7 +1209,7 @@ void TestReadInteraction::TestSubscribeWildcard(nlTestSuite * apSuite, void * ap ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); readPrepareParams.mEventPathParamsListSize = 0; - chip::app::AttributePathParams attributePathParams[2]; + chip::app::AttributePathParams * attributePathParams = new chip::app::AttributePathParams[2]; // Subscribe to full wildcard paths, repeat twice to ensure chunking. readPrepareParams.mpAttributePathParamsList = attributePathParams; readPrepareParams.mAttributePathParamsListSize = 2; @@ -1210,7 +1224,7 @@ void TestReadInteraction::TestSubscribeWildcard(nlTestSuite * apSuite, void * ap delegate.mGotReport = false; - err = readClient.SendRequest(readPrepareParams); + err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); for (int i = 0; i < 20 && delegate.mNumAttributeResponse < 70; i++) diff --git a/src/app/tests/integration/chip_im_initiator.cpp b/src/app/tests/integration/chip_im_initiator.cpp index 8d13596e5a24a4..a728b10553b133 100644 --- a/src/app/tests/integration/chip_im_initiator.cpp +++ b/src/app/tests/integration/chip_im_initiator.cpp @@ -208,6 +208,18 @@ class MockInteractionModelApp : public chip::app::InteractionModelDelegate, void Shutdown() { mReadClient.reset(); } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + if (aReadPrepareParams.mpAttributePathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpAttributePathParamsList; + } + + if (aReadPrepareParams.mpEventPathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpEventPathParamsList; + } + } private: chip::Platform::UniquePtr mReadClient; }; @@ -384,8 +396,8 @@ CHIP_ERROR SendSubscribeRequest() gLastMessageTime = chip::System::SystemClock().GetMonotonicTimestamp(); chip::app::ReadPrepareParams readPrepareParams(gSession.Get()); - chip::app::EventPathParams eventPathParams[2]; - chip::app::AttributePathParams attributePathParams[1]; + chip::app::EventPathParams * eventPathParams = new chip::app::EventPathParams[2]; + chip::app::AttributePathParams * attributePathParams = new chip::app::AttributePathParams[1]; readPrepareParams.mpEventPathParamsList = eventPathParams; readPrepareParams.mpEventPathParamsList[0].mEndpointId = kTestEndpointId; readPrepareParams.mpEventPathParamsList[0].mClusterId = kTestClusterId; @@ -406,19 +418,19 @@ CHIP_ERROR SendSubscribeRequest() readPrepareParams.mMinIntervalFloorSeconds = 5; readPrepareParams.mMaxIntervalCeilingSeconds = 5; + printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId); auto readClient = chip::Platform::MakeUnique(chip::app::InteractionModelEngine::GetInstance(), &gExchangeManager, gMockDelegate, chip::app::ReadClient::InteractionType::Subscribe); - SuccessOrExit(readClient->SendRequest(readPrepareParams)); + err = readClient->SendAutoResubscribeRequest(std::move(readPrepareParams)); gMockDelegate.AdoptReadClient(std::move(readClient)); gSubCount++; -exit: if (err != CHIP_NO_ERROR) { printf("Send subscribe request failed, err: %s\n", chip::ErrorStr(err)); @@ -601,7 +613,7 @@ void SubscribeRequestTimerHandler(chip::System::Layer * systemLayer, void * appS err = SendSubscribeRequest(); VerifyOrExit(err == CHIP_NO_ERROR, printf("Failed to send write request with error: %s\n", chip::ErrorStr(err))); - err = chip::DeviceLayer::SystemLayer().StartTimer(chip::System::Clock::Seconds16(20), SubscribeRequestTimerHandler, NULL); + err = chip::DeviceLayer::SystemLayer().StartTimer(chip::System::Clock::Seconds16(1000), SubscribeRequestTimerHandler, NULL); VerifyOrExit(err == CHIP_NO_ERROR, printf("Failed to schedule timer with error: %s\n", chip::ErrorStr(err))); } else diff --git a/src/controller/ReadInteraction.h b/src/controller/ReadInteraction.h index e59de69eee2265..d97fe64c6af6fc 100644 --- a/src/controller/ReadInteraction.h +++ b/src/controller/ReadInteraction.h @@ -45,11 +45,12 @@ template CHIP_ERROR ReportAttribute(Messaging::ExchangeManager * exchangeMgr, EndpointId endpointId, ClusterId clusterId, AttributeId attributeId, ReportAttributeParams && readParams) { - app::AttributePathParams attributePath(endpointId, clusterId, attributeId); app::InteractionModelEngine * engine = app::InteractionModelEngine::GetInstance(); CHIP_ERROR err = CHIP_NO_ERROR; - readParams.mpAttributePathParamsList = &attributePath; + auto readPaths = Platform::MakeUnique(endpointId, clusterId, attributeId); + VerifyOrReturnError(readPaths != nullptr, CHIP_ERROR_NO_MEMORY); + readParams.mpAttributePathParamsList = readPaths.get(); readParams.mAttributePathParamsListSize = 1; auto onDone = [](TypedReadAttributeCallback * callback) { chip::Platform::Delete(callback); }; @@ -60,8 +61,19 @@ CHIP_ERROR ReportAttribute(Messaging::ExchangeManager * exchangeMgr, EndpointId auto readClient = chip::Platform::MakeUnique(engine, exchangeMgr, callback->GetBufferedCallback(), readParams.mReportType); + VerifyOrReturnError(readClient != nullptr, CHIP_ERROR_NO_MEMORY); - ReturnErrorOnFailure(readClient->SendRequest(readParams)); + if (readClient->IsSubscriptionType()) + { + readPaths.release(); + err = readClient->SendAutoResubscribeRequest(std::move(readParams)); + ReturnErrorOnFailure(err); + } + else + { + err = readClient->SendRequest(readParams); + ReturnErrorOnFailure(err); + } // // At this point, we'll get a callback through the OnDone callback above regardless of success or failure @@ -176,13 +188,16 @@ template CHIP_ERROR ReportEvent(Messaging::ExchangeManager * apExchangeMgr, EndpointId endpointId, ReportEventParams && readParams) { - ClusterId clusterId = DecodableEventType::GetClusterId(); - EventId eventId = DecodableEventType::GetEventId(); - app::EventPathParams eventPath(endpointId, clusterId, eventId); + ClusterId clusterId = DecodableEventType::GetClusterId(); + EventId eventId = DecodableEventType::GetEventId(); app::InteractionModelEngine * engine = app::InteractionModelEngine::GetInstance(); CHIP_ERROR err = CHIP_NO_ERROR; - readParams.mpEventPathParamsList = &eventPath; + auto readPaths = Platform::MakeUnique(endpointId, clusterId, eventId); + VerifyOrReturnError(readPaths != nullptr, CHIP_ERROR_NO_MEMORY); + + readParams.mpEventPathParamsList = readPaths.get(); + readParams.mEventPathParamsListSize = 1; auto onDone = [](TypedReadEventCallback * callback) { chip::Platform::Delete(callback); }; @@ -193,7 +208,19 @@ CHIP_ERROR ReportEvent(Messaging::ExchangeManager * apExchangeMgr, EndpointId en VerifyOrReturnError(callback != nullptr, CHIP_ERROR_NO_MEMORY); auto readClient = chip::Platform::MakeUnique(engine, apExchangeMgr, *callback.get(), readParams.mReportType); - ReturnErrorOnFailure(readClient->SendRequest(readParams)); + VerifyOrReturnError(readClient != nullptr, CHIP_ERROR_NO_MEMORY); + + if (readClient->IsSubscriptionType()) + { + readPaths.release(); + err = readClient->SendAutoResubscribeRequest(std::move(readParams)); + ReturnErrorOnFailure(err); + } + else + { + err = readClient->SendRequest(readParams); + ReturnErrorOnFailure(err); + } // // At this point, we'll get a callback through the OnDone callback above regardless of success or failure diff --git a/src/controller/TypedReadCallback.h b/src/controller/TypedReadCallback.h index 2f82215681e96d..6c2c663b102518 100644 --- a/src/controller/TypedReadCallback.h +++ b/src/controller/TypedReadCallback.h @@ -18,6 +18,7 @@ #pragma once +#include #include #include #include @@ -106,6 +107,18 @@ class TypedReadAttributeCallback final : public app::ReadClient::Callback } } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + if (aReadPrepareParams.mpAttributePathParamsList != nullptr) + { + for (size_t i = 0; i < aReadPrepareParams.mAttributePathParamsListSize; i++) + { + chip::Platform::Delete(&aReadPrepareParams.mpAttributePathParamsList[i]); + } + + } + } + ClusterId mClusterId; AttributeId mAttributeId; OnSuccessCallbackType mOnSuccess; @@ -159,6 +172,17 @@ class TypedReadEventCallback final : public app::ReadClient::Callback void OnDone() override { mOnDone(this); } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + if (aReadPrepareParams.mpEventPathParamsList != nullptr) + { + for (size_t i = 0; i < aReadPrepareParams.mEventPathParamsListSize; i++) + { + chip::Platform::Delete(&aReadPrepareParams.mpEventPathParamsList[i]); + } + } + } + void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { if (mOnSubscriptionEstablished) diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index 93ca755dbc3da3..9fa11fe07f2e4c 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -151,6 +151,18 @@ class ReadClientCallback : public ReadClient::Callback void OnError(CHIP_ERROR aError) override { gOnReadErrorCallback(mAppContext, aError.AsInteger()); } void OnReportBegin() override { gOnReportBeginCallback(mAppContext); } + void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override + { + if (aReadPrepareParams.mpAttributePathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpAttributePathParamsList; + } + + if (aReadPrepareParams.mpEventPathParamsList != nullptr) + { + delete[] aReadPrepareParams.mpEventPathParamsList; + } + } void OnReportEnd() override { gOnReportEndCallback(mAppContext); } @@ -356,19 +368,21 @@ chip::ChipError::StorageType pychip_ReadClient_ReadAttributes(void * appContext, ReadPrepareParams params(session.Value()); params.mpAttributePathParamsList = readPaths.get(); params.mAttributePathParamsListSize = n; - VerifyOrExit(readClient != nullptr, err = CHIP_ERROR_NO_MEMORY); if (pyParams.isSubscription) { params.mMinIntervalFloorSeconds = pyParams.minInterval; params.mMaxIntervalCeilingSeconds = pyParams.maxInterval; + readPaths.release(); + err = readClient->SendAutoResubscribeRequest(std::move(params)); + SuccessOrExit(err); + } + else + { + err = readClient->SendRequest(params); + SuccessOrExit(err); } - - params.mIsFabricFiltered = pyParams.isFabricFiltered; - - err = readClient->SendRequest(params); - SuccessOrExit(err); } *pReadClient = readClient.get(); @@ -428,10 +442,15 @@ chip::ChipError::StorageType pychip_ReadClient_ReadEvents(void * appContext, Rea { params.mMinIntervalFloorSeconds = pyParams.minInterval; params.mMaxIntervalCeilingSeconds = pyParams.maxInterval; + readPaths.release(); + err = readClient->SendAutoResubscribeRequest(std::move(params)); + SuccessOrExit(err); + } + else + { + err = readClient->SendRequest(params); + SuccessOrExit(err); } - - err = readClient->SendRequest(params); - SuccessOrExit(err); } *pReadClient = readClient.get(); diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 12e03a9bd24802..93f15c5069b517 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -453,7 +453,8 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscription NL_TEST_ASSERT(apSuite, numSuccessCalls == 1); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == 1); - NL_TEST_ASSERT(apSuite, numFailureCalls == 1); + //Resubscription is happening for second subscribe call + NL_TEST_ASSERT(apSuite, numFailureCalls == 0); app::InteractionModelEngine::GetInstance()->SetHandlerCapacity(-1); app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); diff --git a/src/lib/core/CHIPConfig.h b/src/lib/core/CHIPConfig.h index 3ee77cc3696c0b..107ac8c272a676 100644 --- a/src/lib/core/CHIPConfig.h +++ b/src/lib/core/CHIPConfig.h @@ -1694,6 +1694,64 @@ extern const char CHIP_NON_PRODUCTION_MARKER[]; #ifndef CHIP_CONFIG_ENABLE_SERVER_IM_EVENT #define CHIP_CONFIG_ENABLE_SERVER_IM_EVENT 1 #endif + +/** + * @def CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS + * + * @brief + * If auto resubscribe is enabled & default resubscription policy is used, + * specify the max wait time. + * This value was chosen so that the average wait time is 3600000 + * ((100 - CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP) % of CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS) / 2 + + * (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP % of CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS) = average wait is + * 3600000 + */ +#ifndef CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS +#define CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS 5538000 +#endif + +/** + * @def CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX + * + * @brief + * If auto resubscribe is enabled & default resubscription policy is used, + * specify the max fibonacci step index. + * This index must satisfy below conditions: + * 1 . Fibonacci(CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX + 1) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS > + * CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS 2 . Fibonacci(CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) * + * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS < CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS + * + */ +#ifndef CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX +#define CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX 14 +#endif + +/** + * @def CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP + * + * @brief + * If auto resubscribe is enabled & default resubscription policy is used, + * specify the minimum wait + * time as a percentage of the max wait interval for that step. + * + */ +#ifndef CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP +#define CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP 30 +#endif + +/** + * @def CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS + * + * @brief + * If auto resubscribe is enabled & default resubscription policy is used, + * specify the multiplier that multiplies the result of a fibonacci computation + * based on a specific index to provide a max wait time for + * a step. + * + */ +#ifndef CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS +#define CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS 10000 +#endif /** * @} */ diff --git a/src/lib/support/FibonacciUtils.h b/src/lib/support/FibonacciUtils.h index b0d0d7ff30fbcd..ecefe65c2e64df 100644 --- a/src/lib/support/FibonacciUtils.h +++ b/src/lib/support/FibonacciUtils.h @@ -31,7 +31,7 @@ namespace chip { /** * This function generates 32-bit Fibonacci number - * for a given 32 bit index. + * for a given 32 bit index. The index boundary is 47, then it would overflow. * * @return 32-bit unsigned fibonacci number. *