Skip to content

Commit

Permalink
Move identical parts of SubscriptionCallback into a shared base class. (
Browse files Browse the repository at this point in the history
#22340)

Fixes #22322
  • Loading branch information
bzbarsky-apple authored and pull[bot] committed Oct 17, 2023
1 parent 20c61e9 commit 8803702
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 419 deletions.
221 changes: 7 additions & 214 deletions src/darwin/Framework/CHIP/MTRBaseDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#import "MTRAttributeCacheContainer_Internal.h"
#import "MTRAttributeTLVValueDecoder_Internal.h"
#import "MTRBaseDevice_Internal.h"
#import "MTRBaseSubscriptionCallback.h"
#import "MTRCallbackBridgeBase_internal.h"
#import "MTRCluster.h"
#import "MTRError_Internal.h"
#import "MTREventTLVValueDecoder_Internal.h"
#import "MTRLogging.h"

#include "app/ConcreteAttributePath.h"
#include "app/ConcreteCommandPath.h"
#include "lib/core/CHIPError.h"
Expand All @@ -39,8 +41,6 @@

#include <memory>

typedef void (^SubscriptionEstablishedHandler)(void);

using namespace chip;
using namespace chip::app;
using namespace chip::Protocols::InteractionModel;
Expand Down Expand Up @@ -268,107 +268,21 @@ - (void)invalidateCASESession
[self.deviceController invalidateCASESessionForNode:self.deviceID];
}

typedef void (^ReportCallback)(NSArray * _Nullable value, NSError * _Nullable error);
typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);

namespace {

class SubscriptionCallback final : public ClusterStateCache::Callback {
class SubscriptionCallback final : public MTRBaseSubscriptionCallback {
public:
SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler)
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
{
}

SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler,
void (^onDoneHandler)(void))
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
, mOnDoneHandler(onDoneHandler)
{
}

~SubscriptionCallback()
OnDoneHandler _Nullable onDoneHandler)
: MTRBaseSubscriptionCallback(
queue, attributeReportCallback, eventReportCallback, errorCallback, nil, subscriptionEstablishedHandler, onDoneHandler)
{
// Ensure we release the ReadClient before we tear down anything else,
// so it can call our OnDeallocatePaths properly.
mReadClient = nullptr;
}

BufferedReadCallback & GetBufferedCallback() { return mBufferedReadAdapter; }

// We need to exist to get a ReadClient, so can't take this as a constructor argument.
void AdoptReadClient(std::unique_ptr<ReadClient> aReadClient) { mReadClient = std::move(aReadClient); }
void AdoptAttributeCache(std::unique_ptr<ClusterStateCache> aAttributeCache) { mAttributeCache = std::move(aAttributeCache); }

private:
void OnReportBegin() override;

void OnReportEnd() override;

void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override;

void OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus) override;

void OnError(CHIP_ERROR aError) override;

void OnDone(ReadClient * aReadClient) override;

void OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams) override;

void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override;

void ReportData();

// Report an error, which may be due to issues in our own internal state or
// due to the OnError callback happening.
//
// aCancelSubscription should be false for the OnError case, since it will
// be immediately followed by OnDone and we want to do the deletion there.
void ReportError(CHIP_ERROR aError, bool aCancelSubscription = true);

private:
dispatch_queue_t mQueue;
DataReportCallback _Nullable mAttributeReportCallback = nil;
DataReportCallback _Nullable mEventReportCallback = nil;
// We set mErrorCallback to nil when queueing error reports, so we
// make sure to only report one error.
ErrorCallback _Nullable mErrorCallback = nil;
SubscriptionEstablishedHandler _Nullable mSubscriptionEstablishedHandler;
BufferedReadCallback mBufferedReadAdapter;
NSMutableArray * _Nullable mAttributeReports = nil;
NSMutableArray * _Nullable mEventReports = nil;

// Our lifetime management is a little complicated. On errors that don't
// originate with the ReadClient we attempt to delete ourselves (and hence
// the ReadClient), but asynchronously, because the ReadClient API doesn't
// allow sync deletion under callbacks other than OnDone. While that's
// pending, something else (e.g. an error it runs into) could end up calling
// OnDone on us. And generally if OnDone is called we want to delete
// ourselves as well.
//
// To handle this, enforce the following rules:
//
// 1) We guarantee that mErrorCallback is only invoked with an error once.
// 2) We ensure that we delete ourselves and the passed in ReadClient only from OnDone or a queued-up
// error callback, but not both, by tracking whether we have a queued-up
// deletion.
std::unique_ptr<ReadClient> mReadClient;
std::unique_ptr<ClusterStateCache> mAttributeCache;
bool mHaveQueuedDeletion = false;
void (^mOnDoneHandler)(void) = nil;
};

} // anonymous namespace
Expand Down Expand Up @@ -433,7 +347,7 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler,
eventReportHandler, errorHandler, subscriptionEstablishedHandler);
eventReportHandler, errorHandler, subscriptionEstablishedHandler, nil);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}
Expand Down Expand Up @@ -1502,37 +1416,6 @@ - (instancetype)initWithPath:(const ConcreteEventPath &)path
@end

