Skip to content

Commit

Permalink
[ICD] Client-side device communication notification plumbing (#24061)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtung-apple authored and pull[bot] committed Jun 20, 2023
1 parent eab2bd1 commit 2063105
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 29 deletions.
6 changes: 6 additions & 0 deletions src/app/BufferedReadCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ class BufferedReadCallback : public ReadClient::Callback
{
return mCallback.GetHighestReceivedEventNumber(aEventNumber);
}

void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
{
return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
}

/*
* Given a reader positioned at a list element, allocate a packet buffer, copy the list item where
* the reader is positioned into that buffer and add it to our buffered list for tracking.
Expand Down
5 changes: 5 additions & 0 deletions src/app/ClusterStateCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ class ClusterStateCache : protected ReadClient::Callback
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList) override;

void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) override
{
return mCallback.OnUnsolicitedMessageFromPublisher(apReadClient);
}

// Commit the pending cluster data version, if there is one.
void CommitPendingDataVersion();

Expand Down
26 changes: 22 additions & 4 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,19 +630,37 @@ Status InteractionModelEngine::OnUnsolicitedReportData(Messaging::ExchangeContex
VerifyOrReturnError(report.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, Status::InvalidAction);
VerifyOrReturnError(report.ExitContainer() == CHIP_NO_ERROR, Status::InvalidAction);

ReadClient * foundSubscription = nullptr;
for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
{
auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId())
{
continue;
}

// Notify Subscriptions about incoming communication from node
readClient->OnUnsolicitedMessageFromPublisher();

if (!readClient->IsSubscriptionActive())
{
continue;
}
auto peer = apExchangeContext->GetSessionHandle()->GetPeer();
if (readClient->GetFabricIndex() != peer.GetFabricIndex() || readClient->GetPeerNodeId() != peer.GetNodeId() ||
!readClient->IsMatchingSubscriptionId(subscriptionId))

if (!readClient->IsMatchingSubscriptionId(subscriptionId))
{
continue;
}
readClient->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));

if (!foundSubscription)
{
foundSubscription = readClient;
}
}

if (foundSubscription)
{
foundSubscription->OnUnsolicitedReportData(apExchangeContext, std::move(aPayload));
return Status::Success;
}

Expand Down
5 changes: 5 additions & 0 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void ReadClient::ClearActiveSubscriptionState()
mMinIntervalFloorSeconds = 0;
mMaxInterval = 0;
mSubscriptionId = 0;
mIsResubscriptionScheduled = false;
MoveToState(ClientState::Idle);
}

Expand Down Expand Up @@ -155,6 +156,7 @@ CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscripti
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
mIsResubscriptionScheduled = true;

return CHIP_NO_ERROR;
}
Expand Down Expand Up @@ -841,6 +843,7 @@ void ReadClient::CancelResubscribeTimer()
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnResubscribeTimerCallback, this);
mIsResubscriptionScheduled = false;
}

void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
Expand Down Expand Up @@ -1095,6 +1098,8 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void
ReadClient * const _this = static_cast<ReadClient *>(apAppState);
VerifyOrDie(_this != nullptr);

_this->mIsResubscriptionScheduled = false;

CHIP_ERROR err;

ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
Expand Down
32 changes: 31 additions & 1 deletion src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ class ReadClient : public Messaging::ExchangeDelegate
aEventNumber.ClearValue();
return CHIP_NO_ERROR;
}

/**
* OnUnsolicitedMessageFromPublisher will be called for a subscription
* ReadClient when any incoming message is received from a matching
* node on the fabric.
*
* This callback will be called:
* - When receiving any unsolicited communication from the node
* - Even for disconnected subscriptions.
*
* Callee MUST not synchronously destroy ReadClients in this callback.
*
* @param[in] apReadClient the ReadClient for the subscription.
*/
virtual void OnUnsolicitedMessageFromPublisher(ReadClient * apReadClient) {}
};

