Skip to content

Commit

Permalink
Networking cleanup continued (#4495)
Browse files Browse the repository at this point in the history
* Organize

* Renamings

* Dedicated thread for tcp keepalives

* Rename `reachout` to `track_reachout`

* Dedicated network reachout thread

* Merge tcp channels and network keepalive loops

* Unused

* Random number generator helper

* Properly close channel container

* Rework channel purging

* Reverse track_reachout return value

* Rework `add_initial_peers`

* Stop network component last
  • Loading branch information
pwojcikdev authored Mar 18, 2024
1 parent 5891121 commit 9b38bb2
Show file tree
Hide file tree
Showing 18 changed files with 357 additions and 254 deletions.
10 changes: 5 additions & 5 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ TEST (network, peer_max_tcp_attempts)
node->network.merge_peer (node2->network.endpoint ());
}
ASSERT_EQ (0, node->network.size ());
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ())));
ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ())));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out));
}
#endif
Expand All @@ -779,11 +779,11 @@ namespace transport
{
auto address (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x7f000001 + i))); // 127.0.0.1 hex
nano::endpoint endpoint (address, system.get_available_port ());
ASSERT_FALSE (node->network.tcp_channels.reachout (endpoint));
ASSERT_TRUE (node->network.tcp_channels.track_reachout (endpoint));
}
ASSERT_EQ (0, node->network.size ());
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ())));
ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ())));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
}
}
Expand Down Expand Up @@ -974,7 +974,7 @@ TEST (network, tcp_no_connect_excluded_peers)
ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ()));

// Should not actively reachout to excluded peers
ASSERT_TRUE (node0->network.reachout (node1->network.endpoint (), true));
ASSERT_FALSE (node0->network.track_reachout (node1->network.endpoint ()));

// Erasing from excluded peers should allow a connection
node0->network.excluded_peers.remove (endpoint1_tcp);
Expand Down Expand Up @@ -1080,7 +1080,7 @@ TEST (network, cleanup_purge)
ASSERT_EQ (1, node1.network.size ());

node1.network.cleanup (std::chrono::steady_clock::now ());
ASSERT_EQ (0, node1.network.size ());
ASSERT_TIMELY_EQ (5s, 0, node1.network.size ());
}

TEST (network, loopback_channel)
Expand Down
11 changes: 6 additions & 5 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,19 @@ TEST (peer_container, reachout)
auto outer_node1 = nano::test::add_outer_node (system);
ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node1->network.endpoint ()));
// Make sure having been contacted by them already indicates we shouldn't reach out
ASSERT_TRUE (node1.network.reachout (outer_node1->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node1->network.endpoint ()));
auto outer_node2 = nano::test::add_outer_node (system);
ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ()));
ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ()));
// Reaching out to them once should signal we shouldn't reach out again.
ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ()));
// Make sure we don't purge new items
node1.network.cleanup (std::chrono::steady_clock::now () - std::chrono::seconds (10));
ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ()));
// Make sure we purge old items
node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10));
ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_TIMELY (5s, node1.network.empty ());
ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ()));
}

// This test is similar to network.filter_invalid_version_using with the difference that
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@ TEST (socket, drop_policy)
});

auto client = std::make_shared<nano::transport::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
auto channel = std::make_shared<nano::transport::channel_tcp> (*node, client);
nano::test::counted_completion write_completion (static_cast<unsigned> (total_message_count));

