Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message sink interface #3364

Merged
merged 3 commits into from
Jul 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ TEST (active_transactions, inactive_votes_cache_fork)
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
auto channel1 (node.network.udp_channels.create (node.network.endpoint ()));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
node.network.process_message (nano::publish (send2), channel1);
node.network.inbound.sink (nano::publish (send2), channel1);
node.block_processor.flush ();
ASSERT_NE (nullptr, node.block (send2->hash ()));
node.scheduler.flush (); // Start election, otherwise conflicting block won't be inserted into election
node.network.process_message (nano::publish (send1), channel1);
node.network.inbound.sink (nano::publish (send1), channel1);
node.block_processor.flush ();
bool confirmed (false);
system.deadline_set (5s);
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,19 @@ TEST (confirmation_height, conflict_rollback_cemented)
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish2 (send2);
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->network.inbound.sink (publish1, channel1);
node1->block_processor.flush ();
node1->scheduler.flush ();
auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ()));
node2->network.process_message (publish2, channel2);
node2->network.inbound.sink (publish2, channel2);
node2->block_processor.flush ();
node2->scheduler.flush ();
ASSERT_EQ (1, node1->active.size ());
ASSERT_EQ (1, node2->active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1->network.process_message (publish2, channel1);
node1->network.inbound.sink (publish2, channel1);
node1->block_processor.flush ();
node2->network.process_message (publish1, channel2);
node2->network.inbound.sink (publish1, channel2);
node2->block_processor.flush ();
auto election (node2->active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election);
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ TEST (receivable_processor, confirm_insufficient_pos)
nano::keypair key1;
auto vote (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, block1));
nano::confirm_ack con1 (vote);
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
}

TEST (receivable_processor, confirm_sufficient_pos)
Expand All @@ -375,7 +375,7 @@ TEST (receivable_processor, confirm_sufficient_pos)
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto vote (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1));
nano::confirm_ack con1 (vote);
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
}

