Skip to content

Commit

Permalink
Integrate CRMP to messaging layer and enable it by default (#4048)
Browse files Browse the repository at this point in the history
* Integrate CRMP to messaging layer and enable it by default

* Address review comments

* Add re-transmition test with dropped sent packets

* Use reference to share mContextPool between ReliableMessageManager and ExchangeManager

* Use PacketBufferHandle instead of raw PacketBuffer to store message buffer in retransmit table

* Retain an extra handle of msgBuf before sending for following rewinding

* Retain the buffer within SecureSessionMgr and update the retrans entry

* Address the review comments

Update src/transport/SecureSessionMgr.h

Co-authored-by: Boris Zbarsky <[email protected]>

* Take care of various error cases in HandleMessage

* Update src/messaging/ExchangeContext.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Update src/messaging/ExchangeContext.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Update src/messaging/ExchangeContext.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Update src/messaging/ExchangeContext.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Update src/messaging/ExchangeContext.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Update src/messaging/ExchangeMgr.cpp

Co-authored-by: Boris Zbarsky <[email protected]>

* Use initializer to initilize the EncryptedPacketBufferHandle memebers

* Simplify sendMessage API for sending encrypted message

* Skip re-setting up the packet header when the message is already encrypted.

* Add the Unit test for SecureSessionMgr

Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
yufengwangca and bzbarsky-apple authored Jan 7, 2021
1 parent cb84e54 commit cb23e0c
Show file tree
Hide file tree
Showing 15 changed files with 747 additions and 430 deletions.
117 changes: 105 additions & 12 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <messaging/ExchangeContext.h>
#include <messaging/ExchangeMgr.h>
#include <protocols/Protocols.h>
#include <protocols/common/CommonProtocol.h>
#include <support/logging/CHIPLogging.h>
#include <system/SystemTimer.h>

Expand Down Expand Up @@ -97,6 +98,29 @@ CHIP_ERROR ExchangeContext::SendMessage(uint16_t protocolId, uint8_t msgType, Pa
// Set the message type for this header.
payloadHeader.SetMessageType(msgType);

payloadHeader.SetInitiator(IsInitiator());

// If auto-request ACK feature is enabled, automatically request an acknowledgment,
// UNLESS the NoAutoRequestAck send flag has been specified.
if (mReliableMessageContext.AutoRequestAck() && !sendFlags.Has(SendMessageFlags::kSendFlag_NoAutoRequestAck))
{
payloadHeader.SetNeedsAck(true);
}

// If there is a pending acknowledgment piggyback it on this message.
if (mReliableMessageContext.HasPeerRequestedAck())
{
payloadHeader.SetAckId(mReliableMessageContext.mPendingPeerAckId);

// Set AckPending flag to false since current outgoing message is going to serve as the ack on this exchange.
mReliableMessageContext.SetAckPending(false);

#if !defined(NDEBUG)
ChipLogProgress(ExchangeManager, "Piggybacking Ack for MsgId:%08" PRIX32 " with msg",
mReliableMessageContext.mPendingPeerAckId);
#endif
}

// If a response message is expected...
if (sendFlags.Has(SendMessageFlags::kSendFlag_ExpectResponse))
{
Expand All @@ -113,10 +137,33 @@ CHIP_ERROR ExchangeContext::SendMessage(uint16_t protocolId, uint8_t msgType, Pa
}
}

payloadHeader.SetInitiator(IsInitiator());
// Send the message.
if (payloadHeader.IsNeedsAck())
{
ReliableMessageManager::RetransTableEntry * entry = nullptr;

// Add to Table for subsequent sending
err = mExchangeMgr->GetReliableMessageMgr()->AddToRetransTable(&mReliableMessageContext, &entry);
SuccessOrExit(err);

err = mExchangeMgr->GetSessionMgr()->SendMessage(payloadHeader, mPeerNodeId, std::move(msgBuf), &entry->retainedBuf);

err = mExchangeMgr->GetSessionMgr()->SendMessage(payloadHeader, mPeerNodeId, std::move(msgBuf));
SuccessOrExit(err);
if (err != CHIP_NO_ERROR)
{
// Remove from table
ChipLogError(ExchangeManager, "Failed to send message to 0x%" PRIx64 " with err %ld", mPeerNodeId, long(err));
mExchangeMgr->GetReliableMessageMgr()->ClearRetransmitTable(*entry);
}
else
{
mExchangeMgr->GetReliableMessageMgr()->StartRetransmision(entry);
}
}
else
{
err = mExchangeMgr->GetSessionMgr()->SendMessage(payloadHeader, mPeerNodeId, std::move(msgBuf));
SuccessOrExit(err);
}

exit:
if (err != CHIP_NO_ERROR && IsResponseExpected())
Expand Down Expand Up @@ -144,6 +191,19 @@ void ExchangeContext::DoClose(bool clearRetransTable)
}
mDelegate = nullptr;

// Closure of an exchange context is based on ref counting. The Protocol, when it calls DoClose(), indicates that
// it is done with the exchange context and the message layer sets all callbacks to NULL and does not send anything
// received on the exchange context up to higher layers. At this point, the message layer needs to handle the
// remaining work to be done on that exchange, (e.g. send all pending acks) before truly cleaning it up.
mReliableMessageContext.FlushAcks();

// In case the protocol wants a harder release of the EC right away, such as calling Abort(), exchange
// needs to clear the CRMP retransmission table immediately.
if (clearRetransTable)
{
mExchangeMgr->GetReliableMessageMgr()->ClearRetransmitTable(&mReliableMessageContext);
}

// Cancel the response timer.
CancelResponseTimer();
}
Expand Down Expand Up @@ -192,7 +252,6 @@ void ExchangeContext::Reset()
ExchangeContext * ExchangeContext::Alloc(ExchangeManager * em, uint16_t ExchangeId, uint64_t PeerNodeId, bool Initiator,
ExchangeDelegate * delegate)
{
VerifyOrDie(delegate != nullptr);
VerifyOrDie(mExchangeMgr == nullptr && GetReferenceCount() == 0);

Reset();
Expand All @@ -204,6 +263,8 @@ ExchangeContext * ExchangeContext::Alloc(ExchangeManager * em, uint16_t Exchange
mFlags.Set(ExFlagValues::kFlagInitiator, Initiator);
mDelegate = delegate;

mReliableMessageContext.Init(em->GetReliableMessageMgr(), this);

#if defined(CHIP_EXCHANGE_CONTEXT_DETAIL_LOGGING)
ChipLogProgress(ExchangeManager, "ec++ id: %d, inUse: %d, addr: 0x%x", (this - em->ContextPool + 1), em->GetContextsInUse(),
this);
Expand Down Expand Up @@ -297,6 +358,7 @@ CHIP_ERROR ExchangeContext::HandleMessage(const PacketHeader & packetHeader, con
PacketBufferHandle msgBuf)
{
CHIP_ERROR err = CHIP_NO_ERROR;
uint32_t messageId = 0;
uint16_t protocolId = 0;
uint8_t messageType = 0;

Expand All @@ -306,25 +368,56 @@ CHIP_ERROR ExchangeContext::HandleMessage(const PacketHeader & packetHeader, con
// layer has completed its work on the ExchangeContext.
Retain();

messageId = packetHeader.GetMessageId();
protocolId = payloadHeader.GetProtocolID();
messageType = payloadHeader.GetMessageType();

// Since we got the response, cancel the response timer.
CancelResponseTimer();
if (payloadHeader.IsAckMsg())
{
err = mReliableMessageContext.HandleRcvdAck(payloadHeader.GetAckId().Value());
SuccessOrExit(err);
}

if (payloadHeader.IsNeedsAck())
{
MessageFlags msgFlags;

// If the context was expecting a response to a previously sent message, this message
// is implicitly that response.
SetResponseExpected(false);
// An acknowledgment needs to be sent back to the peer for this message on this exchange,
// Set the flag in message header indicating an ack requested by peer;
msgFlags.Set(MessageFlagValues::kMessageFlag_PeerRequestedAck);

if (mDelegate != nullptr)
// Also set the flag in the exchange context indicating an ack requested;
mReliableMessageContext.SetPeerRequestedAck(true);

err = mReliableMessageContext.HandleNeedsAck(messageId, msgFlags);
SuccessOrExit(err);
}

// The Common::Null message type is only used for CRMP; do not pass such messages to the application layer.
if ((protocolId == Protocols::kProtocol_Protocol_Common) && (messageType == Protocols::Common::kMsgType_Null))
{
mDelegate->OnMessageReceived(this, packetHeader, protocolId, messageType, std::move(msgBuf));
ExitNow(err = CHIP_NO_ERROR);
}
else
{
DefaultOnMessageReceived(this, packetHeader, protocolId, messageType, std::move(msgBuf));
// Since we got the response, cancel the response timer.
CancelResponseTimer();

// If the context was expecting a response to a previously sent message, this message
// is implicitly that response.
SetResponseExpected(false);

if (mDelegate != nullptr)
{
mDelegate->OnMessageReceived(this, packetHeader, protocolId, messageType, std::move(msgBuf));
}
else
{
DefaultOnMessageReceived(this, packetHeader, protocolId, messageType, std::move(msgBuf));
}
}

exit:
// Release the reference to the ExchangeContext that was held at the beginning of this function.
// This call should also do the needful of closing the ExchangeContext if the protocol has
// already made a prior call to Close().
Expand Down
5 changes: 5 additions & 0 deletions src/messaging/ExchangeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <lib/core/ReferenceCounted.h>
#include <messaging/ExchangeDelegate.h>
#include <messaging/Flags.h>
#include <messaging/ReliableMessageContext.h>
#include <support/BitFlags.h>
#include <support/DLLUtil.h>
#include <system/SystemTimer.h>
Expand Down Expand Up @@ -128,9 +129,12 @@ class DLL_EXPORT ExchangeContext : public ReferenceCounted<ExchangeContext, Exch

ExchangeDelegate * GetDelegate() const { return mDelegate; }
void SetDelegate(ExchangeDelegate * delegate) { mDelegate = delegate; }
void SetReliableMessageDelegate(ReliableMessageDelegate * delegate) { mReliableMessageContext.SetDelegate(delegate); }

ExchangeManager * GetExchangeMgr() const { return mExchangeMgr; }

ReliableMessageContext * GetReliableMessageContext() { return &mReliableMessageContext; };

uint64_t GetPeerNodeId() const { return mPeerNodeId; }

uint16_t GetExchangeId() const { return mExchangeId; }
Expand Down Expand Up @@ -158,6 +162,7 @@ class DLL_EXPORT ExchangeContext : public ReferenceCounted<ExchangeContext, Exch
};

Timeout mResponseTimeout; // Maximum time to wait for response (in milliseconds); 0 disables response timeout.
ReliableMessageContext mReliableMessageContext;
ExchangeDelegate * mDelegate = nullptr;
ExchangeManager * mExchangeMgr = nullptr;

Expand Down
49 changes: 41 additions & 8 deletions src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <core/CHIPEncoding.h>
#include <messaging/ExchangeContext.h>
#include <messaging/ExchangeMgr.h>
#include <protocols/Protocols.h>
#include <protocols/common/CommonProtocol.h>
#include <support/CHIPFaultInjection.h>
#include <support/CodeUtils.h>
#include <support/RandUtils.h>
Expand All @@ -58,7 +60,7 @@ namespace Messaging {
* prior to use.
*
*/
ExchangeManager::ExchangeManager()
ExchangeManager::ExchangeManager() : mReliableMessageMgr(ContextPool)
{
mState = State::kState_NotInitialized;
}
Expand All @@ -79,6 +81,8 @@ CHIP_ERROR ExchangeManager::Init(SecureSessionMgr * sessionMgr)

sessionMgr->SetDelegate(this);

mReliableMessageMgr.Init(sessionMgr->SystemLayer(), sessionMgr);

mState = State::kState_Initialized;

return CHIP_NO_ERROR;
Expand Down Expand Up @@ -161,15 +165,23 @@ ExchangeContext * ExchangeManager::AllocContext(uint16_t ExchangeId, uint64_t Pe
void ExchangeManager::DispatchMessage(const PacketHeader & packetHeader, const PayloadHeader & payloadHeader,
System::PacketBufferHandle msgBuf)
{
CHIP_ERROR err = CHIP_NO_ERROR;
UnsolicitedMessageHandler * umh = nullptr;
UnsolicitedMessageHandler * matchingUMH = nullptr;
CHIP_ERROR err = CHIP_NO_ERROR;
bool sendAckAndCloseExchange = false;

// Search for an existing exchange that the message applies to. If a match is found...
for (auto & ec : ContextPool)
{
if (ec.GetReferenceCount() > 0 && ec.MatchExchange(packetHeader, payloadHeader))
{
// Found a matching exchange. Set flag for correct subsequent CRMP
// retransmission timeout selection.
if (!ec.mReliableMessageContext.HasRcvdMsgFromPeer())
{
ec.mReliableMessageContext.SetMsgRcvdFromPeer(true);
}

// Matched ExchangeContext; send to message handler.
ec.HandleMessage(packetHeader, payloadHeader, std::move(msgBuf));

Expand Down Expand Up @@ -203,23 +215,44 @@ void ExchangeManager::DispatchMessage(const PacketHeader & packetHeader, const P
}
}
}
// Discard the message if it isn't marked as being sent by an initiator.
else
// Discard the message if it isn't marked as being sent by an initiator and the message does not need to send
// an ack to the peer.
else if (!payloadHeader.IsNeedsAck())
{
ExitNow(err = CHIP_ERROR_UNSOLICITED_MSG_NO_ORIGINATOR);
}

// If we didn't find an existing exchange that matches the message, and no unsolicited message handler registered
// to hand this message, we need to create a temporary exchange to send an ack for this message and then close this exchange.
sendAckAndCloseExchange = payloadHeader.IsNeedsAck() && (matchingUMH == nullptr);

// If we found a handler or we need to create a new exchange context (EC).
if (matchingUMH != nullptr)
if (matchingUMH != nullptr || sendAckAndCloseExchange)
{
auto * ec =
AllocContext(payloadHeader.GetExchangeID(), packetHeader.GetSourceNodeId().Value(), false, matchingUMH->Delegate);
ExchangeContext * ec = nullptr;

if (sendAckAndCloseExchange)
{
// If rcvd msg is from initiator then this exchange is created as not Initiator.
// If rcvd msg is not from initiator then this exchange is created as Initiator.
ec = AllocContext(payloadHeader.GetExchangeID(), packetHeader.GetSourceNodeId().Value(), !payloadHeader.IsInitiator(),
nullptr);
}
else
{
ec = AllocContext(payloadHeader.GetExchangeID(), packetHeader.GetSourceNodeId().Value(), false, matchingUMH->Delegate);
}

VerifyOrExit(ec != nullptr, err = CHIP_ERROR_NO_MEMORY);

ChipLogProgress(ExchangeManager, "ec pos: %d, id: %d, Delegate: 0x%x", ec - ContextPool.begin(), ec->GetExchangeId(),
ec->GetDelegate());

ec->HandleMessage(packetHeader, payloadHeader, std::move(msgBuf));

// Close exchange if it was created only to send ack for a duplicate message.
if (sendAckAndCloseExchange)
ec->Close();
}

exit:
Expand Down Expand Up @@ -288,7 +321,7 @@ void ExchangeManager::OnConnectionExpired(const Transport::PeerConnectionState *
{
for (auto & ec : ContextPool)
{
if (ec.GetReferenceCount() > 0 && ec.mPeerNodeId == state->GetPeerNodeId())
if (ec.GetReferenceCount() > 0 && ec.GetPeerNodeId() == state->GetPeerNodeId())
{
ec.Close();
// Continue iterate because there can be multiple contexts associated with the connection.
Expand Down
4 changes: 4 additions & 0 deletions src/messaging/ExchangeMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <array>

#include <messaging/ExchangeContext.h>
#include <messaging/ReliableMessageManager.h>
#include <support/DLLUtil.h>
#include <transport/SecureSessionMgr.h>

Expand Down Expand Up @@ -165,6 +166,8 @@ class DLL_EXPORT ExchangeManager : public SecureSessionMgrDelegate

SecureSessionMgr * GetSessionMgr() const { return mSessionMgr; }

ReliableMessageManager * GetReliableMessageMgr() { return &mReliableMessageMgr; };

size_t GetContextsInUse() const { return mContextsInUse; }

private:
Expand All @@ -184,6 +187,7 @@ class DLL_EXPORT ExchangeManager : public SecureSessionMgrDelegate
uint16_t mNextExchangeId;
State mState;
SecureSessionMgr * mSessionMgr;
ReliableMessageManager mReliableMessageMgr;

std::array<ExchangeContext, CHIP_CONFIG_MAX_EXCHANGE_CONTEXTS> ContextPool;
size_t mContextsInUse;
Expand Down
Loading

0 comments on commit cb23e0c

Please sign in to comment.