Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some threading issues on Darwin. #20197

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ class TestCommandBridge : public CHIPCommandBridge,
// just hand it right back to us without establishing a new CASE
// session.
if (GetDevice(identity) != nil) {
auto device = [GetDevice(identity) internalDevice];
if (device != nullptr) {
device->Disconnect();
}
[GetDevice(identity) disconnect];
mConnectedDevices[identity] = nil;
}

Expand Down
6 changes: 6 additions & 0 deletions src/darwin/Framework/CHIP/CHIPDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ extern NSString * const kCHIPArrayValueType;
- (instancetype)init NS_UNAVAILABLE;
+ (instancetype)new NS_UNAVAILABLE;

/**
* Disconnect the device, so an attempt to getConnectedDevice for this device id
* will have to create a new secure session.
*/
- (void)disconnect;

/**
* Subscribe to receive attribute reports for everything (all endpoints, all
* clusters, all attributes, all events) on the device.
Expand Down
142 changes: 77 additions & 65 deletions src/darwin/Framework/CHIP/CHIPDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ - (instancetype)initWithDevice:(chip::DeviceProxy *)device
return _cppDevice;
}

- (void)disconnect
{
dispatch_sync(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (device != nullptr) {
device->Disconnect();
}
});
}

typedef void (^ReportCallback)(NSArray * _Nullable value, NSError * _Nullable error);
typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);
Expand Down Expand Up @@ -371,76 +381,78 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
errorHandler:(void (^)(NSError * error))errorHandler
subscriptionEstablished:(nullable void (^)(void))subscriptionEstablishedHandler
{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}
return;
}

CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}
// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});
CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

return;
}
if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
return;
}

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
});
}

// Convert TLV data into NSObject
Expand Down