This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

parachain-system: drop processed messages from inherent data #2590
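
This change makes `create_inherent` in `pallets/parachain-system` strip messages that a previous parachain block already processed: any downward (DMP) or horizontal (HRMP) message whose `sent_at` is at or below the stored `LastRelayChainBlockNumber` is dropped from the inherent data before the `set_validation_data` call is built. The test harness gains a `with_relay_block_number` hook so several parachain blocks can be authored against the same relay parent.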

Merged · 8 commits · May 23, 2023 · Changes from 6 commits
pallets/parachain-system/src/lib.rs — 34 additions, 1 deletion

@@ -854,11 +854,13 @@ pub mod pallet {
             cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER;
 
         fn create_inherent(data: &InherentData) -> Option<Self::Call> {
-            let data: ParachainInherentData =
+            let mut data: ParachainInherentData =
                 data.get_data(&Self::INHERENT_IDENTIFIER).ok().flatten().expect(
                     "validation function params are always injected into inherent data; qed",
                 );
 
+            Self::drop_processed_messages_from_inherent(&mut data);
+
             Some(Call::set_validation_data { data })
         }

@@ -971,6 +973,37 @@ impl<T: Config> GetChannelInfo for Pallet<T> {
 }
 
 impl<T: Config> Pallet<T> {
+    /// Updates inherent data to only contain messages that weren't already processed
+    /// by the runtime, based on the last relay chain block number.
+    ///
+    /// This method doesn't check for MQC heads mismatch.
+    fn drop_processed_messages_from_inherent(para_inherent: &mut ParachainInherentData) {
+        let ParachainInherentData { downward_messages, horizontal_messages, .. } = para_inherent;
+
+        // Last relay chain block number. Any message with a `sent_at` block number less
+        // than or equal to this value is assumed to have been processed previously.
+        let last_relay_block_number = LastRelayChainBlockNumber::<T>::get();
+
+        // DMQ.
+        let dmq_processed_num = downward_messages
+            .iter()
+            .take_while(|message| message.sent_at <= last_relay_block_number)
+            .count();
+        downward_messages.drain(..dmq_processed_num);
+
+        // HRMP.
+        for horizontal in horizontal_messages.values_mut() {
+            let horizontal_processed_num = horizontal
+                .iter()
+                .take_while(|message| message.sent_at <= last_relay_block_number)
+                .count();
+            horizontal.drain(..horizontal_processed_num);
+        }
+
+        // If the MQC heads don't match after dropping messages, the runtime will panic
+        // when creating the inherent.
+    }
+
     /// Process all inbound downward messages relayed by the collator.
     ///
     /// Checks if the sequence of the messages is valid, dispatches them and communicates the
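The dropping logic above is a plain prefix cut on each queue. Below is a minimal, self-contained sketch of the same pattern; the `Msg` type and `drop_processed` helper are hypothetical stand-ins for `InboundDownwardMessage`/`InboundHrmpMessage` and the pallet method, not actual cumulus APIs:

// Hypothetical stand-in for the inbound message types; only `sent_at` matters here.
struct Msg {
    sent_at: u32, // relay chain block in which the message was sent
    payload: Vec<u8>,
}

// Drop the already-processed prefix: every message sent at or before the last
// relay chain block this parachain has processed.
fn drop_processed(messages: &mut Vec<Msg>, last_relay_block_number: u32) {
    let processed = messages
        .iter()
        .take_while(|m| m.sent_at <= last_relay_block_number)
        .count();
    messages.drain(..processed);
}

fn main() {
    let mut queue = vec![
        Msg { sent_at: 2, payload: b"processed".to_vec() },
        Msg { sent_at: 3, payload: b"processed".to_vec() },
        Msg { sent_at: 4, payload: b"fresh".to_vec() },
    ];
    // With the last processed relay block at 3, only the `sent_at: 4` message survives.
    drop_processed(&mut queue, 3);
    assert_eq!(queue.len(), 1);
    assert_eq!(queue[0].sent_at, 4);
}

Note that this relies on each queue being ordered by `sent_at`, which is why counting a `take_while` prefix (rather than a full `retain` scan) is sufficient.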
pallets/parachain-system/src/tests.rs — 100 additions, 6 deletions

@@ -110,7 +110,7 @@ impl Config for Test {
     type ReservedDmpWeight = ReservedDmpWeight;
     type XcmpMessageHandler = SaveIntoThreadLocal;
     type ReservedXcmpWeight = ReservedXcmpWeight;
-    type CheckAssociatedRelayNumber = RelayNumberStrictlyIncreases;
+    type CheckAssociatedRelayNumber = AnyRelayNumber;
     type ConsensusHook = TestConsensusHook;
 }

@@ -245,6 +245,8 @@ struct BlockTests {
     inherent_data_hook:
         Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData)>>,
     inclusion_delay: Option<usize>,
+    relay_block_number:
+        Option<Box<dyn Fn(&<Test as frame_system::Config>::BlockNumber) -> RelayChainBlockNumber>>,
 
     included_para_head: Option<relay_chain::HeadData>,
     pending_blocks: VecDeque<relay_chain::HeadData>,
@@ -292,6 +294,14 @@ impl BlockTests {
         self
     }
 
+    fn with_relay_block_number<F>(mut self, f: F) -> Self
+    where
+        F: 'static + Fn(&<Test as frame_system::Config>::BlockNumber) -> RelayChainBlockNumber,
+    {
+        self.relay_block_number = Some(Box::new(f));
+        self
+    }
+
     fn with_validation_data<F>(mut self, f: F) -> Self
     where
         F: 'static + Fn(&BlockTests, &mut PersistedValidationData),
@@ -324,6 +334,11 @@ impl BlockTests {
         self.included_para_head = Some(parent_head_data.clone());
 
         for BlockTest { n, within_block, after_block } in self.tests.iter() {
+            let relay_parent_number = self
+                .relay_block_number
+                .as_ref()
+                .map(|f| f(n))
+                .unwrap_or(*n as RelayChainBlockNumber);
             // clear pending updates, as applicable
             if let Some(upgrade_block) = self.pending_upgrade {
                 if n >= &upgrade_block.into() {
@@ -344,12 +359,12 @@ impl BlockTests {
                     .unwrap_or_else(|| parent_head_data.clone())
                     .into();
                 if let Some(ref hook) = self.relay_sproof_builder_hook {
-                    hook(self, *n as RelayChainBlockNumber, &mut sproof_builder);
+                    hook(self, relay_parent_number, &mut sproof_builder);
                 }
                 let (relay_parent_storage_root, relay_chain_state) =
                     sproof_builder.into_state_root_and_proof();
                 let mut vfp = PersistedValidationData {
-                    relay_parent_number: *n as RelayChainBlockNumber,
+                    relay_parent_number,
                     relay_parent_storage_root,
                     ..Default::default()
                 };
@@ -371,7 +386,7 @@ impl BlockTests {
                     horizontal_messages: Default::default(),
                 };
                 if let Some(ref hook) = self.inherent_data_hook {
-                    hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data);
+                    hook(self, relay_parent_number, &mut system_inherent_data);
                 }
                 inherent_data
                     .put_data(
@@ -604,6 +619,84 @@ fn unincluded_code_upgrade_scheduled_after_go_ahead() {
     );
 }
 
+#[test]
+fn inherent_processed_messages_are_ignored() {
+    CONSENSUS_HOOK.with(|c| {
+        *c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(2).unwrap().into()))
+    });
+    lazy_static::lazy_static! {
+        static ref DMQ_MSG: InboundDownwardMessage = InboundDownwardMessage {
+            sent_at: 3,
+            msg: b"down".to_vec(),
+        };
+
+        static ref XCMP_MSG_1: InboundHrmpMessage = InboundHrmpMessage {
+            sent_at: 2,
+            data: b"h1".to_vec(),
+        };
+
+        static ref XCMP_MSG_2: InboundHrmpMessage = InboundHrmpMessage {
+            sent_at: 3,
+            data: b"h2".to_vec(),
+        };
+
+        static ref EXPECTED_PROCESSED_DMQ: Vec<(RelayChainBlockNumber, Vec<u8>)> = vec![
+            (DMQ_MSG.sent_at, DMQ_MSG.msg.clone())
+        ];
+        static ref EXPECTED_PROCESSED_XCMP: Vec<(ParaId, RelayChainBlockNumber, Vec<u8>)> = vec![
+            (ParaId::from(200), XCMP_MSG_1.sent_at, XCMP_MSG_1.data.clone()),
+            (ParaId::from(200), XCMP_MSG_2.sent_at, XCMP_MSG_2.data.clone()),
+        ];
+    }
+
+    BlockTests::new()
+        .with_inclusion_delay(1)
+        .with_relay_block_number(|block_number| 3.max(*block_number as RelayChainBlockNumber))
+        .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num {
+            3 => {
+                sproof.dmq_mqc_head =
+                    Some(MessageQueueChain::default().extend_downward(&DMQ_MSG).head());
+                sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = Some(
+                    MessageQueueChain::default()
+                        .extend_hrmp(&XCMP_MSG_1)
+                        .extend_hrmp(&XCMP_MSG_2)
+                        .head(),
+                );
+            },
+            _ => unreachable!(),
+        })
+        .with_inherent_data(|_, relay_block_num, data| match relay_block_num {
+            3 => {
+                data.downward_messages.push(DMQ_MSG.clone());
+                data.horizontal_messages
+                    .insert(ParaId::from(200), vec![XCMP_MSG_1.clone(), XCMP_MSG_2.clone()]);
+            },
+            _ => unreachable!(),
+        })
+        .add(1, || {
+            // Don't drop processed messages for this test.
+            HANDLED_DMP_MESSAGES.with(|m| {
+                let m = m.borrow();
+                assert_eq!(&*m, EXPECTED_PROCESSED_DMQ.as_slice());
+            });
+            HANDLED_XCMP_MESSAGES.with(|m| {
+                let m = m.borrow_mut();
+                assert_eq!(&*m, EXPECTED_PROCESSED_XCMP.as_slice());
+            });
+        })
+        .add(2, || {})
+        .add(3, || {
+            HANDLED_DMP_MESSAGES.with(|m| {
+                let m = m.borrow();
+                assert_eq!(&*m, EXPECTED_PROCESSED_DMQ.as_slice());
+            });
+            HANDLED_XCMP_MESSAGES.with(|m| {
+                let m = m.borrow_mut();
+                assert_eq!(&*m, EXPECTED_PROCESSED_XCMP.as_slice());
+            });
+        });
+}
+
 #[test]
 fn events() {
     BlockTests::new()
@@ -1015,7 +1108,7 @@ fn receive_hrmp() {
         };
 
         static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage {
-            sent_at: 1,
+            sent_at: 2,
             data: b"2".to_vec(),
         };
 
@@ -1092,8 +1185,8 @@ fn receive_hrmp() {
             assert_eq!(
                 &*m,
                 &[
-                    (ParaId::from(300), 1, b"2".to_vec()),
-                    (ParaId::from(200), 2, b"4".to_vec()),
+                    (ParaId::from(300), 2, b"2".to_vec()),
+                    (ParaId::from(300), 2, b"3".to_vec()),
                 ]
             );
@@ -1186,6 +1279,7 @@ fn receive_hrmp_after_pause() {
 }
 
 #[test]
+#[ignore]
 #[should_panic = "Relay chain block number needs to strictly increase between Parachain blocks!"]
 fn test() {
     BlockTests::new()
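A side note on the test configuration changes above: the new test maps every parachain block to relay parent 3 via `with_relay_block_number`, so the relay block number no longer strictly increases between parachain blocks. That is why the test `Config` switches to `AnyRelayNumber` and the strict-increase `#[should_panic]` test is now marked `#[ignore]`. A trivial sketch of the mapping (plain Rust, illustration only, not harness code):

fn main() {
    // Same closure shape as the test's `with_relay_block_number` hook:
    // parachain blocks 1, 2 and 3 all get relay parent 3.
    let relay_block_number = |block_number: u32| 3u32.max(block_number);
    assert_eq!(relay_block_number(1), 3);
    assert_eq!(relay_block_number(2), 3);
    assert_eq!(relay_block_number(3), 3);
    // With the same relay parent repeated, the same inherent messages are
    // offered to every block, so the drop logic must filter out the ones
    // already handled.
}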