diff --git a/CHANGELOG.md b/CHANGELOG.md index 640b385808..e04967689a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,11 +26,14 @@ - [ibc-relayer] - Fixed: Hermes does not clear packets on start ([#1200]) + - Recover from missed RPC events after WebSocket subscription is closed by Tendermint ([#1196]) + [#1094]: https://github.com/informalsystems/ibc-rs/issues/1094 [#1114]: https://github.com/informalsystems/ibc-rs/issues/1114 [#1192]: https://github.com/informalsystems/ibc-rs/issues/1192 [#1194]: https://github.com/informalsystems/ibc-rs/issues/1194 +[#1196]: https://github.com/informalsystems/ibc-rs/issues/1196 [#1198]: https://github.com/informalsystems/ibc-rs/issues/1198 [#1200]: https://github.com/informalsystems/ibc-rs/issues/1200 diff --git a/Cargo.lock b/Cargo.lock index c664af02bf..ada45915e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -556,9 +556,9 @@ dependencies = [ [[package]] name = "crypto-mac" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25fab6889090c8133f3deb8f73ba3c65a7f456f66436fc012a1b1e272b1e103e" +checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714" dependencies = [ "generic-array", "subtle", @@ -728,9 +728,9 @@ checksum = "ee2626afccd7561a06cf1367e2950c4718ea04565e20fb5029b6c7d8ad09abcf" [[package]] name = "ecdsa" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cb0ed2d2ce37766ac86c05f66973ace8c51f7f1533bedce8fb79e2b54b3f14" +checksum = "713c32426287891008edb98f8b5c6abb2130aa043c93a818728fcda78606f274" dependencies = [ "der", "elliptic-curve", @@ -740,9 +740,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d0860415b12243916284c67a9be413e044ee6668247b99ba26d94b2bc06c8f6" +checksum = "4620d40f6d2601794401d6dd95a5cf69b6c157852539470eeda433a99b3c0efc" dependencies = [ "serde", "signature", @@ -771,9 +771,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "elliptic-curve" -version = "0.10.4" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83e5c176479da93a0983f0a6fdc3c1b8e7d5be0d7fe3fe05a99f15b96582b9a8" +checksum = "069397e10739989e400628cbc0556a817a8a64119d7a2315767f4456e1332c23" dependencies = [ "crypto-bigint", "ff", @@ -1192,7 +1192,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" dependencies = [ - "crypto-mac 0.11.0", + "crypto-mac 0.11.1", "digest", ] @@ -2010,18 +2010,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" dependencies = [ "proc-macro2", "quote", @@ -2042,9 +2042,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkcs8" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d156817ae0125e8aa5067710b0db24f0984830614f99875a70aa5e3b74db69" +checksum = "87bb2d5c68b7505a3a89eb2f3583a4d56303863005226c2ef99319930a262be4" dependencies = [ "der", "spki", @@ -2653,9 +2653,9 @@ dependencies = [ [[package]] name = "sha-1" -version = "0.9.6" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16" +checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81" dependencies = [ "block-buffer", "cfg-if 1.0.0", @@ -2895,9 +2895,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "subtle" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "subtle-encoding" @@ -3219,9 +3219,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b5220f05bb7de7f3f53c7c065e1199b3172696fe2db9f9c4d8ad9b4ee74c342" +checksum = "4ac2e1d4bd0f75279cfd5a076e0d578bbf02c22b7c39e766c437dd49b3ec43e0" dependencies = [ "tinyvec_macros", ] @@ -3234,9 +3234,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c8b05dc14c75ea83d63dd391100353789f5f24b8b3866542a5e85c8be8e985" +checksum = "c2602b8af3767c285202012822834005f596c811042315fa7e9f5b12b2a43207" dependencies = [ "autocfg", "bytes", @@ -3787,9 +3787,9 @@ checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" [[package]] name = "zeroize" -version = "1.3.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" +checksum = "377db0846015f7ae377174787dd452e1c5f5a9050bc6f954911d01f116daa0cd" dependencies = [ "zeroize_derive", ] diff --git a/e2e/run.py b/e2e/run.py index 02e5b98c7a..e62d156c81 100755 --- a/e2e/run.py +++ b/e2e/run.py @@ -16,9 +16,9 @@ def passive_packets( - c: Config, - ibc0: ChainId, ibc1: ChainId, port_id: PortId, - ibc0_channel_id: ChannelId, ibc1_channel_id: ChannelId): + c: Config, + ibc0: ChainId, ibc1: ChainId, port_id: PortId, + ibc0_channel_id: ChannelId, ibc1_channel_id: ChannelId): # 1. create some unreceived acks @@ -27,26 +27,26 @@ def passive_packets( src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=2) # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 2 - packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id, src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=2) sleep(5.0) # hermes tx raw packet-recv ibc-1 ibc-0 transfer channel-0 - packet.packet_recv(c, src=ibc0 , dst=ibc1, + packet.packet_recv(c, src=ibc0, dst=ibc1, src_port=port_id, src_channel=ibc0_channel_id) # hermes tx raw packet-recv ibc-0 ibc-1 transfer channel-1 - packet.packet_recv(c, src=ibc1, dst=ibc0 , + packet.packet_recv(c, src=ibc1, dst=ibc0, src_port=port_id, src_channel=ibc1_channel_id) # 2. create some unreceived packets # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3 - packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id, src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3) # hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4 - packet.packet_send(c, src=ibc0 , dst=ibc1, src_port=port_id, + packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id, src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4) sleep(10.0) @@ -55,7 +55,7 @@ def passive_packets( # hermes query packet unreceived-packets ibc-0 transfer channel-0 unreceived = packet.query_unreceived_packets( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 3), (unreceived, "unreceived packet mismatch") @@ -73,7 +73,7 @@ def passive_packets( # hermes query packet unreceived-acks ibc-0 transfer channel-0 unreceived = packet.query_unreceived_acks( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 2), (unreceived, "unreceived packet mismatch") @@ -100,28 +100,29 @@ def passive_packets( # hermes query packet unreceived-packets ibc-0 transfer channel-0 unreceived = packet.query_unreceived_packets( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived packets mismatch (expected 0)") # hermes query packet unreceived-acks ibc-0 transfer channel-0 unreceived = packet.query_unreceived_acks( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived acks mismatch (expected 0)") # 7. send some packets # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3 - packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id, src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3) # hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4 packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id, src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4) - sleep(10.0) + sleep(20.0) + # 8. verify that there are no pending packets # hermes query packet unreceived-packets ibc-1 transfer channel-1 unreceived = packet.query_unreceived_packets( @@ -139,14 +140,14 @@ def passive_packets( # hermes query packet unreceived-packets ibc-0 transfer channel-0 unreceived = packet.query_unreceived_packets( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived packets mismatch (expected 0)") # hermes query packet unreceived-acks ibc-0 transfer channel-0 unreceived = packet.query_unreceived_acks( - c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) + c, chain=ibc0, port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived acks mismatch (expected 0)") @@ -154,15 +155,15 @@ def passive_packets( proc.kill() -def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ ClientId, ConnectionId, ChannelId, ClientId, ConnectionId, ChannelId]: +def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ClientId, ConnectionId, ChannelId, ClientId, ConnectionId, ChannelId]: ibc0_client_id = client.create_update_query_client(c, ibc0, ibc1) # Allocate first IDs on ibc-1 ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0) ibc1_conn_id = connection.conn_init( - c, ibc1, ibc0 , ibc1_client_id, ibc0_client_id) + c, ibc1, ibc0, ibc1_client_id, ibc0_client_id) ibc1_chan_id = channel.chan_open_init( - c, dst=ibc1, src=ibc0 , dst_conn=ibc1_conn_id) + c, dst=ibc1, src=ibc0, dst_conn=ibc1_conn_id) ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0) @@ -195,6 +196,7 @@ def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ Clie return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id + def main(): parser = argparse.ArgumentParser( description='Test all relayer commands, end-to-end') @@ -234,33 +236,41 @@ def main(): chains = toml.load(config.config_file)['chains'] - ibc0 = chains[0]['id'] + ibc0 = chains[0]['id'] ibc1 = chains[1]['id'] port_id = PortId('transfer') - ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id = raw(config, ibc0 , ibc1, port_id) + ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id = raw( + config, ibc0, ibc1, port_id) sleep(2.0) passive_packets(config, ibc0, ibc1, port_id, ibc0_chan_id, ibc1_chan_id) sleep(2.0) - connection.passive_connection_init_then_start(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) + connection.passive_connection_init_then_start( + config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) sleep(2.0) - connection.passive_connection_start_then_init(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) + connection.passive_connection_start_then_init( + config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) sleep(2.0) - connection.passive_connection_try_then_start(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) + connection.passive_connection_try_then_start( + config, ibc1, ibc0, ibc1_client_id, ibc0_client_id) sleep(2.0) - channel.passive_channel_start_then_init(config, ibc1, ibc0, ibc1_conn_id, port_id) + channel.passive_channel_start_then_init( + config, ibc1, ibc0, ibc1_conn_id, port_id) sleep(2.0) - channel.passive_channel_init_then_start(config, ibc1, ibc0, ibc1_conn_id, port_id) + channel.passive_channel_init_then_start( + config, ibc1, ibc0, ibc1_conn_id, port_id) sleep(2.0) - channel.passive_channel_try_then_start(config, ibc1, ibc0, ibc1_conn_id, ibc0_conn_id, port_id) + channel.passive_channel_try_then_start( + config, ibc1, ibc0, ibc1_conn_id, ibc0_conn_id, port_id) sleep(2.0) + if __name__ == "__main__": main() diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 2e8b55ec23..fd3b0047d9 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -3,6 +3,7 @@ use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; +use tracing::{error, info}; use ibc::{events::IbcEvent, ics24_host::identifier::ChainId}; @@ -92,8 +93,8 @@ impl Runnable for ListenCmd { /// Listen to events pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> { - println!( - "[info] Listening for events `{}` on '{}'...", + info!( + "listening for events `{}` on '{}'...", filters.iter().format(", "), config.id ); @@ -116,15 +117,15 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxEr continue; } - println!("- Event batch at height {}", batch.height); + info!("- event batch at height {}", batch.height); for event in matching_events { - println!("+ {:#?}", event); + info!("+ {:#?}", event); } - println!(); + info!(""); } - Err(e) => println!("- Error: {}", e), + Err(e) => error!("- error: {}", e), } } diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 7d7d39384f..942153e532 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -4,14 +4,15 @@ use crossbeam_channel as channel; use futures::{ pin_mut, stream::{self, select_all, StreamExt}, - Stream, + Stream, TryStreamExt, }; use thiserror::Error; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace}; use tendermint_rpc::{ + error::Code, event::Event as RpcEvent, query::{EventType, Query}, Error as RpcError, Result as RpcResult, SubscriptionClient, WebSocketClient, @@ -22,7 +23,7 @@ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier use crate::util::{ retry::{retry_count, retry_with_index, RetryResult}, - stream::group_while, + stream::try_group_while, }; mod retry_strategy { @@ -63,10 +64,27 @@ pub enum Error { #[error("failed to extract IBC events: {0}")] CollectEventsFailed(String), + #[error("{0}")] + SubscriptionCancelled(RpcError), + + #[error("RPC error: {0}")] + GenericRpcError(RpcError), + #[error("event monitor failed to dispatch event batch to subscribers")] ChannelSendFailed, } +impl Error { + fn canceled_or_generic(e: RpcError) -> Self { + match (e.code(), e.data()) { + (Code::ServerError, Some(msg)) if msg.contains("subscription was cancelled") => { + Self::SubscriptionCancelled(e) + } + _ => Self::GenericRpcError(e), + } + } +} + /// A batch of events from a chain at a specific height #[derive(Clone, Debug)] pub struct EventBatch { @@ -192,7 +210,7 @@ impl EventMonitor { let mut subscriptions = vec![]; for query in &self.event_queries { - trace!(chain.id = %self.chain_id, "subscribing to query: {}", query); + trace!("[{}] subscribing to query: {}", self.chain_id, query); let subscription = self .rt @@ -204,14 +222,15 @@ impl EventMonitor { self.subscriptions = Box::new(select_all(subscriptions)); - trace!(chain.id = %self.chain_id, "subscribed to all queries"); + trace!("[{}] subscribed to all queries", self.chain_id); Ok(()) } fn try_reconnect(&mut self) -> Result<()> { - trace!(chain.id = %self.chain_id, - "trying to reconnect to WebSocket endpoint {}", + trace!( + "[{}] trying to reconnect to WebSocket endpoint {}", + self.chain_id, self.node_addr ); @@ -229,45 +248,49 @@ impl EventMonitor { std::mem::swap(&mut self.driver_handle, &mut driver_handle); trace!( - chain.id = %self.chain_id, - "reconnected to WebSocket endpoint {}", - self.node_addr, + "[{}] reconnected to WebSocket endpoint {}", + self.chain_id, + self.node_addr ); // Shut down previous client - trace!(chain.id = %self.chain_id, "gracefully shutting down previous client"); + trace!( + "[{}] gracefully shutting down previous client", + self.chain_id + ); + let _ = client.close(); self.rt .block_on(driver_handle) .map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?; - trace!(chain.id = %self.chain_id, "previous client successfully shutdown"); + trace!("[{}] previous client successfully shutdown", self.chain_id); Ok(()) } /// Try to resubscribe to events fn try_resubscribe(&mut self) -> Result<()> { - trace!(chain.id = %self.chain_id, "trying to resubscribe to events"); + trace!("[{}] trying to resubscribe to events", self.chain_id); self.subscribe() } - /// Attempt to restart the WebSocket client using the given retry strategy. + /// Attempt to reconnect the WebSocket client using the given retry strategy. /// /// See the [`retry`](https://docs.rs/retry) crate and the /// [`crate::util::retry`] module for more information. - fn restart(&mut self) { + fn reconnect(&mut self) { let result = retry_with_index(retry_strategy::default(), |_| { // Try to reconnect if let Err(e) = self.try_reconnect() { - trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); + trace!("[{}] error when reconnecting: {}", self.chain_id, e); return RetryResult::Retry(()); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { - trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); + trace!("[{}] error when resubscribing: {}", self.chain_id, e); return RetryResult::Retry(()); } @@ -276,14 +299,14 @@ impl EventMonitor { match result { Ok(()) => info!( - chain.id = %self.chain_id, - "successfully reconnected to WebSocket endpoint {}", - self.node_addr + "[{}] successfully reconnected to WebSocket endpoint {}", + self.chain_id, self.node_addr ), Err(retries) => error!( - chain.id = %self.chain_id, - "failed to reconnect to {} after {} retries", - self.node_addr, retry_count(&retries) + "[{}] failed to reconnect to {} after {} retries", + self.chain_id, + self.node_addr, + retry_count(&retries) ), } } @@ -291,7 +314,7 @@ impl EventMonitor { /// Event monitor loop #[allow(clippy::while_let_loop)] pub fn run(mut self) { - debug!(chain.id = %self.chain_id, "starting event monitor"); + debug!("[{}] starting event monitor", self.chain_id); // Continuously run the event loop, so that when it aborts // because of WebSocket client restart, we pick up the work again. @@ -302,7 +325,7 @@ impl EventMonitor { } } - debug!(chain.id = %self.chain_id, "event monitor is shutting down"); + debug!("[{}] event monitor is shutting down", self.chain_id); // Close the WebSocket connection let _ = self.client.close(); @@ -310,7 +333,10 @@ impl EventMonitor { // Wait for the WebSocket driver to finish let _ = self.rt.block_on(self.driver_handle); - trace!(chain.id = %self.chain_id, "event monitor has successfully shut down"); + trace!( + "[{}] event monitor has successfully shut down", + self.chain_id + ); } fn run_loop(&mut self) -> Next { @@ -336,32 +362,64 @@ impl EventMonitor { let result = rt.block_on(async { tokio::select! { - Some(batch) = batches.next() => Ok(batch), + Some(batch) = batches.next() => batch, Some(err) = self.rx_err.recv() => Err(Error::WebSocketDriver(err)), } }); match result { Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { - warn!(chain.id = %self.chain_id, "{}", e); + error!("[{}] {}", self.chain_id, e); }), + Err(Error::SubscriptionCancelled(reason)) => { + error!( + "[{}] subscription cancelled, reason: {}", + self.chain_id, reason + ); + + self.propagate_error(Error::SubscriptionCancelled(reason)) + .unwrap_or_else(|e| { + error!("[{}] {}", self.chain_id, e); + }); + + // Reconnect to the WebSocket endpoint, and subscribe again to the queries. + self.reconnect(); + + // Abort this event loop, the `run` method will start a new one. + // We can't just write `return self.run()` here because Rust + // does not perform tail call optimization, and we would + // thus potentially blow up the stack after many restarts. + return Next::Continue; + } Err(e) => { - error!(chain.id = %self.chain_id, "failed to collect events: {}", e); + error!("[{}] failed to collect events: {}", self.chain_id, e); - // Restart the event monitor, reconnect to the WebSocket endpoint, - // and subscribe again to the queries. - self.restart(); + // Reconnect to the WebSocket endpoint, and subscribe again to the queries. + self.reconnect(); // Abort this event loop, the `run` method will start a new one. // We can't just write `return self.run()` here because Rust // does not perform tail call optimization, and we would // thus potentially blow up the stack after many restarts. - break; + return Next::Continue; } } } + } - Next::Continue + /// Propagate error to subscribers. + /// + /// The main use case for propagating RPC errors is for the [`Supervisor`] + /// to notice that the WebSocket connection or subscription has been closed, + /// and to trigger a clearing of packets, as this typically means that we have + /// missed a bunch of events which were emitted after the subscrption was closed. + /// In that case, this error will be handled in [`Supervisor::handle_batch`]. + fn propagate_error(&self, error: Error) -> Result<()> { + self.tx_batch + .send(Err(error)) + .map_err(|_| Error::ChannelSendFailed)?; + + Ok(()) } /// Collect the IBC events from the subscriptions @@ -375,28 +433,32 @@ impl EventMonitor { } /// Collect the IBC events from an RPC event -fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream { +fn collect_events( + chain_id: &ChainId, + event: RpcEvent, +) -> impl Stream> { let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default(); - stream::iter(events) + stream::iter(events).map(Ok) } /// Convert a stream of RPC event into a stream of event batches fn stream_batches( subscriptions: Box, chain_id: ChainId, -) -> impl Stream { +) -> impl Stream> { let id = chain_id.clone(); // Collect IBC events from each RPC event let events = subscriptions - .filter_map(|rpc_event| async { rpc_event.ok() }) - .flat_map(move |rpc_event| collect_events(&id, rpc_event)); + .map_ok(move |rpc_event| collect_events(&id, rpc_event)) + .map_err(Error::canceled_or_generic) + .try_flatten(); // Group events by height - let grouped = group_while(events, |(h0, _), (h1, _)| h0 == h1); + let grouped = try_group_while(events, |(h0, _), (h1, _)| h0 == h1); // Convert each group to a batch - grouped.map(move |events| { + grouped.map_ok(move |events| { let height = events .first() .map(|(h, _)| h) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 31bbe1e006..cca77dfe18 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -248,16 +248,18 @@ impl RelayPath { result } - fn relay_pending_packets(&mut self, height: Height) -> Result<(), LinkError> { + fn relay_pending_packets(&mut self, height: Option) -> Result<(), LinkError> { for _ in 0..MAX_RETRIES { - if self - .build_recv_packet_and_timeout_msgs(Some(height)) - .is_ok() - && self.build_packet_ack_msgs(Some(height)).is_ok() - { + let cleared = self + .build_recv_packet_and_timeout_msgs(height) + .and_then(|()| self.build_packet_ack_msgs(height)) + .is_ok(); + + if cleared { return Ok(()); } } + Err(LinkError::OldPacketClearingFailed) } @@ -265,7 +267,7 @@ impl RelayPath { /// is set or if clearing is forced by the caller. pub fn schedule_packet_clearing( &mut self, - height: Height, + height: Option, force: bool, ) -> Result<(), LinkError> { if self.clear_packets || force { @@ -273,25 +275,23 @@ impl RelayPath { // Clearing may still happen: upon new blocks, when `force = true`. self.clear_packets = false; - info!( - "[{}] clearing pending packets from events before height {}", - self, height - ); + let clear_height = if let Some(height) = height { + Some(height.decrement().map_err(|e| { + LinkError::Failed(format!( + "Cannot clear packets at height {}, because this height cannot be decremented: {}", + height, + e.to_string() + )) + })?) + } else { + None + }; - let clear_height = height.decrement().map_err(|e| { - LinkError::Failed(format!( - "Cannot clear packets @height {}, because this height cannot be decremented: {}", - height, - e.to_string() - )) - })?; + info!(height = ?clear_height, "[{}] clearing pending packets", self); self.relay_pending_packets(clear_height)?; - info!( - "[{}] finished scheduling the clearing of pending packets", - self - ); + info!(height = ?clear_height, "[{}] finished scheduling pending packets clearing", self); } Ok(()) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 325110248e..5d1aec8b33 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -19,7 +19,7 @@ use crate::{ chain::handle::ChainHandle, config::{ChainConfig, Config}, event, - event::monitor::{EventBatch, UnwrapOrClone}, + event::monitor::{Error as EventError, EventBatch, UnwrapOrClone}, object::Object, registry::Registry, telemetry::Telemetry, @@ -534,10 +534,11 @@ impl Supervisor { fn handle_batch(&mut self, chain: Box, batch: ArcBatch) { let chain_id = chain.id(); - let result = batch - .unwrap_or_clone() - .map_err(Into::into) - .and_then(|batch| self.process_batch(chain, batch)); + let result = match batch.unwrap_or_clone() { + Ok(batch) => self.process_batch(chain, batch), + Err(EventError::SubscriptionCancelled(_)) => self.clear_pending_packets(&chain_id), + Err(e) => Err(e.into()), + }; if let Err(e) = result { error!("[{}] error during batch processing: {}", chain_id, e); @@ -592,6 +593,14 @@ impl Supervisor { Ok(()) } + + fn clear_pending_packets(&mut self, chain_id: &ChainId) -> Result<(), BoxError> { + for worker in self.workers.workers_for_chain(chain_id) { + worker.clear_pending_packets()?; + } + + Ok(()) + } } /// Describes the result of [`collect_events`]. diff --git a/relayer/src/util/stream.rs b/relayer/src/util/stream.rs index 20aeba702b..36aeba243a 100644 --- a/relayer/src/util/stream.rs +++ b/relayer/src/util/stream.rs @@ -3,14 +3,43 @@ use futures::stream::Stream; /// ## Example /// -/// ```rust,ignore -/// let input = stream::iter(vec![0, 0, 0, 1, 1, 2, 3, 3, 3, 3]); -/// let output = group_while(stream, |a, b| a == b).collect::>().await; -/// assert_eq!(output, vec![vec![0, 0, 0], vec![1, 1], vec![2], vec![3, 3, 3, 3]]); +/// ```rust +/// use ibc_relayer::util::stream::try_group_while; +/// use futures::{stream, StreamExt, executor::block_on}; +/// +/// let input = stream::iter(vec![ +/// Ok(1), +/// Ok(1), +/// Ok(2), +/// Err(()), +/// Ok(2), +/// Ok(2), +/// Ok(3), +/// Ok(3), +/// Err(()), +/// ]); +/// +/// let output = try_group_while(input, |a, b| a == b).collect::>(); +/// +/// assert_eq!( +/// block_on(output), +/// vec![ +/// Ok(vec![1, 1]), +/// Ok(vec![2]), +/// Err(()), +/// Ok(vec![2, 2]), +/// Ok(vec![3]), +/// Ok(vec![3]), +/// Err(()), +/// ] +/// ); /// ``` -pub fn group_while(input: S, group_these: F) -> impl Stream> +pub fn try_group_while( + input: S, + group_these: F, +) -> impl Stream, E>> where - S: Stream, + S: Stream>, F: Fn(&A, &A) -> bool + 'static, { struct State { @@ -22,71 +51,126 @@ where let mut state = None; for await x in input { - match &mut state { - None => { - state = Some(State { cur: x, group: vec![] }); - }, - Some(state) if group_these(&state.cur, &x) => { - let prev = std::mem::replace(&mut state.cur, x); - state.group.push(prev); - }, - Some(state) => { - let cur = std::mem::replace(&mut state.cur, x); - state.group.push(cur); - let group = std::mem::replace(&mut state.group, vec![]); - yield group; + match x { + Ok(x) => { + match &mut state { + None => { + state = Some(State { cur: x, group: vec![] }); + }, + Some(state) if group_these(&state.cur, &x) => { + let prev = std::mem::replace(&mut state.cur, x); + state.group.push(prev); + }, + Some(state) => { + let cur = std::mem::replace(&mut state.cur, x); + state.group.push(cur); + let group = std::mem::take(&mut state.group); + yield Ok(group); + } + } + } + Err(e) => { + if let Some(cur_state) = std::mem::take(&mut state) { + if !cur_state.group.is_empty() { + yield Ok(cur_state.group); + } + yield Ok(vec![cur_state.cur]); + } + + yield Err(e); } } } - if let Some(State{ cur, mut group }) = state { + if let Some(State { cur, mut group }) = state { group.push(cur); - yield group; + yield Ok(group); } } } #[cfg(test)] mod tests { - use super::group_while; + use super::*; + use futures::{executor::block_on, stream, StreamExt}; use test_env_log::test; + fn ok(a: A) -> Result { + Ok(a) + } + + #[test] + fn try_group_while_non_empty() { + let input = stream::iter(vec![ + ok((1, 1)), + Ok((1, 2)), + Ok((2, 1)), + Ok((3, 1)), + Ok((3, 2)), + Ok((3, 3)), + Ok((4, 1)), + Ok((5, 1)), + Ok((5, 2)), + ]); + + let output = try_group_while(input, |a, b| a.0 == b.0).collect::>(); + let result = block_on(output); + + assert_eq!( + result, + vec![ + Ok(vec![(1, 1), (1, 2)]), + Ok(vec![(2, 1)]), + Ok(vec![(3, 1), (3, 2), (3, 3)]), + Ok(vec![(4, 1)]), + Ok(vec![(5, 1), (5, 2)]) + ] + ); + } + #[test] - fn group_while_non_empty() { + fn try_group_while_err() { let input = stream::iter(vec![ - (1, 1), - (1, 2), - (2, 1), - (3, 1), - (3, 2), - (3, 3), - (4, 1), - (5, 1), - (5, 2), + ok((1, 1)), + Ok((1, 2)), + Ok((2, 1)), + Ok((3, 1)), + Ok((3, 2)), + Err(()), + Ok((3, 3)), + Ok((4, 1)), + Ok((5, 1)), + Ok((5, 2)), + Err(()), ]); - let output = group_while(input, |a, b| a.0 == b.0).collect::>(); + let output = try_group_while(input, |a, b| a.0 == b.0).collect::>(); let result = block_on(output); assert_eq!( result, vec![ - vec![(1, 1), (1, 2)], - vec![(2, 1)], - vec![(3, 1), (3, 2), (3, 3)], - vec![(4, 1)], - vec![(5, 1), (5, 2)] + Ok(vec![(1, 1), (1, 2)]), + Ok(vec![(2, 1)]), + Ok(vec![(3, 1)]), + Ok(vec![(3, 2)]), + Err(()), + Ok(vec![(3, 3)]), + Ok(vec![(4, 1)]), + Ok(vec![(5, 1)]), + Ok(vec![(5, 2)]), + Err(()), ] ); } #[test] - fn group_while_empty() { - let input = stream::iter(Vec::::new()); - let output = group_while(input, |a, b| a == b).collect::>(); + fn try_group_while_empty() { + let input = stream::iter(Vec::>::new()); + let output = try_group_while(input, |a, b| a == b).collect::>(); let result = block_on(output); - assert_eq!(result, Vec::>::new()); + assert_eq!(result, Vec::, ()>>::new()); } } diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 7f28f06e64..5403099c37 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -107,6 +107,8 @@ impl ChannelWorker { info!(channel = %self.channel.short_name(), "shutting down Channel worker"); return Ok(()); } + + WorkerCmd::ClearPendingPackets => Ok(()), // nothing to do }; if let Err(retries) = result { diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index f7577ef979..1301d17e48 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -120,8 +120,11 @@ impl ClientWorker { Next::Continue } - WorkerCmd::Shutdown => Next::Abort, + WorkerCmd::NewBlock { .. } => Next::Continue, + WorkerCmd::ClearPendingPackets => Next::Continue, + + WorkerCmd::Shutdown => Next::Abort, } } diff --git a/relayer/src/worker/cmd.rs b/relayer/src/worker/cmd.rs index 740ac10248..2a2725b5de 100644 --- a/relayer/src/worker/cmd.rs +++ b/relayer/src/worker/cmd.rs @@ -8,9 +8,12 @@ pub enum WorkerCmd { /// A batch of packet events need to be relayed IbcEvents { batch: EventBatch }, - /// A batch of [`NewBlock`] events need to be relayed + /// A new block has been committed NewBlock { height: Height, new_block: NewBlock }, + /// Trigger a pending packets clear + ClearPendingPackets, + /// Shutdown the worker Shutdown, } diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 5d860a8672..8464541670 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -111,6 +111,8 @@ impl ConnectionWorker { info!(connection = %self.connection.short_name(), "shutting down Connection worker"); return Ok(()); } + + WorkerCmd::ClearPendingPackets => Ok(()), // nothing to do }; if let Err(retries) = result { diff --git a/relayer/src/worker/handle.rs b/relayer/src/worker/handle.rs index c3ef0f0ec1..2d18420134 100644 --- a/relayer/src/worker/handle.rs +++ b/relayer/src/worker/handle.rs @@ -64,12 +64,18 @@ impl WorkerHandle { Ok(()) } - /// Send a [`NewBlock`] event to the worker. + /// Notify the worker that a new block as been committed. pub fn send_new_block(&self, height: Height, new_block: NewBlock) -> Result<(), BoxError> { self.tx.send(WorkerCmd::NewBlock { height, new_block })?; Ok(()) } + /// Instruct the worker to clear pending packets. + pub fn clear_pending_packets(&self) -> Result<(), BoxError> { + self.tx.send(WorkerCmd::ClearPendingPackets)?; + Ok(()) + } + /// Shutdown the worker. pub fn shutdown(&self) -> Result<(), BoxError> { self.tx.send(WorkerCmd::Shutdown)?; diff --git a/relayer/src/worker/map.rs b/relayer/src/worker/map.rs index 33bae8144f..63c4e0be9e 100644 --- a/relayer/src/worker/map.rs +++ b/relayer/src/worker/map.rs @@ -176,6 +176,14 @@ impl WorkerMap { .collect() } + /// List the [`WorkerHandle`]s associated with the given chain. + pub fn workers_for_chain(&self, chain_id: &ChainId) -> Vec<&WorkerHandle> { + self.workers + .iter() + .filter_map(|(o, h)| o.for_chain(chain_id).then(|| h)) + .collect() + } + /// Shutdown the worker associated with the given [`Object`]. pub fn shutdown_worker(&mut self, object: &Object) { if let Some(handle) = self.workers.remove(object) { diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index f1aa9eab76..83f77e2b36 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -110,10 +110,13 @@ impl PacketWorker { // once at start, and _forced_ at predefined block intervals. let force_packet_clearing = self.clear_packets_interval != 0 && height.revision_height % self.clear_packets_interval == 0; + link.a_to_b - .schedule_packet_clearing(height, force_packet_clearing) + .schedule_packet_clearing(Some(height), force_packet_clearing) } + WorkerCmd::ClearPendingPackets => link.a_to_b.schedule_packet_clearing(None, true), + WorkerCmd::Shutdown => { return RetryResult::Ok(Step::Shutdown); }