Skip to content

Commit

Permalink
Propagades timeout error (Zondax#162)
Browse files Browse the repository at this point in the history
The enhancement shifts the timeout mechanism from the block producer to
the consumer side. This change ensures that if the block producer
stalls, the consumer, instead of indefinitely waiting on an empty
channel, will encounter a timeout. This timeout triggers an error that
is then easily propagated back to the main function. Consequently, this
allows the entire indexing process to terminate gracefully, rather than
abruptly due to a panic.
  • Loading branch information
neithanmo authored Mar 12, 2024
1 parent 8fc52be commit aecf92d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum Error {
ParseIntError(#[from] ParseIntError),
#[error("ParseFloat error")]
ParseFloatError(#[from] ParseFloatError),
#[error("Timeout error")]
Timeout(#[from] tokio::time::error::Elapsed),
}

impl From<SendError<(tendermint::Block, block_results::Response)>> for Error {
Expand Down
26 changes: 14 additions & 12 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ const WAIT_FOR_BLOCK: u64 = 10;
// processes.
const MAX_BLOCKS_IN_CHANNEL: usize = 100;

// Timeout duration after triggering an error
// due to block_producer task hanging
const TIMEOUT_DURATION: Duration = Duration::from_secs(30);

// Block info required to be saved
type BlockInfo = (Block, block_results::Response);

#[instrument(skip(client))]
async fn get_block(
block_height: u32,
chain_name: &str,
client: &HttpClient,
) -> (Block, block_results::Response) {
async fn get_block(block_height: u32, chain_name: &str, client: &HttpClient) -> BlockInfo {
loop {
let height = Height::from(block_height);
tracing::trace!(message = "Requesting block: ", block_height);
Expand Down Expand Up @@ -167,12 +167,9 @@ fn blocks_stream<'a>(
block: u64,
chain_name: &'a str,
client: &'a HttpClient,
) -> impl Stream<Item = (Block, block_results::Response)> + 'a {
futures::stream::iter(block..).then(move |i| async move {
timeout(Duration::from_secs(30), get_block(i as u32, chain_name, client))
.await
.unwrap()
})
) -> impl Stream<Item = BlockInfo> + 'a {
futures::stream::iter(block..)
.then(move |i| async move { get_block(i as u32, chain_name, client).await })
}

/// Start the indexer service blocking current thread.
Expand Down Expand Up @@ -238,7 +235,12 @@ pub async fn start_indexing(
spawn_block_producer(current_height as _, chain_name, client, producer_shutdown);

// Block consumer that stores block into the database
while let Some(block) = rx.recv().await {
// Propagades a Timeout error if producer task hangs which means it stop sending new blocks
// through the channel for longer than 30 seconds
while let Some(block) = timeout(TIMEOUT_DURATION, rx.recv())
.await
.map_err(Error::Timeout)?
{
// block is now the block info and the block results
if let Err(e) = db.save_block(&block.0, &block.1, &checksums_map).await {
// shutdown producer task
Expand Down

0 comments on commit aecf92d

Please sign in to comment.