Skip to content

Commit

Permalink
Ensure that all MTRDevice state/internalState changes happen on the M…
Browse files Browse the repository at this point in the history
…atter queue.

This avoids races where we queue blocks to different queues that both try to
change the state, which were resulting in non-deterministic final state.

Fixes project-chip#34796
  • Loading branch information
bzbarsky-apple committed Sep 9, 2024
1 parent 7d57f07 commit e99e8e5
Showing 1 changed file with 87 additions and 42 deletions.
129 changes: 87 additions & 42 deletions src/darwin/Framework/CHIP/MTRDevice_Concrete.mm
Original file line number Diff line number Diff line change
Expand Up @@ -757,11 +757,16 @@ - (void)_ensureSubscriptionForExistingDelegates:(NSString *)reason
if ([self _deviceUsesThread]) {
MTR_LOG(" => %@ - device is a thread device, scheduling in pool", self);
[self _scheduleSubscriptionPoolWork:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and scheduled subscription is happening", reason]];
[self->_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and scheduled subscription is happening", reason]];
} errorHandler:nil];
} inNanoseconds:0 description:@"MTRDevice setDelegate first subscription"];
} else {
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and subscription is needed", reason]];
[_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and subscription is needed", reason]];
} errorHandler:nil];
}
}

Expand Down Expand Up @@ -946,6 +951,15 @@ - (void)_callDelegateDeviceCachePrimed
// assume lock is held
- (void)_changeState:(MTRDeviceState)state
{
// We want to avoid situations where something changes our state and then an
// async block that was queued earlier in response to something changes it
// again, to a value that no longer makes sense. To avoid that:
//
// 1) All state changes happen on the Matter queue.
// 2) All state changes happen synchronously with the event that actually
// triggers the state change.
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
MTRDeviceState lastState = _state;
_state = state;
Expand All @@ -970,6 +984,15 @@ - (void)_changeState:(MTRDeviceState)state

- (void)_changeInternalState:(MTRInternalDeviceState)state
{
// We want to avoid situations where something changes our state and then an
// async block that was queued earlier in response to something changes it
// again, to a value that no longer makes sense. To avoid that:
//
// 1) All state changes happen on the Matter queue.
// 2) All state changes happen synchronously with the event that actually
// triggers the state change.
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
MTRInternalDeviceState lastState = _internalDeviceState;
_internalDeviceState = state;
Expand Down Expand Up @@ -1053,12 +1076,16 @@ - (void)_handleSubscriptionEstablished

- (void)_handleSubscriptionError:(NSError *)error
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);
[self _doHandleSubscriptionError:error];
}

- (void)_doHandleSubscriptionError:(NSError *)error
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&_lock);

[self _changeInternalState:MTRInternalDeviceStateUnsubscribed];
Expand Down Expand Up @@ -1174,13 +1201,23 @@ - (void)_scheduleSubscriptionPoolWork:(dispatch_block_t)workBlock inNanoseconds:

- (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs
{
BOOL deviceUsesThread;
assertChipStackLockedByCurrentThread();

os_unfair_lock_lock(&self->_lock);
std::lock_guard lock(_lock);

// Change our state before going async.
[self _changeState:MTRDeviceStateUnknown];
[self _changeInternalState:MTRInternalDeviceStateResubscribing];

dispatch_async(self.queue, ^{
[self _handleResubscriptionNeededWithDelayOnDeviceQueue:resubscriptionDelayMs];
});
}

- (void)_handleResubscriptionNeededWithDelayOnDeviceQueue:(NSNumber *)resubscriptionDelayMs
{
os_unfair_lock_lock(&self->_lock);

// If we are here, then the ReadClient either just detected a subscription
// drop or just tried again and failed. Either way, count it as "tried and
// failed to subscribe": in the latter case it's actually true, and in the
Expand All @@ -1192,7 +1229,7 @@ - (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs
_lastSubscriptionFailureTimeForDescription = _lastSubscriptionFailureTime;
}
[self _notifyDelegateOfPrivateInternalPropertiesChanges];
deviceUsesThread = [self _deviceUsesThread];
BOOL deviceUsesThread = [self _deviceUsesThread];

// If a previous resubscription failed, remove the item from the subscription pool.
[self _clearSubscriptionPoolWork];
Expand Down Expand Up @@ -1228,6 +1265,8 @@ - (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs

- (void)_handleSubscriptionReset:(NSNumber * _Nullable)retryDelay
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);
[self _doHandleSubscriptionReset:retryDelay];
}
Expand All @@ -1247,6 +1286,8 @@ - (void)_setLastSubscriptionAttemptWait:(uint32_t)lastSubscriptionAttemptWait

- (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&_lock);

if (_deviceController.isSuspended) {
Expand Down Expand Up @@ -1309,8 +1350,11 @@ - (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay
// Call _reattemptSubscriptionNowIfNeededWithReason when timer fires - if subscription is
// in a better state at that time this will be a no-op.
auto resubscriptionBlock = ^{
std::lock_guard lock(self->_lock);
[self _reattemptSubscriptionNowIfNeededWithReason:@"got subscription reset"];
[self->_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _reattemptSubscriptionNowIfNeededWithReason:@"got subscription reset"];
}
errorHandler:nil];
};

int64_t resubscriptionDelayNs = static_cast<int64_t>(secondsToWait * NSEC_PER_SEC);
Expand All @@ -1326,6 +1370,8 @@ - (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay

- (void)_reattemptSubscriptionNowIfNeededWithReason:(NSString *)reason
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
if (!self.reattemptingSubscription) {
return;
Expand All @@ -1338,6 +1384,8 @@ - (void)_reattemptSubscriptionNowIfNeededWithReason:(NSString *)reason

- (void)_handleUnsolicitedMessageFromPublisher
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);

[self _changeState:MTRDeviceStateReachable];
Expand All @@ -1358,18 +1406,23 @@ - (void)_handleUnsolicitedMessageFromPublisher

- (void)_markDeviceAsUnreachableIfNeverSubscribed
{
os_unfair_lock_assert_owner(&self->_lock);
[_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);

if (HadSubscriptionEstablishedOnce(_internalDeviceState)) {
return;
}
if (HadSubscriptionEstablishedOnce(self->_internalDeviceState)) {
return;
}

MTR_LOG("%@ still not subscribed, marking the device as unreachable", self);
[self _changeState:MTRDeviceStateUnreachable];
MTR_LOG("%@ still not subscribed, marking the device as unreachable", self);
[self _changeState:MTRDeviceStateUnreachable];
}
errorHandler:nil];
}

