Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using post-processing events for all operations #2820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7adcbe0
Use const ref in blockprocessor methods
guilhermelawless Jun 17, 2020
ad818da
Queue to blockprocessor on block difficulty update via restart
guilhermelawless Jun 17, 2020
03029f7
Use a single read transaction for all events
guilhermelawless Jun 17, 2020
98d2fa2
Post-event for process_old
guilhermelawless Jun 17, 2020
56a5832
Post events for process gap, fork, bad_signature
guilhermelawless Jun 17, 2020
00d31af
Use const ref for shared_ptr in more locations
guilhermelawless Jun 17, 2020
c1df5ea
Unshadow transaction_a from event lambdas (Serg review)
guilhermelawless Jun 29, 2020
5aa0952
Merge remote-tracking branch 'origin/develop' into blockprocessor/pos…
guilhermelawless Jul 8, 2020
a2fcba8
Simplify block processor loop condition (Wez review)
guilhermelawless Jul 16, 2020
308f11c
Move ledger condition check test to the correct location
guilhermelawless Jul 16, 2020
0bb7e86
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Jul 27, 2020
0f55da5
Formatting
guilhermelawless Jul 27, 2020
36e35fb
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Aug 12, 2020
9c14802
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Aug 12, 2020
0c471a4
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Sep 4, 2020
5cbea41
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Sep 11, 2020
566936d
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
guilhermelawless Sep 22, 2020
b91f072
Merge remote-tracking branch 'origin/develop' into blockprocessor/pos…
guilhermelawless Oct 9, 2020
211b031
Merge remote-tracking branch 'upstream/develop' into blockprocessor/p…
SergiySW Oct 13, 2020
d387c6f
Merge branch 'develop' into blockprocessor/post-events-full
clemahieu Oct 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,6 @@ TEST (active_transactions, restart_dropped)
node.block_processor.flush ();
ASSERT_EQ (0, node.active.size ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart));
// Verify the block was not updated in the ledger
ASSERT_EQ (*node.store.block_get (node.store.tx_begin_read (), send->hash ()), *send);
// Generate even higher difficulty work
ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ());
// Add voting
Expand All @@ -1192,6 +1190,8 @@ TEST (active_transactions, restart_dropped)
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart));
// Wait for the election to complete
ASSERT_TIMELY (5s, node.ledger.cache.cemented_count == 2);
// Verify the block is eventually updated in the ledger
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), send->hash ())->block_work () == send->block_work ());
}

// Ensures votes are tallied on election::publish even if no vote is inserted through inactive_votes_cache
Expand Down
10 changes: 3 additions & 7 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3033,13 +3033,9 @@ TEST (node, epoch_conflict_confirm)
ASSERT_NE (nullptr, election);
election->force_confirm ();
ASSERT_TIMELY (3s, node1->block_confirmed (open->hash ()));
{
nano::block_post_events events;
auto transaction (node0->store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, send).code);
ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, send2).code);
ASSERT_EQ (nano::process_result::progress, node0->block_processor.process_one (transaction, events, open).code);
}
ASSERT_EQ (nano::process_result::progress, node0->process (*send).code);
ASSERT_EQ (nano::process_result::progress, node0->process (*send2).code);
ASSERT_EQ (nano::process_result::progress, node0->process (*open).code);
node0->process_active (change);
node0->process_active (epoch_open);
ASSERT_TIMELY (10s, node0->block (change->hash ()) && node0->block (epoch_open->hash ()) && node1->block (change->hash ()) && node1->block (epoch_open->hash ()));
Expand Down
7 changes: 3 additions & 4 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ bool nano::active_transactions::update_difficulty_impl (nano::active_transaction
return error;
}

