From 94b3e8eb47bd0fc92f6b045a5add8e4484606c89 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Thu, 1 Jul 2021 18:44:21 +0100 Subject: [PATCH 1/2] Merge branch 'network_id/develop' into network_id/squash --- nano/core_test/active_transactions.cpp | 4 +- nano/core_test/confirmation_height.cpp | 8 ++-- nano/core_test/network.cpp | 8 ++-- nano/core_test/node.cpp | 58 +++++++++++++------------- nano/core_test/telemetry.cpp | 4 +- nano/core_test/websocket.cpp | 2 +- nano/node/network.cpp | 7 +++- nano/node/network.hpp | 16 ++++++- nano/node/transport/tcp.cpp | 11 ++--- nano/node/transport/tcp.hpp | 3 +- nano/node/transport/transport.hpp | 5 +++ nano/node/transport/udp.cpp | 2 +- 12 files changed, 77 insertions(+), 51 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index c62124d05c..7605af73e3 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -256,11 +256,11 @@ TEST (active_transactions, inactive_votes_cache_fork) node.vote_processor.vote (vote, std::make_shared (node)); auto channel1 (node.network.udp_channels.create (node.network.endpoint ())); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1); - node.network.process_message (nano::publish (send2), channel1); + node.network.inbound.sink (nano::publish (send2), channel1); node.block_processor.flush (); ASSERT_NE (nullptr, node.block (send2->hash ())); node.scheduler.flush (); // Start election, otherwise conflicting block won't be inserted into election - node.network.process_message (nano::publish (send1), channel1); + node.network.inbound.sink (nano::publish (send1), channel1); node.block_processor.flush (); bool confirmed (false); system.deadline_set (5s); diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 9ee02d31aa..904d1a42d2 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -662,19 +662,19 @@ TEST (confirmation_height, conflict_rollback_cemented) auto send2 (std::make_shared (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ()))); nano::publish publish2 (send2); auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ())); - node1->network.process_message (publish1, channel1); + node1->network.inbound.sink (publish1, channel1); node1->block_processor.flush (); node1->scheduler.flush (); auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ())); - node2->network.process_message (publish2, channel2); + node2->network.inbound.sink (publish2, channel2); node2->block_processor.flush (); node2->scheduler.flush (); ASSERT_EQ (1, node1->active.size ()); ASSERT_EQ (1, node2->active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1->network.process_message (publish2, channel1); + node1->network.inbound.sink (publish2, channel1); node1->block_processor.flush (); - node2->network.process_message (publish1, channel2); + node2->network.inbound.sink (publish1, channel2); node2->block_processor.flush (); auto election (node2->active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 01b8e9877e..dcdc4f08bd 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -361,7 +361,7 @@ TEST (receivable_processor, confirm_insufficient_pos) nano::keypair key1; auto vote (std::make_shared (key1.pub, key1.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, confirm_sufficient_pos) @@ -375,7 +375,7 @@ TEST (receivable_processor, confirm_sufficient_pos) node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ()); auto vote (std::make_shared (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, send_with_receive) @@ -989,10 +989,10 @@ TEST (network, duplicate_revert_publish) auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ()); ASSERT_NE (nullptr, channel); ASSERT_EQ (0, publish.digest); - node.network.process_message (publish, channel); + node.network.inbound.sink (publish, channel); ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); publish.digest = digest; - node.network.process_message (publish, channel); + node.network.inbound.sink (publish, channel); ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index bdeaa65a5e..f4f48296fc 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -363,7 +363,7 @@ TEST (node, receive_gap) .build_shared (); node1.work_generate_blocking (*block); nano::publish message (block); - node1.network.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound.sink (message, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); ASSERT_EQ (1, node1.gap_cache.size ()); } @@ -1210,19 +1210,19 @@ TEST (node, fork_flip) .build_shared (); nano::publish publish2 (send2); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.process_message (publish1, channel1); + node1.network.inbound.sink (publish1, channel1); node1.block_processor.flush (); node1.scheduler.flush (); auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ())); - node2.network.process_message (publish2, channel2); + node2.network.inbound.sink (publish2, channel2); node2.block_processor.flush (); node2.scheduler.flush (); ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.process_message (publish2, channel1); + node1.network.inbound.sink (publish2, channel1); node1.block_processor.flush (); - node2.network.process_message (publish1, channel2); + node2.network.inbound.sink (publish1, channel2); node2.block_processor.flush (); auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election1); @@ -1285,9 +1285,9 @@ TEST (node, fork_multi_flip) .work (*system.work.generate (publish2.block->hash ())) .build_shared (); nano::publish publish3 (send3); - node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); - node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); - node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); + node1.network.inbound.sink (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); + node2.network.inbound.sink (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.inbound.sink (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); node1.block_processor.flush (); node1.scheduler.flush (); node2.block_processor.flush (); @@ -1295,10 +1295,10 @@ TEST (node, fork_multi_flip) ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound.sink (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound.sink (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); - node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.inbound.sink (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); node2.block_processor.flush (); auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election1); @@ -1380,7 +1380,7 @@ TEST (node, fork_open) .build_shared (); nano::publish publish1 (send1); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.process_message (publish1, channel1); + node1.network.inbound.sink (publish1, channel1); node1.block_processor.flush (); node1.scheduler.flush (); auto election = node1.active.election (publish1.block->qualified_root ()); @@ -1396,7 +1396,7 @@ TEST (node, fork_open) .work (*system.work.generate (key1.pub)) .build_shared (); nano::publish publish2 (open1); - node1.network.process_message (publish2, channel1); + node1.network.inbound.sink (publish2, channel1); node1.block_processor.flush (); node1.scheduler.flush (); ASSERT_EQ (1, node1.active.size ()); @@ -1409,7 +1409,7 @@ TEST (node, fork_open) .build_shared (); nano::publish publish3 (open2); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.process_message (publish3, channel1); + node1.network.inbound.sink (publish3, channel1); node1.block_processor.flush (); node1.scheduler.flush (); election = node1.active.election (publish3.block->qualified_root ()); @@ -2714,14 +2714,14 @@ TEST (node, local_votes_cache) nano::confirm_req message1 (send1); nano::confirm_req message2 (send2); auto channel (node.network.udp_channels.create (node.network.endpoint ())); - node.network.process_message (message1, channel); + node.network.inbound.sink (message1, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 1); - node.network.process_message (message2, channel); + node.network.inbound.sink (message2, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 2); for (auto i (0); i < 100; ++i) { - node.network.process_message (message1, channel); - node.network.process_message (message2, channel); + node.network.inbound.sink (message1, channel); + node.network.inbound.sink (message2, channel); } for (int i = 0; i < 4; ++i) { @@ -2737,7 +2737,7 @@ TEST (node, local_votes_cache) nano::confirm_req message3 (send3); for (auto i (0); i < 100; ++i) { - node.network.process_message (message3, channel); + node.network.inbound.sink (message3, channel); } for (int i = 0; i < 4; ++i) { @@ -2795,26 +2795,26 @@ TEST (node, local_votes_cache_batch) nano::confirm_req message (batch); auto channel (node.network.udp_channels.create (node.network.endpoint ())); // Generates and sends one vote for both hashes which is then cached - node.network.process_message (message, channel); + node.network.inbound.sink (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1); ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ()); ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ()); // Only one confirm_ack should be sent if all hashes are part of the same vote - node.network.process_message (message, channel); + node.network.inbound.sink (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Test when votes are different node.history.erase (send2->root ()); node.history.erase (receive1->root ()); - node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel); + node.network.inbound.sink (nano::confirm_req (send2->hash (), send2->root ()), channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3); ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); - node.network.process_message (nano::confirm_req (receive1->hash (), receive1->root ()), channel); + node.network.inbound.sink (nano::confirm_req (receive1->hash (), receive1->root ()), channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 4); ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // There are two different votes, so both should be sent in response - node.network.process_message (message, channel); + node.network.inbound.sink (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 6); ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -2830,7 +2830,7 @@ TEST (node, local_votes_cache_generate_new_vote) // Repsond with cached vote nano::confirm_req message1 (genesis.open); auto channel (node.network.udp_channels.create (node.network.endpoint ())); - node.network.process_message (message1, channel); + node.network.inbound.sink (message1, channel); ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ()); auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ())); ASSERT_EQ (1, votes1.size ()); @@ -2850,7 +2850,7 @@ TEST (node, local_votes_cache_generate_new_vote) // One of the hashes is cached std::vector> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) }; nano::confirm_req message2 (roots_hashes); - node.network.process_message (message2, channel); + node.network.inbound.sink (message2, channel); ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ()); auto votes2 (node.history.votes (send1->root (), send1->hash ())); ASSERT_EQ (1, votes2.size ()); @@ -3259,13 +3259,13 @@ TEST (node, fork_election_invalid_block_signature) .sign (nano::dev_genesis_key.prv, 0) // Invalid signature .build_shared (); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.process_message (nano::publish (send1), channel1); + node1.network.inbound.sink (nano::publish (send1), channel1); ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ())); auto election (node1.active.election (send1->qualified_root ())); ASSERT_NE (nullptr, election); ASSERT_EQ (1, election->blocks ().size ()); - node1.network.process_message (nano::publish (send3), channel1); - node1.network.process_message (nano::publish (send2), channel1); + node1.network.inbound.sink (nano::publish (send3), channel1); + node1.network.inbound.sink (nano::publish (send2), channel1); ASSERT_TIMELY (3s, election->blocks ().size () > 1); ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ()); } diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 4f3b819800..f12107ba86 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -285,7 +285,7 @@ TEST (telemetry, receive_from_non_listening_channel) nano::system system; auto node = system.add_node (); nano::telemetry_ack message (nano::telemetry_data{}); - node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ())); + node->network.inbound.sink (message, node->network.udp_channels.create (node->network.endpoint ())); // We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it. ASSERT_EQ (node->telemetry->telemetry_data_size (), 0); } @@ -632,7 +632,7 @@ TEST (telemetry, remove_peer_invalid_signature) // Change anything so that the signed message is incorrect telemetry_data.block_count = 0; auto telemetry_ack = nano::telemetry_ack (telemetry_data); - node->network.process_message (telemetry_ack, channel); + node->network.inbound.sink (telemetry_ack, channel); ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0); ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool { diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index a98b479974..a129fed8d4 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -144,7 +144,7 @@ TEST (websocket, stopped_election) auto send1 (std::make_shared (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ()))); nano::publish publish1 (send1); auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ())); - node1->network.process_message (publish1, channel1); + node1->network.inbound.sink (publish1, channel1); node1->block_processor.flush (); ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ())); node1->active.erase (*send1); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 9f8f344bb0..7c5b5ec3a6 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -11,6 +11,11 @@ #include +void nano::network::inbound_t::sink (nano::message const & message, std::shared_ptr const & channel) +{ + network.process_message (message, channel); +} + nano::network::network (nano::node & node_a, uint16_t port_a) : syn_cookies (node_a.network_params.node.max_peers_per_ip), buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer @@ -20,7 +25,7 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : node (node_a), publish_filter (256 * 1024), udp_channels (node_a, port_a), - tcp_channels (node_a), + tcp_channels (node_a, inbound), port (port_a), disconnect_observer ([] () {}) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 6ebd1a65eb..034310105a 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -118,6 +118,18 @@ class syn_cookies final class network final { public: + class inbound_t : public nano::transport::message_sink + { + public: + inbound_t (nano::network & network) : + network{ network } + { + } + void sink (nano::message const &, std::shared_ptr const &) override; + nano::network & network; + }; +public: + inbound_t inbound{ *this }; network (nano::node &, uint16_t); ~network (); void start (); @@ -154,7 +166,6 @@ class network final void broadcast_confirm_req_many (std::deque, std::shared_ptr>>>>, std::function = nullptr, unsigned = broadcast_interval_ms); std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); - void process_message (nano::message const &, std::shared_ptr const &); bool not_a_peer (nano::endpoint const &, bool); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); @@ -180,6 +191,9 @@ class network final bool empty () const; void erase (nano::transport::channel const &); void set_bandwidth_params (double, size_t); +private: + void process_message (nano::message const &, std::shared_ptr const &); +public: nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 910e4f47a2..428e757f88 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -110,8 +110,9 @@ void nano::transport::channel_tcp::set_endpoint () } } -nano::transport::tcp_channels::tcp_channels (nano::node & node_a) : - node (node_a) +nano::transport::tcp_channels::tcp_channels (nano::node & node, nano::transport::message_sink & inbound) : + node{ node }, + inbound{ inbound } { } @@ -296,14 +297,14 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a))); if (channel) { - node.network.process_message (message_a, channel); + inbound.sink (message_a, channel); } else { channel = node.network.find_node_id (node_id_a); if (channel) { - node.network.process_message (message_a, channel); + inbound.sink (message_a, channel); } else if (!node.network.excluded_peers.check (endpoint_a)) { @@ -322,7 +323,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa { insert (temporary_channel, socket_a, nullptr); } - node.network.process_message (message_a, temporary_channel); + inbound.sink (message_a, temporary_channel); } else { diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 0fae3837db..88ccefb494 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -76,7 +76,7 @@ namespace transport friend class telemetry_simultaneous_requests_Test; public: - tcp_channels (nano::node &); + tcp_channels (nano::node &, nano::transport::message_sink & inbound); bool insert (std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); void erase (nano::tcp_endpoint const &); size_t size () const; @@ -112,6 +112,7 @@ namespace transport nano::node & node; private: + nano::transport::message_sink & inbound; class endpoint_tag { }; diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index b38672ae21..e95cfb4ad9 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -137,6 +137,11 @@ namespace transport protected: nano::node & node; }; + class message_sink + { + public: + virtual void sink (nano::message const &, std::shared_ptr const &) {}; + }; class channel_loopback final : public nano::transport::channel { diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 9007eff165..fcb2157b64 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -512,7 +512,7 @@ class udp_message_visitor : public nano::message_visitor node.network.udp_channels.modify (find_channel, [] (std::shared_ptr const & channel_a) { channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); }); - node.network.process_message (message_a, find_channel); + node.network.inbound.sink (message_a, find_channel); } } nano::node & node; From 8470ec24303e99be2170c76d30e990f9f986d761 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Fri, 2 Jul 2021 11:47:31 +0100 Subject: [PATCH 2/2] Convert virtual class to std::function since there's only one method on the interface. --- nano/core_test/active_transactions.cpp | 4 +- nano/core_test/confirmation_height.cpp | 8 ++-- nano/core_test/network.cpp | 8 ++-- nano/core_test/node.cpp | 58 +++++++++++++------------- nano/core_test/telemetry.cpp | 4 +- nano/core_test/websocket.cpp | 2 +- nano/node/network.cpp | 8 +--- nano/node/network.hpp | 14 +------ nano/node/transport/tcp.cpp | 10 ++--- nano/node/transport/tcp.hpp | 4 +- nano/node/transport/transport.hpp | 5 --- nano/node/transport/udp.cpp | 19 +++++---- nano/node/transport/udp.hpp | 3 +- 13 files changed, 65 insertions(+), 82 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 7605af73e3..5801a08f43 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -256,11 +256,11 @@ TEST (active_transactions, inactive_votes_cache_fork) node.vote_processor.vote (vote, std::make_shared (node)); auto channel1 (node.network.udp_channels.create (node.network.endpoint ())); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1); - node.network.inbound.sink (nano::publish (send2), channel1); + node.network.inbound (nano::publish (send2), channel1); node.block_processor.flush (); ASSERT_NE (nullptr, node.block (send2->hash ())); node.scheduler.flush (); // Start election, otherwise conflicting block won't be inserted into election - node.network.inbound.sink (nano::publish (send1), channel1); + node.network.inbound (nano::publish (send1), channel1); node.block_processor.flush (); bool confirmed (false); system.deadline_set (5s); diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 904d1a42d2..77c890f6e4 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -662,19 +662,19 @@ TEST (confirmation_height, conflict_rollback_cemented) auto send2 (std::make_shared (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ()))); nano::publish publish2 (send2); auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ())); - node1->network.inbound.sink (publish1, channel1); + node1->network.inbound (publish1, channel1); node1->block_processor.flush (); node1->scheduler.flush (); auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ())); - node2->network.inbound.sink (publish2, channel2); + node2->network.inbound (publish2, channel2); node2->block_processor.flush (); node2->scheduler.flush (); ASSERT_EQ (1, node1->active.size ()); ASSERT_EQ (1, node2->active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1->network.inbound.sink (publish2, channel1); + node1->network.inbound (publish2, channel1); node1->block_processor.flush (); - node2->network.inbound.sink (publish1, channel2); + node2->network.inbound (publish1, channel2); node2->block_processor.flush (); auto election (node2->active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index dcdc4f08bd..80d78a4219 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -361,7 +361,7 @@ TEST (receivable_processor, confirm_insufficient_pos) nano::keypair key1; auto vote (std::make_shared (key1.pub, key1.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, confirm_sufficient_pos) @@ -375,7 +375,7 @@ TEST (receivable_processor, confirm_sufficient_pos) node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ()); auto vote (std::make_shared (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.network.inbound.sink (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, send_with_receive) @@ -989,10 +989,10 @@ TEST (network, duplicate_revert_publish) auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ()); ASSERT_NE (nullptr, channel); ASSERT_EQ (0, publish.digest); - node.network.inbound.sink (publish, channel); + node.network.inbound (publish, channel); ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); publish.digest = digest; - node.network.inbound.sink (publish, channel); + node.network.inbound (publish, channel); ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index f4f48296fc..7099902fd4 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -363,7 +363,7 @@ TEST (node, receive_gap) .build_shared (); node1.work_generate_blocking (*block); nano::publish message (block); - node1.network.inbound.sink (message, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound (message, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); ASSERT_EQ (1, node1.gap_cache.size ()); } @@ -1210,19 +1210,19 @@ TEST (node, fork_flip) .build_shared (); nano::publish publish2 (send2); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.inbound.sink (publish1, channel1); + node1.network.inbound (publish1, channel1); node1.block_processor.flush (); node1.scheduler.flush (); auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ())); - node2.network.inbound.sink (publish2, channel2); + node2.network.inbound (publish2, channel2); node2.block_processor.flush (); node2.scheduler.flush (); ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.inbound.sink (publish2, channel1); + node1.network.inbound (publish2, channel1); node1.block_processor.flush (); - node2.network.inbound.sink (publish1, channel2); + node2.network.inbound (publish1, channel2); node2.block_processor.flush (); auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election1); @@ -1285,9 +1285,9 @@ TEST (node, fork_multi_flip) .work (*system.work.generate (publish2.block->hash ())) .build_shared (); nano::publish publish3 (send3); - node1.network.inbound.sink (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); - node2.network.inbound.sink (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); - node2.network.inbound.sink (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); + node1.network.inbound (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); + node2.network.inbound (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.inbound (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); node1.block_processor.flush (); node1.scheduler.flush (); node2.block_processor.flush (); @@ -1295,10 +1295,10 @@ TEST (node, fork_multi_flip) ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.inbound.sink (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.inbound.sink (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.inbound (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); - node2.network.inbound.sink (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.inbound (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); node2.block_processor.flush (); auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ()))); ASSERT_NE (nullptr, election1); @@ -1380,7 +1380,7 @@ TEST (node, fork_open) .build_shared (); nano::publish publish1 (send1); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.inbound.sink (publish1, channel1); + node1.network.inbound (publish1, channel1); node1.block_processor.flush (); node1.scheduler.flush (); auto election = node1.active.election (publish1.block->qualified_root ()); @@ -1396,7 +1396,7 @@ TEST (node, fork_open) .work (*system.work.generate (key1.pub)) .build_shared (); nano::publish publish2 (open1); - node1.network.inbound.sink (publish2, channel1); + node1.network.inbound (publish2, channel1); node1.block_processor.flush (); node1.scheduler.flush (); ASSERT_EQ (1, node1.active.size ()); @@ -1409,7 +1409,7 @@ TEST (node, fork_open) .build_shared (); nano::publish publish3 (open2); system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - node1.network.inbound.sink (publish3, channel1); + node1.network.inbound (publish3, channel1); node1.block_processor.flush (); node1.scheduler.flush (); election = node1.active.election (publish3.block->qualified_root ()); @@ -2714,14 +2714,14 @@ TEST (node, local_votes_cache) nano::confirm_req message1 (send1); nano::confirm_req message2 (send2); auto channel (node.network.udp_channels.create (node.network.endpoint ())); - node.network.inbound.sink (message1, channel); + node.network.inbound (message1, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 1); - node.network.inbound.sink (message2, channel); + node.network.inbound (message2, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 2); for (auto i (0); i < 100; ++i) { - node.network.inbound.sink (message1, channel); - node.network.inbound.sink (message2, channel); + node.network.inbound (message1, channel); + node.network.inbound (message2, channel); } for (int i = 0; i < 4; ++i) { @@ -2737,7 +2737,7 @@ TEST (node, local_votes_cache) nano::confirm_req message3 (send3); for (auto i (0); i < 100; ++i) { - node.network.inbound.sink (message3, channel); + node.network.inbound (message3, channel); } for (int i = 0; i < 4; ++i) { @@ -2795,26 +2795,26 @@ TEST (node, local_votes_cache_batch) nano::confirm_req message (batch); auto channel (node.network.udp_channels.create (node.network.endpoint ())); // Generates and sends one vote for both hashes which is then cached - node.network.inbound.sink (message, channel); + node.network.inbound (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1); ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ()); ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ()); // Only one confirm_ack should be sent if all hashes are part of the same vote - node.network.inbound.sink (message, channel); + node.network.inbound (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Test when votes are different node.history.erase (send2->root ()); node.history.erase (receive1->root ()); - node.network.inbound.sink (nano::confirm_req (send2->hash (), send2->root ()), channel); + node.network.inbound (nano::confirm_req (send2->hash (), send2->root ()), channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3); ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); - node.network.inbound.sink (nano::confirm_req (receive1->hash (), receive1->root ()), channel); + node.network.inbound (nano::confirm_req (receive1->hash (), receive1->root ()), channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 4); ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // There are two different votes, so both should be sent in response - node.network.inbound.sink (message, channel); + node.network.inbound (message, channel); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 6); ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -2830,7 +2830,7 @@ TEST (node, local_votes_cache_generate_new_vote) // Repsond with cached vote nano::confirm_req message1 (genesis.open); auto channel (node.network.udp_channels.create (node.network.endpoint ())); - node.network.inbound.sink (message1, channel); + node.network.inbound (message1, channel); ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ()); auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ())); ASSERT_EQ (1, votes1.size ()); @@ -2850,7 +2850,7 @@ TEST (node, local_votes_cache_generate_new_vote) // One of the hashes is cached std::vector> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) }; nano::confirm_req message2 (roots_hashes); - node.network.inbound.sink (message2, channel); + node.network.inbound (message2, channel); ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ()); auto votes2 (node.history.votes (send1->root (), send1->hash ())); ASSERT_EQ (1, votes2.size ()); @@ -3259,13 +3259,13 @@ TEST (node, fork_election_invalid_block_signature) .sign (nano::dev_genesis_key.prv, 0) // Invalid signature .build_shared (); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.network.inbound.sink (nano::publish (send1), channel1); + node1.network.inbound (nano::publish (send1), channel1); ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ())); auto election (node1.active.election (send1->qualified_root ())); ASSERT_NE (nullptr, election); ASSERT_EQ (1, election->blocks ().size ()); - node1.network.inbound.sink (nano::publish (send3), channel1); - node1.network.inbound.sink (nano::publish (send2), channel1); + node1.network.inbound (nano::publish (send3), channel1); + node1.network.inbound (nano::publish (send2), channel1); ASSERT_TIMELY (3s, election->blocks ().size () > 1); ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ()); } diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index f12107ba86..9c46221842 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -285,7 +285,7 @@ TEST (telemetry, receive_from_non_listening_channel) nano::system system; auto node = system.add_node (); nano::telemetry_ack message (nano::telemetry_data{}); - node->network.inbound.sink (message, node->network.udp_channels.create (node->network.endpoint ())); + node->network.inbound (message, node->network.udp_channels.create (node->network.endpoint ())); // We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it. ASSERT_EQ (node->telemetry->telemetry_data_size (), 0); } @@ -632,7 +632,7 @@ TEST (telemetry, remove_peer_invalid_signature) // Change anything so that the signed message is incorrect telemetry_data.block_count = 0; auto telemetry_ack = nano::telemetry_ack (telemetry_data); - node->network.inbound.sink (telemetry_ack, channel); + node->network.inbound (telemetry_ack, channel); ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0); ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool { diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index a129fed8d4..100df8c507 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -144,7 +144,7 @@ TEST (websocket, stopped_election) auto send1 (std::make_shared (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ()))); nano::publish publish1 (send1); auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ())); - node1->network.inbound.sink (publish1, channel1); + node1->network.inbound (publish1, channel1); node1->block_processor.flush (); ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ())); node1->active.erase (*send1); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 7c5b5ec3a6..3e4daa2e76 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -11,20 +11,16 @@ #include -void nano::network::inbound_t::sink (nano::message const & message, std::shared_ptr const & channel) -{ - network.process_message (message, channel); -} - nano::network::network (nano::node & node_a, uint16_t port_a) : syn_cookies (node_a.network_params.node.max_peers_per_ip), + inbound{ [this] (nano::message const & message, std::shared_ptr const & channel) { process_message (message, channel); } }, buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer resolver (node_a.io_ctx), limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit), tcp_message_manager (node_a.config.tcp_incoming_connections_max), node (node_a), publish_filter (256 * 1024), - udp_channels (node_a, port_a), + udp_channels (node_a, port_a, inbound), tcp_channels (node_a, inbound), port (port_a), disconnect_observer ([] () {}) diff --git a/nano/node/network.hpp b/nano/node/network.hpp index acd8b99546..d42f304a75 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -118,19 +118,6 @@ class syn_cookies final class network final { public: - class inbound_t : public nano::transport::message_sink - { - public: - inbound_t (nano::network & network) : - network{ network } - { - } - void sink (nano::message const &, std::shared_ptr const &) override; - nano::network & network; - }; - -public: - inbound_t inbound{ *this }; network (nano::node &, uint16_t); ~network (); void start (); @@ -197,6 +184,7 @@ class network final void process_message (nano::message const &, std::shared_ptr const &); public: + std::function const &)> inbound; nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 428e757f88..31bedb62e6 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -110,9 +110,9 @@ void nano::transport::channel_tcp::set_endpoint () } } -nano::transport::tcp_channels::tcp_channels (nano::node & node, nano::transport::message_sink & inbound) : +nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function const &)> sink) : node{ node }, - inbound{ inbound } + sink{ sink } { } @@ -297,14 +297,14 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a))); if (channel) { - inbound.sink (message_a, channel); + sink (message_a, channel); } else { channel = node.network.find_node_id (node_id_a); if (channel) { - inbound.sink (message_a, channel); + sink (message_a, channel); } else if (!node.network.excluded_peers.check (endpoint_a)) { @@ -323,7 +323,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa { insert (temporary_channel, socket_a, nullptr); } - inbound.sink (message_a, temporary_channel); + sink (message_a, temporary_channel); } else { diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 88ccefb494..8fde8f2a8e 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -76,7 +76,7 @@ namespace transport friend class telemetry_simultaneous_requests_Test; public: - tcp_channels (nano::node &, nano::transport::message_sink & inbound); + tcp_channels (nano::node &, std::function const &)> = nullptr); bool insert (std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); void erase (nano::tcp_endpoint const &); size_t size () const; @@ -112,7 +112,7 @@ namespace transport nano::node & node; private: - nano::transport::message_sink & inbound; + std::function const &)> sink; class endpoint_tag { }; diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 0204479b2c..b38672ae21 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -137,11 +137,6 @@ namespace transport protected: nano::node & node; }; - class message_sink - { - public: - virtual void sink (nano::message const &, std::shared_ptr const &){}; - }; class channel_loopback final : public nano::transport::channel { diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index fcb2157b64..370975a084 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -61,9 +61,10 @@ std::string nano::transport::channel_udp::to_string () const return boost::str (boost::format ("%1%") % endpoint); } -nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a) : - node (node_a), - strand (node_a.io_ctx.get_executor ()) +nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a, std::function const &)> sink) : + node{ node_a }, + strand{ node_a.io_ctx.get_executor () }, + sink{ sink } { if (!node.flags.disable_udp) { @@ -364,9 +365,10 @@ namespace class udp_message_visitor : public nano::message_visitor { public: - udp_message_visitor (nano::node & node_a, nano::endpoint const & endpoint_a) : - node (node_a), - endpoint (endpoint_a) + udp_message_visitor (nano::node & node_a, nano::endpoint const & endpoint_a, std::function const &)> sink) : + node{ node_a }, + endpoint{ endpoint_a }, + sink{ sink } { } void keepalive (nano::keepalive const & message_a) override @@ -512,11 +514,12 @@ class udp_message_visitor : public nano::message_visitor node.network.udp_channels.modify (find_channel, [] (std::shared_ptr const & channel_a) { channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); }); - node.network.inbound.sink (message_a, find_channel); + sink (message_a, find_channel); } } nano::node & node; nano::endpoint endpoint; + std::function const &)> sink; }; } @@ -537,7 +540,7 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_ } if (allowed_sender) { - udp_message_visitor visitor (node, data_a->endpoint); + udp_message_visitor visitor (node, data_a->endpoint, sink); nano::message_parser parser (node.network.publish_filter, node.block_uniquer, node.vote_uniquer, visitor, node.work); parser.deserialize_buffer (data_a->buffer, data_a->size); if (parser.status == nano::message_parser::parse_status::success) diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 4806976771..691f735689 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -75,7 +75,7 @@ namespace transport friend class nano::transport::channel_udp; public: - udp_channels (nano::node &, uint16_t); + udp_channels (nano::node &, uint16_t, std::function const &)> sink); std::shared_ptr insert (nano::endpoint const &, unsigned); void erase (nano::endpoint const &); size_t size () const; @@ -106,6 +106,7 @@ namespace transport void list (std::deque> &, uint8_t = 0); void modify (std::shared_ptr const &, std::function const &)>); nano::node & node; + std::function const &)> sink; private: void close_socket ();