Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable parallel key scraping #1985

Merged
merged 21 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/bridging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ impl<SiblingBridgeHubParaId: Get<ParaId>, Runtime: crate::Config>
let sibling_bridge_hub_id: ParaId = SiblingBridgeHubParaId::get();

// let's find the channel's state with the sibling parachain,
let Some((outbound_state, queued_pages)) = pallet::Pallet::<Runtime>::outbound_channel_state(sibling_bridge_hub_id) else {
let Some((outbound_state, queued_pages)) =
pallet::Pallet::<Runtime>::outbound_channel_state(sibling_bridge_hub_id)
else {
return false
};
// suspended channel => it is congested
Expand Down
22 changes: 14 additions & 8 deletions cumulus/pallets/xcmp-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,11 @@ fn verify_fee_factor_increase_and_decrease() {
assert_eq!(DeliveryFeeFactor::<Test>::get(sibling_para_id), initial);

// Sending the message right now is cheap
let (_, delivery_fees) = validate_send::<XcmpQueue>(destination, xcm.clone())
.expect("message can be sent; qed");
let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); };
let (_, delivery_fees) =
validate_send::<XcmpQueue>(destination, xcm.clone()).expect("message can be sent; qed");
let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else {
unreachable!("asset is fungible; qed");
};
assert_eq!(delivery_fee_amount, 402_000_000);

let smaller_xcm = Xcm(vec![ClearOrigin; 30]);
Expand All @@ -422,19 +424,23 @@ fn verify_fee_factor_increase_and_decrease() {
assert_ok!(send_xcm::<XcmpQueue>(destination, xcm.clone())); // Size 520
assert_eq!(DeliveryFeeFactor::<Test>::get(sibling_para_id), FixedU128::from_float(1.05));

for _ in 0..12 { // We finish at size 929
for _ in 0..12 {
// We finish at size 929
assert_ok!(send_xcm::<XcmpQueue>(destination, smaller_xcm.clone()));
}
assert!(DeliveryFeeFactor::<Test>::get(sibling_para_id) > FixedU128::from_float(1.88));

// Sending the message right now is expensive
let (_, delivery_fees) = validate_send::<XcmpQueue>(destination, xcm.clone())
.expect("message can be sent; qed");
let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else { unreachable!("asset is fungible; qed"); };
let (_, delivery_fees) =
validate_send::<XcmpQueue>(destination, xcm.clone()).expect("message can be sent; qed");
let Fungible(delivery_fee_amount) = delivery_fees.inner()[0].fun else {
unreachable!("asset is fungible; qed");
};
assert_eq!(delivery_fee_amount, 758_030_955);

// Fee factor only decreases in `take_outbound_messages`
for _ in 0..5 { // We take 5 100 byte pages
for _ in 0..5 {
// We take 5 100 byte pages
XcmpQueue::take_outbound_messages(1);
}
assert!(DeliveryFeeFactor::<Test>::get(sibling_para_id) < FixedU128::from_float(1.72));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ pub(crate) fn weight_witness_warning(
if dev_mode {
return
}
let CallWeightDef::Immediate(w) = &method.weight else {
return
};
let CallWeightDef::Immediate(w) = &method.weight else { return };

let partial_warning = Warning::new_deprecated("UncheckedWeightWitness")
.old("not check weight witness data")
Expand Down Expand Up @@ -66,9 +64,7 @@ pub(crate) fn weight_constant_warning(
if dev_mode {
return
}
let syn::Expr::Lit(lit) = weight else {
return
};
let syn::Expr::Lit(lit) = weight else { return };

let warning = Warning::new_deprecated("ConstantWeight")
.index(warnings.len())
Expand Down
79 changes: 79 additions & 0 deletions substrate/utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,85 @@ where
Ok(keys)
}

/// Get keys at `prefix` in `block` in parallel manner.
async fn rpc_get_keys_parallel(
&self,
prefix: StorageKey,
block: B::Hash,
parallel: u16,
) -> Result<Vec<StorageKey>, &'static str> {
const MAX_PARALLEL: u16 = 4096;
const POW_OF_SIXTEEN: [u16; 3] = [1, 16, 256];

// round to power of 16, up to MAX_PARALLEL
fn round(n: u16) -> (u16, usize) {
if n <= 1 {
return (1, 0)
} else if n <= 16 {
return (16, 1)
}

let mut pow: u16 = 16;
let mut exp: usize = 1;

while pow < n {
if pow == MAX_PARALLEL {
break
}

pow = pow.saturating_mul(16);
exp += 1;
}

debug_assert!(pow <= MAX_PARALLEL);

// FIXME eagr: lack of a better idea for threshold
if n * 4 <= pow {
(pow / 16, exp - 1)
} else {
(pow, exp)
}
}

fn extension(n: u16, len: usize) -> Vec<u8> {
let mut ext = vec![0; len];
for i in 0..len {
ext[i] = (n / POW_OF_SIXTEEN[i] % 16) as u8;
}
ext
}

let (parallel, len) = round(parallel);

let batch = (0..parallel).into_iter().map(|i| {
let mut prefix = prefix.as_ref().to_vec();
prefix.extend(extension(i, len));
let prefix = StorageKey(prefix);

self.rpc_get_keys_paged(prefix, block)
});

let keys = futures::future::join_all(batch)
.await
.into_iter()
.filter_map(|res| match res {
Ok(keys) => Some(keys),
Err(err) => {
log::warn!(
target: LOG_TARGET,
"{} when fetching keys at block {:?}",
err,
block,
);
None
},
})
.flatten()
.collect::<Vec<StorageKey>>();

Ok(keys)
}

/// Fetches storage data from a node using a dynamic batch size.
///
/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
Expand Down