Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate deliver-tx error for all messages in a failed Tx #2334

Merged
merged 11 commits into from
Jul 5, 2022
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Replicate the deliver_tx error for all messages in the Tx
romac marked this conversation as resolved.
Show resolved Hide resolved
([#2333](https://github.com/informalsystems/ibc-rs/issues/2333))
11 changes: 1 addition & 10 deletions modules/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,9 @@ pub enum IbcEvent {

AppModule(ModuleEvent),

Empty(String), // Special event, signifying empty response
ChainError(String), // Special event, signifying an error on CheckTx or DeliverTx
}

impl Default for IbcEvent {
fn default() -> Self {
Self::Empty("".to_string())
}
}

/// For use in debug messages
pub struct PrettyEvents<'a>(pub &'a [IbcEvent]);
impl<'a> fmt::Display for PrettyEvents<'a> {
Expand Down Expand Up @@ -297,7 +290,6 @@ impl fmt::Display for IbcEvent {

IbcEvent::AppModule(ev) => write!(f, "AppModuleEv({:?})", ev),

IbcEvent::Empty(ev) => write!(f, "EmptyEv({})", ev),
IbcEvent::ChainError(ev) => write!(f, "ChainErrorEv({})", ev),
}
}
Expand Down Expand Up @@ -329,7 +321,7 @@ impl TryFrom<IbcEvent> for AbciEvent {
IbcEvent::TimeoutPacket(event) => event.try_into().map_err(Error::channel)?,
IbcEvent::TimeoutOnClosePacket(event) => event.try_into().map_err(Error::channel)?,
IbcEvent::AppModule(event) => event.try_into()?,
IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_) => {
IbcEvent::NewBlock(_) | IbcEvent::ChainError(_) => {
return Err(Error::incorrect_event_type(event.to_string()))
}
})
Expand Down Expand Up @@ -438,7 +430,6 @@ impl IbcEvent {
IbcEvent::TimeoutPacket(_) => IbcEventType::Timeout,
IbcEvent::TimeoutOnClosePacket(_) => IbcEventType::TimeoutOnClose,
IbcEvent::AppModule(_) => IbcEventType::AppModule,
IbcEvent::Empty(_) => IbcEventType::Empty,
IbcEvent::ChainError(_) => IbcEventType::ChainError,
}
}
Expand Down
7 changes: 1 addition & 6 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ impl EventFilter {
pub fn matches(&self, event: &IbcEvent) -> bool {
match self {
EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)),
EventFilter::Tx => {
!(matches!(
event,
IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_)
))
}
EventFilter::Tx => !(matches!(event, IbcEvent::NewBlock(_) | IbcEvent::ChainError(_))),
}
}
}
Expand Down
32 changes: 24 additions & 8 deletions relayer/src/chain/cosmos/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry;
use crate::chain::cosmos::types::account::Account;
use crate::chain::cosmos::types::config::TxConfig;
use crate::chain::cosmos::types::tx::TxSyncResult;
use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult};
use crate::chain::cosmos::wait::wait_for_block_commits;
use crate::config::types::{MaxMsgNum, MaxTxSize, Memo};
use crate::error::Error;
Expand Down Expand Up @@ -99,18 +99,34 @@ async fn send_messages_as_batches(
let mut tx_sync_results = Vec::new();

for batch in batches {
let events_per_tx = vec![IbcEvent::default(); batch.len()];
let message_count = batch.len();

let response =
send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch, 0)
.await?;

let tx_sync_result = TxSyncResult {
response,
events: events_per_tx,
};

tx_sync_results.push(tx_sync_result);
if response.code.is_err() {
let events_per_tx = vec![IbcEvent::ChainError(format!(
"check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
config.chain_id, response.hash, response.code, response.log
)); message_count];

let tx_sync_result = TxSyncResult {
response,
events: events_per_tx,
status: TxStatus::ReceivedResponse,
};

tx_sync_results.push(tx_sync_result);
} else {
let tx_sync_result = TxSyncResult {
response,
events: Vec::new(),
status: TxStatus::Pending { message_count },
};

tx_sync_results.push(tx_sync_result);
}
}

Ok(tx_sync_results)
Expand Down
54 changes: 40 additions & 14 deletions relayer/src/chain/cosmos/query/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use ibc::core::ics04_channel::packet::{Packet, Sequence};
use ibc::core::ics24_host::identifier::ChainId;
use ibc::events::{from_tx_response_event, IbcEvent};
use ibc::Height as ICSHeight;
use tendermint::abci::transaction::Hash as TxHash;
use tendermint::abci::Event;
use tendermint_rpc::endpoint::tx::Response as ResultTx;
use tendermint_rpc::endpoint::tx::Response as TxResponse;
use tendermint_rpc::{Client, HttpClient, Order, Url};

