Skip to content

Commit

Permalink
Add records of session establishment for subscription resumption (pro…
Browse files Browse the repository at this point in the history
…ject-chip#31755)

* Add records of session establishment for subscription resumption

* Restyled by clang-format

* review changes

* Schedule subscription resumption when failing to establish the session in SubscriptionResumptionSessionEstablisher

* Add option to set subscription timeout resumption retry interval seconds for Linux app
Add cirque test for subscription resumption timeout

* Restyled by clang-format

* Restyled by autopep8

* Restyled by isort

* fix CI building

* Add test to the test list

* add subscription resumption restries number to SubscriptionInfo struct

* review changes

* make resumption retries persistent

* Restyled by clang-format

* ci build fixes

* try to fix cirque test

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and Jerry-ESP committed Mar 28, 2024
1 parent 4561c2c commit 888115f
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 7 deletions.
12 changes: 12 additions & 0 deletions examples/platform/linux/AppMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,18 @@ void ChipLinuxAppMainLoop(AppMainLoopImplementation * impl)
// Init ZCL Data Model and CHIP App Server
Server::GetInstance().Init(initParams);

#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
// Set ReadHandler Capacity for Subscriptions
chip::app::InteractionModelEngine::GetInstance()->SetHandlerCapacityForSubscriptions(
LinuxDeviceOptions::GetInstance().subscriptionCapacity);
chip::app::InteractionModelEngine::GetInstance()->SetForceHandlerQuota(true);
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// Set subscription time resumption retry interval seconds
chip::app::InteractionModelEngine::GetInstance()->SetSubscriptionTimeoutResumptionRetryIntervalSeconds(
LinuxDeviceOptions::GetInstance().subscriptionResumptionRetryIntervalSec);
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
#endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST

// Now that the server has started and we are done with our startup logging,
// log our discovery/onboarding information again so it's not lost in the
// noise.
Expand Down
40 changes: 40 additions & 0 deletions examples/platform/linux/Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ enum
kCommissionerOption_FabricID = 0x1020,
kTraceTo = 0x1021,
kOptionSimulateNoInternalTime = 0x1022,
#if defined(PW_RPC_ENABLED)
kOptionRpcServerPort = 0x1023,
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
kDeviceOption_SubscriptionCapacity = 0x1024,
#endif
kDeviceOption_WiFiSupports5g = 0x1025,
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
kDeviceOption_SubscriptionResumptionRetryIntervalSec = 0x1026,
#endif
};

constexpr unsigned kAppUsageLength = 64;
Expand Down Expand Up @@ -138,6 +148,13 @@ OptionDef sDeviceOptionDefs[] = {
{ "trace-to", kArgumentRequired, kTraceTo },
#endif
{ "simulate-no-internal-time", kNoArgument, kOptionSimulateNoInternalTime },
#if defined(PW_RPC_ENABLED)
{ "rpc-server-port", kArgumentRequired, kOptionRpcServerPort },
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
{ "subscription-capacity", kArgumentRequired, kDeviceOption_SubscriptionCapacity },
{ "subscription-resumption-retry-interval", kArgumentRequired, kDeviceOption_SubscriptionResumptionRetryIntervalSec },
#endif
{}
};

Expand Down Expand Up @@ -254,6 +271,16 @@ const char * sDeviceOptionHelp =
#endif
" --simulate-no-internal-time\n"
" Time cluster does not use internal platform time\n"
#if defined(PW_RPC_ENABLED)
" --rpc-server-port\n"
" Start RPC server on specified port\n"
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
" --subscription-capacity\n"
" Max number of subscriptions the device will allow\n"
" --subscription-resumption-retry-interval\n"
" subscription timeout resumption retry interval in seconds\n"
#endif
"\n";

bool Base64ArgToVector(const char * arg, size_t maxSize, std::vector<uint8_t> & outVector)
Expand Down Expand Up @@ -507,6 +534,19 @@ bool HandleOption(const char * aProgram, OptionSet * aOptions, int aIdentifier,
case kOptionSimulateNoInternalTime:
LinuxDeviceOptions::GetInstance().mSimulateNoInternalTime = true;
break;
#if defined(PW_RPC_ENABLED)
case kOptionRpcServerPort:
LinuxDeviceOptions::GetInstance().rpcServerPort = static_cast<uint16_t>(atoi(aValue));
break;
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
case kDeviceOption_SubscriptionCapacity:
LinuxDeviceOptions::GetInstance().subscriptionCapacity = static_cast<int32_t>(atoi(aValue));
break;
case kDeviceOption_SubscriptionResumptionRetryIntervalSec:
LinuxDeviceOptions::GetInstance().subscriptionResumptionRetryIntervalSec = static_cast<int32_t>(atoi(aValue));
break;
#endif
default:
PrintArgError("%s: INTERNAL ERROR: Unhandled option: %s\n", aProgram, aName);
retval = false;
Expand Down
8 changes: 7 additions & 1 deletion examples/platform/linux/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ struct LinuxDeviceOptions
chip::FabricId commissionerFabricId = chip::kUndefinedFabricId;
std::vector<std::string> traceTo;
bool mSimulateNoInternalTime = false;

#if defined(PW_RPC_ENABLED)
uint16_t rpcServerPort = 33000;
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
int32_t subscriptionCapacity = CHIP_IM_MAX_NUM_SUBSCRIPTIONS;
int32_t subscriptionResumptionRetryIntervalSec = -1;
#endif
static LinuxDeviceOptions & GetInstance();
};

