From b757b99af5459948730d67b038c8adfdf1257919 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Mon, 23 Oct 2023 18:34:34 +0800 Subject: [PATCH 01/15] enable parallel scraping --- cumulus/pallets/xcmp-queue/src/bridging.rs | 4 +- cumulus/pallets/xcmp-queue/src/tests.rs | 22 ++++-- .../procedural/src/pallet/expand/warnings.rs | 8 +- .../frame/remote-externalities/src/lib.rs | 79 +++++++++++++++++++ 4 files changed, 98 insertions(+), 15 deletions(-) diff --git a/cumulus/pallets/xcmp-queue/src/bridging.rs b/cumulus/pallets/xcmp-queue/src/bridging.rs index 0fc3f1f39ea3..53238fe2bf7a 100644 --- a/cumulus/pallets/xcmp-queue/src/bridging.rs +++ b/cumulus/pallets/xcmp-queue/src/bridging.rs @@ -55,7 +55,9 @@ impl, Runtime: crate::Config> let sibling_bridge_hub_id: ParaId = SiblingBridgeHubParaId::get(); // let's find the channel's state with the sibling parachain, - let Some((outbound_state, queued_pages)) = pallet::Pallet::::outbound_channel_state(sibling_bridge_hub_id) else { + let Some((outbound_state, queued_pages)) = + pallet::Pallet::::outbound_channel_state(sibling_bridge_hub_id) + else { return false }; // suspended channel => it is congested diff --git a/cumulus/pallets/xcmp-queue/src/tests.rs b/cumulus/pallets/xcmp-queue/src/tests.rs index cf6d947609d2..bab7e92ca2de 100644 --- a/cumulus/pallets/xcmp-queue/src/tests.rs +++ b/cumulus/pallets/xcmp-queue/src/tests.rs @@ -410,9 +410,11 @@ fn verify_fee_factor_increase_and_decrease() { assert_eq!(DeliveryFeeFactor::::get(sibling_para_id), initial); // Sending the message right now is cheap - let (_, delivery_fees) = validate_send::(destination, xcm.clone()) - .expect("message can be sent; qed"); - let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); }; + let (_, delivery_fees) = + validate_send::(destination, xcm.clone()).expect("message can be sent; qed"); + let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { + unreachable!("asset is fungible; qed"); + }; assert_eq!(delivery_fee_amount, 402_000_000); let smaller_xcm = Xcm(vec![ClearOrigin; 30]); @@ -422,19 +424,23 @@ fn verify_fee_factor_increase_and_decrease() { assert_ok!(send_xcm::(destination, xcm.clone())); // Size 520 assert_eq!(DeliveryFeeFactor::::get(sibling_para_id), FixedU128::from_float(1.05)); - for _ in 0..12 { // We finish at size 929 + for _ in 0..12 { + // We finish at size 929 assert_ok!(send_xcm::(destination, smaller_xcm.clone())); } assert!(DeliveryFeeFactor::::get(sibling_para_id) > FixedU128::from_float(1.88)); // Sending the message right now is expensive - let (_, delivery_fees) = validate_send::(destination, xcm.clone()) - .expect("message can be sent; qed"); - let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); }; + let (_, delivery_fees) = + validate_send::(destination, xcm.clone()).expect("message can be sent; qed"); + let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { + unreachable!("asset is fungible; qed"); + }; assert_eq!(delivery_fee_amount, 758_030_955); // Fee factor only decreases in `take_outbound_messages` - for _ in 0..5 { // We take 5 100 byte pages + for _ in 0..5 { + // We take 5 100 byte pages XcmpQueue::take_outbound_messages(1); } assert!(DeliveryFeeFactor::::get(sibling_para_id) < FixedU128::from_float(1.72)); diff --git a/substrate/frame/support/procedural/src/pallet/expand/warnings.rs b/substrate/frame/support/procedural/src/pallet/expand/warnings.rs index 6ce2097c2684..030e3ddaf323 100644 --- a/substrate/frame/support/procedural/src/pallet/expand/warnings.rs +++ b/substrate/frame/support/procedural/src/pallet/expand/warnings.rs @@ -33,9 +33,7 @@ pub(crate) fn weight_witness_warning( if dev_mode { return } - let CallWeightDef::Immediate(w) = &method.weight else { - return - }; + let CallWeightDef::Immediate(w) = &method.weight else { return }; let partial_warning = Warning::new_deprecated("UncheckedWeightWitness") .old("not check weight witness data") @@ -66,9 +64,7 @@ pub(crate) fn weight_constant_warning( if dev_mode { return } - let syn::Expr::Lit(lit) = weight else { - return - }; + let syn::Expr::Lit(lit) = weight else { return }; let warning = Warning::new_deprecated("ConstantWeight") .index(warnings.len()) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 71e9320ebeeb..817a4b09f8d0 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -439,6 +439,85 @@ where Ok(keys) } + /// Get keys at `prefix` in `block` in parallel manner. + async fn rpc_get_keys_parallel( + &self, + prefix: StorageKey, + block: B::Hash, + parallel: u16, + ) -> Result, &'static str> { + const MAX_PARALLEL: u16 = 16 ** 3; + const POW_OF_SIXTEEN: [u16; 3] = [1, 16, 256]; + + // round to power of 16, up to MAX_PARALLEL + fn round(n: u16) -> (u16, usize) { + if n <= 1 { + return (1, 0) + } else if n <= 16 { + return (16, 1) + } + + let mut pow: u16 = 16; + let mut exp: usize = 1; + + while pow < n { + if pow == MAX_PARALLEL { + break; + } + + pow = pow.saturating_mul(16); + exp += 1; + } + + debug_assert!(pow <= MAX_PARALLEL); + + // FIXME eagr: lack of a better idea for threshold + if n * 4 <= pow { + (pow / 16, exp - 1) + } else { + (pow, exp) + } + } + + fn extension(n: u16, len: usize) -> Vec { + let mut ext = vec![0; len]; + for i in 0..len { + ext[i] = (n / POW_OF_SIXTEEN[i] % 16) as u8; + } + ext + } + + let (parallel, len) = round(parallel); + + let batch = (0..parallel).into_iter().map(|i| { + let mut prefix = prefix.as_ref().to_vec(); + prefix.extend(extension(i, len)); + let prefix = StorageKey(prefix); + + self.rpc_get_keys_paged(prefix, block) + }); + + let keys = futures::future::join_all(batch) + .await + .into_iter() + .filter_map(|res| match res { + Ok(keys) => Some(keys), + Err(err) => { + log::warn!( + target: LOG_TARGET, + "{} when fetching keys at block {:?}", + err, + block, + ); + None + }, + }) + .flatten() + .collect::>(); + + Ok(keys) + } + /// Fetches storage data from a node using a dynamic batch size. /// /// This function adjusts the batch size on the fly to help prevent overwhelming the node with From d39f72eb369c76858a1be609fecabcb99b8e9046 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Mon, 23 Oct 2023 18:44:48 +0800 Subject: [PATCH 02/15] glitch --- substrate/utils/frame/remote-externalities/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 817a4b09f8d0..6786ca55fc15 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -446,7 +446,7 @@ where block: B::Hash, parallel: u16, ) -> Result, &'static str> { - const MAX_PARALLEL: u16 = 16 ** 3; + const MAX_PARALLEL: u16 = 4096; const POW_OF_SIXTEEN: [u16; 3] = [1, 16, 256]; // round to power of 16, up to MAX_PARALLEL @@ -462,7 +462,7 @@ where while pow < n { if pow == MAX_PARALLEL { - break; + break } pow = pow.saturating_mul(16); From 16e0e0d1e0e1bbe9f0fe46d4be47092a4d748123 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Wed, 25 Oct 2023 20:26:27 +0800 Subject: [PATCH 03/15] split fn --- .../frame/remote-externalities/src/lib.rs | 140 +++++++++++------- 1 file changed, 90 insertions(+), 50 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 6786ca55fc15..b26b51dbba13 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -446,56 +446,8 @@ where block: B::Hash, parallel: u16, ) -> Result, &'static str> { - const MAX_PARALLEL: u16 = 4096; - const POW_OF_SIXTEEN: [u16; 3] = [1, 16, 256]; - - // round to power of 16, up to MAX_PARALLEL - fn round(n: u16) -> (u16, usize) { - if n <= 1 { - return (1, 0) - } else if n <= 16 { - return (16, 1) - } - - let mut pow: u16 = 16; - let mut exp: usize = 1; - - while pow < n { - if pow == MAX_PARALLEL { - break - } - - pow = pow.saturating_mul(16); - exp += 1; - } - - debug_assert!(pow <= MAX_PARALLEL); - - // FIXME eagr: lack of a better idea for threshold - if n * 4 <= pow { - (pow / 16, exp - 1) - } else { - (pow, exp) - } - } - - fn extension(n: u16, len: usize) -> Vec { - let mut ext = vec![0; len]; - for i in 0..len { - ext[i] = (n / POW_OF_SIXTEEN[i] % 16) as u8; - } - ext - } - - let (parallel, len) = round(parallel); - - let batch = (0..parallel).into_iter().map(|i| { - let mut prefix = prefix.as_ref().to_vec(); - prefix.extend(extension(i, len)); - let prefix = StorageKey(prefix); - - self.rpc_get_keys_paged(prefix, block) - }); + let prefixes = extend_prefix(&prefix, parallel); + let batch = prefixes.into_iter().map(|prefix| self.rpc_get_keys_paged(prefix, block)); let keys = futures::future::join_all(batch) .await @@ -838,6 +790,64 @@ where } } +// Create a batch of storage key prefixes each starting with `prefix`, meant to be used for key +// scraping. Given the prefix 00, the return can be 000-00F or 0000-00FF, depending on `size`. +// `size` will be rounded to power of 16 if not already, so is the returned batch size. +fn extend_prefix(prefix: &StorageKey, size: u16) -> Vec { + const MAX_EXT_LEN: usize = 3; + const MAX_BATCH_SIZE: u16 = 16u16.pow(MAX_EXT_LEN as u32); + const POW_OF_SIXTEEN: [u16; MAX_EXT_LEN] = [1, 16, 256]; + + // round to power of 16 + // up to MAX_BATCH_SIZE + fn round(n: u16) -> (u16, usize) { + if n <= 1 { + return (1, 0) + } else if n <= 16 { + return (16, 1) + } + + let mut pow: u16 = 16; + let mut exp: usize = 1; + + while pow < n { + if pow == MAX_BATCH_SIZE { + break + } + + pow = pow.saturating_mul(16); + exp += 1; + } + + debug_assert!(pow <= MAX_BATCH_SIZE); + debug_assert!(exp <= MAX_EXT_LEN); + + // round down if below threshold + if n * 4 <= pow { + (pow / 16, exp - 1) + } else { + (pow, exp) + } + } + + let (size, len) = round(size); + let mut ext = vec![0; len]; + + (0..size) + .map(|idx| { + // 0-f | 00-ff | 000-fff + // relatively static, use OnceCell if turned out to be hot + for i in 0..len { + ext[len - i - 1] = (idx / POW_OF_SIXTEEN[i] % 16) as u8; + } + + let mut prefix = prefix.as_ref().to_vec(); + prefix.extend(&ext); + StorageKey(prefix) + }) + .collect() +} + impl Builder where B::Hash: DeserializeOwned, @@ -1519,4 +1529,34 @@ mod remote_tests { .unwrap() .execute_with(|| {}); } + + #[test] + fn prefixes_for_scraping_keys() { + let prefix = StorageKey(vec![0, 0]); + + assert_eq!(extend_prefix(&prefix, 0), vec![StorageKey(vec![0, 0])]); + assert_eq!(extend_prefix(&prefix, 1), vec![StorageKey(vec![0, 0])]); + assert_eq!(extend_prefix(&prefix, 16), (0..16).map(|i| StorageKey(vec![0, 0, i])).collect::>()); + + let prefixes = extend_prefix(&prefix, 256); + assert_eq!(prefixes, (0..256u32).map(|i| StorageKey(vec![0, 0, (i / 16 % 16) as u8, (i % 16) as u8])).collect::>()); + assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0])); + assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 1])); + assert_eq!(prefixes[15], StorageKey(vec![0, 0, 0, 15])); + assert_eq!(prefixes[16], StorageKey(vec![0, 0, 1, 0])); + assert_eq!(prefixes[254], StorageKey(vec![0, 0, 15, 14])); + assert_eq!(prefixes[255], StorageKey(vec![0, 0, 15, 15])); + + let prefixes = extend_prefix(&prefix, 4096); + assert_eq!(prefixes, (0..4096u32).map(|i| StorageKey(vec![0, 0, (i / 256 % 16) as u8, (i / 16 % 16) as u8, (i % 16) as u8])).collect::>()); + assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0, 0])); + assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 0, 1])); + assert_eq!(prefixes[4094], StorageKey(vec![0, 0, 15, 15, 14])); + assert_eq!(prefixes[4095], StorageKey(vec![0, 0, 15, 15, 15])); + + // rounding + assert_eq!(extend_prefix(&prefix, 2), extend_prefix(&prefix, 16)); + assert_eq!(extend_prefix(&prefix, 65), extend_prefix(&prefix, 256)); + assert_eq!(extend_prefix(&prefix, 1025), extend_prefix(&prefix, 4096)); + } } From 48701a33e3eadefa8c7bf1f5861a133618fcc013 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Wed, 25 Oct 2023 23:12:54 +0800 Subject: [PATCH 04/15] get parallel --- substrate/utils/frame/remote-externalities/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index b26b51dbba13..983f7e3fdc67 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -621,16 +621,17 @@ where /// map them to values one by one. /// /// This can work with public nodes. But, expect it to be darn slow. - pub(crate) async fn rpc_get_pairs_paged( + pub(crate) async fn rpc_get_pairs( &self, prefix: StorageKey, at: B::Hash, pending_ext: &mut TestExternalities>, + parallel: u16, ) -> Result, &'static str> { let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); let keys = self - .rpc_get_keys_paged(prefix.clone(), at) + .rpc_get_keys_parallel(prefix.clone(), at, parallel) .await? .into_iter() .collect::>(); @@ -935,7 +936,7 @@ where for prefix in &config.hashed_prefixes { let now = std::time::Instant::now(); let additional_key_values = - self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?; + self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 16).await?; let elapsed = now.elapsed(); log::info!( target: LOG_TARGET, From ea33098615a6bc1e7d8a643acd0e86f29118695f Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Fri, 27 Oct 2023 19:23:35 +0800 Subject: [PATCH 05/15] rewrite rpc_get_keys_parallel() --- .../frame/remote-externalities/src/lib.rs | 178 ++++++++---------- 1 file changed, 79 insertions(+), 99 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 983f7e3fdc67..17c839f0061e 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -439,34 +439,37 @@ where Ok(keys) } - /// Get keys at `prefix` in `block` in parallel manner. + /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` is saturated at 256. + /// If `parallel` is set to 1, it is the same as calling [`rpc_get_keys_paged()`]. async fn rpc_get_keys_parallel( &self, prefix: StorageKey, block: B::Hash, parallel: u16, ) -> Result, &'static str> { - let prefixes = extend_prefix(&prefix, parallel); - let batch = prefixes.into_iter().map(|prefix| self.rpc_get_keys_paged(prefix, block)); - - let keys = futures::future::join_all(batch) - .await - .into_iter() - .filter_map(|res| match res { - Ok(keys) => Some(keys), - Err(err) => { - log::warn!( - target: LOG_TARGET, - "{} when fetching keys at block {:?}", - err, - block, - ); - None - }, - }) - .flatten() - .collect::>(); + let mut keys: Vec = Vec::with_capacity(4096); + for chunk in prefix_chunks(&prefix, parallel) { + let batch = chunk.into_iter().map(|prefix| self.rpc_get_keys_paged(prefix, block)); + let partial = futures::future::join_all(batch) + .await + .into_iter() + .filter_map(|res| match res { + Ok(keys) => Some(keys), + Err(err) => { + log::warn!( + target: LOG_TARGET, + "{} when fetching keys at block {:?}", + err, + block, + ); + None + }, + }) + .flatten() + .collect::>(); + keys.extend(partial) + } Ok(keys) } @@ -791,62 +794,42 @@ where } } -// Create a batch of storage key prefixes each starting with `prefix`, meant to be used for key -// scraping. Given the prefix 00, the return can be 000-00F or 0000-00FF, depending on `size`. -// `size` will be rounded to power of 16 if not already, so is the returned batch size. -fn extend_prefix(prefix: &StorageKey, size: u16) -> Vec { - const MAX_EXT_LEN: usize = 3; - const MAX_BATCH_SIZE: u16 = 16u16.pow(MAX_EXT_LEN as u32); - const POW_OF_SIXTEEN: [u16; MAX_EXT_LEN] = [1, 16, 256]; - - // round to power of 16 - // up to MAX_BATCH_SIZE - fn round(n: u16) -> (u16, usize) { - if n <= 1 { - return (1, 0) - } else if n <= 16 { - return (16, 1) - } - - let mut pow: u16 = 16; - let mut exp: usize = 1; - - while pow < n { - if pow == MAX_BATCH_SIZE { - break +/// Meant to be used for fetching storage keys, by dividing the workload into chunks to be executed +/// in parallel. +fn prefix_chunks(prefix: &StorageKey, parallel: u16) -> Vec> { + const MAX_PARALLEL: usize = 256; + const FACTORS: [usize; 7] = [2, 4, 8, 16, 32, 64, 128]; + + // round up to a factor of MAX_PARALLEL + fn round(n: usize) -> usize { + for factor in FACTORS { + if factor >= n { + return factor } - - pow = pow.saturating_mul(16); - exp += 1; - } - - debug_assert!(pow <= MAX_BATCH_SIZE); - debug_assert!(exp <= MAX_EXT_LEN); - - // round down if below threshold - if n * 4 <= pow { - (pow / 16, exp - 1) - } else { - (pow, exp) } + MAX_PARALLEL } - let (size, len) = round(size); - let mut ext = vec![0; len]; - - (0..size) - .map(|idx| { - // 0-f | 00-ff | 000-fff - // relatively static, use OnceCell if turned out to be hot - for i in 0..len { - ext[len - i - 1] = (idx / POW_OF_SIXTEEN[i] % 16) as u8; - } + if parallel == 1 { + return vec![vec![StorageKey(prefix.as_ref().to_vec())]] + } - let mut prefix = prefix.as_ref().to_vec(); - prefix.extend(&ext); - StorageKey(prefix) + let window = round(parallel as usize); + debug_assert!(MAX_PARALLEL / window * window == MAX_PARALLEL); + + let ext: [u8; MAX_PARALLEL] = core::array::from_fn(|i| i as u8); + ext.chunks(window) + .map(|chunk| { + chunk + .into_iter() + .map(|i| { + let mut prefix = prefix.as_ref().to_vec(); + prefix.push(*i); + StorageKey(prefix) + }) + .collect::>() }) - .collect() + .collect::>>() } impl Builder @@ -1531,33 +1514,30 @@ mod remote_tests { .execute_with(|| {}); } - #[test] - fn prefixes_for_scraping_keys() { - let prefix = StorageKey(vec![0, 0]); - - assert_eq!(extend_prefix(&prefix, 0), vec![StorageKey(vec![0, 0])]); - assert_eq!(extend_prefix(&prefix, 1), vec![StorageKey(vec![0, 0])]); - assert_eq!(extend_prefix(&prefix, 16), (0..16).map(|i| StorageKey(vec![0, 0, i])).collect::>()); - - let prefixes = extend_prefix(&prefix, 256); - assert_eq!(prefixes, (0..256u32).map(|i| StorageKey(vec![0, 0, (i / 16 % 16) as u8, (i % 16) as u8])).collect::>()); - assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0])); - assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 1])); - assert_eq!(prefixes[15], StorageKey(vec![0, 0, 0, 15])); - assert_eq!(prefixes[16], StorageKey(vec![0, 0, 1, 0])); - assert_eq!(prefixes[254], StorageKey(vec![0, 0, 15, 14])); - assert_eq!(prefixes[255], StorageKey(vec![0, 0, 15, 15])); - - let prefixes = extend_prefix(&prefix, 4096); - assert_eq!(prefixes, (0..4096u32).map(|i| StorageKey(vec![0, 0, (i / 256 % 16) as u8, (i / 16 % 16) as u8, (i % 16) as u8])).collect::>()); - assert_eq!(prefixes[0], StorageKey(vec![0, 0, 0, 0, 0])); - assert_eq!(prefixes[1], StorageKey(vec![0, 0, 0, 0, 1])); - assert_eq!(prefixes[4094], StorageKey(vec![0, 0, 15, 15, 14])); - assert_eq!(prefixes[4095], StorageKey(vec![0, 0, 15, 15, 15])); - - // rounding - assert_eq!(extend_prefix(&prefix, 2), extend_prefix(&prefix, 16)); - assert_eq!(extend_prefix(&prefix, 65), extend_prefix(&prefix, 256)); - assert_eq!(extend_prefix(&prefix, 1025), extend_prefix(&prefix, 4096)); + #[tokio::test] + async fn can_fetch_parallel() { + init_logger(); + + let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443"); + let mut builder = Builder::::new() + .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() })); + builder.init_remote_client().await.unwrap(); + + let at = builder.as_online().at.unwrap(); + + let prefix = StorageKey(vec![13]); + let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap_or(vec![]); + let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap_or(vec![]); + let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); + assert_eq!(para_1, paged); + assert_eq!(para_16, paged); + + // scrape all + let prefix = StorageKey(vec![]); + let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap_or(vec![]); + let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap_or(vec![]); + let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); + assert_eq!(para_1, paged); + assert_eq!(para_16, paged); } } From 4be72d83cafcaf3555f4b97fef8ccbea752cabe5 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Wed, 8 Nov 2023 02:18:07 +0800 Subject: [PATCH 06/15] parallel with start key instead of prefix --- .../frame/remote-externalities/src/lib.rs | 168 +++++++++++------- 1 file changed, 106 insertions(+), 62 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 17c839f0061e..d58a0ffda396 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -400,6 +400,7 @@ where }) } + /// FIXME eagr: dead code /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. async fn rpc_get_keys_paged( &self, @@ -447,29 +448,102 @@ where block: B::Hash, parallel: u16, ) -> Result, &'static str> { - let mut keys: Vec = Vec::with_capacity(4096); - for chunk in prefix_chunks(&prefix, parallel) { - let batch = chunk.into_iter().map(|prefix| self.rpc_get_keys_paged(prefix, block)); - let partial = futures::future::join_all(batch) - .await - .into_iter() - .filter_map(|res| match res { - Ok(keys) => Some(keys), - Err(err) => { - log::warn!( - target: LOG_TARGET, - "{} when fetching keys at block {:?}", - err, - block, - ); - None - }, + // guarantee to return a non-empty list + fn calculate_start_keys(prefix: &StorageKey, parallel: u16) -> Vec { + // FIXME eagr: no need to go above 256 right? coz it feels 256 is more than enough. + // I'm kinda cutting corners here. If parallel is restricted to [1, 256], then we don't + // need to involve bigint. + let parallel = parallel.max(1).min(256); + let prefix = prefix.as_ref().to_vec(); + + if prefix.len() >= 32 { + return vec![StorageKey(prefix.clone())] + } + + // assuming 256-bit hash + let step = 256 / parallel; + let ext_len = 31 - prefix.len(); + + (0..parallel) + .map(|i| { + let mut key = prefix.clone(); + key.push((i * step) as u8); + key.extend(vec![0; ext_len]); + StorageKey(key) }) - .flatten() - .collect::>(); + .collect() + } + + let start_keys = calculate_start_keys(&prefix, parallel); + let mut end_keys: Vec> = + start_keys[1..].iter().map(Some).collect(); + end_keys.push(None); + + let batch = start_keys.iter().zip(end_keys).map(|(start_key, end_key)| { + self.rpc_get_keys_in_range(&prefix, block, start_key.clone(), end_key.cloned()) + }); + + let keys = futures::future::join_all(batch) + .await + .into_iter() + .filter_map(|res| match res { + Ok(keys) => Some(keys), + Err(err) => { + log::warn!( + target: LOG_TARGET, + "{} when fetching keys at block {:?}", + err, + block, + ); + None + }, + }) + .flatten() + .collect::>(); + + Ok(keys) + } - keys.extend(partial) + /// Get all keys with `prefix` within the given range at `block`. + /// Use `None` for `end_key` if you want the range `start_key..`. + async fn rpc_get_keys_in_range( + &self, + prefix: &StorageKey, + block: B::Hash, + start_key: StorageKey, + end_key: Option, + ) -> Result, &'static str> { + let mut last_key: Option = Some(start_key); + let mut keys: Vec = vec![]; + + loop { + // This loop can hit the node with very rapid requests, occasionally causing it to + // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry. + let retry_strategy = + FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); + let get_page_closure = + || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), block); + let mut page = Retry::spawn(retry_strategy, get_page_closure).await?; + + last_key = page.last().cloned(); + + // avoid duplicated keys across workloads + if let (Some(last), Some(end)) = (&last_key, &end_key) { + if last >= end { + page.retain(|key| key < end); + } + } + + let page_len = page.len(); + keys.extend(page); + + // scraping out of range or no more matches, + // we are done either way + if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { + break + } } + Ok(keys) } @@ -794,44 +868,6 @@ where } } -/// Meant to be used for fetching storage keys, by dividing the workload into chunks to be executed -/// in parallel. -fn prefix_chunks(prefix: &StorageKey, parallel: u16) -> Vec> { - const MAX_PARALLEL: usize = 256; - const FACTORS: [usize; 7] = [2, 4, 8, 16, 32, 64, 128]; - - // round up to a factor of MAX_PARALLEL - fn round(n: usize) -> usize { - for factor in FACTORS { - if factor >= n { - return factor - } - } - MAX_PARALLEL - } - - if parallel == 1 { - return vec![vec![StorageKey(prefix.as_ref().to_vec())]] - } - - let window = round(parallel as usize); - debug_assert!(MAX_PARALLEL / window * window == MAX_PARALLEL); - - let ext: [u8; MAX_PARALLEL] = core::array::from_fn(|i| i as u8); - ext.chunks(window) - .map(|chunk| { - chunk - .into_iter() - .map(|i| { - let mut prefix = prefix.as_ref().to_vec(); - prefix.push(*i); - StorageKey(prefix) - }) - .collect::>() - }) - .collect::>>() -} - impl Builder where B::Hash: DeserializeOwned, @@ -1526,18 +1562,26 @@ mod remote_tests { let at = builder.as_online().at.unwrap(); let prefix = StorageKey(vec![13]); - let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap_or(vec![]); - let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap_or(vec![]); + let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); + let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); + let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); + let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap(); let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); assert_eq!(para_1, paged); + assert_eq!(para_2, paged); + assert_eq!(para_3, paged); assert_eq!(para_16, paged); // scrape all let prefix = StorageKey(vec![]); - let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap_or(vec![]); - let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap_or(vec![]); + let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); + let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); + let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); + let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap(); let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); assert_eq!(para_1, paged); + assert_eq!(para_2, paged); + assert_eq!(para_3, paged); assert_eq!(para_16, paged); } } From 775b0376127076965ff0137cca67ac695dd8bad4 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Wed, 8 Nov 2023 12:05:14 +0800 Subject: [PATCH 07/15] avoid unnecessary clones --- .../frame/remote-externalities/src/lib.rs | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index d58a0ffda396..4a0aa2f0a41d 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -440,47 +440,45 @@ where Ok(keys) } - /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` is saturated at 256. - /// If `parallel` is set to 1, it is the same as calling [`rpc_get_keys_paged()`]. + /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` saturated at 256. async fn rpc_get_keys_parallel( &self, prefix: StorageKey, block: B::Hash, parallel: u16, ) -> Result, &'static str> { - // guarantee to return a non-empty list - fn calculate_start_keys(prefix: &StorageKey, parallel: u16) -> Vec { + // should guarantee to return a non-empty list + fn gen_start_keys(prefix: &StorageKey, parallel: u16) -> Vec { // FIXME eagr: no need to go above 256 right? coz it feels 256 is more than enough. // I'm kinda cutting corners here. If parallel is restricted to [1, 256], then we don't // need to involve bigint. let parallel = parallel.max(1).min(256); let prefix = prefix.as_ref().to_vec(); + // assume 256-bit hash if prefix.len() >= 32 { return vec![StorageKey(prefix.clone())] } - // assuming 256-bit hash let step = 256 / parallel; - let ext_len = 31 - prefix.len(); + let ext = 31 - prefix.len(); (0..parallel) .map(|i| { let mut key = prefix.clone(); key.push((i * step) as u8); - key.extend(vec![0; ext_len]); + key.extend(vec![0; ext]); StorageKey(key) }) .collect() } - let start_keys = calculate_start_keys(&prefix, parallel); - let mut end_keys: Vec> = - start_keys[1..].iter().map(Some).collect(); + let start_keys = gen_start_keys(&prefix, parallel); + let mut end_keys: Vec> = start_keys[1..].iter().map(Some).collect(); end_keys.push(None); let batch = start_keys.iter().zip(end_keys).map(|(start_key, end_key)| { - self.rpc_get_keys_in_range(&prefix, block, start_key.clone(), end_key.cloned()) + self.rpc_get_keys_in_range(&prefix, block, start_key, end_key) }); let keys = futures::future::join_all(batch) @@ -510,10 +508,10 @@ where &self, prefix: &StorageKey, block: B::Hash, - start_key: StorageKey, - end_key: Option, + start_key: &StorageKey, + end_key: Option<&StorageKey>, ) -> Result, &'static str> { - let mut last_key: Option = Some(start_key); + let mut last_key: Option<&StorageKey> = Some(start_key); let mut keys: Vec = vec![]; loop { @@ -522,13 +520,11 @@ where let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); let get_page_closure = - || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), block); + || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block); let mut page = Retry::spawn(retry_strategy, get_page_closure).await?; - last_key = page.last().cloned(); - // avoid duplicated keys across workloads - if let (Some(last), Some(end)) = (&last_key, &end_key) { + if let (Some(last), Some(end)) = (page.last(), end_key) { if last >= end { page.retain(|key| key < end); } @@ -536,12 +532,21 @@ where let page_len = page.len(); keys.extend(page); + last_key = keys.last(); // scraping out of range or no more matches, // we are done either way if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { + log::debug!(target: LOG_TARGET, "last page received: {}", page_len); break } + + log::debug!( + target: LOG_TARGET, + "new total = {}, full page received: {}", + keys.len(), + HexDisplay::from(last_key.expect("full page received, cannot be None")) + ); } Ok(keys) @@ -955,7 +960,7 @@ where for prefix in &config.hashed_prefixes { let now = std::time::Instant::now(); let additional_key_values = - self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 16).await?; + self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 4).await?; let elapsed = now.elapsed(); log::info!( target: LOG_TARGET, @@ -1562,26 +1567,30 @@ mod remote_tests { let at = builder.as_online().at.unwrap(); let prefix = StorageKey(vec![13]); + let para_0 = builder.rpc_get_keys_parallel(prefix.clone(), at, 0).await.unwrap(); let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); - let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap(); + let para_32 = builder.rpc_get_keys_parallel(prefix.clone(), at, 32).await.unwrap(); let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); + assert_eq!(para_0, paged); assert_eq!(para_1, paged); assert_eq!(para_2, paged); assert_eq!(para_3, paged); - assert_eq!(para_16, paged); + assert_eq!(para_32, paged); // scrape all let prefix = StorageKey(vec![]); + let para_0 = builder.rpc_get_keys_parallel(prefix.clone(), at, 0).await.unwrap(); let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); - let para_16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 16).await.unwrap(); + let para_32 = builder.rpc_get_keys_parallel(prefix.clone(), at, 32).await.unwrap(); let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); + assert_eq!(para_0, paged); assert_eq!(para_1, paged); assert_eq!(para_2, paged); assert_eq!(para_3, paged); - assert_eq!(para_16, paged); + assert_eq!(para_32, paged); } } From 9881284ec52bc239d3e6e0562b15d27efb93ae55 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Thu, 9 Nov 2023 21:34:59 +0800 Subject: [PATCH 08/15] make start key optional --- .../frame/remote-externalities/src/lib.rs | 94 ++++--------------- 1 file changed, 19 insertions(+), 75 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 4a0aa2f0a41d..6c3fb8594f2c 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -400,46 +400,6 @@ where }) } - /// FIXME eagr: dead code - /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. - async fn rpc_get_keys_paged( - &self, - prefix: StorageKey, - at: B::Hash, - ) -> Result, &'static str> { - let mut last_key: Option = None; - let mut all_keys: Vec = vec![]; - let keys = loop { - // This loop can hit the node with very rapid requests, occasionally causing it to - // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry. - let retry_strategy = - FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); - let get_page_closure = - || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at); - let page = Retry::spawn(retry_strategy, get_page_closure).await?; - let page_len = page.len(); - - all_keys.extend(page); - - if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { - log::debug!(target: LOG_TARGET, "last page received: {}", page_len); - break all_keys - } else { - let new_last_key = - all_keys.last().expect("all_keys is populated; has .last(); qed"); - log::debug!( - target: LOG_TARGET, - "new total = {}, full page received: {}", - all_keys.len(), - HexDisplay::from(new_last_key) - ); - last_key = Some(new_last_key.clone()); - }; - }; - - Ok(keys) - } - /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` saturated at 256. async fn rpc_get_keys_parallel( &self, @@ -449,15 +409,12 @@ where ) -> Result, &'static str> { // should guarantee to return a non-empty list fn gen_start_keys(prefix: &StorageKey, parallel: u16) -> Vec { - // FIXME eagr: no need to go above 256 right? coz it feels 256 is more than enough. - // I'm kinda cutting corners here. If parallel is restricted to [1, 256], then we don't - // need to involve bigint. let parallel = parallel.max(1).min(256); let prefix = prefix.as_ref().to_vec(); // assume 256-bit hash - if prefix.len() >= 32 { - return vec![StorageKey(prefix.clone())] + if prefix.len() > 31 { + return vec![StorageKey(prefix)] } let step = 256 / parallel; @@ -474,10 +431,11 @@ where } let start_keys = gen_start_keys(&prefix, parallel); - let mut end_keys: Vec> = start_keys[1..].iter().map(Some).collect(); + let start_keys: Vec> = start_keys.iter().map(Some).collect(); + let mut end_keys: Vec> = start_keys[1..].to_vec(); end_keys.push(None); - let batch = start_keys.iter().zip(end_keys).map(|(start_key, end_key)| { + let batch = start_keys.into_iter().zip(end_keys).map(|(start_key, end_key)| { self.rpc_get_keys_in_range(&prefix, block, start_key, end_key) }); @@ -503,15 +461,15 @@ where } /// Get all keys with `prefix` within the given range at `block`. - /// Use `None` for `end_key` if you want the range `start_key..`. + /// Both `start_key` and `end_key` are optional if you want an open-ended range. async fn rpc_get_keys_in_range( &self, prefix: &StorageKey, block: B::Hash, - start_key: &StorageKey, + start_key: Option<&StorageKey>, end_key: Option<&StorageKey>, ) -> Result, &'static str> { - let mut last_key: Option<&StorageKey> = Some(start_key); + let mut last_key: Option<&StorageKey> = start_key; let mut keys: Vec = vec![]; loop { @@ -1557,40 +1515,26 @@ mod remote_tests { #[tokio::test] async fn can_fetch_parallel() { + use std::time::Instant; + init_logger(); - let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443"); + let uri = String::from("wss://polkadot-try-runtime-node.parity-chains.parity.io:443"); let mut builder = Builder::::new() .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() })); builder.init_remote_client().await.unwrap(); let at = builder.as_online().at.unwrap(); - let prefix = StorageKey(vec![13]); - let para_0 = builder.rpc_get_keys_parallel(prefix.clone(), at, 0).await.unwrap(); - let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); - let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); - let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); - let para_32 = builder.rpc_get_keys_parallel(prefix.clone(), at, 32).await.unwrap(); - let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); - assert_eq!(para_0, paged); - assert_eq!(para_1, paged); - assert_eq!(para_2, paged); - assert_eq!(para_3, paged); - assert_eq!(para_32, paged); - // scrape all let prefix = StorageKey(vec![]); - let para_0 = builder.rpc_get_keys_parallel(prefix.clone(), at, 0).await.unwrap(); - let para_1 = builder.rpc_get_keys_parallel(prefix.clone(), at, 1).await.unwrap(); - let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); - let para_3 = builder.rpc_get_keys_parallel(prefix.clone(), at, 3).await.unwrap(); - let para_32 = builder.rpc_get_keys_parallel(prefix.clone(), at, 32).await.unwrap(); - let paged = builder.rpc_get_keys_paged(prefix, at).await.unwrap(); - assert_eq!(para_0, paged); - assert_eq!(para_1, paged); - assert_eq!(para_2, paged); - assert_eq!(para_3, paged); - assert_eq!(para_32, paged); + // let start = Instant::now(); + // let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); + // log::error!("parallels elapsed: {:?}", start.elapsed()); + let start = Instant::now(); + let para_4 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4).await.unwrap(); + log::error!("parallels elapsed: {:?}", start.elapsed()); + assert_eq!(para_4[0], para_4[para_4.len() - 1]); + // assert_eq!(para_4, para_2); } } From 05ebb51a4ef78c9c924d137e4a68a2b710badf14 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Thu, 9 Nov 2023 21:46:27 +0800 Subject: [PATCH 09/15] thanks clippy --- .../frame/remote-externalities/src/lib.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 6c3fb8594f2c..f04ce37cee5a 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -600,7 +600,7 @@ where "Batch request failed ({}/{} retries). Error: {}", retries, Self::MAX_RETRIES, - e.to_string() + e ); // after 2 subsequent failures something very wrong is happening. log a warning // and reset the batch size down to 1. @@ -700,9 +700,9 @@ where .unwrap() .progress_chars("=>-"), ); - let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1)); + let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1)); let requests = payloads_chunked.map(|payload_chunk| { - Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar) + Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar) }); // Execute the requests and move the Result outside. let storage_data_result: Result, _> = @@ -716,7 +716,7 @@ where }, }; bar.finish_with_message("✅ Downloaded key values"); - print!("\n"); + println!(); // Check if we got responses for all submitted requests. assert_eq!(keys.len(), storage_data.len()); @@ -850,8 +850,9 @@ where pending_ext: &mut TestExternalities>, ) -> Result { let child_roots = top_kv - .into_iter() - .filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone())) + .iter() + .filter(|(k, _)| is_default_child_storage_key(k.as_ref())) + .map(|(k, _)| k.clone()) .collect::>(); if child_roots.is_empty() { @@ -871,11 +872,10 @@ where let mut child_kv = vec![]; for prefixed_top_key in child_roots { let child_keys = - Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at) - .await?; + Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?; let child_kv_inner = - Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at) + Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at) .await?; let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0); @@ -1182,7 +1182,7 @@ mod test_prelude { pub(crate) type Block = RawBlock>; pub(crate) fn init_logger() { - let _ = sp_tracing::try_init_simple(); + sp_tracing::try_init_simple(); } } From 82507461a99885b9343c5a5e6a9f4baf494f0831 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Fri, 10 Nov 2023 19:03:20 +0800 Subject: [PATCH 10/15] divide workload based on suffix size --- .../frame/remote-externalities/src/lib.rs | 92 ++++++++++++------- 1 file changed, 59 insertions(+), 33 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index f04ce37cee5a..72ede5a88f2e 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -47,6 +47,7 @@ use std::{ fs, ops::{Deref, DerefMut}, path::{Path, PathBuf}, + sync::Arc, time::{Duration, Instant}, }; use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi}; @@ -298,6 +299,7 @@ impl Default for SnapshotConfig { } /// Builder for remote-externalities. +#[derive(Clone)] pub struct Builder { /// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values /// must be given. @@ -405,54 +407,79 @@ where &self, prefix: StorageKey, block: B::Hash, - parallel: u16, + parallel: usize, ) -> Result, &'static str> { // should guarantee to return a non-empty list - fn gen_start_keys(prefix: &StorageKey, parallel: u16) -> Vec { - let parallel = parallel.max(1).min(256); - let prefix = prefix.as_ref().to_vec(); - - // assume 256-bit hash - if prefix.len() > 31 { + fn gen_start_keys(prefix: &StorageKey) -> Vec { + let mut prefix = prefix.as_ref().to_vec(); + // length of suffix for a 256-bit hash + let scale = 32_usize.saturating_sub(prefix.len()); + + // no need to divide workload + if scale < 3 { + prefix.extend(vec![0; scale]); return vec![StorageKey(prefix)] } - let step = 256 / parallel; - let ext = 31 - prefix.len(); + // avoid tiny segments for small suffix space, which is counter-productive + let cutoff = 5; + // grow coefficient faster for larger scale + let coefficient = 2_usize.saturating_pow(scale as u32 / cutoff); + // maximized at 2048 when scraping whole blocks + let segments = coefficient * scale; - (0..parallel) + let step = 0x10000 / segments; + let ext = scale - 2; + + (0..segments) .map(|i| { let mut key = prefix.clone(); - key.push((i * step) as u8); + let start = i * step; + key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]); key.extend(vec![0; ext]); StorageKey(key) }) .collect() } - let start_keys = gen_start_keys(&prefix, parallel); + let start_keys = gen_start_keys(&prefix); let start_keys: Vec> = start_keys.iter().map(Some).collect(); let mut end_keys: Vec> = start_keys[1..].to_vec(); end_keys.push(None); - let batch = start_keys.into_iter().zip(end_keys).map(|(start_key, end_key)| { - self.rpc_get_keys_in_range(&prefix, block, start_key, end_key) - }); + let parallel = Arc::new(tokio::sync::Semaphore::new(parallel)); + let builder = Arc::new(self.clone()); + let mut handles = vec![]; + + for (start_key, end_key) in start_keys.into_iter().zip(end_keys) { + let permit = parallel + .clone() + .acquire_owned() + .await + .expect("semaphore is not closed until the end of loop"); + + let builder = builder.clone(); + let prefix = prefix.clone(); + let start_key = start_key.cloned(); + let end_key = end_key.cloned(); + + let handle = tokio::spawn(async move { + let res = builder + .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref()) + .await; + drop(permit); + res + }); - let keys = futures::future::join_all(batch) + handles.push(handle); + } + + let keys = futures::future::join_all(handles) .await .into_iter() .filter_map(|res| match res { - Ok(keys) => Some(keys), - Err(err) => { - log::warn!( - target: LOG_TARGET, - "{} when fetching keys at block {:?}", - err, - block, - ); - None - }, + Ok(Ok(keys)) => Some(keys), + _ => None, }) .flatten() .collect::>(); @@ -666,7 +693,7 @@ where prefix: StorageKey, at: B::Hash, pending_ext: &mut TestExternalities>, - parallel: u16, + parallel: usize, ) -> Result, &'static str> { let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); @@ -1528,13 +1555,12 @@ mod remote_tests { // scrape all let prefix = StorageKey(vec![]); - // let start = Instant::now(); - // let para_2 = builder.rpc_get_keys_parallel(prefix.clone(), at, 2).await.unwrap(); - // log::error!("parallels elapsed: {:?}", start.elapsed()); let start = Instant::now(); let para_4 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4).await.unwrap(); - log::error!("parallels elapsed: {:?}", start.elapsed()); - assert_eq!(para_4[0], para_4[para_4.len() - 1]); - // assert_eq!(para_4, para_2); + log::error!("parallels 4 elapsed: {:?}", start.elapsed()); + let start = Instant::now(); + let para_8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 8).await.unwrap(); + log::error!("parallels 8 elapsed: {:?}", start.elapsed()); + assert_eq!(para_4.len(), para_8.len()); } } From 5043d040c5cc6b70a12d86d28b393d25000b8118 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Sat, 18 Nov 2023 17:26:03 +0800 Subject: [PATCH 11/15] bench different number of chunks --- .../frame/remote-externalities/src/lib.rs | 127 +++++++++++++++--- 1 file changed, 107 insertions(+), 20 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 72ede5a88f2e..7b9fe4d8ed9e 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -402,36 +402,43 @@ where }) } - /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` saturated at 256. + /// Get keys with `prefix` at `block` in a parallel manner. async fn rpc_get_keys_parallel( &self, prefix: StorageKey, block: B::Hash, parallel: usize, + // FIXME eagr: remove it + cutoff: u32, ) -> Result, &'static str> { - // should guarantee to return a non-empty list - fn gen_start_keys(prefix: &StorageKey) -> Vec { + // Divide the workload and return the start key of each chunks. Guarantee to return a + // non-empty list. + // + // Assuming 256-bit hashes, given a prefix of length L, + // SCALE = 32 - L, in the sense of the scale of the problem. + // We don't want to divide the workload too fine, which is also counter-productive. + // For that purpose, we use CUTOFF to control how fine the chunks are. + // The number of resulted chunks is determined as (2 ^ (SCALE / CUTOFF)) * SCALE. + fn gen_start_keys(prefix: &StorageKey, cutoff: u32) -> Vec { let mut prefix = prefix.as_ref().to_vec(); - // length of suffix for a 256-bit hash - let scale = 32_usize.saturating_sub(prefix.len()); + let scale = 32usize.saturating_sub(prefix.len()); // no need to divide workload - if scale < 3 { + if scale < 8 { prefix.extend(vec![0; scale]); return vec![StorageKey(prefix)] } - // avoid tiny segments for small suffix space, which is counter-productive - let cutoff = 5; + // FIXME eagr: use literal after determining the best cutoff + // let cutoff = cutoff; // grow coefficient faster for larger scale - let coefficient = 2_usize.saturating_pow(scale as u32 / cutoff); - // maximized at 2048 when scraping whole blocks - let segments = coefficient * scale; + let coefficient = 2usize.saturating_pow(scale as u32 / cutoff); + let chunks = coefficient * scale; - let step = 0x10000 / segments; + let step = 0x10000 / chunks; let ext = scale - 2; - (0..segments) + (0..chunks) .map(|i| { let mut key = prefix.clone(); let start = i * step; @@ -442,11 +449,12 @@ where .collect() } - let start_keys = gen_start_keys(&prefix); + let start_keys = gen_start_keys(&prefix, cutoff); let start_keys: Vec> = start_keys.iter().map(Some).collect(); let mut end_keys: Vec> = start_keys[1..].to_vec(); end_keys.push(None); + // use a semaphore to limit max scraping tasks let parallel = Arc::new(tokio::sync::Semaphore::new(parallel)); let builder = Arc::new(self.clone()); let mut handles = vec![]; @@ -474,6 +482,8 @@ where handles.push(handle); } + parallel.close(); + let keys = futures::future::join_all(handles) .await .into_iter() @@ -698,7 +708,8 @@ where let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); let keys = self - .rpc_get_keys_parallel(prefix.clone(), at, parallel) + // FIXME eagr: remove cutoff + .rpc_get_keys_parallel(prefix.clone(), at, parallel, 8) .await? .into_iter() .collect::>(); @@ -1147,6 +1158,46 @@ where Ok(ext) } + + /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. + #[cfg(test)] + async fn rpc_get_keys_paged( + &self, + prefix: StorageKey, + hash: B::Hash, + ) -> Result, &'static str> { + let mut last_key: Option = None; + let mut all_keys: Vec = vec![]; + let keys = loop { + // This loop can hit the node with very rapid requests, occasionally causing it to + // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry. + let retry_strategy = + FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); + let get_page_closure = + || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), hash); + let page = Retry::spawn(retry_strategy, get_page_closure).await?; + let page_len = page.len(); + + all_keys.extend(page); + + if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { + log::debug!(target: LOG_TARGET, "last page received: {}", page_len); + break all_keys + } else { + let new_last_key = + all_keys.last().expect("all_keys is populated; has .last(); qed"); + log::debug!( + target: LOG_TARGET, + "new total = {}, full page received: {}", + all_keys.len(), + HexDisplay::from(new_last_key) + ); + last_key = Some(new_last_key.clone()); + }; + }; + + Ok(keys) + } } // Public methods @@ -1555,12 +1606,48 @@ mod remote_tests { // scrape all let prefix = StorageKey(vec![]); + // 16*32 chunks + let start = Instant::now(); + let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap(); + log::error!("p4c8: {:?}", start.elapsed()); + // 8*32 chunks + let start = Instant::now(); + let p4c10 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 10).await.unwrap(); + log::error!("p4c10: {:?}", start.elapsed()); + // 4*32 chunks + let start = Instant::now(); + let p4c16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap(); + log::error!("p4c16: {:?}", start.elapsed()); + // 2*32 chunks let start = Instant::now(); - let para_4 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4).await.unwrap(); - log::error!("parallels 4 elapsed: {:?}", start.elapsed()); + let p4c20 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 20).await.unwrap(); + log::error!("p4c20: {:?}", start.elapsed()); + // 1*32 chunks let start = Instant::now(); - let para_8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 8).await.unwrap(); - log::error!("parallels 8 elapsed: {:?}", start.elapsed()); - assert_eq!(para_4.len(), para_8.len()); + let p4c33 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 33).await.unwrap(); + log::error!("p4c33: {:?}", start.elapsed()); + assert_eq!(p4c8.len(), p4c33.len()); + } + + #[tokio::test] + async fn can_fetch_parallel_correctly() { + init_logger(); + + let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443"); + let mut builder = Builder::::new() + .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() })); + builder.init_remote_client().await.unwrap(); + + let at = builder.as_online().at.unwrap(); + + let prefix = StorageKey(vec![13]); + let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); + let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap(); + assert_eq!(paged, para); + + let prefix = StorageKey(vec![]); + let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); + let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 32).await.unwrap(); + assert_eq!(paged, para); } } From 64db2e5dcf39e6922b47432514fe8a17e8d07181 Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Mon, 20 Nov 2023 12:47:59 +0400 Subject: [PATCH 12/15] add rpc_get_keys_paged bench --- substrate/utils/frame/remote-externalities/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 7b9fe4d8ed9e..17d4df2db886 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -1606,6 +1606,10 @@ mod remote_tests { // scrape all let prefix = StorageKey(vec![]); + // original + let start = Instant::now(); + builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); + log::error!("rpc_get_keys_paged: {:?}", start.elapsed()); // 16*32 chunks let start = Instant::now(); let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap(); From 605cd8d47ade3853b75888eafc70f3e704b534e6 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Tue, 21 Nov 2023 13:04:50 +0800 Subject: [PATCH 13/15] clean up --- .../frame/remote-externalities/src/lib.rs | 120 ++---------------- 1 file changed, 13 insertions(+), 107 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 17d4df2db886..69ac84371598 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -405,36 +405,24 @@ where /// Get keys with `prefix` at `block` in a parallel manner. async fn rpc_get_keys_parallel( &self, - prefix: StorageKey, + prefix: &StorageKey, block: B::Hash, parallel: usize, - // FIXME eagr: remove it - cutoff: u32, ) -> Result, &'static str> { // Divide the workload and return the start key of each chunks. Guarantee to return a // non-empty list. - // - // Assuming 256-bit hashes, given a prefix of length L, - // SCALE = 32 - L, in the sense of the scale of the problem. - // We don't want to divide the workload too fine, which is also counter-productive. - // For that purpose, we use CUTOFF to control how fine the chunks are. - // The number of resulted chunks is determined as (2 ^ (SCALE / CUTOFF)) * SCALE. - fn gen_start_keys(prefix: &StorageKey, cutoff: u32) -> Vec { + fn gen_start_keys(prefix: &StorageKey) -> Vec { let mut prefix = prefix.as_ref().to_vec(); let scale = 32usize.saturating_sub(prefix.len()); // no need to divide workload - if scale < 8 { + if scale < 9 { prefix.extend(vec![0; scale]); return vec![StorageKey(prefix)] } - // FIXME eagr: use literal after determining the best cutoff - // let cutoff = cutoff; - // grow coefficient faster for larger scale - let coefficient = 2usize.saturating_pow(scale as u32 / cutoff); - let chunks = coefficient * scale; - + // FIXME: figure out a better algo for dividing workload + let chunks = 16; let step = 0x10000 / chunks; let ext = scale - 2; @@ -449,7 +437,7 @@ where .collect() } - let start_keys = gen_start_keys(&prefix, cutoff); + let start_keys = gen_start_keys(&prefix); let start_keys: Vec> = start_keys.iter().map(Some).collect(); let mut end_keys: Vec> = start_keys[1..].to_vec(); end_keys.push(None); @@ -708,8 +696,7 @@ where let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); let keys = self - // FIXME eagr: remove cutoff - .rpc_get_keys_parallel(prefix.clone(), at, parallel, 8) + .rpc_get_keys_parallel(&prefix, at, parallel) .await? .into_iter() .collect::>(); @@ -955,6 +942,7 @@ where let mut keys_and_values = Vec::new(); for prefix in &config.hashed_prefixes { let now = std::time::Instant::now(); + // By observation, 4 parallel tasks almost always perform the best. let additional_key_values = self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 4).await?; let elapsed = now.elapsed(); @@ -1158,46 +1146,6 @@ where Ok(ext) } - - /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods. - #[cfg(test)] - async fn rpc_get_keys_paged( - &self, - prefix: StorageKey, - hash: B::Hash, - ) -> Result, &'static str> { - let mut last_key: Option = None; - let mut all_keys: Vec = vec![]; - let keys = loop { - // This loop can hit the node with very rapid requests, occasionally causing it to - // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry. - let retry_strategy = - FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); - let get_page_closure = - || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), hash); - let page = Retry::spawn(retry_strategy, get_page_closure).await?; - let page_len = page.len(); - - all_keys.extend(page); - - if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { - log::debug!(target: LOG_TARGET, "last page received: {}", page_len); - break all_keys - } else { - let new_last_key = - all_keys.last().expect("all_keys is populated; has .last(); qed"); - log::debug!( - target: LOG_TARGET, - "new total = {}, full page received: {}", - all_keys.len(), - HexDisplay::from(new_last_key) - ); - last_key = Some(new_last_key.clone()); - }; - }; - - Ok(keys) - } } // Public methods @@ -1592,49 +1540,7 @@ mod remote_tests { } #[tokio::test] - async fn can_fetch_parallel() { - use std::time::Instant; - - init_logger(); - - let uri = String::from("wss://polkadot-try-runtime-node.parity-chains.parity.io:443"); - let mut builder = Builder::::new() - .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() })); - builder.init_remote_client().await.unwrap(); - - let at = builder.as_online().at.unwrap(); - - // scrape all - let prefix = StorageKey(vec![]); - // original - let start = Instant::now(); - builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); - log::error!("rpc_get_keys_paged: {:?}", start.elapsed()); - // 16*32 chunks - let start = Instant::now(); - let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap(); - log::error!("p4c8: {:?}", start.elapsed()); - // 8*32 chunks - let start = Instant::now(); - let p4c10 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 10).await.unwrap(); - log::error!("p4c10: {:?}", start.elapsed()); - // 4*32 chunks - let start = Instant::now(); - let p4c16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap(); - log::error!("p4c16: {:?}", start.elapsed()); - // 2*32 chunks - let start = Instant::now(); - let p4c20 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 20).await.unwrap(); - log::error!("p4c20: {:?}", start.elapsed()); - // 1*32 chunks - let start = Instant::now(); - let p4c33 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 33).await.unwrap(); - log::error!("p4c33: {:?}", start.elapsed()); - assert_eq!(p4c8.len(), p4c33.len()); - } - - #[tokio::test] - async fn can_fetch_parallel_correctly() { + async fn can_fetch_in_parallel() { init_logger(); let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443"); @@ -1645,13 +1551,13 @@ mod remote_tests { let at = builder.as_online().at.unwrap(); let prefix = StorageKey(vec![13]); - let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); - let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap(); + let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap(); assert_eq!(paged, para); let prefix = StorageKey(vec![]); - let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap(); - let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 32).await.unwrap(); + let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap(); assert_eq!(paged, para); } } From 5d2392d6ddb3cffdce24b861bd8748e8e1290ed9 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Tue, 21 Nov 2023 16:10:25 +0800 Subject: [PATCH 14/15] as per advices --- substrate/utils/frame/remote-externalities/src/lib.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 69ac84371598..5e94bea45f58 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -409,8 +409,8 @@ where block: B::Hash, parallel: usize, ) -> Result, &'static str> { - // Divide the workload and return the start key of each chunks. Guarantee to return a - // non-empty list. + /// Divide the workload and return the start key of each chunks. Guaranteed to return a + /// non-empty list. fn gen_start_keys(prefix: &StorageKey) -> Vec { let mut prefix = prefix.as_ref().to_vec(); let scale = 32usize.saturating_sub(prefix.len()); @@ -691,12 +691,11 @@ where prefix: StorageKey, at: B::Hash, pending_ext: &mut TestExternalities>, - parallel: usize, ) -> Result, &'static str> { let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); let keys = self - .rpc_get_keys_parallel(&prefix, at, parallel) + .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS) .await? .into_iter() .collect::>(); @@ -942,9 +941,8 @@ where let mut keys_and_values = Vec::new(); for prefix in &config.hashed_prefixes { let now = std::time::Instant::now(); - // By observation, 4 parallel tasks almost always perform the best. let additional_key_values = - self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext, 4).await?; + self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?; let elapsed = now.elapsed(); log::info!( target: LOG_TARGET, From 35307663e6191144d332d7519796519335e14412 Mon Sep 17 00:00:00 2001 From: Julian Eager Date: Mon, 27 Nov 2023 17:04:09 +0800 Subject: [PATCH 15/15] link followup --- substrate/utils/frame/remote-externalities/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 5e94bea45f58..5c7a36867ff6 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -421,7 +421,6 @@ where return vec![StorageKey(prefix)] } - // FIXME: figure out a better algo for dividing workload let chunks = 16; let step = 0x10000 / chunks; let ext = scale - 2; @@ -694,6 +693,8 @@ where ) -> Result, &'static str> { let start = Instant::now(); let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); + // TODO We could start downloading when having collected the first batch of keys + // https://github.com/paritytech/polkadot-sdk/issues/2494 let keys = self .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS) .await?