use crate::chain::cosmos::query::{header_query, packet_query, tx_hash_query};
use crate::chain::requests::{
QueryClientEventRequest, QueryHeight, QueryPacketEventDataRequest, QueryTxRequest,
QueryClientEventRequest, QueryHeight, QueryPacketEventDataRequest, QueryTxHash, QueryTxRequest,
};
use crate::error::Error;

Expand Down Expand Up @@ -143,7 +144,7 @@ pub async fn query_txs(
fn update_client_from_tx_search_response(
chain_id: &ChainId,
request: &QueryClientEventRequest,
response: ResultTx,
response: TxResponse,
) -> Result<Option<IbcEvent>, Error> {
let height = ICSHeight::new(chain_id.version(), u64::from(response.height))
.map_err(|_| Error::invalid_height_no_source())?;
Expand Down Expand Up @@ -185,7 +186,7 @@ fn packet_from_tx_search_response(
chain_id: &ChainId,
request: &QueryPacketEventDataRequest,
seq: Sequence,
response: ResultTx,
response: TxResponse,
) -> Result<Option<IbcEvent>, Error> {
let height = ICSHeight::new(chain_id.version(), u64::from(response.height))
.map_err(|_| Error::invalid_height_no_source())?;
Expand Down Expand Up @@ -238,21 +239,46 @@ fn filter_matching_event(
}
}

fn all_ibc_events_from_tx_search_response(chain_id: &ChainId, response: ResultTx) -> Vec<IbcEvent> {
pub async fn query_tx_response(
rpc_client: &HttpClient,
rpc_address: &Url,
tx_hash: &TxHash,
) -> Result<Option<TxResponse>, Error> {
let response = rpc_client
.tx_search(
tx_hash_query(&QueryTxHash(*tx_hash)),
false,
1,
1, // get only the first Tx matching the query
Order::Ascending,
)
.await
.map_err(|e| Error::rpc(rpc_address.clone(), e))?;

Ok(response.txs.into_iter().next())
}

fn all_ibc_events_from_tx_search_response(
chain_id: &ChainId,
response: TxResponse,
) -> Vec<IbcEvent> {
let height = ICSHeight::new(chain_id.version(), u64::from(response.height)).unwrap();
let deliver_tx_result = response.tx_result;

if deliver_tx_result.code.is_err() {
return vec![IbcEvent::ChainError(format!(
// We can only return a single ChainError here because at this point
// we have lost information about how many messages were in the transaction
vec![IbcEvent::ChainError(format!(
"deliver_tx for {} reports error: code={:?}, log={:?}",
response.hash, deliver_tx_result.code, deliver_tx_result.log
))];
}
))]
} else {
let result = deliver_tx_result
.events
.iter()
.flat_map(|event| from_tx_response_event(height, event).into_iter())
.collect::<Vec<_>>();

let mut result = vec![];
for event in deliver_tx_result.events {
if let Some(ibc_ev) = from_tx_response_event(height, &event) {
result.push(ibc_ev);
}
result
}
result
}
6 changes: 6 additions & 0 deletions relayer/src/chain/cosmos/types/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ pub struct SignedTx {
pub signatures: Vec<Vec<u8>>,
}

pub enum TxStatus {
Pending { message_count: usize },
ReceivedResponse,
}

pub struct TxSyncResult {
// the broadcast_tx_sync response
pub response: Response,
// the events generated by a Tx once executed
pub events: Vec<IbcEvent>,
pub status: TxStatus,
}
66 changes: 30 additions & 36 deletions relayer/src/chain/cosmos/wait.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use core::time::Duration;
use ibc::core::ics24_host::identifier::ChainId;
use ibc::events::from_tx_response_event;
use ibc::events::IbcEvent;
use ibc::Height;
use itertools::Itertools;
use std::thread;
use std::time::Instant;
use tendermint_rpc::{HttpClient, Url};
use tracing::{info, trace};

use crate::chain::cosmos::query::tx::query_txs;
use crate::chain::cosmos::types::tx::TxSyncResult;
use crate::chain::requests::{QueryTxHash, QueryTxRequest};
use crate::chain::cosmos::query::tx::query_tx_response;
use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult};
use crate::error::Error;

const WAIT_BACKOFF: Duration = Duration::from_millis(300);
Expand Down Expand Up @@ -69,46 +70,39 @@ async fn update_tx_sync_result(
rpc_address: &Url,
tx_sync_result: &mut TxSyncResult,
) -> Result<(), Error> {
let TxSyncResult { response, events } = tx_sync_result;

// If this transaction was not committed, determine whether it was because it failed
// or because it hasn't been committed yet.
if empty_event_present(events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
chain_id, response.hash, response.code, response.log
))];
}

