Skip to content

Commit

Permalink
async tcp listerner
Browse files Browse the repository at this point in the history
  • Loading branch information
gr0vity committed Mar 27, 2024
1 parent f932b0e commit 9d0c054
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 139 deletions.
223 changes: 84 additions & 139 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
nano::transport::tcp_listener::~tcp_listener ()
{
// Thread should be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
}

void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());

try
Expand All @@ -50,33 +49,82 @@ void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<n
catch (boost::system::system_error const & ex)
{
logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ex.what (), port);

throw std::runtime_error (ex.code ().message ());
}

thread = std::thread ([this, callback_a] {
cleanup_thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
try
{
logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread");
run ();
logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread");
}
catch (std::exception const & ex)
run_cleanup ();
});

try
{
logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread");
start_accept ();
logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread");
}
catch (std::exception const & ex)
{
logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ());
release_assert (false); // Should be handled earlier
}
catch (...)
{
logger.critical (nano::log::type::tcp_listener, "Unknown error");
release_assert (false); // Should be handled earlier
}
}

void nano::transport::tcp_listener::start_accept ()
{
if (!acceptor.is_open () || stopped)
{
logger.debug (nano::log::type::tcp_listener, "Acceptor is not open or listening has been stopped.");
return;
}

// Need a way to limit max connections
// wait_available_slots(); // socket.max_connections AND socket.disconnection_of_silent_connections FAIL

auto socket = std::make_shared<boost::asio::ip::tcp::socket> (acceptor.get_executor ());

acceptor.async_accept (*socket, [this, socket] (const boost::system::error_code & ec) {
if (!ec && !stopped)
{
logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ());
release_assert (false); // Should be handled earlier
auto remote_endpoint = socket->remote_endpoint ();
// Perform limit checks here before proceeding
if (check_limits (remote_endpoint.address ()) != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
socket->close ();
start_accept (); // Continue to accept next incoming connection
return;
}

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint));

// If not over limit, proceed with establishing this connection
auto socket_l = std::make_shared<nano::transport::socket> (std::move (*socket), remote_endpoint, socket->local_endpoint (), node, nano::transport::socket_endpoint::server);
auto server_l = std::make_shared<nano::transport::tcp_server> (socket_l, node.shared (), true);

{
nano::lock_guard<nano::mutex> guard{ mutex };
connections.insert (entry{ remote_endpoint, socket_l });
}
socket_l->set_timeout (node.network_params.network.idle_timeout);
socket_l->start ();
server_l->start ();
connection_accepted.notify (socket_l, server_l);

logger.debug (nano::log::type::tcp_listener, "Connection added to the container.");
}
catch (...)
else if (ec != boost::asio::error::operation_aborted)
{
logger.critical (nano::log::type::tcp_listener, "Unknown error");
release_assert (false); // Should be handled earlier
logger.error (nano::log::type::tcp_listener, "Error on accept: {}", ec.message ());
}
});

cleanup_thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
run_cleanup ();
// Always ready to accept the next connection
start_accept ();
});
}

Expand All @@ -88,16 +136,8 @@ void nano::transport::tcp_listener::stop ()
stopped = true;
}
condition.notify_all ();
acceptor.close ();

// Schedule the acceptor to be closed on the IO context
boost::asio::post(acceptor.get_executor(), [this]() {
acceptor.close();
});

if (thread.joinable ())
{
thread.join ();
}
if (cleanup_thread.joinable ())
{
cleanup_thread.join ();
Expand All @@ -119,7 +159,9 @@ void nano::transport::tcp_listener::stop ()
{
server->stop ();
}
logger.debug (nano::log::type::tcp_listener, "tcp_listener::stop () for... server->stop ();");
}
logger.debug (nano::log::type::tcp_listener, "tcp_listener::stop () All connections closed");
}

void nano::transport::tcp_listener::run_cleanup ()
Expand Down Expand Up @@ -150,119 +192,22 @@ void nano::transport::tcp_listener::cleanup ()
});
}

void nano::transport::tcp_listener::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && acceptor.is_open ())
{
lock.unlock ();

wait_available_slots ();

if (stopped)
{
return;
}

bool cooldown = false;
try
{
auto result = accept_one ();
if (result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
// Refusal reason should be logged earlier
}
}
catch (boost::system::system_error const & ex)
{
if (ex.code() != boost::asio::error::operation_aborted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_error, nano::stat::dir::in);
logger.log (stopped ? nano::log::level::debug : nano::log::level::error, // Avoid logging expected errors when stopping
nano::log::type::tcp_listener, "Error accepting incoming connection: {}", ex.what ());

cooldown = true;
}

}

lock.lock ();

// Sleep for a while to prevent busy loop with additional cooldown if an error occurred
condition.wait_for (lock, cooldown ? 100ms : 10ms, [this] () { return stopped.load (); });
}
if (!stopped)
{
debug_assert (false, "acceptor stopped unexpectedly");
logger.error (nano::log::type::tcp_listener, "Acceptor stopped unexpectedly");
}
}

auto nano::transport::tcp_listener::accept_one () -> accept_result
{
auto raw_socket = acceptor.accept ();
auto const remote_endpoint = raw_socket.remote_endpoint ();
auto const local_endpoint = raw_socket.local_endpoint ();

if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in);
// Refusal reason should be logged earlier

try
{
// Best effor attempt to gracefully close the socket, shutdown before closing to avoid zombie sockets
raw_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both);
raw_socket.close ();
}
catch (boost::system::system_error const & ex)
{
logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ());
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in);
}

return result;
}

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint));

auto socket = std::make_shared<nano::transport::socket> (std::move (raw_socket), remote_endpoint, local_endpoint, node, socket_endpoint::server);
auto server = std::make_shared<nano::transport::tcp_server> (socket, node.shared (), true);

{
nano::lock_guard<nano::mutex> lock{ mutex };
connections.emplace (entry{ remote_endpoint, socket, server });
}

socket->set_timeout (node.network_params.network.idle_timeout);
socket->start ();
server->start ();

connection_accepted.notify (socket, server);

return accept_result::accepted;
}

void nano::transport::tcp_listener::wait_available_slots ()
{
auto should_wait = [this] {
nano::lock_guard<nano::mutex> lock{ mutex };
return connections.size () >= max_inbound_connections;
};

nano::interval log_interval;
while (!stopped && should_wait ())
auto should_wait
= [this] {
nano::lock_guard<nano::mutex> lock{ mutex };
return connections.size () >= max_inbound_connections;
};
nano::interval log_interval;
while (!stopped && should_wait ())
{
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
{
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
{
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
connection_count (), max_inbound_connections);
}

std::this_thread::sleep_for (100ms);
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
connection_count (), max_inbound_connections);
}
std::this_thread::sleep_for (100ms);
}

auto nano::transport::tcp_listener::check_limits (boost::asio::ip::address const & ip) -> accept_result
Expand Down
1 change: 1 addition & 0 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class tcp_listener final
void run_cleanup ();
void cleanup ();
void wait_available_slots ();
void start_accept ();

enum class accept_result
{
Expand Down

0 comments on commit 9d0c054

Please sign in to comment.