From 0362b4459e325c7237874df28d1fba28e51d6470 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 May 2021 13:18:40 +0200 Subject: [PATCH 1/7] event monitor: Bulk events from all transactions included in a block --- Cargo.lock | 1 + relayer/Cargo.toml | 1 + relayer/src/event/monitor.rs | 93 ++++++++++++++++++++++-------------- relayer/src/util.rs | 1 + relayer/src/util/stream.rs | 74 ++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 35 deletions(-) create mode 100644 relayer/src/util/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 0354e9e3ef..bc2acb0ed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1234,6 +1234,7 @@ name = "ibc-relayer" version = "0.3.1" dependencies = [ "anomaly", + "async-stream", "async-trait", "bech32 0.8.0", "bitcoin", diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index de33777cf6..34e49759f0 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -51,6 +51,7 @@ tonic = "0.4" dirs-next = "2.0.0" dyn-clone = "1.0.3" retry = { version = "1.2.1", default-features = false } +async-stream = "0.3.1" [dependencies.tendermint] version = "=0.19.0" diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 581de76352..f31b0bebb6 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,13 +1,15 @@ use std::sync::Arc; use crossbeam_channel as channel; -use futures::stream::StreamExt; -use futures::{stream::select_all, Stream}; -use itertools::Itertools; +use futures::{ + pin_mut, + stream::{self, select_all, StreamExt}, + Stream, +}; use thiserror::Error; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; use tendermint_rpc::{ event::Event as RpcEvent, @@ -18,7 +20,10 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; -use crate::util::retry::{retry_with_index, RetryResult}; +use crate::util::{ + retry::{retry_with_index, RetryResult}, + stream::group_while, +}; mod retry_strategy { use crate::util::retry::clamp_total; @@ -276,24 +281,30 @@ impl EventMonitor { /// Event monitor loop pub fn run(mut self) { - info!(chain.id = %self.chain_id, "starting event monitor"); + debug!(chain.id = %self.chain_id, "starting event monitor"); let rt = self.rt.clone(); + // Take ownership of the subscriptions + let subscriptions = + std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty())); + + // Convert the stream of RPC events into a stream of event batches. + let batches = stream_batches(subscriptions, self.chain_id.clone()); + + // Needed to be able to poll the stream + pin_mut!(batches); + loop { let result = rt.block_on(async { tokio::select! { - Some(event) = self.subscriptions.next() => { - event - .map_err(Error::NextEventBatchFailed) - .and_then(|e| self.collect_events(e)) - }, + Some(batch) = batches.next() => Ok(batch), Some(e) = self.rx_err.recv() => Err(Error::WebSocketDriver(e)), } }); match result { - Ok(batches) => self.process_batches(batches).unwrap_or_else(|e| { + Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { error!("failed to process event batch: {}", e); }), Err(e) => { @@ -307,31 +318,43 @@ impl EventMonitor { } /// Collect the IBC events from the subscriptions - fn process_batches(&self, batches: Vec) -> Result<()> { - for batch in batches { - self.tx_batch - .send(Ok(batch)) - .map_err(|_| Error::ChannelSendFailed)?; - } + fn process_batch(&self, batch: EventBatch) -> Result<()> { + self.tx_batch + .send(Ok(batch)) + .map_err(|_| Error::ChannelSendFailed)?; Ok(()) } +} - /// Collect the IBC events from the subscriptions - fn collect_events(&mut self, event: RpcEvent) -> Result> { - let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event) - .map_err(Error::CollectEventsFailed)?; - - let events_by_height = ibc_events.into_iter().into_group_map(); - let batches = events_by_height - .into_iter() - .map(|(height, events)| EventBatch { - chain_id: self.chain_id.clone(), - height, - events, - }) - .collect(); - - Ok(batches) - } +/// Collect the IBC events from an RPC event +fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream { + let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default(); + stream::iter(events) +} + +/// Convert a stream of RPC event into a stream of event batches +fn stream_batches( + subscriptions: Box, + chain_id: ChainId, +) -> impl Stream { + let id = chain_id.clone(); + + // Collect IBC events from each RPC event + let batches = subscriptions + .filter_map(|rpc_event| async { rpc_event.ok() }) + .flat_map(move |rpc_event| collect_events(&id, rpc_event)); + + // Group events by height + let grouped = group_while(batches, |(h0, _), (h1, _)| h0 == h1); + + // Convert each group to a batch + grouped.map(move |events| { + let (height, _) = events.first().expect("internal error: found empty group"); // SAFETY: upheld by `group_while` + EventBatch { + height: *height, + chain_id: chain_id.clone(), + events: events.into_iter().map(|e| e.1).collect(), + } + }) } diff --git a/relayer/src/util.rs b/relayer/src/util.rs index 81e784465a..41deb43c83 100644 --- a/relayer/src/util.rs +++ b/relayer/src/util.rs @@ -7,3 +7,4 @@ pub use recv_multiple::{recv_multiple, try_recv_multiple}; pub mod iter; pub mod retry; pub mod sled; +pub mod stream; diff --git a/relayer/src/util/stream.rs b/relayer/src/util/stream.rs new file mode 100644 index 0000000000..96639b3701 --- /dev/null +++ b/relayer/src/util/stream.rs @@ -0,0 +1,74 @@ +use async_stream::stream; +use futures::stream::Stream; + +/// ## Example +/// +/// ```rust,ignore +/// let input = stream::iter(vec![0, 0, 0, 1, 1, 2, 3, 3, 3, 3]); +/// let output = group_while(stream, |a, b| a == b).collect::>().await; +/// assert_eq!(output, vec![vec![0, 0, 0], vec![1, 1], vec![2], vec![3, 3, 3, 3]]); +/// ``` +pub fn group_while(input: S, group_these: F) -> impl Stream> +where + S: Stream, + F: Fn(&A, &A) -> bool + 'static, +{ + struct State { + cur: A, + group: Vec, + } + + stream! { + let mut state = None; + + for await x in input { + match &mut state { + None => { + state = Some(State { cur: x, group: vec![] }); + }, + Some(state) if group_these(&state.cur, &x) => { + let prev = std::mem::replace(&mut state.cur, x); + state.group.push(prev); + }, + Some(state) => { + let cur = std::mem::replace(&mut state.cur, x); + state.group.push(cur); + let group = std::mem::replace(&mut state.group, vec![]); + yield group; + } + } + } + + if let Some(State{ cur, mut group }) = state { + group.push(cur); + yield group; + } + } +} + +#[cfg(test)] +mod tests { + use super::group_while; + use futures::{executor::block_on, stream, StreamExt}; + + #[test] + fn group_while_non_empty() { + let input = stream::iter(vec![1, 1, 2, 3, 3, 3, 4, 5, 5]); + let output = group_while(input, |a, b| a == b).collect::>(); + let result = block_on(output); + + assert_eq!( + result, + vec![vec![1, 1], vec![2], vec![3, 3, 3], vec![4], vec![5, 5]] + ); + } + + #[test] + fn group_while_empty() { + let input = stream::iter(Vec::::new()); + let output = group_while(input, |a, b| a == b).collect::>(); + let result = block_on(output); + + assert_eq!(result, Vec::>::new()); + } +} From c1fb08ac4a71e1ae85170079e3e0402ae3fc51a8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 May 2021 16:00:41 +0200 Subject: [PATCH 2/7] Update changelog --- CHANGELOG.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0afec248d7..3ec3e6a98b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,24 @@ ## Unreleased -> Nothing yet, +### FEATURES + +> Nothing + +### IMPROVEMENTS + +- [ibc-relayer] + - Bulk events from all transactions included in a block ([#957]) + +### BUG FIXES + +> Nothing + +### BREAKING CHANGES + +> Nothing + +[#957]: https://github.com/informalsystems/ibc-rs/issues/957 ## v0.3.1 *May 14h, 2021* @@ -32,6 +49,8 @@ as well as support Protobuf-encoded keys. ### BREAKING CHANGES +> Nothing + [#875]: https://github.com/informalsystems/ibc-rs/issues/875 [#920]: https://github.com/informalsystems/ibc-rs/issues/920 From c917f3940fd03c66c93062f19b6a8db44edf8dd8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 14:11:14 +0200 Subject: [PATCH 3/7] Improve unit test to ensure items are not re-ordered --- relayer/src/util/stream.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/relayer/src/util/stream.rs b/relayer/src/util/stream.rs index 96639b3701..a567066349 100644 --- a/relayer/src/util/stream.rs +++ b/relayer/src/util/stream.rs @@ -53,13 +53,30 @@ mod tests { #[test] fn group_while_non_empty() { - let input = stream::iter(vec![1, 1, 2, 3, 3, 3, 4, 5, 5]); - let output = group_while(input, |a, b| a == b).collect::>(); + let input = stream::iter(vec![ + (1, 1), + (1, 2), + (2, 1), + (3, 1), + (3, 2), + (3, 3), + (4, 1), + (5, 1), + (5, 2), + ]); + + let output = group_while(input, |a, b| a.0 == b.0).collect::>(); let result = block_on(output); assert_eq!( result, - vec![vec![1, 1], vec![2], vec![3, 3, 3], vec![4], vec![5, 5]] + vec![ + vec![(1, 1), (1, 2)], + vec![(2, 1)], + vec![(3, 1), (3, 2), (3, 3)], + vec![(4, 1)], + vec![(5, 1), (5, 2)] + ] ); } From f76b565989fdc545f3f85d78b7c5cb0565d2693f Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 15:10:39 +0200 Subject: [PATCH 4/7] Cleanup --- relayer/src/event/monitor.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index f31b0bebb6..68e6f4142a 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -283,8 +283,6 @@ impl EventMonitor { pub fn run(mut self) { debug!(chain.id = %self.chain_id, "starting event monitor"); - let rt = self.rt.clone(); - // Take ownership of the subscriptions let subscriptions = std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty())); @@ -295,6 +293,9 @@ impl EventMonitor { // Needed to be able to poll the stream pin_mut!(batches); + // Work around double borrow + let rt = self.rt.clone(); + loop { let result = rt.block_on(async { tokio::select! { @@ -341,12 +342,12 @@ fn stream_batches( let id = chain_id.clone(); // Collect IBC events from each RPC event - let batches = subscriptions + let events = subscriptions .filter_map(|rpc_event| async { rpc_event.ok() }) .flat_map(move |rpc_event| collect_events(&id, rpc_event)); // Group events by height - let grouped = group_while(batches, |(h0, _), (h1, _)| h0 == h1); + let grouped = group_while(events, |(h0, _), (h1, _)| h0 == h1); // Convert each group to a batch grouped.map(move |events| { From c01771765ceab051f20385e91c989daab9304b8a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 18:04:40 +0200 Subject: [PATCH 5/7] Ensure NewBlock event is always first in the event batch --- relayer/src/event/monitor.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 68e6f4142a..7b213c0f65 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{cmp::Ordering, sync::Arc}; use crossbeam_channel as channel; use futures::{ @@ -351,11 +351,28 @@ fn stream_batches( // Convert each group to a batch grouped.map(move |events| { - let (height, _) = events.first().expect("internal error: found empty group"); // SAFETY: upheld by `group_while` + let height = events + .first() + .map(|(h, _)| h) + .copied() + .expect("internal error: found empty group"); // SAFETY: upheld by `group_while` + + let mut events = events.into_iter().map(|(_, e)| e).collect(); + sort_events(&mut events); + EventBatch { - height: *height, + height, + events, chain_id: chain_id.clone(), - events: events.into_iter().map(|e| e.1).collect(), } }) } + +/// Sort the given events by putting the NewBlock event first, +/// and leaving the other events as is. +fn sort_events(events: &mut Vec) { + events.sort_by(|a, b| match (a, b) { + (IbcEvent::NewBlock(_), _) => Ordering::Less, + _ => Ordering::Equal, + }) +} From 122954b43e79b42b6c6514a70abdfdfa210f4319 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 18:22:49 +0200 Subject: [PATCH 6/7] Re-enable events filtering in `listen` command --- relayer-cli/src/commands/listen.rs | 77 +++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 22e2b38f12..c294aa5e3c 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -1,12 +1,11 @@ -use std::{ops::Deref, sync::Arc, thread}; +use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; -use tendermint_rpc::query::{EventType, Query}; +use ibc::{events::IbcEvent, ics24_host::identifier::ChainId}; -use ibc::ics24_host::identifier::ChainId; use ibc_relayer::{ config::ChainConfig, event::monitor::{EventMonitor, EventReceiver}, @@ -14,6 +13,47 @@ use ibc_relayer::{ use crate::prelude::*; +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum EventFilter { + NewBlock, + Tx, +} + +impl EventFilter { + pub fn matches(&self, event: &IbcEvent) -> bool { + match self { + EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)), + EventFilter::Tx => { + !(matches!( + event, + IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_) + )) + } + } + } +} + +impl fmt::Display for EventFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NewBlock => write!(f, "NewBlock"), + Self::Tx => write!(f, "Tx"), + } + } +} + +impl FromStr for EventFilter { + type Err = BoxError; + + fn from_str(s: &str) -> Result { + match s { + "NewBlock" => Ok(Self::NewBlock), + "Tx" => Ok(Self::Tx), + invalid => Err(format!("unrecognized event type: {}", invalid).into()), + } + } +} + #[derive(Command, Debug, Options)] pub struct ListenCmd { /// Identifier of the chain to listen for events from @@ -22,7 +62,7 @@ pub struct ListenCmd { /// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock) #[options(short = "e", long = "event", meta = "EVENT")] - events: Vec, + events: Vec, } impl ListenCmd { @@ -34,7 +74,7 @@ impl ListenCmd { .ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?; let events = if self.events.is_empty() { - &[EventType::Tx, EventType::NewBlock] + &[EventFilter::Tx, EventFilter::NewBlock] } else { self.events.as_slice() }; @@ -51,29 +91,42 @@ impl Runnable for ListenCmd { } /// Listen to events -pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> { +pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> { println!( "[info] Listening for events `{}` on '{}'...", - events.iter().format(", "), + filters.iter().format(", "), config.id ); let rt = Arc::new(TokioRuntime::new()?); - let queries = events.iter().cloned().map(Query::from).collect(); - let (event_monitor, rx) = subscribe(&config, queries, rt)?; + let (event_monitor, rx) = subscribe(&config, rt)?; thread::spawn(|| event_monitor.run()); while let Ok(event_batch) = rx.recv() { - println!("{:#?}", event_batch); + match event_batch { + Ok(batch) => { + println!("- Event batch at height {}", batch.height); + for event in batch.events { + if event_match(&event, filters) { + println!("+ {:#?}", event); + } + } + println!(); + } + Err(e) => println!("- Error: {}", e), + } } Ok(()) } +fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool { + filters.iter().any(|f| f.matches(event)) +} + fn subscribe( chain_config: &ChainConfig, - queries: Vec, rt: Arc, ) -> Result<(EventMonitor, EventReceiver), BoxError> { let (mut event_monitor, rx) = EventMonitor::new( @@ -83,8 +136,6 @@ fn subscribe( ) .map_err(|e| format!("could not initialize event monitor: {}", e))?; - event_monitor.set_queries(queries); - event_monitor .subscribe() .map_err(|e| format!("could not initialize subscriptions: {}", e))?; From e6693ef9e75bf94e3e229eb52c3c3c5c0cd2654d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 19:11:54 +0200 Subject: [PATCH 7/7] Only print event batch header if there are matching events --- relayer-cli/src/commands/listen.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index c294aa5e3c..35983b958f 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -106,12 +106,22 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxEr while let Ok(event_batch) = rx.recv() { match event_batch { Ok(batch) => { + let matching_events = batch + .events + .into_iter() + .filter(|e| event_match(&e, filters)) + .collect_vec(); + + if matching_events.is_empty() { + continue; + } + println!("- Event batch at height {}", batch.height); - for event in batch.events { - if event_match(&event, filters) { - println!("+ {:#?}", event); - } + + for event in matching_events { + println!("+ {:#?}", event); } + println!(); } Err(e) => println!("- Error: {}", e),