Skip to content

Commit

Permalink
Message sink interface (#3364)
Browse files Browse the repository at this point in the history
This change is an application of dependency injection to remove coupling between the tcp_channels and udp_channels classes and the network class. An interface called message_sink has been created and an implementation that forwards network message matching existing behavior has been put it its place.
  • Loading branch information
clemahieu authored Jul 3, 2021
1 parent def99da commit f594d0b
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 60 deletions.
4 changes: 2 additions & 2 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ TEST (active_transactions, inactive_votes_cache_fork)
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
auto channel1 (node.network.udp_channels.create (node.network.endpoint ()));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
node.network.process_message (nano::publish (send2), channel1);
node.network.inbound (nano::publish (send2), channel1);
node.block_processor.flush ();
ASSERT_NE (nullptr, node.block (send2->hash ()));
node.scheduler.flush (); // Start election, otherwise conflicting block won't be inserted into election
node.network.process_message (nano::publish (send1), channel1);
node.network.inbound (nano::publish (send1), channel1);
node.block_processor.flush ();
bool confirmed (false);
system.deadline_set (5s);
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,19 @@ TEST (confirmation_height, conflict_rollback_cemented)
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish2 (send2);
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->network.inbound (publish1, channel1);
node1->block_processor.flush ();
node1->scheduler.flush ();
auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ()));
node2->network.process_message (publish2, channel2);
node2->network.inbound (publish2, channel2);
node2->block_processor.flush ();
node2->scheduler.flush ();
ASSERT_EQ (1, node1->active.size ());
ASSERT_EQ (1, node2->active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1->network.process_message (publish2, channel1);
node1->network.inbound (publish2, channel1);
node1->block_processor.flush ();
node2->network.process_message (publish1, channel2);
node2->network.inbound (publish1, channel2);
node2->block_processor.flush ();
auto election (node2->active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election);
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ TEST (receivable_processor, confirm_insufficient_pos)
nano::keypair key1;
auto vote (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, block1));
nano::confirm_ack con1 (vote);
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
}

TEST (receivable_processor, confirm_sufficient_pos)
Expand All @@ -375,7 +375,7 @@ TEST (receivable_processor, confirm_sufficient_pos)
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto vote (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1));
nano::confirm_ack con1 (vote);
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
}

TEST (receivable_processor, send_with_receive)
Expand Down Expand Up @@ -989,10 +989,10 @@ TEST (network, duplicate_revert_publish)
auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ());
ASSERT_NE (nullptr, channel);
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
node.network.inbound (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
node.network.inbound (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

Expand Down
58 changes: 29 additions & 29 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TEST (node, receive_gap)
.build_shared ();
node1.work_generate_blocking (*block);
nano::publish message (block);
node1.network.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound (message, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
}
Expand Down Expand Up @@ -1210,19 +1210,19 @@ TEST (node, fork_flip)
.build_shared ();
nano::publish publish2 (send2);
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.network.inbound (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ()));
node2.network.process_message (publish2, channel2);
node2.network.inbound (publish2, channel2);
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish2, channel1);
node1.network.inbound (publish2, channel1);
node1.block_processor.flush ();
node2.network.process_message (publish1, channel2);
node2.network.inbound (publish1, channel2);
node2.block_processor.flush ();
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election1);
Expand Down Expand Up @@ -1285,20 +1285,20 @@ TEST (node, fork_multi_flip)
.work (*system.work.generate (publish2.block->hash ()))
.build_shared ();
nano::publish publish3 (send3);
node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node1.network.inbound (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node2.network.inbound (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.inbound (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node1.block_processor.flush ();
node1.scheduler.flush ();
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.inbound (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election1);
Expand Down Expand Up @@ -1380,7 +1380,7 @@ TEST (node, fork_open)
.build_shared ();
nano::publish publish1 (send1);
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.network.inbound (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (publish1.block->qualified_root ());
Expand All @@ -1396,7 +1396,7 @@ TEST (node, fork_open)
.work (*system.work.generate (key1.pub))
.build_shared ();
nano::publish publish2 (open1);
node1.network.process_message (publish2, channel1);
node1.network.inbound (publish2, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
Expand All @@ -1409,7 +1409,7 @@ TEST (node, fork_open)
.build_shared ();
nano::publish publish3 (open2);
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish3, channel1);
node1.network.inbound (publish3, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
election = node1.active.election (publish3.block->qualified_root ());
Expand Down Expand Up @@ -2714,14 +2714,14 @@ TEST (node, local_votes_cache)
nano::confirm_req message1 (send1);
nano::confirm_req message2 (send2);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
node.network.inbound (message1, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 1);
node.network.process_message (message2, channel);
node.network.inbound (message2, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 2);
for (auto i (0); i < 100; ++i)
{
node.network.process_message (message1, channel);
node.network.process_message (message2, channel);
node.network.inbound (message1, channel);
node.network.inbound (message2, channel);
}
for (int i = 0; i < 4; ++i)
{
Expand All @@ -2737,7 +2737,7 @@ TEST (node, local_votes_cache)
nano::confirm_req message3 (send3);
for (auto i (0); i < 100; ++i)
{
node.network.process_message (message3, channel);
node.network.inbound (message3, channel);
}
for (int i = 0; i < 4; ++i)
{
Expand Down Expand Up @@ -2795,26 +2795,26 @@ TEST (node, local_votes_cache_batch)
nano::confirm_req message (batch);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
// Generates and sends one vote for both hashes which is then cached
node.network.process_message (message, channel);
node.network.inbound (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1);
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ());
// Only one confirm_ack should be sent if all hashes are part of the same vote
node.network.process_message (message, channel);
node.network.inbound (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Test when votes are different
node.history.erase (send2->root ());
node.history.erase (receive1->root ());
node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel);
node.network.inbound (nano::confirm_req (send2->hash (), send2->root ()), channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3);
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
node.network.process_message (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
node.network.inbound (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 4);
ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// There are two different votes, so both should be sent in response
node.network.process_message (message, channel);
node.network.inbound (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 6);
ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
Expand All @@ -2830,7 +2830,7 @@ TEST (node, local_votes_cache_generate_new_vote)
// Repsond with cached vote
nano::confirm_req message1 (genesis.open);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
node.network.inbound (message1, channel);
ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ()));
ASSERT_EQ (1, votes1.size ());
Expand All @@ -2850,7 +2850,7 @@ TEST (node, local_votes_cache_generate_new_vote)
// One of the hashes is cached
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) };
nano::confirm_req message2 (roots_hashes);
node.network.process_message (message2, channel);
node.network.inbound (message2, channel);
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
auto votes2 (node.history.votes (send1->root (), send1->hash ()));
ASSERT_EQ (1, votes2.size ());
Expand Down Expand Up @@ -3259,13 +3259,13 @@ TEST (node, fork_election_invalid_block_signature)
.sign (nano::dev_genesis_key.prv, 0) // Invalid signature
.build_shared ();
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (nano::publish (send1), channel1);
node1.network.inbound (nano::publish (send1), channel1);
ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ()));
auto election (node1.active.election (send1->qualified_root ()));
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
node1.network.process_message (nano::publish (send3), channel1);
node1.network.process_message (nano::publish (send2), channel1);
node1.network.inbound (nano::publish (send3), channel1);
node1.network.inbound (nano::publish (send2), channel1);
ASSERT_TIMELY (3s, election->blocks ().size () > 1);
ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ());
}
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ TEST (telemetry, receive_from_non_listening_channel)
nano::system system;
auto node = system.add_node ();
nano::telemetry_ack message (nano::telemetry_data{});
node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ()));
node->network.inbound (message, node->network.udp_channels.create (node->network.endpoint ()));
// We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it.
ASSERT_EQ (node->telemetry->telemetry_data_size (), 0);
}
Expand Down Expand Up @@ -632,7 +632,7 @@ TEST (telemetry, remove_peer_invalid_signature)
// Change anything so that the signed message is incorrect
telemetry_data.block_count = 0;
auto telemetry_ack = nano::telemetry_ack (telemetry_data);
node->network.process_message (telemetry_ack, channel);
node->network.inbound (telemetry_ack, channel);

ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0);
ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool {
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ TEST (websocket, stopped_election)
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish1 (send1);
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->network.inbound (publish1, channel1);
node1->block_processor.flush ();
ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ()));
node1->active.erase (*send1);
Expand Down
5 changes: 3 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

