Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better fix for crashes around MTRBaseSubscriptionCallback. #23076

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 117 additions & 77 deletions src/darwin/Framework/CHIP/MTRBaseDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ - (void)invalidateCASESession

class SubscriptionCallback final : public MTRBaseSubscriptionCallback {
public:
SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionScheduledHandler,
MTRSubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
: MTRBaseSubscriptionCallback(queue, attributeReportCallback, eventReportCallback, errorCallback,
resubscriptionScheduledHandler, subscriptionEstablishedHandler, onDoneHandler)
: MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionScheduledHandler,
subscriptionEstablishedHandler, onDoneHandler)
{
}

Expand Down Expand Up @@ -286,80 +286,120 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
// Copy params before going async.
params = [params copy];

[self.deviceController
getSessionForNode:self.nodeID
completion:^(ExchangeManager * _Nullable exchangeManager, const Optional<SessionHandle> & session,
NSError * _Nullable error) {
if (error != nil) {
dispatch_async(queue, ^{
errorHandler(error);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(session.Value());
readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue];
readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue];
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mIsFabricFiltered = params.fabricFiltered;
readParams.mKeepSubscriptions = params.keepPreviousSubscriptions;

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> clusterStateCache;
if (clusterStateCacheContainer) {
__weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer;
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler, eventReportHandler,
errorHandler, resubscriptionScheduled, subscriptionEstablished, ^{
MTRClusterStateCacheContainer * container = weakPtr;
if (container) {
container.cppClusterStateCache = nullptr;
}
});
clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler, eventReportHandler,
errorHandler, resubscriptionScheduled, subscriptionEstablished, nil);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}

CHIP_ERROR err;
if (!params.autoResubscribe) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([MTRError errorForCHIPErrorCode:err]);
});

return;
}

if (clusterStateCacheContainer) {
clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptClusterStateCache(std::move(clusterStateCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
}];
[self.deviceController getSessionForNode:self.nodeID
completion:^(ExchangeManager * _Nullable exchangeManager, const Optional<SessionHandle> & session,
NSError * _Nullable error) {
if (error != nil) {
dispatch_async(queue, ^{
errorHandler(error);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(session.Value());
readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue];
readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue];
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mIsFabricFiltered = params.fabricFiltered;
readParams.mKeepSubscriptions = params.keepPreviousSubscriptions;

std::unique_ptr<ClusterStateCache> clusterStateCache;
ReadClient::Callback * callbackForReadClient = nullptr;
OnDoneHandler onDoneHandler = nil;

if (clusterStateCacheContainer) {
__weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer;
onDoneHandler = ^{
// This, like all manipulation of cppClusterStateCache, needs to run on the Matter
// queue.
MTRClusterStateCacheContainer * container = weakPtr;
if (container) {
container.cppClusterStateCache = nullptr;
}
};
}

auto callback = std::make_unique<SubscriptionCallback>(
^(NSArray * value) {
dispatch_async(queue, ^{
if (attributeReportHandler != nil) {
attributeReportHandler(value);
}
});
},
^(NSArray * value) {
dispatch_async(queue, ^{
if (eventReportHandler != nil) {
eventReportHandler(value);
}
});
},
^(NSError * error) {
dispatch_async(queue, ^{
errorHandler(error);
});
},
^(NSError * error, NSNumber * resubscriptionDelay) {
dispatch_async(queue, ^{
if (resubscriptionScheduled != nil) {
resubscriptionScheduled(error, resubscriptionDelay);
}
});
},
^(void) {
dispatch_async(queue, ^{
if (subscriptionEstablished != nil) {
subscriptionEstablished();
}
});
},
onDoneHandler);

if (clusterStateCacheContainer) {
clusterStateCache = std::make_unique<ClusterStateCache>(*callback.get());
callbackForReadClient = &clusterStateCache->GetBufferedCallback();
} else {
callbackForReadClient = &callback->GetBufferedCallback();
}

auto readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(),
exchangeManager, *callbackForReadClient, ReadClient::InteractionType::Subscribe);

CHIP_ERROR err;
if (!params.autoResubscribe) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([MTRError errorForCHIPErrorCode:err]);
});

return;
}

if (clusterStateCacheContainer) {
clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as
// well.
callback->AdoptClusterStateCache(std::move(clusterStateCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
}];
}