bool nano::active_transactions::restart (std::shared_ptr<nano::block> const & block_a, nano::write_transaction const & transaction_a)
bool nano::active_transactions::restart (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
{
// Only guaranteed to restart the election if the new block is received within 2 minutes of its election being dropped
constexpr std::chrono::minutes recently_dropped_cutoff{ 2 };
Expand All @@ -1012,9 +1012,8 @@ bool nano::active_transactions::restart (std::shared_ptr<nano::block> const & bl
// The existing block is re-written, not the arriving block, as that one might not have gone through a full signature check
ledger_block->block_work_set (block_a->block_work ());

auto block_count = node.ledger.cache.block_count.load ();
node.store.block_put (transaction_a, hash, *ledger_block);
debug_assert (node.ledger.cache.block_count.load () == block_count);
// Queue for writing in the block processor to avoid opening a new write transaction for a single operation
node.block_processor.update (ledger_block);

// Restart election for the upgraded block, previously dropped from elections
auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ());
Expand Down
2 changes: 1 addition & 1 deletion nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class active_transactions final
// Returns false if the election difficulty was updated
bool update_difficulty (nano::block const &);
// Returns false if the election was restarted
bool restart (std::shared_ptr<nano::block> const &, nano::write_transaction const &);
bool restart (nano::transaction const &, std::shared_ptr<nano::block> const &);
double normalized_multiplier (nano::block const &, boost::optional<roots_iterator> const & = boost::none) const;
void update_active_multiplier (nano::unique_lock<std::mutex> &);
uint64_t active_difficulty ();
Expand Down
162 changes: 103 additions & 59 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@

std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay;

nano::block_post_events::block_post_events (std::function<nano::read_transaction ()> && get_transaction_a) :
get_transaction (std::move (get_transaction_a))
{
}

nano::block_post_events::~block_post_events ()
{
debug_assert (get_transaction != nullptr);
auto transaction (get_transaction ());
for (auto const & i : events)
{
i ();
i (transaction);
}
}

Expand Down Expand Up @@ -126,6 +133,15 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
condition.notify_all ();
}

void nano::block_processor::update (std::shared_ptr<nano::block> const & block_a)
{
{
nano::lock_guard<std::mutex> lock (mutex);
updates.push_back (block_a);
}
condition.notify_all ();
}

void nano::block_processor::wait_write ()
{
nano::lock_guard<std::mutex> lock (mutex);
Expand All @@ -137,7 +153,7 @@ void nano::block_processor::process_blocks ()
nano::unique_lock<std::mutex> lock (mutex);
while (!stopped)
{
if (!blocks.empty () || !forced.empty ())
if (have_blocks_ready ())
{
active = true;
lock.unlock ();
Expand Down Expand Up @@ -165,10 +181,16 @@ bool nano::block_processor::should_log ()
return result;
}

bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty () || !updates.empty ();
}

bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0;
return have_blocks_ready () || state_block_signature_verification.size () != 0;
}

void nano::block_processor::process_verified_state_blocks (std::deque<nano::unchecked_info> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures)
Expand Down Expand Up @@ -213,71 +235,89 @@ void nano::block_processor::process_verified_state_blocks (std::deque<nano::unch
void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_a)
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
block_post_events post_events;
block_post_events post_events ([& store = node.store] { return store.tx_begin_read (); });
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }, { tables::confirmation_height }));
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)) && !awaiting_write && number_of_blocks_processed < node.store.max_block_write_batch_num ())
unsigned number_of_blocks_processed (0), number_of_forced_processed (0), number_of_updates_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ())
{
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
if ((blocks.size () + state_block_signature_verification.size () + forced.size () + updates.size () > 64) && should_log ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced, %4% updates) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size () % updates.size ()));
}
nano::unchecked_info info;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
if (!updates.empty ())
{
info = blocks.front ();
blocks.pop_front ();
hash = info.block->hash ();
auto block (updates.front ());
updates.pop_front ();
lock_a.unlock ();
auto hash (block->hash ());
if (node.store.block_exists (transaction, hash))
{
node.store.block_put (transaction, hash, *block);
}
++number_of_updates_processed;
}
else
{
info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown);
forced.pop_front ();
hash = info.block->hash ();
force = true;
number_of_forced_processed++;
}
lock_a.unlock ();
if (force)
{
auto successor (node.ledger.successor (transaction, info.block->qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
nano::unchecked_info info;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
{
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache & wallet work watcher, stop active transaction
for (auto & i : rollback_list)
info = blocks.front ();
blocks.pop_front ();
hash = info.block->hash ();
}
else
{
info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown);
forced.pop_front ();
hash = info.block->hash ();
force = true;
number_of_forced_processed++;
}
lock_a.unlock ();
if (force)
{
auto successor (node.ledger.successor (transaction, info.block->qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
{
node.history.erase (i->root ());
node.wallets.watcher->remove (*i);
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
node.active.erase (*i);
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache & wallet work watcher, stop active transaction
for (auto & i : rollback_list)
{
node.history.erase (i->root ());
node.wallets.watcher->remove (*i);
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
}
}
}
number_of_blocks_processed++;
process_one (transaction, post_events, info);
}
number_of_blocks_processed++;
process_one (transaction, post_events, info);
lock_a.lock ();
}
awaiting_write = false;
Expand All @@ -289,7 +329,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
}
}

