
Enable parallel key scraping #1985

Merged Nov 29, 2023 (21 commits). Changes from 1 commit:
bench different number of chunks
eagr committed Nov 18, 2023

commit 5043d040c5cc6b70a12d86d28b393d25000b8118
127 changes: 107 additions & 20 deletions substrate/utils/frame/remote-externalities/src/lib.rs
@@ -402,36 +402,43 @@ where
})
}

- /// Get keys with `prefix` at `block` in a parallel manner, with `parallel` saturated at 256.
+ /// Get keys with `prefix` at `block` in a parallel manner.
async fn rpc_get_keys_parallel(
&self,
prefix: StorageKey,
block: B::Hash,
parallel: usize,
+ // FIXME eagr: remove it
+ cutoff: u32,
) -> Result<Vec<StorageKey>, &'static str> {
- // should guarantee to return a non-empty list
- fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
+ // Divide the workload and return the start key of each chunks. Guarantee to return a
+ // non-empty list.
[Review comment (Contributor), suggested change to the two added comment lines above:]

- // Divide the workload and return the start key of each chunks. Guarantee to return a
- // non-empty list.
+ /// Divide the workload and return the start key of each chunks. Guaranteed to return a
+ /// non-empty list.

[Review comment (Contributor):] Nice to also have some documentation explaining how the algorithm works in some detail.
+ //
+ // Assuming 256-bit hashes, given a prefix of length L,
+ // SCALE = 32 - L, in the sense of the scale of the problem.
+ // We don't want to divide the workload too fine, which is also counter-productive.
+ // For that purpose, we use CUTOFF to control how fine the chunks are.
+ // The number of resulted chunks is determined as (2 ^ (SCALE / CUTOFF)) * SCALE.
+ fn gen_start_keys(prefix: &StorageKey, cutoff: u32) -> Vec<StorageKey> {
let mut prefix = prefix.as_ref().to_vec();
- let scale = 32_usize.saturating_sub(prefix.len());
+ // length of suffix for a 256-bit hash
+ let scale = 32usize.saturating_sub(prefix.len());

// no need to divide workload
- if scale < 3 {
+ if scale < 8 {
prefix.extend(vec![0; scale]);
return vec![StorageKey(prefix)]
}

- // avoid tiny segments for small suffix space, which is counter-productive
- let cutoff = 5;
+ // FIXME eagr: use literal after determining the best cutoff
+ // let cutoff = cutoff;
// grow coefficient faster for larger scale
- let coefficient = 2_usize.saturating_pow(scale as u32 / cutoff);
- // maximized at 2048 when scraping whole blocks
- let segments = coefficient * scale;
+ let coefficient = 2usize.saturating_pow(scale as u32 / cutoff);
+ let chunks = coefficient * scale;

- let step = 0x10000 / segments;
+ let step = 0x10000 / chunks;
let ext = scale - 2;

- (0..segments)
+ (0..chunks)
.map(|i| {
let mut key = prefix.clone();
let start = i * step;
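For concreteness, here is a small standalone sketch (not part of the diff; `chunk_count` is a hypothetical helper) of the arithmetic described in the comment above. The values line up with the `N*32 chunks` annotations in the benchmark test near the end of this file:

```rust
// Sketch of the chunk-count formula from the comment above:
// chunks = 2^(SCALE / CUTOFF) * SCALE, where SCALE = 32 - prefix_len.
fn chunk_count(prefix_len: usize, cutoff: u32) -> usize {
    let scale = 32usize.saturating_sub(prefix_len);
    // Small suffix spaces are not divided at all.
    if scale < 8 {
        return 1
    }
    2usize.saturating_pow(scale as u32 / cutoff) * scale
}

fn main() {
    // Whole-block scrape (empty prefix, scale = 32), for the cutoffs
    // benchmarked in the test below:
    assert_eq!(chunk_count(0, 8), 16 * 32); // 512 chunks
    assert_eq!(chunk_count(0, 10), 8 * 32); // 256 chunks
    assert_eq!(chunk_count(0, 16), 4 * 32); // 128 chunks
    assert_eq!(chunk_count(0, 20), 2 * 32); // 64 chunks
    assert_eq!(chunk_count(0, 33), 1 * 32); // 32 chunks
}
```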
@@ -442,11 +449,12 @@ where
.collect()
}

- let start_keys = gen_start_keys(&prefix);
+ let start_keys = gen_start_keys(&prefix, cutoff);
let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
end_keys.push(None);

+ // use a semaphore to limit max scraping tasks
let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
let builder = Arc::new(self.clone());
let mut handles = vec![];
@@ -474,6 +482,8 @@ where
handles.push(handle);
}

+ parallel.close();

let keys = futures::future::join_all(handles)
.await
.into_iter()
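The semaphore-bounded fan-out in this hunk (acquire a permit, spawn, close the semaphore once everything is spawned, then join) can be sketched in isolation roughly as follows; `fetch_chunk` and `scrape_bounded` are hypothetical stand-ins, not names from the PR:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical stand-in for the per-range key download.
async fn fetch_chunk(chunk: u32) -> Vec<u32> {
    vec![chunk]
}

async fn scrape_bounded(chunks: Vec<u32>, parallel: usize) -> Vec<u32> {
    let semaphore = Arc::new(Semaphore::new(parallel));
    let mut handles = vec![];

    for chunk in chunks {
        // Acquire a permit *before* spawning, so at most `parallel`
        // fetches are ever in flight.
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is only closed after all tasks are spawned; qed");
        handles.push(tokio::spawn(async move {
            let keys = fetch_chunk(chunk).await;
            drop(permit); // free the slot for the next task
            keys
        }));
    }

    // Safe here: every task already holds (or has dropped) its permit,
    // so no acquire can be left pending.
    semaphore.close();

    let mut all = vec![];
    for handle in handles {
        all.extend(handle.await.expect("task panicked"));
    }
    all
}

#[tokio::main]
async fn main() {
    let keys = scrape_bounded((0..16).collect(), 4).await;
    assert_eq!(keys.len(), 16);
}
```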
@@ -698,7 +708,8 @@ where
let start = Instant::now();
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
let keys = self
- .rpc_get_keys_parallel(prefix.clone(), at, parallel)
+ // FIXME eagr: remove cutoff
+ .rpc_get_keys_parallel(prefix.clone(), at, parallel, 8)
.await?
.into_iter()
.collect::<Vec<_>>();
@@ -1147,6 +1158,46 @@ where

Ok(ext)
}

+ /// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
+ #[cfg(test)]
+ async fn rpc_get_keys_paged(
+ &self,
+ prefix: StorageKey,
+ hash: B::Hash,
+ ) -> Result<Vec<StorageKey>, &'static str> {
+ let mut last_key: Option<StorageKey> = None;
+ let mut all_keys: Vec<StorageKey> = vec![];
+ let keys = loop {
+ // This loop can hit the node with very rapid requests, occasionally causing it to
+ // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
+ let retry_strategy =
+ FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
+ let get_page_closure =
+ || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), hash);
+ let page = Retry::spawn(retry_strategy, get_page_closure).await?;
+ let page_len = page.len();
+
+ all_keys.extend(page);
+
+ if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
+ log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
+ break all_keys
+ } else {
+ let new_last_key =
+ all_keys.last().expect("all_keys is populated; has .last(); qed");
+ log::debug!(
+ target: LOG_TARGET,
+ "new total = {}, full page received: {}",
+ all_keys.len(),
+ HexDisplay::from(new_last_key)
+ );
+ last_key = Some(new_last_key.clone());
+ };
+ };
+
+ Ok(keys)
+ }
}
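The retry wrapper inside `rpc_get_keys_paged` follows the usual tokio-retry pattern. A minimal sketch under assumed names and values (`get_page` is a hypothetical fallible fetch; the 5s interval and 12 retries merely stand in for `KEYS_PAGE_RETRY_INTERVAL` and `MAX_RETRIES`), assuming the tokio and tokio-retry crates:

```rust
use std::time::Duration;
use tokio_retry::{strategy::FixedInterval, Retry};

// Hypothetical fallible page fetch standing in for get_keys_single_page.
async fn get_page(last_key: Option<Vec<u8>>) -> Result<Vec<Vec<u8>>, &'static str> {
    Ok(vec![last_key.unwrap_or_default()])
}

async fn fetch_page_with_retry(
    last_key: Option<Vec<u8>>,
) -> Result<Vec<Vec<u8>>, &'static str> {
    // Retry a transient failure at a fixed interval, a bounded number of times.
    let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(12);
    let get_page_closure = || get_page(last_key.clone());
    Retry::spawn(retry_strategy, get_page_closure).await
}

#[tokio::main]
async fn main() {
    let page = fetch_page_with_retry(None).await.unwrap();
    assert_eq!(page.len(), 1);
}
```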

// Public methods
@@ -1555,12 +1606,48 @@ mod remote_tests {

// scrape all
let prefix = StorageKey(vec![]);
+ // 16*32 chunks
+ let start = Instant::now();
+ let p4c8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 8).await.unwrap();
+ log::error!("p4c8: {:?}", start.elapsed());
+ // 8*32 chunks
+ let start = Instant::now();
+ let p4c10 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 10).await.unwrap();
+ log::error!("p4c10: {:?}", start.elapsed());
+ // 4*32 chunks
+ let start = Instant::now();
+ let p4c16 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
+ log::error!("p4c16: {:?}", start.elapsed());
+ // 2*32 chunks
let start = Instant::now();
- let para_4 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4).await.unwrap();
- log::error!("parallels 4 elapsed: {:?}", start.elapsed());
+ let p4c20 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 20).await.unwrap();
+ log::error!("p4c20: {:?}", start.elapsed());
+ // 1*32 chunks
let start = Instant::now();
- let para_8 = builder.rpc_get_keys_parallel(prefix.clone(), at, 8).await.unwrap();
- log::error!("parallels 8 elapsed: {:?}", start.elapsed());
- assert_eq!(para_4.len(), para_8.len());
+ let p4c33 = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 33).await.unwrap();
+ log::error!("p4c33: {:?}", start.elapsed());
+ assert_eq!(p4c8.len(), p4c33.len());
}

+ #[tokio::test]
+ async fn can_fetch_parallel_correctly() {
+ init_logger();
+
+ let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
+ let mut builder = Builder::<Block>::new()
+ .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
+ builder.init_remote_client().await.unwrap();
+
+ let at = builder.as_online().at.unwrap();
+
+ let prefix = StorageKey(vec![13]);
+ let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
+ let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 16).await.unwrap();
+ assert_eq!(paged, para);
+
+ let prefix = StorageKey(vec![]);
+ let paged = builder.rpc_get_keys_paged(prefix.clone(), at).await.unwrap();
+ let para = builder.rpc_get_keys_parallel(prefix.clone(), at, 4, 32).await.unwrap();
+ assert_eq!(paged, para);
+ }
}