Skip to content

Commit

Permalink
Vote replay in crawler (#2497)
Browse files Browse the repository at this point in the history
* Move vote replay code in to rep_crawler since this is a very infrequently needed codepath and it only really needs to be done once and can be done during rep discovery.

* Directly calling vote_blocking instead of creating a new deque of valid votes.

* Fixing timeout for possibly slow test situations.
  • Loading branch information
clemahieu authored Jan 25, 2020
1 parent 2d503c3 commit b2c094a
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 69 deletions.
19 changes: 9 additions & 10 deletions nano/core_test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,10 @@ TEST (votes, check_signature)
}
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
vote1->signature.bytes[0] ^= 1;
auto transaction (node1.store.tx_begin_read ());
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
vote1->signature.bytes[0] ^= 1;
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (transaction, vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, nano::endpoint (boost::asio::ip::address_v6 (), 0), node1.network_params.protocol.protocol_version)));
}

TEST (votes, add_one)
Expand Down Expand Up @@ -878,13 +877,13 @@ TEST (votes, add_old)
votes1 = node1.active.roots.find (send1->qualified_root ())->election;
}
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
node1.vote_processor.vote_blocking (transaction, vote1, channel);
node1.vote_processor.vote_blocking (vote1, channel);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node1.work_generate_blocking (*send2);
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send2));
votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
node1.vote_processor.vote_blocking (transaction, vote2, channel);
node1.vote_processor.vote_blocking (vote2, channel);
ASSERT_EQ (2, votes1->last_votes_size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[nano::test_genesis_key.pub].hash);
Expand Down Expand Up @@ -919,12 +918,12 @@ TEST (votes, add_old_different_account)
ASSERT_EQ (1, votes2->last_votes_size ());
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send1));
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
auto vote_result1 (node1.vote_processor.vote_blocking (transaction, vote1, channel));
auto vote_result1 (node1.vote_processor.vote_blocking (vote1, channel));
ASSERT_EQ (nano::vote_code::vote, vote_result1);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (1, votes2->last_votes.size ());
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send2));
auto vote_result2 (node1.vote_processor.vote_blocking (transaction, vote2, channel));
auto vote_result2 (node1.vote_processor.vote_blocking (vote2, channel));
ASSERT_EQ (nano::vote_code::vote, vote_result2);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (2, votes2->last_votes.size ());
Expand Down Expand Up @@ -957,12 +956,12 @@ TEST (votes, add_cooldown)
}
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1));
auto channel (std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
node1.vote_processor.vote_blocking (transaction, vote1, channel);
node1.vote_processor.vote_blocking (vote1, channel);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node1.work_generate_blocking (*send2);
auto vote2 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send2));
node1.vote_processor.vote_blocking (transaction, vote2, channel);
node1.vote_processor.vote_blocking (vote2, channel);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[nano::test_genesis_key.pub].hash);
Expand Down
37 changes: 14 additions & 23 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2286,34 +2286,31 @@ TEST (node, send_callback)
// This helps representatives continue from their last sequence number if their node is reinitialized and the old sequence number is lost
TEST (node, vote_replay)
{
nano::system system (2);
nano::system system (1);
auto & node1 (*system.nodes[0]);
auto & node2 (*system.nodes[1]);
nano::keypair key;
auto open (std::make_shared<nano::open_block> (0, 1, key.pub, key.prv, key.pub, 0));
node1.work_generate_blocking (*open);
nano::genesis genesis;
for (auto i (0); i < 11000; ++i)
{
auto transaction (node2.store.tx_begin_read ());
auto vote (node2.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, open));
auto transaction (node1.store.tx_begin_read ());
auto vote (node1.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, genesis.open));
}
auto node2 = system.add_node ();
{
auto transaction (node1.store.tx_begin_read ());
nano::lock_guard<std::mutex> lock (node1.store.get_cache_mutex ());
auto vote (node1.store.vote_current (transaction, nano::test_genesis_key.pub));
auto transaction (node2->store.tx_begin_read ());
nano::lock_guard<std::mutex> lock (node2->store.get_cache_mutex ());
auto vote (node2->store.vote_current (transaction, nano::test_genesis_key.pub));
ASSERT_EQ (nullptr, vote);
}
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto block (system.wallet (0)->send_action (nano::test_genesis_key.pub, key.pub, nano::Gxrb_ratio));
ASSERT_NE (nullptr, block);
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto done (false);
system.deadline_set (20s);
while (!done)
{
auto ec = system.poll ();
auto transaction (node1.store.tx_begin_read ());
nano::lock_guard<std::mutex> lock (node1.store.get_cache_mutex ());
auto vote (node1.store.vote_current (transaction, nano::test_genesis_key.pub));
auto transaction (node2->store.tx_begin_read ());
nano::lock_guard<std::mutex> lock (node2->store.get_cache_mutex ());
auto vote (node2->store.vote_current (transaction, nano::test_genesis_key.pub));
done = vote && (vote->sequence >= 10000);
ASSERT_NO_ERROR (ec);
}
Expand Down Expand Up @@ -3010,10 +3007,7 @@ TEST (node, fork_invalid_block_signature_vote_by_hash)
std::vector<nano::block_hash> vote_blocks;
vote_blocks.push_back (send2->hash ());
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks));
{
auto transaction (node1.store.tx_begin_read ());
node1.vote_processor.vote_blocking (transaction, vote, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
}
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_udp> (node1.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
while (node1.block (send1->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
Expand Down Expand Up @@ -3179,10 +3173,7 @@ TEST (node, confirm_back)
std::vector<nano::block_hash> vote_blocks;
vote_blocks.push_back (send2->hash ());
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks));
{
auto transaction (node.store.tx_begin_read ());
node.vote_processor.vote_blocking (transaction, vote, std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
}
node.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
system.deadline_set (10s);
while (!node.active.empty ())
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
port_mapping (*this),
vote_processor (checker, active, store, observers, stats, config, logger, online_reps, ledger, network_params),
vote_processor (checker, active, observers, stats, config, logger, online_reps, ledger, network_params),
rep_crawler (*this),
warmed_up (0),
block_processor (*this, write_database_queue),
Expand Down
10 changes: 10 additions & 0 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void nano::rep_crawler::validate ()
nano::lock_guard<std::mutex> lock (active_mutex);
responses_l.swap (responses);
}
auto transaction (node.store.tx_begin_read ());
auto minimum = node.minimum_principal_weight ();
for (auto const & i : responses_l)
{
Expand Down Expand Up @@ -85,6 +86,15 @@ void nano::rep_crawler::validate ()
}
}
}
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
// Only do this if the sequence number is significantly different to account for network reordering
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
auto max_vote (node.store.vote_max (transaction, vote));
if (max_vote->sequence > vote->sequence + 10000)
{
nano::confirm_ack confirm (max_vote);
channel->send (confirm); // this is non essential traffic as it will be resolicited if not received
}
}
}

