Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Establish CASE on re-subscription #20080

Merged
merged 14 commits into from
Jul 27, 2022
Prev Previous commit
Next Next commit
Review feedback
mrjerryjohns committed Jul 26, 2022
commit 10ab0537c7421ee6f1a4516c9ede9e3c437f9efd
1 change: 1 addition & 0 deletions examples/bridge-app/esp32/main/main.cpp
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
#include <lib/support/CHIPMemString.h>
#include <lib/support/ErrorStr.h>

#include <app/InteractionModelEngine.h>
#include <app/server/Server.h>

#if CONFIG_ENABLE_ESP32_FACTORY_DATA_PROVIDER
2 changes: 0 additions & 2 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
@@ -125,8 +125,6 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*/
CASESessionManager * GetCASESessionManager() const { return mpCASESessionMgr; }
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved

FabricTable * GetFabricTable() const { return mpFabricTable; }

/**
* Tears down an active subscription.
*
37 changes: 29 additions & 8 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscripti
VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);

//
// If we're establishing CASE, make sure we not provided a new SessionHandle as well.
// If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
//
VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);

@@ -553,12 +553,17 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
exit:
if (IsSubscriptionType())
{
if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse())
if (IsAwaitingInitialReport())
{
MoveToState(ClientState::AwaitingSubscribeResponse);
}
else
else if (IsSubscriptionActive())
{
//
// Only refresh the liveness check timer if we've successfully established
// a subscription and have a valid value for mMaxInterval which the function
// relies on.
//
RefreshLivenessCheckTimer();
}
}
@@ -697,7 +702,7 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea
//
// Update the event number being tracked in mReadPrepareParams in case
// we want to send it in the next SubscribeRequest message to convey
// the event number till which we've already received events for.
// the event number for which we have already received an event.
//
mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved

@@ -974,6 +979,7 @@ void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy *
ReadClient * const _this = static_cast<ReadClient *>(context);
VerifyOrDie(_this != nullptr);

ChipLogProgress(DataManagement, "HandleDeviceConnected %d\n", device->GetSecureSession().HasValue());
_this->mReadPrepareParams.mSessionHolder.Grab(device->GetSecureSession().Value());

auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
yunhanw-google marked this conversation as resolved.
Show resolved Hide resolved
@@ -990,9 +996,6 @@ void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeI

ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format());

auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
VerifyOrDie(caseSessionManager != nullptr);

_this->Close(err);
}

@@ -1011,6 +1014,24 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void
auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
VerifyOrExit(caseSessionManager != nullptr, err = CHIP_ERROR_INCORRECT_STATE);

//
// We need to mark our session as defunct explicitly since the assessment of a liveness failure
// is usually triggered by the absence of any exchange activity that would have otherwise
// automatically marked the session as defunct on a response timeout.
//
// Doing so will ensure that the subsequent call to FindOrEstablishSession will not bind to
// an existing established session but rather trigger establishing a new one.
//
if (_this->mReadPrepareParams.mSessionHolder)
{
_this->mReadPrepareParams.mSessionHolder.Get().Value()->AsSecureSession()->MarkAsDefunct();
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
}

//
// TODO: Until #19259 is merged, we cannot actually just get by with the above logic since marking sessions
// defunct has no effect on resident OperationalDeviceProxy instances that are already bound
// to a now-defunct CASE session.
//
auto proxy = caseSessionManager->FindExistingSession(_this->mPeer);
if (proxy != nullptr)
{
@@ -1033,7 +1054,7 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void
//
// In that case, don't permit re-subscription to occur.
//
_this->Close(err, err != CHIP_ERROR_INVALID_FABRIC_INDEX && err != CHIP_ERROR_INCORRECT_STATE);
_this->Close(err, err != CHIP_ERROR_INCORRECT_STATE);
}
}

13 changes: 7 additions & 6 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ class InteractionModelEngine;
* When used to manage subscriptions, the client provides functionality to automatically re-subscribe as needed,
* including re-establishing CASE under certain conditions (see Callback::OnResubscriptionNeeded for more info).
* This is the default behavior. A consumer can completely opt-out of this behavior by over-riding
* Callback::OnResubscriptionNeeded and providing an alternative implementation.
* Callback::OnResubscriptionNeeded and providing an alternative implementation.
*
*/
class ReadClient : public Messaging::ExchangeDelegate
@@ -191,6 +191,7 @@ class ReadClient : public Messaging::ExchangeDelegate
* - Be called even in error circumstances.
* - Only be called after a successful call to SendRequest has been
* made, when the read completes or the subscription is shut down.
*
* @param[in] apReadClient the ReadClient for the completed interaction.
*/
virtual void OnDone(ReadClient * apReadClient) = 0;
@@ -333,7 +334,7 @@ class ReadClient : public Messaging::ExchangeDelegate
* a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and
* mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to
* OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that
* that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not
* has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not
* have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call
* OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since
* that's the only case when the consumer moved a ReadParams into the client.
@@ -351,22 +352,22 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR DefaultResubscribePolicy(CHIP_ERROR aTerminationCause);