- (void)_handleReportBegin
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);

_receivingReport = YES;
Expand Down Expand Up @@ -1807,11 +1860,14 @@ - (void)unitTestInjectEventReport:(NSArray<NSDictionary<NSString *, id> *> *)eve

- (void)unitTestInjectAttributeReport:(NSArray<NSDictionary<NSString *, id> *> *)attributeReport fromSubscription:(BOOL)isFromSubscription
{
dispatch_async(self.queue, ^{
[_deviceController asyncDispatchToMatterQueue:^{
[self _handleReportBegin];
[self _handleAttributeReport:attributeReport fromSubscription:isFromSubscription];
[self _handleReportEnd];
});
dispatch_async(self.queue, ^{
[self _handleAttributeReport:attributeReport fromSubscription:isFromSubscription];
[self _handleReportEnd];
});
}
errorHandler:nil];
}
#endif

Expand Down Expand Up @@ -2242,6 +2298,8 @@ - (void)unitTestResetSubscription
// assume lock is held
- (void)_setupSubscriptionWithReason:(NSString *)reason
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);

if (![self _subscriptionsAllowed]) {
Expand Down Expand Up @@ -2287,7 +2345,6 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, static_cast<int64_t>(kSecondsToWaitBeforeMarkingUnreachableAfterSettingUpSubscription) * static_cast<int64_t>(NSEC_PER_SEC)), self.queue, ^{
mtr_strongify(self);
if (self != nil) {
std::lock_guard lock(self->_lock);
[self _markDeviceAsUnreachableIfNeverSubscribed];
}
});
Expand All @@ -2305,10 +2362,8 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
NSNumber * _Nullable retryDelay) {
if (error != nil) {
MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
dispatch_async(self.queue, ^{
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:retryDelay];
});
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:retryDelay];
return;
}

Expand All @@ -2332,17 +2387,13 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
},
^(NSError * error) {
MTR_LOG_ERROR("%@ got subscription error %@", self, error);
dispatch_async(self.queue, ^{
// OnError
[self _handleSubscriptionError:error];
});
// OnError
[self _handleSubscriptionError:error];
},
^(NSError * error, NSNumber * resubscriptionDelayMs) {
MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
dispatch_async(self.queue, ^{
// OnResubscriptionNeeded
[self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
});
// OnResubscriptionNeeded
[self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
},
^(void) {
MTR_LOG("%@ got subscription established", self);
Expand Down Expand Up @@ -2373,10 +2424,8 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
self->_currentReadClient = nullptr;
self->_currentSubscriptionCallback = nullptr;

dispatch_async(self.queue, ^{
// OnDone
[self _handleSubscriptionReset:nil];
});
// OnDone
[self _doHandleSubscriptionReset:nil];
},
^(void) {
MTR_LOG("%@ got unsolicited message from publisher", self);
Expand All @@ -2387,9 +2436,7 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
},
^(void) {
MTR_LOG("%@ got report begin", self);
dispatch_async(self.queue, ^{
[self _handleReportBegin];
});
[self _handleReportBegin];
},
^(void) {
MTR_LOG("%@ got report end", self);
Expand Down Expand Up @@ -2459,10 +2506,8 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
if (err != CHIP_NO_ERROR) {
NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
dispatch_async(self.queue, ^{
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:nil];
});
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:nil];

return;
}
Expand Down

0 comments on commit e99e8e5

Please sign in to comment.