void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> const & block_a, nano::process_return const & process_return_a, const bool watch_work_a, nano::block_origin const origin_a)
void nano::block_processor::process_live (nano::transaction const & transaction_a, nano::block_hash const & hash_a, std::shared_ptr<nano::block> const & block_a, nano::process_return const & process_return_a, const bool watch_work_a, nano::block_origin const origin_a)
{
// Add to work watcher to prevent dropping the election
if (watch_work_a)
Expand All @@ -298,7 +338,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
}

// Start collecting quorum on block
if (watch_work_a || node.ledger.dependents_confirmed (node.store.tx_begin_read (), *block_a))
if (watch_work_a || node.ledger.dependents_confirmed (transaction_a, *block_a))
{
node.active.insert (block_a, process_return_a.previous_balance.number ());
}
Expand Down Expand Up @@ -342,7 +382,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
events_a.events.emplace_back ([this, hash, block, result, watch_work_a, origin_a]() { process_live (hash, block, result, watch_work_a, origin_a); });
events_a.events.emplace_back ([this, hash, block = info_a.block, result, watch_work_a, origin_a](nano::transaction const & post_event_transaction_a) { process_live (post_event_transaction_a, hash, block, result, watch_work_a, origin_a); });
}
queue_unchecked (transaction_a, hash);
break;
Expand All @@ -361,7 +401,9 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction

nano::unchecked_key unchecked_key (block->previous (), hash);
node.store.unchecked_put (transaction_a, unchecked_key, info_a);
node.gap_cache.add (hash);

events_a.events.emplace_back ([this, hash](nano::transaction const & /* unused */) { this->node.gap_cache.add (hash); });

node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous);
break;
}
Expand All @@ -379,7 +421,9 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction

nano::unchecked_key unchecked_key (node.ledger.block_source (transaction_a, *(block)), hash);
node.store.unchecked_put (transaction_a, unchecked_key, info_a);
node.gap_cache.add (hash);

events_a.events.emplace_back ([this, hash](nano::transaction const & /* unused */) { this->node.gap_cache.add (hash); });

node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
break;
}
Expand All @@ -389,7 +433,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
{
node.logger.try_log (boost::str (boost::format ("Old for: %1%") % hash.to_string ()));
}
process_old (transaction_a, block, origin_a);
events_a.events.emplace_back ([this, block = info_a.block, origin_a](nano::transaction const & post_event_transaction_a) { process_old (post_event_transaction_a, block, origin_a); });
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
break;
}
Expand All @@ -399,7 +443,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
{
node.logger.try_log (boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ()));
}
requeue_invalid (hash, info_a);
events_a.events.emplace_back ([this, hash, info_a](nano::transaction const & /* unused */) { requeue_invalid (hash, info_a); });
break;
}
case nano::process_result::negative_spend:
Expand All @@ -420,7 +464,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
}
case nano::process_result::fork:
{
node.process_fork (transaction_a, block, info_a.modified);
events_a.events.emplace_back ([this, block = info_a.block, modified = info_a.modified](nano::transaction const & post_event_transaction_a) { this->node.process_fork (post_event_transaction_a, block, modified); });
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
if (node.config.logging.ledger_logging ())
{
Expand Down Expand Up @@ -476,10 +520,10 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
return result;
}

void nano::block_processor::process_old (nano::write_transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a, nano::block_origin const origin_a)
void nano::block_processor::process_old (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a, nano::block_origin const origin_a)
{
// First try to update election difficulty, then attempt to restart an election
if (!node.active.update_difficulty (*block_a) || !node.active.restart (block_a, transaction_a))
if (!node.active.update_difficulty (*block_a) || !node.active.restart (transaction_a, block_a))
{
// Let others know about the difficulty update
if (origin_a == nano::block_origin::local)
Expand Down
Loading