Skip to content

Commit

Permalink
Add initial resubscribe capability
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google committed Jan 29, 2022
1 parent 6ba06b2 commit 8e9eab1
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 37 deletions.
5 changes: 5 additions & 0 deletions src/app/AttributeCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ class AttributeCache : protected ReadClient::Callback
void OnDone() override { return mCallback.OnDone(); }
void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { mCallback.OnSubscriptionEstablished(aSubscriptionId); }

void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override
{
return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
}

private:
Callback & mCallback;
NodeState mCache;
Expand Down
5 changes: 5 additions & 0 deletions src/app/BufferedReadCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class BufferedReadCallback : public ReadClient::Callback
void OnDone() override { return mCallback.OnDone(); }
void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { mCallback.OnSubscriptionEstablished(aSubscriptionId); }

void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override
{
return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
}

private:
/*
* Given a reader positioned at a list element, allocate a packet buffer, copy the list item where
Expand Down
145 changes: 136 additions & 9 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,55 @@
*
*/

#include "lib/core/CHIPTLVTypes.h"
#include <app/AppBuildConfig.h>
#include <app/InteractionModelEngine.h>
#include <app/ReadClient.h>
#include <app/StatusResponse.h>
#include <lib/core/CHIPTLVTypes.h>
#include <lib/support/FibonacciUtils.h>

namespace chip {
namespace app {

/**
* @brief The default resubscribe policy will pick a random timeslot
* with millisecond resolution over an ever increasing window,
* following a fibonacci sequence up to CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX,
* Average of the randomized wait time past the CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX
* will be around one hour.
* When the retry count resets to 0, the sequence starts from the beginning again.
*/
static void DefaultResubscribePolicy(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec,
bool & aShouldResubscribe)
{
uint32_t maxWaitTimeInMsec = 0;
uint32_t waitTimeInMsec = 0;
uint32_t minWaitTimeInMsec = 0;

if (aNumCumulativeRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
{
maxWaitTimeInMsec = GetFibonacciForIndex(aNumCumulativeRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
}
else
{
maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
}

if (maxWaitTimeInMsec != 0)
{
minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
}

aNextSubscriptionIntervalMsec = waitTimeInMsec;
aShouldResubscribe = true;
ChipLogProgress(DataManagement,
"Computing Resubscribe policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32
" ms",
aNumCumulativeRetries, maxWaitTimeInMsec, waitTimeInMsec);
return;
}

ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
InteractionType aInteractionType) :
mpCallback(apCallback)
Expand All @@ -48,14 +88,32 @@ ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeM
}
}

void ReadClient::ClearActiveSubscriptionState()
{
mIsInitialReport = true;
mIsPrimingReports = true;
mPendingMoreChunks = false;
mMinIntervalFloorSeconds = 0;
mMaxIntervalCeilingSeconds = 0;
mSubscriptionId = 0;
MoveToState(ClientState::Idle);
}

void ReadClient::StopResubscription()
{
ClearActiveSubscriptionState();
CancelLivenessCheckTimer();
CancelResubscribeTimer();
mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
}

ReadClient::~ReadClient()
{
Abort();

if (IsSubscriptionType())
{
CancelLivenessCheckTimer();

//
// Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
// This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
Expand Down Expand Up @@ -84,9 +142,18 @@ void ReadClient::Close(CHIP_ERROR aError)

if (aError != CHIP_NO_ERROR)
{
if (ResubscribeIfNeeded())
{
ClearActiveSubscriptionState();
return;
}
mpCallback.OnError(aError);
}

if (mReadPrepareParams.mResubscribePolicy != nullptr)
{
StopResubscription();
}
mpCallback.OnDone();
}

Expand Down Expand Up @@ -598,22 +665,29 @@ void ReadClient::CancelLivenessCheckTimer()
OnLivenessTimeoutCallback, this);
}

void ReadClient::CancelResubscribeTimer()
{
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnResubscribeTimerCallback, this);
}

void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
{
ReadClient * const client = reinterpret_cast<ReadClient *>(apAppState);
ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);

