From f7a61ef4cd2b84ae3b05d86dd8c9c37240fcdd6a Mon Sep 17 00:00:00 2001 From: Ruslan Tushov Date: Fri, 29 Nov 2024 16:34:52 +0500 Subject: [PATCH] parallel apply blocks (#2284) Signed-off-by: turuslan Co-authored-by: kamilsa --- core/network/impl/synchronizer_impl.cpp | 143 +++++++++--------------- core/network/impl/synchronizer_impl.hpp | 5 +- 2 files changed, 56 insertions(+), 92 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index eca774951e..a2f9292b37 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -30,6 +30,7 @@ #include "storage/trie/trie_storage.hpp" #include "storage/trie_pruner/trie_pruner.hpp" #include "utils/pool_handler_ready_make.hpp" +#include "utils/weak_macro.hpp" OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SynchronizerImpl::Error, e) { using E = kagome::network::SynchronizerImpl::Error; @@ -767,7 +768,7 @@ namespace kagome::network { BlockInfo(header.number, block.hash), peer_id); - self->generations_.emplace(header.number, block.hash); + self->generations_.emplace(header.blockInfo()); self->ancestry_.emplace(header.parent_hash, block.hash); some_blocks_added = true; @@ -899,53 +900,38 @@ namespace kagome::network { hash); processBlockAdditionResult(block_addition_result, hash, std::move(handler)); - postApplyBlock(hash); + ancestry_.erase(hash); + postApplyBlock(); } void SynchronizerImpl::applyNextBlock() { + auto any_block_applied = false; if (generations_.empty()) { - SL_TRACE(log_, "No block for applying"); - return; - } - - bool false_val = false; - if (not applying_in_progress_.compare_exchange_strong(false_val, true)) { - SL_TRACE(log_, "Applying in progress"); return; } - SL_TRACE(log_, "Begin applying"); - ::libp2p::common::MovableFinalAction cleanup([weak{weak_from_this()}] { - if (auto self = weak.lock()) { - SL_TRACE(self->log_, "End applying"); - self->applying_in_progress_ = false; - } - }); - - primitives::BlockHash hash; - - while (true) { - auto generation_node = generations_.extract(generations_.begin()); - if (generation_node) { - hash = generation_node.mapped(); - break; - } - if (generations_.empty()) { - SL_TRACE(log_, "No block for applying"); - return; + while (not generations_.empty()) { + auto block_info = *generations_.begin(); + auto pop = [&] { generations_.erase(generations_.begin()); }; + auto it = known_blocks_.find(block_info.hash); + if (it == known_blocks_.end()) { + pop(); + continue; } - } - - if (auto it = known_blocks_.find(hash); it != known_blocks_.end()) { auto &block_data = it->second.data; BOOST_ASSERT(block_data.header.has_value()); - const BlockInfo block_info(block_data.header->number, block_data.hash); + auto parent = block_data.header->parentInfo(); + if (parent and not block_tree_->has(parent->hash)) { + break; + } + pop(); + any_block_applied = true; const auto &last_finalized_block = block_tree_->getLastFinalized(); SyncResultHandler handler; if (watched_blocks_number_ == block_data.header->number) { - if (auto wbn_node = watched_blocks_.extract(hash)) { + if (auto wbn_node = watched_blocks_.extract(block_info.hash)) { handler = std::move(wbn_node.mapped()); } } @@ -953,7 +939,7 @@ namespace kagome::network { // Skip applied and finalized blocks and // discard side-chain below last finalized if (block_data.header->number <= last_finalized_block.number) { - if (not block_tree_->has(hash)) { + if (not block_tree_->has(block_info.hash)) { auto n = discardBlock(block_data.hash); SL_WARN( log_, @@ -966,18 +952,13 @@ namespace kagome::network { } } else { - auto callback = - [wself{weak_from_this()}, - hash, - handler{std::move(handler)}, - cleanup = std::make_shared(std::move(cleanup))]( - auto &&block_addition_result) mutable { - cleanup.reset(); - if (auto self = wself.lock()) { - self->post_block_addition( - std::move(block_addition_result), std::move(handler), hash); - } - }; + auto callback = [WEAK_SELF, block_info, handler{std::move(handler)}]( + auto &&block_addition_result) mutable { + WEAK_LOCK(self); + self->post_block_addition(std::move(block_addition_result), + std::move(handler), + block_info.hash); + }; if (sync_method_ == application::SyncMethod::Full) { // Regular syncing @@ -1013,8 +994,11 @@ namespace kagome::network { } return; } + ancestry_.erase(block_info.hash); + } + if (any_block_applied) { + postApplyBlock(); } - postApplyBlock(hash); } void SynchronizerImpl::processBlockAdditionResult( @@ -1062,9 +1046,7 @@ namespace kagome::network { } } - void SynchronizerImpl::postApplyBlock(const primitives::BlockHash &hash) { - ancestry_.erase(hash); - + void SynchronizerImpl::postApplyBlock() { auto minPreloadedBlockAmount = sync_method_ == application::SyncMethod::Full ? kMinPreloadedBlockAmount : kMinPreloadedBlockAmountForFastSyncing; @@ -1118,29 +1100,20 @@ namespace kagome::network { void SynchronizerImpl::prune(const primitives::BlockInfo &finalized_block) { // Remove blocks whose numbers less finalized one while (not generations_.empty()) { - auto generation_node = generations_.extract(generations_.begin()); - if (generation_node) { - const auto &number = generation_node.key(); - if (number >= finalized_block.number) { - break; - } - const auto &hash = generation_node.mapped(); - notifySubscribers({number, hash}, Error::DISCARDED_BLOCK); - - known_blocks_.erase(hash); - ancestry_.erase(hash); + auto block_info = *generations_.begin(); + if (block_info.number > finalized_block.number) { + break; } - } - - // Remove blocks whose numbers equal finalized one, excluding finalized - // one - auto range = generations_.equal_range(finalized_block.number); - for (auto it = range.first; it != range.second;) { - auto cit = it++; - const auto &hash = cit->second; - if (hash != finalized_block.hash) { - discardBlock(hash); + if (block_info.number == finalized_block.number) { + if (block_info.hash != finalized_block.hash) { + discardBlock(block_info.hash); + } + continue; } + generations_.erase(*generations_.begin()); + notifySubscribers(block_info, Error::DISCARDED_BLOCK); + known_blocks_.erase(block_info.hash); + ancestry_.erase(block_info.hash); } metric_import_queue_length_->set(known_blocks_.size()); @@ -1169,18 +1142,13 @@ namespace kagome::network { for (auto g_it = generations_.rbegin(); g_it != generations_.rend(); ++g_it) { - const auto &hash = g_it->second; - - auto b_it = known_blocks_.find(hash); + auto &block_info = *g_it; + auto b_it = known_blocks_.find(block_info.hash); if (b_it == known_blocks_.end()) { - SL_TRACE(log_, - "Block {} is unknown. Go to next one", - primitives::BlockInfo(g_it->first, hash)); + SL_TRACE(log_, "Block {} is unknown. Go to next one", block_info); continue; } - primitives::BlockInfo block_info(g_it->first, hash); - auto &peers = b_it->second.peers; if (peers.empty()) { SL_TRACE( @@ -1194,10 +1162,7 @@ namespace kagome::network { auto &peer_id = *cp_it; if (busy_peers_.find(peer_id) != busy_peers_.end()) { - SL_TRACE(log_, - "Peer {} for block {} is busy", - peer_id, - primitives::BlockInfo(g_it->first, hash)); + SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info); continue; } @@ -1229,16 +1194,16 @@ namespace kagome::network { }; if (sync_method_ == application::SyncMethod::Full) { - auto lower = generations_.begin()->first; - auto upper = generations_.rbegin()->first + 1; - auto hint = generations_.rbegin()->first; + auto lower = generations_.begin()->number; + auto upper = generations_.rbegin()->number + 1; + auto hint = generations_.rbegin()->number; SL_DEBUG( log_, "Start to find common block with {} in #{}..#{} to fill queue", peer_id, - generations_.begin()->first, - generations_.rbegin()->first); + generations_.begin()->number, + generations_.rbegin()->number); findCommonBlock( peer_id, lower, @@ -1278,7 +1243,7 @@ namespace kagome::network { SL_TRACE(log_, "Block {} doesn't have appropriate peer. Go to next one", - primitives::BlockInfo(g_it->first, hash)); + block_info); } SL_TRACE(log_, "End asking portion of blocks: none"); diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index c139387ecc..6f932976c4 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -191,7 +191,7 @@ namespace kagome::network { SyncResultHandler &&handler); private: - void postApplyBlock(const primitives::BlockHash &hash); + void postApplyBlock(); void processBlockAdditionResult(outcome::result block_addition_result, const primitives::BlockHash &hash, SyncResultHandler &&handler); @@ -292,7 +292,7 @@ namespace kagome::network { std::unordered_map known_blocks_; // Blocks grouped by number - std::multimap generations_; + std::set generations_; // Links parent->child std::unordered_multimap @@ -307,7 +307,6 @@ namespace kagome::network { std::multimap subscriptions_; - std::atomic_bool applying_in_progress_ = false; std::atomic_bool asking_blocks_portion_in_progress_ = false; std::set busy_peers_; std::unordered_set load_blocks_;