Expand Down
3 changes: 3 additions & 0 deletions scripts/tests/cirque_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ CIRQUE_TESTS=(
"CommissioningFailureOnReportTest"
"PythonCommissioningTest"
"CommissioningWindowTest"
"SubscriptionResumptionTest"
"SubscriptionResumptionCapacityTest"
"SubscriptionResumptionTimeoutTest"
)

BOLD_GREEN_TEXT="\033[1;32m"
Expand Down
14 changes: 13 additions & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ void InteractionModelEngine::OnDone(ReadHandler & apReadObj)
mReportingEngine.ResetReadHandlerTracker(&apReadObj);

mReadHandlers.ReleaseObject(&apReadObj);
TryToResumeSubscriptions();
}

void InteractionModelEngine::TryToResumeSubscriptions()
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
if (!mSubscriptionResumptionScheduled && HasSubscriptionsToResume())
{
Expand All @@ -354,8 +358,10 @@ void InteractionModelEngine::OnDone(ReadHandler & apReadObj)
mpExchangeMgr->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds32(timeTillNextSubscriptionResumptionSecs), ResumeSubscriptionsTimerCallback, this);
mNumSubscriptionResumptionRetries++;
ChipLogProgress(InteractionModel, "Schedule subscription resumption when failing to establish session, Retries: %" PRIu32,
mNumSubscriptionResumptionRetries);
}
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
}

