From 0a3f8ace26eeaade9a83e498c2e5b284a7deec02 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 27 May 2022 16:49:50 +0300 Subject: [PATCH] fixed on-demand parachains relay case: if better relay header is delivered, then we must select para header that may be proved using this relay header (#1419) --- .../src/on_demand/parachains.rs | 413 ++++++------------ .../src/parachains/source.rs | 57 +-- .../relays/parachains/src/parachains_loop.rs | 88 +++- 3 files changed, 245 insertions(+), 313 deletions(-) diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs index 8f1bee352007..d8cc60ab2977 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -37,14 +37,14 @@ use num_traits::Zero; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient}; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, + AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, TransactionSignScheme, }; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, }; use sp_runtime::traits::Header as HeaderT; -use std::{cmp::Ordering, collections::BTreeMap}; +use std::fmt::Debug; /// On-demand Substrate <-> Substrate parachain finality relay. /// @@ -142,9 +142,8 @@ async fn background_task( let target_transactions_mortality = target_transaction_params.mortality; let mut relay_state = RelayState::Idle; - let mut headers_map_cache = BTreeMap::new(); let mut required_parachain_header_number = Zero::zero(); - let required_para_header_number_ref = Arc::new(Mutex::new(required_parachain_header_number)); + let required_para_header_number_ref = Arc::new(Mutex::new(None)); let mut restart_relay = true; let parachains_relay_task = futures::future::Fuse::terminated(); @@ -191,7 +190,10 @@ async fn background_task( // the workflow of the on-demand parachains relay is: // // 1) message relay (or any other dependent relay) sees new message at parachain header - // `PH`; 2) it sees that the target chain does not know `PH`; + // `PH`; + // + // 2) it sees that the target chain does not know `PH`; + // // 3) it asks on-demand parachains relay to relay `PH` to the target chain; // // Phase#1: relaying relay chain header @@ -204,21 +206,21 @@ async fn background_task( // Phase#2: relaying parachain header // // 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the - // `PH'.number()`. 8) parachains finality relay sees that the parachain head has been - // updated and relays `PH'` to the target chain. + // `PH'.number()`. + // 8) parachains finality relay sees that the parachain head has been + // updated and relays `PH'` to the target chain. // select headers to relay let relay_data = read_relay_data( ¶chains_source, ¶chains_target, required_parachain_header_number, - &mut headers_map_cache, ) .await; match relay_data { - Ok(mut relay_data) => { + Ok(relay_data) => { let prev_relay_state = relay_state; - relay_state = select_headers_to_relay(&mut relay_data, relay_state); + relay_state = select_headers_to_relay(&relay_data, relay_state); log::trace!( target: "bridge", "Selected new relay state in {}: {:?} using old state {:?} and data {:?}", @@ -244,13 +246,13 @@ async fn background_task( // requirements match relay_state { RelayState::Idle => (), - RelayState::RelayingRelayHeader(required_relay_header, _) => { + RelayState::RelayingRelayHeader(required_relay_header) => { on_demand_source_relay_to_target_headers .require_more_headers(required_relay_header) .await; }, RelayState::RelayingParaHeader(required_para_header) => { - *required_para_header_number_ref.lock().await = required_para_header; + *required_para_header_number_ref.lock().await = Some(required_para_header); }, } @@ -300,55 +302,44 @@ fn on_demand_parachains_relay_name() -> /// On-demand relay state. #[derive(Clone, Copy, Debug, PartialEq)] -enum RelayState { +enum RelayState { /// On-demand relay is not doing anything. Idle, /// Relaying given relay header to relay given parachain header later. - RelayingRelayHeader(SourceRelayBlock, SourceParaBlock), + RelayingRelayHeader(RelayNumber), /// Relaying given parachain header. - RelayingParaHeader(SourceParaBlock), + RelayingParaHeader(HeaderId), } /// Data gathered from source and target clients, used by on-demand relay. #[derive(Debug)] -struct RelayData<'a, SourceParaBlock, SourceRelayBlock> { +struct RelayData { /// Parachain header number that is required at the target chain. - pub required_para_header: SourceParaBlock, + pub required_para_header: ParaNumber, /// Parachain header number, known to the target chain. - pub para_header_at_target: SourceParaBlock, - /// Parachain header number, known to the source (relay) chain. - pub para_header_at_source: Option, + pub para_header_at_target: ParaNumber, + /// Parachain header id, known to the source (relay) chain. + pub para_header_at_source: Option>, + /// Parachain header, that is available at the source relay chain at `relay_header_at_target` + /// block. + pub para_header_at_relay_header_at_target: Option>, /// Relay header number at the source chain. - pub relay_header_at_source: SourceRelayBlock, + pub relay_header_at_source: RelayNumber, /// Relay header number at the target chain. - pub relay_header_at_target: SourceRelayBlock, - /// Map of relay to para header block numbers for recent relay headers. - /// - /// Even if we have been trying to relay relay header #100 to relay parachain header #50 - /// afterwards, it may happen that the relay header #200 may be relayed instead - either - /// by us (e.g. if GRANDPA justification is generated for #200, or if we are only syncing - /// mandatory headers), or by other relayer. Then, instead of parachain header #50 we may - /// relay parachain header #70. - /// - /// This cache is especially important, given that we assume that the nodes we're connected - /// to are not necessarily archive nodes. Then, if current relay chain block is #210 and #200 - /// has been delivered to the target chain, we have more chances to generate storage proof - /// at relay block #200 than on relay block #100, which is most likely has pruned state - /// already. - pub headers_map_cache: &'a mut BTreeMap, + pub relay_header_at_target: RelayNumber, } /// Read required data from source and target clients. -async fn read_relay_data<'a, P: SubstrateParachainsPipeline>( +async fn read_relay_data( source: &ParachainsSource

, target: &ParachainsTarget

, required_header_number: BlockNumberOf, - headers_map_cache: &'a mut BTreeMap< - BlockNumberOf, +) -> Result< + RelayData< + HashOf, BlockNumberOf, + BlockNumberOf, >, -) -> Result< - RelayData<'a, BlockNumberOf, BlockNumberOf>, FailedClient, > where @@ -398,7 +389,7 @@ where ) .await .map_err(map_source_err)? - .map(|h| *h.number()); + .map(|h| HeaderId(*h.number(), h.hash())); let relay_header_at_source = best_finalized_relay_block_id.0; let relay_header_at_target = @@ -408,68 +399,52 @@ where P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD, ) .await - .map_err(map_target_err)? - .0; + .map_err(map_target_err)?; + + let para_header_at_relay_header_at_target = source + .on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into()) + .await + .map_err(map_source_err)? + .map(|h| HeaderId(*h.number(), h.hash())); Ok(RelayData { required_para_header: required_header_number, para_header_at_target, para_header_at_source, relay_header_at_source, - relay_header_at_target, - headers_map_cache, + relay_header_at_target: relay_header_at_target.0, + para_header_at_relay_header_at_target, }) } -// This number is bigger than the session length of any well-known Substrate-based relay -// chain. We expect that the underlying on-demand relay will submit at least 1 header per -// session. -const MAX_HEADERS_MAP_CACHE_ENTRIES: usize = 4096; - /// Select relay and parachain headers that need to be relayed. -fn select_headers_to_relay<'a, SourceParaBlock, SourceRelayBlock>( - data: &mut RelayData<'a, SourceParaBlock, SourceRelayBlock>, - mut state: RelayState, -) -> RelayState +fn select_headers_to_relay( + data: &RelayData, + mut state: RelayState, +) -> RelayState where - RelayData<'a, SourceParaBlock, SourceRelayBlock>: std::fmt::Debug, // TODO: remove - SourceParaBlock: Copy + PartialOrd, - SourceRelayBlock: Copy + Ord, + ParaHash: Clone, + ParaNumber: Copy + PartialOrd, + RelayNumber: Copy + Debug + Ord, { - // despite of our current state, we want to update the headers map cache - if let Some(para_header_at_source) = data.para_header_at_source { - data.headers_map_cache - .insert(data.relay_header_at_source, para_header_at_source); - if data.headers_map_cache.len() > MAX_HEADERS_MAP_CACHE_ENTRIES { - let first_key = *data.headers_map_cache.keys().next().expect("map is not empty; qed"); - data.headers_map_cache.remove(&first_key); - } - } - // this switch is responsible for processing `RelayingRelayHeader` state match state { RelayState::Idle | RelayState::RelayingParaHeader(_) => (), - RelayState::RelayingRelayHeader(relay_header_number, para_header_number) => { - match data.relay_header_at_target.cmp(&relay_header_number) { - Ordering::Less => { - // relay header hasn't yet been relayed - return RelayState::RelayingRelayHeader(relay_header_number, para_header_number) - }, - Ordering::Equal => { - // relay header has been realyed and we may continue with parachain header - state = RelayState::RelayingParaHeader(para_header_number); - }, - Ordering::Greater => { - // relay header descendant has been relayed and we may need to change parachain - // header that we want to relay - let next_para_header_number = data - .headers_map_cache - .range(..=data.relay_header_at_target) - .next_back() - .map(|(_, next_para_header_number)| *next_para_header_number) - .unwrap_or_else(|| para_header_number); - state = RelayState::RelayingParaHeader(next_para_header_number); - }, + RelayState::RelayingRelayHeader(relay_header_number) => { + if data.relay_header_at_target < relay_header_number { + // required relay header hasn't yet been relayed + return RelayState::RelayingRelayHeader(relay_header_number) + } + + // we may switch to `RelayingParaHeader` if parachain head is available + if let Some(para_header_at_relay_header_at_target) = + data.para_header_at_relay_header_at_target.clone() + { + state = RelayState::RelayingParaHeader(para_header_at_relay_header_at_target); + } else { + // otherwise, we'd need to restart (this may happen only if parachain has been + // deregistered) + state = RelayState::Idle; } }, } @@ -477,11 +452,11 @@ where // this switch is responsible for processing `RelayingParaHeader` state match state { RelayState::Idle => (), - RelayState::RelayingRelayHeader(_, _) => unreachable!("processed by previous match; qed"), - RelayState::RelayingParaHeader(para_header_number) => { - if data.para_header_at_target < para_header_number { + RelayState::RelayingRelayHeader(_) => unreachable!("processed by previous match; qed"), + RelayState::RelayingParaHeader(para_header_id) => { + if data.para_header_at_target < para_header_id.0 { // parachain header hasn't yet been relayed - return RelayState::RelayingParaHeader(para_header_number) + return RelayState::RelayingParaHeader(para_header_id) } }, } @@ -491,8 +466,14 @@ where return RelayState::Idle } + // if we haven't read para head from the source, we can't yet do anyhting + let para_header_at_source = match data.para_header_at_source { + Some(ref para_header_at_source) => para_header_at_source.clone(), + None => return RelayState::Idle, + }; + // if required header is not available even at the source chain, let's wait - if Some(data.required_para_header) > data.para_header_at_source { + if data.required_para_header > para_header_at_source.0 { return RelayState::Idle } @@ -501,14 +482,11 @@ where // we need relay chain header first if data.relay_header_at_target < data.relay_header_at_source { - return RelayState::RelayingRelayHeader( - data.relay_header_at_source, - data.required_para_header, - ) + return RelayState::RelayingRelayHeader(data.relay_header_at_source) } // if all relay headers synced, we may start directly with parachain header - RelayState::RelayingParaHeader(data.required_para_header) + RelayState::RelayingParaHeader(para_header_at_source) } #[cfg(test)] @@ -519,17 +497,17 @@ mod tests { fn relay_waits_for_relay_header_to_be_delivered() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 100, + &RelayData { + required_para_header: 90, para_header_at_target: 50, - para_header_at_source: Some(110), + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + para_header_at_relay_header_at_target: Some(HeaderId(100, 100)), }, - RelayState::RelayingRelayHeader(750, 100), + RelayState::RelayingRelayHeader(750), ), - RelayState::RelayingRelayHeader(750, 100), + RelayState::RelayingRelayHeader(750), ); } @@ -537,53 +515,17 @@ mod tests { fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 100, + &RelayData { + required_para_header: 90, para_header_at_target: 50, - para_header_at_source: Some(110), + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, relay_header_at_target: 750, - headers_map_cache: &mut BTreeMap::new(), - }, - RelayState::RelayingRelayHeader(750, 100), - ), - RelayState::RelayingParaHeader(100), - ); - } - - #[test] - fn relay_selects_same_para_header_after_better_relay_header_is_delivered_1() { - assert_eq!( - select_headers_to_relay( - &mut RelayData { - required_para_header: 100, - para_header_at_target: 50, - para_header_at_source: Some(110), - relay_header_at_source: 800, - relay_header_at_target: 780, - headers_map_cache: &mut vec![(700, 90), (750, 100)].into_iter().collect(), - }, - RelayState::RelayingRelayHeader(750, 100), - ), - RelayState::RelayingParaHeader(100), - ); - } - - #[test] - fn relay_selects_same_para_header_after_better_relay_header_is_delivered_2() { - assert_eq!( - select_headers_to_relay( - &mut RelayData { - required_para_header: 100, - para_header_at_target: 50, - para_header_at_source: Some(110), - relay_header_at_source: 800, - relay_header_at_target: 780, - headers_map_cache: &mut BTreeMap::new(), + para_header_at_relay_header_at_target: Some(HeaderId(100, 100)), }, - RelayState::RelayingRelayHeader(750, 100), + RelayState::RelayingRelayHeader(750), ), - RelayState::RelayingParaHeader(100), + RelayState::RelayingParaHeader(HeaderId(100, 100)), ); } @@ -591,37 +533,34 @@ mod tests { fn relay_selects_better_para_header_after_better_relay_header_is_delivered() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 100, + &RelayData { + required_para_header: 90, para_header_at_target: 50, - para_header_at_source: Some(120), + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, relay_header_at_target: 780, - headers_map_cache: &mut vec![(700, 90), (750, 100), (780, 110), (790, 120)] - .into_iter() - .collect(), + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, - RelayState::RelayingRelayHeader(750, 100), + RelayState::RelayingRelayHeader(750), ), - RelayState::RelayingParaHeader(110), + RelayState::RelayingParaHeader(HeaderId(105, 105)), ); } - #[test] fn relay_waits_for_para_header_to_be_delivered() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 100, + &RelayData { + required_para_header: 90, para_header_at_target: 50, - para_header_at_source: Some(110), + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + relay_header_at_target: 780, + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, - RelayState::RelayingParaHeader(100), + RelayState::RelayingParaHeader(HeaderId(105, 105)), ), - RelayState::RelayingParaHeader(100), + RelayState::RelayingParaHeader(HeaderId(105, 105)), ); } @@ -629,13 +568,13 @@ mod tests { fn relay_stays_idle_if_required_para_header_is_already_delivered() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 100, - para_header_at_target: 100, - para_header_at_source: Some(110), + &RelayData { + required_para_header: 90, + para_header_at_target: 105, + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + relay_header_at_target: 780, + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, RelayState::Idle, ), @@ -647,13 +586,13 @@ mod tests { fn relay_waits_for_required_para_header_to_appear_at_source_1() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 110, - para_header_at_target: 100, + &RelayData { + required_para_header: 120, + para_header_at_target: 105, para_header_at_source: None, relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + relay_header_at_target: 780, + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, RelayState::Idle, ), @@ -665,13 +604,13 @@ mod tests { fn relay_waits_for_required_para_header_to_appear_at_source_2() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 110, - para_header_at_target: 100, - para_header_at_source: Some(100), + &RelayData { + required_para_header: 120, + para_header_at_target: 105, + para_header_at_source: Some(HeaderId(110, 110)), relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + relay_header_at_target: 780, + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, RelayState::Idle, ), @@ -683,17 +622,17 @@ mod tests { fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 110, - para_header_at_target: 100, - para_header_at_source: Some(110), + &RelayData { + required_para_header: 120, + para_header_at_target: 105, + para_header_at_source: Some(HeaderId(125, 125)), relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut BTreeMap::new(), + relay_header_at_target: 780, + para_header_at_relay_header_at_target: Some(HeaderId(105, 105)), }, RelayState::Idle, ), - RelayState::RelayingRelayHeader(800, 110), + RelayState::RelayingRelayHeader(800), ); } @@ -701,97 +640,35 @@ mod tests { fn relay_starts_relaying_para_header_when_new_para_header_is_requested() { assert_eq!( select_headers_to_relay( - &mut RelayData { - required_para_header: 110, - para_header_at_target: 100, - para_header_at_source: Some(110), + &RelayData { + required_para_header: 120, + para_header_at_target: 105, + para_header_at_source: Some(HeaderId(125, 125)), relay_header_at_source: 800, relay_header_at_target: 800, - headers_map_cache: &mut BTreeMap::new(), + para_header_at_relay_header_at_target: Some(HeaderId(125, 125)), }, RelayState::Idle, ), - RelayState::RelayingParaHeader(110), + RelayState::RelayingParaHeader(HeaderId(125, 125)), ); } #[test] - fn headers_map_cache_is_updated() { - let mut headers_map_cache = BTreeMap::new(); - - // when parachain header is known, map is updated - select_headers_to_relay( - &mut RelayData { - required_para_header: 0, - para_header_at_target: 50, - para_header_at_source: Some(110), - relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut headers_map_cache, - }, - RelayState::RelayingRelayHeader(750, 100), - ); - assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); - - // when parachain header is not known, map is NOT updated - select_headers_to_relay( - &mut RelayData { - required_para_header: 0, - para_header_at_target: 50, - para_header_at_source: None, - relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut headers_map_cache, - }, - RelayState::RelayingRelayHeader(750, 100), - ); - assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); - - // map auto-deduplicates equal entries - select_headers_to_relay( - &mut RelayData { - required_para_header: 0, - para_header_at_target: 50, - para_header_at_source: Some(110), - relay_header_at_source: 800, - relay_header_at_target: 700, - headers_map_cache: &mut headers_map_cache, - }, - RelayState::RelayingRelayHeader(750, 100), - ); - assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); - - // nothing is pruned if number of map entries is < MAX_HEADERS_MAP_CACHE_ENTRIES - for i in 1..MAX_HEADERS_MAP_CACHE_ENTRIES { - select_headers_to_relay( - &mut RelayData { - required_para_header: 0, - para_header_at_target: 50, - para_header_at_source: Some(110 + i), - relay_header_at_source: 800 + i, - relay_header_at_target: 700, - headers_map_cache: &mut headers_map_cache, + fn relay_goes_idle_when_parachain_is_deregistered() { + assert_eq!( + select_headers_to_relay::( + &RelayData { + required_para_header: 120, + para_header_at_target: 105, + para_header_at_source: None, + relay_header_at_source: 800, + relay_header_at_target: 800, + para_header_at_relay_header_at_target: None, }, - RelayState::RelayingRelayHeader(750, 100), - ); - assert_eq!(headers_map_cache.len(), i + 1); - } - - // when we add next entry, the oldest one is pruned - assert!(headers_map_cache.contains_key(&800)); - assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES); - select_headers_to_relay( - &mut RelayData { - required_para_header: 0, - para_header_at_target: 50, - para_header_at_source: Some(110 + MAX_HEADERS_MAP_CACHE_ENTRIES), - relay_header_at_source: 800 + MAX_HEADERS_MAP_CACHE_ENTRIES, - relay_header_at_target: 700, - headers_map_cache: &mut headers_map_cache, - }, - RelayState::RelayingRelayHeader(750, 100), + RelayState::RelayingRelayHeader(800), + ), + RelayState::Idle, ); - assert!(!headers_map_cache.contains_key(&800)); - assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES); } } diff --git a/bridges/relays/lib-substrate-relay/src/parachains/source.rs b/bridges/relays/lib-substrate-relay/src/parachains/source.rs index 3ae735ab8930..ea30143e4b64 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/source.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/source.rs @@ -16,39 +16,38 @@ //! Parachain heads source. -use crate::{ - finality::source::RequiredHeaderNumberRef, - parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline}, -}; +use crate::parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline}; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bp_parachains::parachain_head_storage_key_at_source; use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; use codec::Decode; -use parachains_relay::parachains_loop::SourceClient; +use parachains_relay::parachains_loop::{ParaHashAtSource, SourceClient}; use relay_substrate_client::{ Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, }; use relay_utils::relay_loop::Client as RelayClient; use sp_runtime::traits::Header as HeaderT; +/// Shared updatable reference to the maximal parachain header id that we want to sync from the +/// source. +pub type RequiredHeaderIdRef = Arc>>>; + /// Substrate client as parachain heads source. #[derive(Clone)] pub struct ParachainsSource { client: Client, - maximal_header_number: Option>, - previous_parachain_head: Arc>>, + maximal_header_id: Option>, } impl ParachainsSource