/**
* Computes the time till the next re-subscription with millisecond resolution over
* Computes the time, in milliseconds, until the next re-subscription over
* an ever increasing window following a fibonacci sequence with the current retry count
* used as input to the fibonacci algorithm.
*
* CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is used as the maximum ceiling for that input.
* CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is the maximum value the retry count can tick up to.
*
*/
uint32_t ComputeTimeTillNextSubscription();

/**
* Schedules a re-subscription aTimeTillNextResubscriptionMs into the future.
*
* If an application wants to setup CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next
* If an application wants to set up CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next
* interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method
* should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0
* (i.e re-subscribe immediately) and 'aReestablishCASE' set to false.
* (i.e async, but as soon as possible) and 'aReestablishCASE' set to false.
*
* Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before
* the actual IM interaction is initiated.
16 changes: 6 additions & 10 deletions src/controller/java/AndroidCallbacks.cpp
Original file line number Diff line number Diff line change
@@ -547,29 +547,25 @@ void ReportEventCallback::OnSubscriptionEstablished(SubscriptionId aSubscription
JniReferences::GetInstance().CallSubscriptionEstablished(mSubscriptionEstablishedCallbackRef);
}

void ReportEventCallback::OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause)
CHIP_ERROR ReportEventCallback::OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause)
{
VerifyOrReturn(mResubscriptionAttemptCallbackRef != nullptr,
ChipLogError(Controller, "mResubscriptionAttemptCallbackRef is null"));

CHIP_ERROR err = CHIP_NO_ERROR;
JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread();

err = app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause);
if (err != CHIP_NO_ERROR)
{
ReportError(nullptr, ErrorStr(err), err.AsInteger());
return;
}
ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause));

jmethodID onResubscriptionAttemptMethod;
err = JniReferences::GetInstance().FindMethod(env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V",
&onResubscriptionAttemptMethod);
VerifyOrReturn(err == CHIP_NO_ERROR, ChipLogError(Controller, "Could not find onResubscriptionAttempt method"));
ReturnLogErrorOnFailure(JniReferences::GetInstance().FindMethod(
env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", &onResubscriptionAttemptMethod));

DeviceLayer::StackUnlock unlock;
env->CallVoidMethod(mResubscriptionAttemptCallbackRef, onResubscriptionAttemptMethod, aTerminationCause.AsInteger(),
apReadClient->ComputeTimeTillNextSubscription());

return CHIP_NO_ERROR;
}

void ReportEventCallback::ReportError(jobject attributePath, CHIP_ERROR err)
2 changes: 1 addition & 1 deletion src/controller/java/AndroidCallbacks.h
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@ struct ReportEventCallback : public app::ReadClient::Callback

void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override;

void OnResubscriptionAttempt(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;
CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;

/** Report errors back to Java layer. attributePath may be nullptr for general errors. */
void ReportError(jobject eventPath, CHIP_ERROR err);
8 changes: 1 addition & 7 deletions src/controller/python/test/test_scripts/base.py
Original file line number Diff line number Diff line change
@@ -766,7 +766,7 @@ async def TestResubscription(self, nodeid: int):
resubAttempted = False
resubSucceeded = True

async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubsribeIntervalMsec: int):
async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubscribeIntervalMsec: int):
self.logger.info("Re-subscription Attempted")
nonlocal resubAttempted
resubAttempted = True
@@ -785,12 +785,6 @@ async def OnResubscriptionSucceeded(transaction):
subscription.SetResubscriptionAttemptedCallback(OnResubscriptionAttempted, True)
subscription.SetResubscriptionSucceededCallback(OnResubscriptionSucceeded, True)

#
# Now, let's go and expire the session to the node. That will immediately prevent
# future reports from the server from being dispatched to the client logic.
#
self.devCtrl.ExpireSessions(nodeid)

#
# Over-ride the default liveness timeout (which is set quite high to accomodate for
# transport delays) to something very small. This ensures that our liveness timer will