Status InteractionModelEngine::OnInvokeCommandRequest(Messaging::ExchangeContext * apExchangeContext,
Expand Down Expand Up @@ -1898,6 +1904,12 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
uint32_t InteractionModelEngine::ComputeTimeSecondsTillNextSubscriptionResumption()
{
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
if (mSubscriptionResumptionRetrySecondsOverride > 0)
{
return static_cast<uint32_t>(mSubscriptionResumptionRetrySecondsOverride);
}
#endif
if (mNumSubscriptionResumptionRetries > CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_FIBONACCI_STEP_INDEX)
{
return CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_RETRY_INTERVAL_SECS;
Expand Down
20 changes: 19 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
//
void SetForceHandlerQuota(bool forceHandlerQuota) { mForceHandlerQuota = forceHandlerQuota; }

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
//
// Override the subscription timeout resumption retry interval seconds. The default retry interval will be
// 300s + GetFibonacciForIndex(retry_times) * 300s, which is too long for unit-tests.
//
// If -1 is passed in, no override is instituted and default behavior resumes.
//
void SetSubscriptionTimeoutResumptionRetryIntervalSeconds(int32_t seconds)
{
mSubscriptionResumptionRetrySecondsOverride = seconds;
}
#endif

//
// When testing subscriptions using the high-level APIs in src/controller/ReadInteraction.h,
// they don't provide for the ability to shut down those subscriptions after they've been established.
Expand Down Expand Up @@ -384,6 +397,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
void OnDone(CommandHandler & apCommandObj) override;
void OnDone(ReadHandler & apReadObj) override;

void TryToResumeSubscriptions();

ReadHandler::ApplicationCallback * GetAppCallback() override { return mpReadHandlerApplicationCallback; }

CHIP_ERROR OnUnsolicitedMessageReceived(const PayloadHeader & payloadHeader, ExchangeDelegate *& newDelegate) override;
Expand Down Expand Up @@ -627,7 +642,10 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
// enforce such check based on the configured size. This flag is used for unit tests only, there is another compare time flag
// CHIP_CONFIG_IM_FORCE_FABRIC_QUOTA_CHECK for stress tests.
bool mForceHandlerQuota = false;
#endif
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
int mSubscriptionResumptionRetrySecondsOverride = -1;
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
#endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
bool HasSubscriptionsToResume();
Expand Down
16 changes: 16 additions & 0 deletions src/app/SimpleSubscriptionResumptionStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kClusterIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kAttributeIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventPathTypeTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kResumptionRetriesTag;

SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::SimpleSubscriptionInfoIterator(
SimpleSubscriptionResumptionStorage & storage) :
Expand Down Expand Up @@ -252,6 +253,18 @@ CHIP_ERROR SimpleSubscriptionResumptionStorage::Load(uint16_t subscriptionIndex,
}
ReturnErrorOnFailure(reader.ExitContainer(eventsListType));

#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// If the reader cannot get resumption retries, set it to 0 for subscriptionInfo
if (reader.Next(kResumptionRetriesTag) == CHIP_NO_ERROR)
{
ReturnErrorOnFailure(reader.Get(subscriptionInfo.mResumptionRetries));
}
else
{
subscriptionInfo.mResumptionRetries = 0;
}
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION

ReturnErrorOnFailure(reader.ExitContainer(subscriptionContainerType));

return CHIP_NO_ERROR;
Expand Down Expand Up @@ -307,6 +320,9 @@ CHIP_ERROR SimpleSubscriptionResumptionStorage::Save(TLV::TLVWriter & writer, Su
ReturnErrorOnFailure(writer.EndContainer(eventContainerType));
}
ReturnErrorOnFailure(writer.EndContainer(eventsListType));
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
ReturnErrorOnFailure(writer.Put(kResumptionRetriesTag, subscriptionInfo.mResumptionRetries));
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION

ReturnErrorOnFailure(writer.EndContainer(subscriptionContainerType));

Expand Down
1 change: 1 addition & 0 deletions src/app/SimpleSubscriptionResumptionStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class SimpleSubscriptionResumptionStorage : public SubscriptionResumptionStorage
static constexpr TLV::Tag kAttributeIdTag = TLV::ContextTag(13);
static constexpr TLV::Tag kEventIdTag = TLV::ContextTag(14);
static constexpr TLV::Tag kEventPathTypeTag = TLV::ContextTag(16);
static constexpr TLV::Tag kResumptionRetriesTag = TLV::ContextTag(17);

PersistentStorageDelegate * mStorage;
ObjectPool<SimpleSubscriptionInfoIterator, kIteratorsMax> mSubscriptionInfoIterators;
Expand Down
33 changes: 29 additions & 4 deletions src/app/SubscriptionResumptionSessionEstablisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ SubscriptionResumptionSessionEstablisher::ResumeSubscription(
mSubscriptionInfo.mMinInterval = subscriptionInfo.mMinInterval;
mSubscriptionInfo.mMaxInterval = subscriptionInfo.mMaxInterval;
mSubscriptionInfo.mFabricFiltered = subscriptionInfo.mFabricFiltered;
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
mSubscriptionInfo.mResumptionRetries = subscriptionInfo.mResumptionRetries;
#endif
// Copy the Attribute Paths and Event Paths
if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0)
{
Expand Down Expand Up @@ -100,6 +103,15 @@ void SubscriptionResumptionSessionEstablisher::HandleDeviceConnected(void * cont
return;
}
readHandler->OnSubscriptionResumed(sessionHandle, *establisher);
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// Reset the resumption retries to 0 if subscription is resumed
subscriptionInfo.mResumptionRetries = 0;
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Save(subscriptionInfo);
}
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
}

void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId,
Expand All @@ -109,12 +121,25 @@ void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(voi
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo;
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
error.Format());
// If the device fails to establish the session, the subscriber might be offline and its subscription read client will
// be deleted when the device reconnect to the subscriber. This subscription will be never used again. So clean up
// the persistent subscription information storage.
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
if (!subscriptionResumptionStorage)
{
ChipLogError(DataManagement, "Failed to get subscription resumption storage");
return;
}
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
if (subscriptionInfo.mResumptionRetries <= CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_FIBONACCI_STEP_INDEX)
{
InteractionModelEngine::GetInstance()->TryToResumeSubscriptions();
subscriptionInfo.mResumptionRetries++;
subscriptionResumptionStorage->Save(subscriptionInfo);
}
else
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
{
// If the device fails to establish the session several times, the subscriber might be offline and its subscription
// read client will be deleted when the device reconnects to the subscriber. This subscription will be never used again.
// Clean up the persistent subscription information storage.
subscriptionResumptionStorage->Delete(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex,
subscriptionInfo.mSubscriptionId);
}
Expand Down
3 changes: 3 additions & 0 deletions src/app/SubscriptionResumptionStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class SubscriptionResumptionStorage
NodeId mNodeId;
FabricIndex mFabricIndex;
SubscriptionId mSubscriptionId;
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
uint32_t mResumptionRetries;
#endif
uint16_t mMinInterval;
uint16_t mMaxInterval;
bool mFabricFiltered;
Expand Down
Loading

0 comments on commit 888115f

Please sign in to comment.