From 9c7d41787cdabab8e9a0f3b32deb755cfa44821f Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 14 May 2024 22:45:14 +0100 Subject: [PATCH] Track the number of active elections by each bucket and cap at the bucket maximum. The number of elections started for a particular bucket is now limited on a per-bucket basis rather than globally across all buckets. --- nano/core_test/active_elections.cpp | 109 ++------------------------ nano/core_test/election_scheduler.cpp | 93 ---------------------- nano/core_test/scheduler_buckets.cpp | 95 +++++++++++----------- nano/node/active_elections.cpp | 1 + nano/node/active_elections.hpp | 1 + nano/node/node.cpp | 4 +- nano/node/scheduler/bucket.cpp | 3 + nano/node/scheduler/bucket.hpp | 8 +- nano/node/scheduler/buckets.cpp | 70 ++++------------- nano/node/scheduler/buckets.hpp | 11 +-- nano/node/scheduler/priority.cpp | 85 +++++++++++++------- nano/node/scheduler/priority.hpp | 7 +- 12 files changed, 153 insertions(+), 334 deletions(-) diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index 29cd87f70e..590587a82d 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -163,7 +163,7 @@ TEST (active_elections, keep_local) nano::node_config node_config = system.default_config (); node_config.enable_voting = false; // Bound to 2, won't drop wallet created transactions, but good to test dropping remote - node_config.active_elections.size = 2; + node_config.priority_scheduler.bucket_maximum = 2; // Disable frontier confirmation to allow the test to finish before node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; @@ -192,7 +192,10 @@ TEST (active_elections, keep_local) ASSERT_NE (nullptr, send6); // force-confirm blocks - nano::test::confirm (node.ledger, send6); + auto election = nano::test::start_election (system, node, send6->hash ()); + ASSERT_NE (nullptr, election); + election->force_confirm (); + ASSERT_TIMELY (5s, node.active.empty ()); nano::state_block_builder builder{}; const auto receive1 = builder.make_block () @@ -227,8 +230,7 @@ TEST (active_elections, keep_local) node.process_active (receive3); /// bound elections, should drop after one loop - ASSERT_TIMELY_EQ (5s, node.active.size (), node_config.active_elections.size); - // ASSERT_EQ (1, node.scheduler.size ()); + ASSERT_TIMELY_EQ (5s, node.active.size (), node_config.priority_scheduler.bucket_maximum); } TEST (inactive_votes_cache, basic) @@ -433,7 +435,6 @@ TEST (inactive_votes_cache, election_start) nano::test::system system; nano::node_config node_config = system.default_config (); node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - node_config.priority_scheduler.enabled = false; node_config.optimistic_scheduler.enabled = false; auto & node = *system.add_node (node_config); nano::block_hash latest (node.latest (nano::dev::genesis_key.pub)); @@ -1389,101 +1390,3 @@ TEST (active_elections, limit_vote_hinted_elections) // Ensure there was no overflow of elections ASSERT_EQ (0, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::priority)); } - -/* - * Tests that when AEC is running at capacity from normal elections, it is still possible to schedule a limited number of hinted elections - */ -TEST (active_elections, allow_limited_overflow) -{ - nano::test::system system; - nano::node_config config = system.default_config (); - const int aec_limit = 20; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - config.active_elections.size = aec_limit; - config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections - auto & node = *system.add_node (config); - - auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4); - - // Split blocks in two halves - std::vector> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2); - std::vector> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ()); - - // Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that - WAIT (1s); - node.active.clear (); - ASSERT_TRUE (node.active.empty ()); - - // Insert the first part of the blocks into normal election scheduler - for (auto const & block : blocks1) - { - node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); - } - - // Ensure number of active elections reaches AEC limit and there is no overfill - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority)); - - // Insert votes for the second part of the blocks, so that those are scheduled as hinted elections - for (auto const & block : blocks2) - { - // Non-final vote, so it stays in the AEC without getting confirmed - auto vote = nano::test::make_vote (nano::dev::genesis_key, { block }); - node.vote_cache.insert (vote); - } - - // Ensure active elections overfill AEC only up to normal + hinted limit - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted)); -} - -/* - * Tests that when hinted elections are present in the AEC, normal scheduler adapts not to exceed the limit of all elections - */ -TEST (active_elections, allow_limited_overflow_adapt) -{ - nano::test::system system; - nano::node_config config = system.default_config (); - const int aec_limit = 20; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - config.active_elections.size = aec_limit; - config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections - auto & node = *system.add_node (config); - - auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4); - - // Split blocks in two halves - std::vector> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2); - std::vector> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ()); - - // Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that - WAIT (1s); - node.active.clear (); - ASSERT_TRUE (node.active.empty ()); - - // Insert votes for the second part of the blocks, so that those are scheduled as hinted elections - for (auto const & block : blocks2) - { - // Non-final vote, so it stays in the AEC without getting confirmed - auto vote = nano::test::make_vote (nano::dev::genesis_key, { block }); - node.vote_cache.insert (vote); - } - - // Ensure hinted election amount is bounded by hinted limit - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::hinted)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::hinted)); - - // Insert the first part of the blocks into normal election scheduler - for (auto const & block : blocks1) - { - node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); - } - - // Ensure number of active elections reaches AEC limit and there is no overfill - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority)); -} diff --git a/nano/core_test/election_scheduler.cpp b/nano/core_test/election_scheduler.cpp index 02a37bec89..8c525e9e1a 100644 --- a/nano/core_test/election_scheduler.cpp +++ b/nano/core_test/election_scheduler.cpp @@ -53,96 +53,3 @@ TEST (election_scheduler, activate_one_flush) system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ())); } - -/** - * Tests that the election scheduler and the active transactions container (AEC) - * work in sync with regards to the node configuration value "active_elections.size". - * - * The test sets up two forcefully cemented blocks -- a send on the genesis account and a receive on a second account. - * It then creates two other blocks, each a successor to one of the previous two, - * and processes them locally (without the node starting elections for them, but just saving them to disk). - * - * Elections for these latter two (B1 and B2) are started by the test code manually via `election_scheduler::activate`. - * The test expects E1 to start right off and take its seat into the AEC. - * E2 is expected not to start though (because the AEC is full), so B2 should be awaiting in the scheduler's queue. - * - * As soon as the test code manually confirms E1 (and thus evicts it out of the AEC), - * it is expected that E2 begins and the scheduler's queue becomes empty again. - */ -TEST (election_scheduler, no_vacancy) -{ - nano::test::system system{}; - - nano::node_config config = system.default_config (); - config.active_elections.size = 1; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - - auto & node = *system.add_node (config); - nano::state_block_builder builder{}; - nano::keypair key{}; - - // Activating accounts depends on confirmed dependencies. First, prepare 2 accounts - auto send = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (nano::dev::genesis->hash ()) - .representative (nano::dev::genesis_key.pub) - .link (key.pub) - .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*system.work.generate (nano::dev::genesis->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (send)); - node.process_confirmed (nano::election_status{ send }); - - auto receive = builder.make_block () - .account (key.pub) - .previous (0) - .representative (key.pub) - .link (send->hash ()) - .balance (nano::Gxrb_ratio) - .sign (key.prv, key.pub) - .work (*system.work.generate (key.pub)) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (receive)); - node.process_confirmed (nano::election_status{ receive }); - - ASSERT_TIMELY (5s, nano::test::confirmed (node, { send, receive })); - - // Second, process two eligible transactions - auto block1 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (send->hash ()) - .representative (nano::dev::genesis_key.pub) - .link (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*system.work.generate (send->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (block1)); - - // There is vacancy so it should be inserted - node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); - std::shared_ptr election{}; - ASSERT_TIMELY (5s, (election = node.active.election (block1->qualified_root ())) != nullptr); - - auto block2 = builder.make_block () - .account (key.pub) - .previous (receive->hash ()) - .representative (key.pub) - .link (key.pub) - .balance (0) - .sign (key.prv, key.pub) - .work (*system.work.generate (receive->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (block2)); - - // There is no vacancy so it should stay queued - node.scheduler.priority.activate (node.ledger.tx_begin_read (), key.pub); - ASSERT_TIMELY_EQ (5s, node.scheduler.priority.size (), 1); - ASSERT_EQ (node.active.election (block2->qualified_root ()), nullptr); - - // Election confirmed, next in queue should begin - election->force_confirm (); - ASSERT_TIMELY (5s, node.active.election (block2->qualified_root ()) != nullptr); - ASSERT_TRUE (node.scheduler.priority.empty ()); -} diff --git a/nano/core_test/scheduler_buckets.cpp b/nano/core_test/scheduler_buckets.cpp index 5cfaf57bf4..7fb94ab5df 100644 --- a/nano/core_test/scheduler_buckets.cpp +++ b/nano/core_test/scheduler_buckets.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -118,7 +119,7 @@ TEST (buckets, construction) TEST (buckets, insert_Gxrb) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -126,7 +127,7 @@ TEST (buckets, insert_Gxrb) TEST (buckets, insert_Mxrb) { nano::scheduler::buckets buckets; - buckets.push (1000, block1 (), nano::Mxrb_ratio); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Mxrb_ratio)); } @@ -135,8 +136,8 @@ TEST (buckets, insert_Mxrb) TEST (buckets, insert_same_priority) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block2 ()); ASSERT_EQ (2, buckets.size ()); ASSERT_EQ (2, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -145,8 +146,8 @@ TEST (buckets, insert_same_priority) TEST (buckets, insert_duplicate) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -154,89 +155,89 @@ TEST (buckets, insert_duplicate) TEST (buckets, insert_older) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block2 (), buckets.top ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block2 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); } TEST (buckets, pop) { nano::scheduler::buckets buckets; ASSERT_TRUE (buckets.empty ()); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_FALSE (buckets.empty ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).pop (); ASSERT_TRUE (buckets.empty ()); } TEST (buckets, top_one) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, top_two) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1, block1 (), nano::Mxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Mxrb_ratio).push (1, block1 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); ASSERT_TRUE (buckets.empty ()); } TEST (buckets, top_round_robin) { nano::scheduler::buckets buckets; - buckets.push (1000, blockzero (), 0); - ASSERT_EQ (blockzero (), buckets.top ()); - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block1 (), nano::Mxrb_ratio); - buckets.push (1100, block3 (), nano::Mxrb_ratio); - buckets.pop (); // blockzero - EXPECT_EQ (block1 (), buckets.top ()); - buckets.pop (); - EXPECT_EQ (block0 (), buckets.top ()); - buckets.pop (); - EXPECT_EQ (block3 (), buckets.top ()); - buckets.pop (); + buckets.bucket (0).push (1000, blockzero ()); + ASSERT_EQ (blockzero (), buckets.bucket (0).top ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); + buckets.bucket (nano::Mxrb_ratio).push (1100, block3 ()); + buckets.bucket (0).pop (); // blockzero + EXPECT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); + EXPECT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + EXPECT_EQ (block3 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); EXPECT_TRUE (buckets.empty ()); } TEST (buckets, trim_normal) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, trim_reverse) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1100, block2 (), nano::Gxrb_ratio); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, trim_even) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.push (1000, block1 (), nano::Mxrb_ratio); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); ASSERT_EQ (2, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); } diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index b0e3507306..fcd5052e5e 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -293,6 +293,7 @@ void nano::active_elections::cleanup_election (nano::unique_lock & node.stats.sample (nano::stat::sample::active_election_duration, { 0, 1000 * 60 * 10 /* 0-10 minutes range */ }, election->duration ().count ()); vacancy_update (); + election_stopped.notify (election); for (auto const & [hash, block] : blocks_l) { diff --git a/nano/node/active_elections.hpp b/nano/node/active_elections.hpp index 869912f5c8..56bc386cb8 100644 --- a/nano/node/active_elections.hpp +++ b/nano/node/active_elections.hpp @@ -133,6 +133,7 @@ class active_elections final */ int64_t vacancy (nano::election_behavior behavior) const; std::function vacancy_update{ [] () {} }; + nano::observer_set> election_stopped; std::size_t election_winner_details_size (); void add_election_winner_details (nano::block_hash const &, std::shared_ptr const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 58ec072dae..602f2a0199 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -258,9 +258,11 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy if (!init_error ()) { + active.election_stopped.add ([this] (std::shared_ptr election) { + scheduler.priority.election_stopped (election); + }); // Notify election schedulers when AEC frees election slot active.vacancy_update = [this] () { - scheduler.priority.notify (); scheduler.hinted.notify (); scheduler.optimistic.notify (); }; diff --git a/nano/node/scheduler/bucket.cpp b/nano/node/scheduler/bucket.cpp index 51ba1626a5..3cb61b10e7 100644 --- a/nano/node/scheduler/bucket.cpp +++ b/nano/node/scheduler/bucket.cpp @@ -35,10 +35,13 @@ void nano::scheduler::bucket::pop () void nano::scheduler::bucket::push (uint64_t time, std::shared_ptr block) { + // Place this item which can lie anywhere in the range ( queue.begin (), queue.end () ] queue.insert ({ time, block }); if (queue.size () > maximum) { + // Trim lowest priority transaction from block queue debug_assert (!queue.empty ()); + // Drop last item which is lowest priority queue.erase (--queue.end ()); } } diff --git a/nano/node/scheduler/bucket.hpp b/nano/node/scheduler/bucket.hpp index 2f32c17d59..3aac10bf2e 100644 --- a/nano/node/scheduler/bucket.hpp +++ b/nano/node/scheduler/bucket.hpp @@ -12,6 +12,9 @@ class block; namespace nano::scheduler { /** A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time + * Tracks two internal counters limited by 'maximum' + * 1) active - number of elections this bucket has been responsible for starting + * 2) queue - A std::set ordered from highest to lowest priority which drops the last, lowest priority item */ class bucket final { @@ -24,7 +27,6 @@ class bucket final bool operator== (value_type const & other_a) const; }; std::set queue; - size_t const maximum; public: bucket (size_t maximum); @@ -35,5 +37,9 @@ class bucket final size_t size () const; bool empty () const; void dump () const; + + // Tracks the number of active elections started by this bucket + size_t active{ 0 }; + size_t const maximum; }; } // namespace nano::scheduler diff --git a/nano/node/scheduler/buckets.cpp b/nano/node/scheduler/buckets.cpp index 4c238ac825..a4fedc135a 100644 --- a/nano/node/scheduler/buckets.cpp +++ b/nano/node/scheduler/buckets.cpp @@ -5,26 +5,6 @@ #include -/** Moves the bucket pointer to the next bucket */ -void nano::scheduler::buckets::next () -{ - ++current; - if (current == buckets_m.end ()) - { - current = buckets_m.begin (); - } -} - -/** Seek to the next non-empty bucket, if one exists */ -void nano::scheduler::buckets::seek () -{ - next (); - for (std::size_t i = 0, n = buckets_m.size (); current->second->empty () && i < n; ++i) - { - next (); - } -} - void nano::scheduler::buckets::setup_buckets (uint64_t maximum) { auto build_region = [&] (uint128_t const & begin, uint128_t const & end, size_t count) { @@ -53,7 +33,6 @@ void nano::scheduler::buckets::setup_buckets (uint64_t maximum) nano::scheduler::buckets::buckets (uint64_t maximum) { setup_buckets (maximum); - current = buckets_m.begin (); } nano::scheduler::buckets::~buckets () @@ -62,42 +41,12 @@ nano::scheduler::buckets::~buckets () auto nano::scheduler::buckets::bucket (nano::uint128_t const & balance) const -> scheduler::bucket & { - auto iter = buckets_m.upper_bound (balance); - --iter; // Iterator points to bucket after the target priority + auto iter = buckets_m.upper_bound (balance); // Iterator points to bucket after the target priority + --iter; debug_assert (iter != buckets_m.end ()); return *iter->second; } -/** - * Push a block and its associated time into the prioritization container. - * The time is given here because sideband might not exist in the case of state blocks. - */ -void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr block, nano::amount const & priority) -{ - auto was_empty = empty (); - bucket (priority.number ()).push (time, block); - if (was_empty) - { - seek (); - } -} - -/** Return the highest priority block of the current bucket */ -std::shared_ptr nano::scheduler::buckets::top () const -{ - debug_assert (!empty ()); - auto result = current->second->top (); - return result; -} - -/** Pop the current block from the container and seek to the next block, if it exists */ -void nano::scheduler::buckets::pop () -{ - debug_assert (!empty ()); - current->second->pop (); - seek (); -} - /** Returns the total number of blocks in buckets */ std::size_t nano::scheduler::buckets::size () const { @@ -121,6 +70,21 @@ std::size_t nano::scheduler::buckets::bucket_size (nano::amount const & amount) return bucket (amount.number ()).size (); } +std::size_t nano::scheduler::buckets::active () const +{ + return std::accumulate (buckets_m.begin (), buckets_m.end (), 0, [] (auto const & total, auto const & item) { return total + item.second->active; }); +} + +auto nano::scheduler::buckets::next () -> scheduler::bucket * +{ + auto next = std::find_if (buckets_m.begin (), buckets_m.end (), [] (auto const & item) { return item.second->active < item.second->maximum && !item.second->empty (); }); + if (next == buckets_m.end ()) + { + return nullptr; + } + return next->second.get (); +} + /** Returns true if all buckets are empty */ bool nano::scheduler::buckets::empty () const { diff --git a/nano/node/scheduler/buckets.hpp b/nano/node/scheduler/buckets.hpp index c4c52d24f4..a7472d013f 100644 --- a/nano/node/scheduler/buckets.hpp +++ b/nano/node/scheduler/buckets.hpp @@ -29,22 +29,17 @@ class buckets final /** container for the buckets to be read in round robin fashion */ std::map> buckets_m; - /** index of bucket to read next */ - decltype (buckets_m)::const_iterator current; - - void next (); - void seek (); void setup_buckets (uint64_t maximum); public: buckets (uint64_t maximum = 128); ~buckets (); - void push (uint64_t time, std::shared_ptr block, nano::amount const & priority); - std::shared_ptr top () const; - void pop (); + std::size_t size () const; std::size_t bucket_count () const; std::size_t bucket_size (nano::amount const & amount) const; + std::size_t active () const; + scheduler::bucket * next (); bool empty () const; void dump () const; scheduler::bucket & bucket (nano::uint128_t const & balance) const; diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index bb0314930d..2f34ba8cf6 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,7 @@ nano::scheduler::priority::~priority () { // Thread must be stopped before destruction debug_assert (!thread.joinable ()); + debug_assert (tracking.size () == buckets->active ()); } void nano::scheduler::priority::start () @@ -74,9 +76,12 @@ bool nano::scheduler::priority::activate (secure::transaction const & transactio nano::log::arg{ "priority", balance_priority }); nano::lock_guard lock{ mutex }; - buckets->push (time_priority, block, balance_priority); - notify (); - + auto & bucket = buckets->bucket (balance_priority); + bucket.push (time_priority, block); + if (bucket.active < bucket.maximum) + { + notify (); + } return true; // Activated } @@ -102,47 +107,73 @@ bool nano::scheduler::priority::empty () const return empty_locked (); } -bool nano::scheduler::priority::predicate () const -{ - return active.vacancy (nano::election_behavior::priority) > 0 && !buckets->empty (); -} - void nano::scheduler::priority::run () { nano::unique_lock lock{ mutex }; while (!stopped) { - condition.wait (lock, [this] () { - return stopped || predicate (); - }); - debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds + condition.wait (lock); if (!stopped) { stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - if (predicate ()) + while (auto bucket = buckets->next ()) { - auto block = buckets->top (); - buckets->pop (); - lock.unlock (); - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); - auto result = active.insert (block); - if (result.inserted) + debug_assert (!bucket->empty () && bucket->active < bucket->maximum); + auto block = bucket->top (); + debug_assert (block != nullptr); + bucket->pop (); + if (tracking.find (block->qualified_root ()) != tracking.end ()) { - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); + continue; } - if (result.election != nullptr) + // Increment counter and start tracking for block's qualified root + // Start tracking before we actually attempt to start election since election cleanup happens asynchronously + ++bucket->active; + [[maybe_unused]] auto inserted = tracking.emplace (block->qualified_root (), bucket); + debug_assert (inserted.second); + nano::election_insertion_result result; { - result.election->transition_active (); + // Do slow operations outside lock + + lock.unlock (); + stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); + result = active.insert (block); + if (result.inserted) + { + stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); + } + if (result.election != nullptr) + { + result.election->transition_active (); + } + lock.lock (); + } + if (!result.election) + { + // No election exists or was created so clean up dangling tracking information + --bucket->active; + [[maybe_unused]] auto erased = tracking.erase (block->qualified_root ()); + debug_assert (erased == 1); } } - else - { - lock.unlock (); - } + } + } +} + +void nano::scheduler::priority::election_stopped (std::shared_ptr election) +{ + nano::lock_guard lock{ mutex }; + if (auto existing = tracking.find (election->qualified_root); existing != tracking.end ()) + { + auto & bucket = *existing->second; + --bucket.active; + if (!bucket.empty ()) + { notify (); - lock.lock (); } + // Clean up election stop event subscription + tracking.erase (existing); } } diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index 90923f79f9..ee5c7e23b0 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -9,12 +9,14 @@ #include #include #include +#include namespace nano { class active_elections; class block; class container_info_component; +class election; class ledger; class stats; } @@ -25,6 +27,7 @@ class transaction; namespace nano::scheduler { +class bucket; class priority_config { public: @@ -53,6 +56,7 @@ class priority final void notify (); std::size_t size () const; bool empty () const; + void election_stopped (std::shared_ptr election); std::unique_ptr collect_container_info (std::string const & name); @@ -66,9 +70,10 @@ class priority final private: void run (); bool empty_locked () const; - bool predicate () const; std::unique_ptr buckets; + // Bucket associated with a particular election + std::unordered_map tracking; bool stopped{ false }; nano::condition_variable condition;