// Convert TLV data into data-value dictionary as described in MTRDeviceResponseHandler
Expand Down
30 changes: 19 additions & 11 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@
/**
* This file defines a base class for subscription callbacks used by
* MTRBaseDevice and MTRDevice. This base class handles everything except the
* actual conversion from the incoming data to the desired data.
* actual conversion from the incoming data to the desired data and the dispatch
* of callbacks to the relevant client queues. Its callbacks are called on the
* Matter queue. This allows MTRDevice and MTRBaseDevice to do any necessary
* sync cleanup work before dispatching to the client callbacks on the client
* queue.
*
* After onDoneHandler is invoked, this object will at some point delete itself
* and destroy anything it owns (such as the ReadClient or the
* ClusterStateCache). Consumers should drop references to all the relevant
* objects in that handler. This deletion will happen on the Matter queue.
*
* The desired data is assumed to be NSObjects that can be stored in NSArray.
*/
Expand All @@ -49,12 +58,10 @@ typedef void (^OnDoneHandler)(void);

class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback {
public:
MTRBaseSubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback,
DataReportCallback eventReportCallback, ErrorCallback errorCallback,
MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
: mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mResubscriptionCallback(resubscriptionCallback)
Expand Down Expand Up @@ -117,10 +124,9 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
NSMutableArray * _Nullable mEventReports = nil;

private:
dispatch_queue_t mQueue;
DataReportCallback _Nullable mAttributeReportCallback = nil;
DataReportCallback _Nullable mEventReportCallback = nil;
// We set mErrorCallback to nil when queueing error reports, so we
// We set mErrorCallback to nil before calling the error callback, so we
// make sure to only report one error.
ErrorCallback _Nullable mErrorCallback = nil;
MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil;
Expand All @@ -138,9 +144,11 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
// To handle this, enforce the following rules:
//
// 1) We guarantee that mErrorCallback is only invoked with an error once.
// 2) We ensure that we delete ourselves and the passed in ReadClient only from OnDone or a queued-up
// error callback, but not both, by tracking whether we have a queued-up
// deletion.
// 2) We guarantee that mOnDoneHandler is only invoked once, and always
// invoked before we delete ourselves.
// 3) We ensure that we delete ourselves and the passed in ReadClient only
// from OnDone or from an error callback but not both, by tracking whether
// we have a queued-up deletion.
std::unique_ptr<chip::app::ReadClient> mReadClient;
std::unique_ptr<chip::app::ClusterStateCache> mClusterStateCache;
bool mHaveQueuedDeletion = false;
Expand Down
44 changes: 15 additions & 29 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,17 @@
{
__block NSArray * attributeReports = mAttributeReports;
mAttributeReports = nil;
__block auto attributeCallback = mAttributeReportCallback;
auto attributeCallback = mAttributeReportCallback;

__block NSArray * eventReports = mEventReports;
mEventReports = nil;
__block auto eventCallback = mEventReportCallback;
auto eventCallback = mEventReportCallback;

if (attributeCallback != nil && attributeReports.count) {
dispatch_async(mQueue, ^{
attributeCallback(attributeReports);
});
attributeCallback(attributeReports);
}
if (eventCallback != nil && eventReports.count) {
dispatch_async(mQueue, ^{
eventCallback(eventReports);
});
eventCallback(eventReports);
}
}

Expand Down Expand Up @@ -96,7 +92,8 @@
void MTRBaseSubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId)
{
if (mSubscriptionEstablishedHandler) {
dispatch_async(mQueue, mSubscriptionEstablishedHandler);
auto subscriptionEstablishedHandler = mSubscriptionEstablishedHandler;
subscriptionEstablishedHandler();
}
}

Expand All @@ -109,9 +106,7 @@
auto callback = mResubscriptionCallback;
auto error = [MTRError errorForCHIPErrorCode:aTerminationCause];
auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription());
dispatch_async(mQueue, ^{
callback(error, delayMs);
});
callback(error, delayMs);
}
return CHIP_NO_ERROR;
}
Expand All @@ -129,30 +124,21 @@
return;
}

__block ErrorCallback callback = mErrorCallback;
__block auto * myself = this;

auto errorCallback = mErrorCallback;
mErrorCallback = nil;
mAttributeReportCallback = nil;
mEventReportCallback = nil;
__auto_type onDoneHandler = mOnDoneHandler;
auto onDoneHandler = mOnDoneHandler;
mOnDoneHandler = nil;
dispatch_async(mQueue, ^{
callback(err);
});

errorCallback(err);
if (onDoneHandler) {
// To guarantee the async onDoneHandler call is made before
// deletion, so that clean up can happen while the callback
// object is still alive (and therefore cluster cache), queue
// deletion after calling the onDoneHandler
mHaveQueuedDeletion = true;
dispatch_async(mQueue, ^{
onDoneHandler();
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
delete myself;
});
});
} else if (aCancelSubscription) {
onDoneHandler();
}

if (aCancelSubscription) {
// We can't synchronously delete ourselves, because we're inside one of
// the ReadClient callbacks and we need to outlive the callback's
// execution. Queue an async deletion on the Matter queue (where we are
Expand Down
Loading