nano::network::network (nano::node & node_a, uint16_t port_a) :
syn_cookies (node_a.network_params.node.max_peers_per_ip),
inbound{ [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) { process_message (message, channel); } },
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit),
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
node (node_a),
publish_filter (256 * 1024),
udp_channels (node_a, port_a),
tcp_channels (node_a),
udp_channels (node_a, port_a, inbound),
tcp_channels (node_a, inbound),
port (port_a),
disconnect_observer ([] () {})
{
Expand Down
7 changes: 6 additions & 1 deletion nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ class network final
void broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, std::function<void ()> = nullptr, unsigned = broadcast_interval_ms);
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
Expand All @@ -180,6 +179,12 @@ class network final
bool empty () const;
void erase (nano::transport::channel const &);
void set_bandwidth_params (double, size_t);

private:
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

public:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
Expand Down
11 changes: 6 additions & 5 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ void nano::transport::channel_tcp::set_endpoint ()
}
}

nano::transport::tcp_channels::tcp_channels (nano::node & node_a) :
node (node_a)
nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
node{ node },
sink{ sink }
{
}

Expand Down Expand Up @@ -296,14 +297,14 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a)));
if (channel)
{
node.network.process_message (message_a, channel);
sink (message_a, channel);
}
else
{
channel = node.network.find_node_id (node_id_a);
if (channel)
{
node.network.process_message (message_a, channel);
sink (message_a, channel);
}
else if (!node.network.excluded_peers.check (endpoint_a))
{
Expand All @@ -322,7 +323,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
{
insert (temporary_channel, socket_a, nullptr);
}
node.network.process_message (message_a, temporary_channel);
sink (message_a, temporary_channel);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace transport
friend class telemetry_simultaneous_requests_Test;

public:
tcp_channels (nano::node &);
tcp_channels (nano::node &, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> = nullptr);
bool insert (std::shared_ptr<nano::transport::channel_tcp> const &, std::shared_ptr<nano::socket> const &, std::shared_ptr<nano::bootstrap_server> const &);
void erase (nano::tcp_endpoint const &);
size_t size () const;
Expand Down Expand Up @@ -112,6 +112,7 @@ namespace transport
nano::node & node;

private:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
class endpoint_tag
{
};
Expand Down
Loading

0 comments on commit f594d0b

Please sign in to comment.