Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry send_tx on account sequence mismatch #1349

Merged
merged 16 commits into from
Dec 21, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 90 additions & 24 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use bitcoin::hashes::hex::ToHex;
use itertools::Itertools;
use prost::Message;
use prost_types::Any;
use tendermint::abci::Path as TendermintABCIPath;
use tendermint::abci::{Code, Path as TendermintABCIPath};
use tendermint::account::Id as AccountId;
use tendermint::block::Height;
use tendermint::consensus::Params;
Expand All @@ -24,7 +24,7 @@ use tendermint_rpc::query::{EventType, Query};
use tendermint_rpc::{endpoint::broadcast::tx_sync::Response, Client, HttpClient, Order};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::codegen::http::Uri;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};

use ibc::downcast;
use ibc::events::{from_tx_response_event, IbcEvent};
Expand Down Expand Up @@ -78,6 +78,7 @@ use crate::keyring::{KeyEntry, KeyRing, Store};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::LightClient;
use crate::light_client::Verified;
use crate::util::retry::{retry_with_index, RetryResult};
use crate::{chain::QueryResponse, event::monitor::TxMonitorCmd};

use super::ChainEndpoint;
Expand All @@ -98,6 +99,10 @@ mod retry_strategy {
use crate::util::retry::Fixed;
use std::time::Duration;

// Maximum number of retries for send_tx in the case of
// an account sequence mismatch.
pub const MAX_ACCOUNT_SEQUENCE_RETRY: u32 = 5;

pub fn wait_for_block_commits(max_total_wait: Duration) -> impl Iterator<Item = Duration> {
let backoff_millis = 300; // The periodic backoff
let count: usize = (max_total_wait.as_millis() / backoff_millis as u128) as usize;
Expand Down Expand Up @@ -308,10 +313,11 @@ impl CosmosSdkChain {
self.rt.block_on(f)
}

fn send_tx(&mut self, proto_msgs: Vec<Any>) -> Result<Response, Error> {
crate::time!("send_tx");
let account_seq = self.account_sequence()?;

fn send_tx_with_account_sequence(
&mut self,
proto_msgs: Vec<Any>,
account_seq: u64,
) -> Result<Response, Error> {
debug!(
"[{}] send_tx: sending {} messages using nonce {}",
self.id(),
Expand Down Expand Up @@ -373,13 +379,65 @@ impl CosmosSdkChain {
let mut tx_bytes = Vec::new();
prost::Message::encode(&tx_raw, &mut tx_bytes).unwrap();

let response = self.block_on(broadcast_tx_sync(self, tx_bytes))?;
self.block_on(broadcast_tx_sync(self, tx_bytes))
}

// Try to send_tx with retry on account sequence error.
// An account sequence error can occur if the cached account sequence of
// the relayer somehow get outdated, or when the relayer's wallet is used
// concurrently elsewhere.
// When there is an account sequence mismatch, we refetch the account sequence
// from the chain and retry sending the transaction again with the new sequence.
fn send_tx_with_account_sequence_retry(
&mut self,
proto_msgs: Vec<Any>,
retries: u32,
) -> Result<Response, Error> {
let account_sequence = self.account_sequence()?;

let response = self.send_tx_with_account_sequence(proto_msgs.clone(), account_sequence)?;

debug!("[{}] send_tx: broadcast_tx_sync: {:?}", self.id(), response);

self.incr_account_sequence()?;
match response.code {
Code::Ok => {
self.incr_account_sequence();
Ok(response)
}
// Cosmos SDK defines an account sequence mismatch with the error code 32
Code::Err(32) => {
adizere marked this conversation as resolved.
Show resolved Hide resolved
if retries < retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY {
warn!("send_tx failed with incorrect account sequence. retrying with account sequence refetched from the chain.");

// Refetch account information from the chain.
adizere marked this conversation as resolved.
Show resolved Hide resolved
// We do not wait between retries as waiting will
// only increase the chance of the account sequence getting
// out of synced again from other sources.
self.refresh_account()?;

self.send_tx_with_account_sequence_retry(proto_msgs, retries + 1)
} else {
// If after the max retry we still get an account sequence mismatch error,
// we ignore the error and return the original response to downstream.
// We do not return an error here, because the current convention
// let the caller handle error responses separately.

error!("failed to send_tx with incorrect account sequence. the relayer's wallet may be used elsewhere concurrently.");

Ok(response)
Ok(response)
}
}
Code::Err(_) => {
// The original code do not convert an error response into error result.
// We may want to refactor this in the future to simplify error handling.
Ok(response)
}
}
}

fn send_tx(&mut self, proto_msgs: Vec<Any>) -> Result<Response, Error> {
crate::time!("send_tx");
self.send_tx_with_account_sequence_retry(proto_msgs, 0)
}

/// The maximum amount of gas the relayer is willing to pay for a transaction
Expand Down Expand Up @@ -520,21 +578,27 @@ impl CosmosSdkChain {
Ok((key, key_bytes))
}

fn account(&mut self) -> Result<&mut BaseAccount, Error> {
fn refresh_account(&mut self) -> Result<(), Error> {
let account = self.block_on(query_account(self, self.key()?.account))?;
debug!(
sequence = %account.sequence,
number = %account.account_number,
"[{}] send_tx: retrieved account",
self.id()
);

self.account = Some(account);
Ok(())
}

fn account(&mut self) -> Result<&BaseAccount, Error> {
if self.account == None {
let account = self.block_on(query_account(self, self.key()?.account))?;
debug!(
sequence = %account.sequence,
number = %account.account_number,
"[{}] send_tx: retrieved account",
self.id()
);
self.account = Some(account);
self.refresh_account()?;
}

Ok(self
.account
.as_mut()
.as_ref()
.expect("account was supposedly just cached"))
}

Expand All @@ -546,9 +610,13 @@ impl CosmosSdkChain {
Ok(self.account()?.sequence)
}

fn incr_account_sequence(&mut self) -> Result<(), Error> {
self.account()?.sequence += 1;
Ok(())
fn incr_account_sequence(&mut self) {
match &mut self.account {
Some(account) => {
account.sequence += 1;
}
None => {}
}
adizere marked this conversation as resolved.
Show resolved Hide resolved
}

fn signer(&self, sequence: u64) -> Result<SignerInfo, Error> {
Expand Down Expand Up @@ -629,8 +697,6 @@ impl CosmosSdkChain {
&self,
mut tx_sync_results: Vec<TxSyncResult>,
) -> Result<Vec<TxSyncResult>, Error> {
use crate::util::retry::{retry_with_index, RetryResult};

let hashes = tx_sync_results
.iter()
.map(|res| res.response.hash.to_string())
Expand Down