From 0177c261506991f03014011e2236818c6b40e91a Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 19 Jul 2024 07:10:41 +1200 Subject: [PATCH] fix: Avoid restarting block streams which have just started (#895) On Block Streamer startup, all Block Streams are started in one big herd. Some of which can take a while to start processing. These end up getting marked as "Stalled", and are therefore restarted. This PR increases the monitoring scrape interval, so that Block Streams have a longer window to prove they are "Processing", and therefore do not get restarted. Also bumped the "stale" check in Coordinator, so they are less likely to get marked as Stale. --- block-streamer/src/block_stream.rs | 2 +- coordinator/src/handlers/block_streams.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index bf19038b..eff2a0d1 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -131,7 +131,7 @@ impl BlockStream { redis.get_last_processed_block(&config).await.unwrap(); loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 93878d61..9f71149b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -246,7 +246,7 @@ impl BlockStreamsHandler { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); - let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled) @@ -254,13 +254,17 @@ impl BlockStreamsHandler { if !stale && !stalled { return Ok(()); + } else { + tracing::info!(stale, stalled, "Restarting stalled block stream"); } + } else { + tracing::info!("Restarting stalled block stream"); } - tracing::info!("Restarting stalled block stream"); - self.stop(block_stream.stream_id.clone()).await?; - self.resume_block_stream(config).await?; + + let height = self.get_continuation_block_height(config).await?; + self.start(height, config).await?; Ok(()) }