Skip to content

Commit

Permalink
Add records of session establishment for subscription resumption (pro…
Browse files Browse the repository at this point in the history
…ject-chip#31755)

* Add records of session establishment for subscription resumption

* Restyled by clang-format

* review changes

* Schedule subscription resumption when failing to establish the session in SubscriptionResumptionSessionEstablisher

* Add option to set subscription timeout resumption retry interval seconds for Linux app
Add cirque test for subscription resumption timeout

* Restyled by clang-format

* Restyled by autopep8

* Restyled by isort

* fix CI building

* Add test to the test list

* add subscription resumption retries number to SubscriptionInfo struct

* review changes

* make resumption retries persistent

* Restyled by clang-format

* ci build fixes

* try to fix cirque test

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and erwinpan1 committed Mar 7, 2024
1 parent 212b10e commit 22984f3
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 9 deletions.
7 changes: 6 additions & 1 deletion examples/platform/linux/AppMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,12 @@ void ChipLinuxAppMainLoop(AppMainLoopImplementation * impl)
chip::app::InteractionModelEngine::GetInstance()->SetHandlerCapacityForSubscriptions(
LinuxDeviceOptions::GetInstance().subscriptionCapacity);
chip::app::InteractionModelEngine::GetInstance()->SetForceHandlerQuota(true);
#endif
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// Set the subscription timeout resumption retry interval, in seconds
chip::app::InteractionModelEngine::GetInstance()->SetSubscriptionTimeoutResumptionRetryIntervalSeconds(
LinuxDeviceOptions::GetInstance().subscriptionResumptionRetryIntervalSec);
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
#endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST

// Now that the server has started and we are done with our startup logging,
// log our discovery/onboarding information again so it's not lost in the
Expand Down
11 changes: 10 additions & 1 deletion examples/platform/linux/Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ enum
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
kDeviceOption_SubscriptionCapacity = 0x1024,
#endif
kDeviceOption_WiFiSupports5g = 0x1025
kDeviceOption_WiFiSupports5g = 0x1025,
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
kDeviceOption_SubscriptionResumptionRetryIntervalSec = 0x1026,
#endif
};

constexpr unsigned kAppUsageLength = 64;
Expand Down Expand Up @@ -151,6 +154,7 @@ OptionDef sDeviceOptionDefs[] = {
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
{ "subscription-capacity", kArgumentRequired, kDeviceOption_SubscriptionCapacity },
{ "subscription-resumption-retry-interval", kArgumentRequired, kDeviceOption_SubscriptionResumptionRetryIntervalSec },
#endif
{}
};
Expand Down Expand Up @@ -280,6 +284,8 @@ const char * sDeviceOptionHelp =
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
" --subscription-capacity\n"
" Max number of subscriptions the device will allow\n"
" --subscription-resumption-retry-interval\n"
" subscription timeout resumption retry interval in seconds\n"
#endif
"\n";

Expand Down Expand Up @@ -547,6 +553,9 @@ bool HandleOption(const char * aProgram, OptionSet * aOptions, int aIdentifier,
case kDeviceOption_SubscriptionCapacity:
LinuxDeviceOptions::GetInstance().subscriptionCapacity = static_cast<int32_t>(atoi(aValue));
break;
case kDeviceOption_SubscriptionResumptionRetryIntervalSec:
LinuxDeviceOptions::GetInstance().subscriptionResumptionRetryIntervalSec = static_cast<int32_t>(atoi(aValue));
break;
#endif
default:
PrintArgError("%s: INTERNAL ERROR: Unhandled option: %s\n", aProgram, aName);
Expand Down
3 changes: 2 additions & 1 deletion examples/platform/linux/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ struct LinuxDeviceOptions
uint16_t rpcServerPort = 33000;
#endif
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
int32_t subscriptionCapacity = CHIP_IM_MAX_NUM_SUBSCRIPTIONS;
int32_t subscriptionCapacity = CHIP_IM_MAX_NUM_SUBSCRIPTIONS;
int32_t subscriptionResumptionRetryIntervalSec = -1;
#endif
static LinuxDeviceOptions & GetInstance();
};
Expand Down
1 change: 1 addition & 0 deletions scripts/tests/cirque_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ CIRQUE_TESTS=(
"CommissioningWindowTest"
"SubscriptionResumptionTest"
"SubscriptionResumptionCapacityTest"
"SubscriptionResumptionTimeoutTest"
)

BOLD_GREEN_TEXT="\033[1;32m"
Expand Down
14 changes: 13 additions & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,11 @@ void InteractionModelEngine::OnDone(ReadHandler & apReadObj)
mReportingEngine.ResetReadHandlerTracker(&apReadObj);

mReadHandlers.ReleaseObject(&apReadObj);
TryToResumeSubscriptions();
}

void InteractionModelEngine::TryToResumeSubscriptions()
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
if (!mSubscriptionResumptionScheduled && HasSubscriptionsToResume())
{
Expand All @@ -398,8 +402,10 @@ void InteractionModelEngine::OnDone(ReadHandler & apReadObj)
mpExchangeMgr->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds32(timeTillNextSubscriptionResumptionSecs), ResumeSubscriptionsTimerCallback, this);
mNumSubscriptionResumptionRetries++;
ChipLogProgress(InteractionModel, "Schedule subscription resumption when failing to establish session, Retries: %" PRIu32,
mNumSubscriptionResumptionRetries);
}
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
}

Status InteractionModelEngine::OnInvokeCommandRequest(Messaging::ExchangeContext * apExchangeContext,
Expand Down Expand Up @@ -1990,6 +1996,12 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
uint32_t InteractionModelEngine::ComputeTimeSecondsTillNextSubscriptionResumption()
{
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
if (mSubscriptionResumptionRetrySecondsOverride > 0)
{
return static_cast<uint32_t>(mSubscriptionResumptionRetrySecondsOverride);
}
#endif
if (mNumSubscriptionResumptionRetries > CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_FIBONACCI_STEP_INDEX)
{
return CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_RETRY_INTERVAL_SECS;
Expand Down
20 changes: 19 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,19 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
//
void SetForceHandlerQuota(bool forceHandlerQuota) { mForceHandlerQuota = forceHandlerQuota; }

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
//
// Override the subscription timeout resumption retry interval, in seconds. The default retry interval will be
// 300s + GetFibonacciForIndex(retry_times) * 300s, which is too long for unit-tests.
//
// The override is only applied when the stored value is greater than 0, so passing -1 (the member's initial
// value), 0, or any other non-positive value leaves the default behavior in place.
//
// NOTE(review): ComputeTimeSecondsTillNextSubscriptionResumption() only consults the override when
// CONFIG_BUILD_FOR_HOST_UNIT_TEST is set; confirm this setter is only reachable in such builds.
//
void SetSubscriptionTimeoutResumptionRetryIntervalSeconds(int32_t seconds)
{
mSubscriptionResumptionRetrySecondsOverride = seconds;
}
#endif

//
// When testing subscriptions using the high-level APIs in src/controller/ReadInteraction.h,
// they don't provide for the ability to shut down those subscriptions after they've been established.
Expand Down Expand Up @@ -392,6 +405,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
void OnDone(CommandHandler & apCommandObj) override;
void OnDone(ReadHandler & apReadObj) override;

void TryToResumeSubscriptions();

ReadHandler::ApplicationCallback * GetAppCallback() override { return mpReadHandlerApplicationCallback; }

InteractionModelEngine * GetInteractionModelEngine() override { return this; }
Expand Down Expand Up @@ -637,7 +652,10 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
// enforce such check based on the configured size. This flag is used for unit tests only, there is another compare time flag
// CHIP_CONFIG_IM_FORCE_FABRIC_QUOTA_CHECK for stress tests.
bool mForceHandlerQuota = false;
#endif
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
int mSubscriptionResumptionRetrySecondsOverride = -1;
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
#endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS && CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
bool HasSubscriptionsToResume();
Expand Down
16 changes: 16 additions & 0 deletions src/app/SimpleSubscriptionResumptionStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kClusterIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kAttributeIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventIdTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kEventPathTypeTag;
constexpr TLV::Tag SimpleSubscriptionResumptionStorage::kResumptionRetriesTag;

SimpleSubscriptionResumptionStorage::SimpleSubscriptionInfoIterator::SimpleSubscriptionInfoIterator(
SimpleSubscriptionResumptionStorage & storage) :
Expand Down Expand Up @@ -252,6 +253,18 @@ CHIP_ERROR SimpleSubscriptionResumptionStorage::Load(uint16_t subscriptionIndex,
}
ReturnErrorOnFailure(reader.ExitContainer(eventsListType));

#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// If the reader cannot get resumption retries, set it to 0 for subscriptionInfo
if (reader.Next(kResumptionRetriesTag) == CHIP_NO_ERROR)
{
ReturnErrorOnFailure(reader.Get(subscriptionInfo.mResumptionRetries));
}
else
{
subscriptionInfo.mResumptionRetries = 0;
}
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION

ReturnErrorOnFailure(reader.ExitContainer(subscriptionContainerType));

return CHIP_NO_ERROR;
Expand Down Expand Up @@ -307,6 +320,9 @@ CHIP_ERROR SimpleSubscriptionResumptionStorage::Save(TLV::TLVWriter & writer, Su
ReturnErrorOnFailure(writer.EndContainer(eventContainerType));
}
ReturnErrorOnFailure(writer.EndContainer(eventsListType));
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
ReturnErrorOnFailure(writer.Put(kResumptionRetriesTag, subscriptionInfo.mResumptionRetries));
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION

ReturnErrorOnFailure(writer.EndContainer(subscriptionContainerType));

Expand Down
1 change: 1 addition & 0 deletions src/app/SimpleSubscriptionResumptionStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class SimpleSubscriptionResumptionStorage : public SubscriptionResumptionStorage
static constexpr TLV::Tag kAttributeIdTag = TLV::ContextTag(13);
static constexpr TLV::Tag kEventIdTag = TLV::ContextTag(14);
static constexpr TLV::Tag kEventPathTypeTag = TLV::ContextTag(16);
static constexpr TLV::Tag kResumptionRetriesTag = TLV::ContextTag(17);

PersistentStorageDelegate * mStorage;
ObjectPool<SimpleSubscriptionInfoIterator, kIteratorsMax> mSubscriptionInfoIterators;
Expand Down
33 changes: 29 additions & 4 deletions src/app/SubscriptionResumptionSessionEstablisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ SubscriptionResumptionSessionEstablisher::ResumeSubscription(
mSubscriptionInfo.mMinInterval = subscriptionInfo.mMinInterval;
mSubscriptionInfo.mMaxInterval = subscriptionInfo.mMaxInterval;
mSubscriptionInfo.mFabricFiltered = subscriptionInfo.mFabricFiltered;
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
mSubscriptionInfo.mResumptionRetries = subscriptionInfo.mResumptionRetries;
#endif
// Copy the Attribute Paths and Event Paths
if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0)
{
Expand Down Expand Up @@ -100,6 +103,15 @@ void SubscriptionResumptionSessionEstablisher::HandleDeviceConnected(void * cont
return;
}
readHandler->OnSubscriptionResumed(sessionHandle, *establisher);
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// Reset the resumption retries to 0 if subscription is resumed
subscriptionInfo.mResumptionRetries = 0;
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Save(subscriptionInfo);
}
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
}

void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId,
Expand All @@ -109,12 +121,25 @@ void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(voi
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = establisher->mSubscriptionInfo;
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
error.Format());
// If the device fails to establish the session, the subscriber might be offline and its subscription read client will
// be deleted when the device reconnect to the subscriber. This subscription will be never used again. So clean up
// the persistent subscription information storage.
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
if (!subscriptionResumptionStorage)
{
ChipLogError(DataManagement, "Failed to get subscription resumption storage");
return;
}
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
if (subscriptionInfo.mResumptionRetries <= CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION_MAX_FIBONACCI_STEP_INDEX)
{
InteractionModelEngine::GetInstance()->TryToResumeSubscriptions();
subscriptionInfo.mResumptionRetries++;
subscriptionResumptionStorage->Save(subscriptionInfo);
}
else
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
{
// If the device fails to establish the session several times, the subscriber might be offline and its subscription
// read client will be deleted when the device reconnects to the subscriber. This subscription will never be used again.
// Clean up the persistent subscription information storage.
subscriptionResumptionStorage->Delete(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex,
subscriptionInfo.mSubscriptionId);
}
Expand Down
3 changes: 3 additions & 0 deletions src/app/SubscriptionResumptionStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class SubscriptionResumptionStorage
NodeId mNodeId;
FabricIndex mFabricIndex;
SubscriptionId mSubscriptionId;
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
uint32_t mResumptionRetries;
#endif
uint16_t mMinInterval;
uint16_t mMaxInterval;
bool mFabricFiltered;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

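# Commissions the device on-network, then subscribes to its BasicInformation NodeLabel attribute
# (subscription kept, auto-resubscribe disabled).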

import os
import sys
from optparse import OptionParser

from base import BaseTestHelper, FailIfNot, TestFail, TestTimeout, logger

# Defaults for the --discriminator and --setuppin options when they are not given on the command line.
TEST_DISCRIMINATOR = 3840
TEST_SETUPPIN = 20202021

# Endpoint argument passed to the NodeLabel subscription in main().
TEST_ENDPOINT_ID = 0


def main():
    """Commission the device on-network and subscribe to its NodeLabel attribute.

    Command-line options (see ``--help``) supply the overall test timeout, the
    device address, the node ID to assign, the setup discriminator / PIN and the
    PAA trust store path.

    Failures are reported through ``FailIfNot`` / ``TestFail``. When every step
    succeeds the process is terminated with ``os._exit(0)``, so control does not
    return to the caller on the success path.
    """
    optParser = OptionParser()
    optParser.add_option(
        "-t",
        "--timeout",
        action="store",
        dest="testTimeout",
        default=90,
        type='int',
        help="The program will return with timeout after specified seconds.",
        metavar="<timeout-second>",
    )
    optParser.add_option(
        "-a",
        "--address",
        action="store",
        dest="deviceAddress",
        default='',
        type='str',
        help="Address of the device",
        metavar="<device-addr>",
    )
    optParser.add_option(
        "--nodeid",
        action="store",
        dest="nodeid",
        default=1,
        type='int',
        help="The Node ID issued to the device",
        metavar="<nodeid>"
    )
    # The metavar is only used in --help output; it previously read "<nodeid>"
    # for the next two options as well, which mislabelled their arguments.
    optParser.add_option(
        "--discriminator",
        action="store",
        dest="discriminator",
        default=TEST_DISCRIMINATOR,
        type='int',
        help="Discriminator of the device",
        metavar="<discriminator>"
    )
    optParser.add_option(
        "--setuppin",
        action="store",
        dest="setuppin",
        default=TEST_SETUPPIN,
        type='int',
        help="Setup PIN of the device",
        metavar="<setuppin>"
    )
    optParser.add_option(
        "-p",
        "--paa-trust-store-path",
        action="store",
        dest="paaTrustStorePath",
        default='',
        type='str',
        help="Path that contains valid and trusted PAA Root Certificates.",
        metavar="<paa-trust-store-path>"
    )

    # Positional leftovers are not used by this script.
    options, _ = optParser.parse_args(sys.argv[1:])

    # Watchdog covering the commissioning and subscription steps below.
    timeoutTicker = TestTimeout(options.testTimeout)
    timeoutTicker.start()

    test = BaseTestHelper(
        nodeid=112233, paaTrustStorePath=options.paaTrustStorePath, testCommissioner=True)

    FailIfNot(
        test.TestOnNetworkCommissioning(options.discriminator, options.setuppin, options.nodeid, options.deviceAddress),
        "Failed on on-network commissioing")
    try:
        # NOTE(review): the trailing 1 and 2 are presumably the min/max reporting
        # intervals in seconds -- confirm against ZCLSubscribeAttribute.
        test.devCtrl.ZCLSubscribeAttribute("BasicInformation", "NodeLabel", options.nodeid, TEST_ENDPOINT_ID, 1, 2,
                                           keepSubscriptions=True, autoResubscribe=False)
    except Exception as ex:
        TestFail(f"Failed to subscribe attribute: {ex}")

    timeoutTicker.stop()

    logger.info("Test finished")

    # TODO: Python device controller cannot be shutdown clean sometimes and will block on AsyncDNSResolverSockets shutdown.
    # Call os._exit(0) to force close it.
    os._exit(0)


# Script entry point: run the test and turn any uncaught exception into a
# logged test failure.
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        logger.exception(err)
        TestFail("Exception occurred when running tests.")
Loading

0 comments on commit 22984f3

Please sign in to comment.