namespace {
void SubscriptionCallback::OnReportBegin()
{
mAttributeReports = [NSMutableArray new];
mEventReports = [NSMutableArray new];
}

// Reports attribute and event data if any exists
void SubscriptionCallback::ReportData()
{
__block NSArray * attributeReports = mAttributeReports;
mAttributeReports = nil;
__block auto attributeCallback = mAttributeReportCallback;

__block NSArray * eventReports = mEventReports;
mEventReports = nil;
__block auto eventCallback = mEventReportCallback;

if (attributeCallback != nil && attributeReports.count) {
dispatch_async(mQueue, ^{
attributeCallback(attributeReports);
});
}
if (eventCallback != nil && eventReports.count) {
dispatch_async(mQueue, ^{
eventCallback(eventReports);
});
}
}

void SubscriptionCallback::OnReportEnd() { ReportData(); }

void SubscriptionCallback::OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus)
{
id _Nullable value = nil;
Expand Down Expand Up @@ -1606,94 +1489,4 @@ - (instancetype)initWithPath:(const ConcreteEventPath &)path
[mAttributeReports addObject:[[MTRAttributeReport alloc] initWithPath:aPath value:value error:error]];
}

void SubscriptionCallback::OnError(CHIP_ERROR aError)
{
// If OnError is called after OnReportBegin, we should report the collected data
ReportData();
ReportError(aError, /* aCancelSubscription = */ false);
}

void SubscriptionCallback::OnDone(ReadClient *)
{
if (mOnDoneHandler) {
mOnDoneHandler();
mOnDoneHandler = nil;
}
if (!mHaveQueuedDeletion) {
delete this;
return; // Make sure we touch nothing else.
}
}

void SubscriptionCallback::OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams)
{
VerifyOrDie((aReadPrepareParams.mAttributePathParamsListSize == 0 && aReadPrepareParams.mpAttributePathParamsList == nullptr)
|| (aReadPrepareParams.mAttributePathParamsListSize == 1 && aReadPrepareParams.mpAttributePathParamsList != nullptr));
if (aReadPrepareParams.mpAttributePathParamsList) {
delete aReadPrepareParams.mpAttributePathParamsList;
}

VerifyOrDie((aReadPrepareParams.mDataVersionFilterListSize == 0 && aReadPrepareParams.mpDataVersionFilterList == nullptr)
|| (aReadPrepareParams.mDataVersionFilterListSize == 1 && aReadPrepareParams.mpDataVersionFilterList != nullptr));
if (aReadPrepareParams.mpDataVersionFilterList != nullptr) {
delete aReadPrepareParams.mpDataVersionFilterList;
}

VerifyOrDie((aReadPrepareParams.mEventPathParamsListSize == 0 && aReadPrepareParams.mpEventPathParamsList == nullptr)
|| (aReadPrepareParams.mEventPathParamsListSize == 1 && aReadPrepareParams.mpEventPathParamsList != nullptr));
if (aReadPrepareParams.mpEventPathParamsList) {
delete aReadPrepareParams.mpEventPathParamsList;
}
}

void SubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId)
{
if (mSubscriptionEstablishedHandler) {
dispatch_async(mQueue, mSubscriptionEstablishedHandler);
}
}

void SubscriptionCallback::ReportError(CHIP_ERROR aError, bool aCancelSubscription)
{
auto * err = [MTRError errorForCHIPErrorCode:aError];
if (!err) {
// Very strange... Someone tried to report a success status as an error?
return;
}

if (mHaveQueuedDeletion) {
// Already have an error report pending which will delete us.
return;
}

__block ErrorCallback callback = mErrorCallback;
__block auto * myself = this;
mErrorCallback = nil;
mAttributeReportCallback = nil;
mEventReportCallback = nil;
__auto_type onDoneHandler = mOnDoneHandler;
mOnDoneHandler = nil;
dispatch_async(mQueue, ^{
callback(err);
if (onDoneHandler) {
onDoneHandler();
}
});

if (aCancelSubscription) {
// We can't synchronously delete ourselves, because we're inside one of
// the ReadClient callbacks and we need to outlive the callback's
// execution. Queue an async deletion on the Matter queue (where we are
// running already).
//
// If we now get OnDone, we will ignore that, since we have the deletion
// posted already, but that's OK even during shutdown: since we are
// queueing the deletion now, it will be processed before the Matter queue
// gets paused, which is fairly early in the shutdown process.
mHaveQueuedDeletion = true;
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
delete myself;
});
}
}
} // anonymous namespace
Loading

0 comments on commit 8803702

Please sign in to comment.