Skip to content

Commit

Permalink
TCP connection setup/management and CASESession association.
Browse files Browse the repository at this point in the history
Add ConnectToPeer() API for explicit connection setup.
Currently, connecting to a peer is coupled with sending a message to the
peer.
This decouples the two and creates a clear API for connecting to a peer
address. Goes along with the existing Disconnect() API.
This would be essential during activation of retained sessions by solely
connecting to the peer and associating with the retained session.

Surface Connection completion and Closure callbacks and hook them
through SessionManager(TransportMgr delegate) and CASESession.

Mark SecureSession as defunct on connection closures.

Modify ActiveConnectionState in TCPBase to hold state for each
connection, so that it is able to handle the various control flow paths.

Associate a session with a connection object.

Associate the PeerAddress with the session early. Pass the PeerAddress
in the Find APIs. This helps check against the correct TransportType
when searching for a Sesssion in the SessionTable.

Add a `large payload` flag in EstablishSession() and Session lookup
functions to create/associate with the correct session and transport.

Have default configurations for TCP in a separate TCPConfig.h.

Refactor echo_requester.cpp and echo_responder.cpp to use the session
associated with the connection.

Handle Connection closure at ExchangeMgr and uplevel to corresponding
ExchangeContext using the corresponding session handle.

Add tests around connection establishment in TestTCP.
  • Loading branch information
pidarped committed Feb 27, 2024
1 parent de43a78 commit 54916ab
Show file tree
Hide file tree
Showing 43 changed files with 1,918 additions and 289 deletions.
2 changes: 2 additions & 0 deletions examples/shell/shell_common/include/Globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include <protocols/secure_channel/MessageCounterManager.h>
#include <transport/SessionHolder.h>
#include <transport/SessionManager.h>
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
#include <transport/raw/TCP.h>
#endif // INET_CONFIG_ENABLE_TCP_ENDPOINT
#include <transport/raw/UDP.h>

#if INET_CONFIG_ENABLE_TCP_ENDPOINT
Expand Down
43 changes: 30 additions & 13 deletions src/app/CASESessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Cal
,
uint8_t attemptCount, Callback::Callback<OnDeviceConnectionRetry> * onRetry
#endif // CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
)
,
TransportPayloadCapability transportPayloadCapability)
{
FindOrEstablishSessionHelper(peerId, onConnection, onFailure, nullptr
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
,
attemptCount, onRetry
#endif
);
,
transportPayloadCapability);
}

void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection,
Expand All @@ -51,14 +53,16 @@ void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Cal
,
uint8_t attemptCount, Callback::Callback<OnDeviceConnectionRetry> * onRetry
#endif
)
,
TransportPayloadCapability transportPayloadCapability)
{
FindOrEstablishSessionHelper(peerId, onConnection, nullptr, onSetupFailure
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
,
attemptCount, onRetry
#endif
);
,
transportPayloadCapability);
}

void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection,
Expand All @@ -67,14 +71,16 @@ void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Cal
,
uint8_t attemptCount, Callback::Callback<OnDeviceConnectionRetry> * onRetry
#endif
)
,
TransportPayloadCapability transportPayloadCapability)
{
FindOrEstablishSessionHelper(peerId, onConnection, nullptr, nullptr
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
,
attemptCount, onRetry
#endif
);
,
transportPayloadCapability);
}

