diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index fb1fb03a4a..6e063f3f57 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -1012,9 +1012,10 @@ namespace kagome::injector { .template create>(); } - std::shared_ptr KagomeNodeInjector::injectStatementDistribution() { - return pimpl_->injector_ - .template create>(); + std::shared_ptr + KagomeNodeInjector::injectStatementDistribution() { + return pimpl_->injector_.template create< + sptr>(); } std::shared_ptr diff --git a/core/injector/application_injector.hpp b/core/injector/application_injector.hpp index 3ca511118b..89ae6faaf6 100644 --- a/core/injector/application_injector.hpp +++ b/core/injector/application_injector.hpp @@ -61,7 +61,7 @@ namespace kagome { struct ParachainProcessorImpl; struct ApprovalDistribution; - namespace statement_distribution{ + namespace statement_distribution { struct StatementDistribution; } } // namespace parachain diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index cb001d3b53..89af89e677 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -192,7 +192,8 @@ namespace kagome::network { self->sync_peer_num_->set(self->active_peers_.size()); self->peers_count_metric_->set(self->active_peers_.size()); - self->peer_event_engine_->notify(primitives::events::PeerEventType::kDisconnected, peer_id); + self->peer_event_engine_->notify( + primitives::events::PeerEventType::kDisconnected, peer_id); SL_DEBUG(self->log_, "Remained {} active peers", self->active_peers_.size()); @@ -863,7 +864,8 @@ namespace kagome::network { } } - self->peer_event_engine_->notify(primitives::events::PeerEventType::kConnected, peer_info.id); + self->peer_event_engine_->notify( + primitives::events::PeerEventType::kConnected, peer_info.id); self->tryOpenGrandpaProtocol(peer_info, peer_state.value().get()); self->tryOpenValidationProtocol( peer_info, diff --git a/core/network/impl/protocols/request_response_protocol.hpp b/core/network/impl/protocols/request_response_protocol.hpp index dcbf2993f6..326b8e3b32 100644 --- a/core/network/impl/protocols/request_response_protocol.hpp +++ b/core/network/impl/protocols/request_response_protocol.hpp @@ -86,7 +86,8 @@ namespace kagome::network { }); } - void writeResponseAsync(std::shared_ptr stream, ResponseType response) { + void writeResponseAsync(std::shared_ptr stream, + ResponseType response) { writeResponse(std::move(stream), std::move(response)); } diff --git a/core/parachain/approval/approval_thread_pool.hpp b/core/parachain/approval/approval_thread_pool.hpp index 40ab61b0eb..711afe0e56 100644 --- a/core/parachain/approval/approval_thread_pool.hpp +++ b/core/parachain/approval/approval_thread_pool.hpp @@ -19,6 +19,7 @@ namespace kagome::parachain { class StatementDistributionThreadPool final : public ThreadPool { public: StatementDistributionThreadPool(std::shared_ptr watchdog) - : ThreadPool(std::move(watchdog), "statement-distribution", 1, std::nullopt) {} + : ThreadPool( + std::move(watchdog), "statement-distribution", 1, std::nullopt) {} }; } // namespace kagome::parachain diff --git a/core/parachain/availability/bitfield/signer.cpp b/core/parachain/availability/bitfield/signer.cpp index 3eda326730..f138065aed 100644 --- a/core/parachain/availability/bitfield/signer.cpp +++ b/core/parachain/availability/bitfield/signer.cpp @@ -7,8 +7,8 @@ #include "parachain/availability/bitfield/signer.hpp" #include "log/logger.hpp" -#include "primitives/block_header.hpp" #include "parachain/availability/availability_chunk_index.hpp" +#include "primitives/block_header.hpp" namespace kagome::parachain { constexpr std::chrono::milliseconds kDelay{1500}; @@ -93,13 +93,15 @@ namespace kagome::parachain { n_validators += group.size(); } - SL_DEBUG(logger_, "chunk mapping is enabled: {}", availability_chunk_mapping_is_enabled(node_features) ? "YES" : "NO"); - OUTCOME_TRY(chunk_index, availability_chunk_index( - node_features, - n_validators, - core_index, - signer->validatorIndex() - )); + SL_DEBUG(logger_, + "chunk mapping is enabled: {}", + availability_chunk_mapping_is_enabled(node_features) ? "YES" + : "NO"); + OUTCOME_TRY(chunk_index, + availability_chunk_index(node_features, + n_validators, + core_index, + signer->validatorIndex())); fetch_->fetch(chunk_index, *occupied, *session); } else { candidates.emplace_back(std::nullopt); diff --git a/core/parachain/availability/store/store_impl.cpp b/core/parachain/availability/store/store_impl.cpp index c4571d4e28..4e4972b8e7 100644 --- a/core/parachain/availability/store/store_impl.cpp +++ b/core/parachain/availability/store/store_impl.cpp @@ -9,9 +9,8 @@ constexpr uint64_t KEEP_CANDIDATES_TIMEOUT = 10 * 60; namespace kagome::parachain { - AvailabilityStoreImpl::AvailabilityStoreImpl(clock::SteadyClock &steady_clock) : steady_clock_(steady_clock) { - - } + AvailabilityStoreImpl::AvailabilityStoreImpl(clock::SteadyClock &steady_clock) + : steady_clock_(steady_clock) {} bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash, ValidatorIndex index) const { @@ -49,7 +48,10 @@ namespace kagome::parachain { std::optional AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash, ValidatorIndex index) const { - SL_TRACE(logger, "===> GET CHUNK: candidate_hash={}, index={}", candidate_hash, index); + SL_TRACE(logger, + "===> GET CHUNK: candidate_hash={}, index={}", + candidate_hash, + index); return state_.sharedAccess( [&](const auto &state) -> std::optional { @@ -122,9 +124,11 @@ namespace kagome::parachain { void AvailabilityStoreImpl::prune_candidates_no_lock(State &state) { const auto now = steady_clock_.nowUint64(); - while (!state.candidates_living_keeper_.empty() && state.candidates_living_keeper_[0].first + KEEP_CANDIDATES_TIMEOUT < now) { + while (!state.candidates_living_keeper_.empty() + && state.candidates_living_keeper_[0].first + KEEP_CANDIDATES_TIMEOUT + < now) { remove_no_lock(state, state.candidates_living_keeper_[0].second); - state.candidates_living_keeper_.pop_front(); + state.candidates_living_keeper_.pop_front(); } } @@ -138,12 +142,16 @@ namespace kagome::parachain { state.candidates_[relay_parent].insert(candidate_hash); auto &candidate_data = state.per_candidate_[candidate_hash]; for (auto &&chunk : std::move(chunks)) { - SL_TRACE(logger, "===> STORE CHUNK: candidate_hash={}, index={}", candidate_hash, chunk.index); + SL_TRACE(logger, + "===> STORE CHUNK: candidate_hash={}, index={}", + candidate_hash, + chunk.index); candidate_data.chunks[chunk.index] = std::move(chunk); } candidate_data.pov = pov; candidate_data.data = data; - state.candidates_living_keeper_.emplace_back(steady_clock_.nowUint64(), relay_parent); + state.candidates_living_keeper_.emplace_back(steady_clock_.nowUint64(), + relay_parent); }); } @@ -153,26 +161,30 @@ namespace kagome::parachain { state_.exclusiveAccess([&](auto &state) { prune_candidates_no_lock(state); state.candidates_[relay_parent].insert(candidate_hash); - SL_TRACE(logger, "===> STORE CHUNK: candidate_hash={}, index={}", candidate_hash, chunk.index); + SL_TRACE(logger, + "===> STORE CHUNK: candidate_hash={}, index={}", + candidate_hash, + chunk.index); state.per_candidate_[candidate_hash].chunks[chunk.index] = std::move(chunk); - state.candidates_living_keeper_.emplace_back(steady_clock_.nowUint64(), relay_parent); + state.candidates_living_keeper_.emplace_back(steady_clock_.nowUint64(), + relay_parent); }); } - void AvailabilityStoreImpl::remove_no_lock(State &state, const network::RelayHash &relay_parent) { - if (auto it = state.candidates_.find(relay_parent); - it != state.candidates_.end()) { - for (auto const &l : it->second) { - state.per_candidate_.erase(l); - } - state.candidates_.erase(it); + void AvailabilityStoreImpl::remove_no_lock( + State &state, const network::RelayHash &relay_parent) { + if (auto it = state.candidates_.find(relay_parent); + it != state.candidates_.end()) { + for (const auto &l : it->second) { + state.per_candidate_.erase(l); } + state.candidates_.erase(it); + } } void AvailabilityStoreImpl::remove(const network::RelayHash &relay_parent) { - state_.exclusiveAccess([&](auto &state) { - remove_no_lock(state, relay_parent); - }); + state_.exclusiveAccess( + [&](auto &state) { remove_no_lock(state, relay_parent); }); } } // namespace kagome::parachain diff --git a/core/parachain/availability/store/store_impl.hpp b/core/parachain/availability/store/store_impl.hpp index 48142b6214..7d77074dfd 100644 --- a/core/parachain/availability/store/store_impl.hpp +++ b/core/parachain/availability/store/store_impl.hpp @@ -53,7 +53,8 @@ namespace kagome::parachain { std::unordered_map per_candidate_{}; std::unordered_map> candidates_{}; - std::deque> candidates_living_keeper_; + std::deque> + candidates_living_keeper_; }; void prune_candidates_no_lock(State &state); diff --git a/core/parachain/validator/impl/candidates.hpp b/core/parachain/validator/impl/candidates.hpp index 9cb73a4a8d..977e9f7440 100644 --- a/core/parachain/validator/impl/candidates.hpp +++ b/core/parachain/validator/impl/candidates.hpp @@ -97,53 +97,59 @@ namespace kagome::parachain { }); } - bool has_claims() const { - return !claims.empty(); - } + bool has_claims() const { + return !claims.empty(); + } + template + void on_deactivate_leaves(std::span leaves, + F &&remove_parent_index, + D &&relay_parent_live) { + retain_if(claims, [&](const auto &c) { + if (std::forward(relay_parent_live)(c.second.relay_parent)) { + return true; + } - template - void on_deactivate_leaves(std::span leaves, F &&remove_parent_index, D &&relay_parent_live) { - retain_if(claims, [&](const auto &c) { - if (std::forward(relay_parent_live)(c.second.relay_parent)) { - return true; - } + if (c.second.parent_hash_and_id) { + const auto &pc = *c.second.parent_hash_and_id; + if (auto it_1 = parent_claims.find(pc.first); + it_1 != parent_claims.end()) { + if (auto it_2 = it_1->second.find(pc.second); + it_2 != it_1->second.end()) { + auto it = std::ranges::find_if(it_2->second, [&](const auto &x) { + return x.first == c.second.relay_parent; + }); + if (it != it_2->second.end()) { + const auto p = std::distance(it_2->second.begin(), it); + auto &sub_claims = it_2->second; + sub_claims[p].second -= 1; + if (sub_claims[p].second == 0) { + auto rem_it = sub_claims.begin(); + std::advance(rem_it, p); + sub_claims.erase(rem_it); + } + } - if (c.second.parent_hash_and_id) { - const auto &pc = *c.second.parent_hash_and_id; - if (auto it_1 = parent_claims.find(pc.first); it_1 != parent_claims.end()) { - if (auto it_2 = it_1->second.find(pc.second); it_2 != it_1->second.end()) { - auto it = std::ranges::find_if(it_2->second, [&](const auto &x) { return x.first == c.second.relay_parent; }); - if (it != it_2->second.end()) { - const auto p = std::distance(it_2->second.begin(), it); - auto &sub_claims = it_2->second; - sub_claims[p].second -= 1; - if (sub_claims[p].second == 0) { - auto rem_it = sub_claims.begin(); - std::advance(rem_it, p); - sub_claims.erase(rem_it); + if (it_2->second.empty()) { + std::forward(remove_parent_index)(pc.first, pc.second); + it_1->second.erase(it_2); } } - if (it_2->second.empty()) { - std::forward(remove_parent_index)(pc.first, pc.second); - it_1->second.erase(it_2); + if (it_1->second.empty()) { + parent_claims.erase(it_1); } } - - if (it_1->second.empty()) { - parent_claims.erase(it_1); - } } - } - return false; - }); + return false; + }); - retain_if(unconfirmed_importable_under, [&](const auto &pair) { - const auto &[l, props] = pair; - return (std::ranges::find(leaves, l) != leaves.end()) && std::forward(relay_parent_live)(props.relay_parent); - }); - } + retain_if(unconfirmed_importable_under, [&](const auto &pair) { + const auto &[l, props] = pair; + return (std::ranges::find(leaves, l) != leaves.end()) + && std::forward(relay_parent_live)(props.relay_parent); + }); + } void extend_hypotheticals( const CandidateHash &candidate_hash, @@ -493,46 +499,53 @@ namespace kagome::parachain { } template - void on_deactivate_leaves(std::span leaves, F &&relay_parent_live) { - auto remove_parent_claims = [&](const auto &c_hash, const auto &parent_hash, const auto id) { - if (auto it_1 = utils::get_it(by_parent, parent_hash)) { - if (auto it_2 = utils::get_it((*it_1)->second, id)) { - (*it_2)->second.erase(c_hash); - if ((*it_2)->second.empty()) { - (*it_1)->second.erase(*it_2); + void on_deactivate_leaves(std::span leaves, + F &&relay_parent_live) { + auto remove_parent_claims = + [&](const auto &c_hash, const auto &parent_hash, const auto id) { + if (auto it_1 = utils::get_it(by_parent, parent_hash)) { + if (auto it_2 = utils::get_it((*it_1)->second, id)) { + (*it_2)->second.erase(c_hash); + if ((*it_2)->second.empty()) { + (*it_1)->second.erase(*it_2); + } + } + if ((*it_1)->second.empty()) { + by_parent.erase(*it_1); + } } - } - if ((*it_1)->second.empty()) { - by_parent.erase(*it_1); - } - } - }; - + }; + retain_if(candidates, [&](auto &pair) { auto &[c_hash, state] = pair; - return visit_in_place(state, - [&](ConfirmedCandidate &c){ - if (!std::forward(relay_parent_live)(c.relay_parent())) { - remove_parent_claims(c_hash, c.parent_head_data_hash(), c.para_id()); - return false; - } + return visit_in_place( + state, + [&](ConfirmedCandidate &c) { + if (!std::forward(relay_parent_live)(c.relay_parent())) { + remove_parent_claims( + c_hash, c.parent_head_data_hash(), c.para_id()); + return false; + } - for (const auto &leaf_hash : leaves) { - c.importable_under.erase(leaf_hash); - } - return true; - }, - [&](UnconfirmedCandidate &c){ - c.on_deactivate_leaves(leaves, [&](const auto &parent_hash, const auto &id) { - return remove_parent_claims(c_hash, parent_hash, id); - }, std::forward(relay_parent_live)); - return c.has_claims(); - }); + for (const auto &leaf_hash : leaves) { + c.importable_under.erase(leaf_hash); + } + return true; + }, + [&](UnconfirmedCandidate &c) { + c.on_deactivate_leaves( + leaves, + [&](const auto &parent_hash, const auto &id) { + return remove_parent_claims(c_hash, parent_hash, id); + }, + std::forward(relay_parent_live)); + return c.has_claims(); + }); }); - SL_TRACE(logger, "Candidates remaining after cleanup: {}", candidates.size()); + SL_TRACE( + logger, "Candidates remaining after cleanup: {}", candidates.size()); } - }; } // namespace kagome::parachain diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index a95d40a52a..0b4fdb8805 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -345,12 +345,13 @@ namespace kagome::parachain { event.view.finalized_number_, event.view.heads_.size()); broadcastView(event.view); - //broadcastViewToGroup(relay_parent, event.view); + // broadcastViewToGroup(relay_parent, event.view); handle_active_leaves_update_for_validator(event, std::move(pruned)); } - void ParachainProcessorImpl::handle_active_leaves_update_for_validator(const network::ExView &event, std::vector pruned_h) { + void ParachainProcessorImpl::handle_active_leaves_update_for_validator( + const network::ExView &event, std::vector pruned_h) { const auto current_leaves = our_current_state_.validator_side.active_leaves; std::unordered_map removed; for (const auto &[h, m] : current_leaves) { @@ -366,19 +367,22 @@ namespace kagome::parachain { } for (const auto &leaf : added) { - const auto mode = prospective_parachains_->prospectiveParachainsMode(leaf); + const auto mode = + prospective_parachains_->prospectiveParachainsMode(leaf); our_current_state_.validator_side.active_leaves[leaf] = mode; } for (const auto &[removed, mode] : removed) { our_current_state_.validator_side.active_leaves.erase(removed); - const std::vector pruned = mode ? std::move(pruned_h) : std::vector{removed}; + const std::vector pruned = + mode ? std::move(pruned_h) : std::vector{removed}; for (const auto &removed : pruned) { our_current_state_.state_by_relay_parent.erase(removed); { /// remove cancelations - auto &container = our_current_state_.collation_requests_cancel_handles; + auto &container = + our_current_state_.collation_requests_cancel_handles; for (auto pc = container.begin(); pc != container.end();) { if (pc->relay_parent != removed) { ++pc; @@ -388,7 +392,8 @@ namespace kagome::parachain { } } { /// remove fetched candidates - auto &container = our_current_state_.validator_side.fetched_candidates; + auto &container = + our_current_state_.validator_side.fetched_candidates; for (auto pc = container.begin(); pc != container.end();) { if (pc->first.relay_parent != removed) { ++pc; @@ -400,13 +405,15 @@ namespace kagome::parachain { } } - retain_if(our_current_state_.validator_side.blocked_from_seconding, [&](auto &pair) { - auto &collations = pair.second; - retain_if(collations, [&](const auto &collation) { - return our_current_state_.state_by_relay_parent.contains(collation.candidate_receipt.descriptor.relay_parent); - }); - return !collations.empty(); - }); + retain_if(our_current_state_.validator_side.blocked_from_seconding, + [&](auto &pair) { + auto &collations = pair.second; + retain_if(collations, [&](const auto &collation) { + return our_current_state_.state_by_relay_parent.contains( + collation.candidate_receipt.descriptor.relay_parent); + }); + return !collations.empty(); + }); prune_old_advertisements(*our_current_state_.implicit_view, our_current_state_.validator_side.active_leaves, @@ -426,7 +433,7 @@ namespace kagome::parachain { backing_store_->onDeactivateLeaf(lost.hash); bitfield_store_->remove(lost.hash); - //av_store_->remove(lost); + // av_store_->remove(lost); } } @@ -489,8 +496,8 @@ namespace kagome::parachain { network::ViewUpdate{.view = view}); pm_->getStreamEngine()->broadcast(router_->getCollationProtocolVStaging(), msg); -// pm_->getStreamEngine()->broadcast(router_->getValidationProtocolVStaging(), -// msg); + // pm_->getStreamEngine()->broadcast(router_->getValidationProtocolVStaging(), + // msg); } outcome::result> @@ -638,9 +645,15 @@ namespace kagome::parachain { return Error::NOT_A_VALIDATOR; } - OUTCOME_TRY(per_session_state, per_session->get_or_insert(session_index, [&]() -> outcome::result::RefObj> { - return outcome::success(RefCache::RefObj(session_index, *session_info)); - })); + OUTCOME_TRY(per_session_state, + per_session->get_or_insert( + session_index, + [&]() -> outcome::result< + RefCache::RefObj> { + return outcome::success( + RefCache::RefObj( + session_index, *session_info)); + })); const auto n_cores = cores.size(); std::unordered_map> out_groups; @@ -895,33 +908,52 @@ namespace kagome::parachain { return pruned; } -void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, const HeadData &head_data, const Hash &head_data_hash) { - auto unblocked_collations_it = our_current_state_.validator_side.blocked_from_seconding.find(BlockedCollationId { - .para_id = para_id, - .parent_head_data_hash = head_data_hash, - }); + void ParachainProcessorImpl::second_unblocked_collations( + ParachainId para_id, + const HeadData &head_data, + const Hash &head_data_hash) { + auto unblocked_collations_it = + our_current_state_.validator_side.blocked_from_seconding.find( + BlockedCollationId{ + .para_id = para_id, + .parent_head_data_hash = head_data_hash, + }); - if (unblocked_collations_it != our_current_state_.validator_side.blocked_from_seconding.end()) { - auto &unblocked_collations = unblocked_collations_it->second; + if (unblocked_collations_it + != our_current_state_.validator_side.blocked_from_seconding.end()) { + auto &unblocked_collations = unblocked_collations_it->second; - if (!unblocked_collations.empty()) { - SL_TRACE(logger_, "Candidate outputting head data with hash {} unblocked {} collations for seconding.", head_data_hash, unblocked_collations.size()); - } + if (!unblocked_collations.empty()) { + SL_TRACE(logger_, + "Candidate outputting head data with hash {} unblocked {} " + "collations for seconding.", + head_data_hash, + unblocked_collations.size()); + } - for (auto &unblocked_collation : unblocked_collations) { - unblocked_collation.maybe_parent_head_data = head_data; - const auto peer_id = unblocked_collation.collation_event.pending_collation.peer_id; - const auto relay_parent = unblocked_collation.candidate_receipt.descriptor.relay_parent; + for (auto &unblocked_collation : unblocked_collations) { + unblocked_collation.maybe_parent_head_data = head_data; + const auto peer_id = + unblocked_collation.collation_event.pending_collation.peer_id; + const auto relay_parent = + unblocked_collation.candidate_receipt.descriptor.relay_parent; - if (auto res = kick_off_seconding(std::move(unblocked_collation)); res.has_error()) { - SL_WARN(logger_, "Seconding aborted due to an error. (relay_parent={}, para_id={}, peer_id={}, error={})", relay_parent, para_id, peer_id, res.error()); + if (auto res = kick_off_seconding(std::move(unblocked_collation)); + res.has_error()) { + SL_WARN(logger_, + "Seconding aborted due to an error. (relay_parent={}, " + "para_id={}, peer_id={}, error={})", + relay_parent, + para_id, + peer_id, + res.error()); + } } - } - our_current_state_.validator_side.blocked_from_seconding.erase(unblocked_collations_it); + our_current_state_.validator_side.blocked_from_seconding.erase( + unblocked_collations_it); + } } -} - void ParachainProcessorImpl::handle_collation_fetch_response( network::CollationEvent &&collation_event, @@ -1031,7 +1063,9 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co 32, crypto::Blake2b_StreamHasher<32>> &persisted_validation_data, - std::optional, std::reference_wrapper>> maybe_parent_head_and_hash) { + std::optional, + std::reference_wrapper>> + maybe_parent_head_and_hash) { if (persisted_validation_data.getHash() != fetched.descriptor.persisted_data_hash) { return Error::PERSISTED_VALIDATION_DATA_MISMATCH; @@ -1462,7 +1496,10 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co outcome::result ParachainProcessorImpl::OnFetchChunkRequest( const network::FetchChunkRequest &request) { - SL_TRACE(logger_, "===> REQUEST V2 CHUNK: candidate_hash={}, index={}", request.candidate, request.chunk_index); + SL_TRACE(logger_, + "===> REQUEST V2 CHUNK: candidate_hash={}, index={}", + request.candidate, + request.chunk_index); if (auto chunk = av_store_->getChunk(request.candidate, request.chunk_index)) { return network::Chunk{ @@ -1477,7 +1514,10 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co outcome::result ParachainProcessorImpl::OnFetchChunkRequestObsolete( const network::FetchChunkRequest &request) { - SL_TRACE(logger_, "===> REQUEST V1 CHUNK: candidate_hash={}, index={}", request.candidate, request.chunk_index); + SL_TRACE(logger_, + "===> REQUEST V1 CHUNK: candidate_hash={}, index={}", + request.candidate, + request.chunk_index); if (auto chunk = av_store_->getChunk(request.candidate, request.chunk_index)) { // This check needed because v1 protocol mustn't have chunk mapping @@ -2351,9 +2391,9 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co CHECK_OR_RET(canProcessParachains().has_value()); SL_DEBUG(logger_, - "Send my view.(peer={}, protocol={})", - peer_id, - protocol->protocolName()); + "Send my view.(peer={}, protocol={})", + peer_id, + protocol->protocolName()); pm_->getStreamEngine()->send( peer_id, protocol, @@ -2504,10 +2544,12 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co REINVOKE_ONCE( *main_pool_handler_, notifyInvalid, parent, candidate_receipt); - our_current_state_.validator_side.blocked_from_seconding.erase(BlockedCollationId { - .para_id = candidate_receipt.descriptor.para_id, - .parent_head_data_hash = candidate_receipt.descriptor.para_head_hash, - }); + our_current_state_.validator_side.blocked_from_seconding.erase( + BlockedCollationId{ + .para_id = candidate_receipt.descriptor.para_id, + .parent_head_data_hash = + candidate_receipt.descriptor.para_head_hash, + }); auto fetched_collation = network::FetchedCollation::from(candidate_receipt, *hasher_); @@ -2553,8 +2595,10 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co return; } - auto output_head_data = seconded->get().committed_receipt.commitments.para_head; - auto output_head_data_hash = seconded->get().committed_receipt.descriptor.para_head_hash; + auto output_head_data = + seconded->get().committed_receipt.commitments.para_head; + auto output_head_data_hash = + seconded->get().committed_receipt.descriptor.para_head_hash; auto fetched_collation = network::FetchedCollation::from( seconded->get().committed_receipt.to_plain(*hasher_), *hasher_); auto it = our_current_state_.validator_side.fetched_candidates.find( @@ -2591,10 +2635,7 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co } second_unblocked_collations( - fetched_collation.para_id, - output_head_data, - output_head_data_hash - ); + fetched_collation.para_id, output_head_data, output_head_data_hash); const auto maybe_candidate_hash = utils::map( prospective_candidate, [](const auto &v) { return v.candidate_hash; }); @@ -2979,11 +3020,12 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co return Error::UNDECLARED_COLLATOR; } - if (!isRelayParentInImplicitView(on_relay_parent, - relay_parent_mode, - *our_current_state_.implicit_view, - our_current_state_.validator_side.active_leaves, - peer_data.collator_state->para_id)) { + if (!isRelayParentInImplicitView( + on_relay_parent, + relay_parent_mode, + *our_current_state_.implicit_view, + our_current_state_.validator_side.active_leaves, + peer_data.collator_state->para_id)) { SL_TRACE(logger_, "Out of view. (relay_parent={})", on_relay_parent); return Error::OUT_OF_VIEW; } @@ -3046,7 +3088,8 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co std::optional maybe_pvd; std::optional maybe_parent_head_hash; - std::optional &maybe_parent_head = pending_collation_fetch.maybe_parent_head_data; + std::optional &maybe_parent_head = + pending_collation_fetch.maybe_parent_head_data; if (is_collator_v2 && have_prospective_candidate && async_backing_en) { OUTCOME_TRY(pvd, @@ -3059,7 +3102,9 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co maybe_pvd = pvd; if (pending_collation_fetch.maybe_parent_head_data) { - maybe_parent_head_hash.emplace(collation_event.pending_collation.prospective_candidate->parent_head_data_hash); + maybe_parent_head_hash.emplace( + collation_event.pending_collation.prospective_candidate + ->parent_head_data_hash); } } else if ((is_collator_v2 && have_prospective_candidate) || !is_collator_v2) { @@ -3077,25 +3122,29 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co std::optional> pvd; if (maybe_pvd) { pvd = *maybe_pvd; - } else if(!maybe_pvd && !maybe_parent_head && maybe_parent_head_hash) { - const network::PendingCollationFetch blocked_collation { + } else if (!maybe_pvd && !maybe_parent_head && maybe_parent_head_hash) { + const network::PendingCollationFetch blocked_collation{ .collation_event = std::move(collation_event), - .candidate_receipt = std::move(pending_collation_fetch.candidate_receipt), + .candidate_receipt = + std::move(pending_collation_fetch.candidate_receipt), .pov = std::move(pending_collation_fetch.pov), .maybe_parent_head_data = std::nullopt, - }; - SL_TRACE(logger_, - "Collation having parent head data hash {} is blocked from seconding. Waiting on its parent to be validated. (candidate_hash={}, relay_parent={})", - *maybe_parent_head_hash, - blocked_collation.candidate_receipt.hash(*hasher_), - blocked_collation.candidate_receipt.descriptor.relay_parent - ); - our_current_state_.validator_side.blocked_from_seconding[BlockedCollationId { - .para_id = blocked_collation.candidate_receipt.descriptor.para_id, - .parent_head_data_hash = *maybe_parent_head_hash, - }].emplace_back(blocked_collation); - - return outcome::success(false); + }; + SL_TRACE(logger_, + "Collation having parent head data hash {} is blocked from " + "seconding. Waiting on its parent to be validated. " + "(candidate_hash={}, relay_parent={})", + *maybe_parent_head_hash, + blocked_collation.candidate_receipt.hash(*hasher_), + blocked_collation.candidate_receipt.descriptor.relay_parent); + our_current_state_.validator_side + .blocked_from_seconding[BlockedCollationId{ + .para_id = blocked_collation.candidate_receipt.descriptor.para_id, + .parent_head_data_hash = *maybe_parent_head_hash, + }] + .emplace_back(blocked_collation); + + return outcome::success(false); } else { return Error::PERSISTED_VALIDATION_DATA_NOT_FOUND; } @@ -3104,7 +3153,11 @@ void ParachainProcessorImpl::second_unblocked_collations(ParachainId para_id, co collation_event.pending_collation, pending_collation_fetch.candidate_receipt, pvd->get(), - maybe_parent_head && maybe_parent_head_hash ? std::make_pair(std::cref(*maybe_parent_head), std::cref(*maybe_parent_head_hash)) : std::optional, std::reference_wrapper>>{})); + maybe_parent_head && maybe_parent_head_hash + ? std::make_pair(std::cref(*maybe_parent_head), + std::cref(*maybe_parent_head_hash)) + : std::optional, + std::reference_wrapper>>{})); collations.status = CollationStatus::WaitingOnValidation; validateAsync( diff --git a/core/parachain/validator/network_bridge.hpp b/core/parachain/validator/network_bridge.hpp index c79b111db5..6c86b91281 100644 --- a/core/parachain/validator/network_bridge.hpp +++ b/core/parachain/validator/network_bridge.hpp @@ -53,7 +53,9 @@ namespace kagome::parachain { } template - requires requires { std::is_same_v; } + requires requires { + std::is_same_v; + } void send_to_peers(Container peers, const std::shared_ptr &protocol, const std::shared_ptr &message) { diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index eca869232d..df5952ef96 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -91,15 +91,15 @@ namespace kagome::dispute { } namespace kagome::parachain { - struct BlockedCollationId { - /// Para id. - ParachainId para_id; - /// Hash of the parent head data. - Hash parent_head_data_hash; + struct BlockedCollationId { + /// Para id. + ParachainId para_id; + /// Hash of the parent head data. + Hash parent_head_data_hash; - constexpr auto operator<=>(const BlockedCollationId &) const = default; - }; -} + constexpr auto operator<=>(const BlockedCollationId &) const = default; + }; +} // namespace kagome::parachain template <> struct std::hash { @@ -589,7 +589,9 @@ namespace kagome::parachain { 32, crypto::Blake2b_StreamHasher<32>> &persisted_validation_data, - std::optional, std::reference_wrapper>> maybe_parent_head_and_hash); + std::optional, + std::reference_wrapper>> + maybe_parent_head_and_hash); outcome::result> fetchPersistedValidationData(const RelayHash &relay_parent, @@ -599,8 +601,11 @@ namespace kagome::parachain { void onAttestNoPoVComplete(const network::RelayHash &relay_parent, const CandidateHash &candidate_hash); - /// Try seconding any collations which were waiting on the validation of their parent - void second_unblocked_collations(ParachainId para_id, const HeadData &head_data, const Hash &head_data_hash); + /// Try seconding any collations which were waiting on the validation of + /// their parent + void second_unblocked_collations(ParachainId para_id, + const HeadData &head_data, + const Hash &head_data_hash); void kickOffValidationWork( const RelayHash &relay_parent, @@ -699,7 +704,8 @@ namespace kagome::parachain { void onDeactivateBlocks( const primitives::events::RemoveAfterFinalizationParams &event); - void handle_active_leaves_update_for_validator(const network::ExView &event, std::vector pruned); + void handle_active_leaves_update_for_validator(const network::ExView &event, + std::vector pruned); void onViewUpdated(const network::ExView &event); void OnBroadcastBitfields(const primitives::BlockHash &relay_parent, const network::SignedBitfield &bitfield); @@ -879,7 +885,9 @@ namespace kagome::parachain { std::unordered_map active_leaves; std::unordered_map fetched_candidates; - std::unordered_map> blocked_from_seconding; + std::unordered_map> + blocked_from_seconding; } validator_side; } our_current_state_; diff --git a/core/parachain/validator/statement_distribution/peer_state.hpp b/core/parachain/validator/statement_distribution/peer_state.hpp index 75cb97d1d8..199c47bd71 100644 --- a/core/parachain/validator/statement_distribution/peer_state.hpp +++ b/core/parachain/validator/statement_distribution/peer_state.hpp @@ -24,14 +24,15 @@ namespace kagome::parachain { namespace kagome::parachain::statement_distribution { -struct PeerState { + struct PeerState { network::View view; std::unordered_set implicit_view; /// Update the view, returning a vector of implicit relay-parents which /// weren't previously part of the view. std::vector update_view( - const network::View &new_view, const parachain::ImplicitView &local_implicit) { + const network::View &new_view, + const parachain::ImplicitView &local_implicit) { std::unordered_set next_implicit; for (const auto &x : new_view.heads_) { auto t = @@ -82,7 +83,6 @@ struct PeerState { } return v; } - }; -} +} // namespace kagome::parachain::statement_distribution diff --git a/core/parachain/validator/statement_distribution/per_relay_parent_state.hpp b/core/parachain/validator/statement_distribution/per_relay_parent_state.hpp index c8e0c7f4b6..cfe90c8480 100644 --- a/core/parachain/validator/statement_distribution/per_relay_parent_state.hpp +++ b/core/parachain/validator/statement_distribution/per_relay_parent_state.hpp @@ -11,13 +11,13 @@ #include #include +#include "common/ref_cache.hpp" #include "parachain/backing/cluster.hpp" #include "parachain/backing/grid_tracker.hpp" #include "parachain/types.hpp" #include "parachain/validator/impl/statements_store.hpp" -#include "utils/safe_object.hpp" #include "parachain/validator/statement_distribution/per_session_state.hpp" -#include "common/ref_cache.hpp" +#include "utils/safe_object.hpp" namespace kagome::parachain::statement_distribution { @@ -48,7 +48,7 @@ namespace kagome::parachain::statement_distribution { std::unordered_map> groups_per_para; std::unordered_set disabled_validators; std::shared_ptr::RefObj> - per_session_state; + per_session_state; std::optional> active_validator_state() { diff --git a/core/parachain/validator/statement_distribution/statement_distribution.cpp b/core/parachain/validator/statement_distribution/statement_distribution.cpp index 7588f4007d..90bf47326e 100644 --- a/core/parachain/validator/statement_distribution/statement_distribution.cpp +++ b/core/parachain/validator/statement_distribution/statement_distribution.cpp @@ -142,9 +142,13 @@ namespace kagome::parachain::statement_distribution { block_tree(_block_tree), slots_util(_slots_util), babe_config_repo(std::move(_babe_config_repo)), - peer_state_sub(std::make_shared(_peer_events_engine, false)), - my_view_sub(std::make_shared(_peer_view->getMyViewObservable(), false)), - remote_view_sub(std::make_shared(_peer_view->getRemoteViewObservable(), false)) { + peer_state_sub( + std::make_shared( + _peer_events_engine, false)), + my_view_sub(std::make_shared( + _peer_view->getMyViewObservable(), false)), + remote_view_sub(std::make_shared( + _peer_view->getRemoteViewObservable(), false)) { BOOST_ASSERT(per_session); BOOST_ASSERT(signer_factory); BOOST_ASSERT(peer_use_count); @@ -165,100 +169,132 @@ namespace kagome::parachain::statement_distribution { BOOST_ASSERT(remote_view_sub); } - bool StatementDistribution::tryStart() { SL_INFO(logger, "StatementDistribution subsystem started."); primitives::events::subscribe( *remote_view_sub, network::PeerView::EventType::kViewUpdated, - [wptr{weak_from_this()}](const libp2p::peer::PeerId &peer_id, const network::View &view) { + [wptr{weak_from_this()}](const libp2p::peer::PeerId &peer_id, + const network::View &view) { TRY_GET_OR_RET(self, wptr.lock()); self->handle_peer_view_update(peer_id, view); }); - peer_state_sub->setCallback([wptr{weak_from_this()}](subscription::SubscriptionSetId, auto &, const auto ev_key, const libp2p::peer::PeerId &peer) { - TRY_GET_OR_RET(self, wptr.lock()); - switch (ev_key) { - case primitives::events::PeerEventType::kConnected: return self->on_peer_connected(peer); - case primitives::events::PeerEventType::kDisconnected: return self->on_peer_disconnected(peer); - default: break; - } - }); - peer_state_sub->subscribe(peer_state_sub->generateSubscriptionSetId(), primitives::events::PeerEventType::kConnected); - peer_state_sub->subscribe(peer_state_sub->generateSubscriptionSetId(), primitives::events::PeerEventType::kDisconnected); + peer_state_sub->setCallback( + [wptr{weak_from_this()}](subscription::SubscriptionSetId, + auto &, + const auto ev_key, + const libp2p::peer::PeerId &peer) { + TRY_GET_OR_RET(self, wptr.lock()); + switch (ev_key) { + case primitives::events::PeerEventType::kConnected: + return self->on_peer_connected(peer); + case primitives::events::PeerEventType::kDisconnected: + return self->on_peer_disconnected(peer); + default: + break; + } + }); + peer_state_sub->subscribe(peer_state_sub->generateSubscriptionSetId(), + primitives::events::PeerEventType::kConnected); + peer_state_sub->subscribe(peer_state_sub->generateSubscriptionSetId(), + primitives::events::PeerEventType::kDisconnected); primitives::events::subscribe( *my_view_sub, network::PeerView::EventType::kViewUpdated, [wptr{weak_from_this()}](const network::ExView &event) { TRY_GET_OR_RET(self, wptr.lock()); - if (auto result = self->handle_view_event(event); result.has_error()) { + if (auto result = self->handle_view_event(event); + result.has_error()) { SL_ERROR(self->logger, - "Handle view event failed. (relay parent={})", - event.new_head.hash()); + "Handle view event failed. (relay parent={})", + event.new_head.hash()); } }); return true; } - void StatementDistribution::on_peer_connected(const libp2p::peer::PeerId &peer) { + void StatementDistribution::on_peer_connected( + const libp2p::peer::PeerId &peer) { REINVOKE(*statements_distribution_thread_handler, on_peer_connected, peer); auto _ = peers[peer]; } - void StatementDistribution::on_peer_disconnected(const libp2p::peer::PeerId &peer) { - REINVOKE(*statements_distribution_thread_handler, on_peer_disconnected, peer); + void StatementDistribution::on_peer_disconnected( + const libp2p::peer::PeerId &peer) { + REINVOKE( + *statements_distribution_thread_handler, on_peer_disconnected, peer); peers.erase(peer); } outcome::result> StatementDistribution::is_parachain_validator( const primitives::BlockHash &relay_parent) const { - BOOST_ASSERT(main_pool_handler->isInCurrentThread()); + BOOST_ASSERT(main_pool_handler->isInCurrentThread()); return signer_factory->at(relay_parent); } - outcome::result StatementDistribution::handle_view_event(const network::ExView &event) { - BOOST_ASSERT(main_pool_handler->isInCurrentThread()); - OUTCOME_TRY(new_relay_parents, implicit_view.exclusiveAccess([&](auto &iv) -> outcome::result> { - OUTCOME_TRY(iv.activate_leaf(event.new_head.hash())); - return outcome::success(iv.all_allowed_relay_parents()); - })); - + outcome::result StatementDistribution::handle_view_event( + const network::ExView &event) { + BOOST_ASSERT(main_pool_handler->isInCurrentThread()); + OUTCOME_TRY(new_relay_parents, + implicit_view.exclusiveAccess( + [&](auto &iv) -> outcome::result> { + OUTCOME_TRY(iv.activate_leaf(event.new_head.hash())); + return outcome::success(iv.all_allowed_relay_parents()); + })); + std::vector new_contexts; new_contexts.reserve(new_relay_parents.size()); - SL_TRACE(logger, "===> (relay_parent={}, new_relay_parents={})", event.new_head.hash(), new_relay_parents.size()); + SL_TRACE(logger, + "===> (relay_parent={}, new_relay_parents={})", + event.new_head.hash(), + new_relay_parents.size()); for (const auto &new_relay_parent : new_relay_parents) { SL_TRACE(logger, "===> (new_relay_parent={})", new_relay_parent); OUTCOME_TRY(v_index, - signer_factory->getAuthorityValidatorIndex( - new_relay_parent)); - OUTCOME_TRY(validator, - is_parachain_validator(new_relay_parent)); - SL_TRACE(logger, "===> (new_relay_parent={}, validator={})", new_relay_parent, validator ? "YES" : "NO"); + signer_factory->getAuthorityValidatorIndex(new_relay_parent)); + OUTCOME_TRY(validator, is_parachain_validator(new_relay_parent)); + SL_TRACE(logger, + "===> (new_relay_parent={}, validator={})", + new_relay_parent, + validator ? "YES" : "NO"); const auto validator_index = utils::map( validator, [](const auto &signer) { return signer.validatorIndex(); }); if (validator_index) { - SL_TRACE(logger, "===> (new_relay_parent={}, index={})", new_relay_parent, *validator_index); + SL_TRACE(logger, + "===> (new_relay_parent={}, index={})", + new_relay_parent, + *validator_index); } else { - SL_TRACE(logger, "===> (new_relay_parent={}, index={})", new_relay_parent, "NO"); + SL_TRACE(logger, + "===> (new_relay_parent={}, index={})", + new_relay_parent, + "NO"); } if (v_index) { - SL_TRACE(logger, "===> (new_relay_parent={}, v_index={})", new_relay_parent, *v_index); + SL_TRACE(logger, + "===> (new_relay_parent={}, v_index={})", + new_relay_parent, + *v_index); } else { - SL_TRACE(logger, "===> (new_relay_parent={}, v_index={})", new_relay_parent, "NO"); + SL_TRACE(logger, + "===> (new_relay_parent={}, v_index={})", + new_relay_parent, + "NO"); } - new_contexts.emplace_back(RelayParentContext { - .relay_parent = new_relay_parent, - .validator_index = validator_index, - .v_index = v_index, + new_contexts.emplace_back(RelayParentContext{ + .relay_parent = new_relay_parent, + .validator_index = validator_index, + .v_index = v_index, }); } @@ -267,28 +303,46 @@ namespace kagome::parachain::statement_distribution { } void StatementDistribution::handle_active_leaves_update( - const network::ExView &event, std::vector new_contexts) { - REINVOKE(*statements_distribution_thread_handler, handle_active_leaves_update, event, std::move(new_contexts)); - SL_TRACE(logger, "New leaf update. (relay_parent={}, height={})", event.new_head.hash(), event.new_head.number); - if (auto res = handle_active_leaves_update_inner(event, std::move(new_contexts)); res.has_error()) { - SL_ERROR(logger, - "Handle active leaf update inner failed. (relay parent={}, error={})", - event.new_head.hash(), res.error()); + const network::ExView &event, + std::vector new_contexts) { + REINVOKE(*statements_distribution_thread_handler, + handle_active_leaves_update, + event, + std::move(new_contexts)); + SL_TRACE(logger, + "New leaf update. (relay_parent={}, height={})", + event.new_head.hash(), + event.new_head.number); + if (auto res = + handle_active_leaves_update_inner(event, std::move(new_contexts)); + res.has_error()) { + SL_ERROR( + logger, + "Handle active leaf update inner failed. (relay parent={}, error={})", + event.new_head.hash(), + res.error()); } - if (auto res = handle_deactive_leaves_update_inner(event.lost); res.has_error()) { + if (auto res = handle_deactive_leaves_update_inner(event.lost); + res.has_error()) { SL_ERROR(logger, - "Handle deactive leaf update inner failed. (relay parent={}, error={})", - event.new_head.hash(), res.error()); + "Handle deactive leaf update inner failed. (relay parent={}, " + "error={})", + event.new_head.hash(), + res.error()); } - if (auto res = update_our_view(event.new_head.hash(), event.view); res.has_error()) { + if (auto res = update_our_view(event.new_head.hash(), event.view); + res.has_error()) { SL_ERROR(logger, - "Update our view failed. (relay parent={}, error={})", - event.new_head.hash(), res.error()); + "Update our view failed. (relay parent={}, error={})", + event.new_head.hash(), + res.error()); } } -outcome::result StatementDistribution::handle_active_leaves_update_inner( - const network::ExView &event, std::vector new_contexts) { + outcome::result + StatementDistribution::handle_active_leaves_update_inner( + const network::ExView &event, + std::vector new_contexts) { BOOST_ASSERT(statements_distribution_thread_handler->isInCurrentThread()); const auto &relay_parent = event.new_head.hash(); @@ -315,72 +369,72 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( OUTCOME_TRY(session_index, parachain_host->session_index_for_child(new_relay_parent)); - auto per_session_state = - per_session->get_or_insert( - session_index, - [&]() -> outcome::result< - RefCache::RefObj> { - OUTCOME_TRY(session_info, - parachain_host->session_info(new_relay_parent, - session_index)); - if (!v_index) { - SL_TRACE(logger, "Not a validator. (new_relay_parent={})", new_relay_parent); - return outcome::failure(Error::NOT_A_VALIDATOR); - } + auto per_session_state = per_session->get_or_insert( + session_index, + [&]() -> outcome::result< + RefCache::RefObj> { + OUTCOME_TRY( + session_info, + parachain_host->session_info(new_relay_parent, session_index)); + if (!v_index) { + SL_TRACE(logger, + "Not a validator. (new_relay_parent={})", + new_relay_parent); + return outcome::failure(Error::NOT_A_VALIDATOR); + } - uint32_t minimum_backing_votes = 2; /// legacy value - if (auto r = parachain_host->minimum_backing_votes( - new_relay_parent, session_index); - r.has_value()) { - minimum_backing_votes = r.value(); - } else { - SL_TRACE( - logger, - "Querying the backing threshold from the runtime is not " - "supported by the current Runtime API. " - "(new_relay_parent={})", - new_relay_parent); - } + uint32_t minimum_backing_votes = 2; /// legacy value + if (auto r = parachain_host->minimum_backing_votes(new_relay_parent, + session_index); + r.has_value()) { + minimum_backing_votes = r.value(); + } else { + SL_TRACE(logger, + "Querying the backing threshold from the runtime is not " + "supported by the current Runtime API. " + "(new_relay_parent={})", + new_relay_parent); + } - OUTCOME_TRY(block_header, - block_tree->getBlockHeader(new_relay_parent)); - OUTCOME_TRY(babe_header, - consensus::babe::getBabeBlockHeader(block_header)); - OUTCOME_TRY( - epoch, - slots_util.get()->slotToEpoch(*block_header.parentInfo(), - babe_header.slot_number)); - OUTCOME_TRY(babe_config, - babe_config_repo->config(*block_header.parentInfo(), - epoch)); - std::unordered_map - authority_lookup; - for (ValidatorIndex v = 0; - v < session_info->discovery_keys.size(); - ++v) { - authority_lookup[session_info->discovery_keys[v]] = v; - } + OUTCOME_TRY(block_header, + block_tree->getBlockHeader(new_relay_parent)); + OUTCOME_TRY(babe_header, + consensus::babe::getBabeBlockHeader(block_header)); + OUTCOME_TRY( + epoch, + slots_util.get()->slotToEpoch(*block_header.parentInfo(), + babe_header.slot_number)); + OUTCOME_TRY( + babe_config, + babe_config_repo->config(*block_header.parentInfo(), epoch)); + std::unordered_map + authority_lookup; + for (ValidatorIndex v = 0; v < session_info->discovery_keys.size(); + ++v) { + authority_lookup[session_info->discovery_keys[v]] = v; + } - grid::Views grid_view = grid::makeViews( - session_info->validator_groups, - grid::shuffle(session_info->discovery_keys.size(), - babe_config->randomness), - *v_index); - - return outcome::success( - RefCache::RefObj( - session_index, - *session_info, - Groups{session_info->validator_groups, - minimum_backing_votes}, - std::move(grid_view), - validator_index, - peer_use_count, - std::move(authority_lookup))); - }); + grid::Views grid_view = grid::makeViews( + session_info->validator_groups, + grid::shuffle(session_info->discovery_keys.size(), + babe_config->randomness), + *v_index); + + return outcome::success( + RefCache::RefObj( + session_index, + *session_info, + Groups{session_info->validator_groups, + minimum_backing_votes}, + std::move(grid_view), + validator_index, + peer_use_count, + std::move(authority_lookup))); + }); if (per_session_state.has_error()) { - SL_WARN(logger, "Create session data failed. (error={})", per_session_state.error()); + SL_WARN(logger, + "Create session data failed. (error={})", + per_session_state.error()); continue; } @@ -404,13 +458,14 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( } if (validator_index) { - return find_active_validator_state(*validator_index, - per_session_state.value()->value().groups, - availability_cores, - group_rotation_info, - maybe_claim_queue, - seconding_limit, - max_candidate_depth); + return find_active_validator_state( + *validator_index, + per_session_state.value()->value().groups, + availability_cores, + group_rotation_info, + maybe_claim_queue, + seconding_limit, + max_candidate_depth); } return LocalValidatorState{}; }(); @@ -423,7 +478,8 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( new_relay_parent, PerRelayParentState{ .local_validator = local_validator, - .statement_store = StatementStore(per_session_state.value()->value().groups), + .statement_store = + StatementStore(per_session_state.value()->value().groups), .seconding_limit = seconding_limit, .session = session_index, .groups_per_para = std::move(groups_per_para), @@ -448,7 +504,8 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( std::vector>> update_peers; for (auto it = peers.begin(); it != peers.end(); ++it) { - std::vector fresh = it->second.reconcile_active_leaf(relay_parent, new_relay_parents); + std::vector fresh = + it->second.reconcile_active_leaf(relay_parent, new_relay_parents); if (!fresh.empty()) { update_peers.emplace_back(it->first, fresh); } @@ -464,14 +521,19 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( return outcome::success(); } - outcome::result StatementDistribution::handle_deactive_leaves_update_inner(const std::vector &lost) { + outcome::result + StatementDistribution::handle_deactive_leaves_update_inner( + const std::vector &lost) { implicit_view.exclusiveAccess([&](auto &iv) { for (const auto &leaf : lost) { const auto pruned = iv.deactivate_leaf(leaf); for (const auto &pruned_rp : pruned) { - if (auto it = per_relay_parent.find(pruned_rp); it != per_relay_parent.end()) { + if (auto it = per_relay_parent.find(pruned_rp); + it != per_relay_parent.end()) { if (auto active_state = it->second.active_validator_state()) { - active_state->get().cluster_tracker.warn_if_too_many_pending_statements(pruned_rp); + active_state->get() + .cluster_tracker.warn_if_too_many_pending_statements( + pruned_rp); } per_relay_parent.erase(it); } @@ -479,13 +541,13 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( } }); - candidates.on_deactivate_leaves(lost, [&](const auto &h) { - return per_relay_parent.contains(h); - }); + candidates.on_deactivate_leaves( + lost, [&](const auto &h) { return per_relay_parent.contains(h); }); return outcome::success(); } - outcome::result StatementDistribution::update_our_view(const Hash &relay_parent, const network::View &view) { + outcome::result StatementDistribution::update_our_view( + const Hash &relay_parent, const network::View &view) { if (auto parachain_proc = parachain_processor.lock()) { OUTCOME_TRY(parachain_proc->canProcessParachains()); } @@ -493,18 +555,23 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( OUTCOME_TRY(per_relay_parent, getStateByRelayParent(relay_parent)); std::unordered_set peers_to_send; - const auto &per_session_state = per_relay_parent.get().per_session_state->value(); + const auto &per_session_state = + per_relay_parent.get().per_session_state->value(); const auto &local_validator = per_session_state.local_validator; if (local_validator) { - if (const auto our_group = per_session_state.groups.byValidatorIndex(*local_validator)) { + if (const auto our_group = + per_session_state.groups.byValidatorIndex(*local_validator)) { /// update peers of our group if (const auto group = per_session_state.groups.get(*our_group)) { for (const auto vi : *group) { - if (auto peer = query_audi->get(per_session_state.session_info.discovery_keys[vi])) { + if (auto peer = query_audi->get( + per_session_state.session_info.discovery_keys[vi])) { peers_to_send.emplace(peer->id); } else { - SL_TRACE(logger, "No audi for {}.", per_session_state.session_info.discovery_keys[vi]); + SL_TRACE(logger, + "No audi for {}.", + per_session_state.session_info.discovery_keys[vi]); } } } @@ -515,17 +582,23 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( if (per_session_state.grid_view) { for (const auto &view : *per_session_state.grid_view) { for (const auto vi : view.sending) { - if (auto peer = query_audi->get(per_session_state.session_info.discovery_keys[vi])) { + if (auto peer = query_audi->get( + per_session_state.session_info.discovery_keys[vi])) { peers_to_send.emplace(peer->id); } else { - SL_TRACE(logger, "No audi for {}.", per_session_state.session_info.discovery_keys[vi]); + SL_TRACE(logger, + "No audi for {}.", + per_session_state.session_info.discovery_keys[vi]); } } for (const auto vi : view.receiving) { - if (auto peer = query_audi->get(per_session_state.session_info.discovery_keys[vi])) { + if (auto peer = query_audi->get( + per_session_state.session_info.discovery_keys[vi])) { peers_to_send.emplace(peer->id); } else { - SL_TRACE(logger, "No audi for {}.", per_session_state.session_info.discovery_keys[vi]); + SL_TRACE(logger, + "No audi for {}.", + per_session_state.session_info.discovery_keys[vi]); } } } @@ -537,11 +610,12 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( SL_INFO(logger, "Send my view. (peers_count={})", peers_to_send.size()); auto message = std::make_shared< - network::WireMessage>( - network::ViewUpdate{ - .view = view, - }); - network_bridge->send_to_peers(peers_to_send, router->getValidationProtocolVStaging(), message); + network::WireMessage>( + network::ViewUpdate{ + .view = view, + }); + network_bridge->send_to_peers( + peers_to_send, router->getValidationProtocolVStaging(), message); return outcome::success(); } @@ -566,13 +640,16 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( visit_in_place( core, [&](const runtime::ScheduledCore &scheduled_core) { - result.emplace(index, std::vector{scheduled_core.para_id}); + result.emplace(index, + std::vector{scheduled_core.para_id}); }, [&](const runtime::OccupiedCore &occupied_core) { if (max_candidate_depth >= 1) { if (occupied_core.next_up_on_available) { - result.emplace(index, - std::vector{occupied_core.next_up_on_available->para_id}); + result.emplace( + index, + std::vector{ + occupied_core.next_up_on_available->para_id}); } } }, @@ -594,12 +671,13 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( outcome::result> StatementDistribution::fetch_claim_queue(const RelayHash &relay_parent) { -// constexpr uint32_t CLAIM_QUEUE_RUNTIME_REQUIREMENT = 11; -// OUTCOME_TRY(version, parachain_host->runtime_api_version(relay_parent)); -// if (version < CLAIM_QUEUE_RUNTIME_REQUIREMENT) { -// SL_TRACE(logger, "Runtime doesn't support `request_claim_queue`"); -// return std::nullopt; -// } + // constexpr uint32_t CLAIM_QUEUE_RUNTIME_REQUIREMENT = 11; + // OUTCOME_TRY(version, + // parachain_host->runtime_api_version(relay_parent)); if (version < + // CLAIM_QUEUE_RUNTIME_REQUIREMENT) { + // SL_TRACE(logger, "Runtime doesn't support `request_claim_queue`"); + // return std::nullopt; + // } OUTCOME_TRY(claims, parachain_host->claim_queue(relay_parent)); return runtime::ClaimQueueSnapshot{ @@ -856,8 +934,7 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( } implicit_view.sharedAccess([&](const auto &iv) { - auto fresh_implicit = - peer_state->get().update_view(new_view, iv); + auto fresh_implicit = peer_state->get().update_view(new_view, iv); for (const auto &new_relay_parent : fresh_implicit) { send_peer_messages_for_relay_parent(peer, new_relay_parent); } @@ -1159,7 +1236,10 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( BOOST_ASSERT(statements_distribution_thread_handler->isInCurrentThread()); TRY_GET_OR_RET(relay_parent_state, tryGetStateByRelayParent(relay_parent)); - TRY_GET_OR_RET(local_group, utils::map(relay_parent_state->get().active_validator_state(), [](const auto &state) {return state.get().group;})); + TRY_GET_OR_RET( + local_group, + utils::map(relay_parent_state->get().active_validator_state(), + [](const auto &state) { return state.get().group; })); TRY_GET_OR_RET( group, relay_parent_state->get().per_session_state->value().groups.get( @@ -1635,7 +1715,9 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( "local_validator={})", candidate_hash, *sender_index, - *relay_parent_state->get().per_session_state->value().local_validator); + *relay_parent_state->get() + .per_session_state->value() + .local_validator); } return ManifestImportSuccess{ @@ -2702,10 +2784,14 @@ outcome::result StatementDistribution::handle_active_leaves_update_inner( void StatementDistribution::send_peer_messages_for_relay_parent( const libp2p::peer::PeerId &peer_id, const RelayHash &relay_parent) { - REINVOKE(*statements_distribution_thread_handler, send_peer_messages_for_relay_parent, peer_id, relay_parent); + REINVOKE(*statements_distribution_thread_handler, + send_peer_messages_for_relay_parent, + peer_id, + relay_parent); TRY_GET_OR_RET(parachain_state, tryGetStateByRelayParent(relay_parent)); - const network::CollationVersion version = network::CollationVersion::VStaging; + const network::CollationVersion version = + network::CollationVersion::VStaging; if (auto auth_id = query_audi->get(peer_id)) { if (auto vi = utils::get(parachain_state->get() .per_session_state->value() diff --git a/core/parachain/validator/statement_distribution/statement_distribution.hpp b/core/parachain/validator/statement_distribution/statement_distribution.hpp index 0e8021beb1..7648c4fd52 100644 --- a/core/parachain/validator/statement_distribution/statement_distribution.hpp +++ b/core/parachain/validator/statement_distribution/statement_distribution.hpp @@ -21,9 +21,9 @@ #include "parachain/validator/impl/candidates.hpp" #include "parachain/validator/network_bridge.hpp" #include "parachain/validator/signer.hpp" +#include "parachain/validator/statement_distribution/peer_state.hpp" #include "parachain/validator/statement_distribution/per_session_state.hpp" #include "parachain/validator/statement_distribution/types.hpp" -#include "parachain/validator/statement_distribution/peer_state.hpp" #include "utils/pool_handler_ready_make.hpp" namespace kagome::parachain { @@ -84,8 +84,7 @@ namespace kagome::parachain::statement_distribution { std::shared_ptr crypto_provider, std::shared_ptr peer_view, LazySPtr slots_util, - std::shared_ptr - babe_config_repo, + std::shared_ptr babe_config_repo, primitives::events::PeerSubscriptionEnginePtr peer_events_engine); void request_attested_candidate(const libp2p::peer::PeerId &peer, @@ -138,9 +137,9 @@ namespace kagome::parachain::statement_distribution { using ManifestSummary = parachain::grid::ManifestSummary; struct RelayParentContext { - Hash relay_parent; - std::optional validator_index; - std::optional v_index; + Hash relay_parent; + std::optional validator_index; + std::optional v_index; }; std::optional> @@ -336,12 +335,15 @@ namespace kagome::parachain::statement_distribution { outcome::result handle_view_event(const network::ExView &event); void handle_active_leaves_update( - const network::ExView &event, std::vector new_contexts); + const network::ExView &event, + std::vector new_contexts); outcome::result handle_active_leaves_update_inner( - const network::ExView &event, std::vector new_contexts); + const network::ExView &event, + std::vector new_contexts); outcome::result handle_deactive_leaves_update_inner( - const std::vector &lost); - outcome::result update_our_view(const Hash &relay_parent, const network::View &view); + const std::vector &lost); + outcome::result update_our_view(const Hash &relay_parent, + const network::View &view); void on_peer_connected(const libp2p::peer::PeerId &peer); void on_peer_disconnected(const libp2p::peer::PeerId &peer); diff --git a/core/primitives/event_types.hpp b/core/primitives/event_types.hpp index f99ff253cc..03e7422668 100644 --- a/core/primitives/event_types.hpp +++ b/core/primitives/event_types.hpp @@ -241,7 +241,6 @@ namespace kagome::primitives::events { using StorageEventSubscriber = StorageSubscriptionEngine::SubscriberType; using StorageEventSubscriberPtr = std::shared_ptr; - using PeerSubscriptionEngine = subscription::SubscriptionEngine; - using ChainSubscriptionEngine = subscription::SubscriptionEngine, @@ -259,7 +257,6 @@ namespace kagome::primitives::events { using ChainEventSubscriber = ChainSubscriptionEngine::SubscriberType; using ChainEventSubscriberPtr = std::shared_ptr; - using SyncStateSubscriptionEngine = subscription::SubscriptionEngine< primitives::events::SyncStateEventType, bool, @@ -269,7 +266,6 @@ namespace kagome::primitives::events { using SyncStateEventSubscriber = SyncStateSubscriptionEngine::SubscriberType; using SyncStateEventSubscriberPtr = std::shared_ptr; - using ExtrinsicSubscriptionEngine = subscription::SubscriptionEngine< SubscribedExtrinsicId, std::shared_ptr, @@ -279,7 +275,6 @@ namespace kagome::primitives::events { using ExtrinsicEventSubscriber = ExtrinsicSubscriptionEngine::SubscriberType; using ExtrinsicEventSubscriberPtr = std::shared_ptr; - template void subscribe( subscription::Subscriber &sub, @@ -296,7 +291,7 @@ namespace kagome::primitives::events { struct ChainSub { ChainSub(ChainSubscriptionEnginePtr engine) : sub{std::make_shared( - std::move(engine))} {} + std::move(engine))} {} void onBlock(ChainEventType type, auto f) { subscribe(*sub, type, [f{std::move(f)}](const ChainEventParams &args) { diff --git a/core/utils/map.hpp b/core/utils/map.hpp index 21cb2526e7..bd4444004d 100644 --- a/core/utils/map.hpp +++ b/core/utils/map.hpp @@ -87,11 +87,8 @@ namespace kagome::utils { return val; } - template - requires requires { - typename C::mapped_type; - } + requires requires { typename C::mapped_type; } inline std::optional get_it( C &container, const typename C::key_type &key) { if (auto it = container.find(key); it != container.end()) { diff --git a/core/utils/retain_if.hpp b/core/utils/retain_if.hpp index c7f3deca84..1813388572 100644 --- a/core/utils/retain_if.hpp +++ b/core/utils/retain_if.hpp @@ -18,9 +18,7 @@ namespace kagome { v.end()); } template - requires requires { - typename C::key_type; - } + requires requires { typename C::key_type; } void retain_if(C &v, auto &&predicate) { for (auto it = v.begin(); it != v.end();) { if (!predicate(*it)) {