//
// Might as well try to see if this instance exists in the tracked list in the IM.
// This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
// been shutdown at which point the client wouldn't exist in the active read client list.
//
VerifyOrDie(client->mpImEngine->InActiveReadClientList(client));
VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));

ChipLogError(DataManagement, "Subscription Liveness timeout with peer node 0x%" PRIx64 ", shutting down ", client->mPeerNodeId);
ChipLogError(DataManagement, "Subscription Liveness timeout with subscription id 0x%" PRIx64 " peer node 0x%" PRIx64,
_this->mSubscriptionId, _this->mPeerNodeId);

// TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
// response timeouts).
client->Close(CHIP_ERROR_TIMEOUT);
_this->Close(CHIP_ERROR_TIMEOUT);
}

CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -644,6 +718,22 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP
return CHIP_NO_ERROR;
}

CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
{
mReadPrepareParams = std::move(aReadPrepareParams);
if (mReadPrepareParams.mResubscribePolicy == nullptr)
{
mReadPrepareParams.mResubscribePolicy = DefaultResubscribePolicy;
}

CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
if (err != CHIP_NO_ERROR)
{
StopResubscription();
}
return err;
}

CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPrepareParams)
{
CHIP_ERROR err = CHIP_NO_ERROR;
Expand All @@ -658,7 +748,6 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara

VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
err = CHIP_ERROR_INVALID_ARGUMENT);

writer.Init(std::move(msgBuf));

ReturnErrorOnFailure(request.Init(&writer));
Expand Down Expand Up @@ -717,5 +806,43 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara
return CHIP_NO_ERROR;
}

}; // namespace app
}; // namespace chip
void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState)
{
ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
assert(_this != nullptr);
_this->SendSubscribeRequest(_this->mReadPrepareParams);
_this->mNumRetries++;
}

bool ReadClient::ResubscribeIfNeeded()
{
bool shouldResubscribe = true;
uint32_t intervalMsec = 0;
if (mReadPrepareParams.mResubscribePolicy == nullptr)
{
ChipLogProgress(DataManagement, "mResubscribePolicy is null");
return false;
}
mReadPrepareParams.mResubscribePolicy(mNumRetries, intervalMsec, shouldResubscribe);
if (!shouldResubscribe)
{
ChipLogProgress(DataManagement, "Resubscribe has been stopped");
return false;
}
CHIP_ERROR err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Milliseconds32(intervalMsec), OnResubscribeTimerCallback, this);
if (err != CHIP_NO_ERROR)
{
ChipLogProgress(DataManagement, "Fail to resubscribe with error %" CHIP_ERROR_FORMAT, err.Format());
return false;
}
else
{
ChipLogProgress(DataManagement, "Will try to Resubscribe at retry index %" PRIu32 " after %" PRIu32 "ms", mNumRetries,
intervalMsec);
}
return true;
}

} // namespace app
} // namespace chip
34 changes: 33 additions & 1 deletion src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ class ReadClient : public Messaging::ExchangeDelegate
*
*/
virtual void OnDone() = 0;

/**
* This function is invoked when using SendAutoResubscribeRequest, where the ReadClient was configured to auto re-subscribe
* and the ReadPrepareParams was moved into this client for management. This will have to be free'ed appropriately by the
* application. If SendAutoResubscribeRequest fails, this function will be called before it returns the failure. If
* SendAutoResubscribeRequest succeeds, this function will be called immediately before calling OnDone. If
* SendAutoResubscribeRequest is not called, this function will not be called.
*/
virtual void OnDeallocatePaths(ReadPrepareParams && aReadPrepareParams) {}
};

enum class InteractionType : uint8_t
Expand Down Expand Up @@ -232,6 +241,21 @@ class ReadClient : public Messaging::ExchangeDelegate
ReadClient * GetNextClient() { return mpNext; }
void SetNextClient(ReadClient * apClient) { mpNext = apClient; }

// Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if
// we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled
// by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be
// used.
//
// The application has to know to
// a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList with
// lifetimes as long as the ReadClient itself and b) free those up later in the call to OnDeallocatePaths. Note: At a given
// time in the system, you can either have a single subscription with re-sub enabled that that has mKeepSubscriptions = false,
// OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not have a mix of both simultaneously.
// If SendAutoResubscribeRequest is called at all, it guarantees that it will call OnDeallocatePaths when OnDone is called.
// SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since that's the only case when the consumer moved
// a ReadParams into the client.
CHIP_ERROR SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams);

