Skip to content

Commit

Permalink
Fix subscription liveness check (#11795)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google authored and pull[bot] committed Mar 15, 2022
1 parent 1eac83e commit 2740658
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 22 deletions.
16 changes: 13 additions & 3 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,15 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea

CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
{
CHIP_ERROR err = CHIP_NO_ERROR;
CancelLivenessCheckTimer();
ChipLogProgress(DataManagement, "Refresh LivenessCheckTime with %d seconds", mMaxIntervalCeilingSeconds);
CHIP_ERROR err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMaxIntervalCeilingSeconds), OnLivenessTimeoutCallback, this);
VerifyOrReturnError(mpExchangeCtx != nullptr, err = CHIP_ERROR_INCORRECT_STATE);

System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxIntervalCeilingSeconds) + mpExchangeCtx->GetAckTimeout();
// EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned
ChipLogProgress(DataManagement, "Refresh LivenessCheckTime with %lu milliseconds", static_cast<long unsigned>(timeout.count()));
err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
timeout, OnLivenessTimeoutCallback, this);

if (err != CHIP_NO_ERROR)
{
Expand Down Expand Up @@ -685,6 +690,11 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(ReadPrepareParams & aReadPreparePara
mpExchangeCtx = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHandle, this);
VerifyOrExit(mpExchangeCtx != nullptr, err = CHIP_ERROR_NO_MEMORY);
mpExchangeCtx->SetResponseTimeout(kImMessageTimeout);
if (mpExchangeCtx->IsBLETransport())
{
ChipLogError(DataManagement, "IM Subscribe cannot work with BLE");
SuccessOrExit(err = CHIP_ERROR_INCORRECT_STATE);
}

err = mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse));
Expand Down
29 changes: 24 additions & 5 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,21 +602,40 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP
return CHIP_NO_ERROR;
}

void ReadHandler::OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState)
{
VerifyOrReturn(apAppState != nullptr);
ReadHandler * readHandler = static_cast<ReadHandler *>(apAppState);
ChipLogProgress(DataManagement, "Unblock report hold after min %d seconds", readHandler->mMinIntervalFloorSeconds);
readHandler->mHoldReport = false;
if (readHandler->mDirty)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(readHandler->mMaxIntervalCeilingSeconds - readHandler->mMinIntervalFloorSeconds),
OnRefreshSubscribeTimerSyncCallback, readHandler);
}

void ReadHandler::OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState)
{
ReadHandler * aReadHandler = static_cast<ReadHandler *>(apAppState);
aReadHandler->mHoldReport = false;
VerifyOrReturn(apAppState != nullptr);
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}

CHIP_ERROR ReadHandler::RefreshSubscribeSyncTimer()
{
ChipLogProgress(DataManagement, "ReadHandler::Refresh Subscribe Sync Timer with %d seconds", mMinIntervalFloorSeconds);
ChipLogProgress(DataManagement, "ReadHandler::Refresh Subscribe Sync Timer with %d seconds", mMaxIntervalCeilingSeconds);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnUnblockHoldReportCallback, this);
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
OnRefreshSubscribeTimerSyncCallback, this);
mHoldReport = true;
return InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), OnRefreshSubscribeTimerSyncCallback, this);
ReturnErrorOnFailure(
InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
System::Clock::Seconds16(mMinIntervalFloorSeconds), OnUnblockHoldReportCallback, this));

return CHIP_NO_ERROR;
}
} // namespace app
} // namespace chip
1 change: 1 addition & 0 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
AwaitingReportResponse, ///< The handler has sent the report to the client and is awaiting a status response.
};

static void OnUnblockHoldReportCallback(System::Layer * apSystemLayer, void * apAppState);
static void OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState);
CHIP_ERROR RefreshSubscribeSyncTimer();
CHIP_ERROR SendSubscribeResponse();
Expand Down
26 changes: 18 additions & 8 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,15 @@ CHIP_ERROR Engine::SetDirty(ClusterInfo & aClusterInfo)
// chunk for read interactions.
if (handler.IsGeneratingReports() || handler.IsAwaitingReportResponse())
{
handler.SetDirty();
for (auto clusterInfo = handler.GetAttributeClusterInfolist(); clusterInfo != nullptr;
clusterInfo = clusterInfo->mpNext)
{
if (aClusterInfo.IsAttributePathSupersetOf(*clusterInfo) || clusterInfo->IsAttributePathSupersetOf(aClusterInfo))
{
handler.SetDirty();
break;
}
}
}
}
if (!InteractionModelEngine::GetInstance()->MergeOverlappedAttributePath(mpGlobalDirtySet, aClusterInfo) &&
Expand All @@ -468,20 +476,22 @@ void Engine::UpdateReadHandlerDirty(ReadHandler & aReadHandler)
{
return;
}

bool intersected = false;
for (auto clusterInfo = aReadHandler.GetAttributeClusterInfolist(); clusterInfo != nullptr; clusterInfo = clusterInfo->mpNext)
{
bool intersected = false;
for (auto path = mpGlobalDirtySet; path != nullptr; path = path->mpNext)
{
if (path->IsAttributePathSupersetOf(*clusterInfo) || clusterInfo->IsAttributePathSupersetOf(*path))
{
intersected = true;
break;
}
}
if (!intersected)
{
aReadHandler.ClearDirty();
}
}
if (!intersected)
{
aReadHandler.ClearDirty();
}
}

