Skip to content

Commit

Permalink
[IM] Wildcard read and chunking (#11304)
Browse files Browse the repository at this point in the history
* [IM] Support chunking in report engine

* Address comments

* Support wildcard path in read / subscribe interaction
  • Loading branch information
erjiaqing authored and pull[bot] committed Nov 30, 2021
1 parent 5cd3886 commit 1072856
Show file tree
Hide file tree
Showing 16 changed files with 603 additions and 97 deletions.
10 changes: 10 additions & 0 deletions src/app/ClusterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <app/ConcreteAttributePath.h>
#include <app/util/basic-types.h>
#include <assert.h>
#include <lib/core/Optional.h>
Expand Down Expand Up @@ -57,6 +58,15 @@ struct ClusterInfo
return true;
}

bool IsAttributePathSupersetOf(const ConcreteAttributePath & other) const
{
VerifyOrReturnError(HasWildcardEndpointId() || mEndpointId == other.mEndpointId, false);
VerifyOrReturnError(HasWildcardClusterId() || mClusterId == other.mClusterId, false);
VerifyOrReturnError(HasWildcardAttributeId() || mAttributeId == other.mAttributeId, false);

return true;
}

bool HasWildcard() const { return HasWildcardEndpointId() || HasWildcardClusterId() || HasWildcardAttributeId(); }

/**
Expand Down
14 changes: 7 additions & 7 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchange
}

exit:
if (!IsSubscriptionType() || err != CHIP_NO_ERROR)
if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
{
ShutdownInternal(err);
}
Expand Down Expand Up @@ -300,7 +300,6 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
bool isEventReportsPresent = false;
bool isAttributeReportIBsPresent = false;
bool suppressResponse = false;
bool moreChunkedMessages = false;
uint64_t subscriptionId = 0;
EventReports::Parser EventReports;
AttributeReportIBs::Parser attributeReportIBs;
Expand Down Expand Up @@ -349,10 +348,11 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
}
SuccessOrExit(err);

err = report.GetMoreChunkedMessages(&moreChunkedMessages);
err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
if (CHIP_END_OF_TLV == err)
{
err = CHIP_NO_ERROR;
mPendingMoreChunks = false;
err = CHIP_NO_ERROR;
}
SuccessOrExit(err);

Expand All @@ -378,7 +378,7 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
err = CHIP_NO_ERROR;
}
SuccessOrExit(err);
if (isAttributeReportIBsPresent && nullptr != mpCallback && !moreChunkedMessages)
if (isAttributeReportIBsPresent && nullptr != mpCallback)
{
TLV::TLVReader attributeReportIBsReader;
attributeReportIBs.GetReader(&attributeReportIBsReader);
Expand Down Expand Up @@ -407,9 +407,9 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)

StatusResponse::SendStatusResponse(err == CHIP_NO_ERROR ? Protocols::InteractionModel::Status::Success
: Protocols::InteractionModel::Status::InvalidSubscription,
mpExchangeCtx, IsAwaitingSubscribeResponse());
mpExchangeCtx, IsAwaitingSubscribeResponse() || mPendingMoreChunks);

if (!mInitialReport)
if (!mInitialReport && !mPendingMoreChunks)
{
mpExchangeCtx = nullptr;
}
Expand Down
1 change: 1 addition & 0 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ class ReadClient : public Messaging::ExchangeDelegate
Callback * mpCallback = nullptr;
ClientState mState = ClientState::Uninitialized;
bool mInitialReport = true;
bool mPendingMoreChunks = false;
uint16_t mMinIntervalFloorSeconds = 0;
uint16_t mMaxIntervalCeilingSeconds = 0;
uint64_t mSubscriptionId = 0;
Expand Down
38 changes: 32 additions & 6 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ CHIP_ERROR ReadHandler::Init(Messaging::ExchangeManager * apExchangeMgr, Interac
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mIsChunkedReport = false;
mInteractionType = aInteractionType;
mInitiatorNodeId = apExchangeContext->GetSessionHandle().GetPeerNodeId();
mFabricIndex = apExchangeContext->GetSessionHandle().GetFabricIndex();
Expand Down Expand Up @@ -107,6 +108,7 @@ void ReadHandler::Shutdown(ShutdownOptions aOptions)
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mIsChunkedReport = false;
mInitiatorNodeId = kUndefinedNodeId;
}

Expand Down Expand Up @@ -140,7 +142,18 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
switch (mState)
{
case HandlerState::AwaitingReportResponse:
if (IsSubscriptionType())
if (IsChunkedReport())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
MoveToState(HandlerState::GeneratingReports);
if (mpExchangeCtx)
{
mpExchangeCtx->WillSendMessage();
}
// Trigger ReportingEngine run for sending next chunk of data.
SuccessOrExit(err = InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun());
}
else if (IsSubscriptionType())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
if (IsInitialReport())
Expand Down Expand Up @@ -176,10 +189,10 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
return err;
}

CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload)
CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks)
{
VerifyOrReturnLogError(IsReportable(), CHIP_ERROR_INCORRECT_STATE);
if (IsInitialReport())
if (IsInitialReport() || IsChunkedReport())
{
mSessionHandle.SetValue(mpExchangeCtx->GetSessionHandle());
}
Expand All @@ -190,6 +203,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload)
mpExchangeCtx->SetResponseTimeout(kImMessageTimeout);
}
VerifyOrReturnLogError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);
mIsChunkedReport = aMoreChunks;
MoveToState(HandlerState::AwaitingReportResponse);
CHIP_ERROR err = mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse));
Expand All @@ -200,7 +214,10 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload)
err = RefreshSubscribeSyncTimer();
}
}
ClearDirty();
if (!aMoreChunks)
{
ClearDirty();
}
return err;
}

Expand Down Expand Up @@ -319,20 +336,28 @@ CHIP_ERROR ReadHandler::ProcessAttributePathList(AttributePathIBs::Parser & aAtt
AttributePathIB::Parser path;
err = path.Init(reader);
SuccessOrExit(err);
// TODO: Support wildcard paths here
// TODO: MEIs (ClusterId and AttributeId) have a invalid pattern instead of a single invalid value, need to add separate
// functions for checking if we have received valid values.
// TODO: Wildcard cluster id with non-global attributes or wildcard attribute paths should be rejected.
err = path.GetEndpoint(&(clusterInfo.mEndpointId));
if (err == CHIP_NO_ERROR)
{
VerifyOrExit(!clusterInfo.HasWildcardEndpointId(), err = CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH);
}
else if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
SuccessOrExit(err);
err = path.GetCluster(&(clusterInfo.mClusterId));
if (err == CHIP_NO_ERROR)
{
VerifyOrExit(!clusterInfo.HasWildcardClusterId(), err = CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH);
}
else if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}

SuccessOrExit(err);
err = path.GetAttribute(&(clusterInfo.mAttributeId));
Expand Down Expand Up @@ -364,7 +389,8 @@ CHIP_ERROR ReadHandler::ProcessAttributePathList(AttributePathIBs::Parser & aAtt
// if we have exhausted this container
if (CHIP_END_OF_TLV == err)
{
err = CHIP_NO_ERROR;
mAttributePathExpandIterator = AttributePathExpandIterator(mpAttributeClusterInfoList);
err = CHIP_NO_ERROR;
}

exit:
Expand Down
22 changes: 18 additions & 4 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#pragma once

#include <app/AttributePathExpandIterator.h>
#include <app/ClusterInfo.h>
#include <app/EventManagement.h>
#include <app/InteractionModelDelegate.h>
Expand Down Expand Up @@ -97,12 +98,13 @@ class ReadHandler : public Messaging::ExchangeDelegate
* Send ReportData to initiator
*
* @param[in] aPayload A payload that has read request data
* @param[in] aMoreChunks A flags indicating there will be more chunks expected to be sent for this read request
*
* @retval #Others If fails to send report data
* @retval #CHIP_NO_ERROR On success.
*
*/
CHIP_ERROR SendReportData(System::PacketBufferHandle && aPayload);
CHIP_ERROR SendReportData(System::PacketBufferHandle && aPayload, bool mMoreChunks);

