Skip to content

Commit

Permalink
Fix bug in event monitor where subscriptions would not be terminated …
Browse files Browse the repository at this point in the history
…properly on WebSocket error (informalsystems#290)

* Store the combined stream of all subscriptions within the EventMonitor

* Explicitly close the WebSocket client when this one fails to ensure that subscriptions are terminated

* Fix event type in Tx subscription

* Fix build for latest tendermint-rs master

* Formatting

Co-authored-by: Thane Thomson <[email protected]>
  • Loading branch information
romac and thanethomson authored Oct 8, 2020
1 parent baf27d3 commit e889ff3
Showing 1 changed file with 47 additions and 35 deletions.
82 changes: 47 additions & 35 deletions relayer/src/event_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use ibc::events::IBCEvent;
use tendermint::{chain, net, Error as TMError};
use tendermint_rpc::Subscription;
use tendermint_rpc::{SubscriptionClient, WebSocketClient};
use tendermint_rpc::{
query::EventType, query::Query, Subscription, SubscriptionClient, WebSocketClient,
};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::Sender;

use futures::stream::select_all;
use tracing::{debug, info};
use futures::{stream::select_all, Stream};
use tracing::{debug, error, info};

type SubscriptionResult = Result<tendermint_rpc::event::Event, tendermint_rpc::Error>;
type SubscriptionStream = dyn Stream<Item = SubscriptionResult> + Send + Sync + Unpin;

/// Connect to a TM node, receive push events over a websocket and filter them for the
/// event handler.
Expand All @@ -19,9 +23,9 @@ pub struct EventMonitor {
/// Node Address
node_addr: net::Address,
/// Queries
event_queries: Vec<String>,
/// Subscriptions
subscriptions: Vec<Subscription>,
event_queries: Vec<Query>,
/// All subscriptions combined in a single stream
subscriptions: Box<SubscriptionStream>,
}

impl EventMonitor {
Expand All @@ -34,35 +38,29 @@ impl EventMonitor {
let websocket_client = WebSocketClient::new(rpc_addr.clone()).await?;

// TODO: move them to config file(?)
let event_queries = vec![
"tm.event='NewTx'".to_string(),
"tm.event='NewBlock'".to_string(),
];
let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)];

Ok(EventMonitor {
chain_id,
websocket_client,
channel_to_handler,
node_addr: rpc_addr.clone(),
subscriptions: Vec::with_capacity(event_queries.len()),
event_queries,
subscriptions: Box::new(futures::stream::empty()),
})
}

/// Terminate and clear the current subscriptions, and subscribe again to all queries.
/// Clear the current subscriptions, and subscribe again to all queries.
pub async fn subscribe(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let count = self.subscriptions.len();
let subscriptions = std::mem::replace(&mut self.subscriptions, Vec::with_capacity(count));

for subscription in subscriptions {
subscription.terminate().await?;
}
let mut subscriptions = vec![];

for query in &self.event_queries {
let subscription = self.websocket_client.subscribe(query.clone()).await?;
self.subscriptions.push(subscription);
subscriptions.push(subscription);
}

self.subscriptions = Box::new(select_all(subscriptions));

Ok(())
}

Expand All @@ -75,20 +73,30 @@ impl EventMonitor {
Ok(..) => continue,
Err(err) => {
debug!("Web socket error: {}", err);

// Try to reconnect
let websocket_client = WebSocketClient::new(self.node_addr.clone())
let mut websocket_client = WebSocketClient::new(self.node_addr.clone())
.await
.unwrap_or_else(|e| {
debug!("Error on reconnection {}", e);
panic!("Abort on failed reconnection")
debug!("Error on reconnection: {}", e);
panic!("Abort on failed reconnection");
});

// Swap the new client with the previous one which failed,
// so that we can shut the latter down gracefully.
std::mem::swap(&mut self.websocket_client, &mut websocket_client);

debug!("Reconnected");
self.websocket_client = websocket_client;

// Shut down previous client
debug!("Gracefully shutting down previous client");
websocket_client.close().await.unwrap_or_else(|e| {
error!("Failed to close previous WebSocket client: {}", e);
});

// Try to resubscribe
if let Err(err) = self.subscribe().await {
debug!("Error on recreating subscriptions {}", err);
debug!("Error on recreating subscriptions: {}", err);
panic!("Abort during reconnection");
};
}
Expand All @@ -98,18 +106,22 @@ impl EventMonitor {

/// Collect the IBC events from the subscriptions
pub async fn collect_events(&mut self) -> Result<(), TMError> {
if let Some(event) = select_all(&mut self.subscriptions).next().await {
match event {
Ok(event) => {
if let Ok(ibc_events) = ibc::events::get_all_events(event) {
// TODO - send_timeout()?
self.channel_to_handler
.send((self.chain_id, ibc_events))
.await?;
tokio::select! {
Some(event) = self.subscriptions.next() => {
match event {
Ok(event) => {
if let Ok(ibc_events) = ibc::events::get_all_events(event) {
// TODO - send_timeout()?
self.channel_to_handler
.send((self.chain_id, ibc_events))
.await?;
}
}
Err(err) => {
error!("Error on collecting events from subscriptions: {}", err);
}
}
Err(_e) => (), // TODO,
}
},
}

Ok(())
Expand Down

0 comments on commit e889ff3

Please sign in to comment.