
Deadline timer cleanup #2921

Merged
merged 8 commits on Nov 16, 2021
27 changes: 15 additions & 12 deletions nano/core_test/socket.cpp
@@ -22,15 +22,15 @@ TEST (socket, drop_policy)
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
size_t max_write_queue_size = 0;
{
auto client_dummy (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
max_write_queue_size = client_dummy->get_max_write_queue_size ();
auto client_dummy (std::make_shared<nano::socket> (node));
max_write_queue_size = client_dummy->queue_size_max;
}

auto func = [&](size_t total_message_count, nano::buffer_drop_policy drop_policy) {
auto server_port (nano::get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port);
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port);

auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, 1, nano::socket::concurrency::multi_writer));
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, 1));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
@@ -41,22 +41,25 @@ TEST (socket, drop_policy)
return true;
});

auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
auto client (std::make_shared<nano::socket> (node));
nano::transport::channel_tcp channel{ *node, client };
nano::util::counted_completion write_completion (total_message_count);

client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port),
[client, total_message_count, node, &write_completion, &drop_policy](boost::system::error_code const & ec_a) {
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), server_port),
[&channel, total_message_count, node, &write_completion, &drop_policy, client](boost::system::error_code const & ec_a) mutable {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
client->async_write (
nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) {
channel.send_buffer (
nano::shared_const_buffer (std::move (buff)), [&write_completion, client](boost::system::error_code const & ec, size_t size_a) mutable {
client.reset ();
write_completion.increment ();
},
drop_policy);
}
});
write_completion.await_count_for (std::chrono::seconds (5));
ASSERT_FALSE (write_completion.await_count_for (std::chrono::seconds (5)));
ASSERT_EQ (1, client.use_count ());
};

func (max_write_queue_size * 2 + 1, nano::buffer_drop_policy::no_socket_drop);
@@ -123,7 +126,7 @@ TEST (socket, concurrent_writes)

boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000);

auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, max_connections, nano::socket::concurrency::multi_writer));
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, max_connections));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
@@ -148,7 +151,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
auto client (std::make_shared<nano::socket> (node));
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion](boost::system::error_code const & ec_a) {
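The drop_policy test at the top of this file no longer writes through socket->async_write with a drop policy; buffer dropping moves out of nano::socket and into the channel. A condensed sketch of the call shape after this PR, based on the updated test above (node and server-socket setup, the completion counter, and the assertions are omitted; `node` is assumed to be a started std::shared_ptr<nano::node>):

// Sketch based on the updated nano/core_test/socket.cpp above; not a complete test.
auto client = std::make_shared<nano::socket> (node); // concurrency argument removed by this PR
nano::transport::channel_tcp channel{ *node, client };

std::vector<uint8_t> buff (1);
channel.send_buffer (
nano::shared_const_buffer (std::move (buff)),
[] (boost::system::error_code const & ec, size_t size_a) {
	// completion handler (the test increments a counted_completion here)
},
nano::buffer_drop_policy::no_socket_drop); // drop policy is now a channel concern, not a socket one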
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_server.cpp
@@ -151,7 +151,7 @@ void nano::bootstrap_server::stop ()
void nano::bootstrap_server::receive ()
{
// Increase timeout to receive TCP header (idle server socket)
socket->set_timeout (node->network_params.node.idle_timeout);
socket->timeout_set (node->network_params.node.idle_timeout);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, 8, [this_l](boost::system::error_code const & ec, size_t size_a) {
// Set remote_endpoint
@@ -160,7 +160,7 @@ void nano::bootstrap_server::receive ()
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
}
// Decrease timeout to default
this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout);
this_l->socket->timeout_set (this_l->node->config.tcp_io_timeout);
// Receive header
this_l->receive_header_action (ec, size_a);
});
193 changes: 41 additions & 152 deletions nano/node/socket.cpp
@@ -8,19 +8,14 @@

#include <limits>

nano::socket::socket (std::shared_ptr<nano::node> node_a, boost::optional<std::chrono::seconds> io_timeout_a, nano::socket::concurrency concurrency_a) :
strand (node_a->io_ctx.get_executor ()),
tcp_socket (node_a->io_ctx),
node (node_a),
writer_concurrency (concurrency_a),
next_deadline (std::numeric_limits<uint64_t>::max ()),
last_completion_time (0),
io_timeout (io_timeout_a)
nano::socket::socket (std::shared_ptr<nano::node> node_a) :
strand{ node_a->io_ctx.get_executor () },
tcp_socket{ node_a->io_ctx },
node{ node_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ node_a->config.tcp_io_timeout }
{
if (!io_timeout)
{
io_timeout = node_a->config.tcp_io_timeout;
}
}

nano::socket::~socket ()
@@ -72,69 +67,39 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, s
}
}

void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
auto this_l (shared_from_this ());
if (!closed)
{
if (writer_concurrency == nano::socket::concurrency::multi_writer)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() {
if (!this_l->closed)
{
bool write_in_progress = !this_l->send_queue.empty ();
auto queue_size = this_l->send_queue.size ();
if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2)))
{
this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a });
}
else if (auto node_l = this_l->node.lock ())
++queue_size;
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() {
if (!this_l->closed)
{
this_l->start_timer ();
nano::async_write (this_l->tcp_socket, buffer_a,
boost::asio::bind_executor (this_l->strand,
[buffer_a, callback_a, this_l](boost::system::error_code ec, std::size_t size_a) {
--this_l->queue_size;
if (auto node = this_l->node.lock ())
{
if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
}
else
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}

node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
callback_a (ec, size_a);
}
}
if (!write_in_progress)
{
this_l->write_queued_messages ();
}
}
else
{
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
}
}
}));
}
else
{
start_timer ();
nano::async_write (tcp_socket, buffer_a,
boost::asio::bind_executor (strand,
[this_l, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
}));
}
else
{
if (callback_a)
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
callback_a (ec, size_a);
}
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
}
}));
}
}
}));
}
else if (callback_a)
{
@@ -147,58 +112,9 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
}
}

void nano::socket::write_queued_messages ()
{
if (!closed)
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
auto msg (send_queue.front ());
start_timer ();
nano::async_write (tcp_socket, msg.buffer,
boost::asio::bind_executor (strand,
[msg, this_w](boost::system::error_code ec, std::size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);

this_l->stop_timer ();

if (!this_l->closed)
{
if (msg.callback)
{
msg.callback (ec, size_a);
}

this_l->send_queue.pop_front ();
if (!ec && !this_l->send_queue.empty ())
{
this_l->write_queued_messages ();
}
else if (this_l->send_queue.empty ())
{
// Idle TCP realtime client socket after writes
this_l->start_timer (node->network_params.node.idle_timeout);
}
}
else if (msg.callback)
{
msg.callback (ec, size_a);
}
}
}
}));
}
}

void nano::socket::start_timer ()
{
if (auto node_l = node.lock ())
{
start_timer (io_timeout.get ());
}
start_timer (io_timeout);
}

void nano::socket::start_timer (std::chrono::seconds deadline_a)
@@ -252,12 +168,9 @@ bool nano::socket::has_timed_out () const
return timed_out;
}

void nano::socket::set_timeout (std::chrono::seconds io_timeout_a)
void nano::socket::timeout_set (std::chrono::seconds io_timeout_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a]() {
this_l->io_timeout = io_timeout_a;
}));
io_timeout = io_timeout_a;
}

void nano::socket::close ()
@@ -268,36 +181,17 @@ void nano::socket::close ()
}));
}

void nano::socket::flush_send_queue_callbacks ()
{
while (!send_queue.empty ())
{
auto & item = send_queue.front ();
if (item.callback)
{
if (auto node_l = node.lock ())
{
node_l->background ([callback = std::move (item.callback)]() {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
send_queue.pop_front ();
}
}

// This must be called from a strand or the destructor
void nano::socket::close_internal ()
{
if (!closed.exchange (true))
{
io_timeout = boost::none;
io_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);
flush_send_queue_callbacks ();
if (ec)
{
if (auto node_l = node.lock ())
@@ -314,19 +208,14 @@ nano::tcp_endpoint nano::socket::remote_endpoint () const
return remote;
}

void nano::socket::set_writer_concurrency (concurrency writer_concurrency_a)
{
writer_concurrency = writer_concurrency_a;
}

size_t nano::socket::get_max_write_queue_size () const
{
return queue_size_max;
}

nano::server_socket::server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) :
socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), deferred_accept_timer (node_a->io_ctx), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a)
nano::server_socket::server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a) :
socket{ node_a },
acceptor{ node_a->io_ctx },
local{ local_a },
deferred_accept_timer{ node_a->io_ctx },
max_inbound_connections{ max_connections_a }
{
io_timeout = std::chrono::seconds::max ();
}

void nano::server_socket::start (boost::system::error_code & ec_a)
@@ -370,7 +259,7 @@ void nano::server_socket::on_connection (std::function<bool(std::shared_ptr<nano
if (this_l->connections.size () < this_l->max_inbound_connections)
{
// Prepare new connection
auto new_connection (std::make_shared<nano::socket> (node_l->shared (), boost::none, this_l->concurrency_new_connections));
auto new_connection (std::make_shared<nano::socket> (node_l->shared ()));
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a](boost::system::error_code const & ec_a) {
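The socket.cpp diff above boils down to treating the I/O timeout as a plain value: io_timeout is a std::chrono::seconds initialised from node_a->config.tcp_io_timeout, and set_timeout becomes a direct assignment named timeout_set, so the strand dispatch and the boost::optional checks disappear. A rough sketch of the resulting shape (member and function names follow the diff; the rest of nano::socket, including the deadline bookkeeping, is omitted):

// Condensed illustration only, not the actual class definition.
std::chrono::seconds io_timeout; // set from node_a->config.tcp_io_timeout in the constructor, no longer boost::optional

void timeout_set (std::chrono::seconds io_timeout_a)
{
	io_timeout = io_timeout_a; // plain assignment; the old set_timeout () dispatched through the strand
}

void start_timer ()
{
	start_timer (io_timeout); // no io_timeout.get () needed now that the value is always present
}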