diff --git a/bridges/bin/node/runtime/Cargo.toml b/bridges/bin/node/runtime/Cargo.toml index 62a8e6baee411..5deeff830845f 100644 --- a/bridges/bin/node/runtime/Cargo.toml +++ b/bridges/bin/node/runtime/Cargo.toml @@ -50,6 +50,11 @@ version = "0.1.0" default-features = false path = "../../../modules/currency-exchange" +[dependencies.pallet-message-lane] +version = "0.1.0" +default-features = false +path = "../../../modules/message-lane" + [dependencies.frame-support] version = "2.0.0-rc6" tag = 'v2.0.0-rc6' diff --git a/bridges/modules/message-lane/Cargo.toml b/bridges/modules/message-lane/Cargo.toml new file mode 100644 index 0000000000000..71bb86f689f08 --- /dev/null +++ b/bridges/modules/message-lane/Cargo.toml @@ -0,0 +1,56 @@ +[package] +name = "pallet-message-lane" +description = "Module that allows bridged chains to exchange messages using lane concept." +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" + +[dependencies] +bp-message-lane = { path = "../../primitives/message-lane", default-features = false } +codec = { package = "parity-scale-codec", version = "1.3.1", default-features = false } + +# Substrate Based Dependencies + +[dependencies.frame-support] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +default-features = false +git = "https://github.com/paritytech/substrate/" + +[dependencies.frame-system] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +default-features = false +git = "https://github.com/paritytech/substrate/" + +[dependencies.sp-std] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +default-features = false +git = "https://github.com/paritytech/substrate/" + +[dev-dependencies.sp-core] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +git = "https://github.com/paritytech/substrate/" + +[dev-dependencies.sp-io] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +git = "https://github.com/paritytech/substrate/" + +[dev-dependencies.sp-runtime] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +git = "https://github.com/paritytech/substrate/" + +[features] +default = ["std"] +std = [ + "bp-message-lane/std", + "codec/std", + "frame-support/std", + "frame-system/std", + "sp-std/std" +] diff --git a/bridges/modules/message-lane/src/inbound_lane.rs b/bridges/modules/message-lane/src/inbound_lane.rs new file mode 100644 index 0000000000000..abf6ffb346760 --- /dev/null +++ b/bridges/modules/message-lane/src/inbound_lane.rs @@ -0,0 +1,236 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Everything about incoming messages receival. + +use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, MessageResult, OnMessageReceived}; + +/// Inbound lane storage. +pub trait InboundLaneStorage { + /// Message payload. + type Payload; + + /// Lane id. + fn id(&self) -> LaneId; + /// Get lane data from the storage. + fn data(&self) -> InboundLaneData; + /// Update lane data in the storage. + fn set_data(&mut self, data: InboundLaneData); + /// Returns saved inbound message payload. + fn message(&self, nonce: &MessageNonce) -> Option; + /// Save inbound message in the storage. + fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload); + /// Remove inbound message from the storage. + fn remove_message(&mut self, nonce: &MessageNonce); +} + +/// Inbound messages lane. +pub struct InboundLane { + storage: S, +} + +impl InboundLane { + /// Create new inbound lane backed by given storage. + pub fn new(storage: S) -> Self { + InboundLane { storage } + } + + /// Receive new message. + pub fn receive_message( + &mut self, + nonce: MessageNonce, + payload: S::Payload, + processor: &mut impl OnMessageReceived, + ) -> bool { + let mut data = self.storage.data(); + let is_correct_message = nonce == data.latest_received_nonce + 1; + if !is_correct_message { + return false; + } + + let is_process_required = is_correct_message && data.oldest_unprocessed_nonce == nonce; + data.latest_received_nonce = nonce; + self.storage.set_data(data); + + let payload_to_save = match is_process_required { + true => { + let message = Message { + key: MessageKey { + lane_id: self.storage.id(), + nonce, + }, + payload, + }; + match processor.on_message_received(message) { + MessageResult::Processed => None, + MessageResult::NotProcessed(message) => Some(message.payload), + } + } + false => Some(payload), + }; + + if let Some(payload_to_save) = payload_to_save { + self.storage.save_message(nonce, payload_to_save); + } + + true + } + + /// Process stored lane messages. + /// + /// Stops processing either when all messages are processed, or when processor returns + /// MessageResult::NotProcessed. + pub fn process_messages(&mut self, processor: &mut impl OnMessageReceived) { + let mut anything_processed = false; + let mut data = self.storage.data(); + while data.oldest_unprocessed_nonce <= data.latest_received_nonce { + let nonce = data.oldest_unprocessed_nonce; + let payload = self + .storage + .message(&nonce) + .expect("message is referenced by lane; referenced message is not pruned; qed"); + let message = Message { + key: MessageKey { + lane_id: self.storage.id(), + nonce, + }, + payload, + }; + + let process_result = processor.on_message_received(message); + if let MessageResult::NotProcessed(_) = process_result { + break; + } + + self.storage.remove_message(&nonce); + + anything_processed = true; + data.oldest_unprocessed_nonce += 1; + } + + if anything_processed { + self.storage.set_data(data); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + inbound_lane, + mock::{ + run_test, TestMessageProcessor, TestPayload, TestRuntime, PAYLOAD_TO_QUEUE, REGULAR_PAYLOAD, TEST_LANE_ID, + }, + }; + + #[test] + fn fails_to_receive_message_with_incorrect_nonce() { + run_test(|| { + let mut lane = inbound_lane::(TEST_LANE_ID); + assert!(!lane.receive_message(10, REGULAR_PAYLOAD, &mut TestMessageProcessor)); + assert!(lane.storage.message(&10).is_none()); + assert_eq!(lane.storage.data().latest_received_nonce, 0); + }); + } + + #[test] + fn correct_message_is_queued_if_some_other_messages_are_queued() { + run_test(|| { + let mut lane = inbound_lane::(TEST_LANE_ID); + assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor)); + assert!(lane.storage.message(&1).is_some()); + assert!(lane.receive_message(2, REGULAR_PAYLOAD, &mut TestMessageProcessor)); + assert!(lane.storage.message(&2).is_some()); + assert_eq!(lane.storage.data().latest_received_nonce, 2); + }); + } + + #[test] + fn correct_message_is_queued_if_processor_wants_to_queue() { + run_test(|| { + let mut lane = inbound_lane::(TEST_LANE_ID); + assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor)); + assert!(lane.storage.message(&1).is_some()); + assert_eq!(lane.storage.data().latest_received_nonce, 1); + }); + } + + #[test] + fn correct_message_is_not_queued_if_processed_instantly() { + run_test(|| { + let mut lane = inbound_lane::(TEST_LANE_ID); + assert!(lane.receive_message(1, REGULAR_PAYLOAD, &mut TestMessageProcessor)); + assert!(lane.storage.message(&1).is_none()); + assert_eq!(lane.storage.data().latest_received_nonce, 1); + }); + } + + #[test] + fn process_message_does_nothing_when_lane_is_empty() { + run_test(|| { + let mut lane = inbound_lane::(TEST_LANE_ID); + assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1); + lane.process_messages(&mut TestMessageProcessor); + assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1); + }); + } + + #[test] + fn process_message_works() { + run_test(|| { + pub struct QueueByNonce(MessageNonce); + + impl OnMessageReceived for QueueByNonce { + fn on_message_received(&mut self, message: Message) -> MessageResult { + if message.key.nonce == self.0 { + MessageResult::NotProcessed(message) + } else { + MessageResult::Processed + } + } + } + + let mut lane = inbound_lane::(TEST_LANE_ID); + assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor)); + assert!(lane.receive_message(2, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor)); + assert!(lane.receive_message(3, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor)); + assert!(lane.receive_message(4, REGULAR_PAYLOAD, &mut TestMessageProcessor)); + + assert!(lane.storage.message(&1).is_some()); + assert!(lane.storage.message(&2).is_some()); + assert!(lane.storage.message(&3).is_some()); + assert!(lane.storage.message(&4).is_some()); + assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1); + + lane.process_messages(&mut QueueByNonce(3)); + + assert!(lane.storage.message(&1).is_none()); + assert!(lane.storage.message(&2).is_none()); + assert!(lane.storage.message(&3).is_some()); + assert!(lane.storage.message(&4).is_some()); + assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 3); + + lane.process_messages(&mut QueueByNonce(10)); + + assert!(lane.storage.message(&1).is_none()); + assert!(lane.storage.message(&2).is_none()); + assert!(lane.storage.message(&3).is_none()); + assert!(lane.storage.message(&4).is_none()); + assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 5); + }); + } +} diff --git a/bridges/modules/message-lane/src/lib.rs b/bridges/modules/message-lane/src/lib.rs new file mode 100644 index 0000000000000..40517887e1579 --- /dev/null +++ b/bridges/modules/message-lane/src/lib.rs @@ -0,0 +1,281 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Runtime module that allows sending and receiving messages using lane concept: +//! +//! 1) the message is sent using `send_message()` call; +//! 2) every outbound message is assigned nonce; +//! 3) the messages are stored in the storage; +//! 4) external component (relay) delivers messages to bridged chain; +//! 5) messages are processed in order (ordered by assigned nonce); +//! 6) relay may send proof-of-receiving and proof-of-processing back to this chain. +//! +//! Once message is sent, its progress can be tracked by looking at module events. +//! The assigned nonce is reported using `MessageAccepted` event. When message is +//! accepted by the bridged chain, `MessagesDelivered` is fired. When message is +//! processedby the bridged chain, `MessagesProcessed` by the bridged chain. + +#![cfg_attr(not(feature = "std"), no_std)] + +use crate::inbound_lane::{InboundLane, InboundLaneStorage}; +use crate::outbound_lane::{OutboundLane, OutboundLaneStorage}; + +use bp_message_lane::{ + InboundLaneData, LaneId, Message, MessageKey, MessageNonce, OnMessageReceived, OutboundLaneData, +}; +use frame_support::{decl_event, decl_module, decl_storage, traits::Get, Parameter, StorageMap}; +use frame_system::ensure_signed; +use sp_std::{marker::PhantomData, prelude::*}; + +mod inbound_lane; +mod outbound_lane; + +#[cfg(test)] +mod mock; + +/// The module configuration trait +pub trait Trait: frame_system::Trait { + /// They overarching event type. + type Event: From + Into<::Event>; + /// Message payload. + type Payload: Parameter; + /// Maximal number of messages that may be pruned during maintenance. Maintenance occurs + /// whenever outbound lane is updated - i.e. when new message is sent, or receival is + /// confirmed. The reason is that if you want to use lane, you should be ready to pay + /// for it. + type MaxMessagesToPruneAtOnce: Get; + /// Called when message has been received. + type OnMessageReceived: Default + OnMessageReceived; +} + +decl_storage! { + trait Store for Module, I: Instance = DefaultInstance> as MessageLane { + /// Map of lane id => inbound lane data. + InboundLanes: map hasher(blake2_128_concat) LaneId => InboundLaneData; + /// All stored (unprocessed) inbound messages. + InboundMessages: map hasher(blake2_128_concat) MessageKey => Option; + /// Map of lane id => outbound lane data. + OutboundLanes: map hasher(blake2_128_concat) LaneId => OutboundLaneData; + /// All queued outbound messages. + OutboundMessages: map hasher(blake2_128_concat) MessageKey => Option; + } +} + +decl_event!( + pub enum Event { + /// Message has been accepted and is waiting to be delivered. + MessageAccepted(LaneId, MessageNonce), + /// Messages in the inclusive range have been delivered to the bridged chain. + MessagesDelivered(LaneId, MessageNonce, MessageNonce), + /// Messages in the inclusive range have been processed by the bridged chain. + MessagesProcessed(LaneId, MessageNonce, MessageNonce), + } +); + +decl_module! { + pub struct Module, I: Instance = DefaultInstance> for enum Call where origin: T::Origin { + /// Deposit one of this module's events by using the default implementation. + fn deposit_event() = default; + + /// Send message over lane. + #[weight = 0] // TODO: update me (https://github.com/paritytech/parity-bridges-common/issues/78) + pub fn send_message( + origin, + lane_id: LaneId, + payload: T::Payload, + ) { + let _ = ensure_signed(origin)?; + let mut lane = outbound_lane::(lane_id); + let nonce = lane.send_message(payload); + lane.prune_messages(T::MaxMessagesToPruneAtOnce::get()); + + Self::deposit_event(Event::MessageAccepted(lane_id, nonce)); + } + } +} + +impl, I: Instance> Module { + // ========================================================================================= + // === Exposed mutables ==================================================================== + // ========================================================================================= + + /// Receive new TRUSTED lane messages. + /// + /// Trusted here means that the function itself doesn't check whether message has actually + /// been sent through the other end of the channel. We only check that we are receiving + /// and processing messages in order here. + /// + /// Messages vector is required to be sorted by nonce within each lane. Otherise messages + /// will be rejected. + pub fn receive_messages(messages: Vec>) -> MessageNonce { + let mut correct_messages = 0; + let mut processor = T::OnMessageReceived::default(); + for message in messages { + let mut lane = inbound_lane::(message.key.lane_id); + if lane.receive_message(message.key.nonce, message.payload, &mut processor) { + correct_messages += 1; + } + } + + correct_messages + } + + /// Process stored lane messages. + /// + /// Stops processing either when all messages are processed, or when processor returns + /// MessageResult::NotProcessed. + pub fn process_lane_messages(lane_id: &LaneId, processor: &mut impl OnMessageReceived) { + inbound_lane::(*lane_id).process_messages(processor); + } + + /// Receive TRUSTED proof of message receival. + /// + /// Trusted here means that the function itself doesn't check whether the bridged chain has + /// actually received these messages. + /// + /// The caller may break the channel by providing `latest_received_nonce` that is larger + /// than actual one. Not-yet-sent messages may be pruned in this case. + pub fn confirm_receival(lane_id: &LaneId, latest_received_nonce: MessageNonce) { + let mut lane = outbound_lane::(*lane_id); + let received_range = lane.confirm_receival(latest_received_nonce); + + if let Some(received_range) = received_range { + Self::deposit_event(Event::MessagesDelivered(*lane_id, received_range.0, received_range.1)); + } + } + + /// Receive TRUSTED proof of message processing. + /// + /// Trusted here means that the function itself doesn't check whether the bridged chain has + /// actually processed these messages. + pub fn confirm_processing(lane_id: &LaneId, latest_processed_nonce: MessageNonce) { + let mut lane = outbound_lane::(*lane_id); + let processed_range = lane.confirm_processing(latest_processed_nonce); + + if let Some(processed_range) = processed_range { + Self::deposit_event(Event::MessagesProcessed(*lane_id, processed_range.0, processed_range.1)); + } + } +} + +/// Creates new inbound lane object, backed by runtime storage. +fn inbound_lane, I: Instance>(lane_id: LaneId) -> InboundLane> { + InboundLane::new(RuntimeInboundLaneStorage { + lane_id, + _phantom: Default::default(), + }) +} + +/// Creates new outbound lane object, backed by runtime storage. +fn outbound_lane, I: Instance>(lane_id: LaneId) -> OutboundLane> { + OutboundLane::new(RuntimeOutboundLaneStorage { + lane_id, + _phantom: Default::default(), + }) +} + +/// Runtime inbound lane storage. +struct RuntimeInboundLaneStorage { + lane_id: LaneId, + _phantom: PhantomData<(T, I)>, +} + +impl, I: Instance> InboundLaneStorage for RuntimeInboundLaneStorage { + type Payload = T::Payload; + + fn id(&self) -> LaneId { + self.lane_id + } + + fn data(&self) -> InboundLaneData { + InboundLanes::::get(&self.lane_id) + } + + fn set_data(&mut self, data: InboundLaneData) { + InboundLanes::::insert(&self.lane_id, data) + } + + fn message(&self, nonce: &MessageNonce) -> Option { + InboundMessages::::get(MessageKey { + lane_id: self.lane_id, + nonce: *nonce, + }) + } + + fn save_message(&mut self, nonce: MessageNonce, payload: T::Payload) { + InboundMessages::::insert( + MessageKey { + lane_id: self.lane_id, + nonce, + }, + payload, + ); + } + + fn remove_message(&mut self, nonce: &MessageNonce) { + InboundMessages::::remove(MessageKey { + lane_id: self.lane_id, + nonce: *nonce, + }); + } +} + +/// Runtime outbound lane storage. +struct RuntimeOutboundLaneStorage { + lane_id: LaneId, + _phantom: PhantomData<(T, I)>, +} + +impl, I: Instance> OutboundLaneStorage for RuntimeOutboundLaneStorage { + type Payload = T::Payload; + + fn id(&self) -> LaneId { + self.lane_id + } + + fn data(&self) -> OutboundLaneData { + OutboundLanes::::get(&self.lane_id) + } + + fn set_data(&mut self, data: OutboundLaneData) { + OutboundLanes::::insert(&self.lane_id, data) + } + + #[cfg(test)] + fn message(&self, nonce: &MessageNonce) -> Option { + OutboundMessages::::get(MessageKey { + lane_id: self.lane_id, + nonce: *nonce, + }) + } + + fn save_message(&mut self, nonce: MessageNonce, payload: T::Payload) { + OutboundMessages::::insert( + MessageKey { + lane_id: self.lane_id, + nonce, + }, + payload, + ); + } + + fn remove_message(&mut self, nonce: &MessageNonce) { + OutboundMessages::::remove(MessageKey { + lane_id: self.lane_id, + nonce: *nonce, + }); + } +} diff --git a/bridges/modules/message-lane/src/mock.rs b/bridges/modules/message-lane/src/mock.rs new file mode 100644 index 0000000000000..5512490751906 --- /dev/null +++ b/bridges/modules/message-lane/src/mock.rs @@ -0,0 +1,121 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use bp_message_lane::{LaneId, Message, MessageResult, OnMessageReceived}; +use frame_support::{impl_outer_event, impl_outer_origin, parameter_types, weights::Weight}; +use sp_core::H256; +use sp_runtime::{ + testing::Header as SubstrateHeader, + traits::{BlakeTwo256, IdentityLookup}, + Perbill, +}; + +use crate::Trait; + +pub type AccountId = u64; +pub type TestPayload = u64; + +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct TestRuntime; + +mod message_lane { + pub use crate::Event; +} + +impl_outer_event! { + pub enum TestEvent for TestRuntime { + frame_system, + message_lane, + } +} + +impl_outer_origin! { + pub enum Origin for TestRuntime where system = frame_system {} +} + +parameter_types! { + pub const BlockHashCount: u64 = 250; + pub const MaximumBlockWeight: Weight = 1024; + pub const MaximumBlockLength: u32 = 2 * 1024; + pub const AvailableBlockRatio: Perbill = Perbill::one(); +} + +impl frame_system::Trait for TestRuntime { + type Origin = Origin; + type Index = u64; + type Call = (); + type BlockNumber = u64; + type Hash = H256; + type Hashing = BlakeTwo256; + type AccountId = AccountId; + type Lookup = IdentityLookup; + type Header = SubstrateHeader; + type Event = TestEvent; + type BlockHashCount = BlockHashCount; + type MaximumBlockWeight = MaximumBlockWeight; + type DbWeight = (); + type BlockExecutionWeight = (); + type ExtrinsicBaseWeight = (); + type MaximumExtrinsicWeight = (); + type AvailableBlockRatio = AvailableBlockRatio; + type MaximumBlockLength = MaximumBlockLength; + type Version = (); + type ModuleToIndex = (); + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type BaseCallFilter = (); + type SystemWeightInfo = (); +} + +parameter_types! { + pub const MaxMessagesToPruneAtOnce: u64 = 10; +} + +impl Trait for TestRuntime { + type Event = TestEvent; + type Payload = TestPayload; + type MaxMessagesToPruneAtOnce = MaxMessagesToPruneAtOnce; + type OnMessageReceived = TestMessageProcessor; +} + +/// Lane that we're using in tests. +pub const TEST_LANE_ID: LaneId = [0, 0, 0, 1]; + +/// Regular message payload that is not PAYLOAD_TO_QUEUE. +pub const REGULAR_PAYLOAD: TestPayload = 0; + +/// All messages with this payload are queued by TestMessageProcessor. +pub const PAYLOAD_TO_QUEUE: TestPayload = 42; + +/// Message processor that immediately handles all messages except messages with PAYLOAD_TO_QUEUE payload. +#[derive(Debug, Default)] +pub struct TestMessageProcessor; + +impl OnMessageReceived for TestMessageProcessor { + fn on_message_received(&mut self, message: Message) -> MessageResult { + if message.payload == PAYLOAD_TO_QUEUE { + MessageResult::NotProcessed(message) + } else { + MessageResult::Processed + } + } +} + +/// Run message lane test. +pub fn run_test(test: impl FnOnce() -> T) -> T { + sp_io::TestExternalities::new(Default::default()).execute_with(test) +} diff --git a/bridges/modules/message-lane/src/outbound_lane.rs b/bridges/modules/message-lane/src/outbound_lane.rs new file mode 100644 index 0000000000000..7357dbd410830 --- /dev/null +++ b/bridges/modules/message-lane/src/outbound_lane.rs @@ -0,0 +1,271 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Everything about outgoing messages sending. + +use bp_message_lane::{LaneId, MessageNonce, OutboundLaneData}; + +/// Outbound lane storage. +pub trait OutboundLaneStorage { + /// Message payload. + type Payload; + + /// Lane id. + fn id(&self) -> LaneId; + /// Get lane data from the storage. + fn data(&self) -> OutboundLaneData; + /// Update lane data in the storage. + fn set_data(&mut self, data: OutboundLaneData); + /// Returns saved outbound message payload. + #[cfg(test)] + fn message(&self, nonce: &MessageNonce) -> Option; + /// Save outbound message in the storage. + fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload); + /// Remove outbound message from the storage. + fn remove_message(&mut self, nonce: &MessageNonce); +} + +/// Outbound messages lane. +pub struct OutboundLane { + storage: S, +} + +impl OutboundLane { + /// Create new inbound lane backed by given storage. + pub fn new(storage: S) -> Self { + OutboundLane { storage } + } + + /// Send message over lane. + /// + /// Returns new message nonce. + pub fn send_message(&mut self, payload: S::Payload) -> MessageNonce { + let mut data = self.storage.data(); + let nonce = data.latest_generated_nonce + 1; + data.latest_generated_nonce = nonce; + + self.storage.save_message(nonce, payload); + self.storage.set_data(data); + + nonce + } + + /// Confirm message receival. + /// + /// Returns `None` if confirmation is wrong/duplicate. + /// Returns `Some` with inclusive ranges of message nonces that have been received. + pub fn confirm_receival(&mut self, latest_received_nonce: MessageNonce) -> Option<(MessageNonce, MessageNonce)> { + let mut data = self.storage.data(); + if latest_received_nonce <= data.latest_received_nonce || latest_received_nonce > data.latest_generated_nonce { + return None; + } + + let prev_latest_received_nonce = data.latest_received_nonce; + data.latest_received_nonce = latest_received_nonce; + self.storage.set_data(data); + + Some((prev_latest_received_nonce + 1, latest_received_nonce)) + } + + /// Confirm message processing. + /// + /// Returns `None` if confirmation is wrong/duplicate. + /// Returns `Some` with inclusive ranges of message nonces that have been processed. + pub fn confirm_processing(&mut self, latest_processed_nonce: MessageNonce) -> Option<(MessageNonce, MessageNonce)> { + let mut data = self.storage.data(); + // wait for recieval confirmation first + if latest_processed_nonce <= data.latest_processed_nonce || latest_processed_nonce > data.latest_received_nonce + { + return None; + } + + let prev_latest_processed_nonce = data.latest_processed_nonce; + data.latest_processed_nonce = latest_processed_nonce; + self.storage.set_data(data); + + Some((prev_latest_processed_nonce + 1, latest_processed_nonce)) + } + + /// Prune at most `max_messages_to_prune` already received messages. + /// + /// Returns number of pruned messages. + pub fn prune_messages(&mut self, max_messages_to_prune: MessageNonce) -> MessageNonce { + let mut pruned_messages = 0; + let mut anything_changed = false; + let mut data = self.storage.data(); + while pruned_messages < max_messages_to_prune && data.oldest_unpruned_nonce <= data.latest_received_nonce { + self.storage.remove_message(&data.oldest_unpruned_nonce); + + anything_changed = true; + pruned_messages += 1; + data.oldest_unpruned_nonce += 1; + } + + if anything_changed { + self.storage.set_data(data); + } + + pruned_messages + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + mock::{run_test, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID}, + outbound_lane, + }; + + #[test] + fn send_message_works() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + assert_eq!(lane.storage.data().latest_generated_nonce, 0); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1); + assert!(lane.storage.message(&1).is_some()); + assert_eq!(lane.storage.data().latest_generated_nonce, 1); + }); + } + + #[test] + fn confirm_receival_works() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 2); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 3); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 0); + assert_eq!(lane.confirm_receival(3), Some((1, 3))); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 3); + }); + } + + #[test] + fn confirm_receival_rejects_nonce_lesser_than_latest_received() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 0); + assert_eq!(lane.confirm_receival(3), Some((1, 3))); + assert_eq!(lane.confirm_receival(3), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 3); + + assert_eq!(lane.confirm_receival(2), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 3); + }); + } + + #[test] + fn confirm_receival_rejects_nonce_larger_than_last_generated() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 0); + assert_eq!(lane.confirm_receival(10), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_received_nonce, 0); + }); + } + + #[test] + fn confirm_processing_works() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 2); + assert_eq!(lane.send_message(REGULAR_PAYLOAD), 3); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 0); + assert_eq!(lane.confirm_receival(3), Some((1, 3))); + assert_eq!(lane.confirm_processing(2), Some((1, 2))); + assert_eq!(lane.storage.data().latest_processed_nonce, 2); + assert_eq!(lane.confirm_processing(3), Some((3, 3))); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 3); + }); + } + + #[test] + fn confirm_processing_rejects_nonce_lesser_than_latest_processed() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 0); + assert_eq!(lane.confirm_receival(3), Some((1, 3))); + assert_eq!(lane.confirm_processing(3), Some((1, 3))); + assert_eq!(lane.confirm_processing(3), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 3); + + assert_eq!(lane.confirm_processing(2), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 3); + }); + } + + #[test] + fn confirm_processing_rejects_nonce_larger_than_last_received() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 0); + assert_eq!(lane.confirm_processing(2), None); + assert_eq!(lane.storage.data().latest_generated_nonce, 3); + assert_eq!(lane.storage.data().latest_processed_nonce, 0); + }); + } + + #[test] + fn prune_messages_works() { + run_test(|| { + let mut lane = outbound_lane::(TEST_LANE_ID); + // when lane is empty, nothing is pruned + assert_eq!(lane.prune_messages(100), 0); + assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1); + // when nothing is confirmed, nothing is pruned + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + lane.send_message(REGULAR_PAYLOAD); + assert_eq!(lane.prune_messages(100), 0); + assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1); + // after confirmation, some messages are received + assert_eq!(lane.confirm_receival(2), Some((1, 2))); + assert_eq!(lane.prune_messages(100), 2); + assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3); + // after last message is confirmed, everything is pruned + assert_eq!(lane.confirm_receival(3), Some((3, 3))); + assert_eq!(lane.prune_messages(100), 1); + assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4); + }); + } +} diff --git a/bridges/primitives/message-lane/Cargo.toml b/bridges/primitives/message-lane/Cargo.toml new file mode 100644 index 0000000000000..b937e452ac72f --- /dev/null +++ b/bridges/primitives/message-lane/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "bp-message-lane" +description = "Primitives of message lane module." +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" + +[dependencies] +codec = { package = "parity-scale-codec", version = "1.3.1", default-features = false, features = ["derive"] } + +# Substrate Based Dependencies + +[dependencies.sp-api] +version = "2.0.0-rc6" +tag = 'v2.0.0-rc6' +default-features = false +git = "https://github.com/paritytech/substrate.git" + +[features] +default = ["std"] +std = [ + "codec/std", + "sp-api/std" +] diff --git a/bridges/primitives/message-lane/src/lib.rs b/bridges/primitives/message-lane/src/lib.rs new file mode 100644 index 0000000000000..fce49679469db --- /dev/null +++ b/bridges/primitives/message-lane/src/lib.rs @@ -0,0 +1,135 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Primitives for sending and receiving Substrate <-> Substrate messages. + +#![cfg_attr(not(feature = "std"), no_std)] +// RuntimeApi generated functions +#![allow(clippy::too_many_arguments)] +// Generated by `DecodeLimit::decode_with_depth_limit` +#![allow(clippy::unnecessary_mut_passed)] + +use codec::{Decode, Encode}; +use sp_api::decl_runtime_apis; + +/// Lane identifier. +pub type LaneId = [u8; 4]; + +/// Message nonce. Valid messages will never have 0 nonce. +pub type MessageNonce = u64; + +/// Message key (unique message identifier) as it is stored in the storage. +#[derive(Encode, Decode, Clone)] +pub struct MessageKey { + /// ID of the message lane. + pub lane_id: LaneId, + /// Message nonce. + pub nonce: MessageNonce, +} + +/// Message as it is stored in the storage. +#[derive(Encode, Decode, Clone)] +pub struct Message { + /// Message key. + pub key: MessageKey, + /// Message payload. + pub payload: Payload, +} + +/// Message processing result. +pub enum MessageResult { + /// Message has been processed and should not be queued. + Processed, + /// Message has NOT been processed and should be queued for processing later. + NotProcessed(Message), +} + +/// Called when inbound message is received. +pub trait OnMessageReceived { + /// Called when inbound message is received. + /// + /// It is up to the implementers of this trait to determine whether the message + /// is invalid (i.e. improperly encoded, has too large weight, ...) or not. And, + /// if message is invalid, then it should be dropped immediately (by returning + /// `MessageResult::Processed`), or it'll block the lane forever. + fn on_message_received(&mut self, message: Message) -> MessageResult; +} + +/// Inbound lane data. +#[derive(Encode, Decode, Clone)] +pub struct InboundLaneData { + /// Nonce of oldest message that we haven't processed yet. May point to not-yet-received message if + /// lane is currently empty. + pub oldest_unprocessed_nonce: MessageNonce, + /// Nonce of latest message that we have received from bridged chain. + pub latest_received_nonce: MessageNonce, +} + +impl Default for InboundLaneData { + fn default() -> Self { + InboundLaneData { + // it is 1 because we're processing everything in [oldest_unprocessed_nonce; latest_received_nonce] + oldest_unprocessed_nonce: 1, + latest_received_nonce: 0, + } + } +} + +/// Outbound lane data. +#[derive(Encode, Decode, Clone)] +pub struct OutboundLaneData { + /// Nonce of oldest message that we haven't yet pruned. May point to not-yet-generated message if + /// all sent messages are already pruned. + pub oldest_unpruned_nonce: MessageNonce, + /// Nonce of latest message, received by bridged chain. + pub latest_received_nonce: MessageNonce, + /// Nonce of latest message, processed by bridged chain. + pub latest_processed_nonce: MessageNonce, + /// Nonce of latest message, generated by us. + pub latest_generated_nonce: MessageNonce, +} + +impl Default for OutboundLaneData { + fn default() -> Self { + OutboundLaneData { + // it is 1 because we're pruning everything in [oldest_unpruned_nonce; latest_received_nonce] + oldest_unpruned_nonce: 1, + latest_received_nonce: 0, + latest_processed_nonce: 0, + latest_generated_nonce: 0, + } + } +} + +decl_runtime_apis! { + /// Outbound message lane API. + pub trait OutboundLaneApi { + /// Returns nonce of the latest message, received by bridged chain. + fn latest_received_nonce(lane: LaneId) -> MessageNonce; + /// Returns nonce of the latest message, processed by bridged chain. + fn latest_processed_nonce(lane: LaneId) -> MessageNonce; + /// Returns nonce of the latest message, generated by given lane. + fn latest_generated_nonce(lane: LaneId) -> MessageNonce; + } + + /// Inbound message lane API. + pub trait InboundLaneApi { + /// Returns nonce of the latest message, received by given lane. + fn latest_received_nonce(lane: LaneId) -> MessageNonce; + /// Returns nonce of the latest message, processed by given lane. + fn latest_processed_nonce(lane: LaneId) -> MessageNonce; + } +}