From 72dfc0407ee4c2f58270f53ef169bd602cc21698 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Sat, 5 Sep 2020 22:29:51 +0200 Subject: [PATCH 1/5] Converting the socket write queue to an atomic number since strands already ensure non-concurrent execution. --- nano/core_test/socket.cpp | 12 +-- nano/node/socket.cpp | 169 +++++++++--------------------------- nano/node/socket.hpp | 32 ++----- nano/node/transport/tcp.cpp | 3 +- 4 files changed, 56 insertions(+), 160 deletions(-) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 24709ae2bf..99abca53a2 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -22,15 +22,15 @@ TEST (socket, drop_policy) // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) size_t max_write_queue_size = 0; { - auto client_dummy (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); - max_write_queue_size = client_dummy->get_max_write_queue_size (); + auto client_dummy (std::make_shared (node, boost::none)); + max_write_queue_size = client_dummy->queue_size_max; } auto func = [&](size_t total_message_count, nano::buffer_drop_policy drop_policy) { auto server_port (nano::get_available_port ()); boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port); - auto server_socket (std::make_shared (node, endpoint, 1, nano::socket::concurrency::multi_writer)); + auto server_socket (std::make_shared (node, endpoint, 1)); boost::system::error_code ec; server_socket->start (ec); ASSERT_FALSE (ec); @@ -41,7 +41,7 @@ TEST (socket, drop_policy) return true; }); - auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + auto client (std::make_shared (node, boost::none)); nano::util::counted_completion write_completion (total_message_count); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port), @@ -123,7 +123,7 @@ TEST (socket, concurrent_writes) boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000); - auto server_socket (std::make_shared (node, endpoint, max_connections, nano::socket::concurrency::multi_writer)); + auto server_socket (std::make_shared (node, endpoint, max_connections)); boost::system::error_code ec; server_socket->start (ec); ASSERT_FALSE (ec); @@ -148,7 +148,7 @@ TEST (socket, concurrent_writes) std::vector> clients; for (unsigned i = 0; i < client_count; i++) { - auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + auto client (std::make_shared (node, boost::none)); clients.push_back (client); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000), [&connection_count_completion](boost::system::error_code const & ec_a) { diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index e2f6c3c050..4746e7c7e8 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -8,11 +8,10 @@ #include -nano::socket::socket (std::shared_ptr node_a, boost::optional io_timeout_a, nano::socket::concurrency concurrency_a) : +nano::socket::socket (std::shared_ptr node_a, boost::optional io_timeout_a) : strand (node_a->io_ctx.get_executor ()), tcp_socket (node_a->io_ctx), node (node_a), -writer_concurrency (concurrency_a), next_deadline (std::numeric_limits::max ()), last_completion_time (0), io_timeout (io_timeout_a) @@ -77,64 +76,53 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: auto this_l (shared_from_this ()); if (!closed) { - if (writer_concurrency == nano::socket::concurrency::multi_writer) - { - boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() { - if (!this_l->closed) + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() { + if (!this_l->closed) + { + if (this_l->queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && this_l->queue_size < (this_l->queue_size_max * 2))) { - bool write_in_progress = !this_l->send_queue.empty (); - auto queue_size = this_l->send_queue.size (); - if (queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && queue_size < (this_l->queue_size_max * 2))) - { - this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a }); - } - else if (auto node_l = this_l->node.lock ()) - { - if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); - } - else - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - } - - if (callback_a) + this_l->start_timer (); + ++this_l->queue_size; + nano::async_write (this_l->tcp_socket, buffer_a, + boost::asio::bind_executor (this_l->strand, + [buffer_a, callback_a, this_l](boost::system::error_code ec, std::size_t size_a) { + --this_l->queue_size; + if (auto node = this_l->node.lock ()) { - callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + this_l->stop_timer (); + if (callback_a) + { + callback_a (ec, size_a); + } } + })); + } + else if (auto node_l = this_l->node.lock ()) + { + if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) + { + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); } - if (!write_in_progress) + else { - this_l->write_queued_messages (); + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); } - } - else - { + if (callback_a) { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); } } - })); - } - else - { - start_timer (); - nano::async_write (tcp_socket, buffer_a, - boost::asio::bind_executor (strand, - [this_l, callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node = this_l->node.lock ()) + } + else + { + if (callback_a) { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - this_l->stop_timer (); - if (callback_a) - { - callback_a (ec, size_a); - } + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); } - })); - } + } + })); } else if (callback_a) { @@ -147,52 +135,6 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: } } -void nano::socket::write_queued_messages () -{ - if (!closed) - { - std::weak_ptr this_w (shared_from_this ()); - auto msg (send_queue.front ()); - start_timer (); - nano::async_write (tcp_socket, msg.buffer, - boost::asio::bind_executor (strand, - [msg, this_w](boost::system::error_code ec, std::size_t size_a) { - if (auto this_l = this_w.lock ()) - { - if (auto node = this_l->node.lock ()) - { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - - this_l->stop_timer (); - - if (!this_l->closed) - { - if (msg.callback) - { - msg.callback (ec, size_a); - } - - this_l->send_queue.pop_front (); - if (!ec && !this_l->send_queue.empty ()) - { - this_l->write_queued_messages (); - } - else if (this_l->send_queue.empty ()) - { - // Idle TCP realtime client socket after writes - this_l->start_timer (node->network_params.node.idle_timeout); - } - } - else if (msg.callback) - { - msg.callback (ec, size_a); - } - } - } - })); - } -} - void nano::socket::start_timer () { if (auto node_l = node.lock ()) @@ -268,24 +210,6 @@ void nano::socket::close () })); } -void nano::socket::flush_send_queue_callbacks () -{ - while (!send_queue.empty ()) - { - auto & item = send_queue.front (); - if (item.callback) - { - if (auto node_l = node.lock ()) - { - node_l->background ([callback = std::move (item.callback)]() { - callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } - } - send_queue.pop_front (); - } -} - // This must be called from a strand or the destructor void nano::socket::close_internal () { @@ -297,7 +221,6 @@ void nano::socket::close_internal () // Ignore error code for shutdown as it is best-effort tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); tcp_socket.close (ec); - flush_send_queue_callbacks (); if (ec) { if (auto node_l = node.lock ()) @@ -314,18 +237,12 @@ nano::tcp_endpoint nano::socket::remote_endpoint () const return remote; } -void nano::socket::set_writer_concurrency (concurrency writer_concurrency_a) -{ - writer_concurrency = writer_concurrency_a; -} - -size_t nano::socket::get_max_write_queue_size () const -{ - return queue_size_max; -} - -nano::server_socket::server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) : -socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), deferred_accept_timer (node_a->io_ctx), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a) +nano::server_socket::server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a) : +socket{ node_a, std::chrono::seconds::max () }, +acceptor{ node_a->io_ctx }, +local{ local_a }, +deferred_accept_timer{ node_a->io_ctx }, +max_inbound_connections{ max_connections_a } { } @@ -370,7 +287,7 @@ void nano::server_socket::on_connection (std::functionconnections.size () < this_l->max_inbound_connections) { // Prepare new connection - auto new_connection (std::make_shared (node_l->shared (), boost::none, this_l->concurrency_new_connections)); + auto new_connection (std::make_shared (node_l->shared (), boost::none)); this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, boost::asio::bind_executor (this_l->strand, [this_l, new_connection, callback_a](boost::system::error_code const & ec_a) { diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 399f9373ed..cb9e1e3f1c 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -33,25 +33,13 @@ class socket : public std::enable_shared_from_this friend class server_socket; public: - /** - * If multi_writer is used, overlapping writes are allowed, including from multiple threads. - * For bootstrapping, reading and writing alternates on a socket, thus single_writer - * should be used to avoid queueing overhead. For live messages, multiple threads may want - * to concurrenctly queue messages on the same socket, thus multi_writer should be used. - */ - enum class concurrency - { - single_writer, - multi_writer - }; - /** * Constructor * @param node Owning node * @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used. * @param concurrency write concurrency */ - explicit socket (std::shared_ptr node, boost::optional io_timeout = boost::none, concurrency = concurrency::single_writer); + explicit socket (std::shared_ptr node, boost::optional io_timeout = boost::none); virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); @@ -64,10 +52,6 @@ class socket : public std::enable_shared_from_this /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */ void set_timeout (std::chrono::seconds io_timeout_a); void start_timer (std::chrono::seconds deadline_a); - /** Change write concurrent */ - void set_writer_concurrency (concurrency writer_concurrency_a); - /** Returns the maximum number of buffers in the write queue */ - size_t get_max_write_queue_size () const; protected: /** Holds the buffer and callback for queued writes */ @@ -84,25 +68,23 @@ class socket : public std::enable_shared_from_this /** The other end of the connection */ boost::asio::ip::tcp::endpoint remote; - /** Send queue, protected by always being accessed in the strand */ - std::deque send_queue; - std::atomic writer_concurrency; std::atomic next_deadline; std::atomic last_completion_time; std::atomic timed_out{ false }; boost::optional io_timeout; - size_t const queue_size_max = 128; + std::atomic queue_size{ 0 }; /** Set by close() - completion handlers must check this. This is more reliable than checking error codes as the OS may have already completed the async operation. */ std::atomic closed{ false }; void close_internal (); - void write_queued_messages (); void start_timer (); void stop_timer (); void checkup (); - void flush_send_queue_callbacks (); + +public: + size_t const queue_size_max = 128; }; /** Socket class for TCP servers */ @@ -116,7 +98,7 @@ class server_socket final : public socket * @param max_connections_a Maximum number of concurrent connections * @param concurrency_a Write concurrency for new connections */ - explicit server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer); + explicit server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a); /**Start accepting new connections */ void start (boost::system::error_code &); /** Stop accepting new connections */ @@ -134,8 +116,6 @@ class server_socket final : public socket boost::asio::ip::tcp::endpoint local; boost::asio::steady_timer deferred_accept_timer; size_t max_inbound_connections; - /** Concurrency setting for new connections */ - concurrency concurrency_new_connections; void evict_dead_connections (); }; } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 8313fa2d12..93b9c2189b 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -290,7 +290,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa if (!node_id_a.is_zero ()) { // Add temporary channel - socket_a->set_writer_concurrency (nano::socket::concurrency::multi_writer); auto temporary_channel (std::make_shared (node, socket_a)); debug_assert (endpoint_a == temporary_channel->get_tcp_endpoint ()); temporary_channel->set_node_id (node_id_a); @@ -531,7 +530,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a node.network.tcp_channels.udp_fallback (endpoint_a, callback_a); return; } - auto socket (std::make_shared (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer)); + auto socket (std::make_shared (node.shared_from_this (), boost::none)); std::weak_ptr socket_w (socket); auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ()); From b0685e883f3a74dda47a15add915f679e26d2b51 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Tue, 8 Sep 2020 11:55:18 +0200 Subject: [PATCH 2/5] Moving responsibility for message drop policy in to tcp channel, rather than in the socket class itself. Having the socket class handle the limiter drop policy is a layering violation where the socket class knows about specific usages by its clients. This simplifies the socket class to be a more light weight wrapper around raw TCP sockets and reduces complexity. --- nano/core_test/socket.cpp | 15 +++++---- nano/node/socket.cpp | 49 +++++++++-------------------- nano/node/socket.hpp | 10 +++++- nano/node/transport/tcp.cpp | 51 ++++++++++++++++++++----------- nano/node/transport/transport.hpp | 2 +- 5 files changed, 68 insertions(+), 59 deletions(-) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 99abca53a2..d49767fa07 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -28,7 +28,7 @@ TEST (socket, drop_policy) auto func = [&](size_t total_message_count, nano::buffer_drop_policy drop_policy) { auto server_port (nano::get_available_port ()); - boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port); + boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port); auto server_socket (std::make_shared (node, endpoint, 1)); boost::system::error_code ec; @@ -42,21 +42,24 @@ TEST (socket, drop_policy) }); auto client (std::make_shared (node, boost::none)); + nano::transport::channel_tcp channel{ *node, client }; nano::util::counted_completion write_completion (total_message_count); - client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port), - [client, total_message_count, node, &write_completion, &drop_policy](boost::system::error_code const & ec_a) { + client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), server_port), + [&channel, total_message_count, node, &write_completion, &drop_policy, client](boost::system::error_code const & ec_a) mutable { for (int i = 0; i < total_message_count; i++) { std::vector buff (1); - client->async_write ( - nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) { + channel.send_buffer ( + nano::shared_const_buffer (std::move (buff)), [&write_completion, client](boost::system::error_code const & ec, size_t size_a) mutable { + client.reset (); write_completion.increment (); }, drop_policy); } }); - write_completion.await_count_for (std::chrono::seconds (5)); + ASSERT_FALSE (write_completion.await_count_for (std::chrono::seconds (5))); + ASSERT_EQ (1, client.use_count ()); }; func (max_write_queue_size * 2 + 1, nano::buffer_drop_policy::no_socket_drop); diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 4746e7c7e8..61997c6794 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -71,49 +71,30 @@ void nano::socket::async_read (std::shared_ptr> buffer_a, s } } -void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) +void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function const & callback_a) { auto this_l (shared_from_this ()); if (!closed) { - boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l, drop_policy_a]() { + ++queue_size; + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() { if (!this_l->closed) { - if (this_l->queue_size < this_l->queue_size_max || (drop_policy_a == nano::buffer_drop_policy::no_socket_drop && this_l->queue_size < (this_l->queue_size_max * 2))) - { - this_l->start_timer (); - ++this_l->queue_size; - nano::async_write (this_l->tcp_socket, buffer_a, - boost::asio::bind_executor (this_l->strand, - [buffer_a, callback_a, this_l](boost::system::error_code ec, std::size_t size_a) { - --this_l->queue_size; - if (auto node = this_l->node.lock ()) + this_l->start_timer (); + nano::async_write (this_l->tcp_socket, buffer_a, + boost::asio::bind_executor (this_l->strand, + [buffer_a, callback_a, this_l](boost::system::error_code ec, std::size_t size_a) { + --this_l->queue_size; + if (auto node = this_l->node.lock ()) + { + node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + this_l->stop_timer (); + if (callback_a) { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - this_l->stop_timer (); - if (callback_a) - { - callback_a (ec, size_a); - } + callback_a (ec, size_a); } - })); - } - else if (auto node_l = this_l->node.lock ()) - { - if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); } - else - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - } - - if (callback_a) - { - callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); - } - } + })); } else { diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index cb9e1e3f1c..4c4c39862a 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -43,7 +43,7 @@ class socket : public std::enable_shared_from_this virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); - void async_write (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter); + void async_write (nano::shared_const_buffer const &, std::function const & = nullptr); void close (); boost::asio::ip::tcp::endpoint remote_endpoint () const; @@ -52,6 +52,14 @@ class socket : public std::enable_shared_from_this /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */ void set_timeout (std::chrono::seconds io_timeout_a); void start_timer (std::chrono::seconds deadline_a); + bool max () const + { + return queue_size >= queue_size_max; + } + bool full () const + { + return queue_size >= queue_size_max * 2; + } protected: /** Holds the buffer and callback for queued writes */ diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 93b9c2189b..787a45741e 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -50,29 +50,46 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) +void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy policy_a) { if (auto socket_l = socket.lock ()) { - socket_l->async_write ( - buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node_l = node.lock ()) - { - if (!ec) - { - node_l->network.tcp_channels.update (endpoint_a); - } - if (ec == boost::system::errc::host_unreachable) - { - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); - } - if (callback_a) + if (!socket_l->max () || (policy_a == nano::buffer_drop_policy::no_socket_drop && !socket_l->full ())) + { + socket_l->async_write ( + buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node.lock ()) { - callback_a (ec, size_a); + if (!ec) + { + node_l->network.tcp_channels.update (endpoint_a); + } + if (ec == boost::system::errc::host_unreachable) + { + node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); + } + if (callback_a) + { + callback_a (ec, size_a); + } } + }); + } + else + { + if (policy_a == nano::buffer_drop_policy::no_socket_drop) + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); } - }, - drop_policy_a); + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + } + if (callback_a) + { + callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + } + } } else if (callback_a) { diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 831559560d..95e82754d4 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -41,7 +41,7 @@ namespace transport virtual ~channel () = default; virtual size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; - void send (nano::message const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter); + void send (nano::message const & message_a, std::function const & callback_a = nullptr, nano::buffer_drop_policy policy_a = nano::buffer_drop_policy::limiter); virtual void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0; virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; From c83b94c97f59ec08e50c7216c016bf825772a924 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Tue, 8 Sep 2020 17:56:59 +0200 Subject: [PATCH 3/5] Making max value a static constexpr. --- nano/node/socket.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 4c4c39862a..7e7505aede 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -92,7 +92,7 @@ class socket : public std::enable_shared_from_this void checkup (); public: - size_t const queue_size_max = 128; + static size_t constexpr queue_size_max = 128; }; /** Socket class for TCP servers */ From 5114ff2a9d7e65a552a6fc5aab7fd018cb17e088 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Wed, 9 Sep 2020 14:05:18 +0200 Subject: [PATCH 4/5] Removing boost::optional for io_timeout as it's unnecessary. --- nano/core_test/socket.cpp | 6 ++--- nano/node/bootstrap/bootstrap_server.cpp | 4 ++-- nano/node/socket.cpp | 29 +++++++++++------------- nano/node/socket.hpp | 6 ++--- nano/node/transport/tcp.cpp | 2 +- 5 files changed, 22 insertions(+), 25 deletions(-) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index d49767fa07..a8aafd104e 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -22,7 +22,7 @@ TEST (socket, drop_policy) // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) size_t max_write_queue_size = 0; { - auto client_dummy (std::make_shared (node, boost::none)); + auto client_dummy (std::make_shared (node)); max_write_queue_size = client_dummy->queue_size_max; } @@ -41,7 +41,7 @@ TEST (socket, drop_policy) return true; }); - auto client (std::make_shared (node, boost::none)); + auto client (std::make_shared (node)); nano::transport::channel_tcp channel{ *node, client }; nano::util::counted_completion write_completion (total_message_count); @@ -151,7 +151,7 @@ TEST (socket, concurrent_writes) std::vector> clients; for (unsigned i = 0; i < client_count; i++) { - auto client (std::make_shared (node, boost::none)); + auto client (std::make_shared (node)); clients.push_back (client); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000), [&connection_count_completion](boost::system::error_code const & ec_a) { diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 4badab5de3..02a2a1b9f8 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -151,7 +151,7 @@ void nano::bootstrap_server::stop () void nano::bootstrap_server::receive () { // Increase timeout to receive TCP header (idle server socket) - socket->set_timeout (node->network_params.node.idle_timeout); + socket->timeout_set (node->network_params.node.idle_timeout); auto this_l (shared_from_this ()); socket->async_read (receive_buffer, 8, [this_l](boost::system::error_code const & ec, size_t size_a) { // Set remote_endpoint @@ -160,7 +160,7 @@ void nano::bootstrap_server::receive () this_l->remote_endpoint = this_l->socket->remote_endpoint (); } // Decrease timeout to default - this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout); + this_l->socket->timeout_set (this_l->node->config.tcp_io_timeout); // Receive header this_l->receive_header_action (ec, size_a); }); diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 61997c6794..acae0a97de 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -8,18 +8,14 @@ #include -nano::socket::socket (std::shared_ptr node_a, boost::optional io_timeout_a) : -strand (node_a->io_ctx.get_executor ()), -tcp_socket (node_a->io_ctx), -node (node_a), -next_deadline (std::numeric_limits::max ()), -last_completion_time (0), -io_timeout (io_timeout_a) +nano::socket::socket (std::shared_ptr node_a) : +strand{ node_a->io_ctx.get_executor () }, +tcp_socket{ node_a->io_ctx }, +node{ node_a }, +next_deadline{ std::numeric_limits::max () }, +last_completion_time{ 0 }, +io_timeout{ node_a->config.tcp_io_timeout } { - if (!io_timeout) - { - io_timeout = node_a->config.tcp_io_timeout; - } } nano::socket::~socket () @@ -120,7 +116,7 @@ void nano::socket::start_timer () { if (auto node_l = node.lock ()) { - start_timer (io_timeout.get ()); + start_timer (io_timeout); } } @@ -175,7 +171,7 @@ bool nano::socket::has_timed_out () const return timed_out; } -void nano::socket::set_timeout (std::chrono::seconds io_timeout_a) +void nano::socket::timeout_set (std::chrono::seconds io_timeout_a) { auto this_l (shared_from_this ()); boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a]() { @@ -196,7 +192,7 @@ void nano::socket::close_internal () { if (!closed.exchange (true)) { - io_timeout = boost::none; + io_timeout = std::chrono::seconds (0); boost::system::error_code ec; // Ignore error code for shutdown as it is best-effort @@ -219,12 +215,13 @@ nano::tcp_endpoint nano::socket::remote_endpoint () const } nano::server_socket::server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a) : -socket{ node_a, std::chrono::seconds::max () }, +socket{ node_a }, acceptor{ node_a->io_ctx }, local{ local_a }, deferred_accept_timer{ node_a->io_ctx }, max_inbound_connections{ max_connections_a } { + io_timeout = std::chrono::seconds::max (); } void nano::server_socket::start (boost::system::error_code & ec_a) @@ -268,7 +265,7 @@ void nano::server_socket::on_connection (std::functionconnections.size () < this_l->max_inbound_connections) { // Prepare new connection - auto new_connection (std::make_shared (node_l->shared (), boost::none)); + auto new_connection (std::make_shared (node_l->shared ())); this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, boost::asio::bind_executor (this_l->strand, [this_l, new_connection, callback_a](boost::system::error_code const & ec_a) { diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 7e7505aede..da5b5d4d6e 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -39,7 +39,7 @@ class socket : public std::enable_shared_from_this * @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used. * @param concurrency write concurrency */ - explicit socket (std::shared_ptr node, boost::optional io_timeout = boost::none); + explicit socket (std::shared_ptr node); virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); @@ -50,7 +50,7 @@ class socket : public std::enable_shared_from_this /** Returns true if the socket has timed out */ bool has_timed_out () const; /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */ - void set_timeout (std::chrono::seconds io_timeout_a); + void timeout_set (std::chrono::seconds io_timeout_a); void start_timer (std::chrono::seconds deadline_a); bool max () const { @@ -80,7 +80,7 @@ class socket : public std::enable_shared_from_this std::atomic next_deadline; std::atomic last_completion_time; std::atomic timed_out{ false }; - boost::optional io_timeout; + std::atomic io_timeout; std::atomic queue_size{ 0 }; /** Set by close() - completion handlers must check this. This is more reliable than checking diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 787a45741e..8ea3699d76 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -547,7 +547,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a node.network.tcp_channels.udp_fallback (endpoint_a, callback_a); return; } - auto socket (std::make_shared (node.shared_from_this (), boost::none)); + auto socket (std::make_shared (node.shared_from_this ())); std::weak_ptr socket_w (socket); auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ()); From 284218d275c60a69e97483dc3261ab7279d7c5bf Mon Sep 17 00:00:00 2001 From: clemahieu Date: Wed, 9 Sep 2020 18:26:02 +0200 Subject: [PATCH 5/5] With io_timeout being atomic it can be set without strand synchronization. --- nano/node/socket.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index acae0a97de..a37de3b4b3 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -114,10 +114,7 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: void nano::socket::start_timer () { - if (auto node_l = node.lock ()) - { - start_timer (io_timeout); - } + start_timer (io_timeout); } void nano::socket::start_timer (std::chrono::seconds deadline_a) @@ -173,10 +170,7 @@ bool nano::socket::has_timed_out () const void nano::socket::timeout_set (std::chrono::seconds io_timeout_a) { - auto this_l (shared_from_this ()); - boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a]() { - this_l->io_timeout = io_timeout_a; - })); + io_timeout = io_timeout_a; } void nano::socket::close ()