Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move identical parts of SubscriptionCallback into a shared base class. #22340

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 7 additions & 214 deletions src/darwin/Framework/CHIP/MTRBaseDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#import "MTRAttributeCacheContainer_Internal.h"
#import "MTRAttributeTLVValueDecoder_Internal.h"
#import "MTRBaseDevice_Internal.h"
#import "MTRBaseSubscriptionCallback.h"
#import "MTRCallbackBridgeBase_internal.h"
#import "MTRCluster.h"
#import "MTRError_Internal.h"
#import "MTREventTLVValueDecoder_Internal.h"
#import "MTRLogging.h"

#include "app/ConcreteAttributePath.h"
#include "app/ConcreteCommandPath.h"
#include "lib/core/CHIPError.h"
Expand All @@ -39,8 +41,6 @@

#include <memory>

typedef void (^SubscriptionEstablishedHandler)(void);

using namespace chip;
using namespace chip::app;
using namespace chip::Protocols::InteractionModel;
Expand Down Expand Up @@ -268,107 +268,21 @@ - (void)invalidateCASESession
[self.deviceController invalidateCASESessionForNode:self.deviceID];
}

typedef void (^ReportCallback)(NSArray * _Nullable value, NSError * _Nullable error);
typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);

namespace {

class SubscriptionCallback final : public ClusterStateCache::Callback {
class SubscriptionCallback final : public MTRBaseSubscriptionCallback {
public:
SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler)
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
{
}

SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler,
void (^onDoneHandler)(void))
: mQueue(queue)
, mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
, mOnDoneHandler(onDoneHandler)
{
}

~SubscriptionCallback()
OnDoneHandler _Nullable onDoneHandler)
: MTRBaseSubscriptionCallback(
queue, attributeReportCallback, eventReportCallback, errorCallback, nil, subscriptionEstablishedHandler, onDoneHandler)
{
// Ensure we release the ReadClient before we tear down anything else,
// so it can call our OnDeallocatePaths properly.
mReadClient = nullptr;
}

BufferedReadCallback & GetBufferedCallback() { return mBufferedReadAdapter; }

// We need to exist to get a ReadClient, so can't take this as a constructor argument.
void AdoptReadClient(std::unique_ptr<ReadClient> aReadClient) { mReadClient = std::move(aReadClient); }
void AdoptAttributeCache(std::unique_ptr<ClusterStateCache> aAttributeCache) { mAttributeCache = std::move(aAttributeCache); }

private:
void OnReportBegin() override;

void OnReportEnd() override;

void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override;

void OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus) override;

void OnError(CHIP_ERROR aError) override;

void OnDone(ReadClient * aReadClient) override;

void OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams) override;

void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override;

void ReportData();

// Report an error, which may be due to issues in our own internal state or
// due to the OnError callback happening.
//
// aCancelSubscription should be false for the OnError case, since it will
// be immediately followed by OnDone and we want to do the deletion there.
void ReportError(CHIP_ERROR aError, bool aCancelSubscription = true);

private:
dispatch_queue_t mQueue;
DataReportCallback _Nullable mAttributeReportCallback = nil;
DataReportCallback _Nullable mEventReportCallback = nil;
// We set mErrorCallback to nil when queueing error reports, so we
// make sure to only report one error.
ErrorCallback _Nullable mErrorCallback = nil;
SubscriptionEstablishedHandler _Nullable mSubscriptionEstablishedHandler;
BufferedReadCallback mBufferedReadAdapter;
NSMutableArray * _Nullable mAttributeReports = nil;
NSMutableArray * _Nullable mEventReports = nil;

// Our lifetime management is a little complicated. On errors that don't
// originate with the ReadClient we attempt to delete ourselves (and hence
// the ReadClient), but asynchronously, because the ReadClient API doesn't
// allow sync deletion under callbacks other than OnDone. While that's
// pending, something else (e.g. an error it runs into) could end up calling
// OnDone on us. And generally if OnDone is called we want to delete
// ourselves as well.
//
// To handle this, enforce the following rules:
//
// 1) We guarantee that mErrorCallback is only invoked with an error once.
// 2) We ensure that we delete ourselves and the passed in ReadClient only from OnDone or a queued-up
// error callback, but not both, by tracking whether we have a queued-up
// deletion.
std::unique_ptr<ReadClient> mReadClient;
std::unique_ptr<ClusterStateCache> mAttributeCache;
bool mHaveQueuedDeletion = false;
void (^mOnDoneHandler)(void) = nil;
};

} // anonymous namespace
Expand Down Expand Up @@ -433,7 +347,7 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(queue, attributeReportHandler,
eventReportHandler, errorHandler, subscriptionEstablishedHandler);
eventReportHandler, errorHandler, subscriptionEstablishedHandler, nil);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), exchangeManager,
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}
Expand Down Expand Up @@ -1502,37 +1416,6 @@ - (instancetype)initWithPath:(const ConcreteEventPath &)path
@end

