From 72d5f4256195afe56dd9a1be004627f2c0dc6ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:04:55 +0200 Subject: [PATCH 1/4] Issue block processor batch processed notifications on background thread --- nano/lib/thread_roles.cpp | 3 ++ nano/lib/thread_roles.hpp | 1 + nano/node/blockprocessor.cpp | 45 ++++++++++++++++------- nano/node/blockprocessor.hpp | 6 +++ nano/node/bootstrap_ascending/service.cpp | 10 +++-- nano/node/confirming_set.cpp | 13 ++++--- nano/node/confirming_set.hpp | 4 +- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 76f467cd0e..ff6831b4d8 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; + case nano::thread_role::name::block_processing_notifications: + thread_role_name_string = "Blck proc notif"; + break; case nano::thread_role::name::request_loop: thread_role_name_string = "Request loop"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 5896318c42..a82a584727 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -17,6 +17,7 @@ enum class name vote_processing, vote_cache_processing, block_processing, + block_processing_notifications, request_loop, wallet_actions, bootstrap_initiator, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d0e8716494..b6075ceb04 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -41,7 +41,8 @@ void nano::block_processor::context::set_result (result_t const & result) nano::block_processor::block_processor (nano::node & node_a) : config{ node_a.config.block_processor }, node (node_a), - next_log (std::chrono::steady_clock::now ()) + next_log (std::chrono::steady_clock::now ()), + workers{ 1, nano::thread_role::name::block_processing_notifications } { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. @@ -84,12 +85,15 @@ nano::block_processor::~block_processor () { // Thread must be stopped before destruction debug_assert (!thread.joinable ()); + debug_assert (!workers.alive ()); } void nano::block_processor::start () { debug_assert (!thread.joinable ()); + workers.start (); + thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); run (); @@ -107,6 +111,7 @@ void nano::block_processor::stop () { thread.join (); } + workers.stop (); } // TODO: Remove and replace all checks with calls to size (block_source) @@ -234,6 +239,17 @@ void nano::block_processor::run () { if (!queue.empty ()) { + // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here + while (workers.queued_tasks () >= config.max_queued_notifications) + { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped; }); + if (stopped) + { + return; + } + } + // TODO: Cleaner periodical logging if (should_log ()) { @@ -244,20 +260,22 @@ void nano::block_processor::run () auto processed = process_batch (lock); debug_assert (!lock.owns_lock ()); + lock.lock (); - // Set results for futures when not holding the lock - for (auto & [result, context] : processed) - { - if (context.callback) + // Queue notifications to be dispatched in the background + workers.post ([this, processed = std::move (processed)] () mutable { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify); + // Set results for futures when not holding the lock + for (auto & [result, context] : processed) { - context.callback (result); + if (context.callback) + { + context.callback (result); + } + context.set_result (result); } - context.set_result (result); - } - - batch_processed.notify (processed); - - lock.lock (); + batch_processed.notify (processed); + }); } else { @@ -315,7 +333,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock debug_assert (!mutex.try_lock ()); debug_assert (!queue.empty ()); - auto batch = next_batch (256); + auto batch = next_batch (config.batch_size); lock.unlock (); @@ -466,6 +484,7 @@ nano::container_info nano::block_processor::container_info () const info.put ("blocks", queue.size ()); info.put ("forced", queue.size ({ nano::block_source::forced })); info.add ("queue", queue.container_info ()); + info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 837631b0ba..6313e9bd51 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -46,6 +47,9 @@ class block_processor_config final size_t priority_live{ 1 }; size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; + + size_t batch_size{ 256 }; + size_t max_queued_notifications{ 8 }; }; /** @@ -128,5 +132,7 @@ class block_processor final nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread thread; + + nano::thread_pool workers; }; } diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 776f0b3d8e..d14f17c520 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -33,7 +33,6 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf scoring{ config, node_config_a.network_params.network }, database_limiter{ config.database_rate_limit, 1.0 } { - // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread block_processor.batch_processed.add ([this] (auto const & batch) { { nano::lock_guard lock{ mutex }; @@ -217,11 +216,14 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx { if (source == nano::block_source::bootstrap) { - const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value (); + const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value_or (0); const auto source_hash = block.source_field ().value_or (block.link_field ().value_or (0).as_block_hash ()); - // Mark account as blocked because it is missing the source block - accounts.block (account, source_hash); + if (!account.is_zero () && !source_hash.is_zero ()) + { + // Mark account as blocked because it is missing the source block + accounts.block (account, source_hash); + } } } break; diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index e504b0acae..2921881b52 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -12,7 +12,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na ledger{ ledger_a }, stats{ stats_a }, logger{ logger_a }, - notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications } + workers{ 1, nano::thread_role::name::confirmation_height_notifications } { batch_cemented.add ([this] (auto const & cemented) { for (auto const & context : cemented) @@ -55,7 +55,7 @@ void nano::confirming_set::start () return; } - notification_workers.start (); + workers.start (); thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::confirmation_height); @@ -74,7 +74,7 @@ void nano::confirming_set::stop () { thread.join (); } - notification_workers.stop (); + workers.stop (); } bool nano::confirming_set::contains (nano::block_hash const & hash) const @@ -150,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) std::unique_lock lock{ mutex }; // It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here - while (notification_workers.queued_tasks () >= config.max_queued_notifications) + while (workers.queued_tasks () >= config.max_queued_notifications) { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); @@ -160,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) } } - notification_workers.post ([this, batch = std::move (batch)] () { + workers.post ([this, batch = std::move (batch)] () { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify); batch_cemented.notify (batch); }); @@ -255,6 +255,7 @@ nano::container_info nano::confirming_set::container_info () const nano::container_info info; info.put ("set", set); - info.add ("notification_workers", notification_workers.container_info ()); + info.put ("notifications", workers.queued_tasks ()); + info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 99569ce1e6..644b241c92 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -105,11 +105,11 @@ class confirming_set final ordered_entries set; std::unordered_set current; - nano::thread_pool notification_workers; - std::atomic stopped{ false }; mutable std::mutex mutex; std::condition_variable condition; std::thread thread; + + nano::thread_pool workers; }; } From f350100903411f7867a5afa7c8e43039ad3d4628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:06:29 +0100 Subject: [PATCH 2/4] Remove `should_log` function --- nano/node/blockprocessor.cpp | 19 +++---------------- nano/node/blockprocessor.hpp | 3 --- 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index b6075ceb04..3e5ccf084e 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -40,8 +40,7 @@ void nano::block_processor::context::set_result (result_t const & result) nano::block_processor::block_processor (nano::node & node_a) : config{ node_a.config.block_processor }, - node (node_a), - next_log (std::chrono::steady_clock::now ()), + node{ node_a }, workers{ 1, nano::thread_role::name::block_processing_notifications } { batch_processed.add ([this] (auto const & items) { @@ -234,6 +233,7 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const void nano::block_processor::run () { + nano::interval log_interval; nano::unique_lock lock{ mutex }; while (!stopped) { @@ -250,8 +250,7 @@ void nano::block_processor::run () } } - // TODO: Cleaner periodical logging - if (should_log ()) + if (log_interval.elapsed (15s)) { node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", queue.size (), @@ -285,18 +284,6 @@ void nano::block_processor::run () } } -bool nano::block_processor::should_log () -{ - auto result (false); - auto now (std::chrono::steady_clock::now ()); - if (next_log < now) - { - next_log = now + std::chrono::seconds (15); - result = true; - } - return result; -} - auto nano::block_processor::next () -> context { debug_assert (!mutex.try_lock ()); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 6313e9bd51..cb54711b0e 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -93,7 +93,6 @@ class block_processor final bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr, std::function callback = {}); std::optional add_blocking (std::shared_ptr const & block, nano::block_source); void force (std::shared_ptr const &); - bool should_log (); nano::container_info container_info () const; @@ -126,8 +125,6 @@ class block_processor final private: nano::fair_queue queue; - std::chrono::steady_clock::time_point next_log; - bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; From 3cd1dabc5e3ec821e50a827f083cc506d5166d69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:09:40 +0100 Subject: [PATCH 3/4] Move `block_processor::context` source to the bottom of the file --- nano/node/blockprocessor.cpp | 44 ++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 3e5ccf084e..36e0a79779 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -12,28 +12,6 @@ #include -/* - * block_processor::context - */ - -nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a, callback_t callback_a) : - block{ std::move (block) }, - source{ source_a }, - callback{ std::move (callback_a) } -{ - debug_assert (source != nano::block_source::unknown); -} - -auto nano::block_processor::context::get_future () -> std::future -{ - return promise.get_future (); -} - -void nano::block_processor::context::set_result (result_t const & result) -{ - promise.set_value (result); -} - /* * block_processor */ @@ -475,6 +453,28 @@ nano::container_info nano::block_processor::container_info () const return info; } +/* + * block_processor::context + */ + +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a, callback_t callback_a) : + block{ std::move (block) }, + source{ source_a }, + callback{ std::move (callback_a) } +{ + debug_assert (source != nano::block_source::unknown); +} + +auto nano::block_processor::context::get_future () -> std::future +{ + return promise.get_future (); +} + +void nano::block_processor::context::set_result (result_t const & result) +{ + promise.set_value (result); +} + /* * block_processor_config */ From bac144fc671ab86b7db1b315bae1b2c851ef821d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:11:10 +0100 Subject: [PATCH 4/4] Remove unnecessary notify --- nano/node/blockprocessor.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 36e0a79779..34d6956a8b 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -256,8 +256,9 @@ void nano::block_processor::run () } else { - condition.notify_one (); - condition.wait (lock); + condition.wait (lock, [this] { + return stopped || !queue.empty (); + }); } } }