Skip to content

Commit

Permalink
Refs #20342: Refactor on TCP Transport
Browse files Browse the repository at this point in the history
Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz committed Jun 13, 2024
1 parent b98b227 commit 43eb945
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 35 deletions.
1 change: 1 addition & 0 deletions include/fastdds/rtps/transport/TransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fastdds/rtps/transport/SenderResource.h>
#include <fastdds/rtps/transport/TransportDescriptorInterface.h>
#include <fastdds/rtps/transport/TransportReceiverInterface.h>
#include <fastdds/rtps/network/NetworkBuffer.hpp>

namespace eprosima {
namespace fastdds {
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
transport.SenderResourceHasBeenClosed(locator_);
};

send_lambda_ = [this, &transport](
const fastrtps::rtps::octet* data,
uint32_t dataSize,
send_buffers_lambda_ = [this, &transport](
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
fastrtps::rtps::LocatorsIterator* destination_locators_begin,
fastrtps::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&) -> bool
{
return transport.send(data, dataSize, locator_, destination_locators_begin,
return transport.send(buffers, total_bytes, locator_, destination_locators_begin,
destination_locators_end);
};
}
Expand Down
45 changes: 25 additions & 20 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,17 @@ bool TCPTransportInterface::check_crc(

void TCPTransportInterface::calculate_crc(
TCPHeader& header,
const octet* data,
uint32_t size) const
const std::list<NetworkBuffer>& buffers) const
{
uint32_t crc(0);
for (uint32_t i = 0; i < size; ++i)
for (const NetworkBuffer& buffer : buffers)
{
crc = RTCPMessageManager::addToCRC(crc, data[i]);
size_t size = buffer.size;
const octet* data = static_cast<const octet*>(buffer.buffer);
for (size_t i = 0; i < size; ++i)
{
crc = RTCPMessageManager::addToCRC(crc, data[i]);
}
}
header.crc = crc;
}
Expand Down Expand Up @@ -415,15 +419,15 @@ uint16_t TCPTransportInterface::create_acceptor_socket(

void TCPTransportInterface::fill_rtcp_header(
TCPHeader& header,
const octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
uint16_t logical_port) const
{
header.length = send_buffer_size + static_cast<uint32_t>(TCPHeader::size());
header.length = total_bytes + static_cast<uint32_t>(TCPHeader::size());
header.logical_port = logical_port;
if (configuration()->calculate_crc)
{
calculate_crc(header, send_buffer, send_buffer_size);
calculate_crc(header, buffers);
}
}

Expand Down Expand Up @@ -1421,8 +1425,8 @@ bool TCPTransportInterface::Receive(
}

bool TCPTransportInterface::send(
const octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
const fastrtps::rtps::Locator_t& locator,
fastrtps::rtps::LocatorsIterator* destination_locators_begin,
fastrtps::rtps::LocatorsIterator* destination_locators_end)
Expand All @@ -1435,7 +1439,7 @@ bool TCPTransportInterface::send(
{
if (IsLocatorSupported(*it))
{
ret &= send(send_buffer, send_buffer_size, locator, *it);
ret &= send(buffers, total_bytes, locator, *it);
}

++it;
Expand All @@ -1445,8 +1449,8 @@ bool TCPTransportInterface::send(
}

bool TCPTransportInterface::send(
const octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
const fastrtps::rtps::Locator_t& locator,
const Locator& remote_locator)
{
Expand All @@ -1471,7 +1475,7 @@ bool TCPTransportInterface::send(
}
}

if (locator_mismatch || send_buffer_size > configuration()->sendBufferSize)
if (locator_mismatch || total_bytes > configuration()->sendBufferSize)
{
return false;
}
Expand Down Expand Up @@ -1522,21 +1526,22 @@ bool TCPTransportInterface::send(
scoped_lock.lock();
}
TCPHeader tcp_header;
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port);
// TODO Carlos: Handle statistics buffer message
// statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
fill_rtcp_header(tcp_header, buffers, total_bytes, logical_port);
{
asio::error_code ec;
size_t sent = channel->send(
(octet*)&tcp_header,
static_cast<uint32_t>(TCPHeader::size()),
send_buffer,
send_buffer_size,
buffers,
total_bytes,
ec);

if (sent != static_cast<uint32_t>(TCPHeader::size() + send_buffer_size) || ec)
if (sent != static_cast<uint32_t>(TCPHeader::size() + total_bytes) || ec)
{
EPROSIMA_LOG_WARNING(DEBUG, "Failed to send RTCP message (" << sent << " of " <<
TCPHeader::size() + send_buffer_size << " b): " << ec.message());
TCPHeader::size() + total_bytes << " b): " << ec.message());
success = false;
}
else
Expand Down
21 changes: 10 additions & 11 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,12 @@ class TCPTransportInterface : public TransportInterface

void calculate_crc(
TCPHeader& header,
const fastrtps::rtps::octet* data,
uint32_t size) const;
const std::list<NetworkBuffer>& buffers) const;

void fill_rtcp_header(
TCPHeader& header,
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
uint16_t logical_port) const;

//! Closes the given p_channel_resource and unbind it from every resource.
Expand Down Expand Up @@ -225,8 +224,8 @@ class TCPTransportInterface : public TransportInterface
* There must exist a channel bound to the locator, otherwise the send will be skipped.
*/
bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
const eprosima::fastrtps::rtps::Locator_t& locator,
const Locator& remote_locator);

Expand Down Expand Up @@ -389,9 +388,9 @@ class TCPTransportInterface : public TransportInterface
Locator& remote_locator);

/**
* Blocking Send through the channel inside channel_resources_ matching the locator provided.
* @param send_buffer Slice into the raw data to send.
* @param send_buffer_size Size of the raw data. It will be used as a bounds check for the previous argument.
* Blocking Send through the specified channel.
* @param buffers List of buffers to send.
* @param total_bytes Total amount of bytes to send. It will be used as a bounds check for the previous argument.
* It must not exceed the send_buffer_size fed to this class during construction.
* @param locator Physical locator we're sending to.
* @param destination_locators_begin pointer to destination locators iterator begin, the iterator can be advanced inside this fuction
Expand All @@ -400,8 +399,8 @@ class TCPTransportInterface : public TransportInterface
* so should not be reuse.
*/
bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const std::list<NetworkBuffer>& buffers,
uint32_t total_bytes,
const fastrtps::rtps::Locator_t& locator,
fastrtps::rtps::LocatorsIterator* destination_locators_begin,
fastrtps::rtps::LocatorsIterator* destination_locators_end);
Expand Down

0 comments on commit 43eb945

Please sign in to comment.