enum class InteractionType : uint8_t
Expand Down Expand Up @@ -288,6 +303,20 @@ class ReadClient : public Messaging::ExchangeDelegate

void OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload);

void OnUnsolicitedMessageFromPublisher()
{
// accelerate resubscription if scheduled
if (mIsResubscriptionScheduled)
{
ChipLogDetail(DataManagement, "%s ReadClient[%p] resubscribe on unsolicited message", __func__, this);
CancelResubscribeTimer();
OnResubscribeTimerCallback(nullptr, this);
}

// Then notify callbacks
mpCallback.OnUnsolicitedMessageFromPublisher(this);
}

auto GetSubscriptionId() const
{
using returnType = Optional<decltype(mSubscriptionId)>;
Expand Down Expand Up @@ -501,7 +530,8 @@ class ReadClient : public Messaging::ExchangeDelegate
InteractionType mInteractionType = InteractionType::Read;
Timestamp mEventTimestamp;

bool mForceCaseOnNextResub = true;
bool mForceCaseOnNextResub = true;
bool mIsResubscriptionScheduled = false;

chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
Expand Down
8 changes: 7 additions & 1 deletion src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,22 @@ typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);
typedef void (^SubscriptionEstablishedHandler)(void);
typedef void (^OnDoneHandler)(void);
typedef void (^UnsolicitedMessageFromPublisherHandler)(void);

class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback {
public:
MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback,
SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler)
SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler,
UnsolicitedMessageFromPublisherHandler _Nullable unsolicitedMessageFromPublisherHandler = NULL)
: mAttributeReportCallback(attributeReportCallback)
, mEventReportCallback(eventReportCallback)
, mErrorCallback(errorCallback)
, mResubscriptionCallback(resubscriptionCallback)
, mSubscriptionEstablishedHandler(subscriptionEstablishedHandler)
, mBufferedReadAdapter(*this)
, mOnDoneHandler(onDoneHandler)
, mUnsolicitedMessageFromPublisherHandler(unsolicitedMessageFromPublisherHandler)
{
}

Expand Down Expand Up @@ -117,6 +120,8 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac

CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;

void OnUnsolicitedMessageFromPublisher(chip::app::ReadClient * apReadClient) override;

void ReportData();

protected:
Expand All @@ -131,6 +136,7 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
ErrorCallback _Nullable mErrorCallback = nil;
MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil;
SubscriptionEstablishedHandler _Nullable mSubscriptionEstablishedHandler = nil;
UnsolicitedMessageFromPublisherHandler _Nullable mUnsolicitedMessageFromPublisherHandler = nil;
chip::app::BufferedReadCallback mBufferedReadAdapter;

// Our lifetime management is a little complicated. On errors that don't
Expand Down
9 changes: 9 additions & 0 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#import "MTRBaseSubscriptionCallback.h"
#import "MTRError_Internal.h"
#import "MTRLogging_Internal.h"

#include <platform/PlatformManager.h>

Expand Down Expand Up @@ -111,6 +112,14 @@
return CHIP_NO_ERROR;
}

void MTRBaseSubscriptionCallback::OnUnsolicitedMessageFromPublisher(ReadClient *)
{
if (mUnsolicitedMessageFromPublisherHandler) {
auto unsolicitedMessageFromPublisherHandler = mUnsolicitedMessageFromPublisherHandler;
unsolicitedMessageFromPublisherHandler();
}
}

