From 122954b43e79b42b6c6514a70abdfdfa210f4319 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 May 2021 18:22:49 +0200 Subject: [PATCH] Re-enable events filtering in `listen` command --- relayer-cli/src/commands/listen.rs | 77 +++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 22e2b38f12..c294aa5e3c 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -1,12 +1,11 @@ -use std::{ops::Deref, sync::Arc, thread}; +use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; -use tendermint_rpc::query::{EventType, Query}; +use ibc::{events::IbcEvent, ics24_host::identifier::ChainId}; -use ibc::ics24_host::identifier::ChainId; use ibc_relayer::{ config::ChainConfig, event::monitor::{EventMonitor, EventReceiver}, @@ -14,6 +13,47 @@ use ibc_relayer::{ use crate::prelude::*; +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum EventFilter { + NewBlock, + Tx, +} + +impl EventFilter { + pub fn matches(&self, event: &IbcEvent) -> bool { + match self { + EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)), + EventFilter::Tx => { + !(matches!( + event, + IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_) + )) + } + } + } +} + +impl fmt::Display for EventFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NewBlock => write!(f, "NewBlock"), + Self::Tx => write!(f, "Tx"), + } + } +} + +impl FromStr for EventFilter { + type Err = BoxError; + + fn from_str(s: &str) -> Result { + match s { + "NewBlock" => Ok(Self::NewBlock), + "Tx" => Ok(Self::Tx), + invalid => Err(format!("unrecognized event type: {}", invalid).into()), + } + } +} + #[derive(Command, Debug, Options)] pub struct ListenCmd { /// Identifier of the chain to listen for events from @@ -22,7 +62,7 @@ pub struct ListenCmd { /// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock) #[options(short = "e", long = "event", meta = "EVENT")] - events: Vec, + events: Vec, } impl ListenCmd { @@ -34,7 +74,7 @@ impl ListenCmd { .ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?; let events = if self.events.is_empty() { - &[EventType::Tx, EventType::NewBlock] + &[EventFilter::Tx, EventFilter::NewBlock] } else { self.events.as_slice() }; @@ -51,29 +91,42 @@ impl Runnable for ListenCmd { } /// Listen to events -pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> { +pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> { println!( "[info] Listening for events `{}` on '{}'...", - events.iter().format(", "), + filters.iter().format(", "), config.id ); let rt = Arc::new(TokioRuntime::new()?); - let queries = events.iter().cloned().map(Query::from).collect(); - let (event_monitor, rx) = subscribe(&config, queries, rt)?; + let (event_monitor, rx) = subscribe(&config, rt)?; thread::spawn(|| event_monitor.run()); while let Ok(event_batch) = rx.recv() { - println!("{:#?}", event_batch); + match event_batch { + Ok(batch) => { + println!("- Event batch at height {}", batch.height); + for event in batch.events { + if event_match(&event, filters) { + println!("+ {:#?}", event); + } + } + println!(); + } + Err(e) => println!("- Error: {}", e), + } } Ok(()) } +fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool { + filters.iter().any(|f| f.matches(event)) +} + fn subscribe( chain_config: &ChainConfig, - queries: Vec, rt: Arc, ) -> Result<(EventMonitor, EventReceiver), BoxError> { let (mut event_monitor, rx) = EventMonitor::new( @@ -83,8 +136,6 @@ fn subscribe( ) .map_err(|e| format!("could not initialize event monitor: {}", e))?; - event_monitor.set_queries(queries); - event_monitor .subscribe() .map_err(|e| format!("could not initialize subscriptions: {}", e))?;