Skip to content

Commit

Permalink
parallel apply blocks (#2284)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
Co-authored-by: kamilsa <[email protected]>
  • Loading branch information
turuslan and kamilsa authored Nov 29, 2024
1 parent 778d214 commit f7a61ef
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 92 deletions.
143 changes: 54 additions & 89 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "storage/trie/trie_storage.hpp"
#include "storage/trie_pruner/trie_pruner.hpp"
#include "utils/pool_handler_ready_make.hpp"
#include "utils/weak_macro.hpp"

OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SynchronizerImpl::Error, e) {
using E = kagome::network::SynchronizerImpl::Error;
Expand Down Expand Up @@ -767,7 +768,7 @@ namespace kagome::network {
BlockInfo(header.number, block.hash),
peer_id);

self->generations_.emplace(header.number, block.hash);
self->generations_.emplace(header.blockInfo());
self->ancestry_.emplace(header.parent_hash, block.hash);

some_blocks_added = true;
Expand Down Expand Up @@ -899,61 +900,46 @@ namespace kagome::network {
hash);

processBlockAdditionResult(block_addition_result, hash, std::move(handler));
postApplyBlock(hash);
ancestry_.erase(hash);
postApplyBlock();
}

void SynchronizerImpl::applyNextBlock() {
auto any_block_applied = false;
if (generations_.empty()) {
SL_TRACE(log_, "No block for applying");
return;
}

bool false_val = false;
if (not applying_in_progress_.compare_exchange_strong(false_val, true)) {
SL_TRACE(log_, "Applying in progress");
return;
}
SL_TRACE(log_, "Begin applying");
::libp2p::common::MovableFinalAction cleanup([weak{weak_from_this()}] {
if (auto self = weak.lock()) {
SL_TRACE(self->log_, "End applying");
self->applying_in_progress_ = false;
}
});

primitives::BlockHash hash;

while (true) {
auto generation_node = generations_.extract(generations_.begin());
if (generation_node) {
hash = generation_node.mapped();
break;
}
if (generations_.empty()) {
SL_TRACE(log_, "No block for applying");
return;
while (not generations_.empty()) {
auto block_info = *generations_.begin();
auto pop = [&] { generations_.erase(generations_.begin()); };
auto it = known_blocks_.find(block_info.hash);
if (it == known_blocks_.end()) {
pop();
continue;
}
}

if (auto it = known_blocks_.find(hash); it != known_blocks_.end()) {
auto &block_data = it->second.data;
BOOST_ASSERT(block_data.header.has_value());
const BlockInfo block_info(block_data.header->number, block_data.hash);
auto parent = block_data.header->parentInfo();
if (parent and not block_tree_->has(parent->hash)) {
break;
}
pop();
any_block_applied = true;

const auto &last_finalized_block = block_tree_->getLastFinalized();

SyncResultHandler handler;

if (watched_blocks_number_ == block_data.header->number) {
if (auto wbn_node = watched_blocks_.extract(hash)) {
if (auto wbn_node = watched_blocks_.extract(block_info.hash)) {
handler = std::move(wbn_node.mapped());
}
}

// Skip applied and finalized blocks and
// discard side-chain below last finalized
if (block_data.header->number <= last_finalized_block.number) {
if (not block_tree_->has(hash)) {
if (not block_tree_->has(block_info.hash)) {
auto n = discardBlock(block_data.hash);
SL_WARN(
log_,
Expand All @@ -966,18 +952,13 @@ namespace kagome::network {
}

} else {
auto callback =
[wself{weak_from_this()},
hash,
handler{std::move(handler)},
cleanup = std::make_shared<decltype(cleanup)>(std::move(cleanup))](
auto &&block_addition_result) mutable {
cleanup.reset();
if (auto self = wself.lock()) {
self->post_block_addition(
std::move(block_addition_result), std::move(handler), hash);
}
};
auto callback = [WEAK_SELF, block_info, handler{std::move(handler)}](
auto &&block_addition_result) mutable {
WEAK_LOCK(self);
self->post_block_addition(std::move(block_addition_result),
std::move(handler),
block_info.hash);
};

if (sync_method_ == application::SyncMethod::Full) {
// Regular syncing
Expand Down Expand Up @@ -1013,8 +994,11 @@ namespace kagome::network {
}
return;
}
ancestry_.erase(block_info.hash);
}
if (any_block_applied) {
postApplyBlock();
}
postApplyBlock(hash);
}

void SynchronizerImpl::processBlockAdditionResult(
Expand Down Expand Up @@ -1062,9 +1046,7 @@ namespace kagome::network {
}
}

void SynchronizerImpl::postApplyBlock(const primitives::BlockHash &hash) {
ancestry_.erase(hash);

void SynchronizerImpl::postApplyBlock() {
auto minPreloadedBlockAmount = sync_method_ == application::SyncMethod::Full
? kMinPreloadedBlockAmount
: kMinPreloadedBlockAmountForFastSyncing;
Expand Down Expand Up @@ -1118,29 +1100,20 @@ namespace kagome::network {
void SynchronizerImpl::prune(const primitives::BlockInfo &finalized_block) {
// Remove blocks whose numbers less finalized one
while (not generations_.empty()) {
auto generation_node = generations_.extract(generations_.begin());
if (generation_node) {
const auto &number = generation_node.key();
if (number >= finalized_block.number) {
break;
}
const auto &hash = generation_node.mapped();
notifySubscribers({number, hash}, Error::DISCARDED_BLOCK);

known_blocks_.erase(hash);
ancestry_.erase(hash);
auto block_info = *generations_.begin();
if (block_info.number > finalized_block.number) {
break;
}
}

// Remove blocks whose numbers equal finalized one, excluding finalized
// one
auto range = generations_.equal_range(finalized_block.number);
for (auto it = range.first; it != range.second;) {
auto cit = it++;
const auto &hash = cit->second;
if (hash != finalized_block.hash) {
discardBlock(hash);
if (block_info.number == finalized_block.number) {
if (block_info.hash != finalized_block.hash) {
discardBlock(block_info.hash);
}
continue;
}
generations_.erase(*generations_.begin());
notifySubscribers(block_info, Error::DISCARDED_BLOCK);
known_blocks_.erase(block_info.hash);
ancestry_.erase(block_info.hash);
}

metric_import_queue_length_->set(known_blocks_.size());
Expand Down Expand Up @@ -1169,18 +1142,13 @@ namespace kagome::network {

for (auto g_it = generations_.rbegin(); g_it != generations_.rend();
++g_it) {
const auto &hash = g_it->second;

auto b_it = known_blocks_.find(hash);
auto &block_info = *g_it;
auto b_it = known_blocks_.find(block_info.hash);
if (b_it == known_blocks_.end()) {
SL_TRACE(log_,
"Block {} is unknown. Go to next one",
primitives::BlockInfo(g_it->first, hash));
SL_TRACE(log_, "Block {} is unknown. Go to next one", block_info);
continue;
}

primitives::BlockInfo block_info(g_it->first, hash);

auto &peers = b_it->second.peers;
if (peers.empty()) {
SL_TRACE(
Expand All @@ -1194,10 +1162,7 @@ namespace kagome::network {
auto &peer_id = *cp_it;

if (busy_peers_.find(peer_id) != busy_peers_.end()) {
SL_TRACE(log_,
"Peer {} for block {} is busy",
peer_id,
primitives::BlockInfo(g_it->first, hash));
SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info);
continue;
}

Expand Down Expand Up @@ -1229,16 +1194,16 @@ namespace kagome::network {
};

if (sync_method_ == application::SyncMethod::Full) {
auto lower = generations_.begin()->first;
auto upper = generations_.rbegin()->first + 1;
auto hint = generations_.rbegin()->first;
auto lower = generations_.begin()->number;
auto upper = generations_.rbegin()->number + 1;
auto hint = generations_.rbegin()->number;

SL_DEBUG(
log_,
"Start to find common block with {} in #{}..#{} to fill queue",
peer_id,
generations_.begin()->first,
generations_.rbegin()->first);
generations_.begin()->number,
generations_.rbegin()->number);
findCommonBlock(
peer_id,
lower,
Expand Down Expand Up @@ -1278,7 +1243,7 @@ namespace kagome::network {

SL_TRACE(log_,
"Block {} doesn't have appropriate peer. Go to next one",
primitives::BlockInfo(g_it->first, hash));
block_info);
}

SL_TRACE(log_, "End asking portion of blocks: none");
Expand Down
5 changes: 2 additions & 3 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ namespace kagome::network {
SyncResultHandler &&handler);

private:
void postApplyBlock(const primitives::BlockHash &hash);
void postApplyBlock();
void processBlockAdditionResult(outcome::result<void> block_addition_result,
const primitives::BlockHash &hash,
SyncResultHandler &&handler);
Expand Down Expand Up @@ -292,7 +292,7 @@ namespace kagome::network {
std::unordered_map<primitives::BlockHash, KnownBlock> known_blocks_;

// Blocks grouped by number
std::multimap<primitives::BlockNumber, primitives::BlockHash> generations_;
std::set<primitives::BlockInfo> generations_;

// Links parent->child
std::unordered_multimap<primitives::BlockHash, primitives::BlockHash>
Expand All @@ -307,7 +307,6 @@ namespace kagome::network {

std::multimap<primitives::BlockInfo, SyncResultHandler> subscriptions_;

std::atomic_bool applying_in_progress_ = false;
std::atomic_bool asking_blocks_portion_in_progress_ = false;
std::set<libp2p::peer::PeerId> busy_peers_;
std::unordered_set<primitives::BlockInfo> load_blocks_;
Expand Down

0 comments on commit f7a61ef

Please sign in to comment.