bool IsFree() const { return mState == HandlerState::Uninitialized; }
bool IsReportable() const { return mState == HandlerState::GeneratingReports && !mHoldReport; }
Expand All @@ -126,11 +128,19 @@ class ReadHandler : public Messaging::ExchangeDelegate

bool IsReadType() { return mInteractionType == InteractionType::Read; }
bool IsSubscriptionType() { return mInteractionType == InteractionType::Subscribe; }
bool IsChunkedReport() { return mIsChunkedReport; }
bool IsInitialReport() { return mInitialReport; }
bool IsActiveSubscription() const { return mActiveSubscription; }
CHIP_ERROR OnSubscribeRequest(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload);
void GetSubscriptionId(uint64_t & aSubscriptionId) { aSubscriptionId = mSubscriptionId; }
void SetDirty() { mDirty = true; }
AttributePathExpandIterator * GetAttributePathExpandIterator() { return &mAttributePathExpandIterator; }
void SetDirty()
{
mDirty = true;
// If the contents of the global dirty set have changed, we need to reset the iterator since the paths
// we've sent up till now are no longer valid and need to be invalidated.
mAttributePathExpandIterator = AttributePathExpandIterator(mpAttributeClusterInfoList);
}
void ClearDirty() { mDirty = false; }
bool IsDirty() { return mDirty; }
NodeId GetInitiatorNodeId() const { return mInitiatorNodeId; }
Expand Down Expand Up @@ -192,8 +202,12 @@ class ReadHandler : public Messaging::ExchangeDelegate
bool mHoldReport = false;
bool mDirty = false;
bool mActiveSubscription = false;
NodeId mInitiatorNodeId = kUndefinedNodeId;
FabricIndex mFabricIndex = 0;
// The flag indicating we are in the middle of a series of chunked report messages, this flag will be cleared during sending
// last chunked message.
bool mIsChunkedReport = false;
NodeId mInitiatorNodeId = kUndefinedNodeId;
FabricIndex mFabricIndex = 0;
AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);
};
} // namespace app
} // namespace chip
Loading

0 comments on commit 1072856

Please sign in to comment.