Skip to content

Commit

Permalink
[bridges] Prune messages from confirmation tx body, not from the on_i…
Browse files Browse the repository at this point in the history
…dle (#4944)

Original PR with more context:
paritytech/parity-bridges-common#2211
Relates to:
paritytech/parity-bridges-common#2210

## TODO

- [x] fresh weighs for `pallet_bridge_messages`
- [x] add `try_state` for `pallet_bridge_messages` which checks for
unpruned messages - relates to the
[comment](paritytech/parity-bridges-common#2211 (comment))
- [x] ~prepare migration, that prunes leftovers, which would be pruned
eventually from `on_idle` the
[comment](paritytech/parity-bridges-common#2211 (comment)
can be done also by `set_storage` / `kill_storage` or with
`OnRuntimeUpgrade` implementatino when `do_try_state_for_outbound_lanes`
detects problem.

## Open question

- [ ] Do we really need `oldest_unpruned_nonce` afterwards?
- after the runtime upgrade and when `do_try_state_for_outbound_lanes`
pass, we won't need any migrations here
    - we won't even need `do_try_state_for_outbound_lanes`

---------

Signed-off-by: Branislav Kontur <[email protected]>
Co-authored-by: Svyatoslav Nikolsky <[email protected]>
Co-authored-by: command-bot <>
  • Loading branch information
bkontur and svyatonik authored Jul 10, 2024
1 parent 308ec4a commit 7c875be
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 332 deletions.
95 changes: 60 additions & 35 deletions bridges/modules/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use bp_runtime::{
};
use codec::{Decode, Encode, MaxEncodedLen};
use frame_support::{dispatch::PostDispatchInfo, ensure, fail, traits::Get, DefaultNoBound};
use sp_runtime::traits::UniqueSaturatedFrom;
use sp_std::{marker::PhantomData, prelude::*};

mod inbound_lane;
Expand Down Expand Up @@ -153,40 +152,6 @@ pub mod pallet {
type OperatingModeStorage = PalletOperatingMode<T, I>;
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I>
where
u32: TryFrom<BlockNumberFor<T>>,
{
fn on_idle(_block: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
// we'll need at least to read outbound lane state, kill a message and update lane state
let db_weight = T::DbWeight::get();
if !remaining_weight.all_gte(db_weight.reads_writes(1, 2)) {
return Weight::zero()
}

// messages from lane with index `i` in `ActiveOutboundLanes` are pruned when
// `System::block_number() % lanes.len() == i`. Otherwise we need to read lane states on
// every block, wasting the whole `remaining_weight` for nothing and causing starvation
// of the last lane pruning
let active_lanes = T::ActiveOutboundLanes::get();
let active_lanes_len = (active_lanes.len() as u32).into();
let active_lane_index = u32::unique_saturated_from(
frame_system::Pallet::<T>::block_number() % active_lanes_len,
);
let active_lane_id = active_lanes[active_lane_index as usize];

// first db read - outbound lane state
let mut active_lane = outbound_lane::<T, I>(active_lane_id);
let mut used_weight = db_weight.reads(1);
// and here we'll have writes
used_weight += active_lane.prune_messages(db_weight, remaining_weight - used_weight);

// we already checked we have enough `remaining_weight` to cover this `used_weight`
used_weight
}
}

#[pallet::call]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Change `PalletOwner`.
Expand Down Expand Up @@ -610,6 +575,14 @@ pub mod pallet {
}
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I> {
#[cfg(feature = "try-runtime")]
fn try_state(_n: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state()
}
}

impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Get stored data of the outbound message with given nonce.
pub fn outbound_message_data(lane: LaneId, nonce: MessageNonce) -> Option<MessagePayload> {
Expand Down Expand Up @@ -644,6 +617,58 @@ pub mod pallet {
}
}

#[cfg(any(feature = "try-runtime", test))]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Ensure the correctness of the state of this pallet.
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state_for_outbound_lanes()
}

/// Ensure the correctness of the state of outbound lanes.
pub fn do_try_state_for_outbound_lanes() -> Result<(), sp_runtime::TryRuntimeError> {
use sp_runtime::traits::One;
use sp_std::vec::Vec;

// collect unpruned lanes
let mut unpruned_lanes = Vec::new();
for (lane_id, lane_data) in OutboundLanes::<T, I>::iter() {
let Some(expected_last_prunned_nonce) =
lane_data.oldest_unpruned_nonce.checked_sub(One::one())
else {
continue;
};

// collect message_nonces that were supposed to be pruned
let mut unpruned_message_nonces = Vec::new();
const MAX_MESSAGES_ITERATION: u64 = 16;
let start_nonce =
expected_last_prunned_nonce.checked_sub(MAX_MESSAGES_ITERATION).unwrap_or(0);
for current_nonce in start_nonce..=expected_last_prunned_nonce {
// check a message for current_nonce
if OutboundMessages::<T, I>::contains_key(MessageKey {
lane_id,
nonce: current_nonce,
}) {
unpruned_message_nonces.push(current_nonce);
}
}

if !unpruned_message_nonces.is_empty() {
log::warn!(
target: LOG_TARGET,
"do_try_state_for_outbound_lanes for lane_id: {lane_id:?} with lane_data: {lane_data:?} found unpruned_message_nonces: {unpruned_message_nonces:?}",
);
unpruned_lanes.push((lane_id, lane_data, unpruned_message_nonces));
}
}

// ensure messages before `oldest_unpruned_nonce` are really pruned.
ensure!(unpruned_lanes.is_empty(), "Found unpruned lanes!");

Ok(())
}
}

/// Get-parameter that returns number of active outbound lanes that the pallet maintains.
pub struct MaybeOutboundLanesCount<T, I>(PhantomData<(T, I)>);

Expand Down
106 changes: 13 additions & 93 deletions bridges/modules/messages/src/outbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ use bp_messages::{
ChainWithMessages, DeliveredMessages, LaneId, MessageNonce, OutboundLaneData, UnrewardedRelayer,
};
use codec::{Decode, Encode};
use frame_support::{
traits::Get,
weights::{RuntimeDbWeight, Weight},
BoundedVec, PalletError,
};
use frame_support::{traits::Get, BoundedVec, PalletError};
use scale_info::TypeInfo;
use sp_runtime::{traits::Zero, RuntimeDebug};
use sp_runtime::RuntimeDebug;
use sp_std::{collections::vec_deque::VecDeque, marker::PhantomData};

/// Outbound lane storage.
Expand Down Expand Up @@ -143,41 +139,17 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {

ensure_unrewarded_relayers_are_correct(confirmed_messages.end, relayers)?;

// prune all confirmed messages
for nonce in confirmed_messages.begin..=confirmed_messages.end {
self.storage.remove_message(&nonce);
}

data.latest_received_nonce = confirmed_messages.end;
data.oldest_unpruned_nonce = data.latest_received_nonce.saturating_add(1);
self.storage.set_data(data);

Ok(Some(confirmed_messages))
}

/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns weight, consumed by messages pruning and lane state update.
pub fn prune_messages(
&mut self,
db_weight: RuntimeDbWeight,
mut remaining_weight: Weight,
) -> Weight {
let write_weight = db_weight.writes(1);
let two_writes_weight = write_weight + write_weight;
let mut spent_weight = Weight::zero();
let mut data = self.storage.data();
while remaining_weight.all_gte(two_writes_weight) &&
data.oldest_unpruned_nonce <= data.latest_received_nonce
{
self.storage.remove_message(&data.oldest_unpruned_nonce);

spent_weight += write_weight;
remaining_weight -= write_weight;
data.oldest_unpruned_nonce += 1;
}

if !spent_weight.is_zero() {
spent_weight += write_weight;
self.storage.set_data(data);
}

spent_weight
}
}

/// Verifies unrewarded relayers vec.
Expand Down Expand Up @@ -221,7 +193,6 @@ mod tests {
REGULAR_PAYLOAD, TEST_LANE_ID,
},
};
use frame_support::weights::constants::RocksDbWeight;
use sp_std::ops::RangeInclusive;

fn unrewarded_relayers(
Expand Down Expand Up @@ -281,7 +252,7 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -302,15 +273,15 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 2);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);

assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -331,12 +302,12 @@ mod tests {
assert_eq!(lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);

assert_eq!(lane.confirm_delivery(1, 2, &unrewarded_relayers(1..=1)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand Down Expand Up @@ -394,57 +365,6 @@ mod tests {
);
}

#[test]
fn prune_messages_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(
lane.confirm_delivery(2, 2, &unrewarded_relayers(1..=2)),
Ok(Some(delivered_messages(1..=2))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(3),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(
lane.confirm_delivery(1, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(2),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

#[test]
fn confirm_delivery_detects_when_more_than_expected_messages_are_confirmed() {
run_test(|| {
Expand Down
Loading

0 comments on commit 7c875be

Please sign in to comment.