Skip to content

Commit

Permalink
Re-enable events filtering in listen command
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed May 20, 2021
1 parent c017717 commit 122954b
Showing 1 changed file with 64 additions and 13 deletions.
77 changes: 64 additions & 13 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,59 @@
use std::{ops::Deref, sync::Arc, thread};
use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};
use ibc::{events::IbcEvent, ics24_host::identifier::ChainId};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{
config::ChainConfig,
event::monitor::{EventMonitor, EventReceiver},
};

use crate::prelude::*;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EventFilter {
NewBlock,
Tx,
}

impl EventFilter {
pub fn matches(&self, event: &IbcEvent) -> bool {
match self {
EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)),
EventFilter::Tx => {
!(matches!(
event,
IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_)
))
}
}
}
}

impl fmt::Display for EventFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NewBlock => write!(f, "NewBlock"),
Self::Tx => write!(f, "Tx"),
}
}
}

impl FromStr for EventFilter {
type Err = BoxError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"NewBlock" => Ok(Self::NewBlock),
"Tx" => Ok(Self::Tx),
invalid => Err(format!("unrecognized event type: {}", invalid).into()),
}
}
}

#[derive(Command, Debug, Options)]
pub struct ListenCmd {
/// Identifier of the chain to listen for events from
Expand All @@ -22,7 +62,7 @@ pub struct ListenCmd {

/// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock)
#[options(short = "e", long = "event", meta = "EVENT")]
events: Vec<EventType>,
events: Vec<EventFilter>,
}

impl ListenCmd {
Expand All @@ -34,7 +74,7 @@ impl ListenCmd {
.ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?;

let events = if self.events.is_empty() {
&[EventType::Tx, EventType::NewBlock]
&[EventFilter::Tx, EventFilter::NewBlock]
} else {
self.events.as_slice()
};
Expand All @@ -51,29 +91,42 @@ impl Runnable for ListenCmd {
}

/// Listen to events
pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> {
pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
events.iter().format(", "),
filters.iter().format(", "),
config.id
);

let rt = Arc::new(TokioRuntime::new()?);
let queries = events.iter().cloned().map(Query::from).collect();
let (event_monitor, rx) = subscribe(&config, queries, rt)?;
let (event_monitor, rx) = subscribe(&config, rt)?;

thread::spawn(|| event_monitor.run());

while let Ok(event_batch) = rx.recv() {
println!("{:#?}", event_batch);
match event_batch {
Ok(batch) => {
println!("- Event batch at height {}", batch.height);
for event in batch.events {
if event_match(&event, filters) {
println!("+ {:#?}", event);
}
}
println!();
}
Err(e) => println!("- Error: {}", e),
}
}

Ok(())
}

fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool {
filters.iter().any(|f| f.matches(event))
}

fn subscribe(
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, EventReceiver), BoxError> {
let (mut event_monitor, rx) = EventMonitor::new(
Expand All @@ -83,8 +136,6 @@ fn subscribe(
)
.map_err(|e| format!("could not initialize event monitor: {}", e))?;

event_monitor.set_queries(queries);

event_monitor
.subscribe()
.map_err(|e| format!("could not initialize subscriptions: {}", e))?;
Expand Down

0 comments on commit 122954b

Please sign in to comment.