diff --git a/CHANGELOG.md b/CHANGELOG.md index 38b5010900..f7dc8aaf0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,19 +2,28 @@ ## Unreleased +### FEATURES + +- [release] + - Released official Hermes image on Docker Hub ([#894]) + +### IMPROVEMENTS + +- [ibc-relayer] + - Bulk events from all transactions included in a block ([#957]) + ### BUG FIXES - [ibc-relayer-cli] - Prevent sending `ft-transfer` MsgTransfer on a non-Open channel. ([#960]) -### FEATURES - -- [release] - - Official hermes image on Docker Hub. ([#894]) +### BREAKING CHANGES +> Nothing -[#960]: https://github.com/informalsystems/ibc-rs/issues/960 [#894]: https://github.com/informalsystems/ibc-rs/pull/894 +[#957]: https://github.com/informalsystems/ibc-rs/issues/957 +[#960]: https://github.com/informalsystems/ibc-rs/issues/960 ## v0.3.1 *May 14h, 2021* @@ -44,6 +53,8 @@ as well as support Protobuf-encoded keys. ### BREAKING CHANGES +> Nothing + [#875]: https://github.com/informalsystems/ibc-rs/issues/875 [#920]: https://github.com/informalsystems/ibc-rs/issues/920 diff --git a/Cargo.lock b/Cargo.lock index 0354e9e3ef..bc2acb0ed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1234,6 +1234,7 @@ name = "ibc-relayer" version = "0.3.1" dependencies = [ "anomaly", + "async-stream", "async-trait", "bech32 0.8.0", "bitcoin", diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 22e2b38f12..35983b958f 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -1,12 +1,11 @@ -use std::{ops::Deref, sync::Arc, thread}; +use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; -use tendermint_rpc::query::{EventType, Query}; +use ibc::{events::IbcEvent, ics24_host::identifier::ChainId}; -use ibc::ics24_host::identifier::ChainId; use ibc_relayer::{ config::ChainConfig, event::monitor::{EventMonitor, EventReceiver}, @@ -14,6 +13,47 @@ use ibc_relayer::{ use crate::prelude::*; +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum EventFilter { + NewBlock, + Tx, +} + +impl EventFilter { + pub fn matches(&self, event: &IbcEvent) -> bool { + match self { + EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)), + EventFilter::Tx => { + !(matches!( + event, + IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_) + )) + } + } + } +} + +impl fmt::Display for EventFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NewBlock => write!(f, "NewBlock"), + Self::Tx => write!(f, "Tx"), + } + } +} + +impl FromStr for EventFilter { + type Err = BoxError; + + fn from_str(s: &str) -> Result { + match s { + "NewBlock" => Ok(Self::NewBlock), + "Tx" => Ok(Self::Tx), + invalid => Err(format!("unrecognized event type: {}", invalid).into()), + } + } +} + #[derive(Command, Debug, Options)] pub struct ListenCmd { /// Identifier of the chain to listen for events from @@ -22,7 +62,7 @@ pub struct ListenCmd { /// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock) #[options(short = "e", long = "event", meta = "EVENT")] - events: Vec, + events: Vec, } impl ListenCmd { @@ -34,7 +74,7 @@ impl ListenCmd { .ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?; let events = if self.events.is_empty() { - &[EventType::Tx, EventType::NewBlock] + &[EventFilter::Tx, EventFilter::NewBlock] } else { self.events.as_slice() }; @@ -51,29 +91,52 @@ impl Runnable for ListenCmd { } /// Listen to events -pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> { +pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> { println!( "[info] Listening for events `{}` on '{}'...", - events.iter().format(", "), + filters.iter().format(", "), config.id ); let rt = Arc::new(TokioRuntime::new()?); - let queries = events.iter().cloned().map(Query::from).collect(); - let (event_monitor, rx) = subscribe(&config, queries, rt)?; + let (event_monitor, rx) = subscribe(&config, rt)?; thread::spawn(|| event_monitor.run()); while let Ok(event_batch) = rx.recv() { - println!("{:#?}", event_batch); + match event_batch { + Ok(batch) => { + let matching_events = batch + .events + .into_iter() + .filter(|e| event_match(&e, filters)) + .collect_vec(); + + if matching_events.is_empty() { + continue; + } + + println!("- Event batch at height {}", batch.height); + + for event in matching_events { + println!("+ {:#?}", event); + } + + println!(); + } + Err(e) => println!("- Error: {}", e), + } } Ok(()) } +fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool { + filters.iter().any(|f| f.matches(event)) +} + fn subscribe( chain_config: &ChainConfig, - queries: Vec, rt: Arc, ) -> Result<(EventMonitor, EventReceiver), BoxError> { let (mut event_monitor, rx) = EventMonitor::new( @@ -83,8 +146,6 @@ fn subscribe( ) .map_err(|e| format!("could not initialize event monitor: {}", e))?; - event_monitor.set_queries(queries); - event_monitor .subscribe() .map_err(|e| format!("could not initialize subscriptions: {}", e))?; diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index de33777cf6..34e49759f0 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -51,6 +51,7 @@ tonic = "0.4" dirs-next = "2.0.0" dyn-clone = "1.0.3" retry = { version = "1.2.1", default-features = false } +async-stream = "0.3.1" [dependencies.tendermint] version = "=0.19.0" diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 581de76352..7b213c0f65 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,13 +1,15 @@ -use std::sync::Arc; +use std::{cmp::Ordering, sync::Arc}; use crossbeam_channel as channel; -use futures::stream::StreamExt; -use futures::{stream::select_all, Stream}; -use itertools::Itertools; +use futures::{ + pin_mut, + stream::{self, select_all, StreamExt}, + Stream, +}; use thiserror::Error; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; use tendermint_rpc::{ event::Event as RpcEvent, @@ -18,7 +20,10 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; -use crate::util::retry::{retry_with_index, RetryResult}; +use crate::util::{ + retry::{retry_with_index, RetryResult}, + stream::group_while, +}; mod retry_strategy { use crate::util::retry::clamp_total; @@ -276,24 +281,31 @@ impl EventMonitor { /// Event monitor loop pub fn run(mut self) { - info!(chain.id = %self.chain_id, "starting event monitor"); + debug!(chain.id = %self.chain_id, "starting event monitor"); + + // Take ownership of the subscriptions + let subscriptions = + std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty())); + + // Convert the stream of RPC events into a stream of event batches. + let batches = stream_batches(subscriptions, self.chain_id.clone()); + // Needed to be able to poll the stream + pin_mut!(batches); + + // Work around double borrow let rt = self.rt.clone(); loop { let result = rt.block_on(async { tokio::select! { - Some(event) = self.subscriptions.next() => { - event - .map_err(Error::NextEventBatchFailed) - .and_then(|e| self.collect_events(e)) - }, + Some(batch) = batches.next() => Ok(batch), Some(e) = self.rx_err.recv() => Err(Error::WebSocketDriver(e)), } }); match result { - Ok(batches) => self.process_batches(batches).unwrap_or_else(|e| { + Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { error!("failed to process event batch: {}", e); }), Err(e) => { @@ -307,31 +319,60 @@ impl EventMonitor { } /// Collect the IBC events from the subscriptions - fn process_batches(&self, batches: Vec) -> Result<()> { - for batch in batches { - self.tx_batch - .send(Ok(batch)) - .map_err(|_| Error::ChannelSendFailed)?; - } + fn process_batch(&self, batch: EventBatch) -> Result<()> { + self.tx_batch + .send(Ok(batch)) + .map_err(|_| Error::ChannelSendFailed)?; Ok(()) } +} - /// Collect the IBC events from the subscriptions - fn collect_events(&mut self, event: RpcEvent) -> Result> { - let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event) - .map_err(Error::CollectEventsFailed)?; - - let events_by_height = ibc_events.into_iter().into_group_map(); - let batches = events_by_height - .into_iter() - .map(|(height, events)| EventBatch { - chain_id: self.chain_id.clone(), - height, - events, - }) - .collect(); - - Ok(batches) - } +/// Collect the IBC events from an RPC event +fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream { + let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default(); + stream::iter(events) +} + +/// Convert a stream of RPC event into a stream of event batches +fn stream_batches( + subscriptions: Box, + chain_id: ChainId, +) -> impl Stream { + let id = chain_id.clone(); + + // Collect IBC events from each RPC event + let events = subscriptions + .filter_map(|rpc_event| async { rpc_event.ok() }) + .flat_map(move |rpc_event| collect_events(&id, rpc_event)); + + // Group events by height + let grouped = group_while(events, |(h0, _), (h1, _)| h0 == h1); + + // Convert each group to a batch + grouped.map(move |events| { + let height = events + .first() + .map(|(h, _)| h) + .copied() + .expect("internal error: found empty group"); // SAFETY: upheld by `group_while` + + let mut events = events.into_iter().map(|(_, e)| e).collect(); + sort_events(&mut events); + + EventBatch { + height, + events, + chain_id: chain_id.clone(), + } + }) +} + +/// Sort the given events by putting the NewBlock event first, +/// and leaving the other events as is. +fn sort_events(events: &mut Vec) { + events.sort_by(|a, b| match (a, b) { + (IbcEvent::NewBlock(_), _) => Ordering::Less, + _ => Ordering::Equal, + }) } diff --git a/relayer/src/util.rs b/relayer/src/util.rs index 81e784465a..41deb43c83 100644 --- a/relayer/src/util.rs +++ b/relayer/src/util.rs @@ -7,3 +7,4 @@ pub use recv_multiple::{recv_multiple, try_recv_multiple}; pub mod iter; pub mod retry; pub mod sled; +pub mod stream; diff --git a/relayer/src/util/stream.rs b/relayer/src/util/stream.rs new file mode 100644 index 0000000000..a567066349 --- /dev/null +++ b/relayer/src/util/stream.rs @@ -0,0 +1,91 @@ +use async_stream::stream; +use futures::stream::Stream; + +/// ## Example +/// +/// ```rust,ignore +/// let input = stream::iter(vec![0, 0, 0, 1, 1, 2, 3, 3, 3, 3]); +/// let output = group_while(stream, |a, b| a == b).collect::>().await; +/// assert_eq!(output, vec![vec![0, 0, 0], vec![1, 1], vec![2], vec![3, 3, 3, 3]]); +/// ``` +pub fn group_while(input: S, group_these: F) -> impl Stream> +where + S: Stream, + F: Fn(&A, &A) -> bool + 'static, +{ + struct State { + cur: A, + group: Vec, + } + + stream! { + let mut state = None; + + for await x in input { + match &mut state { + None => { + state = Some(State { cur: x, group: vec![] }); + }, + Some(state) if group_these(&state.cur, &x) => { + let prev = std::mem::replace(&mut state.cur, x); + state.group.push(prev); + }, + Some(state) => { + let cur = std::mem::replace(&mut state.cur, x); + state.group.push(cur); + let group = std::mem::replace(&mut state.group, vec![]); + yield group; + } + } + } + + if let Some(State{ cur, mut group }) = state { + group.push(cur); + yield group; + } + } +} + +#[cfg(test)] +mod tests { + use super::group_while; + use futures::{executor::block_on, stream, StreamExt}; + + #[test] + fn group_while_non_empty() { + let input = stream::iter(vec![ + (1, 1), + (1, 2), + (2, 1), + (3, 1), + (3, 2), + (3, 3), + (4, 1), + (5, 1), + (5, 2), + ]); + + let output = group_while(input, |a, b| a.0 == b.0).collect::>(); + let result = block_on(output); + + assert_eq!( + result, + vec![ + vec![(1, 1), (1, 2)], + vec![(2, 1)], + vec![(3, 1), (3, 2), (3, 3)], + vec![(4, 1)], + vec![(5, 1), (5, 2)] + ] + ); + } + + #[test] + fn group_while_empty() { + let input = stream::iter(Vec::::new()); + let output = group_while(input, |a, b| a == b).collect::>(); + let result = block_on(output); + + assert_eq!(result, Vec::>::new()); + } +}