void MTRBaseSubscriptionCallback::ReportError(CHIP_ERROR aError, bool aCancelSubscription)
{
auto * err = [MTRError errorForCHIPErrorCode:aError];
Expand Down
8 changes: 8 additions & 0 deletions src/darwin/Framework/CHIP/MTRDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ typedef NS_ENUM(NSUInteger, MTRDeviceState) {
*/
- (void)device:(MTRDevice *)device receivedEventReport:(NSArray<NSDictionary<NSString *, id> *> *)eventReport;

@optional
/**
* deviceStartedCommunicating:
*
* Notifies delegate the device is currently communicating
*/
- (void)didReceiveCommunicationFromDevice:(MTRDevice *)device;

@end

@interface MTRDevice (Deprecated)
Expand Down
69 changes: 46 additions & 23 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ - (id)strongObject
public:
SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback,
ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler resubscriptionCallback,
SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler)
SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler,
UnsolicitedMessageFromPublisherHandler unsolicitedMessageFromPublisherHandler)
: MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionCallback,
subscriptionEstablishedHandler, onDoneHandler)
subscriptionEstablishedHandler, onDoneHandler, unsolicitedMessageFromPublisherHandler)
{
}

Expand Down Expand Up @@ -211,20 +212,27 @@ - (void)invalidate
os_unfair_lock_unlock(&self->_lock);
}

// assume lock is held
- (void)_changeState:(MTRDeviceState)state
{
MTRDeviceState lastState = _state;
_state = state;
id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (delegate && (lastState != state)) {
dispatch_async(_delegateQueue, ^{
[delegate device:self stateChanged:state];
});
}
}

- (void)_handleSubscriptionEstablished
{
os_unfair_lock_lock(&self->_lock);

// reset subscription attempt wait time when subscription succeeds
_lastSubscriptionAttemptWait = 0;

_state = MTRDeviceStateReachable;
id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (delegate) {
dispatch_async(_delegateQueue, ^{
[delegate device:self stateChanged:MTRDeviceStateReachable];
});
}
[self _changeState:MTRDeviceStateReachable];

os_unfair_lock_unlock(&self->_lock);
}
Expand All @@ -236,12 +244,7 @@ - (void)_handleSubscriptionError:(NSError *)error
_subscriptionActive = NO;
_unreportedEvents = nil;

id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (delegate) {
dispatch_async(_delegateQueue, ^{
[delegate device:self stateChanged:MTRDeviceStateUnreachable];
});
}
[self _changeState:MTRDeviceStateUnreachable];

os_unfair_lock_unlock(&self->_lock);
}
Expand All @@ -250,14 +253,7 @@ - (void)_handleResubscriptionNeeded
{
os_unfair_lock_lock(&self->_lock);

_state = MTRDeviceStateUnknown;

id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (delegate) {
dispatch_async(_delegateQueue, ^{
[delegate device:self stateChanged:MTRDeviceStateUnknown];
});
}
[self _changeState:MTRDeviceStateUnknown];

os_unfair_lock_unlock(&self->_lock);
}
Expand Down Expand Up @@ -303,6 +299,26 @@ - (void)_handleSubscriptionReset
os_unfair_lock_unlock(&self->_lock);
}

- (void)_handleUnsolicitedMessageFromPublisher
{
os_unfair_lock_lock(&self->_lock);

[self _changeState:MTRDeviceStateReachable];

id<MTRDeviceDelegate> delegate = _weakDelegate.strongObject;
if (delegate && [delegate respondsToSelector:@selector(didReceiveCommunicationFromDevice:)]) {
dispatch_async(_delegateQueue, ^{
[delegate didReceiveCommunicationFromDevice:self];
});
}

// in case this is called dyring exponential back off of subscription
// reestablishment, this starts the attempt right away
[self _setupSubscription];

os_unfair_lock_unlock(&self->_lock);
}

// assume lock is held
- (void)_reportAttributes:(NSArray<NSDictionary<NSString *, id> *> *)attributes
{
Expand Down Expand Up @@ -444,6 +460,13 @@ - (void)_setupSubscription
// OnDone
[self _handleSubscriptionReset];
});
},
^(void) {
MTR_LOG_INFO("%@ got unsolicited message from publisher", self);
dispatch_async(self.queue, ^{
// OnUnsolicitedMessageFromPublisher
[self _handleUnsolicitedMessageFromPublisher];
});
});

// Set up a cluster state cache. We really just want this for the
Expand Down

0 comments on commit 2063105

Please sign in to comment.