From 39305e071d2ed644d6bbd2feb0fe1a182cc59fd0 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 5 May 2021 17:55:36 +0200 Subject: [PATCH 01/13] Restart event monitor when node goes down and propagate errors through the subscription channel --- relayer-cli/src/commands/listen.rs | 13 +- relayer-cli/src/commands/misbehaviour.rs | 59 ++++--- relayer/src/chain.rs | 12 +- relayer/src/chain/cosmos.rs | 11 +- relayer/src/chain/handle.rs | 11 +- relayer/src/chain/mock.rs | 10 +- relayer/src/chain/runtime.rs | 13 +- relayer/src/event/monitor.rs | 202 +++++++++++++++-------- relayer/src/link.rs | 21 ++- relayer/src/supervisor.rs | 22 ++- 10 files changed, 230 insertions(+), 144 deletions(-) diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index d7aa193d47..32326194cc 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -8,7 +8,10 @@ use tokio::runtime::Runtime as TokioRuntime; use tendermint_rpc::query::{EventType, Query}; use ibc::ics24_host::identifier::ChainId; -use ibc_relayer::{config::ChainConfig, event::monitor::*}; +use ibc_relayer::{ + config::ChainConfig, + event::monitor::{Error as EventMonitorError, EventBatch, EventMonitor}, +}; use crate::prelude::*; @@ -73,7 +76,13 @@ fn subscribe( chain_config: &ChainConfig, queries: Vec, rt: Arc, -) -> Result<(EventMonitor, channel::Receiver), BoxError> { +) -> Result< + ( + EventMonitor, + channel::Receiver>, + ), + BoxError, +> { let (mut event_monitor, rx) = EventMonitor::new( chain_config.id.clone(), chain_config.websocket_addr.clone(), diff --git a/relayer-cli/src/commands/misbehaviour.rs b/relayer-cli/src/commands/misbehaviour.rs index 589ffc6054..e29217e00f 100644 --- a/relayer-cli/src/commands/misbehaviour.rs +++ b/relayer-cli/src/commands/misbehaviour.rs @@ -4,6 +4,7 @@ use ibc::ics02_client::events::UpdateClient; use ibc::ics02_client::height::Height; use ibc::ics24_host::identifier::{ChainId, ClientId}; use ibc_relayer::chain::handle::ChainHandle; +use ibc_relayer::event::monitor::UnwrapOrClone; use ibc_relayer::foreign_client::{ForeignClient, MisbehaviourResults}; use crate::application::CliApp; @@ -52,32 +53,40 @@ pub fn monitor_misbehaviour( let subscription = chain.subscribe()?; // check previous updates that may have been missed - misbehaviour_handling(chain.clone(), config, client_id, None)?; + misbehaviour_handling(chain.clone(), config, client_id.clone(), None)?; // process update client events while let Ok(event_batch) = subscription.recv() { - for event in event_batch.events.iter() { - match event { - IbcEvent::UpdateClient(update) => { - debug!("{:?}", update); - misbehaviour_handling( - chain.clone(), - config, - update.client_id(), - Some(update.clone()), - )?; + let event_batch = event_batch.unwrap_or_clone(); + match event_batch { + Ok(event_batch) => { + for event in event_batch.events { + match event { + IbcEvent::UpdateClient(update) => { + debug!("{:?}", update); + misbehaviour_handling( + chain.clone(), + config, + update.client_id().clone(), + Some(update), + )?; + } + + IbcEvent::CreateClient(_create) => { + // TODO - get header from full node, consensus state from chain, compare + } + + IbcEvent::ClientMisbehaviour(ref _misbehaviour) => { + // TODO - submit misbehaviour to the witnesses (our full node) + return Ok(Some(event)); + } + + _ => {} + } } - - IbcEvent::CreateClient(_create) => { - // TODO - get header from full node, consensus state from chain, compare - } - - IbcEvent::ClientMisbehaviour(_misbehaviour) => { - // TODO - 
submit misbehaviour to the witnesses (our full node) - return Ok(Some(event.clone())); - } - - _ => {} + } + Err(e) => { + dbg!(e); } } } @@ -88,11 +97,11 @@ pub fn monitor_misbehaviour( fn misbehaviour_handling( chain: Box, config: &config::Reader, - client_id: &ClientId, + client_id: ClientId, update: Option, ) -> Result<(), BoxError> { let client_state = chain - .query_client_state(client_id, Height::zero()) + .query_client_state(&client_id, Height::zero()) .map_err(|e| format!("could not query client state for {}: {}", client_id, e))?; if client_state.is_frozen() { @@ -108,7 +117,7 @@ fn misbehaviour_handling( ) })?; - let client = ForeignClient::restore(client_id, chain.clone(), counterparty_chain.clone()); + let client = ForeignClient::restore(&client_id, chain.clone(), counterparty_chain.clone()); let result = client.detect_misbehaviour_and_submit_evidence(update); if let MisbehaviourResults::EvidenceSubmitted(events) = result { info!("evidence submission result {:?}", events); diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index 4afeca8a1b..1117bb29cd 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -1,6 +1,5 @@ use std::{sync::Arc, thread}; -use crossbeam_channel as channel; use prost_types::Any; use tendermint::block::Height; use tokio::runtime::Runtime as TokioRuntime; @@ -33,12 +32,11 @@ use ibc_proto::ibc::core::connection::v1::{ QueryClientConnectionsRequest, QueryConnectionsRequest, }; -use crate::config::ChainConfig; use crate::connection::ConnectionMsgType; use crate::error::{Error, Kind}; -use crate::event::monitor::EventBatch; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::LightClient; +use crate::{config::ChainConfig, event::monitor::EventReceiver}; pub(crate) mod cosmos; pub mod handle; @@ -89,13 +87,7 @@ pub trait Chain: Sized { fn init_event_monitor( &self, rt: Arc, - ) -> Result< - ( - channel::Receiver, - Option>, - ), - Error, - >; + ) -> Result<(EventReceiver, Option>), Error>; /// Returns the chain's identifier fn id(&self) -> &ChainId; diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index b27c6c277b..2c473a863f 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -6,7 +6,6 @@ use std::{ use anomaly::fail; use bech32::{ToBase32, Variant}; use bitcoin::hashes::hex::ToHex; -use crossbeam_channel as channel; use prost::Message; use prost_types::Any; use tendermint::abci::Path as TendermintABCIPath; @@ -65,7 +64,7 @@ use ibc_proto::ibc::core::connection::v1::{ use crate::chain::QueryResponse; use crate::config::ChainConfig; use crate::error::{Error, Kind}; -use crate::event::monitor::{EventBatch, EventMonitor}; +use crate::event::monitor::{EventMonitor, EventReceiver}; use crate::keyring::{KeyEntry, KeyRing, Store}; use crate::light_client::tendermint::LightClient as TmLightClient; use crate::light_client::LightClient; @@ -361,13 +360,7 @@ impl Chain for CosmosSdkChain { fn init_event_monitor( &self, rt: Arc, - ) -> Result< - ( - channel::Receiver, - Option>, - ), - Error, - > { + ) -> Result<(EventReceiver, Option>), Error> { crate::time!("init_event_monitor"); let (mut event_monitor, event_receiver) = EventMonitor::new( diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index f95a11f2ff..3b582044fb 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -34,13 +34,16 @@ use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest; use ibc_proto::ibc::core::commitment::v1::MerkleProof; pub use prod::ProdChainHandle; -use 
crate::connection::ConnectionMsgType; -use crate::keyring::KeyEntry; -use crate::{error::Error, event::monitor::EventBatch}; +use crate::{ + connection::ConnectionMsgType, + error::Error, + event::monitor::{Error as EventMonitorError, EventBatch}, + keyring::KeyEntry, +}; mod prod; -pub type Subscription = channel::Receiver>; +pub type Subscription = channel::Receiver>>; pub type ReplyTo = channel::Sender>; pub type Reply = channel::Receiver>; diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 7c181408a1..486feebcb4 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -41,7 +41,7 @@ use ibc_proto::ibc::core::connection::v1::{ use crate::chain::Chain; use crate::config::ChainConfig; use crate::error::{Error, Kind}; -use crate::event::monitor::EventBatch; +use crate::event::monitor::EventReceiver; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; @@ -79,13 +79,7 @@ impl Chain for MockChain { fn init_event_monitor( &self, _rt: Arc, - ) -> Result< - ( - channel::Receiver, - Option>, - ), - Error, - > { + ) -> Result<(EventReceiver, Option>), Error> { let (_, rx) = channel::unbounded(); Ok((rx, None)) } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index c205aad5cf..b73adafb1d 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -41,7 +41,10 @@ use crate::{ config::ChainConfig, connection::ConnectionMsgType, error::{Error, Kind}, - event::{bus::EventBus, monitor::EventBatch}, + event::{ + bus::EventBus, + monitor::{Error as EventMonitorError, EventBatch}, + }, keyring::KeyEntry, light_client::LightClient, }; @@ -69,10 +72,10 @@ pub struct ChainRuntime { request_receiver: channel::Receiver, /// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers - event_bus: EventBus>, + event_bus: EventBus>>, /// Receiver channel from the event bus - event_receiver: channel::Receiver, + event_receiver: channel::Receiver>, /// A handle to the light client light_client: Box>, @@ -110,7 +113,7 @@ impl ChainRuntime { fn init( chain: C, light_client: Box>, - event_receiver: channel::Receiver, + event_receiver: channel::Receiver>, rt: Arc, ) -> (Box, thread::JoinHandle<()>) { let chain_runtime = Self::new(chain, light_client, event_receiver, rt); @@ -128,7 +131,7 @@ impl ChainRuntime { fn new( chain: C, light_client: Box>, - event_receiver: channel::Receiver, + event_receiver: channel::Receiver>, rt: Arc, ) -> Self { let (request_sender, request_receiver) = channel::unbounded::(); diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 59ee7be917..849ee40858 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,33 +1,45 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crossbeam_channel as channel; use futures::stream::StreamExt; use futures::{stream::select_all, Stream}; use itertools::Itertools; -use tendermint_rpc::{query::EventType, query::Query, SubscriptionClient, WebSocketClient}; use thiserror::Error; -use tokio::runtime::Runtime as TokioRuntime; use tokio::task::JoinHandle; +use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; use tracing::{debug, error, info, warn}; +use tendermint_rpc::{ + event::Event as RpcEvent, + query::{EventType, Query}, + Error as RpcError, Result as RpcResult, SubscriptionClient, WebSocketClient, + WebSocketClientDriver, +}; + use ibc::{events::IbcEvent, ics02_client::height::Height, 
ics24_host::identifier::ChainId}; +const DEFAULT_RETRIES: usize = 100; +const DEFAULT_DELAY: Duration = Duration::from_secs(5); + #[derive(Debug, Clone, Error)] pub enum Error { - #[error("failed to create WebSocket client driver: {0}")] - ClientCreationFailed(tendermint_rpc::Error), + #[error("WebSocket driver failed: {0}")] + WebSocketDriver(RpcError), - #[error("failed to terminate previous WebSocket client driver: {0}")] + #[error("failed to create WebSocket driver: {0}")] + ClientCreationFailed(RpcError), + + #[error("failed to terminate previous WebSocket driver: {0}")] ClientTerminationFailed(Arc), - #[error("failed to run previous WebSocket client driver to completion: {0}")] - ClientCompletionFailed(tendermint_rpc::Error), + #[error("failed to run previous WebSocket driver to completion: {0}")] + ClientCompletionFailed(RpcError), #[error("failed to subscribe to events via WebSocket client: {0}")] - ClientSubscriptionFailed(tendermint_rpc::Error), + ClientSubscriptionFailed(RpcError), #[error("failed to collect events over WebSocket subscription: {0}")] - NextEventBatchFailed(tendermint_rpc::Error), + NextEventBatchFailed(RpcError), #[error("failed to extract IBC events: {0}")] CollectEventsFailed(String), @@ -44,15 +56,23 @@ pub struct EventBatch { pub events: Vec, } -impl EventBatch { - pub fn unwrap_or_clone(self: Arc) -> Self { +pub trait UnwrapOrClone { + fn unwrap_or_clone(self: Arc) -> Self; +} + +impl UnwrapOrClone for Result { + fn unwrap_or_clone(self: Arc) -> Self { Arc::try_unwrap(self).unwrap_or_else(|batch| batch.as_ref().clone()) } } -type SubscriptionResult = Result; +type SubscriptionResult = RpcResult; type SubscriptionStream = dyn Stream + Send + Sync + Unpin; +pub type Result = std::result::Result; + +pub type EventReceiver = channel::Receiver>; + /// Connect to a Tendermint node, subscribe to a set of queries, /// receive push events over a websocket, and filter them for the /// event handler. 
@@ -66,11 +86,15 @@ type SubscriptionStream = dyn Stream + Send + Sync + pub struct EventMonitor { chain_id: ChainId, /// WebSocket to collect events from - websocket_client: WebSocketClient, + client: WebSocketClient, /// Async task handle for the WebSocket client's driver - websocket_driver_handle: JoinHandle>, + driver_handle: JoinHandle<()>, /// Channel to handler where the monitor for this chain sends the events - tx_batch: channel::Sender, + tx_batch: channel::Sender>, + /// Channel where to receive client driver errors + rx_err: mpsc::UnboundedReceiver, + /// Channel where to send client driver errors + tx_err: mpsc::UnboundedSender, /// Node Address node_addr: tendermint_rpc::Url, /// Queries @@ -81,21 +105,33 @@ pub struct EventMonitor { rt: Arc, } +async fn run_driver( + driver: WebSocketClientDriver, + tx: mpsc::UnboundedSender, +) { + if let Err(e) = driver.run().await { + if tx.send(e).is_err() { + println!("failed to relay driver error to event monitor"); + } + } +} + impl EventMonitor { /// Create an event monitor, and connect to a node pub fn new( chain_id: ChainId, node_addr: tendermint_rpc::Url, rt: Arc, - ) -> Result<(Self, channel::Receiver), Error> { - let (tx, rx) = channel::unbounded(); + ) -> Result<(Self, EventReceiver)> { + let (tx_batch, rx_batch) = channel::unbounded(); let ws_addr = node_addr.clone(); - let (websocket_client, websocket_driver) = rt - .block_on(async move { WebSocketClient::new(ws_addr.clone()).await }) + let (client, driver) = rt + .block_on(async move { WebSocketClient::new(ws_addr).await }) .map_err(Error::ClientCreationFailed)?; - let websocket_driver_handle = rt.spawn(websocket_driver.run()); + let (tx_err, rx_err) = mpsc::unbounded_channel(); + let websocket_driver_handle = rt.spawn(run_driver(driver, tx_err.clone())); // TODO: move them to config file(?) let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)]; @@ -103,15 +139,17 @@ impl EventMonitor { let monitor = Self { rt, chain_id, - websocket_client, - websocket_driver_handle, + client, + driver_handle: websocket_driver_handle, event_queries, - tx_batch: tx, + tx_batch, + rx_err, + tx_err, node_addr, subscriptions: Box::new(futures::stream::empty()), }; - Ok((monitor, rx)) + Ok((monitor, rx_batch)) } /// Set the queries to subscribe to. @@ -131,7 +169,7 @@ impl EventMonitor { } /// Clear the current subscriptions, and subscribe again to all queries. - pub fn subscribe(&mut self) -> Result<(), Error> { + pub fn subscribe(&mut self) -> Result<()> { let mut subscriptions = vec![]; for query in &self.event_queries { @@ -139,7 +177,7 @@ impl EventMonitor { let subscription = self .rt - .block_on(self.websocket_client.subscribe(query.clone())) + .block_on(self.client.subscribe(query.clone())) .map_err(Error::ClientSubscriptionFailed)?; subscriptions.push(subscription); @@ -152,85 +190,108 @@ impl EventMonitor { Ok(()) } - fn try_reconnect(&mut self) -> Result<(), Error> { + fn try_reconnect(&mut self) -> Result<()> { warn!( "trying to reconnect to WebSocket endpoint: {}", self.node_addr ); // Try to reconnect - let (mut websocket_client, websocket_driver) = self + let (mut client, driver) = self .rt .block_on(WebSocketClient::new(self.node_addr.clone())) .map_err(Error::ClientCreationFailed)?; - let mut websocket_driver_handle = self.rt.spawn(websocket_driver.run()); + let mut driver_handle = self.rt.spawn(run_driver(driver, self.tx_err.clone())); // Swap the new client with the previous one which failed, // so that we can shut the latter down gracefully. 
- std::mem::swap(&mut self.websocket_client, &mut websocket_client); - std::mem::swap( - &mut self.websocket_driver_handle, - &mut websocket_driver_handle, - ); + std::mem::swap(&mut self.client, &mut client); + std::mem::swap(&mut self.driver_handle, &mut driver_handle); warn!("reconnected to WebSocket endpoint: {}", self.node_addr); // Shut down previous client debug!("gracefully shutting down previous client"); - if let Err(e) = websocket_client.close() { + if let Err(e) = client.close() { error!("previous websocket client closing failure {}", e); } - let result = self - .rt - .block_on(websocket_driver_handle) + self.rt + .block_on(driver_handle) .map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?; debug!("previous client successfully shutdown"); - result.map_err(Error::ClientCompletionFailed) + Ok(()) } /// Try to resubscribe to events - fn try_resubscribe(&mut self) -> Result<(), Error> { + fn try_resubscribe(&mut self) -> Result<()> { warn!("trying to resubscribe to events"); self.subscribe() } + /// Attempt to restart the WebSocket client at most a given number of times. + fn restart(&mut self, max_retries: usize, delay: Duration) { + for _ in 0..max_retries { + std::thread::sleep(delay); + + // Try to reconnect + if let Err(e) = self.try_reconnect() { + println!("error on reconnecting: {}", e); + continue; + } + + // Try to resubscribe + if let Err(e) = self.try_resubscribe() { + println!("error on reconnecting: {}", e); + continue; + } + + return; + } + } + /// Event monitor loop pub fn run(mut self) { info!(chain.id = %self.chain_id, "starting event monitor"); + let rt = self.rt.clone(); + loop { - match self.collect_events() { + let result = rt.block_on(async { + tokio::select! { + Some(event) = self.subscriptions.next() => { + event + .map_err(Error::NextEventBatchFailed) + .and_then(|e| self.collect_events(e)) + }, + Some(e) = self.rx_err.recv() => Err(Error::WebSocketDriver(e)), + } + }); + + match result { Ok(batches) => self.process_batches(batches).unwrap_or_else(|e| { error!("failed to process event batch: {}", e); }), Err(e) => { error!("failed to collect events: {}", e); - // Try to reconnect - self.try_reconnect().unwrap_or_else(|e| { - error!("error on reconnecting: {}", e); - }); - - // Try to resubscribe - self.try_resubscribe().unwrap_or_else(|e| { - error!("error on reconnecting: {}", e); - }); + // Restart the event monitor + self.restart(DEFAULT_RETRIES, DEFAULT_DELAY); } } } } /// Collect the IBC events from the subscriptions - fn process_batches(&self, batches: Vec) -> Result<(), Error> { + fn process_batches(&self, batches: Vec) -> Result<()> { for batch in batches { self.tx_batch - .send(batch) + .send(Ok(batch)) .map_err(|_| Error::ChannelSendFailed)?; } @@ -238,25 +299,20 @@ impl EventMonitor { } /// Collect the IBC events from the subscriptions - fn collect_events(&mut self) -> Result, Error> { - if let Some(event) = self.rt.block_on(self.subscriptions.next()) { - let event = event.map_err(Error::NextEventBatchFailed)?; - let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event) - .map_err(Error::CollectEventsFailed)?; - - let events_by_height = ibc_events.into_iter().into_group_map(); - let batches = events_by_height - .into_iter() - .map(|(height, events)| EventBatch { - chain_id: self.chain_id.clone(), - height, - events, - }) - .collect(); - - Ok(batches) - } else { - Ok(vec![]) - } + fn collect_events(&mut self, event: RpcEvent) -> Result> { + let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event) + 
.map_err(Error::CollectEventsFailed)?; + + let events_by_height = ibc_events.into_iter().into_group_map(); + let batches = events_by_height + .into_iter() + .map(|(height, events)| EventBatch { + chain_id: self.chain_id.clone(), + height, + events, + }) + .collect(); + + Ok(batches) } } diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 7c00e55e7e..fd7e802315 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -31,13 +31,16 @@ use ibc_proto::ibc::core::channel::v1::{ QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }; -use crate::channel::{Channel, ChannelError, ChannelSide}; use crate::connection::ConnectionError; use crate::error::Error; use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::relay::MAX_ITER; use crate::{chain::handle::ChainHandle, transfer::PacketError}; +use crate::{ + channel::{Channel, ChannelError, ChannelSide}, + event::monitor::UnwrapOrClone, +}; use ibc::events::VecIbcEvents; #[derive(Debug, Error)] @@ -1473,7 +1476,13 @@ impl Link { // Input new events to the relay path, and schedule any batch associated with them if let Ok(batch) = events_a.try_recv() { - self.a_to_b.update_schedule(batch.unwrap_or_clone())?; + let batch = batch.unwrap_or_clone(); + match batch { + Ok(batch) => self.a_to_b.update_schedule(batch)?, + Err(e) => { + dbg!(e); + } + } } // Refresh the scheduled batches and execute any outstanding ones. @@ -1481,7 +1490,13 @@ impl Link { self.a_to_b.execute_schedule()?; if let Ok(batch) = events_b.try_recv() { - self.b_to_a.update_schedule(batch.unwrap_or_clone())?; + let batch = batch.unwrap_or_clone(); + match batch { + Ok(batch) => self.b_to_a.update_schedule(batch)?, + Err(e) => { + dbg!(e); + } + } } self.b_to_a.refresh_schedule()?; diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 4421bb2563..f1d3b346c8 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -12,7 +12,9 @@ use tracing::{debug, error, error_span, info, trace, warn}; use ibc::events::VecIbcEvents; use ibc::ics02_client::client_state::{ClientState, IdentifiedAnyClientState}; use ibc::ics02_client::events::UpdateClient; +use ibc::ics03_connection::connection::IdentifiedConnectionEnd; use ibc::ics04_channel::channel::IdentifiedChannelEnd; +use ibc::ics04_channel::events::Attributes; use ibc::ics24_host::identifier::ClientId; use ibc::{ events::IbcEvent, @@ -30,12 +32,10 @@ use ibc_proto::ibc::core::channel::v1::QueryChannelsRequest; use crate::{ chain::handle::ChainHandle, - event::monitor::EventBatch, + event::monitor::{EventBatch, UnwrapOrClone}, foreign_client::{ForeignClient, ForeignClientError, MisbehaviourResults}, link::{Link, LinkParameters}, }; -use ibc::ics03_connection::connection::IdentifiedConnectionEnd; -use ibc::ics04_channel::events::Attributes; /// A command for a [`Worker`]. 
pub enum WorkerCmd { @@ -251,11 +251,23 @@ impl Supervisor { loop { for batch in subscription_a.try_iter() { - self.process_batch(self.chains.a.clone(), batch.unwrap_or_clone())?; + let batch = batch.unwrap_or_clone(); + match batch { + Ok(batch) => self.process_batch(self.chains.a.clone(), batch)?, + Err(e) => { + dbg!(e); + } + } } for batch in subscription_b.try_iter() { - self.process_batch(self.chains.b.clone(), batch.unwrap_or_clone())?; + let batch = batch.unwrap_or_clone(); + match batch { + Ok(batch) => self.process_batch(self.chains.b.clone(), batch)?, + Err(e) => { + dbg!(e); + } + } } std::thread::sleep(Duration::from_millis(600)); From e2c8085d0fa420ff576e8a8c4ae4f6dfbea5332d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 09:19:54 +0200 Subject: [PATCH 02/13] Prevent worker from exiting on error --- relayer/src/supervisor.rs | 74 ++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index f1d3b346c8..bd3bb2b554 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -251,22 +251,36 @@ impl Supervisor { loop { for batch in subscription_a.try_iter() { - let batch = batch.unwrap_or_clone(); - match batch { - Ok(batch) => self.process_batch(self.chains.a.clone(), batch)?, + let result = batch + .unwrap_or_clone() + .map_err(Into::into) + .and_then(|batch| self.process_batch(self.chains.a.clone(), batch)); + + match result { + Ok(()) => trace!("[{}] batch processing complete", self.chains.a.id()), Err(e) => { - dbg!(e); + error!( + "[{}] error during batch processing: {}", + self.chains.a.id(), + e + ); } } } for batch in subscription_b.try_iter() { - let batch = batch.unwrap_or_clone(); - match batch { - Ok(batch) => self.process_batch(self.chains.b.clone(), batch)?, - Err(e) => { - dbg!(e); - } + let result = batch + .unwrap_or_clone() + .map_err(Into::into) + .and_then(|batch| self.process_batch(self.chains.b.clone(), batch)); + + match result { + Ok(()) => trace!("[{}] batch processing complete", self.chains.b.id()), + Err(e) => error!( + "[{}] error during batch processing: {}", + self.chains.b.id(), + e + ), } } @@ -415,7 +429,7 @@ impl Worker { /// Run the worker event loop. 
fn run(self, object: Object) { - let span = error_span!("worker loop", worker = %self); + let span = error_span!("worker loop", worker = %object.short_name()); let _guard = span.enter(); let result = match object { @@ -467,6 +481,7 @@ impl Worker { "running client worker & initial misbehaviour detection for {}", client ); + // initial check for evidence of misbehaviour for all updates let skip_misbehaviour = self.run_client_misbehaviour(&client, None); @@ -477,23 +492,23 @@ impl Worker { loop { thread::sleep(Duration::from_millis(600)); + // Run client refresh, exit only if expired or frozen - if let Err(ForeignClientError::ExpiredOrFrozen(client_id, chain_id)) = client.refresh() - { - return Err(Box::new(ForeignClientError::ExpiredOrFrozen( - client_id, chain_id, - ))); + if let Err(e @ ForeignClientError::ExpiredOrFrozen(..)) = client.refresh() { + error!("failed to refresh client '{}': {}", client, e); + continue; } if skip_misbehaviour { continue; } + if let Ok(WorkerCmd::IbcEvents { batch }) = self.rx.try_recv() { - trace!("[{}] client worker receives batch {:?}", client, batch); + trace!("client '{}' worker receives batch {:?}", client, batch); for event in batch.events { if let IbcEvent::UpdateClient(update) = event { - debug!("[{}] client updated", client); + debug!("client '{}' updated", client); // Run misbehaviour. If evidence submitted the loop will exit in next // iteration with frozen client self.run_client_misbehaviour(&client, Some(update)); @@ -520,24 +535,35 @@ impl Worker { } loop { + thread::sleep(Duration::from_millis(200)); + if let Ok(cmd) = self.rx.try_recv() { - match cmd { + let result = match cmd { WorkerCmd::IbcEvents { batch } => { // Update scheduled batches. - link.a_to_b.update_schedule(batch)?; + link.a_to_b.update_schedule(batch) } WorkerCmd::NewBlock { height, new_block: _, - } => link.a_to_b.clear_packets(height)?, + } => link.a_to_b.clear_packets(height), + }; + + if let Err(e) = result { + error!("{}", e); + continue; } } // Refresh the scheduled batches and execute any outstanding ones. 
- link.a_to_b.refresh_schedule()?; - link.a_to_b.execute_schedule()?; + let result = link + .a_to_b + .refresh_schedule() + .and_then(|_| link.a_to_b.execute_schedule()); - thread::sleep(Duration::from_millis(100)) + if let Err(e) = result { + error!("{}", e); + } } } } From 3161adef18ebc6180355da5959a80f95cf0602c1 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 09:20:15 +0200 Subject: [PATCH 03/13] Improve debug output a little --- relayer-cli/src/commands/start_multi.rs | 2 +- relayer/src/foreign_client.rs | 3 ++- relayer/src/supervisor.rs | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index c1cb8b722c..4a70eea78e 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -116,7 +116,7 @@ fn start_all_connections(config: &Config) -> Result { }); match result { - Ok(Ok(())) => Ok(Output::success_msg("ok")), + Ok(Ok(())) => Ok(Output::success_msg("supervisor shutdown")), Ok(Err(e)) => Err(e), Err(e) => std::panic::resume_unwind(e), } diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 3fb86ee69b..1403ca9664 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -480,9 +480,10 @@ impl ForeignClient { })?; debug!( - "[{}] MsgUpdateAnyClient for target {:?} trusted {:?}", + "[{}] MsgUpdateAnyClient for target height {} and trusted height {}", self, target_height, trusted_height ); + let new_msg = MsgUpdateAnyClient { client_id: self.id.clone(), header, diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index bd3bb2b554..a1734436e9 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -584,7 +584,7 @@ pub struct Client { impl Client { pub fn short_name(&self) -> String { format!( - "{} -> {}:{}", + "{}->{}:{}", self.src_chain_id, self.dst_chain_id, self.dst_client_id ) } @@ -609,7 +609,7 @@ pub struct UnidirectionalChannelPath { impl UnidirectionalChannelPath { pub fn short_name(&self) -> String { format!( - "{}/{}:{} -> {}", + "{}/{}:{}->{}", self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id, ) } From 31c992a46e40f11e0dfb6044f5520fdd0ca4bb3d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 09:20:31 +0200 Subject: [PATCH 04/13] Small refactor --- relayer/src/supervisor.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index a1734436e9..06ba86f86f 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -449,24 +449,22 @@ impl Worker { client: &ForeignClient, update: Option, ) -> bool { - let mut skip_misbehaviour = false; - let res = client.detect_misbehaviour_and_submit_evidence(update); - match res { - MisbehaviourResults::ValidClient => {} + match client.detect_misbehaviour_and_submit_evidence(update) { + MisbehaviourResults::ValidClient => false, MisbehaviourResults::VerificationError => { // can retry in next call + false } - MisbehaviourResults::EvidenceSubmitted(_events) => { + MisbehaviourResults::EvidenceSubmitted(_) => { // if evidence was submitted successfully then exit - skip_misbehaviour = true; + true } MisbehaviourResults::CannotExecute => { // skip misbehaviour checking if chain does not have support for it (i.e. 
client // update event does not include the header) - skip_misbehaviour = true; + true } - }; - skip_misbehaviour + } } /// Run the event loop for events associated with a [`Client`]. From 6534c88cad4dfecea424f63fe766768d953af964 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 14:51:08 +0200 Subject: [PATCH 05/13] Gracefully handle runtime failing to start rather than unwrapping --- relayer/src/chain/runtime.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index b73adafb1d..e97cd9072c 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -122,7 +122,12 @@ impl ChainRuntime { let handle = chain_runtime.handle(); // Spawn the runtime & return - let thread = thread::spawn(move || chain_runtime.run().unwrap()); + let id = handle.id(); + let thread = thread::spawn(move || { + if let Err(e) = chain_runtime.run() { + error!("failed to start runtime for chain '{}': {}", id, e); + } + }); (handle, thread) } From c1d8b34a082feb8e345653ccdfd8d74dad2f8217 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 14:54:16 +0200 Subject: [PATCH 06/13] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98f1fcb436..33c663cf85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]). - [ibc-relayer] - Change the default for client creation to allow governance recovery in case of expiration or misbehaviour ([#785]) + - The relayer is now more resilient to a node not being up or going down, and will attempt to reconnect ([#871]) ### BUG FIXES @@ -54,6 +55,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]). [#861]: https://github.com/informalsystems/ibc-rs/issues/861 [#863]: https://github.com/informalsystems/ibc-rs/issues/863 [#869]: https://github.com/informalsystems/ibc-rs/issues/869 +[#871]: https://github.com/informalsystems/ibc-rs/issues/871 [#878]: https://github.com/informalsystems/ibc-rs/issues/878 From d75bc22da7c732c735217f01991d64392fb2c677 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 15:48:28 +0200 Subject: [PATCH 07/13] Replace println! with error! 
--- relayer/src/event/monitor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 849ee40858..7393a8db03 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -111,7 +111,7 @@ async fn run_driver( ) { if let Err(e) = driver.run().await { if tx.send(e).is_err() { - println!("failed to relay driver error to event monitor"); + error!("failed to relay driver error to event monitor"); } } } @@ -241,13 +241,13 @@ impl EventMonitor { // Try to reconnect if let Err(e) = self.try_reconnect() { - println!("error on reconnecting: {}", e); + error!("error when reconnecting: {}", e); continue; } // Try to resubscribe if let Err(e) = self.try_resubscribe() { - println!("error on reconnecting: {}", e); + error!("error when reconnecting: {}", e); continue; } From 493191b7c549871ad46f87b4006820d8b48008a4 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 15:55:50 +0200 Subject: [PATCH 08/13] Do not crash if chain runtime fails to initialize or subscribe to events --- relayer/src/supervisor.rs | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index ddea0932f8..4a3cde8af9 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -205,7 +205,7 @@ impl Supervisor { collected } - fn spawn_workers(&mut self) -> Result<(), BoxError> { + fn spawn_workers(&mut self) { let req = QueryChannelsRequest { pagination: ibc_proto::cosmos::base::query::pagination::all(), }; @@ -225,7 +225,14 @@ impl Supervisor { continue; } }; - let channels = chain.query_channels(req.clone())?; + + let channels = match chain.query_channels(req.clone()) { + Ok(channels) => channels, + Err(e) => { + error!("failed to query channels from {}: {}", chain_id, e); + continue; + } + }; for channel in channels { match self.spawn_workers_for_channel(chain.clone(), channel.clone()) { @@ -243,8 +250,6 @@ impl Supervisor { } } } - - Ok(()) } /// Spawns all the [`Worker`]s that will handle a given channel for a given source chain. @@ -330,12 +335,27 @@ impl Supervisor { let mut subscriptions = Vec::with_capacity(self.config.chains.len()); for chain_config in &self.config.chains { - let chain = self.registry.get_or_spawn(&chain_config.id)?; - let subscription = chain.subscribe()?; - subscriptions.push((chain, subscription)); + let chain = match self.registry.get_or_spawn(&chain_config.id) { + Ok(chain) => chain, + Err(e) => { + error!( + "failed to spawn chain runtime for {}: {}", + chain_config.id, e + ); + continue; + } + }; + + match chain.subscribe() { + Ok(subscription) => subscriptions.push((chain, subscription)), + Err(e) => error!( + "failed to subscribe to events of {}: {}", + chain_config.id, e + ), + } } - self.spawn_workers()?; + self.spawn_workers(); loop { match recv_multiple(&subscriptions) { From 747bdd4003e7495f40e0c66c878c455b5e0d7a30 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 16:12:37 +0200 Subject: [PATCH 09/13] Remove trace! 
for decreased verbosity --- relayer/src/supervisor.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 4a3cde8af9..95ba719c6d 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -365,9 +365,8 @@ impl Supervisor { .map_err(Into::into) .and_then(|batch| self.process_batch(chain.clone(), batch)); - match result { - Ok(()) => trace!("[{}] batch processing complete", chain.id()), - Err(e) => error!("[{}] error during batch processing: {}", chain.id(), e), + if let Err(e) = result { + error!("[{}] error during batch processing: {}", chain.id(), e); } } Err(e) => error!("error when waiting for events: {}", e), From 7b094a1210068aee9b0a17033546767d215e1cea Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 17:31:32 +0200 Subject: [PATCH 10/13] Retry WebSocket connection with exponential backoff --- Cargo.lock | 7 ++++++ relayer/Cargo.toml | 1 + relayer/src/event/monitor.rs | 47 ++++++++++++++++++++++++++++-------- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b7aa1a5c1..160b60845d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1247,6 +1247,7 @@ dependencies = [ "k256", "prost", "prost-types", + "retry", "ripemd160", "serde", "serde_cbor", @@ -2024,6 +2025,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "retry" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0ee4a654b43dd7e3768be7a1c0fc20e90f0a84b72a60ffb6c11e1cae2545c2e" + [[package]] name = "ring" version = "0.16.20" diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index 07e8d9eebb..edf0ec022d 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -50,6 +50,7 @@ dyn-clonable = "0.9.0" tonic = "0.4" dirs-next = "2.0.0" dyn-clone = "1.0.3" +retry = { version = "1.2.1", default-features = false } [dependencies.tendermint] version = "=0.19.0" diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 7393a8db03..9a6837b555 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -18,8 +18,11 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; -const DEFAULT_RETRIES: usize = 100; -const DEFAULT_DELAY: Duration = Duration::from_secs(5); +const MAX_RETRIES: usize = 1000; +const MAX_RETRY_DELAY: Duration = Duration::from_secs(5 * 60); +const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(2); +const DEFAULT_RETRY_STRATEGY: RetryStrategy = + RetryStrategy::new(INITIAL_RETRY_DELAY, MAX_RETRY_DELAY, MAX_RETRIES); #[derive(Debug, Clone, Error)] pub enum Error { @@ -235,24 +238,25 @@ impl EventMonitor { } /// Attempt to restart the WebSocket client at most a given number of times. 
- fn restart(&mut self, max_retries: usize, delay: Duration) { - for _ in 0..max_retries { - std::thread::sleep(delay); + fn restart(&mut self, strategy: RetryStrategy) { + use retry::{retry, OperationResult as TryResult}; + retry(strategy.iter(), || { // Try to reconnect if let Err(e) = self.try_reconnect() { error!("error when reconnecting: {}", e); - continue; + return TryResult::Retry(()); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { error!("error when reconnecting: {}", e); - continue; + return TryResult::Retry(()); } - return; - } + TryResult::Ok(()) + }) + .unwrap_or_else(|_| error!("failed to reconnect after {} retries", strategy.max_retries)); } /// Event monitor loop @@ -281,7 +285,7 @@ impl EventMonitor { error!("failed to collect events: {}", e); // Restart the event monitor - self.restart(DEFAULT_RETRIES, DEFAULT_DELAY); + self.restart(DEFAULT_RETRY_STRATEGY); } } } @@ -316,3 +320,26 @@ impl EventMonitor { Ok(batches) } } + +struct RetryStrategy { + initial_delay: Duration, + max_delay: Duration, + max_retries: usize, +} + +impl RetryStrategy { + const fn new(initial_delay: Duration, max_delay: Duration, max_retries: usize) -> Self { + Self { + initial_delay, + max_delay, + max_retries, + } + } + + pub fn iter(&self) -> impl Iterator { + let max_delay = self.max_delay; + retry::delay::Exponential::from(self.initial_delay) + .take(self.max_retries) + .map(move |delay| delay.min(max_delay)) + } +} From 96597c2e5f232bda46dc92118c636ae3d2094c11 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 17:38:09 +0200 Subject: [PATCH 11/13] Simplify types --- relayer-cli/src/commands/listen.rs | 11 ++--------- relayer/src/chain/handle.rs | 4 ++-- relayer/src/chain/runtime.rs | 10 +++++----- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 32326194cc..22e2b38f12 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -1,7 +1,6 @@ use std::{ops::Deref, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; -use crossbeam_channel as channel; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; @@ -10,7 +9,7 @@ use tendermint_rpc::query::{EventType, Query}; use ibc::ics24_host::identifier::ChainId; use ibc_relayer::{ config::ChainConfig, - event::monitor::{Error as EventMonitorError, EventBatch, EventMonitor}, + event::monitor::{EventMonitor, EventReceiver}, }; use crate::prelude::*; @@ -76,13 +75,7 @@ fn subscribe( chain_config: &ChainConfig, queries: Vec, rt: Arc, -) -> Result< - ( - EventMonitor, - channel::Receiver>, - ), - BoxError, -> { +) -> Result<(EventMonitor, EventReceiver), BoxError> { let (mut event_monitor, rx) = EventMonitor::new( chain_config.id.clone(), chain_config.websocket_addr.clone(), diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 3b582044fb..ef8befd12d 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -37,13 +37,13 @@ pub use prod::ProdChainHandle; use crate::{ connection::ConnectionMsgType, error::Error, - event::monitor::{Error as EventMonitorError, EventBatch}, + event::monitor::{EventBatch, Result as MonitorResult}, keyring::KeyEntry, }; mod prod; -pub type Subscription = channel::Receiver>>; +pub type Subscription = channel::Receiver>>; pub type ReplyTo = channel::Sender>; pub type Reply = channel::Receiver>; diff --git a/relayer/src/chain/runtime.rs 
b/relayer/src/chain/runtime.rs index e97cd9072c..220179da3f 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -43,7 +43,7 @@ use crate::{ error::{Error, Kind}, event::{ bus::EventBus, - monitor::{Error as EventMonitorError, EventBatch}, + monitor::{EventBatch, EventReceiver, Result as MonitorResult}, }, keyring::KeyEntry, light_client::LightClient, @@ -72,10 +72,10 @@ pub struct ChainRuntime { request_receiver: channel::Receiver, /// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers - event_bus: EventBus>>, + event_bus: EventBus>>, /// Receiver channel from the event bus - event_receiver: channel::Receiver>, + event_receiver: EventReceiver, /// A handle to the light client light_client: Box>, @@ -113,7 +113,7 @@ impl ChainRuntime { fn init( chain: C, light_client: Box>, - event_receiver: channel::Receiver>, + event_receiver: EventReceiver, rt: Arc, ) -> (Box, thread::JoinHandle<()>) { let chain_runtime = Self::new(chain, light_client, event_receiver, rt); @@ -136,7 +136,7 @@ impl ChainRuntime { fn new( chain: C, light_client: Box>, - event_receiver: channel::Receiver>, + event_receiver: EventReceiver, rt: Arc, ) -> Self { let (request_sender, request_receiver) = channel::unbounded::(); From 31fff98ced3d6a09cb518152e71ac43d7d0d4662 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 18:32:57 +0200 Subject: [PATCH 12/13] Extract retry strategy helpers to `ibc_relayer::util::retry` module --- relayer/src/event/monitor.rs | 53 ++++++-------- relayer/src/util.rs | 1 + relayer/src/util/retry.rs | 134 +++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 33 deletions(-) create mode 100644 relayer/src/util/retry.rs diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 9a6837b555..08f2a230ae 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -4,6 +4,7 @@ use crossbeam_channel as channel; use futures::stream::StreamExt; use futures::{stream::select_all, Stream}; use itertools::Itertools; +use retry::delay::Exponential; use thiserror::Error; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; @@ -18,11 +19,11 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; +use crate::util::retry::Clamped; + const MAX_RETRIES: usize = 1000; const MAX_RETRY_DELAY: Duration = Duration::from_secs(5 * 60); const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(2); -const DEFAULT_RETRY_STRATEGY: RetryStrategy = - RetryStrategy::new(INITIAL_RETRY_DELAY, MAX_RETRY_DELAY, MAX_RETRIES); #[derive(Debug, Clone, Error)] pub enum Error { @@ -237,26 +238,35 @@ impl EventMonitor { self.subscribe() } - /// Attempt to restart the WebSocket client at most a given number of times. - fn restart(&mut self, strategy: RetryStrategy) { - use retry::{retry, OperationResult as TryResult}; + /// Attempt to restart the WebSocket client using the given retry stragegy. + /// + /// See the [`retry`](https://docs.rs/retry) crate and the + /// [`crate::util::retry`] module for more information. 
+ fn restart(&mut self) { + use retry::{retry_with_index, OperationResult as TryResult}; + + let strategy = Clamped::new( + Exponential::from_millis_with_factor(INITIAL_RETRY_DELAY.as_millis() as u64, 1.1), + MAX_RETRY_DELAY, + MAX_RETRIES, + ); - retry(strategy.iter(), || { + retry_with_index(strategy.iter(), |index| { // Try to reconnect if let Err(e) = self.try_reconnect() { error!("error when reconnecting: {}", e); - return TryResult::Retry(()); + return TryResult::Retry(index); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { error!("error when reconnecting: {}", e); - return TryResult::Retry(()); + return TryResult::Retry(index); } TryResult::Ok(()) }) - .unwrap_or_else(|_| error!("failed to reconnect after {} retries", strategy.max_retries)); + .unwrap_or_else(|retries| error!("failed to reconnect after {} retries", retries)); } /// Event monitor loop @@ -285,7 +295,7 @@ impl EventMonitor { error!("failed to collect events: {}", e); // Restart the event monitor - self.restart(DEFAULT_RETRY_STRATEGY); + self.restart(); } } } @@ -320,26 +330,3 @@ impl EventMonitor { Ok(batches) } } - -struct RetryStrategy { - initial_delay: Duration, - max_delay: Duration, - max_retries: usize, -} - -impl RetryStrategy { - const fn new(initial_delay: Duration, max_delay: Duration, max_retries: usize) -> Self { - Self { - initial_delay, - max_delay, - max_retries, - } - } - - pub fn iter(&self) -> impl Iterator { - let max_delay = self.max_delay; - retry::delay::Exponential::from(self.initial_delay) - .take(self.max_retries) - .map(move |delay| delay.min(max_delay)) - } -} diff --git a/relayer/src/util.rs b/relayer/src/util.rs index 457bdcebaf..7955e5c215 100644 --- a/relayer/src/util.rs +++ b/relayer/src/util.rs @@ -2,4 +2,5 @@ mod block_on; pub use block_on::block_on; pub mod iter; +pub mod retry; pub mod sled; diff --git a/relayer/src/util/retry.rs b/relayer/src/util/retry.rs new file mode 100644 index 0000000000..9148375c6e --- /dev/null +++ b/relayer/src/util/retry.rs @@ -0,0 +1,134 @@ +use std::time::Duration; + +#[derive(Copy, Clone, Debug)] +pub struct ConstantGrowth { + delay: Duration, + incr: Duration, +} + +impl ConstantGrowth { + pub const fn new(delay: Duration, incr: Duration) -> Self { + Self { delay, incr } + } + + pub const fn clamp(self, max_delay: Duration, max_retries: usize) -> Clamped { + Clamped::new(self, max_delay, max_retries) + } +} + +impl From for ConstantGrowth { + fn from(delay: Duration) -> Self { + Self::new(delay, Duration::from_secs(1)) + } +} + +impl Iterator for ConstantGrowth { + type Item = Duration; + + fn next(&mut self) -> Option { + let delay = self.delay; + + if let Some(next) = self.delay.checked_add(self.incr) { + self.delay = next; + } + + Some(delay) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct Clamped { + pub strategy: S, + pub max_delay: Duration, + pub max_retries: usize, +} + +impl Clamped { + pub const fn new(strategy: S, max_delay: Duration, max_retries: usize) -> Self { + Self { + strategy, + max_delay, + max_retries, + } + } + + pub fn iter(self) -> impl Iterator + where + S: Iterator, + { + let Self { + strategy, + max_retries, + max_delay, + } = self; + + strategy + .take(max_retries) + .map(move |delay| delay.min(max_delay)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const CONST_STRATEGY: ConstantGrowth = + ConstantGrowth::new(Duration::from_secs(1), Duration::from_millis(500)); + + #[test] + fn const_growth_no_clamp() { + let delays = CONST_STRATEGY.take(10).collect::>(); + assert_eq!( + delays, + vec![ + 
Duration::from_millis(1000), + Duration::from_millis(1500), + Duration::from_millis(2000), + Duration::from_millis(2500), + Duration::from_millis(3000), + Duration::from_millis(3500), + Duration::from_millis(4000), + Duration::from_millis(4500), + Duration::from_millis(5000), + Duration::from_millis(5500) + ] + ); + } + + #[test] + fn clamped_const_growth_max_delay() { + let strategy = CONST_STRATEGY.clamp(Duration::from_secs(10), 10); + let delays = strategy.iter().collect::>(); + assert_eq!( + delays, + vec![ + Duration::from_millis(1000), + Duration::from_millis(1500), + Duration::from_millis(2000), + Duration::from_millis(2500), + Duration::from_millis(3000), + Duration::from_millis(3500), + Duration::from_millis(4000), + Duration::from_millis(4500), + Duration::from_millis(5000), + Duration::from_millis(5500) + ] + ); + } + + #[test] + fn clamped_const_growth_max_retries() { + let strategy = CONST_STRATEGY.clamp(Duration::from_secs(10000), 5); + let delays = strategy.iter().collect::>(); + assert_eq!( + delays, + vec![ + Duration::from_millis(1000), + Duration::from_millis(1500), + Duration::from_millis(2000), + Duration::from_millis(2500), + Duration::from_millis(3000) + ] + ); + } +} From de4e2b1154559fbb3892c1b1324a57a561437941 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 May 2021 18:39:19 +0200 Subject: [PATCH 13/13] Use fibonacci increment backoff strategy --- relayer/src/event/monitor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 08f2a230ae..60cf6bfaed 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -4,7 +4,7 @@ use crossbeam_channel as channel; use futures::stream::StreamExt; use futures::{stream::select_all, Stream}; use itertools::Itertools; -use retry::delay::Exponential; +use retry::delay::Fibonacci; use thiserror::Error; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; @@ -246,7 +246,7 @@ impl EventMonitor { use retry::{retry_with_index, OperationResult as TryResult}; let strategy = Clamped::new( - Exponential::from_millis_with_factor(INITIAL_RETRY_DELAY.as_millis() as u64, 1.1), + Fibonacci::from(INITIAL_RETRY_DELAY), MAX_RETRY_DELAY, MAX_RETRIES, );
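
The retry helpers added in `relayer/src/util/retry.rs` (PATCH 12) compose as in the sketch below. It is illustrative only: it assumes the new module is publicly reachable at `ibc_relayer::util::retry`, and it uses made-up delay values rather than the monitor's `INITIAL_RETRY_DELAY` / `MAX_RETRY_DELAY` / `MAX_RETRIES` constants; only the `ConstantGrowth` and `Clamped` types and their semantics are taken from the patch above.

    use std::time::Duration;

    // Assumed public path to the module introduced in PATCH 12.
    use ibc_relayer::util::retry::ConstantGrowth;

    fn main() {
        // Start at 1s, grow by 500ms per attempt, cap each delay at 3s,
        // and stop after 6 attempts (illustrative values only).
        let strategy = ConstantGrowth::new(Duration::from_secs(1), Duration::from_millis(500))
            .clamp(Duration::from_secs(3), 6);

        for (attempt, delay) in strategy.iter().enumerate() {
            println!("attempt {}: waiting {:?}", attempt + 1, delay);
        }
        // Yields 1s, 1.5s, 2s, 2.5s, 3s, 3s: growth is capped at `max_delay`
        // and the iterator ends after `max_retries` delays, matching the
        // clamped-strategy tests in `relayer/src/util/retry.rs`.
    }

The event monitor itself plugs an equivalent strategy into `retry_with_index`: PATCH 13 swaps the exponential delay for `Fibonacci::from(INITIAL_RETRY_DELAY)`, clamped to `MAX_RETRY_DELAY` and `MAX_RETRIES`.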