diff --git a/.github/workflows/darwin.yaml b/.github/workflows/darwin.yaml index 62a8d08f09ec4a..ad2a513209cc65 100644 --- a/.github/workflows/darwin.yaml +++ b/.github/workflows/darwin.yaml @@ -115,6 +115,11 @@ jobs: echo "This is a simple log" > /tmp/darwin/framework-tests/end_user_support_log.txt ../../../out/debug/chip-all-clusters-app --interface-id -1 --end_user_support_log /tmp/darwin/framework-tests/end_user_support_log.txt > >(tee /tmp/darwin/framework-tests/all-cluster-app.log) 2> >(tee /tmp/darwin/framework-tests/all-cluster-app-err.log >&2) & ../../../out/debug/chip-all-clusters-app --interface-id -1 --dac_provider ../../../credentials/development/commissioner_dut/struct_cd_origin_pid_vid_correct/test_case_vector.json --product-id 32768 --discriminator 3839 --secured-device-port 5539 --KVS /tmp/chip-all-clusters-app-kvs2 > >(tee /tmp/darwin/framework-tests/all-cluster-app-origin-vid.log) 2> >(tee /tmp/darwin/framework-tests/all-cluster-app-origin-vid-err.log >&2) & + ../../../out/debug/chip-all-clusters-app --interface-id -1 --discriminator 101 --passcode 1001 --KVS /tmp/chip-all-clusters-app-kvs101 --secured-device-port 5531 & + ../../../out/debug/chip-all-clusters-app --interface-id -1 --discriminator 102 --passcode 1002 --KVS /tmp/chip-all-clusters-app-kvs102 --secured-device-port 5532 & + ../../../out/debug/chip-all-clusters-app --interface-id -1 --discriminator 103 --passcode 1003 --KVS /tmp/chip-all-clusters-app-kvs103 --secured-device-port 5533 & + ../../../out/debug/chip-all-clusters-app --interface-id -1 --discriminator 104 --passcode 1004 --KVS /tmp/chip-all-clusters-app-kvs104 --secured-device-port 5534 & + ../../../out/debug/chip-all-clusters-app --interface-id -1 --discriminator 105 --passcode 1005 --KVS /tmp/chip-all-clusters-app-kvs105 --secured-device-port 5535 & export TEST_RUNNER_ASAN_OPTIONS=__CURRENT_VALUE__:detect_stack_use_after_return=1 diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h 
b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h index 371bd8db3a4bd0..ebcf3aae949433 100644 --- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h +++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h @@ -100,9 +100,6 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac mClusterStateCache = std::move(aClusterStateCache); } - // Used to reset Resubscription backoff on events that indicate likely availability of device to come back online - void ResetResubscriptionBackoff() { mResubscriptionNumRetries = 0; } - protected: // Report an error, which may be due to issues in our own internal state or // due to the OnError callback happening. @@ -147,6 +144,8 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac NSMutableArray * _Nullable mAttributeReports = nil; NSMutableArray * _Nullable mEventReports = nil; + void CallResubscriptionScheduledHandler(NSError * error, NSNumber * resubscriptionDelay); + private: DataReportCallback _Nullable mAttributeReportCallback = nil; DataReportCallback _Nullable mEventReportCallback = nil; @@ -181,10 +180,6 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac bool mHaveQueuedDeletion = false; OnDoneHandler _Nullable mOnDoneHandler = nil; dispatch_block_t mInterimReportBlock = nil; - - // Copied from ReadClient and customized for - uint32_t ComputeTimeTillNextSubscription(); - uint32_t mResubscriptionNumRetries = 0; }; NS_ASSUME_NONNULL_END diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm index 684333bad5f47a..ca91aed5016126 100644 --- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm +++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm @@ -125,57 +125,29 @@ void MTRBaseSubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId) { - // ReadClient resets it at ProcessSubscribeResponse after calling 
OnSubscriptionEstablished, so this is equivalent - mResubscriptionNumRetries = 0; if (mSubscriptionEstablishedHandler) { auto subscriptionEstablishedHandler = mSubscriptionEstablishedHandler; subscriptionEstablishedHandler(); } } -uint32_t MTRBaseSubscriptionCallback::ComputeTimeTillNextSubscription() +CHIP_ERROR MTRBaseSubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) { - uint32_t maxWaitTimeInMsec = 0; - uint32_t waitTimeInMsec = 0; - uint32_t minWaitTimeInMsec = 0; - - if (mResubscriptionNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) { - maxWaitTimeInMsec = GetFibonacciForIndex(mResubscriptionNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; - } else { - maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; - } + CHIP_ERROR err = ClusterStateCache::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause); + ReturnErrorOnFailure(err); - if (maxWaitTimeInMsec != 0) { - minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; - waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); - } - - return waitTimeInMsec; + auto error = [MTRError errorForCHIPErrorCode:aTerminationCause]; + auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription()); + CallResubscriptionScheduledHandler(error, delayMs); + return CHIP_NO_ERROR; } -CHIP_ERROR MTRBaseSubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) +void MTRBaseSubscriptionCallback::CallResubscriptionScheduledHandler(NSError * error, NSNumber * resubscriptionDelay) { - // No need to check ReadClient internal state is Idle because ReadClient only calls OnResubscriptionNeeded after calling ClearActiveSubscriptionState(), which sets the state to Idle. 
- - // This part is copied from ReadClient's DefaultResubscribePolicy: - auto timeTillNextResubscription = ComputeTimeTillNextSubscription(); - ChipLogProgress(DataManagement, - "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 - "ms due to error %" CHIP_ERROR_FORMAT, - apReadClient->GetFabricIndex(), ChipLogValueX64(apReadClient->GetPeerNodeId()), mResubscriptionNumRetries, timeTillNextResubscription, - aTerminationCause.Format()); - ReturnErrorOnFailure(apReadClient->ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT)); - - // Not as good a place to increment as when resubscription timer fires, but as is, this should be as good, because OnResubscriptionNeeded is only called from ReadClient's Close() while Idle, and nothing should cause this to happen - mResubscriptionNumRetries++; - if (mResubscriptionCallback != nil) { auto callback = mResubscriptionCallback; - auto error = [MTRError errorForCHIPErrorCode:aTerminationCause]; - auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription()); - callback(error, delayMs); + callback(error, resubscriptionDelay); } - return CHIP_NO_ERROR; } void MTRBaseSubscriptionCallback::OnUnsolicitedMessageFromPublisher(ReadClient *) diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm index 478485d509ccd1..cead8878348998 100644 --- a/src/darwin/Framework/CHIP/MTRDevice.mm +++ b/src/darwin/Framework/CHIP/MTRDevice.mm @@ -42,6 +42,7 @@ #include "lib/core/CHIPError.h" #include "lib/core/DataModelTypes.h" #include +#include #include #include @@ -122,10 +123,19 @@ - (id)strongObject { } + // Used to reset Resubscription backoff on events that indicate likely availability of device to come back online + void ResetResubscriptionBackoff() { mResubscriptionNumRetries = 0; } + private: void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override; 
void OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus) override; + + CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override; + + // Copied from ReadClient and customized for MTRDevice resubscription time reset + uint32_t ComputeTimeTillNextSubscription(); + uint32_t mResubscriptionNumRetries = 0; }; } // anonymous namespace @@ -352,6 +362,9 @@ - (BOOL)unitTestShouldSetUpSubscriptionForDevice:(MTRDevice *)device; - (BOOL)unitTestShouldSkipExpectedValuesForWrite:(MTRDevice *)device; - (NSNumber *)unitTestMaxIntervalOverrideForSubscription:(MTRDevice *)device; - (BOOL)unitTestForceAttributeReportsIfMatchingCache:(MTRDevice *)device; +- (BOOL)unitTestPretendThreadEnabled:(MTRDevice *)device; +- (void)unitTestSubscriptionPoolDequeue:(MTRDevice *)device; +- (void)unitTestSubscriptionPoolWorkComplete:(MTRDevice *)device; @end #endif @@ -382,6 +395,12 @@ @implementation MTRDevice { // This boolean keeps track of any device configuration changes received in an attribute report. // If this is true when the report ends, we notify the delegate. BOOL _deviceConfigurationChanged; + + // The completion block is set when the subscription / resubscription work item is dequeued from the subscription pool and starts running (in its ready handler), and is called / cleared when any of the following happens: + // 1. Subscription is established + // 2. OnResubscriptionNeeded is called + // 3. 
Subscription reset (including when getSessionForNode fails) + MTRAsyncWorkCompletionBlock _subscriptionPoolWorkCompletionBlock; } - (instancetype)initWithNodeID:(NSNumber *)nodeID controller:(MTRDeviceController *)controller @@ -673,7 +692,14 @@ - (void)setDelegate:(id)delegate queue:(dispatch_queue_t)queu } if (setUpSubscription) { - [self _setupSubscription]; + if ([self _deviceUsesThread]) { + [self _scheduleSubscriptionPoolWork:^{ + std::lock_guard lock(self->_lock); + [self _setupSubscription]; + } inNanoseconds:0 description:@"MTRDevice setDelegate first subscription"]; + } else { + [self _setupSubscription]; + } } } @@ -884,6 +910,9 @@ - (void)_handleSubscriptionEstablished { os_unfair_lock_lock(&self->_lock); + // We have completed the subscription work - remove from the subscription pool. + [self _clearSubscriptionPoolWork]; + // reset subscription attempt wait time when subscription succeeds _lastSubscriptionAttemptWait = 0; _internalDeviceState = MTRInternalDeviceStateInitalSubscriptionEstablished; @@ -919,9 +948,113 @@ - (void)_handleSubscriptionError:(NSError *)error [self _changeState:MTRDeviceStateUnreachable]; } -- (void)_handleResubscriptionNeeded +// This method is used for signaling whether to use the subscription pool. This functions as +// a heuristic for whether to throttle subscriptions to the device via a pool of subscriptions. +// If products appear that have both Thread and Wifi enabled but are primarily on wifi, this +// method will need to be updated to reflect that. 
+- (BOOL)_deviceUsesThread { - std::lock_guard lock(_lock); + os_unfair_lock_assert_owner(&self->_lock); + +#ifdef DEBUG + id testDelegate = _weakDelegate.strongObject; + if (testDelegate) { + // Note: This is a hack to allow our unit tests to exercise the subscription pooling behavior we have implemented for Thread, by letting the test delegate make this device pretend to be a Thread device + if ([testDelegate respondsToSelector:@selector(unitTestPretendThreadEnabled:)]) { + if ([testDelegate unitTestPretendThreadEnabled:self]) { + return YES; + } + } + } +#endif + + MTRClusterPath * networkCommissioningClusterPath = [MTRClusterPath clusterPathWithEndpointID:@(kRootEndpointId) clusterID:@(MTRClusterIDTypeNetworkCommissioningID)]; + MTRDeviceClusterData * networkCommissioningClusterData = [self _clusterDataForPath:networkCommissioningClusterPath]; + NSNumber * networkCommissioningClusterFeatureMapValueNumber = networkCommissioningClusterData.attributes[@(MTRClusterGlobalAttributeFeatureMapID)][MTRValueKey]; + + if (networkCommissioningClusterFeatureMapValueNumber == nil) + return NO; + if (![networkCommissioningClusterFeatureMapValueNumber isKindOfClass:[NSNumber class]]) { + MTR_LOG_ERROR("%@ Unexpected NetworkCommissioning FeatureMap value %@", self, networkCommissioningClusterFeatureMapValueNumber); + return NO; + } + + uint32_t networkCommissioningClusterFeatureMapValue = static_cast(networkCommissioningClusterFeatureMapValueNumber.unsignedLongValue); + + return (networkCommissioningClusterFeatureMapValue & MTRNetworkCommissioningFeatureThreadNetworkInterface) != 0 ? 
YES : NO; +} + +- (void)_clearSubscriptionPoolWork +{ + os_unfair_lock_assert_owner(&self->_lock); + MTRAsyncWorkCompletionBlock completion = self->_subscriptionPoolWorkCompletionBlock; + if (completion) { +#ifdef DEBUG + id delegate = self->_weakDelegate.strongObject; + if (delegate) { + dispatch_async(self->_delegateQueue, ^{ + if ([delegate respondsToSelector:@selector(unitTestSubscriptionPoolWorkComplete:)]) { + [delegate unitTestSubscriptionPoolWorkComplete:self]; + } + }); + } +#endif + self->_subscriptionPoolWorkCompletionBlock = nil; + completion(MTRAsyncWorkComplete); + } +} + +- (void)_scheduleSubscriptionPoolWork:(dispatch_block_t)workBlock inNanoseconds:(int64_t)inNanoseconds description:(NSString *)description +{ + os_unfair_lock_assert_owner(&self->_lock); + + // Sanity check we are not scheduling for this device multiple times in the pool + if (_subscriptionPoolWorkCompletionBlock) { + MTR_LOG_ERROR("%@ already scheduled in subscription pool for this device - ignoring: %@", self, description); + return; + } + + // Wait the required amount of time, then put it in the subscription pool to wait additionally for a spot, if needed + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, inNanoseconds), dispatch_get_main_queue(), ^{ + // In the case where a resubscription triggering event happened and already established, running the work block should result in a no-op + MTRAsyncWorkItem * workItem = [[MTRAsyncWorkItem alloc] initWithQueue:self.queue]; + [workItem setReadyHandler:^(id _Nonnull context, NSInteger retryCount, MTRAsyncWorkCompletionBlock _Nonnull completion) { + os_unfair_lock_lock(&self->_lock); +#ifdef DEBUG + id delegate = self->_weakDelegate.strongObject; + if (delegate) { + dispatch_async(self->_delegateQueue, ^{ + if ([delegate respondsToSelector:@selector(unitTestSubscriptionPoolDequeue:)]) { + [delegate unitTestSubscriptionPoolDequeue:self]; + } + }); + } +#endif + if (self->_subscriptionPoolWorkCompletionBlock) { + // This means a 
resubscription triggering event happened and is now in-progress + MTR_LOG_DEFAULT("%@ timer fired but already running in subscription pool - ignoring: %@", self, description); + os_unfair_lock_unlock(&self->_lock); + + // call completion as complete to remove from queue + completion(MTRAsyncWorkComplete); + return; + } + + // Otherwise, save the completion block + self->_subscriptionPoolWorkCompletionBlock = completion; + os_unfair_lock_unlock(&self->_lock); + + workBlock(); + }]; + [self->_deviceController.concurrentSubscriptionPool enqueueWorkItem:workItem description:description]; + }); +} + +- (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs +{ + BOOL deviceUsesThread; + + os_unfair_lock_lock(&self->_lock); [self _changeState:MTRDeviceStateUnknown]; @@ -932,6 +1065,36 @@ - (void)_handleResubscriptionNeeded // retries immediately. _lastSubscriptionFailureTime = [NSDate now]; + deviceUsesThread = [self _deviceUsesThread]; + + // If a previous resubscription failed, remove the item from the subscription pool. + [self _clearSubscriptionPoolWork]; + + os_unfair_lock_unlock(&self->_lock); + + // Use the existing _triggerResubscribeWithReason mechanism, which does the right checks when + // this block is run -- if other triggering events had happened, this would become a no-op. 
+ auto resubscriptionBlock = ^{ + [self->_deviceController asyncDispatchToMatterQueue:^{ + [self _triggerResubscribeWithReason:"ResubscriptionNeeded timer fired" nodeLikelyReachable:NO]; + } errorHandler:^(NSError * _Nonnull error) { + // If controller is not running, clear work item from the subscription queue + MTR_LOG_INFO("%@ could not dispatch to matter queue for resubscription - error %@", self, error); + std::lock_guard lock(self->_lock); + [self _clearSubscriptionPoolWork]; + }]; + }; + + int64_t resubscriptionDelayNs = static_cast(resubscriptionDelayMs.unsignedIntValue * NSEC_PER_MSEC); + if (deviceUsesThread) { + std::lock_guard lock(_lock); + // For Thread-enabled devices, schedule the _triggerResubscribeWithReason call to run in the subscription pool + [self _scheduleSubscriptionPoolWork:resubscriptionBlock inNanoseconds:resubscriptionDelayNs description:@"ReadClient resubscription"]; + } else { + // For non-Thread-enabled devices, just call the resubscription block after the specified time + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, resubscriptionDelayNs), self.queue, resubscriptionBlock); + } + // Set up connectivity monitoring in case network routability changes for the positive, to accellerate resubscription [self _setupConnectivityMonitoring]; } @@ -984,11 +1147,26 @@ - (void)_handleSubscriptionReset:(NSNumber * _Nullable)retryDelay } MTR_LOG_DEFAULT("%@ scheduling to reattempt subscription in %f seconds", self, secondsToWait); - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t) (secondsToWait * NSEC_PER_SEC)), self.queue, ^{ + + // If we started subscription or session establishment but failed, remove item from the subscription pool so we can re-queue. + [self _clearSubscriptionPoolWork]; + + // Call _reattemptSubscriptionNowIfNeeded when timer fires - if subscription is + // in a better state at that time this will be a no-op. 
+ auto resubscriptionBlock = ^{ os_unfair_lock_lock(&self->_lock); [self _reattemptSubscriptionNowIfNeeded]; os_unfair_lock_unlock(&self->_lock); - }); + }; + + int64_t resubscriptionDelayNs = static_cast(secondsToWait * NSEC_PER_SEC); + if ([self _deviceUsesThread]) { + // For Thread-enabled devices, schedule the _reattemptSubscriptionNowIfNeeded call to run in the subscription pool + [self _scheduleSubscriptionPoolWork:resubscriptionBlock inNanoseconds:resubscriptionDelayNs description:@"MTRDevice resubscription"]; + } else { + // For non-Thread-enabled devices, just call the resubscription block after the specified time + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, resubscriptionDelayNs), self.queue, resubscriptionBlock); + } } - (void)_reattemptSubscriptionNowIfNeeded @@ -1423,7 +1601,7 @@ - (void)_setupConnectivityMonitoring self->_connectivityMonitor = [[MTRDeviceConnectivityMonitor alloc] initWithCompressedFabricID:compressedFabricID nodeID:self.nodeID]; [self->_connectivityMonitor startMonitoringWithHandler:^{ [self->_deviceController asyncDispatchToMatterQueue:^{ - [self _triggerResubscribeWithReason:"read-through skipped while not subscribed" nodeLikelyReachable:YES]; + [self _triggerResubscribeWithReason:"device connectivity changed" nodeLikelyReachable:YES]; } errorHandler:nil]; } queue:self.queue]; @@ -1512,11 +1690,11 @@ - (void)_setupSubscription [self _handleSubscriptionError:error]; }); }, - ^(NSError * error, NSNumber * resubscriptionDelay) { - MTR_LOG_DEFAULT("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelay); + ^(NSError * error, NSNumber * resubscriptionDelayMs) { + MTR_LOG_DEFAULT("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs); dispatch_async(self.queue, ^{ // OnResubscriptionNeeded - [self _handleResubscriptionNeeded]; + [self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs]; }); }, ^(void) { @@ -3070,4 +3248,49 @@ - (void)invokeCommandWithEndpointID:(NSNumber 
*)endpointID QueueInterimReport(); } + +uint32_t SubscriptionCallback::ComputeTimeTillNextSubscription() +{ + uint32_t maxWaitTimeInMsec = 0; + uint32_t waitTimeInMsec = 0; + uint32_t minWaitTimeInMsec = 0; + + if (mResubscriptionNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) { + maxWaitTimeInMsec = GetFibonacciForIndex(mResubscriptionNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; + } else { + maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; + } + + if (maxWaitTimeInMsec != 0) { + minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; + waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); + } + + return waitTimeInMsec; +} + +CHIP_ERROR SubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) +{ + // No need to check ReadClient internal state is Idle because ReadClient only calls OnResubscriptionNeeded after calling ClearActiveSubscriptionState(), which sets the state to Idle. + + // This part is copied from ReadClient's DefaultResubscribePolicy: + auto timeTillNextResubscriptionMs = ComputeTimeTillNextSubscription(); + ChipLogProgress(DataManagement, + "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 + "ms due to error %" CHIP_ERROR_FORMAT, + apReadClient->GetFabricIndex(), ChipLogValueX64(apReadClient->GetPeerNodeId()), mResubscriptionNumRetries, timeTillNextResubscriptionMs, + aTerminationCause.Format()); + + // Schedule a maximum time resubscription, to be triggered with TriggerResubscribeIfScheduled after a separate timer. + // This way the aReestablishCASE value is saved, and the sanity checks in ScheduleResubscription are observed and returned. 
+ ReturnErrorOnFailure(apReadClient->ScheduleResubscription(UINT32_MAX, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT)); + + // Not as good a place to increment as when resubscription timer fires, but as is, this should be as good, because OnResubscriptionNeeded is only called from ReadClient's Close() while Idle, and nothing should cause this to happen + mResubscriptionNumRetries++; + + auto error = [MTRError errorForCHIPErrorCode:aTerminationCause]; + CallResubscriptionScheduledHandler(error, @(timeTillNextResubscriptionMs)); + + return CHIP_NO_ERROR; +} } // anonymous namespace diff --git a/src/darwin/Framework/CHIP/MTRDeviceController.mm b/src/darwin/Framework/CHIP/MTRDeviceController.mm index fe266c22a2fff9..278ccdfc092d1a 100644 --- a/src/darwin/Framework/CHIP/MTRDeviceController.mm +++ b/src/darwin/Framework/CHIP/MTRDeviceController.mm @@ -19,6 +19,7 @@ #import "MTRDeviceController_Internal.h" +#import "MTRAsyncWorkQueue.h" #import "MTRAttestationTrustStoreBridge.h" #import "MTRBaseDevice_Internal.h" #import "MTRCommissionableBrowser.h" @@ -144,13 +145,17 @@ - (nullable instancetype)initWithParameters:(MTRDeviceControllerAbstractParamete return [MTRDeviceControllerFactory.sharedInstance initializeController:self withParameters:controllerParameters error:error]; } +static NSString * const kLocalTestUserDefaultDomain = @"org.csa-iot.matter.darwintest"; +static NSString * const kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey = @"subscriptionPoolSizeOverride"; + - (instancetype)initWithFactory:(MTRDeviceControllerFactory *)factory - queue:(dispatch_queue_t)queue - storageDelegate:(id _Nullable)storageDelegate - storageDelegateQueue:(dispatch_queue_t _Nullable)storageDelegateQueue - otaProviderDelegate:(id _Nullable)otaProviderDelegate - otaProviderDelegateQueue:(dispatch_queue_t _Nullable)otaProviderDelegateQueue - uniqueIdentifier:(NSUUID *)uniqueIdentifier + queue:(dispatch_queue_t)queue + storageDelegate:(id _Nullable)storageDelegate + 
storageDelegateQueue:(dispatch_queue_t _Nullable)storageDelegateQueue + otaProviderDelegate:(id _Nullable)otaProviderDelegate + otaProviderDelegateQueue:(dispatch_queue_t _Nullable)otaProviderDelegateQueue + uniqueIdentifier:(NSUUID *)uniqueIdentifier + concurrentSubscriptionPoolSize:(NSUInteger)concurrentSubscriptionPoolSize { if (self = [super init]) { // Make sure our storage is all set up to work as early as possible, @@ -250,6 +255,22 @@ - (instancetype)initWithFactory:(MTRDeviceControllerFactory *)factory return nil; } + // Provide a way to test different subscription pool sizes without code change + NSUserDefaults * defaults = [[NSUserDefaults alloc] initWithSuiteName:kLocalTestUserDefaultDomain]; + if ([defaults objectForKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]) { + NSInteger subscriptionPoolSizeOverride = [defaults integerForKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]; + if (subscriptionPoolSizeOverride < 1) { + concurrentSubscriptionPoolSize = 1; + } else { + concurrentSubscriptionPoolSize = static_cast(subscriptionPoolSizeOverride); + } + } + + if (!concurrentSubscriptionPoolSize) { + concurrentSubscriptionPoolSize = 1; + } + _concurrentSubscriptionPool = [[MTRAsyncWorkQueue alloc] initWithContext:self width:concurrentSubscriptionPoolSize]; + _storedFabricIndex = chip::kUndefinedFabricIndex; } return self; diff --git a/src/darwin/Framework/CHIP/MTRDeviceControllerFactory.mm b/src/darwin/Framework/CHIP/MTRDeviceControllerFactory.mm index 5886876923a89d..0ca254acb34115 100644 --- a/src/darwin/Framework/CHIP/MTRDeviceControllerFactory.mm +++ b/src/darwin/Framework/CHIP/MTRDeviceControllerFactory.mm @@ -472,6 +472,7 @@ - (MTRDeviceController * _Nullable)_startDeviceController:(MTRDeviceController * NSUUID * uniqueIdentifier; id _Nullable otaProviderDelegate; dispatch_queue_t _Nullable otaProviderDelegateQueue; + NSUInteger concurrentSubscriptionPoolSize = 0; if ([startupParams isKindOfClass:[MTRDeviceControllerParameters 
class]]) { MTRDeviceControllerParameters * params = startupParams; storageDelegate = params.storageDelegate; @@ -479,6 +480,7 @@ - (MTRDeviceController * _Nullable)_startDeviceController:(MTRDeviceController * uniqueIdentifier = params.uniqueIdentifier; otaProviderDelegate = params.otaProviderDelegate; otaProviderDelegateQueue = params.otaProviderDelegateQueue; + concurrentSubscriptionPoolSize = params.concurrentSubscriptionEstablishmentsAllowedOnThread; } else if ([startupParams isKindOfClass:[MTRDeviceControllerStartupParams class]]) { MTRDeviceControllerStartupParams * params = startupParams; storageDelegate = nil; @@ -539,7 +541,8 @@ - (MTRDeviceController * _Nullable)_startDeviceController:(MTRDeviceController * storageDelegateQueue:storageDelegateQueue otaProviderDelegate:otaProviderDelegate otaProviderDelegateQueue:otaProviderDelegateQueue - uniqueIdentifier:uniqueIdentifier]; + uniqueIdentifier:uniqueIdentifier + concurrentSubscriptionPoolSize:concurrentSubscriptionPoolSize]; if (controller == nil) { if (error != nil) { *error = [MTRError errorForCHIPErrorCode:CHIP_ERROR_INVALID_ARGUMENT]; diff --git a/src/darwin/Framework/CHIP/MTRDeviceControllerParameters.h b/src/darwin/Framework/CHIP/MTRDeviceControllerParameters.h index 6cae94b69b80d3..d42032cad57dd2 100644 --- a/src/darwin/Framework/CHIP/MTRDeviceControllerParameters.h +++ b/src/darwin/Framework/CHIP/MTRDeviceControllerParameters.h @@ -77,6 +77,14 @@ MTR_AVAILABLE(ios(17.6), macos(14.6), watchos(10.6), tvos(17.6)) */ - (void)setOTAProviderDelegate:(id)otaProviderDelegate queue:(dispatch_queue_t)queue; +/** + * Sets the maximum simultaneous subscription establishments that can be happening + * at one time for devices on Thread. This defaults to a large number. + * + * If this value is 0, the maximum subscription establishments allowed at a time will be set to 1. 
+ */ +@property (nonatomic, assign) NSUInteger concurrentSubscriptionEstablishmentsAllowedOnThread MTR_NEWLY_AVAILABLE; + @end MTR_AVAILABLE(ios(17.6), macos(14.6), watchos(10.6), tvos(17.6)) diff --git a/src/darwin/Framework/CHIP/MTRDeviceControllerStartupParams.mm b/src/darwin/Framework/CHIP/MTRDeviceControllerStartupParams.mm index c48c48c4a424d2..a97bae4ece6c6c 100644 --- a/src/darwin/Framework/CHIP/MTRDeviceControllerStartupParams.mm +++ b/src/darwin/Framework/CHIP/MTRDeviceControllerStartupParams.mm @@ -254,6 +254,8 @@ - (instancetype)_initInternal } @end +constexpr NSUInteger kDefaultConcurrentSubscriptionPoolSize = 300; + @implementation MTRDeviceControllerParameters - (instancetype)initWithStorageDelegate:(id)storageDelegate storageDelegateQueue:(dispatch_queue_t)storageDelegateQueue @@ -286,6 +288,8 @@ - (instancetype)initWithStorageDelegate:(id) _storageDelegateQueue = storageDelegateQueue; _uniqueIdentifier = uniqueIdentifier; + _concurrentSubscriptionEstablishmentsAllowedOnThread = kDefaultConcurrentSubscriptionPoolSize; + return self; } diff --git a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h index 4235f17cb5a0dc..8aefa481ba7616 100644 --- a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h +++ b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h @@ -42,6 +42,7 @@ @class MTRDeviceControllerStartupParamsInternal; @class MTRDeviceControllerFactory; @class MTRDevice; +@class MTRAsyncWorkQueue; namespace chip { class FabricTable; @@ -94,18 +95,25 @@ NS_ASSUME_NONNULL_BEGIN @property (nonatomic, readonly, nullable) id otaProviderDelegate; @property (nonatomic, readonly, nullable) dispatch_queue_t otaProviderDelegateQueue; +/** + * A queue with a fixed width that allows a number of MTRDevice objects to perform + * subscription at the same time. + */ +@property (nonatomic, readonly) MTRAsyncWorkQueue * concurrentSubscriptionPool; + /** * Init a newly created controller. 
* * Only MTRDeviceControllerFactory should be calling this. */ - (instancetype)initWithFactory:(MTRDeviceControllerFactory *)factory - queue:(dispatch_queue_t)queue - storageDelegate:(id _Nullable)storageDelegate - storageDelegateQueue:(dispatch_queue_t _Nullable)storageDelegateQueue - otaProviderDelegate:(id _Nullable)otaProviderDelegate - otaProviderDelegateQueue:(dispatch_queue_t _Nullable)otaProviderDelegateQueue - uniqueIdentifier:(NSUUID *)uniqueIdentifier; + queue:(dispatch_queue_t)queue + storageDelegate:(id _Nullable)storageDelegate + storageDelegateQueue:(dispatch_queue_t _Nullable)storageDelegateQueue + otaProviderDelegate:(id _Nullable)otaProviderDelegate + otaProviderDelegateQueue:(dispatch_queue_t _Nullable)otaProviderDelegateQueue + uniqueIdentifier:(NSUUID *)uniqueIdentifier + concurrentSubscriptionPoolSize:(NSUInteger)concurrentSubscriptionPoolSize; /** * Check whether this controller is running on the given fabric, as represented diff --git a/src/darwin/Framework/CHIPTests/MTRCommissionableBrowserTests.m b/src/darwin/Framework/CHIPTests/MTRCommissionableBrowserTests.m index 047ce4f8588816..4253625af80ff7 100644 --- a/src/darwin/Framework/CHIPTests/MTRCommissionableBrowserTests.m +++ b/src/darwin/Framework/CHIPTests/MTRCommissionableBrowserTests.m @@ -34,8 +34,13 @@ static const uint16_t kTestProductId2 = 0x8001u; static const uint16_t kTestDiscriminator1 = 3840u; static const uint16_t kTestDiscriminator2 = 3839u; +static const uint16_t kTestDiscriminator3 = 101u; +static const uint16_t kTestDiscriminator4 = 102u; +static const uint16_t kTestDiscriminator5 = 103u; +static const uint16_t kTestDiscriminator6 = 104u; +static const uint16_t kTestDiscriminator7 = 105u; static const uint16_t kDiscoverDeviceTimeoutInSeconds = 10; -static const uint16_t kExpectedDiscoveredDevicesCount = 2; +static const uint16_t kExpectedDiscoveredDevicesCount = 7; // Singleton controller we use. 
static MTRDeviceController * sController = nil; @@ -97,7 +102,7 @@ - (void)controller:(MTRDeviceController *)controller didFindCommissionableDevice XCTAssertEqual(instanceName.length, 16); // The instance name is random, so just ensure the len is right. XCTAssertEqualObjects(vendorId, @(kTestVendorId)); XCTAssertTrue([productId isEqual:@(kTestProductId1)] || [productId isEqual:@(kTestProductId2)]); - XCTAssertTrue([discriminator isEqual:@(kTestDiscriminator1)] || [discriminator isEqual:@(kTestDiscriminator2)]); + XCTAssertTrue([discriminator isEqual:@(kTestDiscriminator1)] || [discriminator isEqual:@(kTestDiscriminator2)] || [discriminator isEqual:@(kTestDiscriminator3)] || [discriminator isEqual:@(kTestDiscriminator4)] || [discriminator isEqual:@(kTestDiscriminator5)] || [discriminator isEqual:@(kTestDiscriminator6)] || [discriminator isEqual:@(kTestDiscriminator7)]); XCTAssertEqual(commissioningMode, YES); NSLog(@"Found Device (%@) with discriminator: %@ (vendor: %@, product: %@)", instanceName, discriminator, vendorId, productId); diff --git a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m index 06968b00292ef4..49679dc26c72bd 100644 --- a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m +++ b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m @@ -31,6 +31,7 @@ static const uint16_t kTimeoutInSeconds = 3; static NSString * kOnboardingPayload = @"MT:-24J0AFN00KA0648G00"; static const uint16_t kTestVendorId = 0xFFF1u; +static const uint16_t kSubscriptionPoolBaseTimeoutInSeconds = 10; @interface MTRPerControllerStorageTestsControllerDelegate : NSObject @property (nonatomic, strong) XCTestExpectation * expectation; @@ -226,6 +227,11 @@ - (void)stopFactory // Test helpers - (void)commissionWithController:(MTRDeviceController *)controller newNodeID:(NSNumber *)newNodeID +{ + [self commissionWithController:controller newNodeID:newNodeID 
onboardingPayload:kOnboardingPayload]; +} + +- (void)commissionWithController:(MTRDeviceController *)controller newNodeID:(NSNumber *)newNodeID onboardingPayload:(NSString *)onboardingPayload { XCTestExpectation * expectation = [self expectationWithDescription:@"Pairing Complete"]; @@ -236,7 +242,7 @@ - (void)commissionWithController:(MTRDeviceController *)controller newNodeID:(NS [controller setDeviceControllerDelegate:deviceControllerDelegate queue:callbackQueue]; NSError * error; - __auto_type * payload = [MTRSetupPayload setupPayloadWithOnboardingPayload:kOnboardingPayload error:&error]; + __auto_type * payload = [MTRSetupPayload setupPayloadWithOnboardingPayload:onboardingPayload error:&error]; XCTAssertNil(error); XCTAssertNotNil(payload); @@ -2004,4 +2010,147 @@ - (void)testControllerServer [controllerServer shutdown]; } +static NSString * const kLocalTestUserDefaultDomain = @"org.csa-iot.matter.darwintest"; +static NSString * const kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey = @"subscriptionPoolSizeOverride"; + +// TODO: This might also want to go in a separate test file, with some shared setup for commissioning devices per test +- (void)doTestSubscriptionPoolWithSize:(NSInteger)subscriptionPoolSize +{ + __auto_type * factory = [MTRDeviceControllerFactory sharedInstance]; + XCTAssertNotNil(factory); + + __auto_type queue = dispatch_get_main_queue(); + + __auto_type * rootKeys = [[MTRTestKeys alloc] init]; + XCTAssertNotNil(rootKeys); + + __auto_type * operationalKeys = [[MTRTestKeys alloc] init]; + XCTAssertNotNil(operationalKeys); + + __auto_type * storageDelegate = [[MTRTestPerControllerStorageWithBulkReadWrite alloc] initWithControllerID:[NSUUID UUID]]; + + NSNumber * nodeID = @(555); + NSNumber * fabricID = @(555); + + NSError * error; + + NSUserDefaults * defaults = [[NSUserDefaults alloc] initWithSuiteName:kLocalTestUserDefaultDomain]; + NSNumber * subscriptionPoolSizeOverrideOriginalValue = [defaults 
objectForKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]; + + // Test DeviceController with a Subscription pool + [defaults setInteger:subscriptionPoolSize forKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]; + + MTRPerControllerStorageTestsCertificateIssuer * certificateIssuer; + MTRDeviceController * controller = [self startControllerWithRootKeys:rootKeys + operationalKeys:operationalKeys + fabricID:fabricID + nodeID:nodeID + storage:storageDelegate + error:&error + certificateIssuer:&certificateIssuer]; + XCTAssertNil(error); + XCTAssertNotNil(controller); + XCTAssertTrue([controller isRunning]); + + XCTAssertEqualObjects(controller.controllerNodeID, nodeID); + + // QRCodes generated for discriminators 101~105 and passcodes 1001~1005 + NSArray * orderedDeviceIDs = @[ @(101), @(102), @(103), @(104), @(105) ]; + NSDictionary * deviceOnboardingPayloads = @{ + @(101) : @"MT:00000EBQ15IZC900000", + @(102) : @"MT:00000MNY16-AD900000", + @(103) : @"MT:00000UZ427GOD900000", + @(104) : @"MT:00000CQM00Z.D900000", + @(105) : @"MT:00000K0V01FDE900000", + }; + + // Commission 5 devices + for (NSNumber * deviceID in orderedDeviceIDs) { + certificateIssuer.nextNodeID = deviceID; + [self commissionWithController:controller newNodeID:deviceID onboardingPayload:deviceOnboardingPayloads[deviceID]]; + } + + // Set up expectations and delegates + + NSDictionary * subscriptionExpectations = @{ + @(101) : [self expectationWithDescription:@"Subscription 1 has been set up"], + @(102) : [self expectationWithDescription:@"Subscription 2 has been set up"], + @(103) : [self expectationWithDescription:@"Subscription 3 has been set up"], + @(104) : [self expectationWithDescription:@"Subscription 4 has been set up"], + @(105) : [self expectationWithDescription:@"Subscription 5 has been set up"], + }; + + NSDictionary * deviceDelegates = @{ + @(101) : [[MTRDeviceTestDelegate alloc] init], + @(102) : [[MTRDeviceTestDelegate alloc] init], + @(103) : [[MTRDeviceTestDelegate 
alloc] init], + @(104) : [[MTRDeviceTestDelegate alloc] init], + @(105) : [[MTRDeviceTestDelegate alloc] init], + }; + + // Test with counters + __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT; + __block NSUInteger subscriptionRunningCount = 0; + __block NSUInteger subscriptionDequeueCount = 0; + + for (NSNumber * deviceID in orderedDeviceIDs) { + MTRDeviceTestDelegate * delegate = deviceDelegates[deviceID]; + delegate.pretendThreadEnabled = YES; + + delegate.onSubscriptionPoolDequeue = ^{ + // Count subscribing when dequeued from the subscription pool + os_unfair_lock_lock(&counterLock); + subscriptionRunningCount++; + subscriptionDequeueCount++; + // At any given moment, only up to subscriptionPoolSize subscriptions can be going on + XCTAssertLessThanOrEqual(subscriptionRunningCount, subscriptionPoolSize); + os_unfair_lock_unlock(&counterLock); + }; + delegate.onSubscriptionPoolWorkComplete = ^{ + // Stop counting subscribing right before calling work item completion + os_unfair_lock_lock(&counterLock); + subscriptionRunningCount--; + os_unfair_lock_unlock(&counterLock); + }; + __weak __auto_type weakDelegate = delegate; + delegate.onReportEnd = ^{ + [subscriptionExpectations[deviceID] fulfill]; + // reset callback so the expectation is not fulfilled twice, given the run time of this can be long due to subscription pool + __strong __auto_type strongDelegate = weakDelegate; + strongDelegate.onReportEnd = nil; + }; + } + + for (NSNumber * deviceID in orderedDeviceIDs) { + __auto_type * device = [MTRDevice deviceWithNodeID:deviceID controller:controller]; + [device setDelegate:deviceDelegates[deviceID] queue:queue]; + } + + // Make the wait time depend on pool size and device count (can expand number of devices in the future) + [self waitForExpectations:subscriptionExpectations.allValues timeout:(kSubscriptionPoolBaseTimeoutInSeconds * orderedDeviceIDs.count / subscriptionPoolSize)]; + + XCTAssertEqual(subscriptionDequeueCount, orderedDeviceIDs.count); + + // Reset
our commissionees. + for (NSNumber * deviceID in orderedDeviceIDs) { + __auto_type * baseDevice = [MTRBaseDevice deviceWithNodeID:deviceID controller:controller]; + ResetCommissionee(baseDevice, queue, self, kTimeoutInSeconds); + } + + [controller shutdown]; + XCTAssertFalse([controller isRunning]); + + if (subscriptionPoolSizeOverrideOriginalValue) { + [defaults setInteger:subscriptionPoolSizeOverrideOriginalValue.integerValue forKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]; + } else { + [defaults removeObjectForKey:kLocalTestUserDefaultSubscriptionPoolSizeOverrideKey]; + } +} + +- (void)testSubscriptionPool +{ + [self doTestSubscriptionPoolWithSize:1]; + [self doTestSubscriptionPoolWithSize:2]; +} + @end diff --git a/src/darwin/Framework/CHIPTests/TestHelpers/MTRDeviceTestDelegate.h b/src/darwin/Framework/CHIPTests/TestHelpers/MTRDeviceTestDelegate.h index 3e749366e0d722..2492a184e2c69c 100644 --- a/src/darwin/Framework/CHIPTests/TestHelpers/MTRDeviceTestDelegate.h +++ b/src/darwin/Framework/CHIPTests/TestHelpers/MTRDeviceTestDelegate.h @@ -31,6 +31,9 @@ typedef void (^MTRDeviceTestDelegateDataHandler)(NSArray