diff --git a/src/error.rs b/src/error.rs index 99f8cae..71941b6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -59,6 +59,8 @@ pub enum Error { ParseIntError(#[from] ParseIntError), #[error("ParseFloat error")] ParseFloatError(#[from] ParseFloatError), + #[error("Timeout error")] + Timeout(#[from] tokio::time::error::Elapsed), } impl From> for Error { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 6bb3fc3..75aee3b 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -31,15 +31,15 @@ const WAIT_FOR_BLOCK: u64 = 10; // processes. const MAX_BLOCKS_IN_CHANNEL: usize = 100; +// Timeout duration after triggering an error +// due to block_producer task hanging +const TIMEOUT_DURATION: Duration = Duration::from_secs(30); + // Block info required to be saved type BlockInfo = (Block, block_results::Response); #[instrument(skip(client))] -async fn get_block( - block_height: u32, - chain_name: &str, - client: &HttpClient, -) -> (Block, block_results::Response) { +async fn get_block(block_height: u32, chain_name: &str, client: &HttpClient) -> BlockInfo { loop { let height = Height::from(block_height); tracing::trace!(message = "Requesting block: ", block_height); @@ -167,12 +167,9 @@ fn blocks_stream<'a>( block: u64, chain_name: &'a str, client: &'a HttpClient, -) -> impl Stream + 'a { - futures::stream::iter(block..).then(move |i| async move { - timeout(Duration::from_secs(30), get_block(i as u32, chain_name, client)) - .await - .unwrap() - }) +) -> impl Stream + 'a { + futures::stream::iter(block..) + .then(move |i| async move { get_block(i as u32, chain_name, client).await }) } /// Start the indexer service blocking current thread. @@ -238,7 +235,12 @@ pub async fn start_indexing( spawn_block_producer(current_height as _, chain_name, client, producer_shutdown); // Block consumer that stores block into the database - while let Some(block) = rx.recv().await { + // Propagades a Timeout error if producer task hangs which means it stop sending new blocks + // through the channel for longer than 30 seconds + while let Some(block) = timeout(TIMEOUT_DURATION, rx.recv()) + .await + .map_err(Error::Timeout)? + { // block is now the block info and the block results if let Err(e) = db.save_block(&block.0, &block.1, &checksums_map).await { // shutdown producer task