void CASESessionManager::FindOrEstablishSessionHelper(const ScopedNodeId & peerId,
Expand All @@ -85,7 +91,8 @@ void CASESessionManager::FindOrEstablishSessionHelper(const ScopedNodeId & peerI
,
uint8_t attemptCount, Callback::Callback<OnDeviceConnectionRetry> * onRetry
#endif
)
,
TransportPayloadCapability transportPayloadCapability)
{
ChipLogDetail(CASESessionManager, "FindOrEstablishSession: PeerId = [%d:" ChipLogFormatX64 "]", peerId.GetFabricIndex(),
ChipLogValueX64(peerId.GetNodeId()));
Expand Down Expand Up @@ -124,12 +131,12 @@ void CASESessionManager::FindOrEstablishSessionHelper(const ScopedNodeId & peerI

if (onFailure != nullptr)
{
session->Connect(onConnection, onFailure);
session->Connect(onConnection, onFailure, transportPayloadCapability);
}

if (onSetupFailure != nullptr)
{
session->Connect(onConnection, onSetupFailure);
session->Connect(onConnection, onSetupFailure, transportPayloadCapability);
}
}

Expand All @@ -146,7 +153,16 @@ void CASESessionManager::ReleaseAllSessions()
CHIP_ERROR CASESessionManager::GetPeerAddress(const ScopedNodeId & peerId, Transport::PeerAddress & addr)
{
ReturnErrorOnFailure(mConfig.sessionInitParams.Validate());
auto optionalSessionHandle = FindExistingSession(peerId);
auto optionalSessionHandle = FindExistingSession(peerId, TransportPayloadCapability::kDefault);
ReturnErrorCodeIf(!optionalSessionHandle.HasValue(), CHIP_ERROR_NOT_CONNECTED);
addr = optionalSessionHandle.Value()->AsSecureSession()->GetPeerAddress();
return CHIP_NO_ERROR;
}

CHIP_ERROR CASESessionManager::GetPeerAddressForLargePayloadSession(const ScopedNodeId & peerId, Transport::PeerAddress & addr)
{
ReturnErrorOnFailure(mConfig.sessionInitParams.Validate());
auto optionalSessionHandle = FindExistingSession(peerId, TransportPayloadCapability::kLarge);
ReturnErrorCodeIf(!optionalSessionHandle.HasValue(), CHIP_ERROR_NOT_CONNECTED);
addr = optionalSessionHandle.Value()->AsSecureSession()->GetPeerAddress();
return CHIP_NO_ERROR;
Expand Down Expand Up @@ -182,10 +198,11 @@ OperationalSessionSetup * CASESessionManager::FindExistingSessionSetup(const Sco
return mConfig.sessionSetupPool->FindSessionSetup(peerId, forAddressUpdate);
}

Optional<SessionHandle> CASESessionManager::FindExistingSession(const ScopedNodeId & peerId) const
Optional<SessionHandle> CASESessionManager::FindExistingSession(const ScopedNodeId & peerId,
const TransportPayloadCapability transportPayloadCapability) const
{
return mConfig.sessionInitParams.sessionManager->FindSecureSessionForNode(peerId,
MakeOptional(Transport::SecureSession::Type::kCASE));
return mConfig.sessionInitParams.sessionManager->FindSecureSessionForNode(
peerId, MakeOptional(Transport::SecureSession::Type::kCASE), transportPayloadCapability);
}

void CASESessionManager::ReleaseSession(OperationalSessionSetup * session)
Expand Down
21 changes: 16 additions & 5 deletions src/app/CASESessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <lib/support/Pool.h>
#include <platform/CHIPDeviceLayer.h>
#include <transport/SessionDelegate.h>
#include <transport/SessionManager.h>
#include <transport/SessionUpdateDelegate.h>

namespace chip {
Expand Down Expand Up @@ -83,7 +84,8 @@ class CASESessionManager : public OperationalSessionReleaseDelegate, public Sess
,
uint8_t attemptCount = 1, Callback::Callback<OnDeviceConnectionRetry> * onRetry = nullptr
#endif // CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
);
,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

/**
* Find an existing session for the given node ID or trigger a new session request.
Expand All @@ -106,14 +108,16 @@ class CASESessionManager : public OperationalSessionReleaseDelegate, public Sess
* @param onSetupFailure A callback to be called upon an extended device connection failure.
* @param attemptCount The number of retry attempts if session setup fails (default is 1).
* @param onRetry A callback to be called on a retry attempt (enabled by a config flag).
* @param transportPayloadCapability An indicator of whether to use a transport that supports transfer of large payloads.
*/
void FindOrEstablishSession(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OperationalSessionSetup::OnSetupFailure> * onSetupFailure
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
,
uint8_t attemptCount = 1, Callback::Callback<OnDeviceConnectionRetry> * onRetry = nullptr
#endif // CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
);
,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

/**
* Find an existing session for the given node ID or trigger a new session request.
Expand All @@ -134,13 +138,15 @@ class CASESessionManager : public OperationalSessionReleaseDelegate, public Sess
* @param onConnection A callback to be called upon successful connection establishment.
* @param attemptCount The number of retry attempts if session setup fails (default is 1).
* @param onRetry A callback to be called on a retry attempt (enabled by a config flag).
* @param transportPayloadCapability An indicator of whether to use a transport that supports transfer of large payloads.
*/
void FindOrEstablishSession(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection, nullptr_t
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
,
uint8_t attemptCount = 1, Callback::Callback<OnDeviceConnectionRetry> * onRetry = nullptr
#endif // CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
);
,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

void ReleaseSessionsForFabric(FabricIndex fabricIndex);

Expand All @@ -156,6 +162,8 @@ class CASESessionManager : public OperationalSessionReleaseDelegate, public Sess
*/
CHIP_ERROR GetPeerAddress(const ScopedNodeId & peerId, Transport::PeerAddress & addr);

CHIP_ERROR GetPeerAddressForLargePayloadSession(const ScopedNodeId & peerId, Transport::PeerAddress & addr);

//////////// OperationalSessionReleaseDelegate Implementation ///////////////
void ReleaseSession(OperationalSessionSetup * device) override;

Expand All @@ -165,15 +173,18 @@ class CASESessionManager : public OperationalSessionReleaseDelegate, public Sess
private:
OperationalSessionSetup * FindExistingSessionSetup(const ScopedNodeId & peerId, bool forAddressUpdate = false) const;

Optional<SessionHandle> FindExistingSession(const ScopedNodeId & peerId) const;
Optional<SessionHandle>
FindExistingSession(const ScopedNodeId & peerId,
const TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault) const;

void FindOrEstablishSessionHelper(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnDeviceConnectionFailure> * onFailure,
Callback::Callback<OperationalSessionSetup::OnSetupFailure> * onSetupFailure,
#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
uint8_t attemptCount, Callback::Callback<OnDeviceConnectionRetry> * onRetry
#endif
);
,
TransportPayloadCapability transportPayloadCapability);

CASESessionManagerConfig mConfig;
};
Expand Down
33 changes: 26 additions & 7 deletions src/app/OperationalSessionSetup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ bool OperationalSessionSetup::AttachToExistingSecureSession()
mState == State::WaitingForRetry,
false);

