From ad6fa1276e01e6e846950a9469a649a490cb3910 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Sat, 25 Jun 2022 07:47:16 -0700 Subject: [PATCH 01/10] Establish CASE on re-subscription This adds support for re-establishing CASE on re-subscription as a default policy implementation, with the application having the ability to over-ride that if needed. --- src/app/BufferedReadCallback.h | 4 +- src/app/ClusterStateCache.h | 4 +- src/app/DeviceProxy.h | 1 - src/app/InteractionModelEngine.cpp | 12 +- src/app/InteractionModelEngine.h | 12 +- src/app/OperationalDeviceProxy.cpp | 11 +- src/app/ReadClient.cpp | 261 +++++++++++------- src/app/ReadClient.h | 79 +++++- src/app/ReadPrepareParams.h | 22 +- .../operational-credentials-server.cpp | 1 + src/app/server/Server.cpp | 6 +- src/app/tests/BUILD.gn | 1 + .../tests/integration/chip_im_initiator.cpp | 1 - .../interaction_model/InteractionModel.cpp | 2 - .../interaction_model/InteractionModel.h | 2 +- .../CHIPDeviceControllerFactory.cpp | 5 +- src/controller/TypedReadCallback.h | 16 +- .../python/chip/clusters/attribute.cpp | 9 +- src/controller/tests/TestEventChunking.cpp | 2 - src/controller/tests/TestReadChunking.cpp | 3 - src/controller/tests/data_model/TestRead.cpp | 8 +- src/transport/SessionManager.h | 2 + 22 files changed, 295 insertions(+), 169 deletions(-) diff --git a/src/app/BufferedReadCallback.h b/src/app/BufferedReadCallback.h index ea7dd28c4dbe5a..6b2d31d2b19a1e 100644 --- a/src/app/BufferedReadCallback.h +++ b/src/app/BufferedReadCallback.h @@ -86,9 +86,9 @@ class BufferedReadCallback : public ReadClient::Callback mCallback.OnSubscriptionEstablished(aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec); + return mCallback.OnResubscriptionNeeded(apReadClient, aTerminationCause); } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override diff --git a/src/app/ClusterStateCache.h b/src/app/ClusterStateCache.h index b16fd58b1ee012..8ba58f1d923798 100644 --- a/src/app/ClusterStateCache.h +++ b/src/app/ClusterStateCache.h @@ -576,9 +576,9 @@ class ClusterStateCache : protected ReadClient::Callback mCallback.OnSubscriptionEstablished(aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec); + return mCallback.OnResubscriptionNeeded(apReadClient, aTerminationCause); } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override diff --git a/src/app/DeviceProxy.h b/src/app/DeviceProxy.h index ce9a3909561ebc..3315576065b6ed 100644 --- a/src/app/DeviceProxy.h +++ b/src/app/DeviceProxy.h @@ -27,7 +27,6 @@ #pragma once #include -#include #include #include #include diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index bfffe29cb5010f..97380ab253e120 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -42,13 +42,17 @@ InteractionModelEngine * InteractionModelEngine::GetInstance() return &sInteractionModelEngine; } -CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable) +CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, + CASESessionManager * apCASESessionMgr) { - mpExchangeMgr = apExchangeMgr; - mpFabricTable = apFabricTable; + VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT); + VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT); + + mpExchangeMgr = apExchangeMgr; + mpFabricTable = apFabricTable; + mpCASESessionMgr = apCASESessionMgr; ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this)); - VerifyOrReturnError(mpFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT); mReportingEngine.Init(); mMagic++; diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index c3a5248978d789..41cef9eccb44aa 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -59,6 +59,8 @@ #include #include +#include + namespace chip { namespace app { @@ -102,17 +104,21 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, * Initialize the InteractionModel Engine. * * @param[in] apExchangeMgr A pointer to the ExchangeManager object. + * @param[in] apFabricTable A pointer to the FabricTable object. + * @param[in] apCASESessionMgr An optional pointer to a CASESessionManager (used for re-subscriptions). * * @retval #CHIP_ERROR_INCORRECT_STATE If the state is not equal to * kState_NotInitialized. * @retval #CHIP_NO_ERROR On success. * */ - CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable); + CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, + CASESessionManager * apCASESessionMgr = nullptr); void Shutdown(); - Messaging::ExchangeManager * GetExchangeManager(void) const { return mpExchangeMgr; }; + Messaging::ExchangeManager * GetExchangeManager(void) const { return mpExchangeMgr; } + CASESessionManager * GetCASESessionManager() const { return mpCASESessionMgr; } /** * Tears down an active subscription. @@ -545,6 +551,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, FabricTable * mpFabricTable; + CASESessionManager * mpCASESessionMgr = nullptr; + // A magic number for tracking values between stack Shutdown()-s and Init()-s. // An ObjectHandle is valid iff. its magic equals to this one. uint32_t mMagic = 0; diff --git a/src/app/OperationalDeviceProxy.cpp b/src/app/OperationalDeviceProxy.cpp index 92651af3f68bad..96c650f299ad1b 100644 --- a/src/app/OperationalDeviceProxy.cpp +++ b/src/app/OperationalDeviceProxy.cpp @@ -24,12 +24,13 @@ * messages to and from the corresponding CHIP devices. */ -#include "OperationalDeviceProxy.h" +#include -#include "CASEClient.h" -#include "CommandSender.h" -#include "ReadPrepareParams.h" -#include "transport/SecureSession.h" +#include +#include +#include +#include +#include #include #include diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 63a4b95eb96ff1..970e484c50a4bc 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -34,47 +34,10 @@ namespace chip { namespace app { -/** - * @brief The default resubscribe policy will pick a random timeslot - * with millisecond resolution over an ever increasing window, - * following a fibonacci sequence up to CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX, - * Average of the randomized wait time past the CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX - * will be around one hour. - * When the retry count resets to 0, the sequence starts from the beginning again. - */ -static void DefaultResubscribePolicy(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, - bool & aShouldResubscribe) -{ - uint32_t maxWaitTimeInMsec = 0; - uint32_t waitTimeInMsec = 0; - uint32_t minWaitTimeInMsec = 0; - - if (aNumCumulativeRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) - { - maxWaitTimeInMsec = GetFibonacciForIndex(aNumCumulativeRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; - } - else - { - maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; - } - - if (maxWaitTimeInMsec != 0) - { - minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; - waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); - } - - aNextSubscriptionIntervalMsec = waitTimeInMsec; - aShouldResubscribe = true; - ChipLogProgress(DataManagement, - "Computing Resubscribe policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32 - " ms", - aNumCumulativeRetries, maxWaitTimeInMsec, waitTimeInMsec); -} - ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback, InteractionType aInteractionType) : - mpCallback(apCallback) + mpCallback(apCallback), + mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) { // Error if already initialized. mpExchangeMgr = apExchangeMgr; @@ -102,7 +65,7 @@ void ReadClient::ClearActiveSubscriptionState() void ReadClient::StopResubscription() { - ClearActiveSubscriptionState(); + CancelLivenessCheckTimer(); CancelResubscribeTimer(); mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams)); @@ -115,6 +78,8 @@ ReadClient::~ReadClient() if (IsSubscriptionType()) { CancelLivenessCheckTimer(); + CancelResubscribeTimer(); + // // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it. // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine @@ -127,7 +92,44 @@ ReadClient::~ReadClient() } } -void ReadClient::Close(CHIP_ERROR aError) +uint32_t ReadClient::ComputeTimeTillNextSubscription() +{ + uint32_t maxWaitTimeInMsec = 0; + uint32_t waitTimeInMsec = 0; + uint32_t minWaitTimeInMsec = 0; + + if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) + { + maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; + } + else + { + maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; + } + + if (maxWaitTimeInMsec != 0) + { + minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; + waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); + } + + return waitTimeInMsec; +} + +CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, bool establishCASE) +{ + VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); + + mDoCaseOnNextResub = establishCASE; + + ReturnErrorOnFailure( + InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( + System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this)); + + return CHIP_NO_ERROR; +} + +void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription) { // OnDone below can destroy us before we unwind all the way back into the // exchange code and it tries to close itself. Make sure that it doesn't @@ -152,20 +154,30 @@ void ReadClient::Close(CHIP_ERROR aError) { if (aError != CHIP_NO_ERROR) { - uint32_t nextResubscribeMsec = 0; - - if (ResubscribeIfNeeded(nextResubscribeMsec)) + ClearActiveSubscriptionState(); + + // + // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present + // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which + // populates mReadPrepareParams with the values provided by the application. + if (allowResubscription && + (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0)) { - ChipLogProgress(DataManagement, - "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 - "ms due to error %" CHIP_ERROR_FORMAT, - mFabricIndex, ChipLogValueX64(mPeerNodeId), mNumRetries, nextResubscribeMsec, aError.Format()); - mpCallback.OnResubscriptionAttempt(aError, nextResubscribeMsec); - ClearActiveSubscriptionState(); - return; + aError = mpCallback.OnResubscriptionNeeded(this, aError); + if (aError == CHIP_NO_ERROR) + { + return; + } } + + // + // Either something bad happened when requesting resubscription or the application has decided to not + // continue by returning an error. Let's convey the error back up to the application + // and shut everything down. + // mpCallback.OnError(aError); } + StopResubscription(); } @@ -299,9 +311,7 @@ CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams) ReturnErrorOnFailure(mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf), Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); - mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId(); - mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex(); - + mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); MoveToState(ClientState::AwaitingInitialReport); return CHIP_NO_ERROR; @@ -668,7 +678,8 @@ CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeRepo DataVersion version = 0; ReturnErrorOnFailure(data.GetDataVersion(&version)); attributePath.mDataVersion.SetValue(version); - if (mReadPrepareParams.mResubscribePolicy != nullptr) + + if (mReadPrepareParams.mpDataVersionFilterList != nullptr) { UpdateDataVersionFilters(attributePath); } @@ -719,10 +730,11 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea ReturnErrorOnFailure(data.GetData(&dataReader)); - if (mReadPrepareParams.mResubscribePolicy != nullptr) - { - mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); - } + // + // Update the event number being tracked in mReadPrepareParams in case + // we want to use it for re-subscriptions later. + // + mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); NoteReportingData(); mpCallback.OnEventData(header, &dataReader, nullptr); @@ -760,10 +772,10 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxInterval) + mpExchangeCtx->GetSessionHandle()->GetAckTimeout(); // EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned - ChipLogProgress(DataManagement, - "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 - " Peer = %02x:" ChipLogFormatX64, - static_cast(timeout.count()), mSubscriptionId, mFabricIndex, ChipLogValueX64(mPeerNodeId)); + ChipLogProgress( + DataManagement, + "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64, + static_cast(timeout.count()), mSubscriptionId, mPeer.GetFabricIndex(), ChipLogValueX64(mPeer.GetNodeId())); err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( timeout, OnLivenessTimeoutCallback, this); @@ -800,7 +812,7 @@ void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * ChipLogError(DataManagement, "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64, - _this->mSubscriptionId, _this->mFabricIndex, ChipLogValueX64(_this->mPeerNodeId)); + _this->mSubscriptionId, _this->mPeer.GetFabricIndex(), ChipLogValueX64(_this->mPeer.GetNodeId())); // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e // response timeouts). @@ -827,7 +839,8 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP ChipLogProgress(DataManagement, "Subscription established with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u" "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64, - mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, mFabricIndex, ChipLogValueX64(mPeerNodeId)); + mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, mPeer.GetFabricIndex(), + ChipLogValueX64(mPeer.GetNodeId())); ReturnErrorOnFailure(subscribeResponse.ExitContainer()); @@ -835,10 +848,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP mpCallback.OnSubscriptionEstablished(subscriptionId); - if (mReadPrepareParams.mResubscribePolicy != nullptr) - { - mNumRetries = 0; - } + mNumRetries = 0; RefreshLivenessCheckTimer(); @@ -848,12 +858,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams) { mReadPrepareParams = std::move(aReadPrepareParams); - if (mReadPrepareParams.mResubscribePolicy == nullptr) - { - mReadPrepareParams.mResubscribePolicy = DefaultResubscribePolicy; - } - - CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); + CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); if (err != CHIP_NO_ERROR) { StopResubscription(); @@ -881,6 +886,9 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP Span dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList, aReadPrepareParams.mDataVersionFilterListSize); + VerifyOrReturnError(aReadPrepareParams.mAttributePathParamsListSize != 0 || aReadPrepareParams.mEventPathParamsListSize != 0, + CHIP_ERROR_INVALID_ARGUMENT); + System::PacketBufferHandle msgBuf; System::PacketBufferTLVWriter writer; SubscribeRequestMessage::Builder request; @@ -957,48 +965,99 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP ReturnErrorOnFailure(mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf), Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); - mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId(); - mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex(); - + mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); MoveToState(ClientState::AwaitingInitialReport); return CHIP_NO_ERROR; } -void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) +CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause) { - ReadClient * const _this = reinterpret_cast(apAppState); - assert(_this != nullptr); - _this->SendSubscribeRequest(_this->mReadPrepareParams); - _this->mNumRetries++; + VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); + + auto timeTillNextResubscription = ComputeTimeTillNextSubscription(); + ChipLogProgress(DataManagement, + "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 + "ms due to error %" CHIP_ERROR_FORMAT, + GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription, + aTerminationCause.Format()); + ReturnErrorOnFailure(ScheduleResubscription(timeTillNextResubscription, aTerminationCause == CHIP_ERROR_TIMEOUT)); + return CHIP_NO_ERROR; } -bool ReadClient::ResubscribeIfNeeded(uint32_t & aNextResubscribeIntervalMsec) +void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy * device) { - bool shouldResubscribe = true; - uint32_t intervalMsec = 0; - aNextResubscribeIntervalMsec = 0; - if (mReadPrepareParams.mResubscribePolicy == nullptr) + ReadClient * const _this = static_cast(context); + VerifyOrDie(_this != nullptr); + + _this->mReadPrepareParams.mSessionHolder.Release(); + _this->mReadPrepareParams.mSessionHolder.Grab(device->GetSecureSession().Value()); + + auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams); + if (err != CHIP_NO_ERROR) { - ChipLogDetail(DataManagement, "mResubscribePolicy is null"); - return false; + _this->Close(err); } - mReadPrepareParams.mResubscribePolicy(mNumRetries, intervalMsec, shouldResubscribe); - if (!shouldResubscribe) +} + +void ReadClient::HandleDeviceConnectionFailure(void * context, PeerId peerId, CHIP_ERROR err) +{ + ReadClient * const _this = static_cast(context); + VerifyOrDie(_this != nullptr); + + ChipLogError(DataManagement, "Failed connecting to device with error '%s'", err.AsString()); + + auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); + VerifyOrDie(caseSessionManager != nullptr); + + caseSessionManager->ReleaseSession(peerId); + _this->Close(err); +} + +void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) +{ + ReadClient * const _this = static_cast(apAppState); + assert(_this != nullptr); + + CHIP_ERROR err; + + ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: DoCASE = %d", _this->mDoCaseOnNextResub); + _this->mNumRetries++; + + if (_this->mDoCaseOnNextResub) { - ChipLogProgress(DataManagement, "Resubscribe has been stopped"); - return false; + auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); + VerifyOrDie(caseSessionManager != nullptr); + + auto fabric = + InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->GetFabricTable()->FindFabricWithIndex( + _this->mPeer.GetFabricIndex()); + VerifyOrExit(fabric != nullptr, err = CHIP_ERROR_INVALID_FABRIC_INDEX; + ChipLogError(DataManagement, "Underlying fabric has gone away, stopping re-subscriptions!");); + + PeerId peerId(fabric->GetCompressedFabricId(), _this->mPeer.GetNodeId()); + + auto proxy = caseSessionManager->FindExistingSession(peerId); + if (proxy != nullptr) + { + proxy->Disconnect(); + } + + caseSessionManager->FindOrEstablishSession(peerId, &_this->mOnConnectedCallback, &_this->mOnConnectionFailureCallback); + return; } - CHIP_ERROR err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( - System::Clock::Milliseconds32(intervalMsec), OnResubscribeTimerCallback, this); + + err = _this->SendSubscribeRequest(_this->mReadPrepareParams); + +exit: if (err != CHIP_NO_ERROR) { - ChipLogError(DataManagement, "Fail to resubscribe with error %" CHIP_ERROR_FORMAT, err.Format()); - return false; + // + // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid fabric. + // In that case, don't permit re-subscription to occur. + // + _this->Close(err, err != CHIP_ERROR_INVALID_FABRIC_INDEX); } - - aNextResubscribeIntervalMsec = intervalMsec; - return true; } void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath) diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index a4514e943f33f4..462b81820e84dd 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -32,8 +32,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -130,14 +132,30 @@ class ReadClient : public Messaging::ExchangeDelegate virtual void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) {} /** - * OnResubscriptionAttempt will be called when a re-subscription has been scheduled as a result of the termination of an - * in-progress or previously active subscription. This object MUST continue to exist after this call is completed. The + * OnResubscriptionNeeded will be called when a subscription that was started with SendAutoResubscribeRequest has terminated + * and re-subscription is needed. The termination cause is provided to help inform subsequent re-subscription logic. + * + * The base implementation automatically re-subscribes at appropriate intervals taking the termination cause into account + * (see ReadClient::DefaultResubscribePolicy for more details). If the default implementation doesn't suffice, the logic of + * ReadClient::DefaultResubscribePolicy is broken down into its constituent methods that are publicly available for + * applications to call and sequence. + * + * If the method is over-ridden, it's the application's responsibility to take the appropriate steps needed to eventually + * call-back into the ReadClient object to schedule a re-subscription (by invoking ReadClient::ScheduleResubscription). + * + * If the application DOES NOT want re-subscription to happen on a particular invocation of this method, returning anything + * other than CHIP_NO_ERROR will terminate the interaction and result in OnError, OnDeallocatePaths and OnDone being called + * in that sequence. + * + * This object MUST continue to exist after this call is completed. The * application shall wait until it receives an OnDone call to destroy the object. * * @param[in] aTerminationCause The cause of failure of the subscription that just terminated. - * @param[in] aNextResubscribeIntervalMsec How long we will wait before trying to auto-resubscribe. */ - virtual void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {} + virtual CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) + { + return apReadClient->DefaultResubscribePolicy(aTerminationCause); + } /** * OnError will be called when an error occurs *after* a successful call to SendRequest(). The following @@ -166,8 +184,7 @@ class ReadClient : public Messaging::ExchangeDelegate * - Always be called exactly *once* for a given ReadClient instance. * - Be called even in error circumstances. * - Only be called after a successful call to SendRequest has been - * made, when the read completes or the subscription is shut down. - * + * made, when the read completes or the subscription is shut down. * * @param[in] apReadClient the ReadClient for the completed interaction. */ virtual void OnDone(ReadClient * apReadClient) = 0; @@ -277,8 +294,8 @@ class ReadClient : public Messaging::ExchangeDelegate return mInteractionType == InteractionType::Subscribe ? returnType(mSubscriptionId) : returnType::Missing(); } - FabricIndex GetFabricIndex() const { return mFabricIndex; } - NodeId GetPeerNodeId() const { return mPeerNodeId; } + FabricIndex GetFabricIndex() const { return mPeer.GetFabricIndex(); } + NodeId GetPeerNodeId() const { return mPeer.GetNodeId(); } bool IsReadType() { return mInteractionType == InteractionType::Read; } bool IsSubscriptionType() const { return mInteractionType == InteractionType::Subscribe; }; @@ -315,6 +332,32 @@ class ReadClient : public Messaging::ExchangeDelegate // that's the only case when the consumer moved a ReadParams into the client. CHIP_ERROR SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams); + // + // This provides a standard re-subscription policy implementation that given a termination cause, does the following: + // - Calculates the time till next subscription with fibonacci back-off (implemented by ComputeTimeTillNextSubscription()). + // - Schedules the next subscription attempt at the computed interval from the previous step. Operational discovery and + // CASE establishment will be attempted if aTerminationCause was CHIP_ERROR_TIMEOUT. In all other cases, it will attempt + // to re-use a previously established session. + // + CHIP_ERROR DefaultResubscribePolicy(CHIP_ERROR aTerminationCause); + + // + // Computes the time till the next re-subscription with millisecond resolution over + // an even increasing window following a fibonacci sequence with the current retry count + // used as input to the fibonacci algorithm. + // + // CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input. + // + uint32_t ComputeTimeTillNextSubscription(); + + // + // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. + // + // If restablishCASE is true, operational discovery and CASE will be attempted at that time before + // the actual IM interaction is initiated. + // + CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, bool restablishCASE = true); + // Like SendSubscribeRequest, but allows sending certain forms of invalid // subscribe requests that servers are expected to reject, for testing // purposes. Should only be called from tests. @@ -405,13 +448,19 @@ class ReadClient : public Messaging::ExchangeDelegate * exchange and finally, signal to the application that it's * safe to release this object. * - * If aError != CHIP_NO_ERROR, it is delivered to the application through the OnError callback first. + * If aError != CHIP_NO_ERROR, this will trigger re-subscriptions if allowResubscription is true + * AND if this ReadClient instance is tracking a subscription AND the applications decides to do so + * in their implementation of Callback::OnResubscriptionNeeded(). * */ - void Close(CHIP_ERROR aError); + void Close(CHIP_ERROR aError, bool allowResubscription = true); void StopResubscription(); void ClearActiveSubscriptionState(); + + static void HandleDeviceConnected(void * context, OperationalDeviceProxy * device); + static void HandleDeviceConnectionFailure(void * context, PeerId peerId, CHIP_ERROR error); + CHIP_ERROR GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional & aEventMin); Messaging::ExchangeManager * mpExchangeMgr = nullptr; @@ -425,11 +474,15 @@ class ReadClient : public Messaging::ExchangeDelegate uint16_t mMinIntervalFloorSeconds = 0; uint16_t mMaxInterval = 0; SubscriptionId mSubscriptionId = 0; - NodeId mPeerNodeId = kUndefinedNodeId; - FabricIndex mFabricIndex = kUndefinedFabricIndex; - InteractionType mInteractionType = InteractionType::Read; + ScopedNodeId mPeer; + InteractionType mInteractionType = InteractionType::Read; Timestamp mEventTimestamp; + bool mDoCaseOnNextResub = true; + + chip::Callback::Callback mOnConnectedCallback; + chip::Callback::Callback mOnConnectionFailureCallback; + ReadClient * mpNext = nullptr; InteractionModelEngine * mpImEngine = nullptr; ReadPrepareParams mReadPrepareParams; diff --git a/src/app/ReadPrepareParams.h b/src/app/ReadPrepareParams.h index 6d95b8b0b48a17..c37a2ef6e36eab 100644 --- a/src/app/ReadPrepareParams.h +++ b/src/app/ReadPrepareParams.h @@ -29,15 +29,6 @@ namespace chip { namespace app { -/** - * @brief Used to specify the re-subscription policy. Namely, the method is invoked and provided the number of - * retries that have occurred so far. - * - * aShouldResubscribe and aNextSubscriptionIntervalMsec are outparams indicating whether and how long into - * the future a re-subscription should happen. - */ -typedef void (*OnResubscribePolicyCB)(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, - bool & aShouldResubscribe); struct ReadPrepareParams { @@ -51,12 +42,11 @@ struct ReadPrepareParams Optional mEventNumber; // The timeout for waiting for the response or System::Clock::kZero to let the interaction model decide the timeout based on the // MRP timeouts of the session. - System::Clock::Timeout mTimeout = System::Clock::kZero; - uint16_t mMinIntervalFloorSeconds = 0; - uint16_t mMaxIntervalCeilingSeconds = 0; - bool mKeepSubscriptions = false; - bool mIsFabricFiltered = true; - OnResubscribePolicyCB mResubscribePolicy = nullptr; + System::Clock::Timeout mTimeout = System::Clock::kZero; + uint16_t mMinIntervalFloorSeconds = 0; + uint16_t mMaxIntervalCeilingSeconds = 0; + bool mKeepSubscriptions = false; + bool mIsFabricFiltered = true; ReadPrepareParams() {} ReadPrepareParams(const SessionHandle & sessionHandle) { mSessionHolder.Grab(sessionHandle); } @@ -78,7 +68,6 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; - mResubscribePolicy = other.mResubscribePolicy; } ReadPrepareParams & operator=(ReadPrepareParams && other) @@ -103,7 +92,6 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; - mResubscribePolicy = other.mResubscribePolicy; return *this; } }; diff --git a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp index e7f71c0e3a67cf..65a87f35d7bdae 100644 --- a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp +++ b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/src/app/server/Server.cpp b/src/app/server/Server.cpp index 0ea320dc9f0be8..fc7fd761ca8794 100644 --- a/src/app/server/Server.cpp +++ b/src/app/server/Server.cpp @@ -214,9 +214,6 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) err = mUnsolicitedStatusHandler.Init(&mExchangeMgr); SuccessOrExit(err); - err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable()); - SuccessOrExit(err); - chip::Dnssd::Resolver::Instance().Init(DeviceLayer::UDPEndPointManager()); #if CHIP_CONFIG_ENABLE_SERVER_IM_EVENT @@ -291,6 +288,9 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) mCertificateValidityPolicy, mGroupsProvider); SuccessOrExit(err); + err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable(), &mCASESessionManager); + SuccessOrExit(err); + // This code is necessary to restart listening to existing groups after a reboot // Each manufacturer needs to validate that they can rejoin groups by placing this code at the appropriate location for them // diff --git a/src/app/tests/BUILD.gn b/src/app/tests/BUILD.gn index 868f08a879aa7a..27e214eff09216 100644 --- a/src/app/tests/BUILD.gn +++ b/src/app/tests/BUILD.gn @@ -32,6 +32,7 @@ static_library("helpers") { deps = [ "${chip_root}/src/access", + "${chip_root}/src/app", "${chip_root}/src/lib/support", "${chip_root}/src/messaging/tests:helpers", "${chip_root}/src/transport/raw/tests:helpers", diff --git a/src/app/tests/integration/chip_im_initiator.cpp b/src/app/tests/integration/chip_im_initiator.cpp index b83c598bce4b14..4636e14bc3fc0f 100644 --- a/src/app/tests/integration/chip_im_initiator.cpp +++ b/src/app/tests/integration/chip_im_initiator.cpp @@ -146,7 +146,6 @@ class MockInteractionModelApp : public ::chip::app::CommandSender::Callback, } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} void OnAttributeData(const chip::app::ConcreteDataAttributePath & aPath, chip::TLV::TLVReader * aData, const chip::app::StatusIB & status) override {} diff --git a/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp b/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp index 526106c56d4a07..1ccfb5eb9b6bb7 100644 --- a/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp +++ b/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp @@ -125,8 +125,6 @@ void InteractionModel::OnSubscriptionEstablished(SubscriptionId subscriptionId) ContinueOnChipMainThread(CHIP_NO_ERROR); } -void InteractionModel::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {} - /////////// WriteClient Callback Interface ///////// void InteractionModel::OnResponse(const WriteClient * client, const ConcreteDataAttributePath & path, StatusIB status) { diff --git a/src/app/tests/suites/commands/interaction_model/InteractionModel.h b/src/app/tests/suites/commands/interaction_model/InteractionModel.h index a5853fdd3ac560..f3a0820bd07f27 100644 --- a/src/app/tests/suites/commands/interaction_model/InteractionModel.h +++ b/src/app/tests/suites/commands/interaction_model/InteractionModel.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -405,7 +406,6 @@ class InteractionModel : public InteractionModelReports, void OnError(CHIP_ERROR error) override; void OnDone(chip::app::ReadClient * aReadClient) override; void OnSubscriptionEstablished(chip::SubscriptionId subscriptionId) override; - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override; /////////// WriteClient Callback Interface ///////// void OnResponse(const chip::app::WriteClient * client, const chip::app::ConcreteDataAttributePath & path, chip::app::StatusIB status) override; diff --git a/src/controller/CHIPDeviceControllerFactory.cpp b/src/controller/CHIPDeviceControllerFactory.cpp index ef9aaaa77ae72c..fdd1c11e8d294b 100644 --- a/src/controller/CHIPDeviceControllerFactory.cpp +++ b/src/controller/CHIPDeviceControllerFactory.cpp @@ -205,8 +205,6 @@ CHIP_ERROR DeviceControllerFactory::InitSystemState(FactoryInitParams params) InitDataModelHandler(stateParams.exchangeMgr); - ReturnErrorOnFailure(chip::app::InteractionModelEngine::GetInstance()->Init(stateParams.exchangeMgr, stateParams.fabricTable)); - ReturnErrorOnFailure(Dnssd::Resolver::Instance().Init(stateParams.udpEndPointManager)); if (params.enableServerInteractions) @@ -266,6 +264,9 @@ CHIP_ERROR DeviceControllerFactory::InitSystemState(FactoryInitParams params) stateParams.caseSessionManager = Platform::New(); ReturnErrorOnFailure(stateParams.caseSessionManager->Init(stateParams.systemLayer, sessionManagerConfig)); + ReturnErrorOnFailure(chip::app::InteractionModelEngine::GetInstance()->Init(stateParams.exchangeMgr, stateParams.fabricTable, + stateParams.caseSessionManager)); + // store the system state mSystemState = chip::Platform::New(std::move(stateParams)); mSystemState->SetTempFabricTable(tempFabricTable); diff --git a/src/controller/TypedReadCallback.h b/src/controller/TypedReadCallback.h index 29fe831ae93a4e..1ea747737861fb 100644 --- a/src/controller/TypedReadCallback.h +++ b/src/controller/TypedReadCallback.h @@ -109,12 +109,16 @@ class TypedReadAttributeCallback final : public app::ReadClient::Callback } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + if (mOnResubscriptionAttempt) { - mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, aNextResubscribeIntervalMsec); + mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, apReadClient->ComputeTimeTillNextSubscription()); } + + return CHIP_NO_ERROR; } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override @@ -214,12 +218,16 @@ class TypedReadEventCallback final : public app::ReadClient::Callback } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + if (mOnResubscriptionAttempt) { - mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, aNextResubscribeIntervalMsec); + mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, apReadClient->ComputeTimeTillNextSubscription()); } + + return CHIP_NO_ERROR; } OnSuccessCallbackType mOnSuccess; diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index e4480a3bb1e059..6472aedeaacabf 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -144,9 +145,12 @@ class ReadClientCallback : public ReadClient::Callback gOnSubscriptionEstablishedCallback(mAppContext, aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - gOnResubscriptionAttemptedCallback(mAppContext, aTerminationCause.AsInteger(), aNextResubscribeIntervalMsec); + ReturnErrorOnFailure(ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + gOnResubscriptionAttemptedCallback(mAppContext, aTerminationCause.AsInteger(), + apReadClient->ComputeTimeTillNextSubscription()); + return CHIP_NO_ERROR; } void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override @@ -458,7 +462,6 @@ chip::ChipError::StorageType pychip_ReadClient_Read(void * appContext, ReadClien params.mMinIntervalFloorSeconds = pyParams.minInterval; params.mMaxIntervalCeilingSeconds = pyParams.maxInterval; params.mKeepSubscriptions = pyParams.keepSubscriptions; - params.mResubscribePolicy = PythonResubscribePolicy; dataVersionFilters.release(); attributePaths.release(); diff --git a/src/controller/tests/TestEventChunking.cpp b/src/controller/tests/TestEventChunking.cpp index 5dc88eb8dcd434..f3466fcbeae04b 100644 --- a/src/controller/tests/TestEventChunking.cpp +++ b/src/controller/tests/TestEventChunking.cpp @@ -157,8 +157,6 @@ class TestReadCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} - uint32_t mAttributeCount = 0; uint32_t mEventCount = 0; bool mOnReportEnd = false; diff --git a/src/controller/tests/TestReadChunking.cpp b/src/controller/tests/TestReadChunking.cpp index 7b7f4247b86781..060b08e189ecd7 100644 --- a/src/controller/tests/TestReadChunking.cpp +++ b/src/controller/tests/TestReadChunking.cpp @@ -136,8 +136,6 @@ class TestReadCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} - uint32_t mAttributeCount = 0; bool mOnReportEnd = false; bool mOnSubscriptionEstablished = false; @@ -306,7 +304,6 @@ class TestMutableReadCallback : public app::ReadClient::Callback void OnReportEnd() override { mOnReportEnd = true; } void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} uint32_t mAttributeCount = 0; // We record every dataversion field from every attribute IB. diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index b26c85eb187ec2..5c13c979aaf383 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -1436,6 +1436,12 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } +#if 0 +// +// This test isn't actually possible since the underlying re-subscription layer automatically attempts to re-establish CASE, which isn't +// possible today with our current test infra machinery. +// + // After client initiated subscription request, test expire session so that subscription fails to establish, and trigger the timeout // error. Client would automatically try to resubscribe and bump the value for numResubscriptionAttemptedCalls. void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) @@ -1498,6 +1504,7 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } +#endif void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext) { @@ -4132,7 +4139,6 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadHandler_TwoSubscribesMultipleReads", TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleReads", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads), NL_TEST_DEF("TestReadAttributeTimeout", TestReadInteraction::TestReadAttributeTimeout), - NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest1", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest1), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest2", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest2), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest3", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest3), diff --git a/src/transport/SessionManager.h b/src/transport/SessionManager.h index 095c6d6dd60c5f..029c80480527ef 100644 --- a/src/transport/SessionManager.h +++ b/src/transport/SessionManager.h @@ -126,6 +126,8 @@ class DLL_EXPORT SessionManager : public TransportMgrDelegate, public FabricTabl SessionManager(); ~SessionManager() override; + FabricTable * GetFabricTable() const { return mFabricTable; } + /** * @brief * This function takes the payload and returns an encrypted message which can be sent multiple times. From 204ef18fb96cc5dc10981462216a0e3157e71bae Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Thu, 30 Jun 2022 10:52:30 -0700 Subject: [PATCH 02/10] Review feedback WIP --- src/app/InteractionModelEngine.cpp | 4 +++ src/app/InteractionModelEngine.h | 7 ++++ src/app/OperationalDeviceProxy.cpp | 2 -- src/app/ReadClient.cpp | 32 ++++++++++--------- src/app/ReadClient.h | 4 +-- .../operational-credentials-server.cpp | 1 - src/transport/SessionManager.h | 2 -- 7 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index 97380ab253e120..538e4edf95d209 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -125,6 +125,10 @@ void InteractionModelEngine::Shutdown() mEventPathPool.ReleaseAll(); mDataVersionFilterPool.ReleaseAll(); mpExchangeMgr->UnregisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id); + + mpCASESessionMgr = nullptr; + mpFabricTable = nullptr; + mpExchangeMgr = nullptr; } uint32_t InteractionModelEngine::GetNumActiveReadHandlers() const diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 41cef9eccb44aa..d3fa606545141c 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -118,8 +118,15 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, void Shutdown(); Messaging::ExchangeManager * GetExchangeManager(void) const { return mpExchangeMgr; } + + /** + * Returns a pointer to the CASESessionManager. This can return nullptr if one wasn't + * provided in the call to Init(). + */ CASESessionManager * GetCASESessionManager() const { return mpCASESessionMgr; } + FabricTable * GetFabricTable() const { return mpFabricTable; } + /** * Tears down an active subscription. * diff --git a/src/app/OperationalDeviceProxy.cpp b/src/app/OperationalDeviceProxy.cpp index 96c650f299ad1b..d4903302d32cad 100644 --- a/src/app/OperationalDeviceProxy.cpp +++ b/src/app/OperationalDeviceProxy.cpp @@ -27,9 +27,7 @@ #include #include -#include #include -#include #include #include diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 970e484c50a4bc..5ba8033e8da4fc 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -775,7 +775,7 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() ChipLogProgress( DataManagement, "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64, - static_cast(timeout.count()), mSubscriptionId, mPeer.GetFabricIndex(), ChipLogValueX64(mPeer.GetNodeId())); + static_cast(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( timeout, OnLivenessTimeoutCallback, this); @@ -812,7 +812,7 @@ void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * ChipLogError(DataManagement, "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64, - _this->mSubscriptionId, _this->mPeer.GetFabricIndex(), ChipLogValueX64(_this->mPeer.GetNodeId())); + _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId())); // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e // response timeouts). @@ -839,8 +839,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP ChipLogProgress(DataManagement, "Subscription established with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u" "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64, - mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, mPeer.GetFabricIndex(), - ChipLogValueX64(mPeer.GetNodeId())); + mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); ReturnErrorOnFailure(subscribeResponse.ExitContainer()); @@ -990,7 +989,6 @@ void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy * ReadClient * const _this = static_cast(context); VerifyOrDie(_this != nullptr); - _this->mReadPrepareParams.mSessionHolder.Release(); _this->mReadPrepareParams.mSessionHolder.Grab(device->GetSecureSession().Value()); auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams); @@ -1005,19 +1003,18 @@ void ReadClient::HandleDeviceConnectionFailure(void * context, PeerId peerId, CH ReadClient * const _this = static_cast(context); VerifyOrDie(_this != nullptr); - ChipLogError(DataManagement, "Failed connecting to device with error '%s'", err.AsString()); + ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format()); auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); VerifyOrDie(caseSessionManager != nullptr); - caseSessionManager->ReleaseSession(peerId); _this->Close(err); } void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) { ReadClient * const _this = static_cast(apAppState); - assert(_this != nullptr); + VerifyOrDie(_this != nullptr); CHIP_ERROR err; @@ -1027,15 +1024,18 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void if (_this->mDoCaseOnNextResub) { auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); - VerifyOrDie(caseSessionManager != nullptr); + VerifyOrExit(caseSessionManager != nullptr, err = CHIP_ERROR_INCORRECT_STATE); + + auto fabric = InteractionModelEngine::GetInstance()->GetFabricTable()->FindFabricWithIndex(_this->GetFabricIndex()); - auto fabric = - InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->GetFabricTable()->FindFabricWithIndex( - _this->mPeer.GetFabricIndex()); + // + // Temporary until #21084 is addressed. This object would have been synchronously cleaned-up + // when a fabric has gone away, and this condition should never arise. + // VerifyOrExit(fabric != nullptr, err = CHIP_ERROR_INVALID_FABRIC_INDEX; ChipLogError(DataManagement, "Underlying fabric has gone away, stopping re-subscriptions!");); - PeerId peerId(fabric->GetCompressedFabricId(), _this->mPeer.GetNodeId()); + PeerId peerId(fabric->GetCompressedFabricId(), _this->GetPeerNodeId()); auto proxy = caseSessionManager->FindExistingSession(peerId); if (proxy != nullptr) @@ -1053,10 +1053,12 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void if (err != CHIP_NO_ERROR) { // - // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid fabric. + // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid fabric, + // or an invalid CASESessionManager pointer when mDoCaseOnNextResub was true. + // // In that case, don't permit re-subscription to occur. // - _this->Close(err, err != CHIP_ERROR_INVALID_FABRIC_INDEX); + _this->Close(err, err != CHIP_ERROR_INVALID_FABRIC_INDEX && err != CHIP_ERROR_INCORRECT_STATE); } } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 462b81820e84dd..1552ebebf70a8f 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -184,7 +184,7 @@ class ReadClient : public Messaging::ExchangeDelegate * - Always be called exactly *once* for a given ReadClient instance. * - Be called even in error circumstances. * - Only be called after a successful call to SendRequest has been - * made, when the read completes or the subscription is shut down. * + * made, when the read completes or the subscription is shut down. * @param[in] apReadClient the ReadClient for the completed interaction. */ virtual void OnDone(ReadClient * apReadClient) = 0; @@ -343,7 +343,7 @@ class ReadClient : public Messaging::ExchangeDelegate // // Computes the time till the next re-subscription with millisecond resolution over - // an even increasing window following a fibonacci sequence with the current retry count + // an ever increasing window following a fibonacci sequence with the current retry count // used as input to the fibonacci algorithm. // // CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input. diff --git a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp index 65a87f35d7bdae..e7f71c0e3a67cf 100644 --- a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp +++ b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include diff --git a/src/transport/SessionManager.h b/src/transport/SessionManager.h index 029c80480527ef..095c6d6dd60c5f 100644 --- a/src/transport/SessionManager.h +++ b/src/transport/SessionManager.h @@ -126,8 +126,6 @@ class DLL_EXPORT SessionManager : public TransportMgrDelegate, public FabricTabl SessionManager(); ~SessionManager() override; - FabricTable * GetFabricTable() const { return mFabricTable; } - /** * @brief * This function takes the payload and returns an encrypted message which can be sent multiple times. From e79be450e4304097ddc20f12819bb8ec86b37841 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Sun, 24 Jul 2022 15:38:14 -0700 Subject: [PATCH 03/10] Further fixes --- src/app/ReadClient.cpp | 33 ++++- src/app/ReadClient.h | 33 ++++- src/controller/tests/data_model/TestRead.cpp | 141 ++++++++++++------- 3 files changed, 147 insertions(+), 60 deletions(-) diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 0d9e3af6585264..da64439fdc5255 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -22,6 +22,7 @@ * */ +#include "system/SystemClock.h" #include #include #include @@ -115,11 +116,20 @@ uint32_t ReadClient::ComputeTimeTillNextSubscription() return waitTimeInMsec; } -CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, bool establishCASE) +CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, bool aReestablishCASE) { VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); - mDoCaseOnNextResub = establishCASE; + // + // If we're establishing CASE, make sure we not provided a new SessionHandle as well. + // + VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); + + if (aNewSessionHandle.HasValue()) { + mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value()); + } + + mDoCaseOnNextResub = aReestablishCASE; ReturnErrorOnFailure( InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( @@ -147,6 +157,7 @@ void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription) // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which // populates mReadPrepareParams with the values provided by the application. + // if (allowResubscription && (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0)) { @@ -541,7 +552,7 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload) exit: if (IsSubscriptionType()) { - if (IsAwaitingInitialReport()) + if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse()) { MoveToState(ClientState::AwaitingSubscribeResponse); } @@ -715,6 +726,11 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea return err; } +void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout) +{ + mLivenessTimeoutOverride = aLivenessTimeout; +} + CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() { CHIP_ERROR err = CHIP_NO_ERROR; @@ -724,7 +740,14 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() VerifyOrReturnError(mExchange, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnError(mExchange->HasSessionHandle(), CHIP_ERROR_INCORRECT_STATE); - System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxInterval) + mExchange->GetSessionHandle()->GetAckTimeout(); + System::Clock::Timeout timeout; + + if (mLivenessTimeoutOverride != System::Clock::kZero) { + timeout = mLivenessTimeoutOverride; + } + else { + timeout = System::Clock::Seconds16(mMaxInterval) + mExchange->GetSessionHandle()->GetAckTimeout(); + } // EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned ChipLogProgress( @@ -937,7 +960,7 @@ CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause) "ms due to error %" CHIP_ERROR_FORMAT, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription, aTerminationCause.Format()); - ReturnErrorOnFailure(ScheduleResubscription(timeTillNextResubscription, aTerminationCause == CHIP_ERROR_TIMEOUT)); + ReturnErrorOnFailure(ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT)); return CHIP_NO_ERROR; } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 52ddc40aba635c..8dc188ed94cfd8 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -23,6 +23,7 @@ */ #pragma once +#include "system/SystemClock.h" #include #include #include @@ -56,8 +57,12 @@ class InteractionModelEngine; /** * @class ReadClient * - * @brief The read client represents the initiator side of a Read Interaction, and is responsible - * for generating one Read Request for a particular set of attributes and/or events, and handling the Report Data response. + * @brief The read client represents the initiator side of a Read Or Subscribe Interaction (depending on the APIs invoked). + * + * When used to manage subscriptions, the client provides functionality to automatically re-subscribe as needed, + * including re-establishing CASE under certain conditions (see Callback::OnResubscriptionNeeded for more info). + * This is the default behavior. A consumer can completely opt-out of this behavior by over-riding Callback::OnResubscriptionNeeded + * and providing an alternative implementation. * */ class ReadClient : public Messaging::ExchangeDelegate @@ -352,12 +357,20 @@ class ReadClient : public Messaging::ExchangeDelegate uint32_t ComputeTimeTillNextSubscription(); // - // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. + // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. // - // If restablishCASE is true, operational discovery and CASE will be attempted at that time before + // If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next interval + // at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method should be called with + // the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 (i.e re-subscribe immediately) and + // 'aReestablishCASE' set to false. + // + // Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before // the actual IM interaction is initiated. // - CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, bool restablishCASE = true); + // aReestablishCASE SHALL NOT be set to true if a valid SessionHandle is provided through newSessionHandle. + // + // + CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, bool aReestablishCASE); // Like SendSubscribeRequest, but allows sending certain forms of invalid // subscribe requests that servers are expected to reject, for testing @@ -369,6 +382,14 @@ class ReadClient : public Messaging::ExchangeDelegate } #endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST + // + // Override the interval at which liveness of the subscription is assessed. + // By default, this is set set to the max interval of the subscription + ACK timeout of the underlying session. + // + // This can be called at any time. + // + void OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout); + private: friend class TestReadInteraction; friend class InteractionModelEngine; @@ -488,6 +509,8 @@ class ReadClient : public Messaging::ExchangeDelegate ReadPrepareParams mReadPrepareParams; uint32_t mNumRetries = 0; + System::Clock::Timeout mLivenessTimeoutOverride = System::Clock::kZero; + // End Of Container (0x18) uses one byte. static constexpr uint16_t kReservedSizeForEndOfContainer = 1; // Reserved size for the uint8_t InteractionModelRevision flag, which takes up 1 byte for the control tag and 1 byte for the diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 6219cce5fbc8ca..74192e63920ee5 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#include "system/SystemClock.h" #include "transport/SecureSession.h" #include #include @@ -1472,75 +1473,114 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } -#if 0 +class TestResubscriptionCallback : public app::ReadClient::Callback +{ +public: + TestResubscriptionCallback() { } + + void SetReadClient(app::ReadClient *apReadClient) { + mpReadClient = apReadClient; + } + + void OnDone(app::ReadClient *) override { mOnDone++; } + + void OnError(CHIP_ERROR aError) override + { + mOnError++; + mLastError = aError; + } + + void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { + mOnSubscriptionEstablishedCount++; + + // + // Set the liveness timeout to a super small number that isn't 0 to + // force the liveness timeout to fire. + // + mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10)); + } + + CHIP_ERROR OnResubscriptionNeeded(app::ReadClient *apReadClient, CHIP_ERROR aTerminationCause) override + { + mOnResubscriptionsAttempted++; + return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false); + } + + void ClearCounters() + { + mOnSubscriptionEstablishedCount = 0; + mOnDone = 0; + mOnError = 0; + mOnResubscriptionsAttempted = 0; + mLastError = CHIP_NO_ERROR; + } + + int32_t mAttributeCount = 0; + int32_t mOnReportEnd = 0; + int32_t mOnSubscriptionEstablishedCount = 0; + int32_t mOnResubscriptionsAttempted = 0; + int32_t mOnDone = 0; + int32_t mOnError = 0; + CHIP_ERROR mLastError = CHIP_NO_ERROR; + app::ReadClient *mpReadClient = nullptr; +}; + +// +// This validates the re-subscription logic within ReadClient. This achieves it by overriding the timeout for the liveness +// timer within ReadClient to be a smaller value than the nominal max interval of the subscription. This causes the +// subscription to fail on the client side, triggering re-subscription. +// +// TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext. // -// This test isn't actually possible since the underlying re-subscription layer automatically attempts to re-establish CASE, which isn't -// possible today with our current test infra machinery. // - -// After client initiated subscription request, test expire session so that subscription fails to establish, and trigger the timeout -// error. Client would automatically try to resubscribe and bump the value for numResubscriptionAttemptedCalls. void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast(apContext); auto sessionHandle = ctx.GetSessionBobToAlice(); - bool onSuccessCbInvoked = false, onFailureCbInvoked = false; - responseDirective = kSendDataError; - uint32_t numSubscriptionEstablishedCalls = 0, numResubscriptionAttemptedCalls = 0; - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onSuccessCb = [&onSuccessCbInvoked](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { - onSuccessCbInvoked = true; - }; - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onFailureCb = [&onFailureCbInvoked, apSuite](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { - NL_TEST_ASSERT(apSuite, aError == CHIP_ERROR_TIMEOUT); - onFailureCbInvoked = true; - }; - - auto onSubscriptionEstablishedCb = [&numSubscriptionEstablishedCalls](const app::ReadClient & readClient) { - numSubscriptionEstablishedCalls++; - }; + { + TestResubscriptionCallback callback; + app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, app::ReadClient::InteractionType::Subscribe); - auto onSubscriptionAttemptedCb = [&numResubscriptionAttemptedCalls](const app::ReadClient & readClient, CHIP_ERROR aError, - uint32_t aNextResubscribeIntervalMsec) { - numResubscriptionAttemptedCalls++; - }; + callback.SetReadClient(&readClient); - Controller::SubscribeAttribute( - &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 20, onSubscriptionEstablishedCb, - onSubscriptionAttemptedCb, false, true); + app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); - ctx.ExpireSessionAliceToBob(); + // Read full wildcard paths, repeat twice to ensure chunking. + app::AttributePathParams attributePathParams[1]; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams); - ctx.DrainAndServiceIO(); + attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id; + attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id; - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); + readPrepareParams.mMaxIntervalCeilingSeconds = 1; - ctx.ExpireSessionBobToAlice(); + readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); - ctx.DrainAndServiceIO(); - - NL_TEST_ASSERT(apSuite, - !onSuccessCbInvoked && !onFailureCbInvoked && numSubscriptionEstablishedCalls == 0 && - numResubscriptionAttemptedCalls == 1); - - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + // + // Drive servicing IO till we have established a subscription at least 2 times. + // + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2), [&]() { + return callback.mOnSubscriptionEstablishedCount > 1; + }); - NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0); + NL_TEST_ASSERT(apSuite, callback.mOnDone == 0); - // - // Let's put back the sessions so that the next tests (which assume a valid initialized set of sessions) - // can function correctly. - // - ctx.CreateSessionAliceToBob(); - ctx.CreateSessionBobToAlice(); + // + // With re-sub enabled, we shouldn't encounter any errors. + // + NL_TEST_ASSERT(apSuite, callback.mOnError == 0); + // + // We should have attempted just one re-subscription. + // + NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1); + } + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } -#endif void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext) { @@ -4295,6 +4335,7 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues), NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath), NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors), + NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout), NL_TEST_SENTINEL() }; // clang-format on From 816fc949ed125b7940f91f5dc6b4a2292f030cc7 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Mon, 25 Jul 2022 13:30:22 -0700 Subject: [PATCH 04/10] Added a Python test to validate re-subscription --- src/app/ReadClient.cpp | 23 ++++---- src/app/ReadClient.h | 19 +++---- src/app/ReadHandler.cpp | 3 ++ .../operational-credentials-server.cpp | 1 + .../python/chip/clusters/Attribute.py | 40 ++++++++++++-- .../python/chip/clusters/attribute.cpp | 7 +++ .../python/test/test_scripts/base.py | 52 +++++++++++++++++++ .../test/test_scripts/mobile-device-test.py | 4 ++ src/controller/tests/data_model/TestRead.cpp | 35 ++++++------- 9 files changed, 143 insertions(+), 41 deletions(-) diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index da64439fdc5255..41dc001b61d1a6 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -38,8 +38,8 @@ namespace app { ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback, InteractionType aInteractionType) : mExchange(*this), - mpCallback(apCallback), - mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) + mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), + mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) { // Error if already initialized. mpExchangeMgr = apExchangeMgr; @@ -116,7 +116,8 @@ uint32_t ReadClient::ComputeTimeTillNextSubscription() return waitTimeInMsec; } -CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, bool aReestablishCASE) +CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, + bool aReestablishCASE) { VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); @@ -125,7 +126,8 @@ CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscripti // VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); - if (aNewSessionHandle.HasValue()) { + if (aNewSessionHandle.HasValue()) + { mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value()); } @@ -729,6 +731,7 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout) { mLivenessTimeoutOverride = aLivenessTimeout; + RefreshLivenessCheckTimer(); } CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() @@ -737,16 +740,16 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() CancelLivenessCheckTimer(); - VerifyOrReturnError(mExchange, CHIP_ERROR_INCORRECT_STATE); - VerifyOrReturnError(mExchange->HasSessionHandle(), CHIP_ERROR_INCORRECT_STATE); - System::Clock::Timeout timeout; - if (mLivenessTimeoutOverride != System::Clock::kZero) { + if (mLivenessTimeoutOverride != System::Clock::kZero) + { timeout = mLivenessTimeoutOverride; } - else { - timeout = System::Clock::Seconds16(mMaxInterval) + mExchange->GetSessionHandle()->GetAckTimeout(); + else + { + VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE); + timeout = System::Clock::Seconds16(mMaxInterval) + mReadPrepareParams.mSessionHolder->GetAckTimeout(); } // EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 8dc188ed94cfd8..8419a4855c0b42 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -58,11 +58,11 @@ class InteractionModelEngine; * @class ReadClient * * @brief The read client represents the initiator side of a Read Or Subscribe Interaction (depending on the APIs invoked). - * + * * When used to manage subscriptions, the client provides functionality to automatically re-subscribe as needed, * including re-establishing CASE under certain conditions (see Callback::OnResubscriptionNeeded for more info). - * This is the default behavior. A consumer can completely opt-out of this behavior by over-riding Callback::OnResubscriptionNeeded - * and providing an alternative implementation. + * This is the default behavior. A consumer can completely opt-out of this behavior by over-riding + * Callback::OnResubscriptionNeeded and providing an alternative implementation. * */ class ReadClient : public Messaging::ExchangeDelegate @@ -357,12 +357,12 @@ class ReadClient : public Messaging::ExchangeDelegate uint32_t ComputeTimeTillNextSubscription(); // - // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. + // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. // - // If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next interval - // at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method should be called with - // the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 (i.e re-subscribe immediately) and - // 'aReestablishCASE' set to false. + // If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next + // interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method + // should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 + // (i.e re-subscribe immediately) and 'aReestablishCASE' set to false. // // Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before // the actual IM interaction is initiated. @@ -370,7 +370,8 @@ class ReadClient : public Messaging::ExchangeDelegate // aReestablishCASE SHALL NOT be set to true if a valid SessionHandle is provided through newSessionHandle. // // - CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, bool aReestablishCASE); + CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, + bool aReestablishCASE); // Like SendSubscribeRequest, but allows sending certain forms of invalid // subscribe requests that servers are expected to reject, for testing diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index 0b68e02d1cec53..cc6dcd5ac1adfa 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -181,6 +181,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt VerifyOrReturnLogError(!mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnLogError(mSessionHandle, CHIP_ERROR_INCORRECT_STATE); auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this); + VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE); mExchangeCtx.Grab(exchange); } @@ -200,10 +201,12 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b VerifyOrReturnLogError(!mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnLogError(mSessionHandle, CHIP_ERROR_INCORRECT_STATE); auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this); + VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE); mExchangeCtx.Grab(exchange); } VerifyOrReturnLogError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); + if (!IsReporting()) { mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration(); diff --git a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp index 35c4ec99e4ccd3..dd9a4ff10e2e2b 100644 --- a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp +++ b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 87fd371cf963c3..fe8ee128d9b1c8 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -474,6 +474,9 @@ def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl) self._subscriptionId = subscriptionId self._devCtrl = devCtrl self._isDone = False + self._onResubscriptionSucceededCb = None + self._onResubscriptionSucceededCb_isAsync = False + self._onResubscriptionAttemptedCb_isAsync = False def GetAttributes(self): ''' Returns the attribute value cache tracking the latest state on the publisher. @@ -493,7 +496,13 @@ def GetAttribute(self, path: TypedAttributePath) -> Any: def GetEvents(self): return self._readTransaction.GetAllEventValues() - def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTransaction, int, int], None]): + def OverrideLivenessTimeoutMs(self, timeoutMs: int): + handle = chip.native.GetLibraryHandle() + builtins.chipStack.Call( + lambda: handle.pychip_ReadClient_OverrideLivenessTimeout(self._readTransaction._pReadClient, timeoutMs) + ) + + def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTransaction, int, int], None], isAsync=False): ''' Sets the callback function that gets invoked anytime a re-subscription is attempted. The callback is expected to have the following signature: @@ -501,6 +510,17 @@ def Callback(transaction: SubscriptionTransaction, errorEncountered: int, nextRe ''' if callback is not None: self._onResubscriptionAttemptedCb = callback + self._onResubscriptionAttemptedCb_isAsync = isAsync + + def SetResubscriptionSucceededCallback(self, callback: Callback[[SubscriptionTransaction], None], isAsync=False): + ''' + Sets the callback function that gets invoked when a re-subscription attempt succeeds. The callback + is expected to have the following signature: + def Callback(transaction: SubscriptionTransaction) + ''' + if callback is not None: + self._onResubscriptionSucceededCb = callback + self._onResubscriptionSucceededCb_isAsync = isAsync def SetAttributeUpdateCallback(self, callback: Callable[[TypedAttributePath, SubscriptionTransaction], None]): ''' @@ -550,7 +570,7 @@ def __repr__(self): return f'' -def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction, terminationError, nextResubscribeIntervalMsec): +async def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction, terminationError, nextResubscribeIntervalMsec): print(f"Previous subscription failed with Error: {terminationError} - re-subscribing in {nextResubscribeIntervalMsec}ms...") @@ -691,14 +711,26 @@ def _handleSubscriptionEstablished(self, subscriptionId): self._subscription_handler = SubscriptionTransaction( self, subscriptionId, self._devCtrl) self._future.set_result(self._subscription_handler) + else: + logging.info("Re-subscription succeeded!") + if self._subscription_handler._onResubscriptionSucceededCb is not None: + if (self._subscription_handler._onResubscriptionSucceededCb_isAsync): + self._event_loop.create_task( + self._subscription_handler._onResubscriptionSucceededCb(self._subscription_handler)) + else: + self._subscription_handler._onResubscriptionSucceededCb(self._subscription_handler) def handleSubscriptionEstablished(self, subscriptionId): self._event_loop.call_soon_threadsafe( self._handleSubscriptionEstablished, subscriptionId) def handleResubscriptionAttempted(self, terminationCause: int, nextResubscribeIntervalMsec: int): - self._event_loop.call_soon_threadsafe( - self._subscription_handler._onResubscriptionAttemptedCb, self._subscription_handler, terminationCause, nextResubscribeIntervalMsec) + if (self._subscription_handler._onResubscriptionAttemptedCb_isAsync): + self._event_loop.create_task(self._subscription_handler._onResubscriptionAttemptedCb( + self._subscription_handler, terminationCause, nextResubscribeIntervalMsec)) + else: + self._event_loop.call_soon_threadsafe( + self._subscription_handler._onResubscriptionAttemptedCb, self._subscription_handler, terminationCause, nextResubscribeIntervalMsec) def _handleReportBegin(self): pass diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index d1985432107138..488c1bc1c47521 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ +#include "system/SystemClock.h" #include #include #include @@ -387,6 +388,12 @@ void pychip_ReadClient_Abort(ReadClient * apReadClient, ReadClientCallback * apC delete apCallback; } +void pychip_ReadClient_OverrideLivenessTimeout(ReadClient * pReadClient, uint32_t livenessTimeoutMs) +{ + VerifyOrDie(pReadClient != nullptr); + pReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(livenessTimeoutMs)); +} + chip::ChipError::StorageType pychip_ReadClient_Read(void * appContext, ReadClient ** pReadClient, ReadClientCallback ** pCallback, DeviceProxy * device, uint8_t * readParamsBuf, size_t numAttributePaths, size_t numDataversionFilters, size_t numEventPaths, ...) diff --git a/src/controller/python/test/test_scripts/base.py b/src/controller/python/test/test_scripts/base.py index 6cd812371f32c2..a2f1aaf18d15b1 100644 --- a/src/controller/python/test/test_scripts/base.py +++ b/src/controller/python/test/test_scripts/base.py @@ -756,6 +756,58 @@ def CompareUnfilteredData(accessingFabric, otherFabric, expectedData): if (expectedDataFabric1 != readListDataFabric1): raise AssertionError("Got back mismatched data") + async def TestResubscription(self, nodeid: int): + ''' This validates the re-subscription logic by triggering a liveness failure caused by the expiration + of the underlying CASE session and the resultant failure to receive reports from the server. This should + trigger CASE session establishment and subscription restablishment. Both the attempt and successful + restablishment of the subscription are validated. + ''' + cv = asyncio.Condition() + resubAttempted = False + resubSucceeded = True + + async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubsribeIntervalMsec: int): + self.logger.info("Re-subscription Attempted") + nonlocal resubAttempted + resubAttempted = True + + async def OnResubscriptionSucceeded(transaction): + self.logger.info("Re-subscription Succeeded") + nonlocal cv + async with cv: + cv.notify() + + subscription = await self.devCtrl.ReadAttribute(nodeid, [(Clusters.Basic.Attributes.ClusterRevision)], reportInterval=(0, 5)) + + # + # Register async callbacks that will fire when a re-sub is attempted or succeeds. + # + subscription.SetResubscriptionAttemptedCallback(OnResubscriptionAttempted, True) + subscription.SetResubscriptionSucceededCallback(OnResubscriptionSucceeded, True) + + # + # Now, let's go and expire the session to the node. That will immediately prevent + # future reports from the server from being dispatched to the client logic. + # + self.devCtrl.ExpireSessions(nodeid) + + # + # Over-ride the default liveness timeout (which is set quite high to accomodate for + # transport delays) to something very small. This ensures that our liveness timer will + # fire quickly and cause a re-subscription to occur naturally. + # + subscription.OverrideLivenessTimeoutMs(100) + + async with cv: + if (not(resubAttempted) or not(resubSucceeded)): + res = await asyncio.wait_for(cv.wait(), 3) + if not res: + self.logger.error("Timed out waiting for resubscription to succeed") + return False + + subscription.Shutdown() + return True + def TestCloseSession(self, nodeid: int): self.logger.info(f"Closing sessions with device {nodeid}") try: diff --git a/src/controller/python/test/test_scripts/mobile-device-test.py b/src/controller/python/test/test_scripts/mobile-device-test.py index f02a93dc388953..b6b7a1d4e91595 100755 --- a/src/controller/python/test/test_scripts/mobile-device-test.py +++ b/src/controller/python/test/test_scripts/mobile-device-test.py @@ -140,6 +140,10 @@ def TestDatamodel(test: BaseTestHelper, device_nodeid: int): FailIfNot(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID), "Failed to subscribe attributes.") + logger.info("Testing re-subscription") + FailIfNot(asyncio.run(test.TestResubscription(nodeid=device_nodeid)), + "Failed to validated re-subscription") + logger.info("Testing on off cluster over resolved connection") FailIfNot(test.TestOnOffCluster(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID, diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 74192e63920ee5..0dda7e31483ce4 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -1476,11 +1476,9 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * class TestResubscriptionCallback : public app::ReadClient::Callback { public: - TestResubscriptionCallback() { } + TestResubscriptionCallback() {} - void SetReadClient(app::ReadClient *apReadClient) { - mpReadClient = apReadClient; - } + void SetReadClient(app::ReadClient * apReadClient) { mpReadClient = apReadClient; } void OnDone(app::ReadClient *) override { mOnDone++; } @@ -1490,7 +1488,8 @@ class TestResubscriptionCallback : public app::ReadClient::Callback mLastError = aError; } - void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { + void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override + { mOnSubscriptionEstablishedCount++; // @@ -1500,29 +1499,29 @@ class TestResubscriptionCallback : public app::ReadClient::Callback mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10)); } - CHIP_ERROR OnResubscriptionNeeded(app::ReadClient *apReadClient, CHIP_ERROR aTerminationCause) override + CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { mOnResubscriptionsAttempted++; return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false); } - + void ClearCounters() { mOnSubscriptionEstablishedCount = 0; mOnDone = 0; mOnError = 0; - mOnResubscriptionsAttempted = 0; + mOnResubscriptionsAttempted = 0; mLastError = CHIP_NO_ERROR; } int32_t mAttributeCount = 0; int32_t mOnReportEnd = 0; int32_t mOnSubscriptionEstablishedCount = 0; - int32_t mOnResubscriptionsAttempted = 0; + int32_t mOnResubscriptionsAttempted = 0; int32_t mOnDone = 0; int32_t mOnError = 0; CHIP_ERROR mLastError = CHIP_NO_ERROR; - app::ReadClient *mpReadClient = nullptr; + app::ReadClient * mpReadClient = nullptr; }; // @@ -1535,12 +1534,13 @@ class TestResubscriptionCallback : public app::ReadClient::Callback // void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) { - TestContext & ctx = *static_cast(apContext); - auto sessionHandle = ctx.GetSessionBobToAlice(); + TestContext & ctx = *static_cast(apContext); + auto sessionHandle = ctx.GetSessionBobToAlice(); { TestResubscriptionCallback callback; - app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, app::ReadClient::InteractionType::Subscribe); + app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, + app::ReadClient::InteractionType::Subscribe); callback.SetReadClient(&readClient); @@ -1551,7 +1551,7 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v readPrepareParams.mpAttributePathParamsList = attributePathParams; readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams); - attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id; + attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id; attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id; readPrepareParams.mMaxIntervalCeilingSeconds = 1; @@ -1561,9 +1561,8 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v // // Drive servicing IO till we have established a subscription at least 2 times. // - ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2), [&]() { - return callback.mOnSubscriptionEstablishedCount > 1; - }); + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2), + [&]() { return callback.mOnSubscriptionEstablishedCount > 1; }); NL_TEST_ASSERT(apSuite, callback.mOnDone == 0); @@ -1577,7 +1576,7 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v // NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1); } - + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } From 201b70edcb5191b8bb0283005ee84e1e4d6f119a Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Mon, 25 Jul 2022 14:31:05 -0700 Subject: [PATCH 05/10] Updated comment style --- src/app/ReadClient.h | 85 +++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 8419a4855c0b42..31a6b7ba573a00 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -323,53 +323,56 @@ class ReadClient : public Messaging::ExchangeDelegate ReadClient * GetNextClient() { return mpNext; } void SetNextClient(ReadClient * apClient) { mpNext = apClient; } - // Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if - // we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled - // by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be - // used. - // - // The application has to know to - // a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and - // mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to - // OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that - // that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not have - // a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call - // OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since - // that's the only case when the consumer moved a ReadParams into the client. + /** + * Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if + * we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled + * by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be + * used. + * + * The application has to know to + * a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and + * mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to + * OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that + * that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not + * have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call + * OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since + * that's the only case when the consumer moved a ReadParams into the client. + * + */ CHIP_ERROR SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams); - // - // This provides a standard re-subscription policy implementation that given a termination cause, does the following: - // - Calculates the time till next subscription with fibonacci back-off (implemented by ComputeTimeTillNextSubscription()). - // - Schedules the next subscription attempt at the computed interval from the previous step. Operational discovery and - // CASE establishment will be attempted if aTerminationCause was CHIP_ERROR_TIMEOUT. In all other cases, it will attempt - // to re-use a previously established session. - // + /** + * This provides a standard re-subscription policy implementation that given a termination cause, does the following: + * - Calculates the time till next subscription with fibonacci back-off (implemented by ComputeTimeTillNextSubscription()). + * - Schedules the next subscription attempt at the computed interval from the previous step. Operational discovery and + * CASE establishment will be attempted if aTerminationCause was CHIP_ERROR_TIMEOUT. In all other cases, it will attempt + * to re-use a previously established session. + */ CHIP_ERROR DefaultResubscribePolicy(CHIP_ERROR aTerminationCause); - // - // Computes the time till the next re-subscription with millisecond resolution over - // an ever increasing window following a fibonacci sequence with the current retry count - // used as input to the fibonacci algorithm. - // - // CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input. - // + /** + * Computes the time till the next re-subscription with millisecond resolution over + * an ever increasing window following a fibonacci sequence with the current retry count + * used as input to the fibonacci algorithm. + * + * CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input. + * + */ uint32_t ComputeTimeTillNextSubscription(); - // - // Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. - // - // If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next - // interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method - // should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 - // (i.e re-subscribe immediately) and 'aReestablishCASE' set to false. - // - // Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before - // the actual IM interaction is initiated. - // - // aReestablishCASE SHALL NOT be set to true if a valid SessionHandle is provided through newSessionHandle. - // - // + /** + * Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. + * + * If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next + * interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method + * should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 + * (i.e re-subscribe immediately) and 'aReestablishCASE' set to false. + * + * Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before + * the actual IM interaction is initiated. + * + * aReestablishCASE SHALL NOT be set to true if a valid SessionHandle is provided through newSessionHandle. + */ CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, bool aReestablishCASE); From 56414ef47bf3e8e62076cbce24b660d5ed673095 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Mon, 25 Jul 2022 14:45:09 -0700 Subject: [PATCH 06/10] More minor cleanup --- src/app/ReadClient.cpp | 5 ++++- src/app/ReadClient.h | 18 +++++++++++------- .../python/chip/clusters/Attribute.py | 4 ++++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index d8b4a9304d3952..e69c07cce1ec09 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -696,7 +696,8 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea // // Update the event number being tracked in mReadPrepareParams in case - // we want to use it for re-subscriptions later. + // we want to send it in the next SubscribeRequest message to convey + // the event number till which we've already received events for. // mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); @@ -737,6 +738,8 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() { CHIP_ERROR err = CHIP_NO_ERROR; + VerifyOrReturnError(mState == ClientState::SubscriptionActive, CHIP_ERROR_INCORRECT_STATE); + CancelLivenessCheckTimer(); System::Clock::Timeout timeout; diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 31a6b7ba573a00..8f3580a43a051f 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -334,7 +334,7 @@ class ReadClient : public Messaging::ExchangeDelegate * mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to * OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that * that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not - * have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call + * have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call * OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since * that's the only case when the consumer moved a ReadParams into the client. * @@ -386,12 +386,16 @@ class ReadClient : public Messaging::ExchangeDelegate } #endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST - // - // Override the interval at which liveness of the subscription is assessed. - // By default, this is set set to the max interval of the subscription + ACK timeout of the underlying session. - // - // This can be called at any time. - // + /** + * Override the interval at which liveness of the subscription is assessed. + * By default, this is set set to the max interval of the subscription + ACK timeout of the underlying session. + * + * This can be only be called once a subscription has been established and is active. Once called, this will cancel any existing + * liveness timers and schedule a new one. + * + * This can be called from the Callback::OnSubscriptionEstablished callback. + * + */ void OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout); private: diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index fe8ee128d9b1c8..db29b86974a284 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -507,6 +507,8 @@ def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTra Sets the callback function that gets invoked anytime a re-subscription is attempted. The callback is expected to have the following signature: def Callback(transaction: SubscriptionTransaction, errorEncountered: int, nextResubscribeIntervalMsec: int) + + If the callback is an awaitable co-routine, isAsync should be set to True. ''' if callback is not None: self._onResubscriptionAttemptedCb = callback @@ -517,6 +519,8 @@ def SetResubscriptionSucceededCallback(self, callback: Callback[[SubscriptionTra Sets the callback function that gets invoked when a re-subscription attempt succeeds. The callback is expected to have the following signature: def Callback(transaction: SubscriptionTransaction) + + If the callback is an awaitable co-routine, isAsync should be set to True. ''' if callback is not None: self._onResubscriptionSucceededCb = callback From d5ef73fec8276a65fbd24cf76ee884d7e83b9390 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Mon, 25 Jul 2022 22:06:37 -0700 Subject: [PATCH 07/10] More build fixes --- src/app/InteractionModelEngine.cpp | 10 ++++++++-- src/controller/java/AndroidCallbacks.cpp | 11 +++++++++-- src/controller/java/AndroidCallbacks.h | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index 7348876344b7d5..e4f76d46d113b5 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -128,8 +128,14 @@ void InteractionModelEngine::Shutdown() mpExchangeMgr->UnregisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id); mpCASESessionMgr = nullptr; - mpFabricTable = nullptr; - mpExchangeMgr = nullptr; + + // + // We _should_ be clearing these out, but doing so invites a world + // of trouble. #21233 tracks fixing the underlying assumptions to make + // this possible. + // + // mpFabricTable = nullptr; + // mpExchangeMgr = nullptr; } uint32_t InteractionModelEngine::GetNumActiveReadHandlers() const diff --git a/src/controller/java/AndroidCallbacks.cpp b/src/controller/java/AndroidCallbacks.cpp index 588f47c14a9389..7b68a4e5d0d7e5 100644 --- a/src/controller/java/AndroidCallbacks.cpp +++ b/src/controller/java/AndroidCallbacks.cpp @@ -547,7 +547,7 @@ void ReportEventCallback::OnSubscriptionEstablished(SubscriptionId aSubscription JniReferences::GetInstance().CallSubscriptionEstablished(mSubscriptionEstablishedCallbackRef); } -void ReportEventCallback::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) +void ReportEventCallback::OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) { VerifyOrReturn(mResubscriptionAttemptCallbackRef != nullptr, ChipLogError(Controller, "mResubscriptionAttemptCallbackRef is null")); @@ -555,6 +555,13 @@ void ReportEventCallback::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, CHIP_ERROR err = CHIP_NO_ERROR; JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); + err = app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause); + if (err != CHIP_NO_ERROR) + { + ReportError(nullptr, ErrorStr(err), err.AsInteger()); + return; + } + jmethodID onResubscriptionAttemptMethod; err = JniReferences::GetInstance().FindMethod(env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", &onResubscriptionAttemptMethod); @@ -562,7 +569,7 @@ void ReportEventCallback::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, DeviceLayer::StackUnlock unlock; env->CallVoidMethod(mResubscriptionAttemptCallbackRef, onResubscriptionAttemptMethod, aTerminationCause.AsInteger(), - aNextResubscribeIntervalMsec); + apReadClient->ComputeTimeTillNextSubscription()); } void ReportEventCallback::ReportError(jobject attributePath, CHIP_ERROR err) diff --git a/src/controller/java/AndroidCallbacks.h b/src/controller/java/AndroidCallbacks.h index 51c38d3c2b4e1a..45fab4c80efcbf 100644 --- a/src/controller/java/AndroidCallbacks.h +++ b/src/controller/java/AndroidCallbacks.h @@ -98,7 +98,7 @@ struct ReportEventCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override; - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override; + void OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override; /** Report errors back to Java layer. attributePath may be nullptr for general errors. */ void ReportError(jobject eventPath, CHIP_ERROR err); From a495e4185d00fb115a9e43fdb8fd73c6eba63ff7 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Tue, 26 Jul 2022 11:32:57 -0700 Subject: [PATCH 08/10] Update to master --- src/app/ReadClient.cpp | 18 ++++-------------- src/app/ReadClient.h | 2 +- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index e69c07cce1ec09..b2255fd73f64ea 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -983,7 +983,7 @@ void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy * } } -void ReadClient::HandleDeviceConnectionFailure(void * context, PeerId peerId, CHIP_ERROR err) +void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) { ReadClient * const _this = static_cast(context); VerifyOrDie(_this != nullptr); @@ -1011,24 +1011,14 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); VerifyOrExit(caseSessionManager != nullptr, err = CHIP_ERROR_INCORRECT_STATE); - auto fabric = InteractionModelEngine::GetInstance()->GetFabricTable()->FindFabricWithIndex(_this->GetFabricIndex()); - - // - // Temporary until #21084 is addressed. This object would have been synchronously cleaned-up - // when a fabric has gone away, and this condition should never arise. - // - VerifyOrExit(fabric != nullptr, err = CHIP_ERROR_INVALID_FABRIC_INDEX; - ChipLogError(DataManagement, "Underlying fabric has gone away, stopping re-subscriptions!");); - - PeerId peerId(fabric->GetCompressedFabricId(), _this->GetPeerNodeId()); - - auto proxy = caseSessionManager->FindExistingSession(peerId); + auto proxy = caseSessionManager->FindExistingSession(_this->mPeer); if (proxy != nullptr) { proxy->Disconnect(); } - caseSessionManager->FindOrEstablishSession(peerId, &_this->mOnConnectedCallback, &_this->mOnConnectionFailureCallback); + caseSessionManager->FindOrEstablishSession(_this->mPeer, &_this->mOnConnectedCallback, + &_this->mOnConnectionFailureCallback); return; } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 8f3580a43a051f..3a8038917d6907 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -488,7 +488,7 @@ class ReadClient : public Messaging::ExchangeDelegate void ClearActiveSubscriptionState(); static void HandleDeviceConnected(void * context, OperationalDeviceProxy * device); - static void HandleDeviceConnectionFailure(void * context, PeerId peerId, CHIP_ERROR error); + static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); CHIP_ERROR GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional & aEventMin); From 10ab0537c7421ee6f1a4516c9ede9e3c437f9efd Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Tue, 26 Jul 2022 15:37:51 -0700 Subject: [PATCH 09/10] Review feedback --- examples/bridge-app/esp32/main/main.cpp | 1 + src/app/InteractionModelEngine.h | 2 - src/app/ReadClient.cpp | 37 +++++++++++++++---- src/app/ReadClient.h | 13 ++++--- src/controller/java/AndroidCallbacks.cpp | 16 +++----- src/controller/java/AndroidCallbacks.h | 2 +- .../python/test/test_scripts/base.py | 8 +--- 7 files changed, 45 insertions(+), 34 deletions(-) diff --git a/examples/bridge-app/esp32/main/main.cpp b/examples/bridge-app/esp32/main/main.cpp index 0d022828740cfb..2d349211725ab5 100644 --- a/examples/bridge-app/esp32/main/main.cpp +++ b/examples/bridge-app/esp32/main/main.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #if CONFIG_ENABLE_ESP32_FACTORY_DATA_PROVIDER diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 982d113af1fb78..208b88d29bfa60 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -125,8 +125,6 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, */ CASESessionManager * GetCASESessionManager() const { return mpCASESessionMgr; } - FabricTable * GetFabricTable() const { return mpFabricTable; } - /** * Tears down an active subscription. * diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index b2255fd73f64ea..195b79216b03f9 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -121,7 +121,7 @@ CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscripti VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); // - // If we're establishing CASE, make sure we not provided a new SessionHandle as well. + // If we're establishing CASE, make sure we are not provided a new SessionHandle as well. // VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); @@ -553,12 +553,17 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload) exit: if (IsSubscriptionType()) { - if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse()) + if (IsAwaitingInitialReport()) { MoveToState(ClientState::AwaitingSubscribeResponse); } - else + else if (IsSubscriptionActive()) { + // + // Only refresh the liveness check timer if we've successfully established + // a subscription and have a valid value for mMaxInterval which the function + // relies on. + // RefreshLivenessCheckTimer(); } } @@ -697,7 +702,7 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea // // Update the event number being tracked in mReadPrepareParams in case // we want to send it in the next SubscribeRequest message to convey - // the event number till which we've already received events for. + // the event number for which we have already received an event. // mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); @@ -974,6 +979,7 @@ void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy * ReadClient * const _this = static_cast(context); VerifyOrDie(_this != nullptr); + ChipLogProgress(DataManagement, "HandleDeviceConnected %d\n", device->GetSecureSession().HasValue()); _this->mReadPrepareParams.mSessionHolder.Grab(device->GetSecureSession().Value()); auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams); @@ -990,9 +996,6 @@ void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeI ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format()); - auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); - VerifyOrDie(caseSessionManager != nullptr); - _this->Close(err); } @@ -1011,6 +1014,24 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); VerifyOrExit(caseSessionManager != nullptr, err = CHIP_ERROR_INCORRECT_STATE); + // + // We need to mark our session as defunct explicitly since the assessment of a liveness failure + // is usually triggered by the absence of any exchange activity that would have otherwise + // automatically marked the session as defunct on a response timeout. + // + // Doing so will ensure that the subsequent call to FindOrEstablishSession will not bind to + // an existing established session but rather trigger establishing a new one. + // + if (_this->mReadPrepareParams.mSessionHolder) + { + _this->mReadPrepareParams.mSessionHolder.Get().Value()->AsSecureSession()->MarkAsDefunct(); + } + + // + // TODO: Until #19259 is merged, we cannot actually just get by with the above logic since marking sessions + // defunct has no effect on resident OperationalDeviceProxy instances that are already bound + // to a now-defunct CASE session. + // auto proxy = caseSessionManager->FindExistingSession(_this->mPeer); if (proxy != nullptr) { @@ -1033,7 +1054,7 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void // // In that case, don't permit re-subscription to occur. // - _this->Close(err, err != CHIP_ERROR_INVALID_FABRIC_INDEX && err != CHIP_ERROR_INCORRECT_STATE); + _this->Close(err, err != CHIP_ERROR_INCORRECT_STATE); } } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 3a8038917d6907..78291fa5fddc7a 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -62,7 +62,7 @@ class InteractionModelEngine; * When used to manage subscriptions, the client provides functionality to automatically re-subscribe as needed, * including re-establishing CASE under certain conditions (see Callback::OnResubscriptionNeeded for more info). * This is the default behavior. A consumer can completely opt-out of this behavior by over-riding - * Callback::OnResubscriptionNeeded and providing an alternative implementation. + * Callback::OnResubscriptionNeeded and providing an alternative implementation. * */ class ReadClient : public Messaging::ExchangeDelegate @@ -191,6 +191,7 @@ class ReadClient : public Messaging::ExchangeDelegate * - Be called even in error circumstances. * - Only be called after a successful call to SendRequest has been * made, when the read completes or the subscription is shut down. + * * @param[in] apReadClient the ReadClient for the completed interaction. */ virtual void OnDone(ReadClient * apReadClient) = 0; @@ -333,7 +334,7 @@ class ReadClient : public Messaging::ExchangeDelegate * a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and * mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to * OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that - * that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not + * has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not * have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call * OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since * that's the only case when the consumer moved a ReadParams into the client. @@ -351,11 +352,11 @@ class ReadClient : public Messaging::ExchangeDelegate CHIP_ERROR DefaultResubscribePolicy(CHIP_ERROR aTerminationCause); /** - * Computes the time till the next re-subscription with millisecond resolution over + * Computes the time, in milliseconds, until the next re-subscription over * an ever increasing window following a fibonacci sequence with the current retry count * used as input to the fibonacci algorithm. * - * CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input. + * CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is the maximum value the retry count can tick up to. * */ uint32_t ComputeTimeTillNextSubscription(); @@ -363,10 +364,10 @@ class ReadClient : public Messaging::ExchangeDelegate /** * Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. * - * If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next + * If an application wants to set up CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next * interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method * should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 - * (i.e re-subscribe immediately) and 'aReestablishCASE' set to false. + * (i.e async, but as soon as possible) and 'aReestablishCASE' set to false. * * Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before * the actual IM interaction is initiated. diff --git a/src/controller/java/AndroidCallbacks.cpp b/src/controller/java/AndroidCallbacks.cpp index a76f50aef551b2..d960a3aafee8ae 100644 --- a/src/controller/java/AndroidCallbacks.cpp +++ b/src/controller/java/AndroidCallbacks.cpp @@ -547,7 +547,7 @@ void ReportEventCallback::OnSubscriptionEstablished(SubscriptionId aSubscription JniReferences::GetInstance().CallSubscriptionEstablished(mSubscriptionEstablishedCallbackRef); } -void ReportEventCallback::OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) +CHIP_ERROR ReportEventCallback::OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) { VerifyOrReturn(mResubscriptionAttemptCallbackRef != nullptr, ChipLogError(Controller, "mResubscriptionAttemptCallbackRef is null")); @@ -555,21 +555,17 @@ void ReportEventCallback::OnResubscriptionAttempt(app::ReadClient * apReadClient CHIP_ERROR err = CHIP_NO_ERROR; JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); - err = app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause); - if (err != CHIP_NO_ERROR) - { - ReportError(nullptr, ErrorStr(err), err.AsInteger()); - return; - } + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); jmethodID onResubscriptionAttemptMethod; - err = JniReferences::GetInstance().FindMethod(env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", - &onResubscriptionAttemptMethod); - VerifyOrReturn(err == CHIP_NO_ERROR, ChipLogError(Controller, "Could not find onResubscriptionAttempt method")); + ReturnLogErrorOnFailure(JniReferences::GetInstance().FindMethod( + env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", &onResubscriptionAttemptMethod)); DeviceLayer::StackUnlock unlock; env->CallVoidMethod(mResubscriptionAttemptCallbackRef, onResubscriptionAttemptMethod, aTerminationCause.AsInteger(), apReadClient->ComputeTimeTillNextSubscription()); + + return CHIP_NO_ERROR; } void ReportEventCallback::ReportError(jobject attributePath, CHIP_ERROR err) diff --git a/src/controller/java/AndroidCallbacks.h b/src/controller/java/AndroidCallbacks.h index 276ac95969f10c..9ba97362f53340 100644 --- a/src/controller/java/AndroidCallbacks.h +++ b/src/controller/java/AndroidCallbacks.h @@ -98,7 +98,7 @@ struct ReportEventCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override; - void OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override; + CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override; /** Report errors back to Java layer. attributePath may be nullptr for general errors. */ void ReportError(jobject eventPath, CHIP_ERROR err); diff --git a/src/controller/python/test/test_scripts/base.py b/src/controller/python/test/test_scripts/base.py index a2f1aaf18d15b1..c742abe3476045 100644 --- a/src/controller/python/test/test_scripts/base.py +++ b/src/controller/python/test/test_scripts/base.py @@ -766,7 +766,7 @@ async def TestResubscription(self, nodeid: int): resubAttempted = False resubSucceeded = True - async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubsribeIntervalMsec: int): + async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubscribeIntervalMsec: int): self.logger.info("Re-subscription Attempted") nonlocal resubAttempted resubAttempted = True @@ -785,12 +785,6 @@ async def OnResubscriptionSucceeded(transaction): subscription.SetResubscriptionAttemptedCallback(OnResubscriptionAttempted, True) subscription.SetResubscriptionSucceededCallback(OnResubscriptionSucceeded, True) - # - # Now, let's go and expire the session to the node. That will immediately prevent - # future reports from the server from being dispatched to the client logic. - # - self.devCtrl.ExpireSessions(nodeid) - # # Over-ride the default liveness timeout (which is set quite high to accomodate for # transport delays) to something very small. This ensures that our liveness timer will From 00f7ac3e0cd6330d9c1aad6b63858728e6342104 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Tue, 26 Jul 2022 22:23:39 -0700 Subject: [PATCH 10/10] Review --- src/app/ReadClient.cpp | 6 +++--- src/controller/java/AndroidCallbacks.cpp | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 195b79216b03f9..8d581db39bf319 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -1024,7 +1024,7 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void // if (_this->mReadPrepareParams.mSessionHolder) { - _this->mReadPrepareParams.mSessionHolder.Get().Value()->AsSecureSession()->MarkAsDefunct(); + _this->mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct(); } // @@ -1049,8 +1049,8 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void if (err != CHIP_NO_ERROR) { // - // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid fabric, - // or an invalid CASESessionManager pointer when mDoCaseOnNextResub was true. + // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid + // CASESessionManager pointer when mDoCaseOnNextResub was true. // // In that case, don't permit re-subscription to occur. // diff --git a/src/controller/java/AndroidCallbacks.cpp b/src/controller/java/AndroidCallbacks.cpp index d960a3aafee8ae..1966bd59ecca68 100644 --- a/src/controller/java/AndroidCallbacks.cpp +++ b/src/controller/java/AndroidCallbacks.cpp @@ -549,11 +549,9 @@ void ReportEventCallback::OnSubscriptionEstablished(SubscriptionId aSubscription CHIP_ERROR ReportEventCallback::OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) { - VerifyOrReturn(mResubscriptionAttemptCallbackRef != nullptr, - ChipLogError(Controller, "mResubscriptionAttemptCallbackRef is null")); + VerifyOrReturnLogError(mResubscriptionAttemptCallbackRef != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - CHIP_ERROR err = CHIP_NO_ERROR; - JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); + JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause));