Skip to content

Commit

Permalink
Use cached event number in ReadClient
Browse files Browse the repository at this point in the history
-- Update EventNumber with optional in ReadPrepareParams
-- If event number is provided by user, then read  client uses it to construct event filter and update it, when resubscribe happens, it would use updated event number. If event number is not provide by user, client would try to fetch the highest event number from cache, and when resubscribe happens, it would automatically retrieve the latest event number from cache.
-- Add GetHighestReceivedEventNumber function in ClusterStateCache so
that user can retrieve the highest recieved event number and construct the event filter manually later if user don't wanna the cache, and user could retrieve the event data using the range from x to mHighestReceivedEventNumber later.
  • Loading branch information
yunhanw-google committed Apr 17, 2022
1 parent 4967c0e commit f621005
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 37 deletions.
2 changes: 1 addition & 1 deletion examples/chip-tool/commands/clusters/ReportCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class ReportCommand : public ModelCommand, public chip::app::ReadClient::Callbac
chip::app::ReadPrepareParams params(device->GetSecureSession().Value());
params.mpEventPathParamsList = eventPathParams;
params.mEventPathParamsListSize = pathsCount;
params.mEventNumber = mEventNumber.ValueOr(0);
params.mEventNumber = mEventNumber;
params.mpAttributePathParamsList = nullptr;
params.mAttributePathParamsListSize = 0;

Expand Down
4 changes: 4 additions & 0 deletions src/app/BufferedReadCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class BufferedReadCallback : public ReadClient::Callback
return mCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList);
}

virtual CHIP_ERROR GetHighestReceivedEventNumber(Optional<EventNumber> & aEventNumber) override
{
return mCallback.GetHighestReceivedEventNumber(aEventNumber);
}
/*
* Given a reader positioned at a list element, allocate a packet buffer, copy the list item where
* the reader is positioned into that buffer and add it to our buffered list for tracking.
Expand Down
1 change: 0 additions & 1 deletion src/app/ClusterStateCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ CHIP_ERROR ClusterStateCache::UpdateEventCache(const EventHeader & aEventHeader,
{
return CHIP_NO_ERROR;
}

System::PacketBufferHandle handle = System::PacketBufferHandle::New(chip::app::kMaxSecureSduLengthBytes);

System::PacketBufferTLVWriter writer;
Expand Down
9 changes: 9 additions & 0 deletions src/app/ClusterStateCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ class ClusterStateCache : protected ReadClient::Callback
*/
CHIP_ERROR GetVersion(EndpointId mEndpointId, ClusterId mClusterId, Optional<DataVersion> & aVersion);

/*
* Get highest received event number.
*/
virtual CHIP_ERROR GetHighestReceivedEventNumber(Optional<EventNumber> & aEventNumber) final
{
aEventNumber = mHighestReceivedEventNumber;
return CHIP_NO_ERROR;
}

/*
* Retrieve the value of an event from the cache given an EventNumber by decoding
* it using DataModel::Decode into the in-out argument 'value'.
Expand Down
10 changes: 10 additions & 0 deletions src/app/MessageDef/EventFilterIBs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,15 @@ EventFilterIBs::Builder & EventFilterIBs::Builder::EndOfEventFilters()
EndOfContainer();
return *this;
}

CHIP_ERROR EventFilterIBs::Builder::GenerateEventFilter(EventNumber aEventNumber)
{
EventFilterIB::Builder & eventFilter = CreateEventFilter();
ReturnErrorOnFailure(GetError());
ReturnErrorOnFailure(eventFilter.EventMin(aEventNumber).EndOfEventFilterIB().GetError());
ReturnErrorOnFailure(EndOfEventFilters().GetError());
return CHIP_NO_ERROR;
}

}; // namespace app
}; // namespace chip
6 changes: 6 additions & 0 deletions src/app/MessageDef/EventFilterIBs.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ class Builder : public ArrayBuilder
*/
EventFilterIBs::Builder & EndOfEventFilters();

/**
* @brief Generate single event filter
*
*/
CHIP_ERROR GenerateEventFilter(EventNumber aEventNumber);

private:
EventFilterIB::Builder mEventFilter;
};
Expand Down
50 changes: 30 additions & 20 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,13 @@ CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)

ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));

if (aReadPrepareParams.mEventNumber != 0)
Optional<EventNumber> eventMin;
ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
if (eventMin.HasValue())
{
// EventFilter is optional
EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
ReturnErrorOnFailure(request.GetError());

EventFilterIB::Builder & eventFilter = eventFilters.CreateEventFilter();
ReturnErrorOnFailure(eventFilters.GetError());
ReturnErrorOnFailure(eventFilter.EventMin(aReadPrepareParams.mEventNumber).EndOfEventFilterIB().GetError());
ReturnErrorOnFailure(eventFilters.EndOfEventFilters().GetError());
ReturnErrorOnFailure(err = request.GetError());
ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
}
}

Expand Down Expand Up @@ -695,9 +692,14 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea
header.mTimestamp = mEventTimestamp;
ReturnErrorOnFailure(data.DecodeEventHeader(header));
mEventTimestamp = header.mTimestamp;
mEventMin = header.mEventNumber + 1;

ReturnErrorOnFailure(data.GetData(&dataReader));

if (mReadPrepareParams.mResubscribePolicy != nullptr)
{
mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
}

mpCallback.OnEventData(header, &dataReader, nullptr);
}
else if (err == CHIP_END_OF_TLV)
Expand Down Expand Up @@ -876,19 +878,14 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara
ReturnErrorOnFailure(err = eventPathListBuilder.GetError());
ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));

if (aReadPrepareParams.mEventNumber != 0)
Optional<EventNumber> eventMin;
ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
if (eventMin.HasValue())
{
mEventMin = aReadPrepareParams.mEventNumber;
EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
ReturnErrorOnFailure(err = request.GetError());
ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
}

EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
ReturnErrorOnFailure(err = request.GetError());
EventFilterIB::Builder & eventFilter = eventFilters.CreateEventFilter();
ReturnErrorOnFailure(err = eventFilters.GetError());
eventFilter.EventMin(mEventMin).EndOfEventFilterIB();
ReturnErrorOnFailure(err = eventFilter.GetError());
eventFilters.EndOfEventFilters();
ReturnErrorOnFailure(err = eventFilters.GetError());
}

ReturnErrorOnFailure(err = request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
Expand Down Expand Up @@ -981,5 +978,18 @@ void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPat
}
}
}

CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
{
if (aReadPrepareParams.mEventNumber.HasValue())
{
aEventMin = aReadPrepareParams.mEventNumber;
return CHIP_NO_ERROR;
}
else
{
return mpCallback.GetHighestReceivedEventNumber(aEventMin);
}
}
} // namespace app
} // namespace chip
11 changes: 10 additions & 1 deletion src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ class ReadClient : public Messaging::ExchangeDelegate
aEncodedDataVersionList = false;
return CHIP_NO_ERROR;
}

/*
* Get highest received event number.
*/
virtual CHIP_ERROR GetHighestReceivedEventNumber(Optional<EventNumber> & aEventNumber)
{
aEventNumber.ClearValue();
return CHIP_NO_ERROR;
}
};

enum class InteractionType : uint8_t
Expand Down Expand Up @@ -361,6 +370,7 @@ class ReadClient : public Messaging::ExchangeDelegate

void StopResubscription();
void ClearActiveSubscriptionState();
CHIP_ERROR GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin);

Messaging::ExchangeManager * mpExchangeMgr = nullptr;
Messaging::ExchangeContext * mpExchangeCtx = nullptr;
Expand All @@ -376,7 +386,6 @@ class ReadClient : public Messaging::ExchangeDelegate
FabricIndex mFabricIndex = kUndefinedFabricIndex;
InteractionType mInteractionType = InteractionType::Read;
Timestamp mEventTimestamp;
EventNumber mEventMin = 0;
bool mSawAttributeReportsInCurrentReport = false;

ReadClient * mpNext = nullptr;
Expand Down
14 changes: 7 additions & 7 deletions src/app/ReadPrepareParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ struct ReadPrepareParams
size_t mAttributePathParamsListSize = 0;
DataVersionFilter * mpDataVersionFilterList = nullptr;
size_t mDataVersionFilterListSize = 0;
EventNumber mEventNumber = 0;
System::Clock::Timeout mTimeout = kImMessageTimeout;
uint16_t mMinIntervalFloorSeconds = 0;
uint16_t mMaxIntervalCeilingSeconds = 0;
bool mKeepSubscriptions = false;
bool mIsFabricFiltered = true;
OnResubscribePolicyCB mResubscribePolicy = nullptr;
Optional<EventNumber> mEventNumber;
System::Clock::Timeout mTimeout = kImMessageTimeout;
uint16_t mMinIntervalFloorSeconds = 0;
uint16_t mMaxIntervalCeilingSeconds = 0;
bool mKeepSubscriptions = false;
bool mIsFabricFiltered = true;
OnResubscribePolicyCB mResubscribePolicy = nullptr;

ReadPrepareParams() {}
ReadPrepareParams(const SessionHandle & sessionHandle) { mSessionHolder.Grab(sessionHandle); }
Expand Down
6 changes: 3 additions & 3 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ void TestReadInteraction::TestReadRoundtrip(nlTestSuite * apSuite, void * apCont
readPrepareParams.mEventPathParamsListSize = 1;
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = 2;
readPrepareParams.mEventNumber = 1;
readPrepareParams.mEventNumber.SetValue(1);

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
Expand Down Expand Up @@ -1062,7 +1062,7 @@ void TestReadInteraction::TestReadRoundtripWithEventStatusIBInEventReport(nlTest
ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpEventPathParamsList = eventPathParams;
readPrepareParams.mEventPathParamsListSize = 1;
readPrepareParams.mEventNumber = 1;
readPrepareParams.mEventNumber.SetValue(1);

MockInteractionModelApp delegate;
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);
Expand Down Expand Up @@ -1093,7 +1093,7 @@ void TestReadInteraction::TestReadRoundtripWithEventStatusIBInEventReport(nlTest
ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpEventPathParamsList = eventPathParams;
readPrepareParams.mEventPathParamsListSize = 1;
readPrepareParams.mEventNumber = 1;
readPrepareParams.mEventNumber.SetValue(1);

MockInteractionModelApp delegate;
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);
Expand Down
56 changes: 55 additions & 1 deletion src/controller/tests/TestEventCaching.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void TestReadEvents::TestBasicCaching(nlTestSuite * apSuite, void * apContext)

readParams.mpEventPathParamsList = &eventPath;
readParams.mEventPathParamsListSize = 1;
readParams.mEventNumber = firstEventNumber;
readParams.mEventNumber.SetValue(firstEventNumber);

TestReadCallback readCallback;

Expand Down Expand Up @@ -216,6 +216,10 @@ void TestReadEvents::TestBasicCaching(nlTestSuite * apSuite, void * apContext)

NL_TEST_ASSERT(apSuite, generationCount == 5);

Optional<EventNumber> highestEventNumber;
readCallback.mClusterCacheAdapter.GetHighestReceivedEventNumber(highestEventNumber);
NL_TEST_ASSERT(apSuite, highestEventNumber.HasValue() && highestEventNumber.Value() == 4);

//
// Re-run the iterator but pass in a path filter: EP*/TestCluster/EID*
//
Expand Down Expand Up @@ -337,6 +341,10 @@ void TestReadEvents::TestBasicCaching(nlTestSuite * apSuite, void * apContext)

NL_TEST_ASSERT(apSuite, generationCount == 10);

Optional<EventNumber> highestEventNumber;
readCallback.mClusterCacheAdapter.GetHighestReceivedEventNumber(highestEventNumber);
NL_TEST_ASSERT(apSuite, highestEventNumber.HasValue() && highestEventNumber.Value() == 9);

readCallback.mClusterCacheAdapter.ClearEventCache();
generationCount = 0;
readCallback.mClusterCacheAdapter.ForEachEventData([&generationCount](const app::EventHeader & header) {
Expand All @@ -345,6 +353,8 @@ void TestReadEvents::TestBasicCaching(nlTestSuite * apSuite, void * apContext)
});

NL_TEST_ASSERT(apSuite, generationCount == 0);
readCallback.mClusterCacheAdapter.GetHighestReceivedEventNumber(highestEventNumber);
NL_TEST_ASSERT(apSuite, highestEventNumber.HasValue() && highestEventNumber.Value() == 9);
}

