Enable parallel key scraping #1985

Merged · 21 commits · Nov 29, 2023
192 changes: 155 additions & 37 deletions substrate/utils/frame/remote-externalities/src/lib.rs
@@ -47,6 +47,7 @@ use std::{
fs,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
+ sync::Arc,
time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
@@ -298,6 +299,7 @@ impl Default for SnapshotConfig {
}

/// Builder for remote-externalities.
+ #[derive(Clone)]
pub struct Builder<B: BlockT> {
/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
/// must be given.
@@ -400,41 +402,135 @@ where
})
}

- /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
- async fn rpc_get_keys_paged(
+ /// Get keys with `prefix` at `block` in a parallel manner.
+ async fn rpc_get_keys_parallel(
&self,
- prefix: StorageKey,
- at: B::Hash,
+ prefix: &StorageKey,
+ block: B::Hash,
+ parallel: usize,
) -> Result<Vec<StorageKey>, &'static str> {
+ /// Divide the workload and return the start key of each chunk. Guaranteed to return a
+ /// non-empty list.
+ fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
+ let mut prefix = prefix.as_ref().to_vec();
+ let scale = 32usize.saturating_sub(prefix.len());
+
+ // no need to divide workload
+ if scale < 9 {
+ prefix.extend(vec![0; scale]);
+ return vec![StorageKey(prefix)]
+ }
+
+ // FIXME: figure out a better algo for dividing workload
+ let chunks = 16;
+ let step = 0x10000 / chunks;
+ let ext = scale - 2;
+
+ (0..chunks)
+ .map(|i| {
+ let mut key = prefix.clone();
+ let start = i * step;
+ key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
+ key.extend(vec![0; ext]);
+ StorageKey(key)
+ })
+ .collect()
+ }
+
+ let start_keys = gen_start_keys(&prefix);
+ let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
+ let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
+ end_keys.push(None);
+
+ // use a semaphore to limit max scraping tasks
+ let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
+ let builder = Arc::new(self.clone());
+ let mut handles = vec![];
+
+ for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
+ let permit = parallel
+ .clone()
+ .acquire_owned()
+ .await
+ .expect("semaphore is not closed until the end of loop");
+
+ let builder = builder.clone();
+ let prefix = prefix.clone();
+ let start_key = start_key.cloned();
+ let end_key = end_key.cloned();
+
+ let handle = tokio::spawn(async move {
+ let res = builder
+ .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
+ .await;
+ drop(permit);
+ res
+ });
+
+ handles.push(handle);
+ }
+
+ parallel.close();
+
+ let keys = futures::future::join_all(handles)
+ .await
+ .into_iter()
+ .filter_map(|res| match res {
+ Ok(Ok(keys)) => Some(keys),
+ _ => None,
+ })
+ .flatten()
+ .collect::<Vec<StorageKey>>();
+
+ Ok(keys)
+ }
+
+ /// Get all keys with `prefix` within the given range at `block`.
+ /// Both `start_key` and `end_key` are optional if you want an open-ended range.
+ async fn rpc_get_keys_in_range(
+ &self,
+ prefix: &StorageKey,
+ block: B::Hash,
+ start_key: Option<&StorageKey>,
+ end_key: Option<&StorageKey>,
+ ) -> Result<Vec<StorageKey>, &'static str> {
- let mut last_key: Option<StorageKey> = None;
- let mut all_keys: Vec<StorageKey> = vec![];
- let keys = loop {
+ let mut last_key: Option<&StorageKey> = start_key;
+ let mut keys: Vec<StorageKey> = vec![];
+
+ loop {
// This loop can hit the node with very rapid requests, occasionally causing it to
// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
let retry_strategy =
FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
let get_page_closure =
- || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
- let page = Retry::spawn(retry_strategy, get_page_closure).await?;
- let page_len = page.len();
+ || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
+ let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

- all_keys.extend(page);
+ // avoid duplicated keys across workloads
+ if let (Some(last), Some(end)) = (page.last(), end_key) {
+ if last >= end {
+ page.retain(|key| key < end);
+ }
+ }
+
+ let page_len = page.len();
+ keys.extend(page);
+ last_key = keys.last();
+
+ // scraping out of range or no more matches,
+ // we are done either way
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
- break all_keys
- } else {
- let new_last_key =
- all_keys.last().expect("all_keys is populated; has .last(); qed");
- log::debug!(
- target: LOG_TARGET,
- "new total = {}, full page received: {}",
- all_keys.len(),
- HexDisplay::from(new_last_key)
- );
- last_key = Some(new_last_key.clone());
- };
- };
+ break
+ }
+
+ log::debug!(
+ target: LOG_TARGET,
+ "new total = {}, full page received: {}",
+ keys.len(),
+ HexDisplay::from(last_key.expect("full page received, cannot be None"))
+ );
+ }

Ok(keys)
}
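Taken together, the two new functions implement a fan-out/fan-in scrape: partition the key space after the prefix into ranges, scan each range concurrently behind a semaphore, and concatenate the results in range order. Below is a minimal, self-contained sketch of that pattern, not the PR's code: it assumes the tokio and futures crates, and fetch_range is an illustrative stand-in for the real rpc_get_keys_in_range.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for `rpc_get_keys_in_range`: return every "key" in [start, end).
// The real function pages through RPC results with retries.
async fn fetch_range(start: u16, end: Option<u16>) -> Vec<u16> {
    (start..end.unwrap_or(u16::MAX)).collect()
}

#[tokio::main]
async fn main() {
    // Mirror `gen_start_keys`: divide a two-byte key space into 16 equal ranges.
    let chunks: u32 = 16;
    let step = 0x10000 / chunks;
    let start_keys: Vec<u16> = (0..chunks).map(|i| (i * step) as u16).collect();

    // Worker i scans [start_keys[i], start_keys[i + 1]); the last range is open-ended.
    let mut end_keys: Vec<Option<u16>> = start_keys[1..].iter().copied().map(Some).collect();
    end_keys.push(None);

    // Bound the number of in-flight workers, as the PR does with `PARALLEL_REQUESTS`.
    let semaphore = Arc::new(Semaphore::new(4));
    let mut handles = vec![];

    for (start, end) in start_keys.into_iter().zip(end_keys) {
        let permit = semaphore.clone().acquire_owned().await.expect("semaphore not closed");
        handles.push(tokio::spawn(async move {
            let keys = fetch_range(start, end).await;
            drop(permit); // release the slot so the next worker can start
            keys
        }));
    }

    // Fan-in: join in spawn order, so the output stays sorted by range.
    let keys: Vec<u16> = futures::future::join_all(handles)
        .await
        .into_iter()
        .filter_map(|joined| joined.ok())
        .flatten()
        .collect();

    assert_eq!(keys.len(), 0x10000 - 1); // the open-ended range excludes u16::MAX itself
}

Because join_all yields results in spawn order and each worker returns sorted keys from a disjoint, ascending range, the concatenated output matches what a single sequential paged scan would produce — which is what the new can_fetch_in_parallel test at the bottom of this diff asserts.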
@@ -529,7 +625,7 @@ where
"Batch request failed ({}/{} retries). Error: {}",
retries,
Self::MAX_RETRIES,
- e.to_string()
+ e
);
// after 2 subsequent failures something very wrong is happening. log a warning
// and reset the batch size down to 1.
@@ -590,7 +686,7 @@ where
/// map them to values one by one.
///
/// This can work with public nodes. But, expect it to be darn slow.
- pub(crate) async fn rpc_get_pairs_paged(
+ pub(crate) async fn rpc_get_pairs(
&self,
prefix: StorageKey,
at: B::Hash,
@@ -599,7 +695,7 @@ where
let start = Instant::now();
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
let keys = self
- .rpc_get_keys_paged(prefix.clone(), at)
+ .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
.await?
.into_iter()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -628,9 +724,9 @@ where
.unwrap()
.progress_chars("=>-"),
);
- let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
+ let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
let requests = payloads_chunked.map(|payload_chunk| {
- Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
+ Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
});
// Execute the requests and move the Result outside.
let storage_data_result: Result<Vec<_>, _> =
Expand All @@ -644,7 +740,7 @@ where
},
};
bar.finish_with_message("✅ Downloaded key values");
- print!("\n");
+ println!();

// Check if we got responses for all submitted requests.
assert_eq!(keys.len(), storage_data.len());
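A note on the payload chunking fixed in the rpc_get_pairs hunk above: `slice::chunks` panics on a chunk size of zero, and the integer division floors to zero whenever fewer payloads than `PARALLEL_REQUESTS` are submitted, so the `.max(1)` clamp is what keeps the scrape alive for small prefixes. A tiny sketch of the arithmetic; `chunk_size` is an illustrative helper, not part of the codebase.

fn chunk_size(len: usize, parallel: usize) -> usize {
    // Integer division floors to 0 when len < parallel; `chunks(0)` would panic.
    (len / parallel).max(1)
}

fn main() {
    let payloads: Vec<u32> = (0..10).collect();
    // 10 payloads over 4 parallel requests -> chunk size 2 -> 5 chunks.
    assert_eq!(payloads.chunks(chunk_size(payloads.len(), 4)).count(), 5);
    // 3 payloads over 4 parallel requests -> chunk size clamps to 1.
    assert_eq!(chunk_size(3, 4), 1);
}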
Expand Down Expand Up @@ -778,8 +874,9 @@ where
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<ChildKeyValues, &'static str> {
let child_roots = top_kv
- .into_iter()
- .filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone()))
+ .iter()
+ .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
+ .map(|(k, _)| k.clone())
.collect::<Vec<_>>();

if child_roots.is_empty() {
@@ -799,11 +896,10 @@ where
let mut child_kv = vec![];
for prefixed_top_key in child_roots {
let child_keys =
- Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at)
- .await?;
+ Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

let child_kv_inner =
- Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at)
+ Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
.await?;

let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
Expand Down Expand Up @@ -846,7 +942,7 @@ where
for prefix in &config.hashed_prefixes {
let now = std::time::Instant::now();
let additional_key_values =
- self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
+ self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
let elapsed = now.elapsed();
log::info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1110,7 +1206,7 @@ mod test_prelude {
pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;

pub(crate) fn init_logger() {
- let _ = sp_tracing::try_init_simple();
+ sp_tracing::try_init_simple();
}
}

Expand Down Expand Up @@ -1440,4 +1536,26 @@ mod remote_tests {
.unwrap()
.execute_with(|| {});
}

+ #[tokio::test]
+ async fn can_fetch_in_parallel() {
+ init_logger();
+
+ let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
+ let mut builder = Builder::<Block>::new()
+ .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
+ builder.init_remote_client().await.unwrap();
+
+ let at = builder.as_online().at.unwrap();
+
+ let prefix = StorageKey(vec![13]);
+ let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+ let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
+ assert_eq!(paged, para);
+
+ let prefix = StorageKey(vec![]);
+ let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
+ let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
+ assert_eq!(paged, para);
+ }
}
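For downstream users nothing changes at the call site: `build()` now scrapes keys in parallel internally. A sketch modeled on the existing tests in this module — the test name is hypothetical, and the endpoint is the one used in can_fetch_in_parallel above.

#[tokio::test]
async fn parallel_scraping_is_transparent() {
    init_logger();

    // Same builder API as before this PR; the parallelism is internal.
    let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
    Builder::<Block>::new()
        .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }))
        .build()
        .await
        .unwrap()
        .execute_with(|| {});
}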