auto sessionHandle =
mInitParams.sessionManager->FindSecureSessionForNode(mPeerId, MakeOptional(Transport::SecureSession::Type::kCASE));
auto sessionHandle = mInitParams.sessionManager->FindSecureSessionForNode(
mPeerId, MakeOptional(Transport::SecureSession::Type::kCASE), mTransportPayloadCapability);
if (!sessionHandle.HasValue())
return false;

Expand All @@ -93,11 +93,13 @@ bool OperationalSessionSetup::AttachToExistingSecureSession()

void OperationalSessionSetup::Connect(Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnDeviceConnectionFailure> * onFailure,
Callback::Callback<OnSetupFailure> * onSetupFailure)
Callback::Callback<OnSetupFailure> * onSetupFailure,
TransportPayloadCapability transportPayloadCapability)
{
CHIP_ERROR err = CHIP_NO_ERROR;
bool isConnected = false;

mTransportPayloadCapability = transportPayloadCapability;
//
// Always enqueue our user provided callbacks into our callback list.
// If anything goes wrong below, we'll trigger failures (including any queued from
Expand Down Expand Up @@ -180,15 +182,17 @@ void OperationalSessionSetup::Connect(Callback::Callback<OnDeviceConnected> * on
}

void OperationalSessionSetup::Connect(Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnDeviceConnectionFailure> * onFailure)
Callback::Callback<OnDeviceConnectionFailure> * onFailure,
TransportPayloadCapability transportPayloadCapability)
{
Connect(onConnection, onFailure, nullptr);
Connect(onConnection, onFailure, nullptr, transportPayloadCapability);
}

void OperationalSessionSetup::Connect(Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnSetupFailure> * onSetupFailure)
Callback::Callback<OnSetupFailure> * onSetupFailure,
TransportPayloadCapability transportPayloadCapability)
{
Connect(onConnection, nullptr, onSetupFailure);
Connect(onConnection, nullptr, onSetupFailure, transportPayloadCapability);
}