namespace {
void SubscriptionCallback::OnReportBegin()
{
mAttributeReports = [NSMutableArray new];
mEventReports = [NSMutableArray new];
}

// Reports attribute and event data if any exists
void SubscriptionCallback::ReportData()
{
__block NSArray * attributeReports = mAttributeReports;
mAttributeReports = nil;
__block auto attributeCallback = mAttributeReportCallback;

__block NSArray * eventReports = mEventReports;
mEventReports = nil;
__block auto eventCallback = mEventReportCallback;

if (attributeCallback != nil && attributeReports.count) {
dispatch_async(mQueue, ^{
attributeCallback(attributeReports);
});
}
if (eventCallback != nil && eventReports.count) {
dispatch_async(mQueue, ^{
eventCallback(eventReports);
});
}
}

void SubscriptionCallback::OnReportEnd() { ReportData(); }

void SubscriptionCallback::OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus)
{
id _Nullable value = nil;
Expand Down Expand Up @@ -1606,94 +1489,4 @@ - (instancetype)initWithPath:(const ConcreteEventPath &)path
[mAttributeReports addObject:[[MTRAttributeReport alloc] initWithPath:aPath value:value error:error]];
}

void SubscriptionCallback::OnError(CHIP_ERROR aError)
{
// If OnError is called after OnReportBegin, we should report the collected data
ReportData();
ReportError(aError, /* aCancelSubscription = */ false);
}

void SubscriptionCallback::OnDone(ReadClient *)
{
if (mOnDoneHandler) {
mOnDoneHandler();
mOnDoneHandler = nil;
}
if (!mHaveQueuedDeletion) {
delete this;
return; // Make sure we touch nothing else.
}
}

void SubscriptionCallback::OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams)
{
VerifyOrDie((aReadPrepareParams.mAttributePathParamsListSize == 0 && aReadPrepareParams.mpAttributePathParamsList == nullptr)
|| (aReadPrepareParams.mAttributePathParamsListSize == 1 && aReadPrepareParams.mpAttributePathParamsList != nullptr));
if (aReadPrepareParams.mpAttributePathParamsList) {
delete aReadPrepareParams.mpAttributePathParamsList;
}

VerifyOrDie((aReadPrepareParams.mDataVersionFilterListSize == 0 && aReadPrepareParams.mpDataVersionFilterList == nullptr)
|| (aReadPrepareParams.mDataVersionFilterListSize == 1 && aReadPrepareParams.mpDataVersionFilterList != nullptr));
if (aReadPrepareParams.mpDataVersionFilterList != nullptr) {
delete aReadPrepareParams.mpDataVersionFilterList;
}

VerifyOrDie((aReadPrepareParams.mEventPathParamsListSize == 0 && aReadPrepareParams.mpEventPathParamsList == nullptr)
|| (aReadPrepareParams.mEventPathParamsListSize == 1 && aReadPrepareParams.mpEventPathParamsList != nullptr));
if (aReadPrepareParams.mpEventPathParamsList) {
delete aReadPrepareParams.mpEventPathParamsList;
}
}

void SubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId)
{
if (mSubscriptionEstablishedHandler) {
dispatch_async(mQueue, mSubscriptionEstablishedHandler);
}
}

void SubscriptionCallback::ReportError(CHIP_ERROR aError, bool aCancelSubscription)
{
auto * err = [MTRError errorForCHIPErrorCode:aError];
if (!err) {
// Very strange... Someone tried to report a success status as an error?
return;
}

if (mHaveQueuedDeletion) {
// Already have an error report pending which will delete us.
return;
}

__block ErrorCallback callback = mErrorCallback;
__block auto * myself = this;
mErrorCallback = nil;
mAttributeReportCallback = nil;
mEventReportCallback = nil;
__auto_type onDoneHandler = mOnDoneHandler;
mOnDoneHandler = nil;
dispatch_async(mQueue, ^{
callback(err);
if (onDoneHandler) {
onDoneHandler();
}
});

if (aCancelSubscription) {
// We can't synchronously delete ourselves, because we're inside one of
// the ReadClient callbacks and we need to outlive the callback's
// execution. Queue an async deletion on the Matter queue (where we are
// running already).
//
// If we now get OnDone, we will ignore that, since we have the deletion
// posted already, but that's OK even during shutdown: since we are
// queueing the deletion now, it will be processed before the Matter queue
// gets paused, which is fairly early in the shutdown process.
mHaveQueuedDeletion = true;
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
delete myself;
});
}
}
} // anonymous namespace
Loading