diff --git a/relayer/src/event_monitor.rs b/relayer/src/event_monitor.rs index e90302fb84..63683a9def 100644 --- a/relayer/src/event_monitor.rs +++ b/relayer/src/event_monitor.rs @@ -1,12 +1,16 @@ use ibc::events::IBCEvent; use tendermint::{chain, net, Error as TMError}; -use tendermint_rpc::Subscription; -use tendermint_rpc::{SubscriptionClient, WebSocketClient}; +use tendermint_rpc::{ + query::EventType, query::Query, Subscription, SubscriptionClient, WebSocketClient, +}; use tokio::stream::StreamExt; use tokio::sync::mpsc::Sender; -use futures::stream::select_all; -use tracing::{debug, info}; +use futures::{stream::select_all, Stream}; +use tracing::{debug, error, info}; + +type SubscriptionResult = Result; +type SubscriptionStream = dyn Stream + Send + Sync + Unpin; /// Connect to a TM node, receive push events over a websocket and filter them for the /// event handler. @@ -19,9 +23,9 @@ pub struct EventMonitor { /// Node Address node_addr: net::Address, /// Queries - event_queries: Vec, - /// Subscriptions - subscriptions: Vec, + event_queries: Vec, + /// All subscriptions combined in a single stream + subscriptions: Box, } impl EventMonitor { @@ -34,35 +38,29 @@ impl EventMonitor { let websocket_client = WebSocketClient::new(rpc_addr.clone()).await?; // TODO: move them to config file(?) - let event_queries = vec![ - "tm.event='NewTx'".to_string(), - "tm.event='NewBlock'".to_string(), - ]; + let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)]; Ok(EventMonitor { chain_id, websocket_client, channel_to_handler, node_addr: rpc_addr.clone(), - subscriptions: Vec::with_capacity(event_queries.len()), event_queries, + subscriptions: Box::new(futures::stream::empty()), }) } - /// Terminate and clear the current subscriptions, and subscribe again to all queries. + /// Clear the current subscriptions, and subscribe again to all queries. pub async fn subscribe(&mut self) -> Result<(), Box> { - let count = self.subscriptions.len(); - let subscriptions = std::mem::replace(&mut self.subscriptions, Vec::with_capacity(count)); - - for subscription in subscriptions { - subscription.terminate().await?; - } + let mut subscriptions = vec![]; for query in &self.event_queries { let subscription = self.websocket_client.subscribe(query.clone()).await?; - self.subscriptions.push(subscription); + subscriptions.push(subscription); } + self.subscriptions = Box::new(select_all(subscriptions)); + Ok(()) } @@ -75,20 +73,30 @@ impl EventMonitor { Ok(..) => continue, Err(err) => { debug!("Web socket error: {}", err); + // Try to reconnect - let websocket_client = WebSocketClient::new(self.node_addr.clone()) + let mut websocket_client = WebSocketClient::new(self.node_addr.clone()) .await .unwrap_or_else(|e| { - debug!("Error on reconnection {}", e); - panic!("Abort on failed reconnection") + debug!("Error on reconnection: {}", e); + panic!("Abort on failed reconnection"); }); + // Swap the new client with the previous one which failed, + // so that we can shut the latter down gracefully. + std::mem::swap(&mut self.websocket_client, &mut websocket_client); + debug!("Reconnected"); - self.websocket_client = websocket_client; + + // Shut down previous client + debug!("Gracefully shutting down previous client"); + websocket_client.close().await.unwrap_or_else(|e| { + error!("Failed to close previous WebSocket client: {}", e); + }); // Try to resubscribe if let Err(err) = self.subscribe().await { - debug!("Error on recreating subscriptions {}", err); + debug!("Error on recreating subscriptions: {}", err); panic!("Abort during reconnection"); }; } @@ -98,18 +106,22 @@ impl EventMonitor { /// Collect the IBC events from the subscriptions pub async fn collect_events(&mut self) -> Result<(), TMError> { - if let Some(event) = select_all(&mut self.subscriptions).next().await { - match event { - Ok(event) => { - if let Ok(ibc_events) = ibc::events::get_all_events(event) { - // TODO - send_timeout()? - self.channel_to_handler - .send((self.chain_id, ibc_events)) - .await?; + tokio::select! { + Some(event) = self.subscriptions.next() => { + match event { + Ok(event) => { + if let Ok(ibc_events) = ibc::events::get_all_events(event) { + // TODO - send_timeout()? + self.channel_to_handler + .send((self.chain_id, ibc_events)) + .await?; + } + } + Err(err) => { + error!("Error on collecting events from subscriptions: {}", err); } } - Err(_e) => (), // TODO, - } + }, } Ok(())