Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerZhongAWS committed Dec 12, 2023
1 parent f1e136f commit d3ca11d
Showing 1 changed file with 54 additions and 21 deletions.
75 changes: 54 additions & 21 deletions src/TcpAdapterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ namespace aws { namespace iot { namespace securedtunneling {
{
if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end())
{
if(tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end())
if (tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end())
{
BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id;
return connection_ptr;
Expand Down Expand Up @@ -520,7 +520,7 @@ namespace aws { namespace iot { namespace securedtunneling {
tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id);

// if simultaneous connections are not enabled, then send a stream reset
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
BOOST_LOG_SEV(log, info) << "simultaneous connections are not enabled, sending stream reset";
socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id);
Expand Down Expand Up @@ -606,10 +606,18 @@ namespace aws { namespace iot { namespace securedtunneling {

BOOST_LOG_SEV(log, debug) << "Sending stream start, setting new stream ID to: " << new_stream_id << ", service id: " << service_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_STREAM_START);
outgoing_message.set_serviceid("");
outgoing_message.set_streamid(new_stream_id);
//outgoing_message.set_connectionid(connection_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
outgoing_message.clear_payload();
async_send_message(tac, outgoing_message);
Expand Down Expand Up @@ -644,8 +652,16 @@ namespace aws { namespace iot { namespace securedtunneling {
}
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_CONNECTION_START);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand All @@ -663,12 +679,20 @@ namespace aws { namespace iot { namespace securedtunneling {
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_STREAM_RESET);
outgoing_message.set_serviceid("");
outgoing_message.set_streamid(stream_id);
//outgoing_message.set_connectionid(0);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
outgoing_message.clear_payload();
async_send_message(tac, outgoing_message, service_id, connection_id);
Expand All @@ -683,10 +707,19 @@ namespace aws { namespace iot { namespace securedtunneling {
BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip connection reset.";
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_CONNECTION_RESET);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand Down Expand Up @@ -1080,7 +1113,6 @@ namespace aws { namespace iot { namespace securedtunneling {
if (!connection_id)
{
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
connection_id = 0;
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1337,7 +1369,6 @@ namespace aws { namespace iot { namespace securedtunneling {
if (!connection_id)
{
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
connection_id = 0;
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1441,7 +1472,6 @@ namespace aws { namespace iot { namespace securedtunneling {
if (!connection_id)
{
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
connection_id = 0;
tac.adapter_config.is_v2_message_format = true;
}
/**
Expand Down Expand Up @@ -1573,7 +1603,6 @@ namespace aws { namespace iot { namespace securedtunneling {
if (!connection_id)
{
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
connection_id = 0;
tac.adapter_config.is_v2_message_format = true;
}
tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id);
Expand Down Expand Up @@ -1773,8 +1802,17 @@ namespace aws { namespace iot { namespace securedtunneling {
throw proxy_exception((boost::format("No streamId exists for the service Id %1%") % service_id).str());
}
BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id] << " connection id: " << connection_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// Construct outgoing message
outgoing_message.set_serviceid("");
outgoing_message.set_streamid(tac.serviceId_to_streamId_map[service_id]);
//outgoing_message.set_connectionid(connection_id);
size_t const send_size = std::min<std::size_t>(GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE),
Expand Down Expand Up @@ -1999,12 +2037,11 @@ namespace aws { namespace iot { namespace securedtunneling {
uint32_t new_connection_id = ++server->highest_connection_id;

// backward compatibility: set connection id to 1 if simultaneous connections is not enabled
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
BOOST_LOG_SEV(log, info) << "Falling back to older protocol, setting new connection id to 0";
new_connection_id = 0;
}
new_connection_id = 0;
BOOST_LOG_SEV(log, info) << "creating tcp connection id " << new_connection_id;

if (server->connectionId_to_tcp_connection_map.find(new_connection_id) == server->connectionId_to_tcp_connection_map.end() &&
Expand All @@ -2022,17 +2059,13 @@ namespace aws { namespace iot { namespace securedtunneling {
server->connectionId_to_tcp_connection_map[new_connection_id]->socket() = std::move(new_socket);
BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().local_endpoint().port() << " from " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().remote_endpoint();

if (true)
if (is_first_connection || tac.adapter_config.is_v1_message_format || tac.adapter_config.is_v2_message_format)
{
async_send_stream_start(tac, service_id, new_connection_id);
}
else if (!tac.adapter_config.is_v2_message_format)
{
async_send_connection_start(tac, service_id, new_connection_id);
}
else
{
BOOST_LOG_SEV(log, debug) << "Can not send stream start or connection start. Tried to use connection id: " << new_connection_id;
async_send_connection_start(tac, service_id, new_connection_id);
}

do_accept_tcp_connection(tac, retry_config, service_id, local_port, false);
Expand Down

0 comments on commit d3ca11d

Please sign in to comment.