Skip to content
This repository has been archived by the owner on Jun 19, 2024. It is now read-only.

Propagades timeout error #162

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum Error {
ParseIntError(#[from] ParseIntError),
#[error("ParseFloat error")]
ParseFloatError(#[from] ParseFloatError),
#[error("Timeout error")]
Timeout(#[from] tokio::time::error::Elapsed),
}

impl From<SendError<(tendermint::Block, block_results::Response)>> for Error {
Expand Down
26 changes: 14 additions & 12 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ const WAIT_FOR_BLOCK: u64 = 10;
// processes.
const MAX_BLOCKS_IN_CHANNEL: usize = 100;

// Timeout duration after triggering an error
// due to block_producer task hanging
const TIMEOUT_DURATION: Duration = Duration::from_secs(30);

// Block info required to be saved
type BlockInfo = (Block, block_results::Response);

#[instrument(skip(client))]
async fn get_block(
block_height: u32,
chain_name: &str,
client: &HttpClient,
) -> (Block, block_results::Response) {
async fn get_block(block_height: u32, chain_name: &str, client: &HttpClient) -> BlockInfo {
loop {
let height = Height::from(block_height);
tracing::trace!(message = "Requesting block: ", block_height);
Expand Down Expand Up @@ -167,12 +167,9 @@ fn blocks_stream<'a>(
block: u64,
chain_name: &'a str,
client: &'a HttpClient,
) -> impl Stream<Item = (Block, block_results::Response)> + 'a {
futures::stream::iter(block..).then(move |i| async move {
timeout(Duration::from_secs(30), get_block(i as u32, chain_name, client))
.await
.unwrap()
})
) -> impl Stream<Item = BlockInfo> + 'a {
futures::stream::iter(block..)
.then(move |i| async move { get_block(i as u32, chain_name, client).await })
}

/// Start the indexer service blocking current thread.
Expand Down Expand Up @@ -238,7 +235,12 @@ pub async fn start_indexing(
spawn_block_producer(current_height as _, chain_name, client, producer_shutdown);

// Block consumer that stores block into the database
while let Some(block) = rx.recv().await {
// Propagades a Timeout error if producer task hangs which means it stop sending new blocks
// through the channel for longer than 30 seconds
while let Some(block) = timeout(TIMEOUT_DURATION, rx.recv())
.await
.map_err(Error::Timeout)?
{
// block is now the block info and the block results
if let Err(e) = db.save_block(&block.0, &block.1, &checksums_map).await {
// shutdown producer task
Expand Down
Loading