Skip to content

Commit

Permalink
Merge pull request #4354 from clemahieu/socket_threads
Browse files Browse the repository at this point in the history
Merge server_socket in to tcp_listener
  • Loading branch information
clemahieu authored Jan 10, 2024
2 parents 3b57c1f + a66787a commit 5db5c60
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 452 deletions.
30 changes: 15 additions & 15 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TEST (network, construction_with_specified_port)
auto const node = system.add_node (nano::node_config{ port, system.logging });
EXPECT_EQ (port, node->network.port);
EXPECT_EQ (port, node->network.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener->endpoint ().port ());
}

TEST (network, construction_without_specified_port)
Expand All @@ -71,7 +71,7 @@ TEST (network, construction_without_specified_port)
auto const port = node->network.port.load ();
EXPECT_NE (0, port);
EXPECT_EQ (port, node->network.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener->endpoint ().port ());
}

TEST (network, send_node_id_handshake_tcp)
Expand Down Expand Up @@ -646,8 +646,8 @@ TEST (node, port_mapping)
TEST (tcp_listener, tcp_node_id_handshake)
{
nano::test::system system (1);
auto socket (std::make_shared<nano::transport::client_socket> (*system.nodes[0]));
auto bootstrap_endpoint (system.nodes[0]->tcp_listener.endpoint ());
auto socket (std::make_shared<nano::transport::socket> (*system.nodes[0]));
auto bootstrap_endpoint (system.nodes[0]->tcp_listener->endpoint ());
auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint)));
ASSERT_TRUE (cookie);
nano::node_id_handshake::query_payload query{ *cookie };
Expand Down Expand Up @@ -684,9 +684,9 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
{
nano::test::system system (1);
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::transport::client_socket> (*node0));
auto socket (std::make_shared<nano::transport::socket> (*node0));
std::atomic<bool> connected (false);
socket->async_connect (node0->tcp_listener.endpoint (), [&connected] (boost::system::error_code const & ec) {
socket->async_connect (node0->tcp_listener->endpoint (), [&connected] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
connected = true;
});
Expand All @@ -696,8 +696,8 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
disconnected = node0->tcp_listener.connections.empty ();
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
ASSERT_NO_ERROR (system.poll ());
}
Expand All @@ -707,30 +707,30 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
{
nano::test::system system (1);
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::transport::client_socket> (*node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener.endpoint ())));
auto socket (std::make_shared<nano::transport::socket> (*node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener->endpoint ())));
ASSERT_TRUE (cookie);
nano::node_id_handshake::query_payload query{ *cookie };
nano::node_id_handshake node_id_handshake{ nano::dev::network_params.network, query };
auto channel = std::make_shared<nano::transport::channel_tcp> (*node0, socket);
socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
socket->async_connect (node0->tcp_listener->endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
ASSERT_EQ (node0->tcp_listener.connections.size (), 1);
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
}
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
disconnected = node0->tcp_listener.connections.empty ();
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
ASSERT_NO_ERROR (system.poll ());
}
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3184,7 +3184,7 @@ TEST (node, peers)
node2->start ();
ASSERT_TIMELY (10s, !node2->network.empty () && !node1->network.empty ())
// Wait to finish TCP node ID handshakes
ASSERT_TIMELY (10s, node1->tcp_listener.realtime_count != 0 && node2->tcp_listener.realtime_count != 0);
ASSERT_TIMELY (10s, node1->tcp_listener->realtime_count != 0 && node2->tcp_listener->realtime_count != 0);
// Confirm that the peers match with the endpoints we are expecting
ASSERT_EQ (1, node1->network.size ());
auto list1 (node1->network.list (2));
Expand Down
20 changes: 10 additions & 10 deletions nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ TEST (request_aggregator, one)
.build_shared ();
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
Expand Down Expand Up @@ -98,7 +98,7 @@ TEST (request_aggregator, one_update)
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *receive1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send2->hash (), send2->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
request.clear ();
Expand Down Expand Up @@ -165,7 +165,7 @@ TEST (request_aggregator, two)
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send2->hash (), send2->root ());
request.emplace_back (receive1->hash (), receive1->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
// Process both blocks
node.aggregator.add (dummy_channel, request);
Expand Down Expand Up @@ -289,7 +289,7 @@ TEST (request_aggregator, split)
election->force_confirm ();
ASSERT_TIMELY (5s, max_vbh + 2 == node.ledger.cache.cemented_count);
ASSERT_EQ (max_vbh + 1, request.size ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
Expand Down Expand Up @@ -330,7 +330,7 @@ TEST (request_aggregator, channel_lifetime)
request.emplace_back (send1->hash (), send1->root ());
{
// The aggregator should extend the lifetime of the channel
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
}
Expand Down Expand Up @@ -361,11 +361,11 @@ TEST (request_aggregator, channel_update)
request.emplace_back (send1->hash (), send1->root ());
std::weak_ptr<nano::transport::channel> channel1_w;
{
auto client1 = std::make_shared<nano::transport::client_socket> (node);
auto client1 = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel1 = std::make_shared<nano::transport::channel_tcp> (node, client1);
channel1_w = dummy_channel1;
node.aggregator.add (dummy_channel1, request);
auto client2 = std::make_shared<nano::transport::client_socket> (node);
auto client2 = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel2 = std::make_shared<nano::transport::channel_tcp> (node, client2);
// The aggregator then hold channel2 and drop channel1
node.aggregator.add (dummy_channel2, request);
Expand Down Expand Up @@ -399,7 +399,7 @@ TEST (request_aggregator, channel_max_queue)
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
Expand Down Expand Up @@ -427,7 +427,7 @@ TEST (request_aggregator, unique)
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
Expand Down Expand Up @@ -474,7 +474,7 @@ TEST (request_aggregator, cannot_vote)
request.emplace_back (send2->hash (), send2->root ());
// Incorrect hash, correct root
request.emplace_back (1, send2->root ());
auto client = std::make_shared<nano::transport::client_socket> (node);
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
Expand Down
Loading

0 comments on commit 5db5c60

Please sign in to comment.