From c9b274dbe14e41707ef65aedf32fe6363dba0bec Mon Sep 17 00:00:00 2001 From: rllola Date: Fri, 8 Mar 2024 15:28:48 +0100 Subject: [PATCH 1/2] added a timeout (still temp solution until the upstream PR is accepted) --- src/indexer/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 7d9c9c1..bae77a3 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -13,7 +13,7 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; use tracing::{info, instrument}; - +use tokio::time::timeout; use crate::config::IndexerConfig; use crate::utils::load_checksums; @@ -47,7 +47,7 @@ async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_resu let dur = instant.elapsed(); match response { - Ok(resp) => { + Ok(resp) => { info!("Got block {}", block_height); let labels = [( "indexer_get_block: ", @@ -156,7 +156,7 @@ fn blocks_stream( block: u64, client: &HttpClient, ) -> impl Stream + '_ { - futures::stream::iter(block..).then(move |i| async move { get_block(i as u32, client).await }) + futures::stream::iter(block..).then(move |i| async move { timeout(Duration::from_secs(30), get_block(i as u32, client)).await.unwrap() }) } /// Start the indexer service blocking current thread. From cc98b333b1b46bfa3236843080ac036d9d60a65c Mon Sep 17 00:00:00 2001 From: rllola Date: Fri, 8 Mar 2024 15:39:08 +0100 Subject: [PATCH 2/2] rustfmt --- src/indexer/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index bae77a3..a4add30 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,3 +1,5 @@ +use crate::config::IndexerConfig; +use crate::utils::load_checksums; use futures::stream::StreamExt; use futures_util::pin_mut; use futures_util::Stream; @@ -12,10 +14,8 @@ use tendermint_rpc::{self, Client, HttpClient}; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; -use tracing::{info, instrument}; use tokio::time::timeout; -use crate::config::IndexerConfig; -use crate::utils::load_checksums; +use tracing::{info, instrument}; pub mod utils; @@ -47,7 +47,7 @@ async fn get_block(block_height: u32, client: &HttpClient) -> (Block, block_resu let dur = instant.elapsed(); match response { - Ok(resp) => { + Ok(resp) => { info!("Got block {}", block_height); let labels = [( "indexer_get_block: ", @@ -156,7 +156,11 @@ fn blocks_stream( block: u64, client: &HttpClient, ) -> impl Stream + '_ { - futures::stream::iter(block..).then(move |i| async move { timeout(Duration::from_secs(30), get_block(i as u32, client)).await.unwrap() }) + futures::stream::iter(block..).then(move |i| async move { + timeout(Duration::from_secs(30), get_block(i as u32, client)) + .await + .unwrap() + }) } /// Start the indexer service blocking current thread.