Skip to content

Commit

Permalink
Addressed PR review comments, including:
Browse files Browse the repository at this point in the history
 - ReadHandler constructor side effect moved to separate function
 - SubscriptionList and SubscriptionIndex member initialization moved to runtome
 - Improved error handling - remove stored info on error
 - Changed for loop indices to more descriptive names
 - Disabled feature by default except Linux and Darwin for CI testing
 - Added operator[] getter with index to access SubscriptionList and SubscriptionIndex elements
  • Loading branch information
jtung-apple committed Jan 20, 2023
1 parent 87e4afe commit c813234
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 149 deletions.
27 changes: 14 additions & 13 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1588,39 +1588,40 @@ CHIP_ERROR InteractionModelEngine::ResumeSubscriptions()
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
SubscriptionResumptionStorage::SubscriptionIndex subscriberIndex;
CHIP_ERROR err = mpSubscriptionResumptionStorage->LoadIndex(subscriberIndex);
ChipLogProgress(InteractionModel, "%u subscriber nodes to resume.. (error %" CHIP_ERROR_FORMAT ")",
static_cast<unsigned>(subscriberIndex.mSize), err.Format());
ReturnErrorOnFailure(err);
ReturnLogErrorOnFailure(err);
ChipLogProgress(InteractionModel, "%u subscriber nodes to resume..", static_cast<unsigned>(subscriberIndex.mSize));

for (size_t i = 0; i < subscriberIndex.mSize; i++)
for (size_t currentNodeIndex = 0; currentNodeIndex < subscriberIndex.mSize; currentNodeIndex++)
{
SubscriptionResumptionStorage::SubscriptionList subscriptions;

err = mpSubscriptionResumptionStorage->FindByScopedNodeId(subscriberIndex.mNodes[i], subscriptions);
err = mpSubscriptionResumptionStorage->FindByScopedNodeId(subscriberIndex.mNodes[currentNodeIndex], subscriptions);
ReturnLogErrorOnFailure(err);

ChipLogProgress(
InteractionModel, "\tNode " ChipLogFormatScopedNodeId ": Loaded %u subscriptions.. (error %" CHIP_ERROR_FORMAT ")",
ChipLogValueScopedNodeId(subscriberIndex.mNodes[i]), static_cast<unsigned>(subscriptions.mSize), err.Format());

ReturnErrorOnFailure(err);

for (size_t j = 0; j < subscriptions.mSize; j++)
InteractionModel, "\tNode " ChipLogFormatScopedNodeId ": Loaded %u subscriptions..",
ChipLogValueScopedNodeId(subscriberIndex.mNodes[currentNodeIndex]), static_cast<unsigned>(subscriptions.mSize));
for (size_t currentSubscriptionIndex = 0; currentSubscriptionIndex < subscriptions.mSize; currentSubscriptionIndex++)
{
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = subscriptions.mSubscriptions[j];
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = subscriptions.mSubscriptions[currentSubscriptionIndex];
auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedCount();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedCount();
if (!EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount, requestedEventPathCount))
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
mpSubscriptionResumptionStorage->Delete(subscriptionInfo);
return CHIP_ERROR_NO_MEMORY;
}

ReadHandler * handler = mReadHandlers.CreateObject(*this, subscriptionInfo);
ReadHandler * handler = mReadHandlers.CreateObject(*this);
if (handler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
mpSubscriptionResumptionStorage->Delete(subscriptionInfo);
return CHIP_ERROR_NO_MEMORY;
}

handler->ResumeSubscription(mpCASESessionMgr, subscriptionInfo);
}
}
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down
29 changes: 13 additions & 16 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,23 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
}

ReadHandler::ReadHandler(ManagementCallback & apCallback, SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo) :
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
}

void ReadHandler::ResumeSubscription(CASESessionManager *caseSessionManager, SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into the object pool managed by the IM engine
CHIP_ERROR err;
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedCount(); i++)
{
Expand All @@ -95,21 +101,12 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, SubscriptionResumption
}
}

// Ask IM engine to start case with
auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
if (caseSessionManager)
{
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager->FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
}
else
{
// TODO: Investigate if need to consider if caseSessionManager does not exist
Close();
}
// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager->FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
}

ReadHandler::~ReadHandler()
{
Expand Down Expand Up @@ -371,7 +368,7 @@ void ReadHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeConte
ChipLogValueExchange(apExchangeContext));
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// TODO: Have a retry mechanism tied to wake interval for IC devices
Close(true);
CloseButKeepPersisted();
#else
Close();
#endif
Expand Down
17 changes: 15 additions & 2 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <app/AttributeAccessInterface.h>
#include <app/AttributePathExpandIterator.h>
#include <app/AttributePathParams.h>
#include <app/CASESessionManager.h>
#include <app/DataVersionFilter.h>
#include <app/EventManagement.h>
#include <app/EventPathParams.h>
Expand Down Expand Up @@ -166,14 +167,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* Constructor for resuming a persisted subscription
* Constructor in preparation for resuming a persisted subscription
*
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(ManagementCallback & apCallback, SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
ReadHandler(ManagementCallback & apCallback);
#endif

const ObjectList<AttributePathParams> * GetAttributePathList() const { return mpAttributePathList; }
const ObjectList<EventPathParams> * GetEventPathList() const { return mpEventPathList; }
Expand Down Expand Up @@ -253,6 +256,15 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
void OnInitialRequest(System::PacketBufferHandle && aPayload);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* Resume a persisted subscription
*
*/
void ResumeSubscription(CASESessionManager *caseSessionManager, SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
#endif

/**
* Send ReportData to initiator
*
Expand Down Expand Up @@ -372,6 +384,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
* @param keepPersisted Keep the subscription persisted in storage for later resumption
*/
void Close(bool keepPersisted = false);
void CloseButKeepPersisted() { Close(true); }

static void OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState);
static void OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState);
Expand Down
Loading

0 comments on commit c813234

Please sign in to comment.