// Otherwise, try to resolve transaction hash to the corresponding events.
let events_per_tx = query_txs(
chain_id,
rpc_client,
rpc_address,
QueryTxRequest::Transaction(QueryTxHash(response.hash)),
)
.await?;

// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
if let TxStatus::Pending { message_count } = tx_sync_result.status {
let response =
query_tx_response(rpc_client, rpc_address, &tx_sync_result.response.hash).await?;

if let Some(response) = response {
tx_sync_result.status = TxStatus::ReceivedResponse;

if response.tx_result.code.is_err() {
tx_sync_result.events = vec![
IbcEvent::ChainError(format!(
"deliver_tx for {} reports error: code={:?}, log={:?}",
response.hash, response.tx_result.code, response.tx_result.log
));
message_count
];
} else {
let height = Height::new(chain_id.version(), u64::from(response.height)).unwrap();

tx_sync_result.events = response
.tx_result
.events
.iter()
.flat_map(|event| from_tx_response_event(height, event).into_iter())
.collect::<Vec<_>>();
}
}
}

Ok(())
}

fn empty_event_present(events: &[IbcEvent]) -> bool {
events.iter().any(|ev| matches!(ev, IbcEvent::Empty(_)))
}

fn all_tx_results_found(tx_sync_results: &[TxSyncResult]) -> bool {
tx_sync_results
.iter()
.all(|r| !empty_event_present(&r.events))
.all(|r| matches!(r.status, TxStatus::ReceivedResponse))
}
3 changes: 3 additions & 0 deletions relayer/src/link/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ define_error! {
format!("unexpected query tx response: {}", e.event)
},

UpdateClientEventNotFound
| _ | { "update client event not found in tx response" },

InvalidChannelState
{
channel_id: ChannelId,
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
match events.first() {
Some(IbcEvent::UpdateClient(event)) => Ok(event.height()),
Some(event) => Err(LinkError::unexpected_event(event.clone())),
None => Err(LinkError::unexpected_event(IbcEvent::default())),
None => Err(LinkError::update_client_event_not_found()),
}
}

Expand Down
66 changes: 66 additions & 0 deletions tools/integration-test/src/tests/error_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use ibc::events::IbcEvent;
use ibc_relayer::chain::tracking::TrackedMsgs;
use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::transfer::build_transfer_message;

#[test]
fn test_error_events() -> Result<(), Error> {
run_binary_channel_test(&ErrorEventsTest)
}

pub struct ErrorEventsTest;

impl TestOverrides for ErrorEventsTest {}

impl BinaryChannelTest for ErrorEventsTest {
fn run<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
_config: &TestConfig,
_relayer: RelayerDriver,
chains: ConnectedChains<ChainA, ChainB>,
channel: ConnectedChannel<ChainA, ChainB>,
) -> Result<(), Error> {
let denom_a = chains.node_a.denom();

let wallet_a = chains.node_a.wallets().user1().cloned();
let wallet_b = chains.node_b.wallets().user1().cloned();

let balance_a = chains
.node_a
.chain_driver()
.query_balance(&wallet_a.address(), &denom_a)?;

// Create 4x transfer messages where each transfers
// (1/3 + 1) of the total balance the user has.
// So the third and fourth message should fail.

let transfer_message = build_transfer_message(
&channel.port_a.as_ref(),
&channel.channel_id_a.as_ref(),
&wallet_a.as_ref(),
&wallet_b.address(),
&denom_a,
(balance_a / 3) + 1,
)?;

let messages = TrackedMsgs::new_static(vec![transfer_message; 4], "test_error_events");

let events = chains.handle_a().send_messages_and_wait_commit(messages)?;

// We expect 4 error events to be returned, corresponding to the
// 4 messages sent.

assert_eq!(events.len(), 4);

for event in events {
match event {
IbcEvent::ChainError(_) => {}
_ => {
panic!("expect all events to be error events");
}
}
}

Ok(())
}
}
Loading