Skip to content

Commit

Permalink
Fix some threading issues on Darwin.
Browse files Browse the repository at this point in the history
There were two places where we were touching SDK data structures from
the wrong event queue.
  • Loading branch information
bzbarsky-apple committed Jul 1, 2022
1 parent c1d5431 commit 7071652
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ class TestCommandBridge : public CHIPCommandBridge,
// just hand it right back to us without establishing a new CASE
// session.
if (GetDevice(identity) != nil) {
auto device = [GetDevice(identity) internalDevice];
if (device != nullptr) {
device->Disconnect();
}
[GetDevice(identity) disconnect];
mConnectedDevices[identity] = nil;
}

Expand Down
6 changes: 6 additions & 0 deletions src/darwin/Framework/CHIP/CHIPDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ extern NSString * const kCHIPArrayValueType;
- (instancetype)init NS_UNAVAILABLE;
+ (instancetype)new NS_UNAVAILABLE;

/**
* Disconnect the device, so an attempt to getConnectedDevice for this device id
* will have to create a new secure session.
*/
- (void)disconnect;

/**
* Subscribe to receive attribute reports for everything (all endpoints, all
* clusters, all attributes, all events) on the device.
Expand Down
142 changes: 77 additions & 65 deletions src/darwin/Framework/CHIP/CHIPDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ - (instancetype)initWithDevice:(chip::DeviceProxy *)device
return _cppDevice;
}

- (void)disconnect
{
dispatch_sync(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (device != nullptr) {
device->Disconnect();
}
});
}

typedef void (^ReportCallback)(NSArray * _Nullable value, NSError * _Nullable error);
typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);
Expand Down Expand Up @@ -371,76 +381,78 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
errorHandler:(void (^)(NSError * error))errorHandler
subscriptionEstablished:(nullable void (^)(void))subscriptionEstablishedHandler
{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}
return;
}

CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}
// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});
CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

return;
}
if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
return;
}

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
});
}

// Convert TLV data into NSObject
Expand Down

0 comments on commit 7071652

Please sign in to comment.