diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 434b81ffee..06ace006b5 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3696,6 +3696,10 @@ TEST (node, rollback_gap_source) .link (send2->hash ()) .sign (key.prv, key.pub) .build_shared (); + std::cerr << boost::str (boost::format ("send1: %1%\n") % send1->hash ().to_string ()); + std::cerr << boost::str (boost::format ("fork: %1%\n") % fork->hash ().to_string ()); + std::cerr << boost::str (boost::format ("send2: %1%\n") % send2->hash ().to_string ()); + std::cerr << boost::str (boost::format ("open: %1%\n") % open->hash ().to_string ()); ASSERT_EQ (nano::process_result::progress, node.process (*send1).code); ASSERT_EQ (nano::process_result::progress, node.process (*fork).code); // Node has fork & doesn't have source for correct block open (send2) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 26abac0e0b..56df0647db 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -178,7 +178,7 @@ void nano::block_processor::flush () std::size_t nano::block_processor::size () { nano::unique_lock lock{ mutex }; - return (blocks.size () + state_block_signature_verification.size () + forced.size ()); + return (blocks.size () + state_block_signature_verification.size ()); } bool nano::block_processor::full () @@ -203,13 +203,45 @@ void nano::block_processor::add (std::shared_ptr const & block_a) add (item); } -void nano::block_processor::force (std::shared_ptr const & block_a) +void nano::block_processor::rollback_competitor (nano::block const & block) { + auto hash = block.hash (); + auto transaction = node.store.tx_begin_write (); + auto successor (node.ledger.successor (transaction, block.qualified_root ())); + if (successor != nullptr && successor->hash () != hash) { - nano::lock_guard lock{ mutex }; - forced.push_back (block_a); + // Replace our block with the winner and roll back any dependent blocks + if (node.config.logging.ledger_rollback_logging ()) + { + node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ())); + } + std::vector> rollback_list; + if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) + { + node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); + node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ())); + } + else if (node.config.logging.ledger_rollback_logging ()) + { + node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ())); + } + // Deleting from votes cache, stop active transaction + for (auto & i : rollback_list) + { + node.history.erase (i->root ()); + // Stop all rolled back active transactions except initial + if (i->hash () != successor->hash ()) + { + node.active.erase (*i); + } + } } - condition.notify_all (); +} + +void nano::block_processor::force (std::shared_ptr const & block_a) +{ + rollback_competitor (*block_a); + add (block_a); } void nano::block_processor::wait_write () @@ -254,7 +286,7 @@ bool nano::block_processor::should_log () bool nano::block_processor::have_blocks_ready () { debug_assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty (); + return !blocks.empty (); } bool nano::block_processor::have_blocks () @@ -298,69 +330,25 @@ void nano::block_processor::process_batch (nano::unique_lock & lock lock_a.lock (); timer_l.start (); // Processing blocks - unsigned number_of_blocks_processed (0), number_of_forced_processed (0); + unsigned number_of_blocks_processed{ 0 }; auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ()) { - if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ()) + if ((blocks.size () + state_block_signature_verification.size () > 64) && should_log ()) { - node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ())); + node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) in processing queue") % blocks.size () % state_block_signature_verification.size ())); } value_type info; auto & block = info.block; nano::block_hash hash (0); - bool force (false); - if (forced.empty ()) - { - info = blocks.front (); - blocks.pop_front (); - hash = block->hash (); - } - else - { - info = { forced.front () }; - forced.pop_front (); - hash = block->hash (); - force = true; - number_of_forced_processed++; - } + info = blocks.front (); + blocks.pop_front (); + hash = block->hash (); lock_a.unlock (); - if (force) - { - auto successor (node.ledger.successor (transaction, block->qualified_root ())); - if (successor != nullptr && successor->hash () != hash) - { - // Replace our block with the winner and roll back any dependent blocks - if (node.config.logging.ledger_rollback_logging ()) - { - node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ())); - } - std::vector> rollback_list; - if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) - { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); - node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ())); - } - else if (node.config.logging.ledger_rollback_logging ()) - { - node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ())); - } - // Deleting from votes cache, stop active transaction - for (auto & i : rollback_list) - { - node.history.erase (i->root ()); - // Stop all rolled back active transactions except initial - if (i->hash () != successor->hash ()) - { - node.active.erase (*i); - } - } - } - } number_of_blocks_processed++; - auto result = process_one (transaction, post_events, info, force); + auto result = process_one (transaction, post_events, info); switch (result.code) { case nano::process_result::progress: @@ -377,7 +365,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock if (node.config.logging.timing_logging () && number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100)) { - node.logger.always_log (boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.value ().count () % timer_l.unit ())); + node.logger.always_log (boost::str (boost::format ("Processed %1% blocks in %3% %4%") % number_of_blocks_processed % timer_l.value ().count () % timer_l.unit ())); } } @@ -409,7 +397,7 @@ void nano::block_processor::process_live (nano::transaction const & transaction_ } } -nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, value_type const & info_a, bool const forced_a, nano::block_origin const origin_a) +nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, value_type const & info_a, nano::block_origin const origin_a) { nano::process_return result; auto block (info_a.block); @@ -605,12 +593,11 @@ std::unique_ptr nano::collect_container_info (bl { nano::lock_guard guard{ block_processor.mutex }; blocks_count = block_processor.blocks.size (); - forced_count = block_processor.forced.size (); } auto composite = std::make_unique (name); composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification")); composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); + return composite; } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 2295815efc..f2cbcc8db5 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -59,13 +59,14 @@ class block_processor final bool half_full (); void add (value_type & item); void add (std::shared_ptr const &); + void rollback_competitor (nano::block const & block); void force (std::shared_ptr const &); void wait_write (); bool should_log (); bool have_blocks_ready (); bool have_blocks (); void process_blocks (); - nano::process_return process_one (nano::write_transaction const &, block_post_events &, value_type const & item, bool const = false, nano::block_origin const = nano::block_origin::remote); + nano::process_return process_one (nano::write_transaction const &, block_post_events &, value_type const & item, nano::block_origin const = nano::block_origin::remote); nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr const &); std::atomic flushing{ false }; // Delay required for average network propagartion before requesting confirmation @@ -84,7 +85,6 @@ class block_processor final bool awaiting_write{ false }; std::chrono::steady_clock::time_point next_log; std::deque blocks; - std::deque> forced; nano::condition_variable condition; nano::node & node; nano::write_database_queue & write_database_queue; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a174081629..1ac240327f 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -610,7 +610,7 @@ nano::process_return nano::node::process_local (std::shared_ptr con // Process block block_post_events post_events ([&store = store] { return store.tx_begin_read (); }); auto const transaction (store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending })); - return block_processor.process_one (transaction, post_events, { block_a }, false, nano::block_origin::local); + return block_processor.process_one (transaction, post_events, { block_a }, nano::block_origin::local); } void nano::node::start ()