Skip to content

Commit

Permalink
Stop doing read-throughs on MTRDevice when subscription is being esta…
Browse files Browse the repository at this point in the history
…blished. (project-chip#32990)

* Stop doing read-throughs on MTRDevice when subscription is being established.

The key goal is to stop doing read-throughs when we expect to start getting data
soon: between sending the Sigma1 for our initial subscribe and getting the first
priming read data packets.  Any reads dispatched during that time "because we
don't have a subscription yet" are just wasted network traffic, because chances
are we're about to have a subscription and get that data as part of the priming
read anyway.

The main fix is just to stop doing read-throughs altogether for any attributes
known to not be C and if we have a subscription delegate.  In that state we
should be subscribed and should be getting updates for attribute values, and
there's no reason to read through.

The rest of the changes are related to a side-effect of the stopping of
read-throughs: if we had a subscription drop and have backed off quite a bit, a
read-through that succeeded could prod us to retry subscribing.  We don't want
to do a read just to (maybe) trigger that behavior, though.  Instead, treat a
skipped read-through as a direct signal to retry our subscription attempt.  But
not too often.

This "try to resubscribe" heuristic can probably use some tweaking.

* Fix XPC test failure.
  • Loading branch information
bzbarsky-apple authored Apr 16, 2024
1 parent 0c17072 commit 4aadee7
Showing 1 changed file with 120 additions and 10 deletions.
130 changes: 120 additions & 10 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#import "MTRCommandTimedCheck.h"
#import "MTRConversion.h"
#import "MTRDefines_Internal.h"
#import "MTRDeviceControllerOverXPC.h"
#import "MTRDeviceController_Internal.h"
#import "MTRDevice_Internal.h"
#import "MTRError_Internal.h"
Expand All @@ -45,6 +46,7 @@
#include <app/BufferedReadCallback.h>
#include <app/ClusterStateCache.h>
#include <app/InteractionModelEngine.h>
#include <platform/LockTracker.h>
#include <platform/PlatformManager.h>

typedef void (^MTRDeviceAttributeReportHandler)(NSArray * _Nonnull);
Expand Down Expand Up @@ -260,6 +262,13 @@ - (BOOL)isEqual:(id)object

@end

// Minimal time to wait since our last resubscribe failure before we will allow
// a read attempt to prod our subscription.
//
// TODO: Figure out a better value for this, but for now don't allow this to
// happen more often than once every 10 minutes.
#define MTRDEVICE_MIN_RESUBSCRIBE_DUE_TO_READ_INTERVAL_SECONDS (10 * 60)

@interface MTRDevice ()
@property (nonatomic, readonly) os_unfair_lock lock; // protects the caches and device state
// protects against concurrent time updates by guarding timeUpdateScheduled flag which manages time updates scheduling,
Expand Down Expand Up @@ -340,6 +349,11 @@ @implementation MTRDevice {
// as the read cache, should testing prove attribute storage by cluster is the better solution.
NSMutableDictionary<MTRClusterPath *, MTRDeviceClusterData *> * _clusterData;
NSMutableSet<MTRClusterPath *> * _clustersToPersist;

// When we last failed to subscribe to the device (either via
// _setupSubscription or via the auto-resubscribe behavior of the
// ReadClient). Nil if we have had no such failures.
NSDate * _Nullable _lastSubscriptionFailureTime;
}

- (instancetype)initWithNodeID:(NSNumber *)nodeID controller:(MTRDeviceController *)controller
Expand Down Expand Up @@ -659,13 +673,26 @@ - (void)invalidate

- (void)nodeMayBeAdvertisingOperational
{
assertChipStackLockedByCurrentThread();

MTR_LOG_DEFAULT("%@ saw new operational advertisement", self);

[self _triggerResubscribeWithReason:"operational advertisement seen"
nodeLikelyReachable:YES];
}

// Trigger a resubscribe as needed. nodeLikelyReachable should be YES if we
// have reason to suspect the node is now reachable, NO if we have no idea
// whether it might be.
- (void)_triggerResubscribeWithReason:(const char *)reason nodeLikelyReachable:(BOOL)nodeLikelyReachable
{
assertChipStackLockedByCurrentThread();

// We might want to trigger a resubscribe on our existing ReadClient. Do
// that outside the scope of our lock, so we're not calling arbitrary code
// we don't control with the lock held. This is safe, because when
// nodeMayBeAdvertisingOperational is called we are running on the Matter
// queue, and the ReadClient can't get destroyed while we are on that queue.
// we don't control with the lock held. This is safe, because we are
// running on he Matter queue and the ReadClient can't get destroyed while
// we are on that queue.
ReadClient * readClientToResubscribe = nullptr;
SubscriptionCallback * subscriptionCallback = nullptr;

Expand All @@ -684,17 +711,85 @@ - (void)nodeMayBeAdvertisingOperational
os_unfair_lock_unlock(&self->_lock);

if (readClientToResubscribe) {
subscriptionCallback->ResetResubscriptionBackoff();
readClientToResubscribe->TriggerResubscribeIfScheduled("operational advertisement seen");
if (nodeLikelyReachable) {
// If we have reason to suspect the node is now reachable, reset the
// backoff timer, so that if this attempt fails we'll try again
// quickly; it's possible we'll just catch the node at a bad time
// here (e.g. still booting up), but should try again reasonably quickly.
subscriptionCallback->ResetResubscriptionBackoff();
}
readClientToResubscribe->TriggerResubscribeIfScheduled(reason);
}
}

// Return YES if there's a valid delegate AND subscription is expected to report value
// Return YES if we are in a state where, apart from communication issues with
// the device, we will be able to get reports via our subscription.
- (BOOL)_subscriptionAbleToReport
{
std::lock_guard lock(_lock);
id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
return (delegate != nil) && (_state == MTRDeviceStateReachable);
if (delegate == nil) {
// No delegate definitely means no subscription.
return NO;
}

// For unit testing only, matching logic in setDelegate
#ifdef DEBUG
id testDelegate = delegate;
if ([testDelegate respondsToSelector:@selector(unitTestShouldSetUpSubscriptionForDevice:)]) {
if (![testDelegate unitTestShouldSetUpSubscriptionForDevice:self]) {
return NO;
}
}
#endif

// Unfortunately, we currently have no subscriptions over our hacked-up XPC
// setup. Try to detect that situation.
if ([_deviceController.class respondsToSelector:@selector(sharedControllerWithID:xpcConnectBlock:)]) {
return NO;
}

return YES;
}

// Notification that read-through was skipped for an attribute read.
- (void)_readThroughSkipped
{
std::lock_guard lock(_lock);
if (_state == MTRDeviceStateReachable) {
// We're getting reports from the device, so there's nothing else to be
// done here. We could skip this check, because our "try to
// resubscribe" code would be a no-op in this case, but then we'd have
// an extra dispatch in the common case of read-while-subscribed, which
// is not great for peformance.
return;
}

if (_lastSubscriptionFailureTime == nil) {
// No need to try to do anything here, because we have never failed a
// subscription attempt (so we might be in the middle of one now, and no
// need to prod things along).
return;
}

if ([[NSDate now] timeIntervalSinceDate:_lastSubscriptionFailureTime] < MTRDEVICE_MIN_RESUBSCRIBE_DUE_TO_READ_INTERVAL_SECONDS) {
// Not enough time has passed since we last tried. Don't create extra
// network traffic.
//
// TODO: Do we need to worry about this being too spammy in the log if
// we keep getting reads while not subscribed? We could add another
// backoff timer or counter for the log line...
MTR_LOG_DEBUG("%@ skipping resubscribe from skipped read-through: not enough time has passed since %@", self, _lastSubscriptionFailureTime);
return;
}

// Do the remaining work on the Matter queue, because we may want to touch
// ReadClient in there. If the dispatch fails, that's fine; it means our
// controller has shut down, so nothing to be done.
[_deviceController asyncDispatchToMatterQueue:^{
[self _triggerResubscribeWithReason:"read-through skipped while not subscribed" nodeLikelyReachable:NO];
}
errorHandler:nil];
}

- (BOOL)_callDelegateWithBlock:(void (^)(id<MTRDeviceDelegate>))block
Expand Down Expand Up @@ -791,15 +886,29 @@ - (void)_handleResubscriptionNeeded
std::lock_guard lock(_lock);

[self _changeState:MTRDeviceStateUnknown];

// If we are here, then the ReadClient either just detected a subscription
// drop or just tried again and failed. Either way, count it as "tried and
// failed to subscribe": in the latter case it's actually true, and in the
// former case we recently had a subscription and do not want to be forcing
// retries immediately.
_lastSubscriptionFailureTime = [NSDate now];
}