private:
friend class TestReadInteraction;
friend class InteractionModelEngine;
Expand Down Expand Up @@ -274,15 +298,18 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR ProcessSubscribeResponse(System::PacketBufferHandle && aPayload);
CHIP_ERROR RefreshLivenessCheckTimer();
void CancelLivenessCheckTimer();
void CancelResubscribeTimer();
void MoveToState(const ClientState aTargetState);
CHIP_ERROR ProcessAttributePath(AttributePathIB::Parser & aAttributePath, ConcreteDataAttributePath & aClusterInfo);
CHIP_ERROR ProcessReportData(System::PacketBufferHandle && aPayload);
const char * GetStateStr() const;

bool ResubscribeIfNeeded();
// Specialized request-sending functions.
CHIP_ERROR SendReadRequest(ReadPrepareParams & aReadPrepareParams);
CHIP_ERROR SendSubscribeRequest(ReadPrepareParams & aSubscribePrepareParams);

static void OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState);

/*
* Called internally to signal the completion of all work on this object, gracefully close the
* exchange and finally, signal to the application that it's
Expand All @@ -293,6 +320,9 @@ class ReadClient : public Messaging::ExchangeDelegate
*/
void Close(CHIP_ERROR aError);

void StopResubscription();
void ClearActiveSubscriptionState();

Messaging::ExchangeManager * mpExchangeMgr = nullptr;
Messaging::ExchangeContext * mpExchangeCtx = nullptr;
Callback & mpCallback;
Expand All @@ -311,6 +341,8 @@ class ReadClient : public Messaging::ExchangeDelegate

ReadClient * mpNext = nullptr;
InteractionModelEngine * mpImEngine = nullptr;
ReadPrepareParams mReadPrepareParams;
uint32_t mNumRetries = 0;
};

}; // namespace app
Expand Down
5 changes: 5 additions & 0 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ CHIP_ERROR ReadHandler::OnInitialRequest(System::PacketBufferHandle && aPayload)
{
Close();
}
else
{
// Mark read handler dirty for read/subscribe priming stage
mDirty = true;
}

return err;
}
Expand Down
15 changes: 14 additions & 1 deletion src/app/ReadPrepareParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@

namespace chip {
namespace app {
/**
* @brief Used to specify the re-subscription policy. Namely, the method is invoked and provided the number of
* retries that have occurred so far.
*
* aShouldResubscribe and aNextSubscriptionIntervalMsec are outparams indicating whether and how long into
* the future a re-subscription should happen.
*/
typedef void (*OnResubscribePolicyCB)(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec,
bool & aShouldResubscribe);

struct ReadPrepareParams
{
SessionHolder mSessionHolder;
Expand All @@ -40,7 +50,9 @@ struct ReadPrepareParams
uint16_t mMaxIntervalCeilingSeconds = 0;
bool mKeepSubscriptions = true;
bool mIsFabricFiltered = false;
OnResubscribePolicyCB mResubscribePolicy = nullptr;

ReadPrepareParams() {}
ReadPrepareParams(const SessionHandle & sessionHandle) { mSessionHolder.Grab(sessionHandle); }
ReadPrepareParams(ReadPrepareParams && other) : mSessionHolder(other.mSessionHolder)
{
Expand All @@ -58,6 +70,7 @@ struct ReadPrepareParams
other.mEventPathParamsListSize = 0;
other.mpAttributePathParamsList = nullptr;
other.mAttributePathParamsListSize = 0;
mResubscribePolicy = other.mResubscribePolicy;
}

ReadPrepareParams & operator=(ReadPrepareParams && other)
Expand All @@ -80,7 +93,7 @@ struct ReadPrepareParams
other.mEventPathParamsListSize = 0;
other.mpAttributePathParamsList = nullptr;
other.mAttributePathParamsListSize = 0;

mResubscribePolicy = other.mResubscribePolicy;
return *this;
}
};
Expand Down
Loading

0 comments on commit 8e9eab1

Please sign in to comment.