diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm
index 9a9b2eb340d107..2018b9fca4fc48 100644
--- a/src/darwin/Framework/CHIP/MTRDevice.mm
+++ b/src/darwin/Framework/CHIP/MTRDevice.mm
@@ -1061,6 +1061,12 @@ - (void)_handleSubscriptionError:(NSError *)error
     [self _changeState:MTRDeviceStateUnreachable];
 }
 
+- (BOOL)deviceUsesThread
+{
+    std::lock_guard lock(_lock);
+    return [self _deviceUsesThread];
+}
+
 // This method is used for signaling whether to use the subscription pool. This functions as
 // a heuristic for whether to throttle subscriptions to the device via a pool of subscriptions.
 // If products appear that have both Thread and Wifi enabled but are primarily on wifi, this
@@ -2055,194 +2061,195 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
         });
     }
 
+    // Call directlyGetSessionForNode because the subscription setup already goes through the subscription pool queue
     [_deviceController
-        getSessionForNode:_nodeID.unsignedLongLongValue
-               completion:^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
-                   const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error,
-                   NSNumber * _Nullable retryDelay) {
-                   if (error != nil) {
-                       MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
-                       dispatch_async(self.queue, ^{
-                           [self _handleSubscriptionError:error];
-                           [self _handleSubscriptionReset:retryDelay];
-                       });
-                       return;
-                   }
-
-                   auto callback = std::make_unique<SubscriptionCallback>(
-                       ^(NSArray * value) {
-                           MTR_LOG("%@ got attribute report %@", self, value);
-                           dispatch_async(self.queue, ^{
-                               // OnAttributeData
-                               [self _handleAttributeReport:value fromSubscription:YES];
+        directlyGetSessionForNode:_nodeID.unsignedLongLongValue
+                       completion:^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
+                           const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error,
+                           NSNumber * _Nullable retryDelay) {
+                           if (error != nil) {
+                               MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
+                               dispatch_async(self.queue, ^{
+                                   [self _handleSubscriptionError:error];
+                                   [self _handleSubscriptionReset:retryDelay];
+                               });
+                               return;
+                           }
+
+                           auto callback = std::make_unique<SubscriptionCallback>(
+                               ^(NSArray * value) {
+                                   MTR_LOG("%@ got attribute report %@", self, value);
+                                   dispatch_async(self.queue, ^{
+                                       // OnAttributeData
+                                       [self _handleAttributeReport:value fromSubscription:YES];
 #ifdef DEBUG
-                               self->_unitTestAttributesReportedSinceLastCheck += value.count;
+                                       self->_unitTestAttributesReportedSinceLastCheck += value.count;
 #endif
-                           });
-                       },
-                       ^(NSArray * value) {
-                           MTR_LOG("%@ got event report %@", self, value);
-                           dispatch_async(self.queue, ^{
-                               // OnEventReport
-                               [self _handleEventReport:value];
-                           });
-                       },
-                       ^(NSError * error) {
-                           MTR_LOG_ERROR("%@ got subscription error %@", self, error);
-                           dispatch_async(self.queue, ^{
-                               // OnError
-                               [self _handleSubscriptionError:error];
-                           });
-                       },
-                       ^(NSError * error, NSNumber * resubscriptionDelayMs) {
-                           MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
-                           dispatch_async(self.queue, ^{
-                               // OnResubscriptionNeeded
-                               [self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
-                           });
-                       },
-                       ^(void) {
-                           MTR_LOG("%@ got subscription established", self);
-                           dispatch_async(self.queue, ^{
-                               // OnSubscriptionEstablished
-                               [self _handleSubscriptionEstablished];
-                           });
-                       },
-                       ^(void) {
-                           MTR_LOG("%@ got subscription done", self);
-                           // Drop our pointer to the ReadClient immediately, since
-                           // it's about to be destroyed and we don't want to be
-                           // holding a dangling pointer.
-                           std::lock_guard lock(self->_lock);
-                           self->_currentReadClient = nullptr;
-                           self->_currentSubscriptionCallback = nullptr;
-
-                           dispatch_async(self.queue, ^{
-                               // OnDone
-                               [self _handleSubscriptionReset:nil];
-                           });
-                       },
-                       ^(void) {
-                           MTR_LOG("%@ got unsolicited message from publisher", self);
-                           dispatch_async(self.queue, ^{
-                               // OnUnsolicitedMessageFromPublisher
-                               [self _handleUnsolicitedMessageFromPublisher];
-                           });
-                       },
-                       ^(void) {
-                           MTR_LOG("%@ got report begin", self);
-                           dispatch_async(self.queue, ^{
-                               [self _handleReportBegin];
-                           });
-                       },
-                       ^(void) {
-                           MTR_LOG("%@ got report end", self);
-                           dispatch_async(self.queue, ^{
-                               [self _handleReportEnd];
-                           });
-                       });
-
-                   // Set up a cluster state cache. We just want this for the logic it has for
-                   // tracking data versions and event numbers so we minimize the amount of data we
-                   // request on resubscribes, so tell it not to store data.
-                   auto clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get(),
-                       /* highestReceivedEventNumber = */ NullOptional,
-                       /* cacheData = */ false);
-                   auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
-                       clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
-
-                   // Subscribe with data version filter list and retry with smaller list if out of packet space
-                   CHIP_ERROR err;
-                   NSDictionary<MTRClusterPath *, NSNumber *> * dataVersions = [self _getCachedDataVersions];
-                   size_t dataVersionFilterListSizeReduction = 0;
-                   for (;;) {
-                       // Wildcard endpoint, cluster, attribute, event.
-                       auto attributePath = std::make_unique<AttributePathParams>();
-                       auto eventPath = std::make_unique<EventPathParams>();
-                       // We want to get event reports at the minInterval, not the maxInterval.
-                       eventPath->mIsUrgentEvent = true;
-                       ReadPrepareParams readParams(session.Value());
-
-                       readParams.mMinIntervalFloorSeconds = 0;
-                       // Select a max interval based on the device's claimed idle sleep interval.
-                       auto idleSleepInterval = std::chrono::duration_cast<System::Clock::Seconds32>(
-                           session.Value()->GetRemoteMRPConfig().mIdleRetransTimeout);
-
-                       auto maxIntervalCeilingMin = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN);
-                       if (idleSleepInterval < maxIntervalCeilingMin) {
-                           idleSleepInterval = maxIntervalCeilingMin;
-                       }
-
-                       auto maxIntervalCeilingMax = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX);
-                       if (idleSleepInterval > maxIntervalCeilingMax) {
-                           idleSleepInterval = maxIntervalCeilingMax;
-                       }
+                                   });
+                               },
+                               ^(NSArray * value) {
+                                   MTR_LOG("%@ got event report %@", self, value);
+                                   dispatch_async(self.queue, ^{
+                                       // OnEventReport
+                                       [self _handleEventReport:value];
+                                   });
+                               },
+                               ^(NSError * error) {
+                                   MTR_LOG_ERROR("%@ got subscription error %@", self, error);
+                                   dispatch_async(self.queue, ^{
+                                       // OnError
+                                       [self _handleSubscriptionError:error];
+                                   });
+                               },
+                               ^(NSError * error, NSNumber * resubscriptionDelayMs) {
+                                   MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
+                                   dispatch_async(self.queue, ^{
+                                       // OnResubscriptionNeeded
+                                       [self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
+                                   });
+                               },
+                               ^(void) {
+                                   MTR_LOG("%@ got subscription established", self);
+                                   dispatch_async(self.queue, ^{
+                                       // OnSubscriptionEstablished
+                                       [self _handleSubscriptionEstablished];
+                                   });
+                               },
+                               ^(void) {
+                                   MTR_LOG("%@ got subscription done", self);
+                                   // Drop our pointer to the ReadClient immediately, since
+                                   // it's about to be destroyed and we don't want to be
+                                   // holding a dangling pointer.
+                                   std::lock_guard lock(self->_lock);
+                                   self->_currentReadClient = nullptr;
+                                   self->_currentSubscriptionCallback = nullptr;
+
+                                   dispatch_async(self.queue, ^{
+                                       // OnDone
+                                       [self _handleSubscriptionReset:nil];
+                                   });
+                               },
+                               ^(void) {
+                                   MTR_LOG("%@ got unsolicited message from publisher", self);
+                                   dispatch_async(self.queue, ^{
+                                       // OnUnsolicitedMessageFromPublisher
+                                       [self _handleUnsolicitedMessageFromPublisher];
+                                   });
+                               },
+                               ^(void) {
+                                   MTR_LOG("%@ got report begin", self);
+                                   dispatch_async(self.queue, ^{
+                                       [self _handleReportBegin];
+                                   });
+                               },
+                               ^(void) {
+                                   MTR_LOG("%@ got report end", self);
+                                   dispatch_async(self.queue, ^{
+                                       [self _handleReportEnd];
+                                   });
+                               });
+
+                           // Set up a cluster state cache. We just want this for the logic it has for
+                           // tracking data versions and event numbers so we minimize the amount of data we
+                           // request on resubscribes, so tell it not to store data.
+                           auto clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get(),
+                               /* highestReceivedEventNumber = */ NullOptional,
+                               /* cacheData = */ false);
+                           auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
+                               clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
+
+                           // Subscribe with data version filter list and retry with smaller list if out of packet space
+                           CHIP_ERROR err;
+                           NSDictionary<MTRClusterPath *, NSNumber *> * dataVersions = [self _getCachedDataVersions];
+                           size_t dataVersionFilterListSizeReduction = 0;
+                           for (;;) {
+                               // Wildcard endpoint, cluster, attribute, event.
+                               auto attributePath = std::make_unique<AttributePathParams>();
+                               auto eventPath = std::make_unique<EventPathParams>();
+                               // We want to get event reports at the minInterval, not the maxInterval.
+                               eventPath->mIsUrgentEvent = true;
+                               ReadPrepareParams readParams(session.Value());
+
+                               readParams.mMinIntervalFloorSeconds = 0;
+                               // Select a max interval based on the device's claimed idle sleep interval.
+                               auto idleSleepInterval = std::chrono::duration_cast<System::Clock::Seconds32>(
+                                   session.Value()->GetRemoteMRPConfig().mIdleRetransTimeout);
+
+                               auto maxIntervalCeilingMin = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN);
+                               if (idleSleepInterval < maxIntervalCeilingMin) {
+                                   idleSleepInterval = maxIntervalCeilingMin;
+                               }
+
+                               auto maxIntervalCeilingMax = System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX);
+                               if (idleSleepInterval > maxIntervalCeilingMax) {
+                                   idleSleepInterval = maxIntervalCeilingMax;
+                               }
 #ifdef DEBUG
-                       if (maxIntervalOverride.HasValue()) {
-                           idleSleepInterval = maxIntervalOverride.Value();
-                       }
+                               if (maxIntervalOverride.HasValue()) {
+                                   idleSleepInterval = maxIntervalOverride.Value();
+                               }
 #endif
-                       readParams.mMaxIntervalCeilingSeconds = static_cast<uint16_t>(idleSleepInterval.count());
-
-                       readParams.mpAttributePathParamsList = attributePath.get();
-                       readParams.mAttributePathParamsListSize = 1;
-                       readParams.mpEventPathParamsList = eventPath.get();
-                       readParams.mEventPathParamsListSize = 1;
-                       readParams.mKeepSubscriptions = true;
-                       readParams.mIsFabricFiltered = false;
-                       size_t dataVersionFilterListSize = 0;
-                       DataVersionFilter * dataVersionFilterList;
-                       [self _createDataVersionFilterListFromDictionary:dataVersions dataVersionFilterList:&dataVersionFilterList count:&dataVersionFilterListSize sizeReduction:dataVersionFilterListSizeReduction];
-                       readParams.mDataVersionFilterListSize = dataVersionFilterListSize;
-                       readParams.mpDataVersionFilterList = dataVersionFilterList;
-                       attributePath.release();
-                       eventPath.release();
-
-                       // TODO: Change from local filter list generation to rehydrating ClusterStateCache ot take advantage of existing filter list sorting algorithm
-
-                       // SendAutoResubscribeRequest cleans up the params, even on failure.
-                       err = readClient->SendAutoResubscribeRequest(std::move(readParams));
-                       if (err == CHIP_NO_ERROR) {
-                           break;
-                       }
-
-                       // If error is not a "no memory" issue, then break and go through regular resubscribe logic
-                       if (err != CHIP_ERROR_NO_MEMORY) {
-                           break;
-                       }
-
-                       // If "no memory" error is not caused by data version filter list, break as well
-                       if (!dataVersionFilterListSize) {
-                           break;
-                       }
-
-                       // Now "no memory" could mean subscribe request packet space ran out. Reduce size and try again immediately
-                       dataVersionFilterListSizeReduction++;
-                   }
-
-                   if (err != CHIP_NO_ERROR) {
-                       NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
-                       MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
-                       dispatch_async(self.queue, ^{
-                           [self _handleSubscriptionError:error];
-                           [self _handleSubscriptionReset:nil];
-                       });
-
-                       return;
-                   }
-
-                   MTR_LOG("%@ Subscribe with data version list size %lu, reduced by %lu", self, static_cast<unsigned long>(dataVersions.count), static_cast<unsigned long>(dataVersionFilterListSizeReduction));
-
-                   // Callback and ClusterStateCache and ReadClient will be deleted
-                   // when OnDone is called.
-                   os_unfair_lock_lock(&self->_lock);
-                   self->_currentReadClient = readClient.get();
-                   self->_currentSubscriptionCallback = callback.get();
-                   os_unfair_lock_unlock(&self->_lock);
-                   callback->AdoptReadClient(std::move(readClient));
-                   callback->AdoptClusterStateCache(std::move(clusterStateCache));
-                   callback.release();
-               }];
+                               readParams.mMaxIntervalCeilingSeconds = static_cast<uint16_t>(idleSleepInterval.count());
+
+                               readParams.mpAttributePathParamsList = attributePath.get();
+                               readParams.mAttributePathParamsListSize = 1;
+                               readParams.mpEventPathParamsList = eventPath.get();
+                               readParams.mEventPathParamsListSize = 1;
+                               readParams.mKeepSubscriptions = true;
+                               readParams.mIsFabricFiltered = false;
+                               size_t dataVersionFilterListSize = 0;
+                               DataVersionFilter * dataVersionFilterList;
+                               [self _createDataVersionFilterListFromDictionary:dataVersions dataVersionFilterList:&dataVersionFilterList count:&dataVersionFilterListSize sizeReduction:dataVersionFilterListSizeReduction];
+                               readParams.mDataVersionFilterListSize = dataVersionFilterListSize;
+                               readParams.mpDataVersionFilterList = dataVersionFilterList;
+                               attributePath.release();
+                               eventPath.release();
+
+                               // TODO: Change from local filter list generation to rehydrating ClusterStateCache ot take advantage of existing filter list sorting algorithm
+
+                               // SendAutoResubscribeRequest cleans up the params, even on failure.
+                               err = readClient->SendAutoResubscribeRequest(std::move(readParams));
+                               if (err == CHIP_NO_ERROR) {
+                                   break;
+                               }
+
+                               // If error is not a "no memory" issue, then break and go through regular resubscribe logic
+                               if (err != CHIP_ERROR_NO_MEMORY) {
+                                   break;
+                               }
+
+                               // If "no memory" error is not caused by data version filter list, break as well
+                               if (!dataVersionFilterListSize) {
+                                   break;
+                               }
+
+                               // Now "no memory" could mean subscribe request packet space ran out. Reduce size and try again immediately
+                               dataVersionFilterListSizeReduction++;
+                           }
+
+                           if (err != CHIP_NO_ERROR) {
+                               NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
+                               MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
+                               dispatch_async(self.queue, ^{
+                                   [self _handleSubscriptionError:error];
+                                   [self _handleSubscriptionReset:nil];
+                               });
+
+                               return;
+                           }
+
+                           MTR_LOG("%@ Subscribe with data version list size %lu, reduced by %lu", self, static_cast<unsigned long>(dataVersions.count), static_cast<unsigned long>(dataVersionFilterListSizeReduction));
+
+                           // Callback and ClusterStateCache and ReadClient will be deleted
+                           // when OnDone is called.
+                           os_unfair_lock_lock(&self->_lock);
+                           self->_currentReadClient = readClient.get();
+                           self->_currentSubscriptionCallback = callback.get();
+                           os_unfair_lock_unlock(&self->_lock);
+                           callback->AdoptReadClient(std::move(readClient));
+                           callback->AdoptClusterStateCache(std::move(clusterStateCache));
+                           callback.release();
+                       }];
 
     // Set up connectivity monitoring in case network becomes routable after any part of the subscription process goes into backoff retries.
     [self _setupConnectivityMonitoring];
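
Editorial note on the MTRDevice.mm hunk above: the subscription's max interval ceiling is derived from the device's MRP idle retransmission timeout and then clamped into the [MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN, MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX] window before it is stored in ReadPrepareParams. The sketch below is a minimal standalone restatement of that computation, assuming the same constants and chip::System::Clock types MTRDevice.mm already uses; the helper name is hypothetical and not part of the patch.

    #include <algorithm>
    #include <chrono>

    // Hypothetical helper mirroring the clamping done in _setupSubscriptionWithReason: above.
    static uint16_t MaxIntervalCeilingFromIdleInterval(chip::System::Clock::Milliseconds32 idleRetransTimeout)
    {
        // Convert the MRP idle retransmission timeout to whole seconds.
        auto interval = std::chrono::duration_cast<chip::System::Clock::Seconds32>(idleRetransTimeout);
        // Clamp into the allowed window for the subscription max interval ceiling.
        interval = std::max(interval, chip::System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MIN));
        interval = std::min(interval, chip::System::Clock::Seconds32(MTR_DEVICE_SUBSCRIPTION_MAX_INTERVAL_MAX));
        // ReadPrepareParams::mMaxIntervalCeilingSeconds is a uint16_t, matching the static_cast above.
        return static_cast<uint16_t>(interval.count());
    }
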
diff --git a/src/darwin/Framework/CHIP/MTRDeviceController.mm b/src/darwin/Framework/CHIP/MTRDeviceController.mm
index 303cc2679e60d1..7dc79198297d72 100644
--- a/src/darwin/Framework/CHIP/MTRDeviceController.mm
+++ b/src/darwin/Framework/CHIP/MTRDeviceController.mm
@@ -1256,6 +1256,33 @@ - (BOOL)checkIsRunning:(NSError * __autoreleasing *)error
 }
 
 - (void)getSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion
+{
+    // First check if MTRDevice exists from having loaded from storage, or created by a client.
+    // Do not use deviceForNodeID here, because we don't want to create the device if it does not already exist.
+    os_unfair_lock_lock(&_deviceMapLock);
+    MTRDevice * device = _nodeIDToDeviceMap[@(nodeID)];
+    os_unfair_lock_unlock(&_deviceMapLock);
+
+    // In the case that this device is known to use Thread, queue this with subscription attempts as well, to
+    // help with throttling Thread traffic.
+    if (device && [device deviceUsesThread]) {
+        MTRAsyncWorkItem * workItem = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
+        [workItem setReadyHandler:^(id _Nonnull context, NSInteger retryCount, MTRAsyncWorkCompletionBlock _Nonnull workItemCompletion) {
+            MTRInternalDeviceConnectionCallback completionWrapper = ^(chip::Messaging::ExchangeManager * _Nullable exchangeManager,
+                const chip::Optional<chip::SessionHandle> & session, NSError * _Nullable error, NSNumber * _Nullable retryDelay) {
+                completion(exchangeManager, session, error, retryDelay);
+                workItemCompletion(MTRAsyncWorkComplete);
+            };
+            [self directlyGetSessionForNode:nodeID completion:completionWrapper];
+        }];
+
+        [_concurrentSubscriptionPool enqueueWorkItem:workItem descriptionWithFormat:@"device controller getSessionForNode nodeID: 0x%016llX", nodeID];
+    } else {
+        [self directlyGetSessionForNode:nodeID completion:completion];
+    }
+}
+
+- (void)directlyGetSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion
 {
     [self asyncGetCommissionerOnMatterQueue:^(chip::Controller::DeviceCommissioner * commissioner) {
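
Editorial note on the MTRDeviceController.mm hunk above: for a Thread device the controller already knows about, any caller of getSessionForNode: -- including MTRBaseDevice interactions -- now waits for a free slot in the controller's subscription work pool rather than establishing a session immediately. The caller-side sketch below is illustrative only; it reuses APIs that appear elsewhere in this patch, and the controller, queue, node ID, and endpoint are placeholders.

    // Sketch: a plain OnOff read against a known Thread device. The session request goes
    // through -[MTRDeviceController getSessionForNode:completion:], so the read may be
    // queued behind pending MTRDevice subscription work items before it runs.
    __auto_type * baseDevice = [MTRBaseDevice deviceWithNodeID:@(0x1234) controller:controller];
    __auto_type * onOff = [[MTRBaseClusterOnOff alloc] initWithDevice:baseDevice endpointID:@(1) queue:queue];
    [onOff readAttributeOnOffWithCompletion:^(NSNumber * _Nullable value, NSError * _Nullable error) {
        // Completion can be delayed until earlier pool work items release their slots.
        NSLog(@"OnOff = %@ (error: %@)", value, error);
    }];
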
diff --git a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
index 8fb61fba9bab80..925bbb284f3ab4 100644
--- a/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
+++ b/src/darwin/Framework/CHIP/MTRDeviceController_Internal.h
@@ -268,6 +268,13 @@ NS_ASSUME_NONNULL_BEGIN
 
 - (NSNumber * _Nullable)syncGetCompressedFabricID;
 
+/**
+ * Since getSessionForNode now enqueues via the subscription pool for Thread
+ * devices, MTRDevice needs direct, non-queued access because it already
+ * makes use of the subscription pool.
+ */
+- (void)directlyGetSessionForNode:(chip::NodeId)nodeID completion:(MTRInternalDeviceConnectionCallback)completion;
+
 @end
 
 NS_ASSUME_NONNULL_END
diff --git a/src/darwin/Framework/CHIP/MTRDevice_Internal.h b/src/darwin/Framework/CHIP/MTRDevice_Internal.h
index 2ad2f6422dd9e7..1416aa29dfac29 100644
--- a/src/darwin/Framework/CHIP/MTRDevice_Internal.h
+++ b/src/darwin/Framework/CHIP/MTRDevice_Internal.h
@@ -117,6 +117,9 @@ MTR_TESTABLE
 
 - (void)setStorageBehaviorConfiguration:(MTRDeviceStorageBehaviorConfiguration *)storageBehaviorConfiguration;
 
+// Returns whether this MTRDevice uses Thread for communication
+- (BOOL)deviceUsesThread;
+
 @end
 
 #pragma mark - Utility for clamping numbers
diff --git a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
index 8b7f50dedb828b..ced79aadb78b82 100644
--- a/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
+++ b/src/darwin/Framework/CHIPTests/MTRPerControllerStorageTests.m
@@ -2307,6 +2307,24 @@ - (void)doTestSubscriptionPoolWithSize:(NSInteger)subscriptionPoolSize deviceOnb
         [self commissionWithController:controller newNodeID:deviceID onboardingPayload:deviceOnboardingPayloads[deviceID]];
     }
 
+    // Shutdown and restart, to reset all existing sessions, so that the subscriptions and base device usage start after
+    [controller shutdown];
+    XCTAssertFalse([controller isRunning]);
+
+    controller = [self startControllerWithRootKeys:rootKeys
+                                    operationalKeys:operationalKeys
+                                           fabricID:fabricID
+                                             nodeID:nodeID
+                                            storage:storageDelegate
+                                              error:&error
+                                  certificateIssuer:&certificateIssuer
+                     concurrentSubscriptionPoolSize:subscriptionPoolSize];
+    XCTAssertNil(error);
+    XCTAssertNotNil(controller);
+    XCTAssertTrue([controller isRunning]);
+
+    XCTAssertEqualObjects(controller.controllerNodeID, nodeID);
+
     // Set up expectations and delegates
 
     NSDictionary * subscriptionExpectations = @{
@@ -2329,6 +2347,7 @@ - (void)doTestSubscriptionPoolWithSize:(NSInteger)subscriptionPoolSize deviceOnb
     __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
     __block NSUInteger subscriptionRunningCount = 0;
     __block NSUInteger subscriptionDequeueCount = 0;
+    __block BOOL baseDeviceReadCompleted = NO;
 
     for (NSNumber * deviceID in orderedDeviceIDs) {
         MTRDeviceTestDelegate * delegate = deviceDelegates[deviceID];
@@ -2347,6 +2366,14 @@ - (void)doTestSubscriptionPoolWithSize:(NSInteger)subscriptionPoolSize deviceOnb
             // Stop counting subscribing right before calling work item completion
             os_unfair_lock_lock(&counterLock);
             subscriptionRunningCount--;
+
+            // Given the base device read is happening on the 5th device, at the completion
+            // time of the first [pool size] subscriptions, the BaseDevice's request to
+            // read can't have completed, as it should be gated on the MTRDeviceController's
+            // getSessionForNode: call.
+            if (subscriptionDequeueCount <= (orderedDeviceIDs.count - subscriptionPoolSize)) {
+                XCTAssertFalse(baseDeviceReadCompleted);
+            }
             os_unfair_lock_unlock(&counterLock);
         };
         __weak __auto_type weakDelegate = delegate;
@@ -2363,8 +2390,26 @@ - (void)doTestSubscriptionPoolWithSize:(NSInteger)subscriptionPoolSize deviceOnb
         [device setDelegate:deviceDelegates[deviceID] queue:queue];
     }
 
+    // Create the base device to attempt to read from the 5th device
+    __auto_type * baseDeviceReadExpectation = [self expectationWithDescription:@"BaseDevice read"];
+    // Dispatch async to get around XCTest, so that this runs after the above devices queue their subscriptions
+    dispatch_async(queue, ^{
+        __auto_type * baseDevice = [MTRBaseDevice deviceWithNodeID:@(105) controller:controller];
+        __auto_type * onOffCluster = [[MTRBaseClusterOnOff alloc] initWithDevice:baseDevice endpointID:@(1) queue:queue];
+        [onOffCluster readAttributeOnOffWithCompletion:^(NSNumber * value, NSError * _Nullable error) {
+            XCTAssertNil(error);
+            // We expect the device to be off.
+            XCTAssertEqualObjects(value, @(0));
+            [baseDeviceReadExpectation fulfill];
+            os_unfair_lock_lock(&counterLock);
+            baseDeviceReadCompleted = YES;
+            os_unfair_lock_unlock(&counterLock);
+        }];
+    });
+
     // Make the wait time depend on pool size and device count (can expand number of devices in the future)
-    [self waitForExpectations:subscriptionExpectations.allValues timeout:(kSubscriptionPoolBaseTimeoutInSeconds * orderedDeviceIDs.count / subscriptionPoolSize)];
+    NSArray * expectationsToWait = [subscriptionExpectations.allValues arrayByAddingObject:baseDeviceReadExpectation];
+    [self waitForExpectations:expectationsToWait timeout:(kSubscriptionPoolBaseTimeoutInSeconds * orderedDeviceIDs.count / subscriptionPoolSize)];
 
     XCTAssertEqual(subscriptionDequeueCount, orderedDeviceIDs.count);
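
Editorial note on the test changes above: baseDeviceReadCompleted and the subscription counters are written from the base-device read completion and read from the per-device delegate callbacks, which run on different queues, so the test only touches them while holding counterLock; the XCTAssertFalse sits inside the locked region, so the flag cannot flip between the check and the assertion. A condensed sketch of that idiom, using the same names as the test:

    __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
    __block BOOL baseDeviceReadCompleted = NO;

    // Writer side (base device read completion):
    os_unfair_lock_lock(&counterLock);
    baseDeviceReadCompleted = YES;
    os_unfair_lock_unlock(&counterLock);

    // Reader side (subscription delegate callback): assert while still holding the lock.
    os_unfair_lock_lock(&counterLock);
    XCTAssertFalse(baseDeviceReadCompleted);
    os_unfair_lock_unlock(&counterLock);
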