{ /// Creates new parachains source client. pub fn new( client: Client, - maximal_header_number: Option>, + maximal_header_id: Option>, ) -> Self { - let previous_parachain_head = Arc::new(Mutex::new(None)); - ParachainsSource { client, maximal_header_number, previous_parachain_head } + ParachainsSource { client, maximal_header_id } } /// Returns reference to the underlying RPC client. @@ -102,7 +101,7 @@ where &self, at_block: HeaderIdOf, para_id: ParaId, - ) -> Result, Self::Error> { + ) -> Result { // we don't need to support many parachains now if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID { return Err(SubstrateError::Custom(format!( @@ -112,29 +111,33 @@ where ))) } - let parachain_head = match self.on_chain_parachain_header(at_block, para_id).await? { + Ok(match self.on_chain_parachain_header(at_block, para_id).await? { Some(parachain_header) => { - let mut parachain_head = Some(parachain_header.hash()); + let mut parachain_head = ParaHashAtSource::Some(parachain_header.hash()); // never return head that is larger than requested. This way we'll never sync - // headers past `maximal_header_number` - if let Some(ref maximal_header_number) = self.maximal_header_number { - let maximal_header_number = *maximal_header_number.lock().await; - if *parachain_header.number() > maximal_header_number { - let previous_parachain_head = *self.previous_parachain_head.lock().await; - if let Some(previous_parachain_head) = previous_parachain_head { - parachain_head = Some(previous_parachain_head); - } + // headers past `maximal_header_id` + if let Some(ref maximal_header_id) = self.maximal_header_id { + let maximal_header_id = *maximal_header_id.lock().await; + match maximal_header_id { + Some(maximal_header_id) + if *parachain_header.number() > maximal_header_id.0 => + { + // we don't want this header yet => let's report previously requested + // header + parachain_head = ParaHashAtSource::Some(maximal_header_id.1); + }, + Some(_) => (), + None => { + // on-demand relay has not yet asked us to sync anything let's do that + parachain_head = ParaHashAtSource::Unavailable; + }, } } parachain_head }, - None => None, - }; - - *self.previous_parachain_head.lock().await = parachain_head; - - Ok(parachain_head) + None => ParaHashAtSource::None, + }) } async fn prove_parachain_heads( diff --git a/bridges/relays/parachains/src/parachains_loop.rs b/bridges/relays/parachains/src/parachains_loop.rs index 827a5d4430bd..fd173f2d25bc 100644 --- a/bridges/relays/parachains/src/parachains_loop.rs +++ b/bridges/relays/parachains/src/parachains_loop.rs @@ -52,6 +52,23 @@ pub enum ParachainSyncStrategy { All, } +/// Parachain head hash, available at the source (relay) chain. +#[derive(Clone, Copy, Debug)] +pub enum ParaHashAtSource { + /// There's no parachain head at the source chain. + /// + /// Normally it means that the parachain is not registered there. + None, + /// Parachain head with given hash is available at the source chain. + Some(ParaHash), + /// The source client refuses to report parachain head hash at this moment. + /// + /// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used. + /// This variant must be treated as "we don't want to update parachain head value at the + /// target chain at this moment". + Unavailable, +} + /// Source client used in parachain heads synchronization loop. #[async_trait] pub trait SourceClient: RelayClient { @@ -63,7 +80,7 @@ pub trait SourceClient: RelayClient { &self, at_block: HeaderIdOf, para_id: ParaId, - ) -> Result, Self::Error>; + ) -> Result; /// Get parachain heads proof. async fn prove_parachain_heads( @@ -291,7 +308,7 @@ where /// Given heads at source and target clients, returns set of heads that are out of sync. fn select_parachains_to_update( - heads_at_source: BTreeMap>, + heads_at_source: BTreeMap, heads_at_target: BTreeMap>, best_finalized_relay_block: HeaderIdOf, ) -> Vec @@ -317,7 +334,12 @@ where .zip(heads_at_target.into_iter()) .filter(|((para, head_at_source), (_, head_at_target))| { let needs_update = match (head_at_source, head_at_target) { - (Some(head_at_source), Some(head_at_target)) + (ParaHashAtSource::Unavailable, _) => { + // source client has politely asked us not to update current parachain head + // at the target chain + false + }, + (ParaHashAtSource::Some(head_at_source), Some(head_at_target)) if head_at_target.at_relay_block_number < best_finalized_relay_block.0 && head_at_target.head_hash != *head_at_source => { @@ -325,22 +347,22 @@ where // client true }, - (Some(_), Some(_)) => { + (ParaHashAtSource::Some(_), Some(_)) => { // this is normal case when relay has recently updated heads, when parachain is // not progressing or when our source client is false }, - (Some(_), None) => { + (ParaHashAtSource::Some(_), None) => { // parachain is not yet known to the target client. This is true when parachain // or bridge has been just onboarded/started true }, - (None, Some(_)) => { + (ParaHashAtSource::None, Some(_)) => { // parachain/parathread has been offboarded removed from the system. It needs to // be propageted to the target client true }, - (None, None) => { + (ParaHashAtSource::None, None) => { // all's good - parachain is unknown to both clients false }, @@ -378,7 +400,7 @@ async fn read_heads_at_source( source_client: &impl SourceClient

, at_relay_block: &HeaderIdOf, parachains: &[ParaId], -) -> Result>, FailedClient> { +) -> Result, FailedClient> { let mut para_head_hashes = BTreeMap::new(); for para in parachains { let para_head = source_client.parachain_head(*at_relay_block, *para).await; @@ -554,7 +576,7 @@ mod tests { #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, - source_heads: BTreeMap>, + source_heads: BTreeMap>, source_proofs: BTreeMap, TestError>>, target_best_block: Result, TestError>, @@ -569,7 +591,9 @@ mod tests { pub fn minimal() -> Self { TestClientData { source_sync_status: Ok(true), - source_heads: vec![(PARA_ID, Ok(PARA_0_HASH))].into_iter().collect(), + source_heads: vec![(PARA_ID, Ok(ParaHashAtSource::Some(PARA_0_HASH)))] + .into_iter() + .collect(), source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(), target_best_block: Ok(HeaderId(0, Default::default())), @@ -615,8 +639,11 @@ mod tests { &self, _at_block: HeaderIdOf, para_id: ParaId, - ) -> Result, TestError> { - self.data.lock().await.source_heads.get(¶_id.0).cloned().transpose() + ) -> Result { + match self.data.lock().await.source_heads.get(¶_id.0).cloned() { + Some(result) => result, + None => Ok(ParaHashAtSource::None), + } } async fn prove_parachain_heads( @@ -923,7 +950,7 @@ mod tests { fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), None)].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -935,7 +962,9 @@ mod tests { fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH }) @@ -952,7 +981,9 @@ mod tests { fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -969,7 +1000,7 @@ mod tests { fn parachain_is_updated_after_offboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), None)].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { @@ -989,7 +1020,9 @@ mod tests { fn parachain_is_updated_after_onboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] + .into_iter() + .collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -1001,7 +1034,9 @@ mod tests { fn parachain_is_updated_if_newer_head_is_known() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), Some(PARA_1_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_1_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -1014,6 +1049,23 @@ mod tests { ); } + #[test] + fn parachain_is_not_updated_if_source_head_is_unavailable() { + assert_eq!( + select_parachains_to_update::( + vec![(ParaId(PARA_ID), ParaHashAtSource::Unavailable)].into_iter().collect(), + vec![( + ParaId(PARA_ID), + Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) + )] + .into_iter() + .collect(), + HeaderId(10, Default::default()), + ), + vec![], + ); + } + #[test] fn is_update_required_works() { let mut sync_params = ParachainSyncParams {