TEST (receivable_processor, send_with_receive)
Expand Down Expand Up @@ -989,10 +989,10 @@ TEST (network, duplicate_revert_publish)
auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ());
ASSERT_NE (nullptr, channel);
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
node.network.inbound.sink (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
node.network.inbound.sink (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

Expand Down
58 changes: 29 additions & 29 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TEST (node, receive_gap)
.build_shared ();
node1.work_generate_blocking (*block);
nano::publish message (block);
node1.network.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound.sink (message, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
}
Expand Down Expand Up @@ -1210,19 +1210,19 @@ TEST (node, fork_flip)
.build_shared ();
nano::publish publish2 (send2);
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.network.inbound.sink (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ()));
node2.network.process_message (publish2, channel2);
node2.network.inbound.sink (publish2, channel2);
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish2, channel1);
node1.network.inbound.sink (publish2, channel1);
node1.block_processor.flush ();
node2.network.process_message (publish1, channel2);
node2.network.inbound.sink (publish1, channel2);
node2.block_processor.flush ();
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election1);
Expand Down Expand Up @@ -1285,20 +1285,20 @@ TEST (node, fork_multi_flip)
.work (*system.work.generate (publish2.block->hash ()))
.build_shared ();
nano::publish publish3 (send3);
node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node1.network.inbound.sink (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
node2.network.inbound.sink (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.inbound.sink (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
node1.block_processor.flush ();
node1.scheduler.flush ();
node2.block_processor.flush ();
node2.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
ASSERT_EQ (1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound.sink (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.inbound.sink (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
node1.block_processor.flush ();
node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.network.inbound.sink (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
node2.block_processor.flush ();
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
ASSERT_NE (nullptr, election1);
Expand Down Expand Up @@ -1380,7 +1380,7 @@ TEST (node, fork_open)
.build_shared ();
nano::publish publish1 (send1);
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (publish1, channel1);
node1.network.inbound.sink (publish1, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (publish1.block->qualified_root ());
Expand All @@ -1396,7 +1396,7 @@ TEST (node, fork_open)
.work (*system.work.generate (key1.pub))
.build_shared ();
nano::publish publish2 (open1);
node1.network.process_message (publish2, channel1);
node1.network.inbound.sink (publish2, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
Expand All @@ -1409,7 +1409,7 @@ TEST (node, fork_open)
.build_shared ();
nano::publish publish3 (open2);
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node1.network.process_message (publish3, channel1);
node1.network.inbound.sink (publish3, channel1);
node1.block_processor.flush ();
node1.scheduler.flush ();
election = node1.active.election (publish3.block->qualified_root ());
Expand Down Expand Up @@ -2714,14 +2714,14 @@ TEST (node, local_votes_cache)
nano::confirm_req message1 (send1);
nano::confirm_req message2 (send2);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
node.network.inbound.sink (message1, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 1);
node.network.process_message (message2, channel);
node.network.inbound.sink (message2, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 2);
for (auto i (0); i < 100; ++i)
{
node.network.process_message (message1, channel);
node.network.process_message (message2, channel);
node.network.inbound.sink (message1, channel);
node.network.inbound.sink (message2, channel);
}
for (int i = 0; i < 4; ++i)
{
Expand All @@ -2737,7 +2737,7 @@ TEST (node, local_votes_cache)
nano::confirm_req message3 (send3);
for (auto i (0); i < 100; ++i)
{
node.network.process_message (message3, channel);
node.network.inbound.sink (message3, channel);
}
for (int i = 0; i < 4; ++i)
{
Expand Down Expand Up @@ -2795,26 +2795,26 @@ TEST (node, local_votes_cache_batch)
nano::confirm_req message (batch);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
// Generates and sends one vote for both hashes which is then cached
node.network.process_message (message, channel);
node.network.inbound.sink (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1);
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ());
// Only one confirm_ack should be sent if all hashes are part of the same vote
node.network.process_message (message, channel);
node.network.inbound.sink (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Test when votes are different
node.history.erase (send2->root ());
node.history.erase (receive1->root ());
node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel);
node.network.inbound.sink (nano::confirm_req (send2->hash (), send2->root ()), channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3);
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
node.network.process_message (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
node.network.inbound.sink (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 4);
ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// There are two different votes, so both should be sent in response
node.network.process_message (message, channel);
node.network.inbound.sink (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 6);
ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
Expand All @@ -2830,7 +2830,7 @@ TEST (node, local_votes_cache_generate_new_vote)
// Repsond with cached vote
nano::confirm_req message1 (genesis.open);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
node.network.inbound.sink (message1, channel);
ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ()));
ASSERT_EQ (1, votes1.size ());
Expand All @@ -2850,7 +2850,7 @@ TEST (node, local_votes_cache_generate_new_vote)
// One of the hashes is cached
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) };
nano::confirm_req message2 (roots_hashes);
node.network.process_message (message2, channel);
node.network.inbound.sink (message2, channel);
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
auto votes2 (node.history.votes (send1->root (), send1->hash ()));
ASSERT_EQ (1, votes2.size ());
Expand Down Expand Up @@ -3259,13 +3259,13 @@ TEST (node, fork_election_invalid_block_signature)
.sign (nano::dev_genesis_key.prv, 0) // Invalid signature
.build_shared ();
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
node1.network.process_message (nano::publish (send1), channel1);
node1.network.inbound.sink (nano::publish (send1), channel1);
ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ()));
auto election (node1.active.election (send1->qualified_root ()));
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
node1.network.process_message (nano::publish (send3), channel1);
node1.network.process_message (nano::publish (send2), channel1);
node1.network.inbound.sink (nano::publish (send3), channel1);
node1.network.inbound.sink (nano::publish (send2), channel1);
ASSERT_TIMELY (3s, election->blocks ().size () > 1);
ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ());
}
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ TEST (telemetry, receive_from_non_listening_channel)
nano::system system;
auto node = system.add_node ();
nano::telemetry_ack message (nano::telemetry_data{});
node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ()));
node->network.inbound.sink (message, node->network.udp_channels.create (node->network.endpoint ()));
// We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it.
ASSERT_EQ (node->telemetry->telemetry_data_size (), 0);
}
Expand Down Expand Up @@ -632,7 +632,7 @@ TEST (telemetry, remove_peer_invalid_signature)
// Change anything so that the signed message is incorrect
telemetry_data.block_count = 0;
auto telemetry_ack = nano::telemetry_ack (telemetry_data);
node->network.process_message (telemetry_ack, channel);
node->network.inbound.sink (telemetry_ack, channel);

ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0);
ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool {
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ TEST (websocket, stopped_election)
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
nano::publish publish1 (send1);
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->network.inbound.sink (publish1, channel1);
node1->block_processor.flush ();
ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ()));
node1->active.erase (*send1);
Expand Down
7 changes: 6 additions & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

