Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadline timer cleanup #2921

Merged
merged 8 commits into from
Nov 16, 2021
20 changes: 10 additions & 10 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ TEST (socket, max_connections)

// start 3 clients, 2 will persist but 1 will be dropped

auto client1 = std::make_shared<nano::socket> (*node, boost::none);
auto client1 = std::make_shared<nano::socket> (*node);
client1->async_connect (dst_endpoint, connect_handler);

auto client2 = std::make_shared<nano::socket> (*node, boost::none);
auto client2 = std::make_shared<nano::socket> (*node);
client2->async_connect (dst_endpoint, connect_handler);

auto client3 = std::make_shared<nano::socket> (*node, boost::none);
auto client3 = std::make_shared<nano::socket> (*node);
client3->async_connect (dst_endpoint, connect_handler);

auto get_tcp_accept_failures = [&node] () {
Expand All @@ -70,10 +70,10 @@ TEST (socket, max_connections)

server_sockets[0].reset ();

auto client4 = std::make_shared<nano::socket> (*node, boost::none);
auto client4 = std::make_shared<nano::socket> (*node);
client4->async_connect (dst_endpoint, connect_handler);

auto client5 = std::make_shared<nano::socket> (*node, boost::none);
auto client5 = std::make_shared<nano::socket> (*node);
client5->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 2);
Expand All @@ -87,13 +87,13 @@ TEST (socket, max_connections)
server_sockets[2].reset ();
ASSERT_EQ (server_sockets.size (), 3);

auto client6 = std::make_shared<nano::socket> (*node, boost::none);
auto client6 = std::make_shared<nano::socket> (*node);
client6->async_connect (dst_endpoint, connect_handler);

auto client7 = std::make_shared<nano::socket> (*node, boost::none);
auto client7 = std::make_shared<nano::socket> (*node);
client7->async_connect (dst_endpoint, connect_handler);

auto client8 = std::make_shared<nano::socket> (*node, boost::none);
auto client8 = std::make_shared<nano::socket> (*node);
client8->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 3);
Expand Down Expand Up @@ -132,7 +132,7 @@ TEST (socket, drop_policy)
return true;
});

auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
nano::util::counted_completion write_completion (static_cast<unsigned> (total_message_count));

Expand Down Expand Up @@ -244,7 +244,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion] (boost::system::error_code const & ec_a) {
Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void nano::bootstrap_server::stop ()
void nano::bootstrap_server::receive ()
{
// Increase timeout to receive TCP header (idle server socket)
socket->set_timeout (node->network_params.network.idle_timeout);
socket->timeout_set (node->network_params.network.idle_timeout);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, 8, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
// Set remote_endpoint
Expand All @@ -152,7 +152,7 @@ void nano::bootstrap_server::receive ()
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
}
// Decrease timeout to default
this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout);
this_l->socket->timeout_set (this_l->node->config.tcp_io_timeout);
// Receive header
this_l->receive_header_action (ec, size_a);
});
Expand Down
24 changes: 9 additions & 15 deletions nano/node/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@

#include <limits>

nano::socket::socket (nano::node & node_a, boost::optional<std::chrono::seconds> io_timeout_a) :
nano::socket::socket (nano::node & node_a) :
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ io_timeout_a }
io_timeout{ node_a.config.tcp_io_timeout }
{
if (!io_timeout)
{
io_timeout = node_a.config.tcp_io_timeout;
}
}

nano::socket::~socket ()
Expand Down Expand Up @@ -108,7 +104,7 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:

void nano::socket::start_timer ()
{
start_timer (io_timeout.get ());
start_timer (io_timeout);
}

void nano::socket::start_timer (std::chrono::seconds deadline_a)
Expand Down Expand Up @@ -156,12 +152,9 @@ bool nano::socket::has_timed_out () const
return timed_out;
}

void nano::socket::set_timeout (std::chrono::seconds io_timeout_a)
void nano::socket::timeout_set (std::chrono::seconds io_timeout_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a] () {
this_l->io_timeout = io_timeout_a;
}));
io_timeout = io_timeout_a;
}

void nano::socket::close ()
Expand All @@ -177,7 +170,7 @@ void nano::socket::close_internal ()
{
if (!closed.exchange (true))
{
io_timeout = boost::none;
io_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
Expand All @@ -202,11 +195,12 @@ nano::tcp_endpoint nano::socket::local_endpoint () const
}

nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) :
socket{ node_a, std::chrono::seconds::max () },
socket{ node_a },
acceptor{ node_a.io_ctx },
local{ local_a },
max_inbound_connections{ max_connections_a }
{
io_timeout = std::chrono::seconds::max ();
}

void nano::server_socket::start (boost::system::error_code & ec_a)
Expand Down Expand Up @@ -250,7 +244,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
}

// Prepare new connection
auto new_connection = std::make_shared<nano::socket> (this_l->node, boost::none);
auto new_connection = std::make_shared<nano::socket> (this_l->node);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
Expand Down
6 changes: 3 additions & 3 deletions nano/node/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
* @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used.
* @param concurrency write concurrency
*/
explicit socket (nano::node & node, boost::optional<std::chrono::seconds> io_timeout = boost::none);
explicit socket (nano::node & node);
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void (boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>> const &, std::size_t, std::function<void (boost::system::error_code const &, std::size_t)>);
Expand All @@ -58,7 +58,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
/** Returns true if the socket has timed out */
bool has_timed_out () const;
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void set_timeout (std::chrono::seconds io_timeout_a);
void timeout_set (std::chrono::seconds io_timeout_a);
void start_timer (std::chrono::seconds deadline_a);
bool max () const
{
Expand Down Expand Up @@ -96,7 +96,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
std::atomic<uint64_t> next_deadline;
std::atomic<uint64_t> last_completion_time;
std::atomic<bool> timed_out{ false };
boost::optional<std::chrono::seconds> io_timeout;
std::atomic<std::chrono::seconds> io_timeout;
std::atomic<std::size_t> queue_size{ 0 };

/** Set by close() - completion handlers must check this. This is more reliable than checking
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
node.network.tcp_channels.udp_fallback (endpoint_a);
return;
}
auto socket = std::make_shared<nano::socket> (node, boost::none);
auto socket = std::make_shared<nano::socket> (node);
std::weak_ptr<nano::socket> socket_w (socket);
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
std::weak_ptr<nano::node> node_w (node.shared ());
Expand Down