Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bridges] Prune messages from confirmation tx body, not from the on_idle #4944

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 62 additions & 35 deletions bridges/modules/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use bp_runtime::{
};
use codec::{Decode, Encode, MaxEncodedLen};
use frame_support::{dispatch::PostDispatchInfo, ensure, fail, traits::Get, DefaultNoBound};
use sp_runtime::traits::UniqueSaturatedFrom;
use sp_std::{marker::PhantomData, prelude::*};

mod inbound_lane;
Expand Down Expand Up @@ -153,40 +152,6 @@ pub mod pallet {
type OperatingModeStorage = PalletOperatingMode<T, I>;
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I>
where
u32: TryFrom<BlockNumberFor<T>>,
{
fn on_idle(_block: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
// we'll need at least to read outbound lane state, kill a message and update lane state
let db_weight = T::DbWeight::get();
if !remaining_weight.all_gte(db_weight.reads_writes(1, 2)) {
return Weight::zero()
}

// messages from lane with index `i` in `ActiveOutboundLanes` are pruned when
// `System::block_number() % lanes.len() == i`. Otherwise we need to read lane states on
// every block, wasting the whole `remaining_weight` for nothing and causing starvation
// of the last lane pruning
let active_lanes = T::ActiveOutboundLanes::get();
let active_lanes_len = (active_lanes.len() as u32).into();
let active_lane_index = u32::unique_saturated_from(
frame_system::Pallet::<T>::block_number() % active_lanes_len,
);
let active_lane_id = active_lanes[active_lane_index as usize];

// first db read - outbound lane state
let mut active_lane = outbound_lane::<T, I>(active_lane_id);
let mut used_weight = db_weight.reads(1);
// and here we'll have writes
used_weight += active_lane.prune_messages(db_weight, remaining_weight - used_weight);

// we already checked we have enough `remaining_weight` to cover this `used_weight`
used_weight
}
}

#[pallet::call]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Change `PalletOwner`.
Expand Down Expand Up @@ -610,6 +575,14 @@ pub mod pallet {
}
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I> {
#[cfg(feature = "try-runtime")]
fn try_state(_n: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state()
}
}

impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Get stored data of the outbound message with given nonce.
pub fn outbound_message_data(lane: LaneId, nonce: MessageNonce) -> Option<MessagePayload> {
Expand Down Expand Up @@ -644,6 +617,60 @@ pub mod pallet {
}
}

#[cfg(any(feature = "try-runtime", test))]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Ensure the correctness of the state of this pallet.
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state_for_outbound_lanes()
}

/// Ensure the correctness of the state of outbound lanes.
pub fn do_try_state_for_outbound_lanes() -> Result<(), sp_runtime::TryRuntimeError> {
use sp_runtime::traits::One;
use sp_std::vec::Vec;

// collect unpruned lanes
let mut unpruned_lanes = Vec::new();
for (lane_id, lane_data) in OutboundLanes::<T, I>::iter() {
let Some(expected_last_prunned_nonce) =
lane_data.oldest_unpruned_nonce.checked_sub(One::one())
else {
continue;
};

// collect message_nonces that were supposed to be pruned
let mut unpruned_message_nonces = Vec::new();
let mut nonce_to_check = expected_last_prunned_nonce;
bkontur marked this conversation as resolved.
Show resolved Hide resolved
loop {
if OutboundMessages::<T, I>::contains_key(MessageKey {
lane_id,
nonce: nonce_to_check,
}) {
unpruned_message_nonces.push(nonce_to_check);
}
if let Some(new_nonce_to_check) = nonce_to_check.checked_sub(One::one()) {
bkontur marked this conversation as resolved.
Show resolved Hide resolved
nonce_to_check = new_nonce_to_check;
} else {
break;
}
}

if !unpruned_message_nonces.is_empty() {
log::warn!(
target: LOG_TARGET,
"do_try_state_for_outbound_lanes for lane_id: {lane_id:?} with lane_data: {lane_data:?} found unpruned_message_nonces: {unpruned_message_nonces:?}",
);
unpruned_lanes.push((lane_id, lane_data, unpruned_message_nonces));
}
}

// ensure messages before `oldest_unpruned_nonce` are really pruned.
ensure!(unpruned_lanes.is_empty(), "Found unpruned lanes!");

Ok(())
}
}

/// Get-parameter that returns number of active outbound lanes that the pallet maintains.
pub struct MaybeOutboundLanesCount<T, I>(PhantomData<(T, I)>);

Expand Down
106 changes: 13 additions & 93 deletions bridges/modules/messages/src/outbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ use bp_messages::{
ChainWithMessages, DeliveredMessages, LaneId, MessageNonce, OutboundLaneData, UnrewardedRelayer,
};
use codec::{Decode, Encode};
use frame_support::{
traits::Get,
weights::{RuntimeDbWeight, Weight},
BoundedVec, PalletError,
};
use frame_support::{traits::Get, BoundedVec, PalletError};
use scale_info::TypeInfo;
use sp_runtime::{traits::Zero, RuntimeDebug};
use sp_runtime::RuntimeDebug;
use sp_std::{collections::vec_deque::VecDeque, marker::PhantomData};

/// Outbound lane storage.
Expand Down Expand Up @@ -143,41 +139,17 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {

ensure_unrewarded_relayers_are_correct(confirmed_messages.end, relayers)?;

// prune all confirmed messages
for nonce in confirmed_messages.begin..=confirmed_messages.end {
self.storage.remove_message(&nonce);
}

data.latest_received_nonce = confirmed_messages.end;
data.oldest_unpruned_nonce = data.latest_received_nonce.saturating_add(1);
self.storage.set_data(data);

Ok(Some(confirmed_messages))
}

/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns weight, consumed by messages pruning and lane state update.
pub fn prune_messages(
&mut self,
db_weight: RuntimeDbWeight,
mut remaining_weight: Weight,
) -> Weight {
let write_weight = db_weight.writes(1);
let two_writes_weight = write_weight + write_weight;
let mut spent_weight = Weight::zero();
let mut data = self.storage.data();
while remaining_weight.all_gte(two_writes_weight) &&
data.oldest_unpruned_nonce <= data.latest_received_nonce
{
self.storage.remove_message(&data.oldest_unpruned_nonce);

spent_weight += write_weight;
remaining_weight -= write_weight;
data.oldest_unpruned_nonce += 1;
}

if !spent_weight.is_zero() {
spent_weight += write_weight;
self.storage.set_data(data);
}

spent_weight
}
}

/// Verifies unrewarded relayers vec.
Expand Down Expand Up @@ -221,7 +193,6 @@ mod tests {
REGULAR_PAYLOAD, TEST_LANE_ID,
},
};
use frame_support::weights::constants::RocksDbWeight;
use sp_std::ops::RangeInclusive;

fn unrewarded_relayers(
Expand Down Expand Up @@ -281,7 +252,7 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -302,15 +273,15 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 2);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);

assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -331,12 +302,12 @@ mod tests {
assert_eq!(lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);

assert_eq!(lane.confirm_delivery(1, 2, &unrewarded_relayers(1..=1)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand Down Expand Up @@ -394,57 +365,6 @@ mod tests {
);
}

#[test]
fn prune_messages_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(
lane.confirm_delivery(2, 2, &unrewarded_relayers(1..=2)),
Ok(Some(delivered_messages(1..=2))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(3),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(
lane.confirm_delivery(1, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(2),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

#[test]
fn confirm_delivery_detects_when_more_than_expected_messages_are_confirmed() {
run_test(|| {
Expand Down
Loading
Loading