From a1b40ae23c9f878d1614d2f100745d988a6d655a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 2 Jun 2021 12:03:31 +0200 Subject: [PATCH] Restart the event monitor event loop to reinitialize the subscriptions `Stream` after a restart (#1027) * Add chain id to tracing messages in event monitor * Restart the event loop after restarting the WebSocket client * Restart the event loop in a stack-safe fashion * Remove trampoline, and just use `return` instead --- relayer/src/event/monitor.rs | 62 +++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 7b213c0f65..c30205c034 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -192,7 +192,7 @@ impl EventMonitor { let mut subscriptions = vec![]; for query in &self.event_queries { - trace!("subscribing to query: {}", query); + trace!(chain.id = %self.chain_id, "subscribing to query: {}", query); let subscription = self .rt @@ -204,14 +204,14 @@ impl EventMonitor { self.subscriptions = Box::new(select_all(subscriptions)); - trace!("subscribed to all queries"); + trace!(chain.id = %self.chain_id, "subscribed to all queries"); Ok(()) } fn try_reconnect(&mut self) -> Result<()> { - trace!( - "trying to reconnect to WebSocket endpoint: {}", + trace!(chain.id = %self.chain_id, + "trying to reconnect to WebSocket endpoint {}", self.node_addr ); @@ -228,27 +228,28 @@ impl EventMonitor { std::mem::swap(&mut self.client, &mut client); std::mem::swap(&mut self.driver_handle, &mut driver_handle); - trace!("reconnected to WebSocket endpoint: {}", self.node_addr); + trace!( + chain.id = %self.chain_id, + "reconnected to WebSocket endpoint {}", + self.node_addr, + ); // Shut down previous client - trace!("gracefully shutting down previous client"); - if let Err(e) = client.close() { - trace!("previous websocket client closing failure: {}", e); - } + trace!(chain.id = %self.chain_id, "gracefully shutting down previous client"); + let _ = client.close(); self.rt .block_on(driver_handle) .map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?; - trace!("previous client successfully shutdown"); + trace!(chain.id = %self.chain_id, "previous client successfully shutdown"); Ok(()) } /// Try to resubscribe to events fn try_resubscribe(&mut self) -> Result<()> { - trace!("trying to resubscribe to events"); - + trace!(chain.id = %self.chain_id, "trying to resubscribe to events"); self.subscribe() } @@ -260,13 +261,13 @@ impl EventMonitor { let result = retry_with_index(retry_strategy::default(), |index| { // Try to reconnect if let Err(e) = self.try_reconnect() { - trace!("error when reconnecting: {}", e); + trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); return RetryResult::Retry(index); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { - trace!("error when reconnecting: {}", e); + trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); return RetryResult::Retry(index); } @@ -274,8 +275,16 @@ impl EventMonitor { }); match result { - Ok(()) => info!("successfully reconnected to WebSocket endpoint"), - Err(retries) => error!("failed to reconnect after {} retries", retries), + Ok(()) => info!( + chain.id = %self.chain_id, + "successfully reconnected to WebSocket endpoint {}", + self.node_addr + ), + Err(retries) => error!( + chain.id = %self.chain_id, + "failed to reconnect to {} after {} retries", + self.node_addr, retries + ), } } @@ -283,6 +292,14 @@ impl EventMonitor { pub fn run(mut self) { debug!(chain.id = %self.chain_id, "starting event monitor"); + // Continuously run the event loop, so that when it aborts + // because of WebSocket client restart, we pick up the work again. + loop { + self.run_loop(); + } + } + + fn run_loop(&mut self) { // Take ownership of the subscriptions let subscriptions = std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty())); @@ -306,13 +323,20 @@ impl EventMonitor { match result { Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { - error!("failed to process event batch: {}", e); + error!(chain.id = %self.chain_id, "failed to process event batch: {}", e); }), Err(e) => { - error!("failed to collect events: {}", e); + error!(chain.id = %self.chain_id, "failed to collect events: {}", e); - // Restart the event monitor + // Restart the event monitor, reconnect to the WebSocket endpoint, + // and subscribe again to the queries. self.restart(); + + // Abort this event loop, the `run` method will start a new one. + // We can't just write `return self.run()` here because Rust + // does not perform tail call optimization, and we would + // thus potentially blow up the stack after many restarts. + return; } } }