Skip to content

Commit

Permalink
Changing how forced blocks are handled so it doesn't require a separa…
Browse files Browse the repository at this point in the history
…te logic path or tracking a separate container.
  • Loading branch information
clemahieu committed Feb 6, 2023
1 parent a0b858d commit 214fed2
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 64 deletions.
4 changes: 4 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3696,6 +3696,10 @@ TEST (node, rollback_gap_source)
.link (send2->hash ())
.sign (key.prv, key.pub)
.build_shared ();
std::cerr << boost::str (boost::format ("send1: %1%\n") % send1->hash ().to_string ());
std::cerr << boost::str (boost::format ("fork: %1%\n") % fork->hash ().to_string ());
std::cerr << boost::str (boost::format ("send2: %1%\n") % send2->hash ().to_string ());
std::cerr << boost::str (boost::format ("open: %1%\n") % open->hash ().to_string ());
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*fork).code);
// Node has fork & doesn't have source for correct block open (send2)
Expand Down
109 changes: 48 additions & 61 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void nano::block_processor::flush ()
std::size_t nano::block_processor::size ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
return (blocks.size () + state_block_signature_verification.size () + forced.size ());
return (blocks.size () + state_block_signature_verification.size ());
}

bool nano::block_processor::full ()
Expand All @@ -203,13 +203,45 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block_a)
add (item);
}

void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
void nano::block_processor::rollback_competitor (nano::block const & block)
{
auto hash = block.hash ();
auto transaction = node.store.tx_begin_write ();
auto successor (node.ledger.successor (transaction, block.qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.push_back (block_a);
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
{
node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
}
}
condition.notify_all ();
}

void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
rollback_competitor (*block_a);
add (block_a);
}

void nano::block_processor::wait_write ()
Expand Down Expand Up @@ -254,7 +286,7 @@ bool nano::block_processor::should_log ()
bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty ();
return !blocks.empty ();
}

bool nano::block_processor::have_blocks ()
Expand Down Expand Up @@ -298,69 +330,25 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
lock_a.lock ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
unsigned number_of_blocks_processed{ 0 };
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ())
{
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
if ((blocks.size () + state_block_signature_verification.size () > 64) && should_log ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) in processing queue") % blocks.size () % state_block_signature_verification.size ()));
}
value_type info;
auto & block = info.block;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
{
info = blocks.front ();
blocks.pop_front ();
hash = block->hash ();
}
else
{
info = { forced.front () };
forced.pop_front ();
hash = block->hash ();
force = true;
number_of_forced_processed++;
}
info = blocks.front ();
blocks.pop_front ();
hash = block->hash ();
lock_a.unlock ();
if (force)
{
auto successor (node.ledger.successor (transaction, block->qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
{
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
{
node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
}
}
}
number_of_blocks_processed++;
auto result = process_one (transaction, post_events, info, force);
auto result = process_one (transaction, post_events, info);
switch (result.code)
{
case nano::process_result::progress:
Expand All @@ -377,7 +365,7 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

if (node.config.logging.timing_logging () && number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
{
node.logger.always_log (boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.value ().count () % timer_l.unit ()));
node.logger.always_log (boost::str (boost::format ("Processed %1% blocks in %3% %4%") % number_of_blocks_processed % timer_l.value ().count () % timer_l.unit ()));
}
}

Expand Down Expand Up @@ -409,7 +397,7 @@ void nano::block_processor::process_live (nano::transaction const & transaction_
}
}

nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, value_type const & info_a, bool const forced_a, nano::block_origin const origin_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, value_type const & info_a, nano::block_origin const origin_a)
{
nano::process_return result;
auto block (info_a.block);
Expand Down Expand Up @@ -605,12 +593,11 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bl
{
nano::lock_guard<nano::mutex> guard{ block_processor.mutex };
blocks_count = block_processor.blocks.size ();
forced_count = block_processor.forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification"));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));

return composite;
}
4 changes: 2 additions & 2 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ class block_processor final
bool half_full ();
void add (value_type & item);
void add (std::shared_ptr<nano::block> const &);
void rollback_competitor (nano::block const & block);
void force (std::shared_ptr<nano::block> const &);
void wait_write ();
bool should_log ();
bool have_blocks_ready ();
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::write_transaction const &, block_post_events &, value_type const & item, bool const = false, nano::block_origin const = nano::block_origin::remote);
nano::process_return process_one (nano::write_transaction const &, block_post_events &, value_type const & item, nano::block_origin const = nano::block_origin::remote);
nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr<nano::block> const &);
std::atomic<bool> flushing{ false };
// Delay required for average network propagartion before requesting confirmation
Expand All @@ -84,7 +85,6 @@ class block_processor final
bool awaiting_write{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<value_type> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
nano::condition_variable condition;
nano::node & node;
nano::write_database_queue & write_database_queue;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ nano::process_return nano::node::process_local (std::shared_ptr<nano::block> con
// Process block
block_post_events post_events ([&store = store] { return store.tx_begin_read (); });
auto const transaction (store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending }));
return block_processor.process_one (transaction, post_events, { block_a }, false, nano::block_origin::local);
return block_processor.process_one (transaction, post_events, { block_a }, nano::block_origin::local);
}

void nano::node::start ()
Expand Down

0 comments on commit 214fed2

Please sign in to comment.