Expand All @@ -504,8 +514,8 @@ void Engine::OnReportConfirm()
}

}; // namespace reporting
}; // namespace app
}; // namespace chip
} // namespace app
} // namespace chip

void __attribute__((weak)) MatterPreAttributeReadCallback(const chip::app::ConcreteAttributePath & attributePath) {}
void __attribute__((weak)) MatterPostAttributeReadCallback(const chip::app::ConcreteAttributePath & attributePath) {}
40 changes: 34 additions & 6 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,7 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
// an error arising below. at the end, we have to close it.
ExchangeHandle ref(*this);

// If sending via UDP and NoAutoRequestAck send flag is not specificed,
// request reliable transmission.
const Transport::PeerAddress * peerAddress = GetSessionHandle().GetPeerAddress(mExchangeMgr->GetSessionManager());
// Treat unknown peer address as "not UDP", because we have no idea whether
// it's safe to do MRP there.
bool isUDPTransport = peerAddress && peerAddress->GetTransportType() == Transport::Type::kUdp;
bool isUDPTransport = IsUDPTransport();

// this check is ignored by the ExchangeMsgDispatch if !AutoRequestAck()
bool reliableTransmissionRequested = isUDPTransport && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck);
Expand Down Expand Up @@ -511,5 +506,38 @@ void ExchangeContext::MessageHandled()
Close();
}

bool ExchangeContext::IsUDPTransport()
{
const Transport::PeerAddress * peerAddress = GetSessionHandle().GetPeerAddress(mExchangeMgr->GetSessionManager());
return peerAddress && peerAddress->GetTransportType() == Transport::Type::kUdp;
}

bool ExchangeContext::IsTCPTransport()
{
const Transport::PeerAddress * peerAddress = GetSessionHandle().GetPeerAddress(mExchangeMgr->GetSessionManager());
return peerAddress && peerAddress->GetTransportType() == Transport::Type::kTcp;
}

bool ExchangeContext::IsBLETransport()
{
const Transport::PeerAddress * peerAddress = GetSessionHandle().GetPeerAddress(mExchangeMgr->GetSessionManager());
return peerAddress && peerAddress->GetTransportType() == Transport::Type::kBle;
}

System::Clock::Milliseconds32 ExchangeContext::GetAckTimeout()
{
System::Clock::Timeout timeout;
if (IsUDPTransport())
{
timeout = System::Clock::Milliseconds32((CHIP_CONFIG_RMP_DEFAULT_MAX_RETRANS + 1) *
(GetIdleRetransmitTimeoutTick() << CHIP_CONFIG_RMP_TIMER_DEFAULT_PERIOD_SHIFT));
}
else if (IsTCPTransport())
{
// TODO: issue 12009, need actual tcp margin value considering restransmission
timeout = System::Clock::Seconds16(30);
}
return timeout;
}
} // namespace Messaging
} // namespace chip
9 changes: 9 additions & 0 deletions src/messaging/ExchangeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ class DLL_EXPORT ExchangeContext : public ReliableMessageContext, public Referen

void SetResponseTimeout(Timeout timeout);

/*
* Get the overall acknowledge timeout period for the underneath transport(MRP+UDP/TCP)
*/
System::Clock::Milliseconds32 GetAckTimeout();

bool IsUDPTransport();
bool IsTCPTransport();
bool IsBLETransport();

private:
Timeout mResponseTimeout{ 0 }; // Maximum time to wait for response (in milliseconds); 0 disables response timeout.
ExchangeDelegate * mDelegate = nullptr;
Expand Down

0 comments on commit 2740658

Please sign in to comment.