From fd4b90646c95b2393838af47143edfa127186967 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 1 Aug 2023 09:52:07 +0300 Subject: [PATCH] Make wait time for caching relative to no-show ... additionally computed in ticks as it is done everywhere else. And added some tests to make sure approval-voting behaves the way we intended to. Signed-off-by: Alexandru Gheorghe --- .../approval-voting/src/approval_db/v2/mod.rs | 2 +- .../approval-voting/src/approvals_timers.rs | 80 --- node/core/approval-voting/src/lib.rs | 162 +++--- .../approval-voting/src/persisted_entries.rs | 6 +- node/core/approval-voting/src/tests.rs | 477 ++++++++++++++++++ node/core/approval-voting/src/time.rs | 165 +++++- primitives/src/vstaging/mod.rs | 4 +- runtime/parachains/src/configuration.rs | 2 +- .../src/configuration/migration/v6.rs | 2 +- 9 files changed, 752 insertions(+), 148 deletions(-) delete mode 100644 node/core/approval-voting/src/approvals_timers.rs diff --git a/node/core/approval-voting/src/approval_db/v2/mod.rs b/node/core/approval-voting/src/approval_db/v2/mod.rs index f33f17b4386c..20260c3e842e 100644 --- a/node/core/approval-voting/src/approval_db/v2/mod.rs +++ b/node/core/approval-voting/src/approval_db/v2/mod.rs @@ -248,7 +248,7 @@ pub struct BlockEntry { /// Context needed for creating an approval signature for a given candidate. pub struct CandidateSigningContext { pub candidate_hash: CandidateHash, - pub approved_time_since_unix_epoch: u128, + pub send_no_later_than_tick: Tick, } impl From for Tick { diff --git a/node/core/approval-voting/src/approvals_timers.rs b/node/core/approval-voting/src/approvals_timers.rs deleted file mode 100644 index 9225fad375f3..000000000000 --- a/node/core/approval-voting/src/approvals_timers.rs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! A simple implementation of a timer per block hash . -//! -use std::{collections::HashSet, task::Poll, time::Duration}; - -use futures::{ - future::BoxFuture, - stream::{FusedStream, FuturesUnordered}, - Stream, StreamExt, -}; -use futures_timer::Delay; -use polkadot_primitives::{Hash, ValidatorIndex}; -// A list of delayed futures that gets triggered when the waiting time has expired and it is -// time to sign the candidate. -// We have a timer per relay-chain block. -#[derive(Default)] -pub struct SignApprovalsTimers { - timers: FuturesUnordered>, - blocks: HashSet, -} - -impl SignApprovalsTimers { - /// Starts a single timer per block hash - /// - /// Guarantees that if a timer already exits for the give block hash, - /// no additional timer is started. - pub fn maybe_start_timer_for_block( - &mut self, - timer_duration_ms: u32, - block_hash: Hash, - validator_index: ValidatorIndex, - ) { - if self.blocks.insert(block_hash) { - let delay = Delay::new(Duration::from_millis(timer_duration_ms as _)); - self.timers.push(Box::pin(async move { - delay.await; - (block_hash, validator_index) - })); - } - } -} - -impl Stream for SignApprovalsTimers { - type Item = (Hash, ValidatorIndex); - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let poll_result = self.timers.poll_next_unpin(cx); - match poll_result { - Poll::Ready(Some(result)) => { - self.blocks.remove(&result.0); - Poll::Ready(Some(result)) - }, - _ => poll_result, - } - } -} - -impl FusedStream for SignApprovalsTimers { - fn is_terminated(&self) -> bool { - self.timers.is_terminated() - } -} diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index f6380826fc2f..b212874d356c 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -21,7 +21,6 @@ //! of others. It uses this information to determine when candidates and blocks have //! been sufficiently approved to finalize. -use approvals_timers::SignApprovalsTimers; use itertools::Itertools; use jaeger::{hash_to_trace_identifier, PerLeafSpan}; use polkadot_node_jaeger as jaeger; @@ -74,23 +73,23 @@ use futures::{ }; use std::{ + cmp::min, collections::{ btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet, }, num::NonZeroUsize, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; use approval_checking::RequiredTranches; use bitvec::{order::Lsb0, vec::BitVec}; use criteria::{AssignmentCriteria, RealAssignmentCriteria}; use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry}; -use time::{slot_number_to_tick, Clock, ClockExt, SystemClock, Tick}; +use time::{slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick}; mod approval_checking; pub mod approval_db; -pub mod approvals_timers; mod backend; mod criteria; mod import; @@ -785,7 +784,7 @@ where let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); - let mut sign_approvals_timers = SignApprovalsTimers::default(); + let mut delayed_approvals_timers = DelayedApprovalTimer::default(); let mut last_finalized_height: Option = { let (tx, rx) = oneshot::channel(); @@ -864,21 +863,25 @@ where actions }, - (block_hash, validator_index) = sign_approvals_timers.select_next_some() => { + (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => { gum::debug!( target: LOG_TARGET, "Sign approval for multiple candidates", ); + + let approval_params = get_approval_voting_params_or_default(&mut ctx, block_hash).await; + match maybe_create_signature( &mut overlayed_db, &mut session_info_provider, + approval_params, &state, &mut ctx, block_hash, validator_index, &subsystem.metrics, ).await { - Ok(Some(duration)) => { - sign_approvals_timers.maybe_start_timer_for_block(duration.as_millis() as u32, block_hash, validator_index); + Ok(Some(next_wakeup)) => { + delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index); }, Ok(None) => {} Err(err) => { @@ -902,7 +905,7 @@ where &mut wakeups, &mut currently_checking_set, &mut approvals_cache, - &mut sign_approvals_timers, + &mut delayed_approvals_timers, &mut subsystem.mode, actions, ) @@ -950,7 +953,7 @@ async fn handle_actions( wakeups: &mut Wakeups, currently_checking_set: &mut CurrentlyCheckingSet, approvals_cache: &mut lru::LruCache, - sign_approvals_timers: &mut SignApprovalsTimers, + delayed_approvals_timers: &mut DelayedApprovalTimer, mode: &mut Mode, actions: Vec, ) -> SubsystemResult { @@ -980,7 +983,7 @@ async fn handle_actions( session_info_provider, metrics, candidate_hash, - sign_approvals_timers, + delayed_approvals_timers, approval_request, ) .await? @@ -1019,7 +1022,6 @@ async fn handle_actions( let block_hash = indirect_cert.block_hash; launch_approval_span.add_string_tag("block-hash", format!("{:?}", block_hash)); let validator_index = indirect_cert.validator; - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( indirect_cert, claimed_candidate_indices, @@ -2939,7 +2941,7 @@ async fn issue_approval( session_info_provider: &mut RuntimeInfo, metrics: &Metrics, candidate_hash: CandidateHash, - sign_approvals_timers: &mut SignApprovalsTimers, + delayed_approvals_timers: &mut DelayedApprovalTimer, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, ) -> SubsystemResult> { let mut issue_approval_span = state @@ -3009,14 +3011,30 @@ async fn issue_approval( }, }; + let session_info = match get_session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await + { + Some(s) => s, + None => return Ok(Vec::new()), + }; + + let approval_params = get_approval_voting_params_or_default(ctx, block_hash).await; + block_entry.candidates_pending_signature.insert( candidate_index as _, CandidateSigningContext { candidate_hash, - approved_time_since_unix_epoch: SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|duration| duration.as_millis()) - .unwrap_or(0), + send_no_later_than_tick: compute_delayed_approval_sending_tick( + state, + &block_entry, + session_info, + &approval_params, + ), }, ); @@ -3041,9 +3059,10 @@ async fn issue_approval( ) .await; - if let Some(timer_duration) = maybe_create_signature( + if let Some(next_wakeup) = maybe_create_signature( db, session_info_provider, + approval_params, state, ctx, block_hash, @@ -3052,8 +3071,9 @@ async fn issue_approval( ) .await? { - sign_approvals_timers.maybe_start_timer_for_block( - timer_duration.as_millis() as u32, + delayed_approvals_timers.maybe_arm_timer( + next_wakeup, + state.clock.as_ref(), block_hash, validator_index, ); @@ -3066,12 +3086,13 @@ async fn issue_approval( async fn maybe_create_signature( db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, + approval_params: ApprovalVotingParams, state: &State, ctx: &mut Context, block_hash: Hash, validator_index: ValidatorIndex, metrics: &Metrics, -) -> SubsystemResult> { +) -> SubsystemResult> { let mut block_entry = match db.load_block_entry(&block_hash)? { Some(b) => b, None => { @@ -3093,50 +3114,21 @@ async fn maybe_create_signature( let oldest_candidate_to_sign = match block_entry .candidates_pending_signature .values() - .min_by(|a, b| a.approved_time_since_unix_epoch.cmp(&b.approved_time_since_unix_epoch)) + .min_by(|a, b| a.send_no_later_than_tick.cmp(&b.send_no_later_than_tick)) { Some(candidate) => candidate, + // No cached candidates, nothing to do here, this just means the timer fired, + // but the signatures were already sent because we gathered more than max_approval_coalesce_count. None => return Ok(None), }; - let (s_tx, s_rx) = oneshot::channel(); - - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ApprovalVotingParams(s_tx), - )) - .await; - - let approval_params = match s_rx.await { - Ok(Ok(s)) => s, - _ => { - gum::error!( - target: LOG_TARGET, - "Could not request approval voting params from runtime using defaults" - ); - ApprovalVotingParams { - max_approval_coalesce_count: 1, - max_approval_coalesce_wait_millis: 100, - } - }, - }; + let tick_now = state.clock.tick_now(); - let passed_since_approved = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|duration| duration.as_millis()) - .map(|now| now.checked_sub(oldest_candidate_to_sign.approved_time_since_unix_epoch)); - - match passed_since_approved { - Ok(Some(passed_since_approved)) - if passed_since_approved < - approval_params.max_approval_coalesce_wait_millis as u128 && - (block_entry.candidates_pending_signature.len() as u32) < - approval_params.max_approval_coalesce_count => - return Ok(Some(Duration::from_millis( - (approval_params.max_approval_coalesce_wait_millis as u128 - passed_since_approved) - as u64, - ))), - _ => {}, + if oldest_candidate_to_sign.send_no_later_than_tick > tick_now && + (block_entry.candidates_pending_signature.len() as u32) < + approval_params.max_approval_coalesce_count + { + return Ok(Some(oldest_candidate_to_sign.send_no_later_than_tick)) } let session_info = match get_session_info( @@ -3281,3 +3273,55 @@ fn issue_local_invalid_statement( false, )); } + +#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] +async fn get_approval_voting_params_or_default( + ctx: &mut Context, + block_hash: Hash, +) -> ApprovalVotingParams { + let (s_tx, s_rx) = oneshot::channel(); + + ctx.send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ApprovalVotingParams(s_tx), + )) + .await; + + match s_rx.await { + Ok(Ok(s)) => s, + _ => { + gum::error!( + target: LOG_TARGET, + "Could not request approval voting params from runtime using defaults" + ); + ApprovalVotingParams { + max_approval_coalesce_count: 1, + max_approval_coalesce_wait_ticks: 2, + } + }, + } +} + +fn compute_delayed_approval_sending_tick( + state: &State, + block_entry: &BlockEntry, + session_info: &SessionInfo, + approval_params: &ApprovalVotingParams, +) -> Tick { + let current_block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); + + let no_show_duration_ticks = slot_number_to_tick( + state.slot_duration_millis, + Slot::from(u64::from(session_info.no_show_slots)), + ); + let tick_now = state.clock.tick_now(); + + min( + tick_now + approval_params.max_approval_coalesce_wait_ticks as Tick, + // We don't want to accidentally cause, no-shows so if we are past + // the 2 thirds of the no show time, force the sending of the + // approval immediately. + // TODO: TBD the right value here, so that we don't accidentally create no-shows. + current_block_tick + (no_show_duration_ticks * 2) / 3, + ) +} diff --git a/node/core/approval-voting/src/persisted_entries.rs b/node/core/approval-voting/src/persisted_entries.rs index bf3d52956998..ac1dea98ebda 100644 --- a/node/core/approval-voting/src/persisted_entries.rs +++ b/node/core/approval-voting/src/persisted_entries.rs @@ -366,7 +366,7 @@ pub struct BlockEntry { #[derive(Debug, Clone, PartialEq)] pub struct CandidateSigningContext { pub candidate_hash: CandidateHash, - pub approved_time_since_unix_epoch: u128, + pub send_no_later_than_tick: Tick, } impl BlockEntry { @@ -489,7 +489,7 @@ impl From for CandidateSigningC fn from(signing_context: crate::approval_db::v2::CandidateSigningContext) -> Self { Self { candidate_hash: signing_context.candidate_hash, - approved_time_since_unix_epoch: signing_context.approved_time_since_unix_epoch, + send_no_later_than_tick: signing_context.send_no_later_than_tick.into(), } } } @@ -498,7 +498,7 @@ impl From for crate::approval_db::v2::CandidateSigningC fn from(signing_context: CandidateSigningContext) -> Self { Self { candidate_hash: signing_context.candidate_hash, - approved_time_since_unix_epoch: signing_context.approved_time_since_unix_epoch, + send_no_later_than_tick: signing_context.send_no_later_than_tick.into(), } } } diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index c37850d0466f..3ad5457aace7 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -2595,11 +2595,31 @@ async fn handle_double_assignment_import( } ); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + max_approval_coalesce_wait_ticks: 0, + })); + } + ); + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) ); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + max_approval_coalesce_wait_ticks: 0, + })); + } + ); + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) @@ -3334,3 +3354,460 @@ fn waits_until_approving_assignments_are_old_enough() { virtual_overseer }); } + +#[test] +fn test_approval_is_sent_on_max_approval_coalesce_count() { + let assignment_criteria = Box::new(MockAssignmentCriteria( + || { + let mut assignments = HashMap::new(); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert(AssignmentCertKind::RelayVRFModulo { sample: 0 }) + .into(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let assignments_cert = + garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact { + core_bitfield: vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)] + .try_into() + .unwrap(), + }); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: assignments_cert.clone(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let _ = assignments.insert( + CoreIndex(1), + approval_db::v2::OurAssignment { + cert: assignments_cert.clone(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + assignments + }, + |_| Ok(0), + )); + + let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build(); + let store = config.backend(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle: _sync_oracle_handle } = + test_harness; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + + let candidate_commitments = CandidateCommitments::default(); + + let candidate_receipt1 = { + let mut receipt = dummy_candidate_receipt(block_hash); + receipt.descriptor.para_id = ParaId::from(1_u32); + receipt.commitments_hash = candidate_commitments.hash(); + receipt + }; + + let candidate_hash1 = candidate_receipt1.hash(); + + let candidate_receipt2 = { + let mut receipt = dummy_candidate_receipt(block_hash); + receipt.descriptor.para_id = ParaId::from(2_u32); + receipt.commitments_hash = candidate_commitments.hash(); + receipt + }; + + let slot = Slot::from(1); + let candidate_index1 = 0; + let candidate_index2 = 1; + + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Eve, + ]; + let session_info = SessionInfo { + validator_groups: IndexedVec::>::from(vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2)], + vec![ValidatorIndex(3), ValidatorIndex(4)], + ]), + ..session_info(&validators) + }; + + let candidates = Some(vec![ + (candidate_receipt1.clone(), CoreIndex(0), GroupIndex(0)), + (candidate_receipt2.clone(), CoreIndex(1), GroupIndex(1)), + ]); + ChainBuilder::new() + .add_block( + block_hash, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { + slot, + candidates: candidates.clone(), + session_info: Some(session_info.clone()), + }, + ) + .build(&mut virtual_overseer) + .await; + + assert!(!clock.inner.lock().current_wakeup_is(1)); + clock.inner.lock().wakeup_all(1); + + assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot))); + clock.inner.lock().wakeup_all(slot_to_tick(slot)); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + clock.inner.lock().wakeup_all(slot_to_tick(slot + 2)); + + assert_eq!(clock.inner.lock().wakeups.len(), 0); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + let candidate_entry = store.load_candidate_entry(&candidate_hash1).unwrap().unwrap(); + let our_assignment = + candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); + assert!(our_assignment.triggered()); + + handle_approval_on_max_coalesce_count( + &mut virtual_overseer, + vec![candidate_index1, candidate_index2], + ) + .await; + + virtual_overseer + }); +} + +async fn handle_approval_on_max_coalesce_count( + virtual_overseer: &mut VirtualOverseer, + candidate_indicies: Vec, +) { + for _ in &candidate_indicies { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + c_indices, + )) => { + assert_eq!(TryInto::::try_into(candidate_indicies.clone()).unwrap(), c_indices); + } + ); + + recover_available_data(virtual_overseer).await; + fetch_validation_code(virtual_overseer).await; + } + + for _ in &candidate_indicies { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)) if timeout == PvfExecTimeoutKind::Approval => { + tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + } + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 2, + max_approval_coalesce_wait_ticks: 10000, + })); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 2, + max_approval_coalesce_wait_ticks: 10000, + })); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(vote)) => { + assert_eq!(TryInto::::try_into(candidate_indicies).unwrap(), vote.candidate_indices); + } + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); +} + +async fn handle_approval_on_max_wait_time( + virtual_overseer: &mut VirtualOverseer, + candidate_indicies: Vec, + clock: Box, +) { + const TICK_NOW_BEGIN: u64 = 1; + const MAX_COALESCE_COUNT: u32 = 3; + const MAX_APPROVAL_COALESCE_WAIT_TICKS: u32 = 4; + + clock.inner.lock().set_tick(TICK_NOW_BEGIN); + + for _ in &candidate_indicies { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + c_indices, + )) => { + assert_eq!(TryInto::::try_into(candidate_indicies.clone()).unwrap(), c_indices); + } + ); + + recover_available_data(virtual_overseer).await; + fetch_validation_code(virtual_overseer).await; + } + + for _ in &candidate_indicies { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)) if timeout == PvfExecTimeoutKind::Approval => { + tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + } + + // First time we fetch the configuration when we are ready to approve the first candidate + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: MAX_COALESCE_COUNT, + max_approval_coalesce_wait_ticks: MAX_APPROVAL_COALESCE_WAIT_TICKS, + })); + } + ); + + // Second time we fetch the configuration when we are ready to approve the second candidate + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: MAX_COALESCE_COUNT, + max_approval_coalesce_wait_ticks: MAX_APPROVAL_COALESCE_WAIT_TICKS, + })); + } + ); + + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + // Move the clock just before we should send the approval + clock + .inner + .lock() + .set_tick(MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick + TICK_NOW_BEGIN - 1); + + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + // Move the clock tick, so we can trigger a force sending of the approvals + clock + .inner + .lock() + .set_tick(MAX_APPROVAL_COALESCE_WAIT_TICKS as Tick + TICK_NOW_BEGIN); + + // Third time we fetch the configuration when timer expires and we are ready to sent the approval + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 3, + max_approval_coalesce_wait_ticks: 4, + })); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(vote)) => { + assert_eq!(TryInto::::try_into(candidate_indicies).unwrap(), vote.candidate_indices); + } + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); +} + +#[test] +fn test_approval_is_sent_on_max_approval_coalesce_wait() { + let assignment_criteria = Box::new(MockAssignmentCriteria( + || { + let mut assignments = HashMap::new(); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert(AssignmentCertKind::RelayVRFModulo { sample: 0 }) + .into(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let assignments_cert = + garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact { + core_bitfield: vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)] + .try_into() + .unwrap(), + }); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: assignments_cert.clone(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let _ = assignments.insert( + CoreIndex(1), + approval_db::v2::OurAssignment { + cert: assignments_cert.clone(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + assignments + }, + |_| Ok(0), + )); + + let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build(); + let store = config.backend(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle: _sync_oracle_handle } = + test_harness; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + + let candidate_commitments = CandidateCommitments::default(); + + let candidate_receipt1 = { + let mut receipt = dummy_candidate_receipt(block_hash); + receipt.descriptor.para_id = ParaId::from(1_u32); + receipt.commitments_hash = candidate_commitments.hash(); + receipt + }; + + let candidate_hash1 = candidate_receipt1.hash(); + + let candidate_receipt2 = { + let mut receipt = dummy_candidate_receipt(block_hash); + receipt.descriptor.para_id = ParaId::from(2_u32); + receipt.commitments_hash = candidate_commitments.hash(); + receipt + }; + + let slot = Slot::from(1); + let candidate_index1 = 0; + let candidate_index2 = 1; + + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Eve, + ]; + let session_info = SessionInfo { + validator_groups: IndexedVec::>::from(vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2)], + vec![ValidatorIndex(3), ValidatorIndex(4)], + ]), + ..session_info(&validators) + }; + + let candidates = Some(vec![ + (candidate_receipt1.clone(), CoreIndex(0), GroupIndex(0)), + (candidate_receipt2.clone(), CoreIndex(1), GroupIndex(1)), + ]); + ChainBuilder::new() + .add_block( + block_hash, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { + slot, + candidates: candidates.clone(), + session_info: Some(session_info.clone()), + }, + ) + .build(&mut virtual_overseer) + .await; + + assert!(!clock.inner.lock().current_wakeup_is(1)); + clock.inner.lock().wakeup_all(1); + + assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot))); + clock.inner.lock().wakeup_all(slot_to_tick(slot)); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + clock.inner.lock().wakeup_all(slot_to_tick(slot + 2)); + + assert_eq!(clock.inner.lock().wakeups.len(), 0); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + let candidate_entry = store.load_candidate_entry(&candidate_hash1).unwrap().unwrap(); + let our_assignment = + candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); + assert!(our_assignment.triggered()); + + handle_approval_on_max_wait_time( + &mut virtual_overseer, + vec![candidate_index1, candidate_index2], + clock, + ) + .await; + + virtual_overseer + }); +} diff --git a/node/core/approval-voting/src/time.rs b/node/core/approval-voting/src/time.rs index a45866402c82..ca6ba288d161 100644 --- a/node/core/approval-voting/src/time.rs +++ b/node/core/approval-voting/src/time.rs @@ -16,14 +16,23 @@ //! Time utilities for approval voting. -use futures::prelude::*; +use futures::{ + future::BoxFuture, + prelude::*, + stream::{FusedStream, FuturesUnordered}, + Stream, StreamExt, +}; + use polkadot_node_primitives::approval::v1::DelayTranche; use sp_consensus_slots::Slot; use std::{ + collections::HashSet, pin::Pin, + task::Poll, time::{Duration, SystemTime}, }; +use polkadot_primitives::{Hash, ValidatorIndex}; const TICK_DURATION_MILLIS: u64 = 500; /// A base unit of time, starting from the Unix epoch, split into half-second intervals. @@ -88,3 +97,157 @@ pub(crate) fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS; u64::from(slot) * ticks_per_slot } + +// A list of delayed futures that gets triggered when the waiting time has expired and it is +// time to sign the candidate. +// We have a timer per relay-chain block. +#[derive(Default)] +pub struct DelayedApprovalTimer { + timers: FuturesUnordered>, + blocks: HashSet, +} + +impl DelayedApprovalTimer { + /// Starts a single timer per block hash + /// + /// Guarantees that if a timer already exits for the give block hash, + /// no additional timer is started. + pub(crate) fn maybe_arm_timer( + &mut self, + wait_untill: Tick, + clock: &dyn Clock, + block_hash: Hash, + validator_index: ValidatorIndex, + ) { + if self.blocks.insert(block_hash) { + let clock_wait = clock.wait(wait_untill); + self.timers.push(Box::pin(async move { + clock_wait.await; + (block_hash, validator_index) + })); + } + } +} + +impl Stream for DelayedApprovalTimer { + type Item = (Hash, ValidatorIndex); + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let poll_result = self.timers.poll_next_unpin(cx); + match poll_result { + Poll::Ready(Some(result)) => { + self.blocks.remove(&result.0); + Poll::Ready(Some(result)) + }, + _ => poll_result, + } + } +} + +impl FusedStream for DelayedApprovalTimer { + fn is_terminated(&self) -> bool { + self.timers.is_terminated() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::{executor::block_on, FutureExt, StreamExt}; + use futures_timer::Delay; + use polkadot_primitives::{Hash, ValidatorIndex}; + + use crate::time::{Clock, SystemClock}; + + use super::DelayedApprovalTimer; + + #[test] + fn test_select_empty_timer() { + block_on(async move { + let mut timer = DelayedApprovalTimer::default(); + + for _ in 1..10 { + let result = futures::select!( + _ = timer.select_next_some() => { + 0 + } + // Only this arm should fire + _ = Delay::new(Duration::from_millis(100)).fuse() => { + 1 + } + ); + + assert_eq!(result, 1); + } + }); + } + + #[test] + fn test_timer_functionality() { + block_on(async move { + let mut timer = DelayedApprovalTimer::default(); + let test_hashes = + vec![Hash::repeat_byte(0x01), Hash::repeat_byte(0x02), Hash::repeat_byte(0x03)]; + for (index, hash) in test_hashes.iter().enumerate() { + timer.maybe_arm_timer( + SystemClock.tick_now() + index as u64, + &SystemClock, + *hash, + ValidatorIndex::from(2), + ); + timer.maybe_arm_timer( + SystemClock.tick_now() + index as u64, + &SystemClock, + *hash, + ValidatorIndex::from(2), + ); + } + let timeout_hash = Hash::repeat_byte(0x02); + for i in 0..test_hashes.len() * 2 { + let result = futures::select!( + (hash, _) = timer.select_next_some() => { + hash + } + // Timers should fire only once, so for the rest of the iterations we should timeout through here. + _ = Delay::new(Duration::from_secs(2)).fuse() => { + timeout_hash + } + ); + assert_eq!(test_hashes.get(i).cloned().unwrap_or(timeout_hash), result); + } + + // Now check timer can be restarted if already fired + for (index, hash) in test_hashes.iter().enumerate() { + timer.maybe_arm_timer( + SystemClock.tick_now() + index as u64, + &SystemClock, + *hash, + ValidatorIndex::from(2), + ); + timer.maybe_arm_timer( + SystemClock.tick_now() + index as u64, + &SystemClock, + *hash, + ValidatorIndex::from(2), + ); + } + + for i in 0..test_hashes.len() * 2 { + let result = futures::select!( + (hash, _) = timer.select_next_some() => { + hash + } + // Timers should fire only once, so for the rest of the iterations we should timeout through here. + _ = Delay::new(Duration::from_secs(2)).fuse() => { + timeout_hash + } + ); + assert_eq!(test_hashes.get(i).cloned().unwrap_or(timeout_hash), result); + } + }); + } +} diff --git a/primitives/src/vstaging/mod.rs b/primitives/src/vstaging/mod.rs index aad7208355e1..aa652f433f66 100644 --- a/primitives/src/vstaging/mod.rs +++ b/primitives/src/vstaging/mod.rs @@ -69,7 +69,7 @@ pub struct ApprovalVotingParams { /// /// Setting it to 1, means we send the approval as soon as we have it available. pub max_approval_coalesce_count: u32, - /// The maximum time we await for a candidate approval to be coalesced with + /// The maximum ticks we await for a candidate approval to be coalesced with /// the ones for other candidate before we sign it and distribute to our peers - pub max_approval_coalesce_wait_millis: u32, + pub max_approval_coalesce_wait_ticks: u32, } diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index 52172f9acf5a..79990c1ac5f0 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -301,7 +301,7 @@ impl> Default for HostConfiguration