From 57c23bb276b210c918d9816b73fb00ae26ac309c Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Thu, 30 Jul 2020 21:39:38 +0300 Subject: [PATCH] fix(jsonrpc): Fixed race condition in broadcast_tx_* methods --- chain/jsonrpc/src/lib.rs | 56 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index 9af8fd5589e..a1fbe5a15a9 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -249,6 +249,39 @@ impl JsonRpcHandler { Ok(Value::String(hash)) } + async fn tx_exists( + &self, + tx_hash: CryptoHash, + signer_account_id: &AccountId, + ) -> Result { + timeout(self.polling_config.polling_timeout, async { + loop { + // TODO(optimization): Introduce a view_client method to only get transaction + // status without the information about execution outcomes. + match self + .view_client_addr + .send(TxStatus { tx_hash, signer_account_id: signer_account_id.clone() }) + .await + { + Ok(Ok(Some(_))) => { + return Ok(true); + } + Ok(Err(TxStatusError::MissingTransaction(_))) => { + return Ok(false); + } + Err(_) => return Err(ServerError::InternalError), + _ => {} + } + delay_for(self.polling_config.polling_interval).await; + } + }) + .await + .map_err(|_| { + near_metrics::inc_counter(&metrics::RPC_TIMEOUT_TOTAL); + ServerError::Timeout + })? + } + async fn tx_status_fetch( &self, tx_info: TransactionInfo, @@ -311,12 +344,17 @@ impl JsonRpcHandler { })? } + /// Send a transaction idempotently (subsequent send of the same transaction will not cause + /// any new side-effects and the result will be the same unless we garbage collected it + /// already). async fn send_tx( &self, tx: SignedTransaction, check_only: bool, ) -> Result { - Ok(self + let tx_hash = tx.get_hash(); + let signer_account_id = tx.transaction.signer_id.clone(); + let response = self .client_addr .send(NetworkClientMessages::Transaction { transaction: tx, @@ -324,7 +362,21 @@ impl JsonRpcHandler { check_only, }) .map_err(|err| RpcError::server_error(Some(ServerError::from(err)))) - .await?) + .await?; + + // If we receive InvalidNonce error, it might be the case that the transaction was + // resubmitted, and we should check if that is the case and return ValidTx response to + // maintain idempotence of the send_tx method. + if let NetworkClientResponses::InvalidTx( + near_primitives::errors::InvalidTxError::InvalidNonce { .. }, + ) = response + { + if self.tx_exists(tx_hash, &signer_account_id).await? { + return Ok(NetworkClientResponses::ValidTx); + } + } + + Ok(response) } async fn send_tx_sync(&self, params: Option) -> Result {