client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()),
[&channel, total_message_count, node, &write_completion, &drop_policy, client] (boost::system::error_code const & ec_a) mutable {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
channel.send_buffer (
channel->send_buffer (
nano::shared_const_buffer (std::move (buff)), [&write_completion, client] (boost::system::error_code const & ec, size_t size_a) mutable {
client.reset ();
write_completion.increment ();
Expand Down
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type
tcp,
tcp_server,
tcp_listener,
tcp_channels,
prunning,
conf_processor_bounded,
conf_processor_unbounded,
Expand Down
31 changes: 31 additions & 0 deletions nano/lib/random.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <random>

namespace nano
{
/**
 * Small helper that draws uniformly distributed integers from a standard library engine.
 * Not safe for any crypto related code, use for non-crypto PRNG only.
 *
 * Every call advances the internal engine and this class contains no locking,
 * so sharing one instance between threads requires external synchronization.
 *
 * NOTE(review): `release_assert` is not declared by the only header included here (<random>);
 * presumably it is provided by whichever translation unit includes this file — confirm.
 */
class random_generator final
{
public:
	/// Generate a random number in the range [min, max)
	/// The result has the type of `min`; `min < max` is enforced with `release_assert`.
	template <typename Min, typename Max>
	auto random (Min min, Max max)
	{
		release_assert (min < max);
		// Distribution bounds are inclusive, so the upper bound is shifted down by one
		using distribution_type = std::uniform_int_distribution<Min>;
		return distribution_type (min, max - 1) (rng);
	}

	/// Generate a random number in the range [0, max)
	template <typename Max>
	auto random (Max max)
	{
		Max const zero{ 0 };
		return random (zero, max);
	}

private:
	// Source of the seed; must be declared before `rng`, which is initialized from it
	std::random_device device;
	std::default_random_engine rng{ device () };
};
}
7 changes: 6 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
tcp_channels,
channel,
socket,
confirmation_height,
Expand Down Expand Up @@ -70,7 +71,6 @@ enum class detail : uint8_t
ok,
loop,
loop_cleanup,
loop_keepalive,
total,
process,
processed,
Expand Down Expand Up @@ -216,6 +216,11 @@ enum class detail : uint8_t
message_size_too_big,
outdated_version,

// network
loop_keepalive,
loop_reachout,
merge_peer,

// tcp
tcp_accept_success,
tcp_accept_failure,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::network_keepalive:
thread_role_name_string = "Net keepalive";
break;
case nano::thread_role::name::network_reachout:
thread_role_name_string = "Net reachout";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class name
rep_tiers,
network_cleanup,
network_keepalive,
network_reachout,
};

/*
Expand Down
68 changes: 56 additions & 12 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void nano::network::start ()
run_keepalive ();
});

reachout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::network_reachout);
run_reachout ();
});

if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();
Expand Down Expand Up @@ -87,6 +92,10 @@ void nano::network::stop ()
{
cleanup_thread.join ();
}
if (reachout_thread.joinable ())
{
reachout_thread.join ();
}

port = 0;
}
Expand Down Expand Up @@ -126,12 +135,11 @@ void nano::network::run_cleanup ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);

Expand All @@ -154,18 +162,54 @@ void nano::network::run_keepalive ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.keepalive_period);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);

flood_keepalive (0.75f);
flood_keepalive_self (0.25f);

tcp_channels.keepalive ();

lock.lock ();
}
}

void nano::network::run_reachout ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.merge_period);
if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout);

auto keepalive = tcp_channels.sample_keepalive ();
if (keepalive)
{
for (auto const & peer : keepalive->peers)
{
if (stopped)
{
return;
}

merge_peer (peer);

// Throttle reachout attempts
std::this_thread::sleep_for (node.network_params.network.merge_period);
}
}

lock.lock ();
}
}
Expand Down Expand Up @@ -411,10 +455,11 @@ void nano::network::merge_peers (std::array<nano::endpoint, 8> const & peers_a)

void nano::network::merge_peer (nano::endpoint const & peer_a)
{
if (!reachout (peer_a, node.config.allow_local_peers))
if (track_reachout (peer_a))
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.network.tcp_channels.start_tcp (peer_a);
node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer);

tcp_channels.start_tcp (peer_a);
}
}

Expand All @@ -436,15 +481,14 @@ bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_lo
return result;
}

bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers)
bool nano::network::track_reachout (nano::endpoint const & endpoint_a)
{
// Don't contact invalid IPs
bool error = not_a_peer (endpoint_a, allow_local_peers);
if (!error)
if (not_a_peer (endpoint_a, node.config.allow_local_peers))
{
error = tcp_channels.reachout (endpoint_a);
return false;
}
return error;
return tcp_channels.track_reachout (endpoint_a);
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
Expand Down
8 changes: 5 additions & 3 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class network final
void send_keepalive_self (std::shared_ptr<nano::transport::channel> const &);
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
bool not_a_peer (nano::endpoint const &, bool allow_local_peers);
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t = 0, bool = true);
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t);
// Desired fanout for a given scale
Expand Down Expand Up @@ -111,6 +111,7 @@ class network final
void run_processing ();
void run_cleanup ();
void run_keepalive ();
void run_reachout ();
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

private: // Dependencies
Expand All @@ -137,6 +138,7 @@ class network final
std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
std::thread cleanup_thread;
std::thread keepalive_thread;
std::thread reachout_thread;

public:
static unsigned const broadcast_interval_ms = 10;
Expand Down
20 changes: 14 additions & 6 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ void nano::node::stop ()
generator.stop ();
final_generator.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry.stop ();
websocket.stop ();
bootstrap_server.stop ();
Expand All @@ -696,6 +695,8 @@ void nano::node::stop ()
epoch_upgrader.stop ();
workers.stop ();
local_block_broadcaster.stop ();
network.stop (); // Stop network last to avoid killing in-use sockets

// work pool is not stopped on purpose due to testing setup
}

Expand Down Expand Up @@ -1116,15 +1117,22 @@ void nano::node::add_initial_peers ()
return;
}

auto transaction (store.tx_begin_read ());
for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i)
std::vector<nano::endpoint> initial_peers;
{
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
if (!network.reachout (endpoint, config.allow_local_peers))
auto transaction = store.tx_begin_read ();
for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i)
{
network.tcp_channels.start_tcp (endpoint);
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
initial_peers.push_back (endpoint);
}
}

logger.info (nano::log::type::node, "Adding cached initial peers: {}", initial_peers.size ());

for (auto const & peer : initial_peers)
{
network.merge_peer (peer);
}
}

void nano::node::start_election (std::shared_ptr<nano::block> const & block)
Expand Down
3 changes: 3 additions & 0 deletions nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class channel
nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0;

virtual void close () = 0;

virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
Expand All @@ -50,6 +52,7 @@ class channel
{
return false;
}

virtual bool alive () const
{
return true;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace transport
return nano::transport::transport_type::fake;
}

void close ()
void close () override
{
closed = true;
}
Expand Down
Loading

0 comments on commit 9b38bb2

Please sign in to comment.