Expand Down
36 changes: 5 additions & 31 deletions nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@

#include <boost/format.hpp>

nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::block_store & store_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
checker (checker_a),
active (active_a),
store (store_a),
observers (observers_a),
stats (stats_a),
config (config_a),
Expand Down Expand Up @@ -54,7 +53,7 @@ void nano::vote_processor::process_loop ()
{
if (!votes.empty ())
{
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes_l;
decltype (votes) votes_l;
votes_l.swap (votes);

log_this_iteration = false;
Expand All @@ -70,20 +69,6 @@ void nano::vote_processor::process_loop ()
is_active = true;
lock.unlock ();
verify_votes (votes_l);
{
auto transaction (store.tx_begin_read ());
uint64_t count (1);
for (auto & i : votes_l)
{
vote_blocking (transaction, i.first, i.second, true);
// Free active_transactions mutex each 100 processed votes
if (count % 100 == 0)
{
transaction.refresh ();
}
count++;
}
}
lock.lock ();
is_active = false;

Expand Down Expand Up @@ -155,7 +140,7 @@ void nano::vote_processor::vote (std::shared_ptr<nano::vote> vote_a, std::shared
}
}

void nano::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> & votes_a)
void nano::vote_processor::verify_votes (decltype (votes) const & votes_a)
{
auto size (votes_a.size ());
std::vector<unsigned char const *> messages;
Expand All @@ -178,37 +163,26 @@ void nano::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<na
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
std::remove_reference_t<decltype (votes_a)> result;
auto i (0);
for (auto const & vote : votes_a)
{
assert (verifications[i] == 1 || verifications[i] == 0);
if (verifications[i] == 1)
{
result.push_back (vote);
vote_blocking (vote.first, vote.second, true);
}
++i;
}
votes_a.swap (result);
}

// node.active.mutex lock required
nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & transaction_a, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, bool validated)
nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a, bool validated)
{
auto result (nano::vote_code::invalid);
if (validated || !vote_a->validate ())
{
result = active.vote (vote_a);
observers.vote.notify (vote_a, channel_a, result);
// This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them
// Only do this if the sequence number is significantly different to account for network reordering
// Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase
auto max_vote (store.vote_max (transaction_a, vote_a));
if (max_vote->sequence > vote_a->sequence + 10000)
{
nano::confirm_ack confirm (max_vote);
channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received
}
}
std::string status;
switch (result)
Expand Down
7 changes: 3 additions & 4 deletions nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ namespace transport
class vote_processor final
{
public:
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::block_store & store_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
void vote (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>);
/** Note: node.active.mutex lock is required */
nano::vote_code vote_blocking (nano::transaction const &, std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, bool = false);
void verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> &);
nano::vote_code vote_blocking (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, bool = false);
void verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> const &);
void flush ();
void calculate_weights ();
void stop ();
Expand All @@ -46,7 +46,6 @@ class vote_processor final

nano::signature_checker & checker;
nano::active_transactions & active;
nano::block_store & store;
nano::node_observers & observers;
nano::stat & stats;
nano::node_config & config;
Expand Down

0 comments on commit b2c094a

Please sign in to comment.