diff --git a/guide/src/tutorial_raw.md b/guide/src/tutorial_raw.md index 85d2721ef7..62ff3aa35f 100644 --- a/guide/src/tutorial_raw.md +++ b/guide/src/tutorial_raw.md @@ -33,7 +33,7 @@ Currently, cosmos-SDK implementation uses: - `connection-` for connections - For example `connection-0` is assigned to the first connection created on `ibc-1`: ```shell - tx raw conn-init ibc-1 ibc-0 07-tendermint-0 07-tendermint-0 | jq + hermes tx raw conn-init ibc-1 ibc-0 07-tendermint-0 07-tendermint-0 | jq ``` ```json { diff --git a/relayer-cli/src/conclude.rs b/relayer-cli/src/conclude.rs index c60d8b8a1d..01a0d124bb 100644 --- a/relayer-cli/src/conclude.rs +++ b/relayer-cli/src/conclude.rs @@ -35,7 +35,9 @@ //! #[derive(Debug, Error)] //! pub enum Kind { //! #[error("failed with underlying causes: {0}, {1}")] -//! Query(String, String), // ... +//! Query(String, String), +//! // ... +//! } //! ``` //! //! - Exit from a query/tx with success: diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 50005719ab..c30b4f13b0 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -24,6 +24,12 @@ pub struct EventBatch { pub events: Vec, } +impl EventBatch { + pub fn unwrap_or_clone(self: Arc) -> Self { + Arc::try_unwrap(self).unwrap_or_else(|batch| batch.as_ref().clone()) + } +} + type SubscriptionResult = Result; type SubscriptionStream = dyn Stream + Send + Sync + Unpin; diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 226aff61dd..12674f8923 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -1,5 +1,5 @@ +use std::thread; use std::time::Duration; -use std::{sync::Arc, thread}; use prost_types::Any; use thiserror::Error; @@ -411,12 +411,9 @@ impl RelayPath { Err(LinkError::OldPacketClearingFailed) } - /// Iterate through the IBC Events, build the message for each and collect all at same height. - /// Send a multi message transaction with these, prepending the client update - pub fn relay_from_events(&mut self, batch: Arc) -> Result<(), LinkError> { + pub fn clear_packets(&mut self, height: Height) -> Result<(), LinkError> { if self.clear_packets { - self.src_height = batch - .height + self.src_height = height .decrement() .map_err(|e| LinkError::Failed(e.to_string()))?; @@ -424,6 +421,14 @@ impl RelayPath { self.clear_packets = false; } + Ok(()) + } + + /// Iterate through the IBC Events, build the message for each and collect all at same height. + /// Send a multi message transaction with these, prepending the client update + pub fn relay_from_events(&mut self, batch: EventBatch) -> Result<(), LinkError> { + self.clear_packets(batch.height)?; + // collect relevant events in self.all_events self.collect_events(&batch.events); self.adjust_events_height()?; @@ -893,12 +898,12 @@ impl Link { return Ok(()); } - if let Ok(events) = events_a.try_recv() { - self.a_to_b.relay_from_events(events)?; + if let Ok(batch) = events_a.try_recv() { + self.a_to_b.relay_from_events(batch.unwrap_or_clone())?; } - if let Ok(events) = events_b.try_recv() { - self.b_to_a.relay_from_events(events)?; + if let Ok(batch) = events_b.try_recv() { + self.b_to_a.relay_from_events(batch.unwrap_or_clone())?; } // TODO - select over the two subscriptions diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 6b5a98fcec..e7c86e44fc 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -1,13 +1,18 @@ -use std::{collections::HashMap, sync::Arc, thread::JoinHandle, time::Duration}; +use std::{ + collections::HashMap, + thread::{self, JoinHandle}, + time::Duration, +}; use anomaly::BoxError; use crossbeam_channel::{Receiver, Sender}; -use itertools::Itertools; use ibc::{ events::IbcEvent, + ics02_client::events::NewBlock, ics04_channel::events::{CloseInit, SendPacket, TimeoutPacket, WriteAcknowledgement}, ics24_host::identifier::{ChainId, ChannelId, PortId}, + Height, }; use crate::{ @@ -16,28 +21,58 @@ use crate::{ link::{Link, LinkParameters}, }; -pub struct WorkerCmd { - pub batch: Arc, -} - -impl WorkerCmd { - pub fn new(batch: Arc) -> Self { - Self { batch } - } +/// A command for a [`Worker`]. +pub enum WorkerCmd { + /// A batch of packet events need to be relayed + PacketEvents { batch: EventBatch }, + /// A batch of [`NewBlock`] events need to be relayed + NewBlocks { + height: Height, + new_blocks: Vec, + }, } +/// Handle to a [`Worker`], for sending [`WorkerCmd`]s to it. pub struct WorkerHandle { - pub tx: Sender, - pub thread_handle: JoinHandle<()>, + tx: Sender, + thread_handle: JoinHandle<()>, } impl WorkerHandle { - pub fn handle_packet_events(&self, batch: EventBatch) -> Result<(), BoxError> { - self.tx.send(WorkerCmd::new(Arc::new(batch)))?; + /// Send a batch of packet events to the worker. + pub fn send_packet_events( + &self, + height: Height, + events: Vec, + chain_id: ChainId, + ) -> Result<(), BoxError> { + let batch = EventBatch { + height, + events, + chain_id, + }; + + self.tx.send(WorkerCmd::PacketEvents { batch })?; + Ok(()) + } + + /// Send a batch of [`NewBlock`] events to the worker. + pub fn send_new_blocks( + &self, + height: Height, + new_blocks: Vec, + ) -> Result<(), BoxError> { + self.tx.send(WorkerCmd::NewBlocks { height, new_blocks })?; Ok(()) } + + /// Wait for the worker thread to finish. + pub fn join(self) -> thread::Result<()> { + self.thread_handle.join() + } } +/// A pair of [`ChainHandle`]s. #[derive(Clone)] pub struct ChainHandlePair { pub a: Box, @@ -45,6 +80,7 @@ pub struct ChainHandlePair { } impl ChainHandlePair { + /// Swap the two handles. pub fn swap(self) -> Self { Self { a: self.b, @@ -53,12 +89,16 @@ impl ChainHandlePair { } } +/// The supervisor listens for events on a pair of chains, +/// and dispatches the events it receives to the appropriate +/// worker, based on the [`Object`] associated with each event. pub struct Supervisor { chains: ChainHandlePair, workers: HashMap, } impl Supervisor { + /// Spawn a supervisor which listens for events on the two given chains. pub fn spawn( chain_a: Box, chain_b: Box, @@ -74,56 +114,68 @@ impl Supervisor { }) } + /// Run the supervisor event loop. pub fn run(mut self) -> Result<(), BoxError> { let subscription_a = self.chains.a.subscribe()?; let subscription_b = self.chains.b.subscribe()?; loop { for batch in subscription_a.try_iter() { - self.process_batch(batch)?; + self.process_batch(batch.unwrap_or_clone())?; } for batch in subscription_b.try_iter() { - self.process_batch(batch)?; + self.process_batch(batch.unwrap_or_clone())?; } std::thread::sleep(Duration::from_millis(600)); } } - fn process_batch(&mut self, batch: Arc) -> Result<(), BoxError> { - // TODO(romac): Need to send NewBlock events to all workers + /// Process a batch of events received from a chain. + fn process_batch(&mut self, batch: EventBatch) -> Result<(), BoxError> { + let height = batch.height; + let chain_id = batch.chain_id.clone(); + let direction = if chain_id == self.chains.a.id() { + Direction::AtoB + } else { + Direction::BtoA + }; - let events = collect_events(&batch.events, batch.chain_id.clone()); - let events_per_object = events.into_iter().into_group_map(); + let collected = collect_events(batch); - for (object, events) in events_per_object.into_iter() { - if events.is_empty() { - return Ok(()); + if collected.has_new_blocks() { + for worker in self.workers.values() { + worker.send_new_blocks(height, collected.new_blocks.clone())?; } + } - let worker_batch = EventBatch { - height: batch.height, - chain_id: batch.chain_id.clone(), - events, - }; + for (object, events) in collected.per_object.into_iter() { + if events.is_empty() { + continue; + } - let is_dest = batch.chain_id == self.chains.b.id(); - let worker = self.worker_for_object(object, is_dest); - worker.handle_packet_events(worker_batch)?; + let worker = self.worker_for_object(object, direction); + worker.send_packet_events(height, events, chain_id.clone())?; } Ok(()) } - fn worker_for_object(&mut self, object: Object, swap: bool) -> &WorkerHandle { + /// Get a handle to the worker in charge of handling events associated + /// with the given [`Object`]. + /// + /// This function will spawn a new [`Worker`] if one does not exists already. + /// + /// The `direction` parameter indicates in which direction the worker should + /// relay events. + fn worker_for_object(&mut self, object: Object, direction: Direction) -> &WorkerHandle { if self.workers.contains_key(&object) { &self.workers[&object] } else { - let chains = if swap { - self.chains.clone().swap() - } else { - self.chains.clone() + let chains = match direction { + Direction::AtoB => self.chains.clone(), + Direction::BtoA => self.chains.clone().swap(), }; let worker = Worker::spawn(chains, object.clone()); @@ -132,12 +184,23 @@ impl Supervisor { } } +/// The direction in which a [`Worker`] should relay events. +#[derive(Copy, Clone, Debug)] +pub enum Direction { + /// From chain A to chain B. + AtoB, + /// From chain B to chain A. + BtoA, +} + +/// A worker processes batches of events associated with a given [`Object`]. pub struct Worker { chains: ChainHandlePair, rx: Receiver, } impl Worker { + /// Spawn a worker which relay events pertaining to `object` between two `chains`. pub fn spawn(chains: ChainHandlePair, object: Object) -> WorkerHandle { let (tx, rx) = crossbeam_channel::unbounded(); @@ -147,7 +210,8 @@ impl Worker { WorkerHandle { tx, thread_handle } } - pub fn run(self, object: Object) { + /// Run the worker event loop. + fn run(self, object: Object) { let result = match object { Object::UnidirectionalChannelPath(path) => self.run_uni_chan_path(path), }; @@ -157,6 +221,7 @@ impl Worker { } } + /// Run the event loop for events associated with a [`UnidirectionalChannelPath`]. fn run_uni_chan_path(self, path: UnidirectionalChannelPath) -> Result<(), BoxError> { println!("running worker for object {:?}", path); @@ -170,22 +235,39 @@ impl Worker { )?; while let Ok(cmd) = self.rx.recv() { - link.a_to_b.relay_from_events(cmd.batch)?; + match cmd { + WorkerCmd::PacketEvents { batch } => link.a_to_b.relay_from_events(batch)?, + WorkerCmd::NewBlocks { + height, + new_blocks: _, + } => link.a_to_b.clear_packets(height)?, + } } Ok(()) } } +/// A unidirectional path from a source chain, channel and port. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct UnidirectionalChannelPath { + /// Source chain identifier. pub src_chain_id: ChainId, + /// Source channel identiier. pub src_channel_id: ChannelId, + /// Source port identiier. pub src_port_id: PortId, } +/// An object determines the amount of parallelism that can +/// be exercised when processing [`IbcEvent`] between +/// two chains. For each [`Object`], a corresponding +/// [`Worker`] is spawned and all [`IbcEvent`]s mapped +/// to an [`Object`] are sent to the associated [`Worker`] +/// for processing. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Object { + /// See [`UnidirectionalChannelPath`]. UnidirectionalChannelPath(UnidirectionalChannelPath), } @@ -196,6 +278,7 @@ impl From for Object { } impl Object { + /// Build the object associated with the given [`SendPacket`] event. pub fn for_send_packet(e: &SendPacket, chain_id: &ChainId) -> Self { UnidirectionalChannelPath { src_chain_id: chain_id.clone(), @@ -205,6 +288,7 @@ impl Object { .into() } + /// Build the object associated with the given [`WriteAcknowledgement`] event. pub fn for_write_ack(e: &WriteAcknowledgement, chain_id: &ChainId) -> Self { UnidirectionalChannelPath { src_chain_id: chain_id.clone(), @@ -214,6 +298,7 @@ impl Object { .into() } + /// Build the object associated with the given [`TimeoutPacket`] event. pub fn for_timeout_packet(e: &TimeoutPacket, chain_id: &ChainId) -> Self { UnidirectionalChannelPath { src_chain_id: chain_id.clone(), @@ -223,6 +308,7 @@ impl Object { .into() } + /// Build the object associated with the given [`CloseInit`] event. pub fn for_close_init_channel(e: &CloseInit, chain_id: &ChainId) -> Self { UnidirectionalChannelPath { src_chain_id: chain_id.clone(), @@ -233,21 +319,63 @@ impl Object { } } -fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEvent)> { - events - .iter() - .filter_map(|e| match e { - IbcEvent::SendPacket(p) => Some((Object::for_send_packet(p, &chain_id), e.clone())), - IbcEvent::TimeoutPacket(p) => { - Some((Object::for_timeout_packet(p, &chain_id), e.clone())) +/// Describes the result of [`collect_events`]. +#[derive(Clone, Debug)] +pub struct CollectedEvents { + /// The height at which these events were emitted from the chain. + pub height: Height, + /// The chain from which the events were emitted. + pub chain_id: ChainId, + /// [`NewBlock`] events collected from the [`EventBatch`]. + pub new_blocks: Vec, + /// Mapping between [`Object`]s and their associated [`IbcEvent`]s. + pub per_object: HashMap>, +} + +impl CollectedEvents { + pub fn new(height: Height, chain_id: ChainId) -> Self { + Self { + height, + chain_id, + new_blocks: Default::default(), + per_object: Default::default(), + } + } + + /// Whether the collected events include any [`NewBlock`] events. + pub fn has_new_blocks(&self) -> bool { + !self.new_blocks.is_empty() + } +} + +/// Collect the events we are interested in from an [`EventBatch`], +/// and maps each [`IbcEvent`] to their corresponding [`Object`]. +pub fn collect_events(batch: EventBatch) -> CollectedEvents { + let mut collected = CollectedEvents::new(batch.height, batch.chain_id); + for event in batch.events { + match event { + IbcEvent::NewBlock(inner) => { + collected.new_blocks.push(inner); } - IbcEvent::WriteAcknowledgement(p) => { - Some((Object::for_write_ack(p, &chain_id), e.clone())) + IbcEvent::SendPacket(ref inner) => { + let object = Object::for_send_packet(inner, &collected.chain_id); + collected.per_object.entry(object).or_default().push(event); } - IbcEvent::CloseInitChannel(p) => { - Some((Object::for_close_init_channel(p, &chain_id), e.clone())) + IbcEvent::TimeoutPacket(ref inner) => { + let object = Object::for_timeout_packet(inner, &collected.chain_id); + collected.per_object.entry(object).or_default().push(event); } - _ => None, - }) - .collect() + IbcEvent::WriteAcknowledgement(ref inner) => { + let object = Object::for_write_ack(inner, &collected.chain_id); + collected.per_object.entry(object).or_default().push(event); + } + IbcEvent::CloseInitChannel(ref inner) => { + let object = Object::for_close_init_channel(inner, &collected.chain_id); + collected.per_object.entry(object).or_default().push(event); + } + _ => (), + } + } + + collected }