diff --git a/src/TcpAdapterProxy.cpp b/src/TcpAdapterProxy.cpp index 7a3dfd8..bf9a371 100644 --- a/src/TcpAdapterProxy.cpp +++ b/src/TcpAdapterProxy.cpp @@ -253,7 +253,7 @@ namespace aws { namespace iot { namespace securedtunneling { { if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end()) { - if(tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end()) + if (tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end()) { BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id; return connection_ptr; @@ -520,7 +520,7 @@ namespace aws { namespace iot { namespace securedtunneling { tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); // if simultaneous connections are not enabled, then send a stream reset - if (tac.adapter_config.is_v2_message_format) + if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format) { BOOST_LOG_SEV(log, info) << "simultaneous connections are not enabled, sending stream reset"; socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); @@ -606,10 +606,18 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, debug) << "Sending stream start, setting new stream ID to: " << new_stream_id << ", service id: " << service_id; + if (tac.adapter_config.is_v1_message_format) + { + outgoing_message.set_serviceid(""); + } + else + { + outgoing_message.set_serviceid(service_id); + } + outgoing_message.set_type(Message_Type_STREAM_START); - outgoing_message.set_serviceid(""); outgoing_message.set_streamid(new_stream_id); - //outgoing_message.set_connectionid(connection_id); + outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); outgoing_message.clear_payload(); async_send_message(tac, outgoing_message); @@ -644,8 +652,16 @@ namespace aws { namespace iot { namespace securedtunneling { } std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; + if (tac.adapter_config.is_v1_message_format) + { + outgoing_message.set_serviceid(""); + } + else + { + outgoing_message.set_serviceid(service_id); + } + outgoing_message.set_type(Message_Type_CONNECTION_START); - outgoing_message.set_serviceid(service_id); outgoing_message.set_streamid(stream_id); outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); @@ -663,12 +679,20 @@ namespace aws { namespace iot { namespace securedtunneling { return; } + if (tac.adapter_config.is_v1_message_format) + { + outgoing_message.set_serviceid(""); + } + else + { + outgoing_message.set_serviceid(service_id); + } + // NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now. std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; outgoing_message.set_type(Message_Type_STREAM_RESET); - outgoing_message.set_serviceid(""); outgoing_message.set_streamid(stream_id); - //outgoing_message.set_connectionid(0); + outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); outgoing_message.clear_payload(); async_send_message(tac, outgoing_message, service_id, connection_id); @@ -683,10 +707,19 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip connection reset."; return; } + + if (tac.adapter_config.is_v1_message_format) + { + outgoing_message.set_serviceid(""); + } + else + { + outgoing_message.set_serviceid(service_id); + } + // NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now. std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; outgoing_message.set_type(Message_Type_CONNECTION_RESET); - outgoing_message.set_serviceid(service_id); outgoing_message.set_streamid(stream_id); outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); @@ -1080,7 +1113,6 @@ namespace aws { namespace iot { namespace securedtunneling { if (!connection_id) { BOOST_LOG_SEV(log, info) << "reverting to v2 message format"; - connection_id = 0; tac.adapter_config.is_v2_message_format = true; } string service_id = message.serviceid(); @@ -1337,7 +1369,6 @@ namespace aws { namespace iot { namespace securedtunneling { if (!connection_id) { BOOST_LOG_SEV(log, info) << "reverting to v2 message format"; - connection_id = 0; tac.adapter_config.is_v2_message_format = true; } string service_id = message.serviceid(); @@ -1441,7 +1472,6 @@ namespace aws { namespace iot { namespace securedtunneling { if (!connection_id) { BOOST_LOG_SEV(log, info) << "reverting to v2 message format"; - connection_id = 0; tac.adapter_config.is_v2_message_format = true; } /** @@ -1573,7 +1603,6 @@ namespace aws { namespace iot { namespace securedtunneling { if (!connection_id) { BOOST_LOG_SEV(log, info) << "reverting to v2 message format"; - connection_id = 0; tac.adapter_config.is_v2_message_format = true; } tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); @@ -1773,8 +1802,17 @@ namespace aws { namespace iot { namespace securedtunneling { throw proxy_exception((boost::format("No streamId exists for the service Id %1%") % service_id).str()); } BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id] << " connection id: " << connection_id; + + if (tac.adapter_config.is_v1_message_format) + { + outgoing_message.set_serviceid(""); + } + else + { + outgoing_message.set_serviceid(service_id); + } + // Construct outgoing message - outgoing_message.set_serviceid(""); outgoing_message.set_streamid(tac.serviceId_to_streamId_map[service_id]); //outgoing_message.set_connectionid(connection_id); size_t const send_size = std::min(GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE), @@ -1999,12 +2037,11 @@ namespace aws { namespace iot { namespace securedtunneling { uint32_t new_connection_id = ++server->highest_connection_id; // backward compatibility: set connection id to 1 if simultaneous connections is not enabled - if (tac.adapter_config.is_v2_message_format) + if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format) { BOOST_LOG_SEV(log, info) << "Falling back to older protocol, setting new connection id to 0"; new_connection_id = 0; } - new_connection_id = 0; BOOST_LOG_SEV(log, info) << "creating tcp connection id " << new_connection_id; if (server->connectionId_to_tcp_connection_map.find(new_connection_id) == server->connectionId_to_tcp_connection_map.end() && @@ -2022,17 +2059,13 @@ namespace aws { namespace iot { namespace securedtunneling { server->connectionId_to_tcp_connection_map[new_connection_id]->socket() = std::move(new_socket); BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().local_endpoint().port() << " from " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().remote_endpoint(); - if (true) + if (is_first_connection || tac.adapter_config.is_v1_message_format || tac.adapter_config.is_v2_message_format) { async_send_stream_start(tac, service_id, new_connection_id); } - else if (!tac.adapter_config.is_v2_message_format) - { - async_send_connection_start(tac, service_id, new_connection_id); - } else { - BOOST_LOG_SEV(log, debug) << "Can not send stream start or connection start. Tried to use connection id: " << new_connection_id; + async_send_connection_start(tac, service_id, new_connection_id); } do_accept_tcp_connection(tac, retry_config, service_id, local_port, false);