#include <numeric>

void nano::network::inbound_t::sink (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel)
{
network.process_message (message, channel);
}

nano::network::network (nano::node & node_a, uint16_t port_a) :
syn_cookies (node_a.network_params.node.max_peers_per_ip),
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
Expand All @@ -20,7 +25,7 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
node (node_a),
publish_filter (256 * 1024),
udp_channels (node_a, port_a),
thsfs marked this conversation as resolved.
Show resolved Hide resolved
tcp_channels (node_a),
tcp_channels (node_a, inbound),
port (port_a),
disconnect_observer ([] () {})
{
Expand Down
19 changes: 18 additions & 1 deletion nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ class syn_cookies final
class network final
{
public:
class inbound_t : public nano::transport::message_sink
thsfs marked this conversation as resolved.
Show resolved Hide resolved
{
public:
inbound_t (nano::network & network) :
network{ network }
{
}
void sink (nano::message const &, std::shared_ptr<nano::transport::channel> const &) override;
nano::network & network;
};

public:
inbound_t inbound{ *this };
network (nano::node &, uint16_t);
~network ();
void start ();
Expand Down Expand Up @@ -154,7 +167,6 @@ class network final
void broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, std::function<void ()> = nullptr, unsigned = broadcast_interval_ms);
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
Expand All @@ -180,6 +192,11 @@ class network final
bool empty () const;
void erase (nano::transport::channel const &);
void set_bandwidth_params (double, size_t);

private:
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

public:
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
Expand Down
11 changes: 6 additions & 5 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ void nano::transport::channel_tcp::set_endpoint ()
}
}

nano::transport::tcp_channels::tcp_channels (nano::node & node_a) :
node (node_a)
nano::transport::tcp_channels::tcp_channels (nano::node & node, nano::transport::message_sink & inbound) :
node{ node },
inbound{ inbound }
{
}

Expand Down Expand Up @@ -296,14 +297,14 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a)));
if (channel)
{
node.network.process_message (message_a, channel);
inbound.sink (message_a, channel);
}
else
{
channel = node.network.find_node_id (node_id_a);
if (channel)
{
node.network.process_message (message_a, channel);
inbound.sink (message_a, channel);
}
else if (!node.network.excluded_peers.check (endpoint_a))
{
Expand All @@ -322,7 +323,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
{
insert (temporary_channel, socket_a, nullptr);
}
node.network.process_message (message_a, temporary_channel);
inbound.sink (message_a, temporary_channel);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace transport
friend class telemetry_simultaneous_requests_Test;

public:
tcp_channels (nano::node &);
tcp_channels (nano::node &, nano::transport::message_sink & inbound);
bool insert (std::shared_ptr<nano::transport::channel_tcp> const &, std::shared_ptr<nano::socket> const &, std::shared_ptr<nano::bootstrap_server> const &);
void erase (nano::tcp_endpoint const &);
size_t size () const;
Expand Down Expand Up @@ -112,6 +112,7 @@ namespace transport
nano::node & node;

private:
nano::transport::message_sink & inbound;
class endpoint_tag
{
};
Expand Down
Loading