//
Expand Down Expand Up @@ -379,6 +389,50 @@ void TestReadEvents::TestBasicCaching(nlTestSuite * apSuite, void * apContext)
});

NL_TEST_ASSERT(apSuite, generationCount == 10);
Optional<EventNumber> highestEventNumber;
readCallback.mClusterCacheAdapter.GetHighestReceivedEventNumber(highestEventNumber);
NL_TEST_ASSERT(apSuite, highestEventNumber.HasValue() && highestEventNumber.Value() == 9);
}

//
// Set user-provided event number, then read client would use user-provided event number and not use the cached one in read
// client
//

{
readParams.mEventNumber.SetValue(5);
app::ReadClient readClient(engine, &ctx.GetExchangeManager(), readCallback.mClusterCacheAdapter.GetBufferedCallback(),
app::ReadClient::InteractionType::Read);
readCallback.mClusterCacheAdapter.ClearEventCache(true);
NL_TEST_ASSERT(apSuite, readClient.SendRequest(readParams) == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

//
// Validate that we would receive 5 events
//

uint8_t generationCount = 5;
readCallback.mClusterCacheAdapter.ForEachEventData(
[&apSuite, &readCallback, &generationCount](const app::EventHeader & header) {
NL_TEST_ASSERT(apSuite, header.mPath.mClusterId == TestCluster::Id);
NL_TEST_ASSERT(apSuite, header.mPath.mEventId == TestCluster::Events::TestEvent::Id);
NL_TEST_ASSERT(apSuite, header.mPath.mEndpointId == kTestEndpointId);

TestCluster::Events::TestEvent::DecodableType eventData;
NL_TEST_ASSERT(apSuite, readCallback.mClusterCacheAdapter.Get(header.mEventNumber, eventData) == CHIP_NO_ERROR);

NL_TEST_ASSERT(apSuite, eventData.arg1 == generationCount);
generationCount++;

return CHIP_NO_ERROR;
});

NL_TEST_ASSERT(apSuite, generationCount == 10);

Optional<EventNumber> highestEventNumber;
readCallback.mClusterCacheAdapter.GetHighestReceivedEventNumber(highestEventNumber);
NL_TEST_ASSERT(apSuite, highestEventNumber.HasValue() && highestEventNumber.Value() == 9);
}

NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
Expand Down
6 changes: 3 additions & 3 deletions src/controller/tests/TestEventChunking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ void TestReadEvents::TestEventChunking(nlTestSuite * apSuite, void * apContext)

readParams.mpEventPathParamsList = &eventPath;
readParams.mEventPathParamsListSize = 1;
readParams.mEventNumber = firstEventNumber;
readParams.mEventNumber.SetValue(firstEventNumber);

// Since we will always read from the first event, we only generate event once.

Expand Down Expand Up @@ -397,7 +397,7 @@ void TestReadEvents::TestMixedEventsAndAttributesChunking(nlTestSuite * apSuite,
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = &eventPath;
readParams.mEventPathParamsListSize = 1;
readParams.mEventNumber = firstEventNumber;
readParams.mEventNumber.SetValue(firstEventNumber);

//
// We've empirically determined that by reserving 950 bytes in the packet buffer, we can fit 2
Expand Down Expand Up @@ -476,7 +476,7 @@ void TestReadEvents::TestMixedEventsAndLargeAttributesChunking(nlTestSuite * apS
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = &eventPath;
readParams.mEventPathParamsListSize = 1;
readParams.mEventNumber = firstEventNumber;
readParams.mEventNumber.SetValue(firstEventNumber);

//
// We've empirically determined that by reserving 950 bytes in the packet buffer, we can fit 2
Expand Down

0 comments on commit f621005

Please sign in to comment.