Skip to content

Commit

Permalink
Adjust request aggregator stats (#2507)
Browse files Browse the repository at this point in the history
* Adjust request aggregator stats

And ensure unique hashes before generating votes

* Move hash duplicate removal to before aggregation, and add a test

* Change stats to be more obvious without reading documentation

* Fix a rare failure in a test due to rep crawler sending an extra request
  • Loading branch information
guilhermelawless authored Jan 25, 2020
1 parent b2c094a commit d1f04a3
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 67 deletions.
117 changes: 71 additions & 46 deletions nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ TEST (request_aggregator, one)
{
ASSERT_NO_ERROR (system.poll ());
}
// Not yet in the ledger, should be ignored
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
// Not yet in the ledger
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
Expand All @@ -47,11 +47,11 @@ TEST (request_aggregator, one)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (3, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}

Expand Down Expand Up @@ -79,16 +79,18 @@ TEST (request_aggregator, one_update)
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}

Expand All @@ -114,12 +116,11 @@ TEST (request_aggregator, two)
// One vote should be generated for both blocks
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
// The same request should now send the cached vote
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
Expand All @@ -128,11 +129,13 @@ TEST (request_aggregator, two)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote1 (node.votes_cache.find (send1->hash ()));
Expand All @@ -147,9 +150,11 @@ TEST (request_aggregator, two_endpoints)
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node1 (*system.add_node (node_config));
nano::node_flags node_flags;
node_flags.disable_rep_crawler = true;
auto & node1 (*system.add_node (node_config, node_flags));
node_config.peering_port = nano::get_available_port ();
auto & node2 (*system.add_node (node_config));
auto & node2 (*system.add_node (node_config, node_flags));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ())));
Expand All @@ -169,11 +174,13 @@ TEST (request_aggregator, two_endpoints)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node1.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
}

TEST (request_aggregator, split)
Expand Down Expand Up @@ -212,26 +219,19 @@ TEST (request_aggregator, split)
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) < 2)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
// Two votes were sent, the first one for 12 hashes and the second one for 1 hash
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (13, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
auto transaction (node.store.tx_begin_read ());
auto pre_last_hash (node.store.block_get (transaction, previous)->previous ());
auto vote1 (node.votes_cache.find (pre_last_hash));
ASSERT_EQ (1, vote1.size ());
ASSERT_EQ (max_vbh, vote1[0]->blocks.size ());
auto vote2 (node.votes_cache.find (previous));
ASSERT_EQ (1, vote2.size ());
ASSERT_EQ (1, vote2[0]->blocks.size ());
}

TEST (request_aggregator, channel_lifetime)
Expand All @@ -253,7 +253,7 @@ TEST (request_aggregator, channel_lifetime)
}
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
Expand Down Expand Up @@ -285,7 +285,7 @@ TEST (request_aggregator, channel_update)
// channel1 is not being held anymore
ASSERT_EQ (nullptr, channel1_w.lock ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
Expand All @@ -308,8 +308,33 @@ TEST (request_aggregator, channel_max_queue)
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped) < 1)
while (node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
}
}

TEST (request_aggregator, unique)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
}
28 changes: 20 additions & 8 deletions nano/lib/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::drop:
res = "drop";
break;
case nano::stat::type::aggregator:
res = "aggregator";
break;
case nano::stat::type::requests:
res = "requests";
break;
Expand Down Expand Up @@ -655,17 +658,26 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::blocks_confirmed:
res = "blocks_confirmed";
break;
case nano::stat::detail::requests_cached:
res = "requests_votes_cached";
case nano::stat::detail::aggregator_accepted:
res = "aggregator_accepted";
break;
case nano::stat::detail::aggregator_dropped:
res = "aggregator_dropped";
break;
case nano::stat::detail::requests_cached_hashes:
res = "requests_cached_hashes";
break;
case nano::stat::detail::requests_generated_hashes:
res = "requests_generated_hashes";
break;
case nano::stat::detail::requests_generated:
res = "requests_votes_generated";
case nano::stat::detail::requests_cached_votes:
res = "requests_cached_votes";
break;
case nano::stat::detail::requests_ignored:
res = "requests_votes_ignored";
case nano::stat::detail::requests_generated_votes:
res = "requests_generated_votes";
break;
case nano::stat::detail::requests_dropped:
res = "requests_dropped";
case nano::stat::detail::requests_unknown:
res = "requests_unknown";
break;
}
return res;
Expand Down
14 changes: 10 additions & 4 deletions nano/lib/stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class stat final
observer,
confirmation_height,
drop,
aggregator,
requests
};

Expand Down Expand Up @@ -299,11 +300,16 @@ class stat final
blocks_confirmed,
invalid_block,

// [request] aggregator
aggregator_accepted,
aggregator_dropped,

// requests
requests_cached,
requests_generated,
requests_ignored,
requests_dropped
requests_cached_hashes,
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_unknown
};

/** Direction of the stat. If the direction is irrelevant, use in */
Expand Down
27 changes: 19 additions & 8 deletions nano/node/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> &
condition.notify_all ();
}
}
if (error)
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_dropped);
}
stats.inc (nano::stat::type::aggregator, !error ? nano::stat::detail::aggregator_accepted : nano::stat::detail::aggregator_dropped);
}

void nano::request_aggregator::run ()
Expand Down Expand Up @@ -144,13 +141,25 @@ bool nano::request_aggregator::empty ()

std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
{
// Unique hashes
using pair = decltype (pool_a.hashes_roots)::value_type;
std::sort (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
return pair1.first < pair2.first;
});
pool_a.hashes_roots.erase (std::unique (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
return pair1.first == pair2.first;
}),
pool_a.hashes_roots.end ());

size_t cached_hashes = 0;
std::vector<nano::block_hash> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : pool_a.hashes_roots)
{
auto find_votes (votes_cache.find (hash_root.first));
if (!find_votes.empty ())
{
++cached_hashes;
cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ());
}
else if (!hash_root.first.is_zero () && store.block_exists (transaction_a, hash_root.first))
Expand Down Expand Up @@ -189,7 +198,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
}
else
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_ignored);
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_unknown, stat::dir::in);
}
}
}
Expand All @@ -201,11 +210,12 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
nano::confirm_ack confirm (vote);
pool_a.channel->send (confirm);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached, stat::dir::in, cached_votes.size ());
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
return to_generate;
}

void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());
Expand All @@ -225,7 +235,8 @@ void nano::request_aggregator::generate (nano::transaction const & transaction_a
this->votes_cache.add (vote);
});
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated, stat::dir::in, generated_l);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes_a.size ());
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in, generated_l);
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name)
Expand Down
2 changes: 1 addition & 1 deletion nano/node/request_aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class request_aggregator final
/** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, channel_pool & pool_a) const;
/** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/
void generate (nano::transaction const &, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
void generate (nano::transaction const &, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;

unsigned const max_consecutive_requests;

Expand Down

0 comments on commit d1f04a3

Please sign in to comment.