diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 356112b28a..83f845d5f9 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -30,12 +30,11 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ nano::transport::tcp_listener::~tcp_listener () { // Thread should be stopped before destruction - debug_assert (!thread.joinable ()); + debug_assert (!cleanup_thread.joinable ()); } void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) { - debug_assert (!thread.joinable ()); debug_assert (!cleanup_thread.joinable ()); try @@ -50,33 +49,82 @@ void nano::transport::tcp_listener::start (std::function (acceptor.get_executor ()); + + acceptor.async_accept (*socket, [this, socket] (const boost::system::error_code & ec) { + if (!ec && !stopped) { - logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ()); - release_assert (false); // Should be handled earlier + auto remote_endpoint = socket->remote_endpoint (); + // Perform limit checks here before proceeding + if (check_limits (remote_endpoint.address ()) != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); + socket->close (); + start_accept (); // Continue to accept next incoming connection + return; + } + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); + logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint)); + + // If not over limit, proceed with establishing this connection + auto socket_l = std::make_shared (std::move (*socket), remote_endpoint, socket->local_endpoint (), node, nano::transport::socket_endpoint::server); + auto server_l = std::make_shared (socket_l, node.shared (), true); + + { + nano::lock_guard guard{ mutex }; + connections.insert (entry{ remote_endpoint, socket_l }); + } + socket_l->set_timeout (node.network_params.network.idle_timeout); + socket_l->start (); + server_l->start (); + connection_accepted.notify (socket_l, server_l); + + logger.debug (nano::log::type::tcp_listener, "Connection added to the container."); } - catch (...) + else if (ec != boost::asio::error::operation_aborted) { - logger.critical (nano::log::type::tcp_listener, "Unknown error"); - release_assert (false); // Should be handled earlier + logger.error (nano::log::type::tcp_listener, "Error on accept: {}", ec.message ()); } - }); - - cleanup_thread = std::thread ([this] { - nano::thread_role::set (nano::thread_role::name::tcp_listener); - run_cleanup (); + // Always ready to accept the next connection + start_accept (); }); } @@ -88,16 +136,8 @@ void nano::transport::tcp_listener::stop () stopped = true; } condition.notify_all (); + acceptor.close (); - // Schedule the acceptor to be closed on the IO context - boost::asio::post(acceptor.get_executor(), [this]() { - acceptor.close(); - }); - - if (thread.joinable ()) - { - thread.join (); - } if (cleanup_thread.joinable ()) { cleanup_thread.join (); @@ -119,7 +159,9 @@ void nano::transport::tcp_listener::stop () { server->stop (); } + logger.debug (nano::log::type::tcp_listener, "tcp_listener::stop () for... server->stop ();"); } + logger.debug (nano::log::type::tcp_listener, "tcp_listener::stop () All connections closed"); } void nano::transport::tcp_listener::run_cleanup () @@ -150,119 +192,22 @@ void nano::transport::tcp_listener::cleanup () }); } -void nano::transport::tcp_listener::run () -{ - nano::unique_lock lock{ mutex }; - while (!stopped && acceptor.is_open ()) - { - lock.unlock (); - - wait_available_slots (); - - if (stopped) - { - return; - } - - bool cooldown = false; - try - { - auto result = accept_one (); - if (result != accept_result::accepted) - { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); - // Refusal reason should be logged earlier - } - } - catch (boost::system::system_error const & ex) - { - if (ex.code() != boost::asio::error::operation_aborted) - { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_error, nano::stat::dir::in); - logger.log (stopped ? nano::log::level::debug : nano::log::level::error, // Avoid logging expected errors when stopping - nano::log::type::tcp_listener, "Error accepting incoming connection: {}", ex.what ()); - - cooldown = true; - } - - } - - lock.lock (); - - // Sleep for a while to prevent busy loop with additional cooldown if an error occurred - condition.wait_for (lock, cooldown ? 100ms : 10ms, [this] () { return stopped.load (); }); - } - if (!stopped) - { - debug_assert (false, "acceptor stopped unexpectedly"); - logger.error (nano::log::type::tcp_listener, "Acceptor stopped unexpectedly"); - } -} - -auto nano::transport::tcp_listener::accept_one () -> accept_result -{ - auto raw_socket = acceptor.accept (); - auto const remote_endpoint = raw_socket.remote_endpoint (); - auto const local_endpoint = raw_socket.local_endpoint (); - - if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted) - { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in); - // Refusal reason should be logged earlier - - try - { - // Best effor attempt to gracefully close the socket, shutdown before closing to avoid zombie sockets - raw_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); - raw_socket.close (); - } - catch (boost::system::system_error const & ex) - { - logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ()); - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in); - } - - return result; - } - - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint)); - - auto socket = std::make_shared (std::move (raw_socket), remote_endpoint, local_endpoint, node, socket_endpoint::server); - auto server = std::make_shared (socket, node.shared (), true); - - { - nano::lock_guard lock{ mutex }; - connections.emplace (entry{ remote_endpoint, socket, server }); - } - - socket->set_timeout (node.network_params.network.idle_timeout); - socket->start (); - server->start (); - - connection_accepted.notify (socket, server); - - return accept_result::accepted; -} - void nano::transport::tcp_listener::wait_available_slots () -{ - auto should_wait = [this] { - nano::lock_guard lock{ mutex }; - return connections.size () >= max_inbound_connections; - }; - nano::interval log_interval; - while (!stopped && should_wait ()) +auto should_wait += [this] { + nano::lock_guard lock{ mutex }; + return connections.size () >= max_inbound_connections; + }; +nano::interval log_interval; +while (!stopped && should_wait ()) +{ + if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s)) { - if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s)) - { - logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})", - connection_count (), max_inbound_connections); - } - - std::this_thread::sleep_for (100ms); + logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})", + connection_count (), max_inbound_connections); } + std::this_thread::sleep_for (100ms); } auto nano::transport::tcp_listener::check_limits (boost::asio::ip::address const & ip) -> accept_result diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 33652b4b15..486dd0120d 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -57,6 +57,7 @@ class tcp_listener final void run_cleanup (); void cleanup (); void wait_available_slots (); + void start_accept (); enum class accept_result {