void OperationalSessionSetup::UpdateDeviceData(const Transport::PeerAddress & addr, const ReliableMessageProtocolConfig & config)
Expand Down Expand Up @@ -291,6 +295,21 @@ CHIP_ERROR OperationalSessionSetup::EstablishConnection(const ReliableMessagePro
mCASEClient = mClientPool->Allocate();
ReturnErrorCodeIf(mCASEClient == nullptr, CHIP_ERROR_NO_MEMORY);

// TODO: Combine LargePayload flag with DNS-SD advertisements from peer
if (mTransportPayloadCapability == TransportPayloadCapability::kLarge)
{
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
// Set the transport type for carrying large payloads
mDeviceAddress.SetTransportType(chip::Transport::Type::kTcp);
#else
ChipLogError(Discovery, "Cannot setup session for large payload. Appropriate Transport unavailable");

CleanupCASEClient();

return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE;
#endif
}

CHIP_ERROR err = mCASEClient->EstablishSession(mInitParams, mPeerId, mDeviceAddress, config, this);
if (err != CHIP_NO_ERROR)
{
Expand Down
17 changes: 14 additions & 3 deletions src/app/OperationalSessionSetup.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ class DLL_EXPORT OperationalSessionSetup : public SessionEstablishmentDelegate,
* `onFailure` may be called before the Connect call returns, for error
* cases that are detected synchronously (e.g. inability to start an address
* lookup).
*
* `transportPayloadCapability` is set to kLarge when the session needs to be established
* over a transport that allows large payloads to be transferred, e.g., TCP.
*/
void Connect(Callback::Callback<OnDeviceConnected> * onConnection, Callback::Callback<OnDeviceConnectionFailure> * onFailure);
void Connect(Callback::Callback<OnDeviceConnected> * onConnection, Callback::Callback<OnDeviceConnectionFailure> * onFailure,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

/*
* This function can be called to establish a secure session with the device.
Expand All @@ -219,8 +223,12 @@ class DLL_EXPORT OperationalSessionSetup : public SessionEstablishmentDelegate,
*
* `onSetupFailure` may be called before the Connect call returns, for error cases that are detected synchronously
* (e.g. inability to start an address lookup).
*
* `transportPayloadCapability` is set to kLarge when the session needs to be established
* over a transport that allows large payloads to be transferred, e.g., TCP.
*/
void Connect(Callback::Callback<OnDeviceConnected> * onConnection, Callback::Callback<OnSetupFailure> * onSetupFailure);
void Connect(Callback::Callback<OnDeviceConnected> * onConnection, Callback::Callback<OnSetupFailure> * onSetupFailure,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

bool IsForAddressUpdate() const { return mPerformingAddressUpdate; }

Expand Down Expand Up @@ -305,6 +313,8 @@ class DLL_EXPORT OperationalSessionSetup : public SessionEstablishmentDelegate,

bool mPerformingAddressUpdate = false;

TransportPayloadCapability mTransportPayloadCapability = TransportPayloadCapability::kDefault;

#if CHIP_DEVICE_CONFIG_ENABLE_AUTOMATIC_CASE_RETRIES
// When we TryNextResult on the resolver, it will synchronously call back
// into our OnNodeAddressResolved when it succeeds. We need to track
Expand Down Expand Up @@ -338,7 +348,8 @@ class DLL_EXPORT OperationalSessionSetup : public SessionEstablishmentDelegate,
void CleanupCASEClient();

void Connect(Callback::Callback<OnDeviceConnected> * onConnection, Callback::Callback<OnDeviceConnectionFailure> * onFailure,
Callback::Callback<OnSetupFailure> * onSetupFailure);
Callback::Callback<OnSetupFailure> * onSetupFailure,
TransportPayloadCapability transportPayloadCapability = TransportPayloadCapability::kDefault);

void EnqueueConnectionCallbacks(Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnDeviceConnectionFailure> * onFailure,
Expand Down
9 changes: 9 additions & 0 deletions src/app/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ using chip::Transport::BleListenParameters;
#endif
using chip::Transport::PeerAddress;
using chip::Transport::UdpListenParameters;
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
using chip::Transport::TcpListenParameters;
#endif

namespace {

Expand Down Expand Up @@ -200,6 +203,12 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams)
#if CONFIG_NETWORK_LAYER_BLE
,
BleListenParameters(DeviceLayer::ConnectivityMgr().GetBleLayer())
#endif
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
,
TcpListenParameters(DeviceLayer::TCPEndPointManager())
.SetAddressType(IPAddressType::kIPv6)
.SetListenPort(mOperationalServicePort)
#endif
);

Expand Down
10 changes: 10 additions & 0 deletions src/app/server/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ namespace chip {

inline constexpr size_t kMaxBlePendingPackets = 1;

#if INET_CONFIG_ENABLE_TCP_ENDPOINT
inline constexpr size_t kMaxTcpActiveConnectionCount = CHIP_CONFIG_DEVICE_MAX_ACTIVE_TCP_CONNECTIONS;

inline constexpr size_t kMaxTcpPendingPackets = CHIP_CONFIG_MAX_TCP_PENDING_PACKETS;
#endif // INET_CONFIG_ENABLE_TCP_ENDPOINT

//
// NOTE: Please do not alter the order of template specialization here as the logic
// in the Server impl depends on this.
Expand All @@ -89,6 +95,10 @@ using ServerTransportMgr = chip::TransportMgr<chip::Transport::UDP
#if CONFIG_NETWORK_LAYER_BLE
,
chip::Transport::BLE<kMaxBlePendingPackets>
#endif
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
,
chip::Transport::TCP<kMaxTcpActiveConnectionCount, kMaxTcpPendingPackets>
#endif
>;

Expand Down
Loading

0 comments on commit 54916ab

Please sign in to comment.