diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 993725f544..07f4c0e4e1 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -1177,8 +1177,6 @@ TEST (active_transactions, restart_dropped) node.block_processor.flush (); ASSERT_EQ (0, node.active.size ()); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart)); - // Verify the block was not updated in the ledger - ASSERT_EQ (*node.store.block_get (node.store.tx_begin_read (), send->hash ()), *send); // Generate even higher difficulty work ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ()); // Add voting @@ -1192,6 +1190,8 @@ TEST (active_transactions, restart_dropped) ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart)); // Wait for the election to complete ASSERT_TIMELY (5s, node.ledger.cache.cemented_count == 2); + // Verify the block is eventually updated in the ledger + ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), send->hash ())->block_work () == send->block_work ()); } // Ensures votes are tallied on election::publish even if no vote is inserted through inactive_votes_cache diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index f883326fe8..b868e3a97a 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3033,13 +3033,9 @@ TEST (node, epoch_conflict_confirm) ASSERT_NE (nullptr, election); election->force_confirm (); ASSERT_TIMELY (3s, node1->block_confirmed (open->hash ())); - { - nano::block_post_events events; - auto transaction (node0->store.tx_begin_write ()); - ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, send).code); - ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, send2).code); - ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, open).code); - } + ASSERT_EQ (nano::process_result::progress, node0->process (*send).code); + ASSERT_EQ (nano::process_result::progress, node0->process (*send2).code); + ASSERT_EQ (nano::process_result::progress, node0->process (*open).code); node0->process_active (change); node0->process_active (epoch_open); ASSERT_TIMELY (10s, node0->block (change->hash ()) && node0->block (epoch_open->hash ()) && node1->block (change->hash ()) && node1->block (epoch_open->hash ())); diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index b60380feea..dfc90520d7 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -995,7 +995,7 @@ bool nano::active_transactions::update_difficulty_impl (nano::active_transaction return error; } -bool nano::active_transactions::restart (std::shared_ptr const & block_a, nano::write_transaction const & transaction_a) +bool nano::active_transactions::restart (nano::transaction const & transaction_a, std::shared_ptr const & block_a) { // Only guaranteed to restart the election if the new block is received within 2 minutes of its election being dropped constexpr std::chrono::minutes recently_dropped_cutoff{ 2 }; @@ -1012,9 +1012,8 @@ bool nano::active_transactions::restart (std::shared_ptr const & bl // The existing block is re-written, not the arriving block, as that one might not have gone through a full signature check ledger_block->block_work_set (block_a->block_work ()); - auto block_count = node.ledger.cache.block_count.load (); - node.store.block_put (transaction_a, hash, *ledger_block); - debug_assert (node.ledger.cache.block_count.load () == block_count); + // Queue for writing in the block processor to avoid opening a new write transaction for a single operation + node.block_processor.update (ledger_block); // Restart election for the upgraded block, previously dropped from elections auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ()); diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index a84a0adbe0..4b542b12a4 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -188,7 +188,7 @@ class active_transactions final // Returns false if the election difficulty was updated bool update_difficulty (nano::block const &); // Returns false if the election was restarted - bool restart (std::shared_ptr const &, nano::write_transaction const &); + bool restart (nano::transaction const &, std::shared_ptr const &); double normalized_multiplier (nano::block const &, boost::optional const & = boost::none) const; void update_active_multiplier (nano::unique_lock &); uint64_t active_difficulty (); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 134e6a813b..a912542f41 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -10,11 +10,18 @@ std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay; +nano::block_post_events::block_post_events (std::function && get_transaction_a) : +get_transaction (std::move (get_transaction_a)) +{ +} + nano::block_post_events::~block_post_events () { + debug_assert (get_transaction != nullptr); + auto transaction (get_transaction ()); for (auto const & i : events) { - i (); + i (transaction); } } @@ -126,6 +133,15 @@ void nano::block_processor::force (std::shared_ptr const & block_a) condition.notify_all (); } +void nano::block_processor::update (std::shared_ptr const & block_a) +{ + { + nano::lock_guard lock (mutex); + updates.push_back (block_a); + } + condition.notify_all (); +} + void nano::block_processor::wait_write () { nano::lock_guard lock (mutex); @@ -137,7 +153,7 @@ void nano::block_processor::process_blocks () nano::unique_lock lock (mutex); while (!stopped) { - if (!blocks.empty () || !forced.empty ()) + if (have_blocks_ready ()) { active = true; lock.unlock (); @@ -165,10 +181,16 @@ bool nano::block_processor::should_log () return result; } +bool nano::block_processor::have_blocks_ready () +{ + debug_assert (!mutex.try_lock ()); + return !blocks.empty () || !forced.empty () || !updates.empty (); +} + bool nano::block_processor::have_blocks () { debug_assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0; + return have_blocks_ready () || state_block_signature_verification.size () != 0; } void nano::block_processor::process_verified_state_blocks (std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) @@ -213,71 +235,89 @@ void nano::block_processor::process_verified_state_blocks (std::deque & lock_a) { auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); - block_post_events post_events; + block_post_events post_events ([& store = node.store] { return store.tx_begin_read (); }); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }, { tables::confirmation_height })); nano::timer timer_l; lock_a.lock (); timer_l.start (); // Processing blocks - unsigned number_of_blocks_processed (0), number_of_forced_processed (0); - while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)) && !awaiting_write && number_of_blocks_processed < node.store.max_block_write_batch_num ()) + unsigned number_of_blocks_processed (0), number_of_forced_processed (0), number_of_updates_processed (0); + auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; + auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; + auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; + while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ()) { - if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ()) + if ((blocks.size () + state_block_signature_verification.size () + forced.size () + updates.size () > 64) && should_log ()) { - node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ())); + node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced, %4% updates) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size () % updates.size ())); } - nano::unchecked_info info; - nano::block_hash hash (0); - bool force (false); - if (forced.empty ()) + if (!updates.empty ()) { - info = blocks.front (); - blocks.pop_front (); - hash = info.block->hash (); + auto block (updates.front ()); + updates.pop_front (); + lock_a.unlock (); + auto hash (block->hash ()); + if (node.store.block_exists (transaction, hash)) + { + node.store.block_put (transaction, hash, *block); + } + ++number_of_updates_processed; } else { - info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown); - forced.pop_front (); - hash = info.block->hash (); - force = true; - number_of_forced_processed++; - } - lock_a.unlock (); - if (force) - { - auto successor (node.ledger.successor (transaction, info.block->qualified_root ())); - if (successor != nullptr && successor->hash () != hash) + nano::unchecked_info info; + nano::block_hash hash (0); + bool force (false); + if (forced.empty ()) { - // Replace our block with the winner and roll back any dependent blocks - if (node.config.logging.ledger_rollback_logging ()) - { - node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ())); - } - std::vector> rollback_list; - if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) - { - node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ())); - } - else if (node.config.logging.ledger_rollback_logging ()) - { - node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ())); - } - // Deleting from votes cache & wallet work watcher, stop active transaction - for (auto & i : rollback_list) + info = blocks.front (); + blocks.pop_front (); + hash = info.block->hash (); + } + else + { + info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown); + forced.pop_front (); + hash = info.block->hash (); + force = true; + number_of_forced_processed++; + } + lock_a.unlock (); + if (force) + { + auto successor (node.ledger.successor (transaction, info.block->qualified_root ())); + if (successor != nullptr && successor->hash () != hash) { - node.history.erase (i->root ()); - node.wallets.watcher->remove (*i); - // Stop all rolled back active transactions except initial - if (i->hash () != successor->hash ()) + // Replace our block with the winner and roll back any dependent blocks + if (node.config.logging.ledger_rollback_logging ()) { - node.active.erase (*i); + node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ())); + } + std::vector> rollback_list; + if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) + { + node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ())); + } + else if (node.config.logging.ledger_rollback_logging ()) + { + node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ())); + } + // Deleting from votes cache & wallet work watcher, stop active transaction + for (auto & i : rollback_list) + { + node.history.erase (i->root ()); + node.wallets.watcher->remove (*i); + // Stop all rolled back active transactions except initial + if (i->hash () != successor->hash ()) + { + node.active.erase (*i); + } } } } + number_of_blocks_processed++; + process_one (transaction, post_events, info); } - number_of_blocks_processed++; - process_one (transaction, post_events, info); lock_a.lock (); } awaiting_write = false; @@ -289,7 +329,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ } } -void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr const & block_a, nano::process_return const & process_return_a, const bool watch_work_a, nano::block_origin const origin_a) +void nano::block_processor::process_live (nano::transaction const & transaction_a, nano::block_hash const & hash_a, std::shared_ptr const & block_a, nano::process_return const & process_return_a, const bool watch_work_a, nano::block_origin const origin_a) { // Add to work watcher to prevent dropping the election if (watch_work_a) @@ -298,7 +338,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: } // Start collecting quorum on block - if (watch_work_a || node.ledger.dependents_confirmed (node.store.tx_begin_read (), *block_a)) + if (watch_work_a || node.ledger.dependents_confirmed (transaction_a, *block_a)) { node.active.insert (block_a, process_return_a.previous_balance.number ()); } @@ -342,7 +382,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction } if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) { - events_a.events.emplace_back ([this, hash, block, result, watch_work_a, origin_a]() { process_live (hash, block, result, watch_work_a, origin_a); }); + events_a.events.emplace_back ([this, hash, block = info_a.block, result, watch_work_a, origin_a](nano::transaction const & post_event_transaction_a) { process_live (post_event_transaction_a, hash, block, result, watch_work_a, origin_a); }); } queue_unchecked (transaction_a, hash); break; @@ -361,7 +401,9 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction nano::unchecked_key unchecked_key (block->previous (), hash); node.store.unchecked_put (transaction_a, unchecked_key, info_a); - node.gap_cache.add (hash); + + events_a.events.emplace_back ([this, hash](nano::transaction const & /* unused */) { this->node.gap_cache.add (hash); }); + node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous); break; } @@ -379,7 +421,9 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction nano::unchecked_key unchecked_key (node.ledger.block_source (transaction_a, *(block)), hash); node.store.unchecked_put (transaction_a, unchecked_key, info_a); - node.gap_cache.add (hash); + + events_a.events.emplace_back ([this, hash](nano::transaction const & /* unused */) { this->node.gap_cache.add (hash); }); + node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); break; } @@ -389,7 +433,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction { node.logger.try_log (boost::str (boost::format ("Old for: %1%") % hash.to_string ())); } - process_old (transaction_a, block, origin_a); + events_a.events.emplace_back ([this, block = info_a.block, origin_a](nano::transaction const & post_event_transaction_a) { process_old (post_event_transaction_a, block, origin_a); }); node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old); break; } @@ -399,7 +443,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction { node.logger.try_log (boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ())); } - requeue_invalid (hash, info_a); + events_a.events.emplace_back ([this, hash, info_a](nano::transaction const & /* unused */) { requeue_invalid (hash, info_a); }); break; } case nano::process_result::negative_spend: @@ -420,7 +464,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction } case nano::process_result::fork: { - node.process_fork (transaction_a, block, info_a.modified); + events_a.events.emplace_back ([this, block = info_a.block, modified = info_a.modified](nano::transaction const & post_event_transaction_a) { this->node.process_fork (post_event_transaction_a, block, modified); }); node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork); if (node.config.logging.ledger_logging ()) { @@ -476,10 +520,10 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction return result; } -void nano::block_processor::process_old (nano::write_transaction const & transaction_a, std::shared_ptr const & block_a, nano::block_origin const origin_a) +void nano::block_processor::process_old (nano::transaction const & transaction_a, std::shared_ptr const & block_a, nano::block_origin const origin_a) { // First try to update election difficulty, then attempt to restart an election - if (!node.active.update_difficulty (*block_a) || !node.active.restart (block_a, transaction_a)) + if (!node.active.update_difficulty (*block_a) || !node.active.restart (transaction_a, block_a)) { // Let others know about the difficulty update if (origin_a == nano::block_origin::local) diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index b769698161..1956e6fb50 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -17,6 +17,7 @@ namespace nano { class node; +class read_transaction; class transaction; class write_transaction; class write_database_queue; @@ -30,8 +31,12 @@ enum class block_origin class block_post_events final { public: + explicit block_post_events (std::function &&); ~block_post_events (); - std::deque> events; + std::deque> events; + +private: + std::function get_transaction; }; /** @@ -51,8 +56,10 @@ class block_processor final void add (nano::unchecked_info const &, const bool = false); void add (std::shared_ptr const &, uint64_t = 0); void force (std::shared_ptr const &); + void update (std::shared_ptr const &); void wait_write (); bool should_log (); + bool have_blocks_ready (); bool have_blocks (); void process_blocks (); nano::process_return process_one (nano::write_transaction const &, block_post_events &, nano::unchecked_info, const bool = false, nano::block_origin const = nano::block_origin::remote); @@ -64,8 +71,8 @@ class block_processor final private: void queue_unchecked (nano::write_transaction const &, nano::block_hash const &); void process_batch (nano::unique_lock &); - void process_live (nano::block_hash const &, std::shared_ptr const &, nano::process_return const &, const bool = false, nano::block_origin const = nano::block_origin::remote); - void process_old (nano::write_transaction const &, std::shared_ptr const &, nano::block_origin const); + void process_live (nano::transaction const &, nano::block_hash const &, std::shared_ptr const &, nano::process_return const &, const bool = false, nano::block_origin const = nano::block_origin::remote); + void process_old (nano::transaction const &, std::shared_ptr const &, nano::block_origin const); void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &); void process_verified_state_blocks (std::deque &, std::vector const &, std::vector const &, std::vector const &); bool stopped{ false }; @@ -74,6 +81,7 @@ class block_processor final std::chrono::steady_clock::time_point next_log; std::deque blocks; std::deque> forced; + std::deque> updates; nano::condition_variable condition; nano::node & node; nano::write_database_queue & write_database_queue; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index bee4c30569..9b4e589039 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -534,7 +534,7 @@ bool nano::node::copy_with_compaction (boost::filesystem::path const & destinati return store.copy_db (destination); } -void nano::node::process_fork (nano::transaction const & transaction_a, std::shared_ptr const & block_a, uint64_t modified_a) +void nano::node::process_fork (nano::transaction const & transaction_a, std::shared_ptr const & block_a, uint64_t const modified_a) { auto root (block_a->root ()); if (!store.block_exists (transaction_a, block_a->hash ()) && store.root_exists (transaction_a, block_a->root ())) @@ -630,9 +630,9 @@ nano::process_return nano::node::process_local (std::shared_ptr con // Notify block processor to release write lock block_processor.wait_write (); // Process block - block_post_events events; + block_post_events post_events ([& store = store] { return store.tx_begin_read (); }); auto transaction (store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending }, { tables::confirmation_height })); - return block_processor.process_one (transaction, events, info, work_watcher_a, nano::block_origin::local); + return block_processor.process_one (transaction, post_events, info, work_watcher_a, nano::block_origin::local); } void nano::node::start ()