- (void)_handleSubscriptionReset:(NSNumber * _Nullable)retryDelay
{
std::lock_guard lock(_lock);

// If we are here, then either we failed to establish initil CASE, or we
// failed to send the initial SubscribeRequest message, or our ReadClient
// has given up completely. Those all count as "we have tried and failed to
// subscribe".
_lastSubscriptionFailureTime = [NSDate now];

// if there is no delegate then also do not retry
id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (!delegate) {
// NOTE: Do not log anythig here: we have been invalidated, and the
// NOTE: Do not log anything here: we have been invalidated, and the
// Matter stack might already be torn down.
return;
}
Expand Down Expand Up @@ -1222,15 +1331,14 @@ - (void)_setupSubscription
// Drop our pointer to the ReadClient immediately, since
// it's about to be destroyed and we don't want to be
// holding a dangling pointer.
os_unfair_lock_lock(&self->_lock);
std::lock_guard lock(self->_lock);
self->_currentReadClient = nullptr;
self->_currentSubscriptionCallback = nullptr;

dispatch_async(self.queue, ^{
// OnDone
[self _handleSubscriptionReset:nil];
});
os_unfair_lock_unlock(&self->_lock);
},
^(void) {
MTR_LOG_DEFAULT("%@ got unsolicited message from publisher", self);
Expand Down Expand Up @@ -1624,6 +1732,8 @@ static BOOL AttributeHasChangesOmittedQuality(MTRAttributePath * attributePath)
}];
}];
[_asyncWorkQueue enqueueWorkItem:workItem descriptionWithFormat:@"read %@ 0x%llx 0x%llx", endpointID, clusterID.unsignedLongLongValue, attributeID.unsignedLongLongValue];
} else {
[self _readThroughSkipped];
}

return attributeValueToReturn;
Expand Down

0 comments on commit 4aadee7

Please sign in to comment.