From a29c9c70395d64d641545f8f148117b5aa9d4e6d Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 25 Oct 2021 11:44:37 +0200 Subject: [PATCH 01/29] More structure in logs, pass 1 --- Cargo.lock | 15 +++ relayer/Cargo.toml | 1 + relayer/src/chain.rs | 8 +- relayer/src/chain/cosmos.rs | 18 ++-- relayer/src/chain/handle.rs | 10 +- relayer/src/chain/handle/prod.rs | 9 +- relayer/src/chain/mock.rs | 7 +- relayer/src/chain/runtime.rs | 17 ++-- relayer/src/channel.rs | 48 ++++++++-- relayer/src/connection.rs | 50 ++++++++-- relayer/src/foreign_client.rs | 27 +++++- relayer/src/link/operational_data.rs | 86 +++++++++++++---- relayer/src/link/relay_path.rs | 134 +++++++++++++++------------ relayer/src/link/relay_sender.rs | 8 +- relayer/src/supervisor/spawn.rs | 4 +- relayer/src/transfer.rs | 3 +- relayer/src/upgrade_chain.rs | 3 +- 17 files changed, 316 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e05955cb66..3fe1ab533c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,6 +1364,12 @@ dependencies = [ "informalsystems-tonic", "itertools", "k256", +<<<<<<< HEAD +======= + "nanoid", + "prost", + "prost-types", +>>>>>>> c2ad8c2c (More structure in logs, pass 1) "retry", "ripemd160", "semver 1.0.4", @@ -1789,6 +1795,15 @@ dependencies = [ "twoway", ] +[[package]] +name = "nanoid" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +dependencies = [ + "rand 0.8.4", +] + [[package]] name = "native-tls" version = "0.2.8" diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index bf5c902aa9..37b7bc5e16 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -65,6 +65,7 @@ fraction = {version = "0.9.0", default-features = false } semver = "1.0" uint = "0.9" humantime = "2.1.0" +nanoid = "0.4.0" [dependencies.tendermint] version = "=0.23.0-internal" diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index b413b33d6b..b4afd8597d 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -1,5 +1,4 @@ use alloc::sync::Arc; -use prost_types::Any; use tendermint::block::Height; use tokio::runtime::Runtime as TokioRuntime; @@ -41,10 +40,13 @@ use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::LightClient; use crate::{config::ChainConfig, event::monitor::EventReceiver}; +use self::tx::TrackedMsgs; + pub(crate) mod cosmos; pub mod counterparty; pub mod handle; pub mod runtime; +pub mod tx; #[cfg(test)] pub mod mock; @@ -121,14 +123,14 @@ pub trait ChainEndpoint: Sized { // synchronously wait for it to be committed. fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; /// Sends one or more transactions with `msgs` to chain. /// Non-blocking alternative to `send_messages_and_wait_commit` interface. fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; fn get_signer(&mut self) -> Result; diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 9d26b5f0a6..6662187698 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -24,7 +24,7 @@ use tendermint_rpc::query::{EventType, Query}; use tendermint_rpc::{endpoint::broadcast::tx_sync::Response, Client, HttpClient, Order}; use tokio::runtime::Runtime as TokioRuntime; use tonic::codegen::http::Uri; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, span, trace, warn, Level}; use ibc::events::{from_tx_response_event, IbcEvent}; use ibc::ics02_client::client_consensus::{ @@ -83,7 +83,7 @@ use crate::{ sdk_error::sdk_error_from_tx_sync_error_code, }; -use super::{ChainEndpoint, HealthCheck}; +use super::{tx::TrackedMsgs, ChainEndpoint, HealthCheck}; mod compatibility; @@ -842,9 +842,10 @@ impl ChainEndpoint for CosmosSdkChain { /// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded. fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); + let proto_msgs = tracked_msgs.msgs; debug!( "send_messages_and_wait_commit with {} messages", proto_msgs.len() @@ -898,13 +899,14 @@ impl ChainEndpoint for CosmosSdkChain { fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { crate::time!("send_messages_and_wait_check_tx"); - debug!( - "send_messages_and_wait_check_tx with {} messages", - proto_msgs.len() - ); + + let span = span!(Level::DEBUG, "send", id = %tracked_msgs); + let _enter = span.enter(); + + let proto_msgs = tracked_msgs.msgs; if proto_msgs.is_empty() { return Ok(vec![]); diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 52f25fd82c..80ff4afaf8 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -49,7 +49,7 @@ use crate::{ keyring::KeyEntry, }; -use super::HealthCheck; +use super::{tx::TrackedMsgs, HealthCheck}; mod prod; @@ -105,12 +105,12 @@ pub enum ChainRequest { }, SendMessagesAndWaitCommit { - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, }, SendMessagesAndWaitCheckTx { - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, }, @@ -333,7 +333,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug { /// and return the list of events emitted by the chain after the transaction was committed. fn send_messages_and_wait_commit( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; /// Submit messages asynchronously. @@ -342,7 +342,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug { /// returns a set of transaction hashes. fn send_messages_and_wait_check_tx( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; fn get_signer(&self) -> Result; diff --git a/relayer/src/chain/handle/prod.rs b/relayer/src/chain/handle/prod.rs index 0397c6d812..667fd2605a 100644 --- a/relayer/src/chain/handle/prod.rs +++ b/relayer/src/chain/handle/prod.rs @@ -36,6 +36,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof; use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; +use crate::chain::tx::TrackedMsgs; use crate::{connection::ConnectionMsgType, error::Error, keyring::KeyEntry}; use super::{reply_channel, ChainHandle, ChainRequest, HealthCheck, ReplyTo, Subscription}; @@ -94,20 +95,20 @@ impl ChainHandle for ProdChainHandle { fn send_messages_and_wait_commit( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit { - proto_msgs, + tracked_msgs, reply_to, }) } fn send_messages_and_wait_check_tx( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { self.send(|reply_to| ChainRequest::SendMessagesAndWaitCheckTx { - proto_msgs, + tracked_msgs, reply_to, }) } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 06be65b9e1..a0adbcf2bf 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -46,6 +46,7 @@ use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::Verified; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; +use super::tx::TrackedMsgs; use super::HealthCheck; /// The representation of a mocked chain as the relayer sees it. @@ -125,17 +126,17 @@ impl ChainEndpoint for MockChain { fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. - let events = self.context.send(proto_msgs).map_err(Error::ics18)?; + let events = self.context.send(tracked_msgs.msgs).map_err(Error::ics18)?; Ok(events) } fn send_messages_and_wait_check_tx( &mut self, - _proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { todo!() } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 637dcee7ff..012eefb95e 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -55,6 +55,7 @@ use crate::{ use super::{ handle::{ChainHandle, ChainRequest, ReplyTo, Subscription}, + tx::TrackedMsgs, ChainEndpoint, HealthCheck, }; @@ -204,12 +205,12 @@ where self.subscribe(reply_to)? }, - Ok(ChainRequest::SendMessagesAndWaitCommit { proto_msgs, reply_to }) => { - self.send_messages_and_wait_commit(proto_msgs, reply_to)? + Ok(ChainRequest::SendMessagesAndWaitCommit { tracked_msgs, reply_to }) => { + self.send_messages_and_wait_commit(tracked_msgs, reply_to)? }, - Ok(ChainRequest::SendMessagesAndWaitCheckTx { proto_msgs, reply_to }) => { - self.send_messages_and_wait_check_tx(proto_msgs, reply_to)? + Ok(ChainRequest::SendMessagesAndWaitCheckTx { tracked_msgs, reply_to }) => { + self.send_messages_and_wait_check_tx(tracked_msgs, reply_to)? }, Ok(ChainRequest::Signer { reply_to }) => { @@ -377,19 +378,19 @@ where fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, ) -> Result<(), Error> { - let result = self.chain.send_messages_and_wait_commit(proto_msgs); + let result = self.chain.send_messages_and_wait_commit(tracked_msgs); reply_to.send(result).map_err(Error::send) } fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, ) -> Result<(), Error> { - let result = self.chain.send_messages_and_wait_check_tx(proto_msgs); + let result = self.chain.send_messages_and_wait_check_tx(tracked_msgs); reply_to.send(result).map_err(Error::send) } diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 44a35294b3..c664f5be93 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -20,6 +20,7 @@ use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination}; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::connection::Connection; use crate::foreign_client::ForeignClient; use crate::object::Channel as WorkerChannelObject; @@ -701,9 +702,15 @@ impl Channel { pub fn build_chan_open_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_init()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel open init @@ -858,9 +865,15 @@ impl Channel { pub fn build_chan_open_try_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_try()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel open try @@ -943,9 +956,15 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_ack()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = channel .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?; // Find the relevant event for channel open ack @@ -1039,9 +1058,14 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_confirm()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; let events = channel .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?; // Find the relevant event for channel open confirm @@ -1102,9 +1126,15 @@ impl Channel { pub fn build_chan_close_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_init()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel close init @@ -1181,9 +1211,15 @@ impl Channel { pub fn build_chan_close_confirm_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_confirm()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel close confirm diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 9a379930a8..39ed3789a2 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -1,6 +1,7 @@ use core::time::Duration; use crate::chain::counterparty::connection_state_on_destination; +use crate::chain::tx::TrackedMsgs; use crate::util::retry::RetryResult; use flex_error::define_error; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; @@ -754,9 +755,15 @@ impl Connection { pub fn build_conn_init_and_send(&self) -> Result { let dst_msgs = self.build_conn_init()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection init @@ -811,8 +818,14 @@ impl Connection { .query_latest_height() .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; + + // TODO(ADI) + let tm = TrackedMsgs { + msgs: client_msgs, + tracking_nr: "".into(), + }; self.src_chain() - .send_messages_and_wait_commit(client_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; let query_height = self @@ -881,9 +894,15 @@ impl Connection { pub fn build_conn_try_and_send(&self) -> Result { let dst_msgs = self.build_conn_try()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection try transaction @@ -928,8 +947,15 @@ impl Connection { .query_latest_height() .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; + + // TODO(ADI) + let tm = TrackedMsgs { + msgs: client_msgs, + tracking_nr: "".into(), + }; + self.src_chain() - .send_messages_and_wait_commit(client_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; let query_height = self @@ -972,9 +998,15 @@ impl Connection { pub fn build_conn_ack_and_send(&self) -> Result { let dst_msgs = self.build_conn_ack()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection ack @@ -1049,9 +1081,15 @@ impl Connection { pub fn build_conn_confirm_and_send(&self) -> Result { let dst_msgs = self.build_conn_confirm()?; + // TODO(ADI) + let tm = TrackedMsgs { + msgs: dst_msgs, + tracking_nr: "".into(), + }; + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection confirm diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 984dd6a123..9d27f96817 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -7,6 +7,7 @@ use itertools::Itertools; use prost_types::Any; use tracing::{debug, error, info, trace, warn}; +use crate::chain::tx::TrackedMsgs; use crate::error::Error as RelayerError; use flex_error::define_error; use ibc::downcast; @@ -392,9 +393,15 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient, + pub tracking_nr: String, +} + +impl TrackedEvents { + pub fn is_empty(&self) -> bool { + self.list.is_empty() + } + + pub fn len(&self) -> usize { + self.list.len() + } + + pub fn set_height(&mut self, height: Height) { + for event in self.list.iter_mut() { + event.set_height(height); + } + } +} + +impl From> for TrackedEvents { + fn from(list: Vec) -> Self { + Self { + list, + tracking_nr: nanoid!(10), + } + } +} + +impl fmt::Display for TrackedEvents { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.tracking_nr) + } +} + +/// A packet message that is prepared for sending +/// to a chain, but has not been sent yet. +/// +/// Comprises the proto-encoded packet message, +/// alongside the event which generated it. #[derive(Clone)] pub struct TransitMessage { pub event: IbcEvent, @@ -50,38 +93,45 @@ pub struct OperationalData { /// Stores the time when the clients on the target chain has been updated, i.e., when this data /// was scheduled. Necessary for packet delays. pub scheduled_time: Instant, + pub tracking_nr: String, } impl OperationalData { - pub fn new(proofs_height: Height, target: OperationalDataTarget) -> Self { + pub fn new(proofs_height: Height, target: OperationalDataTarget, tn: String) -> Self { OperationalData { proofs_height, batch: vec![], target, scheduled_time: Instant::now(), + tracking_nr: tn, } } - pub fn events(&self) -> Vec { - self.batch.iter().map(|gm| gm.event.clone()).collect() + pub fn events(&self) -> TrackedEvents { + let list = self.batch.iter().map(|gm| gm.event.clone()).collect(); + TrackedEvents { + list, + tracking_nr: self.tracking_nr.clone(), + } } - /// Returns all the messages in this operational data, plus prepending the client update message + /// Returns all the messages in this operational + /// data, plus prepending the client update message /// if necessary. pub fn assemble_msgs( &self, relay_path: &RelayPath, - ) -> Result, LinkError> { + ) -> Result { if self.batch.is_empty() { warn!("assemble_msgs() method call on an empty OperationalData!"); - return Ok(vec![]); + return Ok(TrackedMsgs::empty()); } // For zero delay we prepend the client update msgs. let client_update_msg = if relay_path.zero_delay() { let update_height = self.proofs_height.increment(); - info!( + debug!( "[{}] prepending {} client update @ height {}", relay_path, self.target, update_height ); @@ -109,13 +159,14 @@ impl OperationalData { None => self.batch.iter().map(|gm| gm.msg.clone()).collect(), }; - info!( - "[{}] assembled batch of {} message(s)", - relay_path, - msgs.len() - ); + let tm = TrackedMsgs { + msgs, + tracking_nr: self.tracking_nr.clone(), + }; + + info!("[{}] assembled batch of {} msgs", relay_path, tm.msgs.len()); - Ok(msgs) + Ok(tm) } } @@ -123,7 +174,8 @@ impl fmt::Display for OperationalData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Op.Data [->{} @{}; {} event(s) & msg(s) in batch]", + "{} ->{} @{}; len={}", + self.tracking_nr, self.target, self.proofs_height, self.batch.len(), diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 5f8a9624e0..79069e0ce7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -9,7 +9,7 @@ use chrono::Utc; use ibc::timestamp::Timestamp; use itertools::Itertools; use prost_types::Any; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ events::{IbcEvent, PrettyEvents, WithBlockDataType}, @@ -37,12 +37,15 @@ use crate::chain::counterparty::{ unreceived_acknowledgements_sequences, unreceived_packets_sequences, }; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::channel::error::ChannelError; use crate::channel::Channel; use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::link::error::{self, LinkError}; -use crate::link::operational_data::{OperationalData, OperationalDataTarget, TransitMessage}; +use crate::link::operational_data::{ + OperationalData, OperationalDataTarget, TrackedEvents, TransitMessage, +}; use crate::link::pending::PendingTxs; use crate::link::relay_sender::{AsyncReply, SubmitReply}; use crate::link::relay_summary::RelaySummary; @@ -243,7 +246,7 @@ impl RelayPath { // Determines if the events received are relevant and should be processed. // Only events for a port/channel matching one of the channel ends should be processed. - fn filter_relaying_events(&self, events: Vec) -> Vec { + fn filter_relaying_events(&self, events: Vec) -> TrackedEvents { let src_channel_id = self.src_channel_id(); let mut result = vec![]; @@ -281,7 +284,9 @@ impl RelayPath { _ => {} } } - result + + // Transform into tracked `Events` + result.into() } fn relay_pending_packets(&self, height: Option) -> Result<(), LinkError> { @@ -310,6 +315,9 @@ impl RelayPath { force: bool, ) -> Result<(), LinkError> { if self.should_clear_packets() || force { + let span = span!(Level::DEBUG, "clear", f = ?force); + let _enter = span.enter(); + // Disable further clearing of old packets by default. // Clearing may still happen: upon new blocks, when `force = true`. self.clear_packets.replace(false); @@ -318,11 +326,9 @@ impl RelayPath { .map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e))) .transpose()?; - info!(height = ?clear_height, "[{}] clearing pending packets", self); - self.relay_pending_packets(clear_height)?; - info!(height = ?clear_height, "[{}] finished scheduling pending packets clearing", self); + debug!(height = ?clear_height, "[{}] done scheduling", self); } Ok(()) @@ -338,7 +344,7 @@ impl RelayPath { } /// Produces and schedules operational data for this relaying path based on the input events. - fn events_to_operational_data(&self, events: Vec) -> Result<(), LinkError> { + fn events_to_operational_data(&self, events: TrackedEvents) -> Result<(), LinkError> { // Obtain the operational data for the source chain (mostly timeout packets) and for the // destination chain (e.g., receive packet messages). let (src_opt, dst_opt) = self.generate_operational_data(events)?; @@ -363,15 +369,12 @@ impl RelayPath { /// or `MsgTimeout`). fn generate_operational_data( &self, - input: Vec, + events: TrackedEvents, ) -> Result<(Option, Option), LinkError> { - if !input.is_empty() { - info!( - "[{}] generate messages from batch with {} events", - self, - input.len() - ); - } + let span = span!(Level::DEBUG, "generate", id = %events); + let _enter = span.enter(); + + let input = events.list; let src_height = match input.get(0) { None => return Ok((None, None)), Some(ev) => ev.height(), @@ -379,9 +382,17 @@ impl RelayPath { let dst_height = self.dst_latest_height()?; // Operational data targeting the source chain (e.g., Timeout packets) - let mut src_od = OperationalData::new(dst_height, OperationalDataTarget::Source); + let mut src_od = OperationalData::new( + dst_height, + OperationalDataTarget::Source, + events.tracking_nr.clone(), + ); // Operational data targeting the destination chain (e.g., SendPacket messages) - let mut dst_od = OperationalData::new(src_height, OperationalDataTarget::Destination); + let mut dst_od = OperationalData::new( + src_height, + OperationalDataTarget::Destination, + events.tracking_nr.clone(), + ); for event in input { debug!("[{}] {} => {}", self, self.src_chain().id(), event); @@ -490,15 +501,15 @@ impl RelayPath { initial_od: OperationalData, ) -> Result { // We will operate on potentially different operational data if the initial one fails. + let span = span!(Level::INFO, "relay", id = %initial_od); + let _enter = span.enter(); + let mut odata = initial_od; for i in 0..MAX_RETRIES { - info!( - "[{}] relay op. data of {} msgs(s) to {} (height {}), delayed by: {:?} [try {}/{}]", + debug!( + "[{}] delayed by: {:?} [try {}/{}]", self, - odata.batch.len(), - odata.target, - odata.proofs_height.increment(), odata.scheduled_time.elapsed(), i + 1, MAX_RETRIES @@ -554,7 +565,7 @@ impl RelayPath { &self, initial_odata: OperationalData, ) -> Option { - info!( + warn!( "[{}] failed. Regenerate operational data from {} events", self, initial_odata.events().len() @@ -745,9 +756,15 @@ impl RelayPath { i + 1, MAX_RETRIES, ); + // TODO(ADI) Fill in the tn. + let tm = TrackedMsgs { + msgs: dst_update, + tracking_nr: "".into(), + }; + let dst_tx_events = self .dst_chain() - .send_messages_and_wait_commit(dst_update) + .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; info!("[{}] result {}\n", self, PrettyEvents(&dst_tx_events)); @@ -786,9 +803,15 @@ impl RelayPath { dst_chain_height, ); + // TODO(ADI) Fill in the tn. + let tm = TrackedMsgs { + msgs: src_update, + tracking_nr: "".into(), + }; + let src_tx_events = self .src_chain() - .send_messages_and_wait_commit(src_update) + .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; info!("[{}] result {}\n", self, PrettyEvents(&src_tx_events)); @@ -812,7 +835,7 @@ impl RelayPath { fn target_height_and_send_packet_events( &self, opt_query_height: Option, - ) -> Result<(Vec, Height), LinkError> { + ) -> Result<(TrackedEvents, Height), LinkError> { let mut events_result = vec![]; let src_channel_id = self.src_channel_id(); @@ -832,7 +855,7 @@ impl RelayPath { let sequences: Vec = sequences.into_iter().map(From::from).collect(); if sequences.is_empty() { - return Ok((events_result, query_height)); + return Ok((events_result.into(), query_height)); } debug!( @@ -921,7 +944,7 @@ impl RelayPath { ); } - Ok((events_result, query_height)) + Ok((events_result.into(), query_height)) } /// Returns relevant packet events for building ack messages. @@ -929,7 +952,7 @@ impl RelayPath { fn target_height_and_write_ack_events( &self, opt_query_height: Option, - ) -> Result<(Vec, Height), LinkError> { + ) -> Result<(TrackedEvents, Height), LinkError> { let mut events_result = vec![]; let src_channel_id = self.src_channel_id(); @@ -950,7 +973,7 @@ impl RelayPath { let sequences: Vec = sequences.into_iter().map(From::from).collect(); if sequences.is_empty() { - return Ok((events_result, query_height)); + return Ok((events_result.into(), query_height)); } debug!( @@ -1007,7 +1030,7 @@ impl RelayPath { info!("[{}] found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", self, packet_sequences, events_result.len()); } - Ok((events_result, query_height)) + Ok((events_result.into(), query_height)) } /// Schedules the relaying of RecvPacket and Timeout messages. @@ -1027,9 +1050,7 @@ impl RelayPath { return Ok(()); } - for event in events.iter_mut() { - event.set_height(height); - } + events.set_height(height); self.events_to_operational_data(events)?; @@ -1050,9 +1071,7 @@ impl RelayPath { return Ok(()); } - for event in events.iter_mut() { - event.set_height(height); - } + events.set_height(height); self.events_to_operational_data(events)?; Ok(()) @@ -1307,6 +1326,9 @@ impl RelayPath { return Ok(()); } + let span = span!(Level::INFO, "refresh"); + let _enter = span.enter(); + let dst_current_height = self.dst_latest_height()?; // Intermediary data struct to help better manage the transfer from dst. operational data @@ -1327,17 +1349,11 @@ impl RelayPath { IbcEvent::SendPacket(e) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(e)? { - debug!( - "[{}] refreshing schedule: already handled send packet {}", - self, e - ); + debug!("[{}] already handled send packet {}", self, e); } else if let Some(new_msg) = self.build_timeout_from_send_packet_event(e, dst_current_height)? { - debug!( - "[{}] refreshing schedule: found a timed-out msg in the op data {}", - self, odata - ); + debug!("[{}] found a timed-out msg in the op data {}", self, odata); timed_out.entry(odata_pos).or_insert_with(Vec::new).push( TransitMessage { event: event.clone(), @@ -1351,10 +1367,7 @@ impl RelayPath { } IbcEvent::WriteAcknowledgement(e) => { if self.write_ack_event_handled(e)? { - debug!( - "[{}] refreshing schedule: already handled {} write ack ", - self, e - ); + debug!("[{}] already handled {} write ack ", self, e); } else { retain_batch.push(gm.clone()); } @@ -1383,12 +1396,12 @@ impl RelayPath { // Schedule new operational data targeting the source chain for (_, batch) in timed_out.into_iter() { let mut new_od = - OperationalData::new(dst_current_height, OperationalDataTarget::Source); + OperationalData::new(dst_current_height, OperationalDataTarget::Source, todo!()); new_od.batch = batch; info!( - "[{}] refreshing schedule: re-scheduling from new timed-out batch of size {}", + "[{}] re-scheduling from new timed-out batch of size {}", self, new_od.batch.len() ); @@ -1403,6 +1416,9 @@ impl RelayPath { /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. fn schedule_operational_data(&self, mut od: OperationalData) -> Result<(), LinkError> { + let span = span!(Level::INFO, "schedule", id = %od); + let _enter = span.enter(); + if od.batch.is_empty() { info!( "[{}] ignoring operational data for {} because it has no messages", @@ -1411,21 +1427,19 @@ impl RelayPath { return Ok(()); } - info!( - "[{}] scheduling op. data with {} msg(s) for {} (height {})", - self, - od.batch.len(), - od.target, - od.proofs_height.increment(), // increment for easier correlation with the client logs - ); - // Update clients ahead of scheduling the operational data, if the delays are non-zero. if !self.zero_delay() { + debug!("[{}] connection delay is non-zero: updating client", self); let target_height = od.proofs_height.increment(); match od.target { OperationalDataTarget::Source => self.update_client_src(target_height)?, OperationalDataTarget::Destination => self.update_client_dst(target_height)?, }; + } else { + debug!( + "[{}] connection delay is zero: client update message will be prepended later", + self + ); } od.scheduled_time = Instant::now(); diff --git a/relayer/src/link/relay_sender.rs b/relayer/src/link/relay_sender.rs index fd87e4f224..aac599c4f8 100644 --- a/relayer/src/link/relay_sender.rs +++ b/relayer/src/link/relay_sender.rs @@ -1,12 +1,12 @@ use core::fmt; -use prost_types::Any; use tendermint_rpc::endpoint::broadcast::tx_sync; use tracing::info; use ibc::events::{IbcEvent, PrettyEvents}; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::link::error::LinkError; use crate::link::RelaySummary; @@ -24,7 +24,7 @@ impl SubmitReply for RelaySummary { pub trait Submit { type Reply: SubmitReply; - fn submit(target: &impl ChainHandle, msgs: Vec) -> Result; + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result; } /// Synchronous sender @@ -36,7 +36,7 @@ impl Submit for SyncSender { // TODO: Switch from the `Chain::send_msgs` interface in this method // to use `Chain::submit_msgs` instead; implement waiting for block // commits directly here (instead of blocking in the chain runtime). - fn submit(target: &impl ChainHandle, msgs: Vec) -> Result { + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result { let tx_events = target .send_messages_and_wait_commit(msgs) .map_err(LinkError::relayer)?; @@ -76,7 +76,7 @@ pub struct AsyncSender; impl Submit for AsyncSender { type Reply = AsyncReply; - fn submit(target: &impl ChainHandle, msgs: Vec) -> Result { + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result { let a = target .send_messages_and_wait_check_tx(msgs) .map_err(LinkError::relayer)?; diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 9ca9623212..6f0a8d2450 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use tracing::{debug, error, warn}; +use tracing::{debug, error, trace, warn}; use ibc::{ ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, @@ -416,7 +416,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { ); if conn_state_src.is_open() && conn_state_dst.is_open() { - debug!( + trace!( "connection {} on chain {} is already open, not spawning Connection worker", connection.connection_id, chain.id() diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index c3bda4a4d1..a01ba3359f 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -13,6 +13,7 @@ use ibc::Height; use uint::FromStrRadixErr; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::config::ChainConfig; use crate::error::Error; use crate::util::bigint::U256; @@ -132,7 +133,7 @@ pub fn build_and_send_transfer_messages( let msgs = vec![raw_msg; opts.number_msgs]; let events = packet_src_chain - .send_messages_and_wait_commit(msgs) + .send_messages_and_wait_commit(TrackedMsgs::new(msgs)) .map_err(|e| PacketError::submit(packet_src_chain.id(), e))?; // Check if the chain rejected the transaction diff --git a/relayer/src/upgrade_chain.rs b/relayer/src/upgrade_chain.rs index 6c14cf2fa1..e2b22d41a2 100644 --- a/relayer/src/upgrade_chain.rs +++ b/relayer/src/upgrade_chain.rs @@ -15,6 +15,7 @@ use ibc_proto::cosmos::gov::v1beta1::MsgSubmitProposal; use ibc_proto::cosmos::upgrade::v1beta1::{Plan, SoftwareUpgradeProposal}; use ibc_proto::ibc::core::client::v1::UpgradeProposal; +use crate::chain::tx::TrackedMsgs; use crate::chain::{ChainEndpoint, CosmosSdkChain}; use crate::config::ChainConfig; use crate::error::Error; @@ -132,7 +133,7 @@ pub fn build_and_send_ibc_upgrade_proposal( }; let events = dst_chain - .send_messages_and_wait_commit(vec![any_msg]) + .send_messages_and_wait_commit(TrackedMsgs::new_single_msg(any_msg)) .map_err(|e| UpgradeChainError::submit(dst_chain.id().clone(), e))?; // Check if the chain rejected the transaction From 4df293f0807a298fce8ded058af72ec83a060f35 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 25 Oct 2021 11:56:55 +0200 Subject: [PATCH 02/29] Pass 2 --- Cargo.lock | 5 ----- relayer/src/chain/tx.rs | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 relayer/src/chain/tx.rs diff --git a/Cargo.lock b/Cargo.lock index 3fe1ab533c..24b7770588 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,12 +1364,7 @@ dependencies = [ "informalsystems-tonic", "itertools", "k256", -<<<<<<< HEAD -======= "nanoid", - "prost", - "prost-types", ->>>>>>> c2ad8c2c (More structure in logs, pass 1) "retry", "ripemd160", "semver 1.0.4", diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs new file mode 100644 index 0000000000..1e7b6d3e6c --- /dev/null +++ b/relayer/src/chain/tx.rs @@ -0,0 +1,33 @@ +use core::fmt; + +use prost_types::Any; + +#[derive(Debug, Clone)] +pub struct TrackedMsgs { + pub msgs: Vec, + pub tracking_nr: String, +} + +// TODO(Adi): The tracking nr should never be empty +impl TrackedMsgs { + pub fn new(msgs: Vec) -> Self { + Self { + msgs, + tracking_nr: "".into(), + } + } + + pub fn new_single_msg(msg: Any) -> Self { + Self::new(vec![msg]) + } + + pub fn empty() -> Self { + Self::new(vec![]) + } +} + +impl fmt::Display for TrackedMsgs { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}; len={}", self.tracking_nr, self.msgs.len()) + } +} From 5ae8e42912b4922ebb2d4c6cec26f9e14468a3ff Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 25 Oct 2021 12:55:03 +0200 Subject: [PATCH 03/29] Pass 3 --- relayer/src/chain/mock.rs | 3 +-- relayer/src/link/operational_data.rs | 4 ++++ relayer/src/link/relay_path.rs | 17 +++++++---------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index a0adbcf2bf..7e13a43f84 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -3,7 +3,6 @@ use core::ops::Add; use core::time::Duration; use crossbeam_channel as channel; -use prost_types::Any; use tendermint_testgen::light_block::TmLightBlock; use tokio::runtime::Runtime; @@ -136,7 +135,7 @@ impl ChainEndpoint for MockChain { fn send_messages_and_wait_check_tx( &mut self, - tracked_msgs: TrackedMsgs, + _tracked_msgs: TrackedMsgs, ) -> Result, Error> { todo!() } diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 59709e18cf..b5baced2f0 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -107,6 +107,10 @@ impl OperationalData { } } + pub fn push(&mut self, msg: TransitMessage) { + self.batch.push(msg) + } + pub fn events(&self) -> TrackedEvents { let list = self.batch.iter().map(|gm| gm.event.clone()).collect(); TrackedEvents { diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 79069e0ce7..d7191597e8 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -391,11 +391,11 @@ impl RelayPath { let mut dst_od = OperationalData::new( src_height, OperationalDataTarget::Destination, - events.tracking_nr.clone(), + events.tracking_nr, ); for event in input { - debug!("[{}] {} => {}", self, self.src_chain().id(), event); + trace!("[{}] {} => {}", self, self.src_chain().id(), event); let (dst_msg, src_msg) = match event { IbcEvent::CloseInitChannel(_) => ( Some(self.build_chan_close_confirm_from_event(&event)?), @@ -1335,7 +1335,7 @@ impl RelayPath { // to source operational data. let mut all_dst_odata = self.dst_operational_data.clone_vec(); - let mut timed_out: HashMap> = HashMap::default(); + let mut timed_out: HashMap = HashMap::default(); // For each operational data targeting the destination chain... for (odata_pos, odata) in all_dst_odata.iter_mut().enumerate() { @@ -1354,7 +1354,9 @@ impl RelayPath { self.build_timeout_from_send_packet_event(e, dst_current_height)? { debug!("[{}] found a timed-out msg in the op data {}", self, odata); - timed_out.entry(odata_pos).or_insert_with(Vec::new).push( + timed_out.entry(odata_pos).or_insert_with( + || OperationalData::new(dst_current_height, OperationalDataTarget::Source, odata.tracking_nr.clone()) + ).push( TransitMessage { event: event.clone(), msg: new_msg, @@ -1394,12 +1396,7 @@ impl RelayPath { } // Schedule new operational data targeting the source chain - for (_, batch) in timed_out.into_iter() { - let mut new_od = - OperationalData::new(dst_current_height, OperationalDataTarget::Source, todo!()); - - new_od.batch = batch; - + for (_, new_od) in timed_out.into_iter() { info!( "[{}] re-scheduling from new timed-out batch of size {}", self, From 50ff524df54179d64ed9ff9265ed1fe9991103ab Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 25 Oct 2021 15:35:24 +0200 Subject: [PATCH 04/29] Resolving todos, refactoring --- config.toml | 2 + relayer-cli/src/commands.rs | 10 +++- relayer/src/chain/cosmos.rs | 12 ++--- relayer/src/chain/mock.rs | 5 +- relayer/src/chain/tx.rs | 28 +++++++---- relayer/src/channel.rs | 36 +++----------- relayer/src/connection.rs | 36 +++----------- relayer/src/foreign_client.rs | 23 +++------ relayer/src/link/operational_data.rs | 44 +++++++++-------- relayer/src/link/relay_path.rs | 72 +++++++++++++++------------- relayer/src/transfer.rs | 2 +- relayer/src/upgrade_chain.rs | 2 +- 12 files changed, 124 insertions(+), 148 deletions(-) diff --git a/config.toml b/config.toml index 3445a61e6c..4ded94331a 100644 --- a/config.toml +++ b/config.toml @@ -88,6 +88,8 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket' # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) +# Note: Hermes uses this parameter only in `start` mode; for all other commands, +# Hermes uses a large preconfigured timeout, on the order of minutes. rpc_timeout = '10s' # Specify the prefix used by the chain. Required diff --git a/relayer-cli/src/commands.rs b/relayer-cli/src/commands.rs index c2db0bf156..e45acbbf9d 100644 --- a/relayer-cli/src/commands.rs +++ b/relayer-cli/src/commands.rs @@ -5,6 +5,7 @@ //! See the `impl Configurable` below for how to specify the path to the //! application's configuration file. +use core::time::Duration; use std::path::PathBuf; use abscissa_core::{ @@ -140,6 +141,14 @@ impl Configurable for CliCmd { ccfg.memo_prefix.apply_suffix(&suffix); } + // For all commands except for `start` Hermes retries + // for a prolonged period of time. + if !matches!(self, CliCmd::Start(_)) { + for c in config.chains.iter_mut() { + c.rpc_timeout = Duration::from_secs(120); + } + } + match self { CliCmd::Tx(cmd) => cmd.override_config(config), // CliCmd::Help(cmd) => cmd.override_config(config), @@ -148,7 +157,6 @@ impl Configurable for CliCmd { // CliCmd::Update(cmd) => cmd.override_config(config), // CliCmd::Upgrade(cmd) => cmd.override_config(config), // CliCmd::Start(cmd) => cmd.override_config(config), - // CliCmd::StartMulti(cmd) => cmd.override_config(config), // CliCmd::Query(cmd) => cmd.override_config(config), // CliCmd::Listen(cmd) => cmd.override_config(config), // CliCmd::Misbehaviour(cmd) => cmd.override_config(config), diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 6662187698..3228f22362 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -845,11 +845,11 @@ impl ChainEndpoint for CosmosSdkChain { tracked_msgs: TrackedMsgs, ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); - let proto_msgs = tracked_msgs.msgs; - debug!( - "send_messages_and_wait_commit with {} messages", - proto_msgs.len() - ); + + let span = span!(Level::DEBUG, "send_w", id = %tracked_msgs); + let _enter = span.enter(); + + let proto_msgs = tracked_msgs.messages(); if proto_msgs.is_empty() { return Ok(vec![]); @@ -906,7 +906,7 @@ impl ChainEndpoint for CosmosSdkChain { let span = span!(Level::DEBUG, "send", id = %tracked_msgs); let _enter = span.enter(); - let proto_msgs = tracked_msgs.msgs; + let proto_msgs = tracked_msgs.messages(); if proto_msgs.is_empty() { return Ok(vec![]); diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 7e13a43f84..fd8611c301 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -128,7 +128,10 @@ impl ChainEndpoint for MockChain { tracked_msgs: TrackedMsgs, ) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. - let events = self.context.send(tracked_msgs.msgs).map_err(Error::ics18)?; + let events = self + .context + .send(tracked_msgs.into()) + .map_err(Error::ics18)?; Ok(events) } diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs index 1e7b6d3e6c..f830f8f717 100644 --- a/relayer/src/chain/tx.rs +++ b/relayer/src/chain/tx.rs @@ -4,30 +4,38 @@ use prost_types::Any; #[derive(Debug, Clone)] pub struct TrackedMsgs { - pub msgs: Vec, - pub tracking_nr: String, + msgs: Vec, + tracking_id: String, } -// TODO(Adi): The tracking nr should never be empty impl TrackedMsgs { - pub fn new(msgs: Vec) -> Self { + pub fn new(msgs: Vec, tid: &str) -> Self { Self { msgs, - tracking_nr: "".into(), + tracking_id: tid.into(), } } - pub fn new_single_msg(msg: Any) -> Self { - Self::new(vec![msg]) + pub fn messages(&self) -> &Vec { + &self.msgs } - pub fn empty() -> Self { - Self::new(vec![]) + pub fn new_single(msg: Any, tid: &str) -> Self { + Self { + msgs: vec![msg], + tracking_id: tid.into(), + } + } +} + +impl From for Vec { + fn from(tm: TrackedMsgs) -> Vec { + tm.msgs } } impl fmt::Display for TrackedMsgs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}; len={}", self.tracking_nr, self.msgs.len()) + write!(f, "{}; len={}", self.tracking_id, self.msgs.len()) } } diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index c664f5be93..3624e19fa8 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -702,11 +702,7 @@ impl Channel { pub fn build_chan_open_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_init()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = self .dst_chain() @@ -865,11 +861,7 @@ impl Channel { pub fn build_chan_open_try_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_try()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = self .dst_chain() @@ -956,11 +948,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_ack()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = channel .dst_chain() @@ -1058,11 +1046,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_confirm()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = channel .dst_chain() .send_messages_and_wait_commit(tm) @@ -1126,11 +1110,7 @@ impl Channel { pub fn build_chan_close_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_init()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = self .dst_chain() @@ -1211,11 +1191,7 @@ impl Channel { pub fn build_chan_close_confirm_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_confirm()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create channel"); let events = self .dst_chain() diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 39ed3789a2..1088682425 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -755,11 +755,7 @@ impl Connection { pub fn build_conn_init_and_send(&self) -> Result { let dst_msgs = self.build_conn_init()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create connection"); let events = self .dst_chain() @@ -819,11 +815,7 @@ impl Connection { .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: client_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(client_msgs, "create connection"); self.src_chain() .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; @@ -894,11 +886,7 @@ impl Connection { pub fn build_conn_try_and_send(&self) -> Result { let dst_msgs = self.build_conn_try()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create connection"); let events = self .dst_chain() @@ -948,11 +936,7 @@ impl Connection { .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: client_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(client_msgs, "create connection"); self.src_chain() .send_messages_and_wait_commit(tm) @@ -998,11 +982,7 @@ impl Connection { pub fn build_conn_ack_and_send(&self) -> Result { let dst_msgs = self.build_conn_ack()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create connection"); let events = self .dst_chain() @@ -1081,11 +1061,7 @@ impl Connection { pub fn build_conn_confirm_and_send(&self) -> Result { let dst_msgs = self.build_conn_confirm()?; - // TODO(ADI) - let tm = TrackedMsgs { - msgs: dst_msgs, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_msgs, "create connection"); let events = self .dst_chain() diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 9d27f96817..e10e3dd8b9 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -393,11 +393,7 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient, - pub tracking_nr: String, + list: Vec, + tracking_id: String, } impl TrackedEvents { @@ -41,6 +41,14 @@ impl TrackedEvents { self.list.is_empty() } + pub fn events(&self) -> &Vec { + &self.list + } + + pub fn tracking_id(&self) -> &String { + &self.tracking_id + } + pub fn len(&self) -> usize { self.list.len() } @@ -56,14 +64,14 @@ impl From> for TrackedEvents { fn from(list: Vec) -> Self { Self { list, - tracking_nr: nanoid!(10), + tracking_id: nanoid!(10), } } } impl fmt::Display for TrackedEvents { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.tracking_nr) + write!(f, "{}", self.tracking_id) } } @@ -93,17 +101,17 @@ pub struct OperationalData { /// Stores the time when the clients on the target chain has been updated, i.e., when this data /// was scheduled. Necessary for packet delays. pub scheduled_time: Instant, - pub tracking_nr: String, + pub tracking_id: String, } impl OperationalData { - pub fn new(proofs_height: Height, target: OperationalDataTarget, tn: String) -> Self { + pub fn new(proofs_height: Height, target: OperationalDataTarget, tn: &str) -> Self { OperationalData { proofs_height, batch: vec![], target, scheduled_time: Instant::now(), - tracking_nr: tn, + tracking_id: tn.to_string(), } } @@ -115,7 +123,7 @@ impl OperationalData { let list = self.batch.iter().map(|gm| gm.event.clone()).collect(); TrackedEvents { list, - tracking_nr: self.tracking_nr.clone(), + tracking_id: self.tracking_id.clone(), } } @@ -126,11 +134,6 @@ impl OperationalData { &self, relay_path: &RelayPath, ) -> Result { - if self.batch.is_empty() { - warn!("assemble_msgs() method call on an empty OperationalData!"); - return Ok(TrackedMsgs::empty()); - } - // For zero delay we prepend the client update msgs. let client_update_msg = if relay_path.zero_delay() { let update_height = self.proofs_height.increment(); @@ -163,12 +166,13 @@ impl OperationalData { None => self.batch.iter().map(|gm| gm.msg.clone()).collect(), }; - let tm = TrackedMsgs { - msgs, - tracking_nr: self.tracking_nr.clone(), - }; + let tm = TrackedMsgs::new(msgs, &self.tracking_id); - info!("[{}] assembled batch of {} msgs", relay_path, tm.msgs.len()); + info!( + "[{}] assembled batch of {} msgs", + relay_path, + tm.messages().len() + ); Ok(tm) } @@ -179,7 +183,7 @@ impl fmt::Display for OperationalData { write!( f, "{} ->{} @{}; len={}", - self.tracking_nr, + self.tracking_id, self.target, self.proofs_height, self.batch.len(), diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index d7191597e8..3910d44a26 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -374,7 +374,7 @@ impl RelayPath { let span = span!(Level::DEBUG, "generate", id = %events); let _enter = span.enter(); - let input = events.list; + let input = events.events(); let src_height = match input.get(0) { None => return Ok((None, None)), Some(ev) => ev.height(), @@ -385,22 +385,21 @@ impl RelayPath { let mut src_od = OperationalData::new( dst_height, OperationalDataTarget::Source, - events.tracking_nr.clone(), + events.tracking_id(), ); // Operational data targeting the destination chain (e.g., SendPacket messages) let mut dst_od = OperationalData::new( src_height, OperationalDataTarget::Destination, - events.tracking_nr, + events.tracking_id(), ); for event in input { trace!("[{}] {} => {}", self, self.src_chain().id(), event); let (dst_msg, src_msg) = match event { - IbcEvent::CloseInitChannel(_) => ( - Some(self.build_chan_close_confirm_from_event(&event)?), - None, - ), + IbcEvent::CloseInitChannel(_) => { + (Some(self.build_chan_close_confirm_from_event(event)?), None) + } IbcEvent::TimeoutPacket(ref timeout_ev) => { // When a timeout packet for an ordered channel is processed on-chain (src here) // the chain closes the channel but no close init event is emitted, instead @@ -412,10 +411,7 @@ impl RelayPath { .src_channel(timeout_ev.height)? .state_matches(&ChannelState::Closed) { - ( - Some(self.build_chan_close_confirm_from_event(&event)?), - None, - ) + (Some(self.build_chan_close_confirm_from_event(event)?), None) } else { (None, None) } @@ -474,7 +470,10 @@ impl RelayPath { msg.type_url, event ); - src_od.batch.push(TransitMessage { event, msg }); + src_od.batch.push(TransitMessage { + event: event.clone(), + msg, + }); } } } @@ -734,7 +733,11 @@ impl RelayPath { } /// Handles updating the client on the destination chain - fn update_client_dst(&self, src_chain_height: Height) -> Result<(), LinkError> { + fn update_client_dst( + &self, + src_chain_height: Height, + tracking_id: &str, + ) -> Result<(), LinkError> { // Handle the update on the destination chain // Check if a consensus state at update_height exists on destination chain already if self @@ -756,11 +759,7 @@ impl RelayPath { i + 1, MAX_RETRIES, ); - // TODO(ADI) Fill in the tn. - let tm = TrackedMsgs { - msgs: dst_update, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(dst_update, tracking_id); let dst_tx_events = self .dst_chain() @@ -784,7 +783,11 @@ impl RelayPath { } /// Handles updating the client on the source chain - fn update_client_src(&self, dst_chain_height: Height) -> Result<(), LinkError> { + fn update_client_src( + &self, + dst_chain_height: Height, + tracking_id: &str, + ) -> Result<(), LinkError> { if self .src_chain() .proven_client_consensus(self.src_client_id(), dst_chain_height, Height::zero()) @@ -803,11 +806,7 @@ impl RelayPath { dst_chain_height, ); - // TODO(ADI) Fill in the tn. - let tm = TrackedMsgs { - msgs: src_update, - tracking_nr: "".into(), - }; + let tm = TrackedMsgs::new(src_update, tracking_id); let src_tx_events = self .src_chain() @@ -1354,14 +1353,19 @@ impl RelayPath { self.build_timeout_from_send_packet_event(e, dst_current_height)? { debug!("[{}] found a timed-out msg in the op data {}", self, odata); - timed_out.entry(odata_pos).or_insert_with( - || OperationalData::new(dst_current_height, OperationalDataTarget::Source, odata.tracking_nr.clone()) - ).push( - TransitMessage { + timed_out + .entry(odata_pos) + .or_insert_with(|| { + OperationalData::new( + dst_current_height, + OperationalDataTarget::Source, + &odata.tracking_id, + ) + }) + .push(TransitMessage { event: event.clone(), msg: new_msg, - }, - ); + }); } else { // A SendPacket event, but did not time-out yet, retain retain_batch.push(gm.clone()); @@ -1429,8 +1433,12 @@ impl RelayPath { debug!("[{}] connection delay is non-zero: updating client", self); let target_height = od.proofs_height.increment(); match od.target { - OperationalDataTarget::Source => self.update_client_src(target_height)?, - OperationalDataTarget::Destination => self.update_client_dst(target_height)?, + OperationalDataTarget::Source => { + self.update_client_src(target_height, &od.tracking_id)? + } + OperationalDataTarget::Destination => { + self.update_client_dst(target_height, &od.tracking_id)? + } }; } else { debug!( diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index a01ba3359f..28bc5e94a2 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -133,7 +133,7 @@ pub fn build_and_send_transfer_messages( let msgs = vec![raw_msg; opts.number_msgs]; let events = packet_src_chain - .send_messages_and_wait_commit(TrackedMsgs::new(msgs)) + .send_messages_and_wait_commit(TrackedMsgs::new(msgs, "ft-transfer")) .map_err(|e| PacketError::submit(packet_src_chain.id(), e))?; // Check if the chain rejected the transaction diff --git a/relayer/src/upgrade_chain.rs b/relayer/src/upgrade_chain.rs index e2b22d41a2..ec7f2d9e0c 100644 --- a/relayer/src/upgrade_chain.rs +++ b/relayer/src/upgrade_chain.rs @@ -133,7 +133,7 @@ pub fn build_and_send_ibc_upgrade_proposal( }; let events = dst_chain - .send_messages_and_wait_commit(TrackedMsgs::new_single_msg(any_msg)) + .send_messages_and_wait_commit(TrackedMsgs::new_single(any_msg, "upgrade")) .map_err(|e| UpgradeChainError::submit(dst_chain.id().clone(), e))?; // Check if the chain rejected the transaction From 794a7bf8764658097613c1136757c12bd9879a35 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Tue, 26 Oct 2021 11:37:50 +0200 Subject: [PATCH 05/29] Better config.toml comment --- config.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.toml b/config.toml index 4ded94331a..0f5ca6266b 100644 --- a/config.toml +++ b/config.toml @@ -88,8 +88,8 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket' # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) -# Note: Hermes uses this parameter only in `start` mode; for all other commands, -# Hermes uses a large preconfigured timeout, on the order of minutes. +# Note: Hermes uses this parameter _only_ in `start` mode; for all other CLIs, +# Hermes uses a large preconfigured timeout (on the order of minutes). rpc_timeout = '10s' # Specify the prefix used by the chain. Required From d75c8321ffbaf6567405d020174c7085d33d7c54 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Fri, 26 Nov 2021 10:33:13 +0100 Subject: [PATCH 06/29] Post-merge fixes --- relayer/src/chain/cosmos.rs | 2 +- relayer/src/link/relay_path.rs | 3 +-- relayer/src/transfer.rs | 1 - tools/integration-test/src/relayer/chain.rs | 9 +++++---- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 864905986c..0b5c01eeea 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -30,7 +30,7 @@ use tendermint_rpc::{ }; use tokio::runtime::Runtime as TokioRuntime; use tonic::codegen::http::Uri; -use tracing::{debug, error, span, trace, warn, Level}; +use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::clients::ics07_tendermint::client_state::{AllowUpdate, ClientState}; use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState; diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 1966c07826..cff6eb7312 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -393,7 +393,7 @@ impl RelayPath { let dst_latest_height = dst_latest_info.height; // Operational data targeting the source chain (e.g., Timeout packets) let mut src_od = OperationalData::new( - dst_height, + dst_latest_height, OperationalDataTarget::Source, events.tracking_id(), ); @@ -1338,7 +1338,6 @@ impl RelayPath { let span = span!(Level::INFO, "refresh"); let _enter = span.enter(); - let dst_current_height = self.dst_latest_height()?; let dst_status = self .dst_chain() .query_status() diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index 10c68ca0a1..381f4d6ef3 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -14,7 +14,6 @@ use uint::FromStrRadixErr; use crate::chain::handle::ChainHandle; use crate::chain::tx::TrackedMsgs; -use crate::config::ChainConfig; use crate::error::Error; use crate::util::bigint::U256; diff --git a/tools/integration-test/src/relayer/chain.rs b/tools/integration-test/src/relayer/chain.rs index 4d1f35b34b..3a5e0bfcd3 100644 --- a/tools/integration-test/src/relayer/chain.rs +++ b/tools/integration-test/src/relayer/chain.rs @@ -55,6 +55,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof; use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; use ibc_relayer::chain::handle::{ChainHandle, ChainRequest, Subscription}; +use ibc_relayer::chain::tx::TrackedMsgs; use ibc_relayer::chain::{HealthCheck, StatusResponse}; use ibc_relayer::config::ChainConfig; use ibc_relayer::error::Error; @@ -89,16 +90,16 @@ where fn send_messages_and_wait_commit( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { - self.value().send_messages_and_wait_commit(proto_msgs) + self.value().send_messages_and_wait_commit(tracked_msgs) } fn send_messages_and_wait_check_tx( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { - self.value().send_messages_and_wait_check_tx(proto_msgs) + self.value().send_messages_and_wait_check_tx(tracked_msgs) } fn get_signer(&self) -> Result { From a2c3311eb40ff28af313cd57fb23b4bfd502dfb1 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Fri, 17 Dec 2021 18:20:42 +0200 Subject: [PATCH 07/29] Post-merge fix --- relayer/src/channel.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 2b3c2132fc..8a9783cb97 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -23,6 +23,7 @@ use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination}; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::channel::version::ResolveContext; use crate::connection::Connection; use crate::foreign_client::ForeignClient; From 590603569f35151cabf7318cbd907bfebf145cc6 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 20 Dec 2021 16:25:05 +0200 Subject: [PATCH 08/29] Sketch: printing tx hashes from SendPacket events. This was inspired from commit b63335b7fd810ecd167eaf1392ab5fb513eedd0b (see rpc.rs therein). --- relayer/src/event/rpc.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index 9e1dc5bcd9..2d9d6c0e60 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -60,6 +60,13 @@ pub fn get_all_events( chan_event.set_height(height); tracing::trace!("extracted ibc_channel event {:?}", chan_event); vals.push((height, chan_event)); + if matches!(chan_event, IbcEvent::SendPacket(_)) { + // TODO(Mikhail): Extract the tx hash & print it here. + let pre_tx_hash = + result.events + .map(|events| events.get("tx.hash")) + .flatten(); + } } } } From 74913b79e04494e20e048e190a0f86ff5b2abfef Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 21 Dec 2021 15:59:03 +0200 Subject: [PATCH 09/29] log the tx hashes in ibc_channel event SendPacket --- relayer/src/event/rpc.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index 2d9d6c0e60..2adc77567c 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -58,15 +58,20 @@ pub fn get_all_events( if *query == queries::ibc_channel().to_string() { if let Some(mut chan_event) = ChannelEvents::try_from_tx(abci_event) { chan_event.set_height(height); - tracing::trace!("extracted ibc_channel event {:?}", chan_event); - vals.push((height, chan_event)); + let _span = tracing::trace_span!("ibc_channel event"); + tracing::trace!("extracted {:?}", chan_event); if matches!(chan_event, IbcEvent::SendPacket(_)) { - // TODO(Mikhail): Extract the tx hash & print it here. - let pre_tx_hash = - result.events - .map(|events| events.get("tx.hash")) - .flatten(); + // Should be the same as the hash of tx_result.tx? + if let Some(hashes) = result + .events + .as_ref() + .map(|events| events.get("tx.hash").cloned()) + .flatten() + { + tracing::trace!(event = "SendPacket", "tx hashes: {:?}", hashes); + } } + vals.push((height, chan_event)); } } } From dcf662ae9d8ecbdba2259fcfbea792b0d29f6151 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 22 Dec 2021 13:09:07 +0200 Subject: [PATCH 10/29] Improve code to print out the tx hash Just examine the hash map, do not allocate any copies. --- relayer/src/event/rpc.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index 2adc77567c..e5b6e49999 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -62,13 +62,13 @@ pub fn get_all_events( tracing::trace!("extracted {:?}", chan_event); if matches!(chan_event, IbcEvent::SendPacket(_)) { // Should be the same as the hash of tx_result.tx? - if let Some(hashes) = result + if let Some(hash) = result .events .as_ref() - .map(|events| events.get("tx.hash").cloned()) - .flatten() + .and_then(|events| events.get("tx.hash")) + .and_then(|values| values.get(0)) { - tracing::trace!(event = "SendPacket", "tx hashes: {:?}", hashes); + tracing::trace!(event = "SendPacket", "tx hash: {}", hash); } } vals.push((height, chan_event)); From f9ca4db8d6c7d5deed8dd6a900d63a94f52bec28 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 22 Dec 2021 14:17:33 +0200 Subject: [PATCH 11/29] Actually enter the tracing span --- relayer/src/event/rpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index e5b6e49999..e3f3ac7640 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -58,7 +58,7 @@ pub fn get_all_events( if *query == queries::ibc_channel().to_string() { if let Some(mut chan_event) = ChannelEvents::try_from_tx(abci_event) { chan_event.set_height(height); - let _span = tracing::trace_span!("ibc_channel event"); + let _span = tracing::trace_span!("ibc_channel event").entered(); tracing::trace!("extracted {:?}", chan_event); if matches!(chan_event, IbcEvent::SendPacket(_)) { // Should be the same as the hash of tx_result.tx? From 5de3f2b9ddcd89d49936b714736dd56b85679bc5 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Thu, 23 Dec 2021 09:25:23 +0200 Subject: [PATCH 12/29] Apply suggestions from code review Co-authored-by: Mikhail Zabaluev --- relayer/src/chain/cosmos.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 82627f06f2..f9e296961c 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -956,8 +956,7 @@ impl ChainEndpoint for CosmosSdkChain { ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); - let span = span!(Level::DEBUG, "send_w", id = %tracked_msgs); - let _enter = span.enter(); + let _span = span!(Level::DEBUG, "send_w", id = %tracked_msgs).entered(); let proto_msgs = tracked_msgs.messages(); From 1d15085e97ee51ed3cb75ea6b0d85730e342b24d Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Thu, 23 Dec 2021 09:40:39 +0200 Subject: [PATCH 13/29] Comment explaining TrackedMsgs --- relayer/src/chain/tx.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs index f830f8f717..982a107f4a 100644 --- a/relayer/src/chain/tx.rs +++ b/relayer/src/chain/tx.rs @@ -2,6 +2,12 @@ use core::fmt; use prost_types::Any; +/// A wrapper over a vector of proto-encoded messages +/// (`Vec`), which has an associated tracking +/// number. +/// +/// A [`TrackedMsgs`] correlates with a [`TrackedEvents`] +/// by sharing the same `tracking_id`. #[derive(Debug, Clone)] pub struct TrackedMsgs { msgs: Vec, From 60e66e6438d87b0b5738d16610f4fb318816c82a Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Thu, 23 Dec 2021 09:46:34 +0200 Subject: [PATCH 14/29] Removed use of TrackedEvents Display impl --- relayer/src/link/operational_data.rs | 6 ------ relayer/src/link/relay_path.rs | 4 ++-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 0d0a879bc9..058b323aad 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -69,12 +69,6 @@ impl From> for TrackedEvents { } } -impl fmt::Display for TrackedEvents { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.tracking_id) - } -} - /// A packet message that is prepared for sending /// to a chain, but has not been sent yet. /// diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 5a5d9a71ab..71f288e83d 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -288,7 +288,7 @@ impl RelayPath { } } - // Transform into tracked `Events` + // Transform into `TrackedEvents` result.into() } @@ -378,7 +378,7 @@ impl RelayPath { &self, events: TrackedEvents, ) -> Result<(Option, Option), LinkError> { - let span = span!(Level::DEBUG, "generate", id = %events); + let span = span!(Level::DEBUG, "generate", id = %events.tracking_id()); let _enter = span.enter(); let input = events.events(); From d189d2cb8f7f3f9709a113cc762fc483ad2a7f7a Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 13:14:02 +0200 Subject: [PATCH 15/29] Erase Display impl for TrackedMsgs Add tracking_id method to use in tracing instead. Also, use less cryptic span names in send_messages_*. --- relayer/src/chain/cosmos.rs | 5 +++-- relayer/src/chain/tx.rs | 12 ++++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 17c6730652..50dfa0cca9 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1054,7 +1054,8 @@ impl ChainEndpoint for CosmosSdkChain { ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); - let _span = span!(Level::DEBUG, "send_w", id = %tracked_msgs).entered(); + let _span = + span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered(); let proto_msgs = tracked_msgs.messages(); @@ -1111,7 +1112,7 @@ impl ChainEndpoint for CosmosSdkChain { ) -> Result, Error> { crate::time!("send_messages_and_wait_check_tx"); - let span = span!(Level::DEBUG, "send", id = %tracked_msgs); + let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id()); let _enter = span.enter(); let proto_msgs = tracked_msgs.messages(); diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs index 982a107f4a..12de31c159 100644 --- a/relayer/src/chain/tx.rs +++ b/relayer/src/chain/tx.rs @@ -1,5 +1,3 @@ -use core::fmt; - use prost_types::Any; /// A wrapper over a vector of proto-encoded messages @@ -26,6 +24,10 @@ impl TrackedMsgs { &self.msgs } + pub fn tracking_id(&self) -> &str { + &self.tracking_id + } + pub fn new_single(msg: Any, tid: &str) -> Self { Self { msgs: vec![msg], @@ -39,9 +41,3 @@ impl From for Vec { tm.msgs } } - -impl fmt::Display for TrackedMsgs { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}; len={}", self.tracking_id, self.msgs.len()) - } -} From 3166f76b87f96c9849132b07a42229dedf8ad101 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 13:19:14 +0200 Subject: [PATCH 16/29] Allow passing IDs without copy in TrackedMsgs Make the tracking ID construction parameter generic to accept any Into parameter. --- relayer/src/chain/tx.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs index 12de31c159..3947a20019 100644 --- a/relayer/src/chain/tx.rs +++ b/relayer/src/chain/tx.rs @@ -13,13 +13,20 @@ pub struct TrackedMsgs { } impl TrackedMsgs { - pub fn new(msgs: Vec, tid: &str) -> Self { + pub fn new(msgs: Vec, tid: impl Into) -> Self { Self { msgs, tracking_id: tid.into(), } } + pub fn new_single(msg: Any, tid: impl Into) -> Self { + Self { + msgs: vec![msg], + tracking_id: tid.into(), + } + } + pub fn messages(&self) -> &Vec { &self.msgs } @@ -27,13 +34,6 @@ impl TrackedMsgs { pub fn tracking_id(&self) -> &str { &self.tracking_id } - - pub fn new_single(msg: Any, tid: &str) -> Self { - Self { - msgs: vec![msg], - tracking_id: tid.into(), - } - } } impl From for Vec { From c93249b8901c8dfaca327708e4aefabc2e54a71a Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 13:43:35 +0200 Subject: [PATCH 17/29] Different tracking ids for creation flows Identify TrackedMsgs batches for creating channels and connections with the specific message that is used. --- relayer/src/channel.rs | 12 ++++++------ relayer/src/connection.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 8a9783cb97..c5d854af48 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -704,7 +704,7 @@ impl Channel { pub fn build_chan_open_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenInit"); let events = self .dst_chain() @@ -870,7 +870,7 @@ impl Channel { pub fn build_chan_open_try_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_try()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenTry"); let events = self .dst_chain() @@ -957,7 +957,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_ack()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenAck"); let events = channel .dst_chain() @@ -1055,7 +1055,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenConfirm"); let events = channel .dst_chain() .send_messages_and_wait_commit(tm) @@ -1119,7 +1119,7 @@ impl Channel { pub fn build_chan_close_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelCloseInit"); let events = self .dst_chain() @@ -1200,7 +1200,7 @@ impl Channel { pub fn build_chan_close_confirm_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel"); + let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelCloseConfirm"); let events = self .dst_chain() diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 24cb7e307d..3f222799ec 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -768,7 +768,7 @@ impl Connection { pub fn build_conn_init_and_send(&self) -> Result { let dst_msgs = self.build_conn_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection"); + let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenInit"); let events = self .dst_chain() @@ -828,7 +828,7 @@ impl Connection { .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; - let tm = TrackedMsgs::new(client_msgs, "create connection"); + let tm = TrackedMsgs::new(client_msgs, "update client on source for ConnectionOpenTry"); self.src_chain() .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; @@ -899,7 +899,7 @@ impl Connection { pub fn build_conn_try_and_send(&self) -> Result { let dst_msgs = self.build_conn_try()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection"); + let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenTry"); let events = self .dst_chain() @@ -949,7 +949,7 @@ impl Connection { .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; - let tm = TrackedMsgs::new(client_msgs, "create connection"); + let tm = TrackedMsgs::new(client_msgs, "update client on source for ConnectionOpenAck"); self.src_chain() .send_messages_and_wait_commit(tm) @@ -995,7 +995,7 @@ impl Connection { pub fn build_conn_ack_and_send(&self) -> Result { let dst_msgs = self.build_conn_ack()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection"); + let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenAck"); let events = self .dst_chain() @@ -1074,7 +1074,7 @@ impl Connection { pub fn build_conn_confirm_and_send(&self) -> Result { let dst_msgs = self.build_conn_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection"); + let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenConfirm"); let events = self .dst_chain() From 1bba81b9645bdd3ede490f2d2b3c2a724e557b78 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 15:06:44 +0200 Subject: [PATCH 18/29] Redo displaying for OperationalData Add OperationalInfo that can hold the displayable data on the batch, either borrowed or owned with transforming from first to the other. Implement Display on the OperationalInfo instead of OperationalData for clarity. --- relayer/src/link/operational_data.rs | 78 ++++++++++++++++++++++------ relayer/src/link/relay_path.rs | 30 ++++++----- 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 058b323aad..fe5908a55e 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -1,3 +1,4 @@ +use alloc::borrow::Cow; use core::fmt; use core::iter; use std::time::Instant; @@ -45,14 +46,10 @@ impl TrackedEvents { &self.list } - pub fn tracking_id(&self) -> &String { + pub fn tracking_id(&self) -> &str { &self.tracking_id } - pub fn len(&self) -> usize { - self.list.len() - } - pub fn set_height(&mut self, height: Height) { for event in self.list.iter_mut() { event.set_height(height); @@ -99,13 +96,17 @@ pub struct OperationalData { } impl OperationalData { - pub fn new(proofs_height: Height, target: OperationalDataTarget, tn: &str) -> Self { + pub fn new( + proofs_height: Height, + target: OperationalDataTarget, + tracking_id: impl Into, + ) -> Self { OperationalData { proofs_height, batch: vec![], target, scheduled_time: Instant::now(), - tracking_id: tn.to_string(), + tracking_id: tracking_id.into(), } } @@ -113,11 +114,22 @@ impl OperationalData { self.batch.push(msg) } - pub fn events(&self) -> TrackedEvents { - let list = self.batch.iter().map(|gm| gm.event.clone()).collect(); + /// Returns displayable information on the operation's data. + pub fn info(&self) -> OperationalInfo<'_> { + OperationalInfo { + tracking_id: Cow::Borrowed(&self.tracking_id), + target: self.target, + proofs_height: self.proofs_height, + batch_len: self.batch.len(), + } + } + + /// Transforms `self` into the list of events accompanied with the tracking ID. + pub fn into_events(self) -> TrackedEvents { + let list = self.batch.into_iter().map(|gm| gm.event).collect(); TrackedEvents { list, - tracking_id: self.tracking_id.clone(), + tracking_id: self.tracking_id, } } @@ -172,15 +184,51 @@ impl OperationalData { } } -impl fmt::Display for OperationalData { +/// A lightweight informational data structure that can be extracted +/// out of [`OperationalData`] for e.g. logging purposes. +pub struct OperationalInfo<'a> { + tracking_id: Cow<'a, str>, + target: OperationalDataTarget, + proofs_height: Height, + batch_len: usize, +} + +impl<'a> OperationalInfo<'a> { + pub fn tracking_id(&self) -> &str { + &self.tracking_id + } + + pub fn target(&self) -> OperationalDataTarget { + self.target + } + + /// Returns the length of the assembled batch of in-transit messages. + pub fn batch_len(&self) -> usize { + self.batch_len + } + + pub fn into_owned(self) -> OperationalInfo<'static> { + let Self { + tracking_id, + target, + proofs_height, + batch_len, + } = self; + OperationalInfo { + tracking_id: tracking_id.into_owned().into(), + target, + proofs_height, + batch_len, + } + } +} + +impl<'a> fmt::Display for OperationalInfo<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "{} ->{} @{}; len={}", - self.tracking_id, - self.target, - self.proofs_height, - self.batch.len(), + self.tracking_id, self.target, self.proofs_height, self.batch_len, ) } } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 71f288e83d..04cdc4fcb3 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -511,7 +511,7 @@ impl RelayPath { initial_od: OperationalData, ) -> Result { // We will operate on potentially different operational data if the initial one fails. - let span = span!(Level::INFO, "relay", id = %initial_od); + let span = span!(Level::INFO, "relay", id = %initial_od.info().tracking_id()); let _enter = span.enter(); let mut odata = initial_od; @@ -575,29 +575,31 @@ impl RelayPath { &self, initial_odata: OperationalData, ) -> Option { + let op_info = initial_odata.info().into_owned(); + warn!( "[{}] failed. Regenerate operational data from {} events", self, - initial_odata.events().len() + op_info.batch_len() ); // Retry by re-generating the operational data using the initial events - let (src_opt, dst_opt) = match self.generate_operational_data(initial_odata.events()) { + let (src_opt, dst_opt) = match self.generate_operational_data(initial_odata.into_events()) { Ok(new_operational_data) => new_operational_data, Err(e) => { error!( "[{}] failed to regenerate operational data from initial data: {} \ with error {}, discarding this op. data", - self, initial_odata, e + self, op_info, e ); return None; } // Cannot retry, contain the error by reporting a None }; if let Some(src_od) = src_opt { - if src_od.target == initial_odata.target { + if src_od.target == op_info.target() { // Our target is the _source_ chain, retry these messages - info!("[{}] will retry with op data {}", self, src_od); + info!("[{}] will retry with op data {}", self, src_od.info()); return Some(src_od); } else { // Our target is the _destination_ chain, the data in `src_od` contains @@ -606,7 +608,7 @@ impl RelayPath { error!( "[{}] failed to schedule newly-generated operational data from \ initial data: {} with error {}, discarding this op. data", - self, initial_odata, e + self, op_info, e ); return None; } @@ -614,9 +616,9 @@ impl RelayPath { } if let Some(dst_od) = dst_opt { - if dst_od.target == initial_odata.target { + if dst_od.target == op_info.target() { // Our target is the _destination_ chain, retry these messages - info!("[{}] will retry with op data {}", self, dst_od); + info!("[{}] will retry with op data {}", self, dst_od.info()); return Some(dst_od); } else { // Our target is the _source_ chain, but `dst_od` has new messages @@ -629,7 +631,7 @@ impl RelayPath { } } else { // There is no message intended for the destination chain - if initial_odata.target == OperationalDataTarget::Destination { + if op_info.target() == OperationalDataTarget::Destination { info!("[{}] exhausted all events from this operational data", self); return None; } @@ -1368,7 +1370,11 @@ impl RelayPath { } else if let Some(new_msg) = self.build_timeout_from_send_packet_event(e, &dst_status)? { - debug!("[{}] found a timed-out msg in the op data {}", self, odata); + debug!( + "[{}] found a timed-out msg in the op data {}", + self, + odata.info(), + ); timed_out .entry(odata_pos) .or_insert_with(|| { @@ -1433,7 +1439,7 @@ impl RelayPath { /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. fn schedule_operational_data(&self, mut od: OperationalData) -> Result<(), LinkError> { - let span = span!(Level::INFO, "schedule", id = %od); + let span = span!(Level::INFO, "schedule", id = %od.info().tracking_id()); let _enter = span.enter(); if od.batch.is_empty() { From 6df6b63139bc45f0f672d4ecd741fc8041ba84de Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 15:07:51 +0200 Subject: [PATCH 19/29] Deabbreviate an info level log message --- relayer/src/link/operational_data.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index fe5908a55e..d8cc888e18 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -175,7 +175,7 @@ impl OperationalData { let tm = TrackedMsgs::new(msgs, &self.tracking_id); info!( - "[{}] assembled batch of {} msgs", + "[{}] assembled batch of {} message(s)", relay_path, tm.messages().len() ); From 85c60fa8f1b37e0f823aaa3cf03c79bafd8ef994 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 23 Dec 2021 15:23:02 +0200 Subject: [PATCH 20/29] Improve logging of operational data Use the `odata` key in tracing. --- relayer/src/link/operational_data.rs | 4 ---- relayer/src/link/relay_path.rs | 10 ++++------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index d8cc888e18..356fc691bb 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -194,10 +194,6 @@ pub struct OperationalInfo<'a> { } impl<'a> OperationalInfo<'a> { - pub fn tracking_id(&self) -> &str { - &self.tracking_id - } - pub fn target(&self) -> OperationalDataTarget { self.target } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 04cdc4fcb3..043a9d566b 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -511,8 +511,7 @@ impl RelayPath { initial_od: OperationalData, ) -> Result { // We will operate on potentially different operational data if the initial one fails. - let span = span!(Level::INFO, "relay", id = %initial_od.info().tracking_id()); - let _enter = span.enter(); + let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); let mut odata = initial_od; @@ -599,7 +598,7 @@ impl RelayPath { if let Some(src_od) = src_opt { if src_od.target == op_info.target() { // Our target is the _source_ chain, retry these messages - info!("[{}] will retry with op data {}", self, src_od.info()); + info!(odata = %src_od.info(), "[{}] will retry", self); return Some(src_od); } else { // Our target is the _destination_ chain, the data in `src_od` contains @@ -618,7 +617,7 @@ impl RelayPath { if let Some(dst_od) = dst_opt { if dst_od.target == op_info.target() { // Our target is the _destination_ chain, retry these messages - info!("[{}] will retry with op data {}", self, dst_od.info()); + info!(odata = %dst_od.info(), "[{}] will retry", self); return Some(dst_od); } else { // Our target is the _source_ chain, but `dst_od` has new messages @@ -1439,8 +1438,7 @@ impl RelayPath { /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. fn schedule_operational_data(&self, mut od: OperationalData) -> Result<(), LinkError> { - let span = span!(Level::INFO, "schedule", id = %od.info().tracking_id()); - let _enter = span.enter(); + let _span = span!(Level::INFO, "schedule", odata = %od.info()).entered(); if od.batch.is_empty() { info!( From d84bccda05839cb1fe0c500c0314b925075f5eaf Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 5 Jan 2022 13:39:51 +0200 Subject: [PATCH 21/29] Remove verbose wording on TrackedMsgs IDs The operation name is usually enough. --- relayer/src/channel.rs | 12 ++++++------ relayer/src/connection.rs | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 6d9884f3f3..a29052a28c 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -740,7 +740,7 @@ impl Channel { pub fn build_chan_open_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenInit"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenInit"); let events = self .dst_chain() @@ -906,7 +906,7 @@ impl Channel { pub fn build_chan_open_try_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_try()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenTry"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenTry"); let events = self .dst_chain() @@ -993,7 +993,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_ack()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenAck"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenAck"); let events = channel .dst_chain() @@ -1091,7 +1091,7 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelOpenConfirm"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenConfirm"); let events = channel .dst_chain() .send_messages_and_wait_commit(tm) @@ -1155,7 +1155,7 @@ impl Channel { pub fn build_chan_close_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelCloseInit"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelCloseInit"); let events = self .dst_chain() @@ -1236,7 +1236,7 @@ impl Channel { pub fn build_chan_close_confirm_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create channel with ChannelCloseConfirm"); + let tm = TrackedMsgs::new(dst_msgs, "ChannelCloseConfirm"); let events = self .dst_chain() diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 1ec94b8cef..030cf27748 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -804,7 +804,7 @@ impl Connection { pub fn build_conn_init_and_send(&self) -> Result { let dst_msgs = self.build_conn_init()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenInit"); + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenInit"); let events = self .dst_chain() @@ -935,7 +935,7 @@ impl Connection { pub fn build_conn_try_and_send(&self) -> Result { let dst_msgs = self.build_conn_try()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenTry"); + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenTry"); let events = self .dst_chain() @@ -1031,7 +1031,7 @@ impl Connection { pub fn build_conn_ack_and_send(&self) -> Result { let dst_msgs = self.build_conn_ack()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenAck"); + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenAck"); let events = self .dst_chain() @@ -1110,7 +1110,7 @@ impl Connection { pub fn build_conn_confirm_and_send(&self) -> Result { let dst_msgs = self.build_conn_confirm()?; - let tm = TrackedMsgs::new(dst_msgs, "create connection with ConnectionOpenConfirm"); + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenConfirm"); let events = self .dst_chain() From 05071619ce04cd977c2ad7dbf1ef1a2581dfb61d Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 10 Jan 2022 20:42:31 +0200 Subject: [PATCH 22/29] Fix typos in descriptions of RunError variants --- relayer/src/worker/error.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relayer/src/worker/error.rs b/relayer/src/worker/error.rs index 1005bed0fb..7b7e0f61f7 100644 --- a/relayer/src/worker/error.rs +++ b/relayer/src/worker/error.rs @@ -10,19 +10,19 @@ define_error! { RunError { Ics02 [ Ics02Error ] - | _ | { "client errror" }, + | _ | { "client error" }, Connection [ ConnectionError ] - | _ | { "connection errror" }, + | _ | { "connection error" }, Channel [ ChannelError ] - | _ | { "channel errror" }, + | _ | { "channel error" }, Link [ LinkError ] - | _ | { "link errror" }, + | _ | { "link error" }, Retry { retries: retry::Error } From 914373c1560ce9c66cac6513c795306755b153e6 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 10 Jan 2022 21:02:12 +0200 Subject: [PATCH 23/29] Use a tracing span for task log messages The task lifetime is better tracked with a tracing span, which also reduces code repetition in tracking macros by putting in the task name once. --- relayer/src/util/task.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/relayer/src/util/task.rs b/relayer/src/util/task.rs index 273d35e7f3..9938081210 100644 --- a/relayer/src/util/task.rs +++ b/relayer/src/util/task.rs @@ -4,7 +4,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -use tracing::{error, info, warn}; +use tracing::{error, info, span, warn}; use crate::util::lock::LockExt; @@ -90,14 +90,19 @@ pub fn spawn_background_task( interval_pause: Option, mut step_runner: impl FnMut() -> Result> + Send + Sync + 'static, ) -> TaskHandle { - info!("spawning new background task {}", task_name); + let span = span!(tracing::Level::ERROR, "task", name = %task_name); + let _entered = span.enter(); + + info!("spawning"); let stopped = Arc::new(RwLock::new(false)); let write_stopped = stopped.clone(); let (shutdown_sender, receiver) = bounded(1); + let thread_span = span.clone(); let join_handle = thread::spawn(move || { + let _entered = thread_span.enter(); loop { match receiver.try_recv() { Ok(()) => { @@ -106,17 +111,14 @@ pub fn spawn_background_task( _ => match step_runner() { Ok(Next::Continue) => {} Ok(Next::Abort) => { - info!("task is aborting: {}", task_name); + info!("aborting"); break; } Err(TaskError::Ignore(e)) => { - warn!("task {} encountered ignorable error: {}", task_name, e); + warn!("encountered ignorable error: {}", e); } Err(TaskError::Fatal(e)) => { - error!( - "aborting task {} after encountering fatal error: {}", - task_name, e - ); + error!("aborting after encountering fatal error: {}", e); break; } }, @@ -127,7 +129,7 @@ pub fn spawn_background_task( } *write_stopped.acquire_write() = true; - info!("task {} has terminated", task_name); + info!("terminated"); }); TaskHandle { From d1ae093ea2d713df29f90f52785b5b49be5e8f1f Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 11 Jan 2022 15:09:47 +0200 Subject: [PATCH 24/29] Rework tracing spans for background tasks spawn_background_task receives a Span constructed by the caller. This allows embedding contextual information for the task, which is used to reduce repetition in logging macros for the workers. --- relayer/src/supervisor.rs | 8 +-- relayer/src/util/task.rs | 12 ++-- relayer/src/worker/channel.rs | 14 ++-- relayer/src/worker/client.rs | 51 ++++++++------ relayer/src/worker/connection.rs | 15 ++-- relayer/src/worker/packet.rs | 113 +++++++++++++++++-------------- 6 files changed, 109 insertions(+), 104 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index ff492ad791..2a9a9cf171 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -7,7 +7,7 @@ use std::sync::RwLock; use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Itertools; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn}; use ibc::{ core::ics24_host::identifier::{ChainId, ChannelId, PortId}, @@ -166,7 +166,7 @@ fn spawn_batch_worker( subscriptions: Arc>>, ) -> TaskHandle { spawn_background_task( - "supervisor_batch".to_string(), + error_span!("supervisor_batch"), Some(Duration::from_millis(500)), move || -> Result> { if let Some((chain, batch)) = try_recv_multiple(&subscriptions.acquire_read()) { @@ -194,7 +194,7 @@ pub fn spawn_cmd_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - "supervisor_cmd".to_string(), + error_span!("supervisor_cmd"), Some(Duration::from_millis(500)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -237,7 +237,7 @@ pub fn spawn_rest_worker( rest_rx: rest::Receiver, ) -> TaskHandle { spawn_background_task( - "supervisor_rest".to_string(), + error_span!("supervisor_rest"), Some(Duration::from_millis(500)), move || -> Result> { handle_rest_requests( diff --git a/relayer/src/util/task.rs b/relayer/src/util/task.rs index 9938081210..919e4e3212 100644 --- a/relayer/src/util/task.rs +++ b/relayer/src/util/task.rs @@ -4,7 +4,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -use tracing::{error, info, span, warn}; +use tracing::{error, info, warn}; use crate::util::lock::LockExt; @@ -86,23 +86,19 @@ pub enum Next { [`TaskHandle`]. */ pub fn spawn_background_task( - task_name: String, + span: tracing::Span, interval_pause: Option, mut step_runner: impl FnMut() -> Result> + Send + Sync + 'static, ) -> TaskHandle { - let span = span!(tracing::Level::ERROR, "task", name = %task_name); - let _entered = span.enter(); - - info!("spawning"); + info!(parent: &span, "spawning"); let stopped = Arc::new(RwLock::new(false)); let write_stopped = stopped.clone(); let (shutdown_sender, receiver) = bounded(1); - let thread_span = span.clone(); let join_handle = thread::spawn(move || { - let _entered = thread_span.enter(); + let _entered = span.enter(); loop { match receiver.try_recv() { Ok(()) => { diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 2057db2992..75abb14ade 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -1,6 +1,6 @@ use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::debug; +use tracing::{debug, error_span}; use crate::channel::Channel as RelayChannel; use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - format!("ChannelWorker({})", channel.short_name()), + error_span!("ChannelWorker", channel = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -29,10 +29,7 @@ pub fn spawn_channel_worker( // there can be up to two event for this channel, e.g. init and try. // process the last event, the one with highest "rank". let last_event = batch.events.last(); - debug!( - channel = %channel.short_name(), - "channel worker starts processing {:#?}", last_event - ); + debug!("starts processing {:#?}", last_event); if let Some(event) = last_event { let mut handshake_channel = RelayChannel::restore_from_event( @@ -54,10 +51,7 @@ pub fn spawn_channel_worker( height: current_height, new_block: _, } => { - debug!( - channel = %channel.short_name(), - "Channel worker starts processing block event at {:#?}", current_height - ); + debug!("starts processing block event at {:#?}", current_height); let height = current_height .decrement() diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index 3c54135cf7..2fccbfdf53 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -1,7 +1,7 @@ use core::convert::Infallible; use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::{debug, trace, warn}; +use tracing::{debug, span, trace, warn}; use ibc::events::IbcEvent; @@ -19,13 +19,19 @@ pub fn spawn_refresh_client( ) -> Option { if client.is_expired_or_frozen() { warn!( - "skipping refresh client task on frozen client: {}", - client.id() + client = %client.id, + "skipping refresh client task on frozen client", ); None } else { Some(spawn_background_task( - format!("RefreshClientWorker({})", client), + span!( + tracing::Level::ERROR, + "RefreshClientWorker", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ), Some(Duration::from_secs(1)), move || { let res = client.refresh().map_err(|e| { @@ -52,40 +58,47 @@ pub fn detect_misbehavior_task( ) -> Option { if client.is_expired_or_frozen() { warn!( - "skipping detect misbehavior task on frozen client: {}", - client.id() + client = %client.id(), + "skipping detect misbehavior task on frozen client", ); return None; } { - debug!("[{}] doing first misbehavior check", client); + let _span = span!( + tracing::Level::DEBUG, + "DetectMisbehaviorFirstCheck", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ) + .entered(); + debug!("doing first check"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(None); - debug!( - "[{}] detect misbehavior result: {:?}", - client, misbehavior_result - ); + trace!("detect misbehavior result: {:?}", misbehavior_result); } let handle = spawn_background_task( - format!("DetectMisbehaviorWorker({})", client), + span!( + tracing::Level::ERROR, + "DetectMisbehaviorWorker", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ), Some(Duration::from_millis(600)), move || -> Result> { if let Ok(cmd) = receiver.try_recv() { match cmd { WorkerCmd::IbcEvents { batch } => { - trace!("[{}] worker received batch: {:?}", client, batch); + trace!("received batch: {:?}", batch); for event in batch.events { if let IbcEvent::UpdateClient(update) = event { - debug!("[{}] checking misbehavior for updated client", client); + debug!("checking misbehavior for updated client"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(Some(update)); - trace!( - "[{}] detect misbehavior result: {:?}", - client, - misbehavior_result - ); + trace!("detect misbehavior result: {:?}", misbehavior_result); match misbehavior_result { MisbehaviourResults::ValidClient => {} diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 39f892a00e..832a6b166d 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -1,6 +1,6 @@ use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::debug; +use tracing::{debug, error_span}; use crate::connection::Connection as RelayConnection; use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; @@ -20,7 +20,7 @@ pub fn spawn_connection_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - format!("ConnectionWorker({})", connection.short_name()), + error_span!("ConnectionWorker", connection = %connection.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -30,10 +30,7 @@ pub fn spawn_connection_worker( // process the last event, the one with highest "rank". let last_event = batch.events.last(); - debug!( - connection = %connection.short_name(), - "connection worker starts processing {:#?}", last_event - ); + debug!("starts processing {:#?}", last_event); if let Some(event) = last_event { let mut handshake_connection = RelayConnection::restore_from_event( @@ -56,11 +53,7 @@ pub fn spawn_connection_worker( height: current_height, new_block: _, } => { - debug!( - connection = %connection.short_name(), - "connection worker starts processing block event at {}", - current_height - ); + debug!("starts processing block event at {}", current_height); let height = current_height .decrement() diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 5a8d539167..5b4aa010e1 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -2,7 +2,7 @@ use core::time::Duration; use crossbeam_channel::Receiver; use ibc::Height; use std::sync::{Arc, Mutex}; -use tracing::{error, trace}; +use tracing::{error, error_span, trace}; use crate::chain::handle::ChainHandle; use crate::foreign_client::HasExpiredOrFrozenError; @@ -47,31 +47,37 @@ pub fn spawn_packet_worker( // Mutex is used to prevent race condition between the packet workers link: Arc>>, ) -> TaskHandle { - spawn_background_task( - format!("PacketWorker({})", link.lock().unwrap().a_to_b), - Some(Duration::from_millis(1000)), - move || { - let relay_path = &link.lock().unwrap().a_to_b; + let span = { + let relay_path = &link.lock().unwrap().a_to_b; + error_span!( + "PacketWorker", + src_chain = %relay_path.src_chain().id(), + src_port = %relay_path.src_port_id(), + src_channel = %relay_path.src_channel_id(), + dst_chain = %relay_path.dst_chain().id(), + ) + }; + spawn_background_task(span, Some(Duration::from_millis(1000)), move || { + let relay_path = &link.lock().unwrap().a_to_b; - relay_path - .refresh_schedule() - .map_err(handle_link_error_in_task)?; + relay_path + .refresh_schedule() + .map_err(handle_link_error_in_task)?; - relay_path - .execute_schedule() - .map_err(handle_link_error_in_task)?; + relay_path + .execute_schedule() + .map_err(handle_link_error_in_task)?; - let summary = relay_path.process_pending_txs(); + let summary = relay_path.process_pending_txs(); - if !summary.is_empty() { - trace!("Packet worker produced relay summary: {:?}", summary); - } + if !summary.is_empty() { + trace!("Packet worker produced relay summary: {:?}", summary); + } - telemetry!(packet_metrics(&path, &summary)); + telemetry!(packet_metrics(&path, &summary)); - Ok(Next::Continue) - }, - ) + Ok(Next::Continue) + }) } pub fn spawn_packet_cmd_worker( @@ -83,28 +89,34 @@ pub fn spawn_packet_cmd_worker( path: Packet, ) -> TaskHandle { let mut is_first_run: bool = true; - spawn_background_task( - format!("PacketCmdWorker({})", link.lock().unwrap().a_to_b), - Some(Duration::from_millis(200)), - move || { - if let Ok(cmd) = cmd_rx.try_recv() { - retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { - handle_packet_cmd( - &mut is_first_run, - &link.lock().unwrap(), - clear_on_start, - clear_interval, - &path, - cmd.clone(), - index, - ) - }) - .map_err(|e| TaskError::Fatal(RunError::retry(e)))?; - } - - Ok(Next::Continue) - }, - ) + let span = { + let relay_path = &link.lock().unwrap().a_to_b; + error_span!( + "PacketCmdWorker", + src_chain = %relay_path.src_chain().id(), + src_port = %relay_path.src_port_id(), + src_channel = %relay_path.src_channel_id(), + dst_chain = %relay_path.dst_chain().id(), + ) + }; + spawn_background_task(span, Some(Duration::from_millis(200)), move || { + if let Ok(cmd) = cmd_rx.try_recv() { + retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { + handle_packet_cmd( + &mut is_first_run, + &link.lock().unwrap(), + clear_on_start, + clear_interval, + &path, + cmd.clone(), + index, + ) + }) + .map_err(|e| TaskError::Fatal(RunError::retry(e)))?; + } + + Ok(Next::Continue) + }) } /// Receives worker commands, which may be: @@ -124,7 +136,7 @@ fn handle_packet_cmd( cmd: WorkerCmd, index: u64, ) -> RetryResult<(), u64> { - trace!("[{}] handling packet worker command {:?}", link.a_to_b, cmd); + trace!("handling command {:?}", cmd); let result = match cmd { WorkerCmd::IbcEvents { batch } => link.a_to_b.update_schedule(batch), @@ -150,8 +162,8 @@ fn handle_packet_cmd( error!( path = %path.short_name(), retry_index = %index, - "[{}] worker will retry: handling command encountered error: {}", - link.a_to_b, e + "will retry: handling command encountered error: {}", + e ); return RetryResult::Retry(index); @@ -175,16 +187,13 @@ fn handle_packet_cmd( if let Err(e) = schedule_result { if e.is_expired_or_frozen_error() { - error!( - "[{}] worker aborting due to expired or frozen client", - link.a_to_b - ); + error!("aborting due to expired or frozen client"); return RetryResult::Err(index); } else { error!( retry_index = %index, - "[{}] worker will retry: schedule execution encountered error: {}", - link.a_to_b, e + "will retry: schedule execution encountered error: {}", + e, ); return RetryResult::Retry(index); } @@ -193,7 +202,7 @@ fn handle_packet_cmd( let summary = link.a_to_b.process_pending_txs(); if !summary.is_empty() { - trace!("Packet worker produced relay summary: {:?}", summary); + trace!("produced relay summary: {:?}", summary); } telemetry!(packet_metrics(path, &summary)); From e6b41f9c6ed337a64873dae9f93fd40491656abf Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 11 Jan 2022 17:34:20 +0200 Subject: [PATCH 25/29] Erase Display impl on RelayPath, use spans instead With spans injecting all of the information that was formatted using the RelayPath Display impl, this is redundant. --- relayer/src/link.rs | 19 +++ relayer/src/link/operational_data.rs | 10 +- relayer/src/link/relay_path.rs | 169 +++++++++------------------ 3 files changed, 78 insertions(+), 120 deletions(-) diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 06c96b95de..5f0c394648 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -7,6 +7,7 @@ use ibc::{ events::IbcEvent, Height, }; +use tracing::error_span; use crate::chain::counterparty::check_channel_counterparty; use crate::chain::handle::ChainHandle; @@ -163,6 +164,15 @@ impl Link { /// Implements the `packet-recv` CLI pub fn build_and_send_recv_packet_messages(&mut self) -> Result, LinkError> { + let _span = error_span!( + "PacketRecvCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; let mut results = vec![]; @@ -180,6 +190,15 @@ impl Link { /// Implements the `packet-ack` CLI pub fn build_and_send_ack_packet_messages(&mut self) -> Result, LinkError> { + let _span = error_span!( + "PacketAckCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + self.a_to_b.build_packet_ack_msgs(None)?; let mut results = vec![]; diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 356fc691bb..8682433c66 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -145,8 +145,8 @@ impl OperationalData { let update_height = self.proofs_height.increment(); debug!( - "[{}] prepending {} client update @ height {}", - relay_path, self.target, update_height + "prepending {} client update at height {}", + self.target, update_height ); // Fetch the client update message. Vector may be empty if the client already has the header @@ -174,11 +174,7 @@ impl OperationalData { let tm = TrackedMsgs::new(msgs, &self.tracking_id); - info!( - "[{}] assembled batch of {} message(s)", - relay_path, - tm.messages().len() - ); + info!("assembled batch of {} message(s)", tm.messages().len()); Ok(tm) } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 043a9d566b..cb6df819dd 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,6 +1,5 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use core::fmt; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Instant; @@ -301,8 +300,8 @@ impl RelayPath { match cleared { Ok(()) => return Ok(()), Err(e) => error!( - "[{}] failed to clear packets, retry {}/{}: {}", - self, i, MAX_RETRIES, e + "failed to clear packets, retry {}/{}: {}", + i, MAX_RETRIES, e ), } } @@ -335,7 +334,7 @@ impl RelayPath { self.relay_pending_packets(clear_height)?; - debug!(height = ?clear_height, "[{}] done scheduling", self); + debug!(height = ?clear_height, "done scheduling"); } Ok(()) @@ -406,7 +405,7 @@ impl RelayPath { ); for event in input { - trace!("[{}] {} => {}", self, self.src_chain().id(), event); + trace!("processing event: {}", event); let (dst_msg, src_msg) = match event { IbcEvent::CloseInitChannel(_) => { (Some(self.build_chan_close_confirm_from_event(event)?), None) @@ -429,7 +428,7 @@ impl RelayPath { } IbcEvent::SendPacket(ref send_packet_ev) => { if self.send_packet_event_handled(send_packet_ev)? { - debug!("[{}] {} already handled", self, send_packet_ev); + debug!("{} already handled", send_packet_ev); (None, None) } else { self.build_recv_or_timeout_from_send_packet_event( @@ -445,7 +444,7 @@ impl RelayPath { { (None, None) } else if self.write_ack_event_handled(write_ack_ev)? { - debug!("[{}] {} already handled", self, write_ack_ev); + debug!("{} already handled", write_ack_ev); (None, None) } else { (self.build_ack_from_recv_event(write_ack_ev)?, None) @@ -456,13 +455,7 @@ impl RelayPath { // Collect messages to be sent to the destination chain (e.g., RecvPacket) if let Some(msg) = dst_msg { - debug!( - "[{}] {} <= {} from {}", - self, - self.dst_chain().id(), - msg.type_url, - event - ); + debug!("{} from {}", msg.type_url, event); dst_od.batch.push(TransitMessage { event: event.clone(), msg, @@ -474,13 +467,7 @@ impl RelayPath { // For Ordered channels a single timeout event should be sent as this closes the channel. // Otherwise a multi message transaction will fail. if self.unordered_channel() || src_od.batch.is_empty() { - debug!( - "[{}] {} <= {} from {}", - self, - self.src_chain().id(), - msg.type_url, - event - ); + debug!("{} from {}", msg.type_url, event); src_od.batch.push(TransitMessage { event: event.clone(), msg, @@ -517,8 +504,7 @@ impl RelayPath { for i in 0..MAX_RETRIES { debug!( - "[{}] delayed by: {:?} [try {}/{}]", - self, + "delayed by: {:?} [try {}/{}]", odata.scheduled_time.elapsed(), i + 1, MAX_RETRIES @@ -528,20 +514,15 @@ impl RelayPath { match self.send_from_operational_data::(odata.clone()) { Ok(reply) => { // Done with this op. data - info!("[{}] success", self); + info!("success"); return Ok(reply); } Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { // This error means we could retry - error!("[{}] error {}", self, e.event); + error!("error {}", e.event); if i + 1 == MAX_RETRIES { - error!( - "[{}] {}/{} retries exhausted. giving up", - self, - i + 1, - MAX_RETRIES - ) + error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) } else { // If we haven't exhausted all retries, regenerate the op. data & retry match self.regenerate_operational_data(odata.clone()) { @@ -577,8 +558,7 @@ impl RelayPath { let op_info = initial_odata.info().into_owned(); warn!( - "[{}] failed. Regenerate operational data from {} events", - self, + "failed. Regenerate operational data from {} events", op_info.batch_len() ); @@ -587,9 +567,9 @@ impl RelayPath { Ok(new_operational_data) => new_operational_data, Err(e) => { error!( - "[{}] failed to regenerate operational data from initial data: {} \ + "failed to regenerate operational data from initial data: {} \ with error {}, discarding this op. data", - self, op_info, e + op_info, e ); return None; } // Cannot retry, contain the error by reporting a None @@ -598,16 +578,16 @@ impl RelayPath { if let Some(src_od) = src_opt { if src_od.target == op_info.target() { // Our target is the _source_ chain, retry these messages - info!(odata = %src_od.info(), "[{}] will retry", self); + info!(odata = %src_od.info(), "will retry"); return Some(src_od); } else { // Our target is the _destination_ chain, the data in `src_od` contains // potentially new timeout messages that have to be handled separately. if let Err(e) = self.schedule_operational_data(src_od) { error!( - "[{}] failed to schedule newly-generated operational data from \ - initial data: {} with error {}, discarding this op. data", - self, op_info, e + "failed to schedule newly-generated operational data from \ + initial data: {} with error {}, discarding this op. data", + op_info, e ); return None; } @@ -617,21 +597,20 @@ impl RelayPath { if let Some(dst_od) = dst_opt { if dst_od.target == op_info.target() { // Our target is the _destination_ chain, retry these messages - info!(odata = %dst_od.info(), "[{}] will retry", self); + info!(odata = %dst_od.info(), "will retry"); return Some(dst_od); } else { // Our target is the _source_ chain, but `dst_od` has new messages // intended for the destination chain, this should never be the case error!( - "[{}] generated new messages for destination chain while handling \ + "generated new messages for destination chain while handling \ failed events targeting the source chain!", - self ); } } else { // There is no message intended for the destination chain if op_info.target() == OperationalDataTarget::Destination { - info!("[{}] exhausted all events from this operational data", self); + info!("exhausted all events from this operational data"); return None; } } @@ -653,7 +632,7 @@ impl RelayPath { odata: OperationalData, ) -> Result { if odata.batch.is_empty() { - error!("[{}] ignoring empty operational data!", self); + error!("ignoring empty operational data!"); return Ok(S::Reply::empty()); } @@ -764,9 +743,7 @@ impl RelayPath { for i in 0..MAX_RETRIES { let dst_update = self.build_update_client_on_dst(src_chain_height)?; info!( - "[{}] sending updateClient to client hosted on dest. chain {} for height {} [try {}/{}]", - self, - self.dst_chain().id(), + "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", src_chain_height, i + 1, MAX_RETRIES, ); @@ -777,7 +754,7 @@ impl RelayPath { .dst_chain() .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("[{}] result {}\n", self, PrettyEvents(&dst_tx_events)); + info!("result: {}", PrettyEvents(&dst_tx_events)); dst_err_ev = dst_tx_events .into_iter() @@ -812,9 +789,7 @@ impl RelayPath { for _ in 0..MAX_RETRIES { let src_update = self.build_update_client_on_src(dst_chain_height)?; info!( - "[{}] sending updateClient to client hosted on src. chain {} for height {}", - self, - self.src_chain().id(), + "sending updateClient to client hosted on source chain for height {}", dst_chain_height, ); @@ -824,7 +799,7 @@ impl RelayPath { .src_chain() .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("[{}] result {}\n", self, PrettyEvents(&src_tx_events)); + info!("result: {}", PrettyEvents(&src_tx_events)); src_err_ev = src_tx_events .into_iter() @@ -870,16 +845,14 @@ impl RelayPath { } debug!( - "[{}] packet seq. that still have commitments on {}: {} (first 10 shown here; total={})", - self, + "packet seq. that still have commitments on {}: {} (first 10 shown here; total={})", self.src_chain().id(), commit_sequences.iter().take(10).join(", "), commit_sequences.len() ); debug!( - "[{}] recv packets to send out to {} of the ones with commitments on source {}: {} (first 10 shown here; total={})", - self, + "recv packets to send out to {} of the ones with commitments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), self.src_chain().id(), sequences.iter().take(10).join(", "), sequences.len() @@ -918,9 +891,9 @@ impl RelayPath { Default::default() }; - trace!("[{}] start_block_events {:?}", self, start_block_events); - trace!("[{}] tx_events {:?}", self, tx_events); - trace!("[{}] end_block_events {:?}", self, end_block_events); + trace!("start_block_events {:?}", start_block_events); + trace!("tx_events {:?}", tx_events); + trace!("end_block_events {:?}", end_block_events); // events must be ordered in the following fashion - // start-block events followed by tx-events followed by end-block events @@ -929,10 +902,7 @@ impl RelayPath { events_result.extend(end_block_events); if events_result.is_empty() { - info!( - "[{}] found zero unprocessed SendPacket events on source chain, nothing to do", - self - ); + info!("found zero unprocessed SendPacket events on source chain, nothing to do"); } else { let mut packet_sequences = vec![]; for event in events_result.iter() { @@ -948,8 +918,7 @@ impl RelayPath { } } info!( - "[{}] found unprocessed SendPacket events for {:?} (first 10 shown here; total={})", - self, + "found unprocessed SendPacket events for {:?} (first 10 shown here; total={})", packet_sequences, events_result.len() ); @@ -988,8 +957,7 @@ impl RelayPath { } debug!( - "[{}] packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", - self, + "packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", self.src_chain().id(), acks_on_src.first(), acks_on_src.last(), @@ -997,8 +965,7 @@ impl RelayPath { ); debug!( - "[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", - self, + "ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), self.src_chain().id(), sequences.iter().take(10).join(", "), sequences.len() @@ -1019,8 +986,7 @@ impl RelayPath { if events_result.is_empty() { info!( - "[{}] found zero unprocessed WriteAcknowledgement events on source chain, nothing to do", - self + "found zero unprocessed WriteAcknowledgement events on source chain, nothing to do", ); } else { let mut packet_sequences = vec![]; @@ -1038,7 +1004,11 @@ impl RelayPath { } } } - info!("[{}] found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", self, packet_sequences, events_result.len()); + info!( + "found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", + packet_sequences, + events_result.len(), + ); } Ok((events_result.into(), query_height)) @@ -1103,8 +1073,7 @@ impl RelayPath { let msg = MsgRecvPacket::new(packet.clone(), proofs.clone(), self.dst_signer()?); trace!( - "[{}] built recv_packet msg {}, proofs at height {}", - self, + "built recv_packet msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1137,8 +1106,7 @@ impl RelayPath { ); trace!( - "[{}] built acknowledgment msg {}, proofs at height {}", - self, + "built acknowledgment msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1185,8 +1153,7 @@ impl RelayPath { ); trace!( - "[{}] built timeout msg {}, proofs at height {}", - self, + "built timeout msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1218,8 +1185,7 @@ impl RelayPath { ); trace!( - "[{}] built timeout on close msg {}, proofs at height {}", - self, + "built timeout on close msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1365,15 +1331,11 @@ impl RelayPath { IbcEvent::SendPacket(e) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(e)? { - debug!("[{}] already handled send packet {}", self, e); + debug!("already handled send packet {}", e); } else if let Some(new_msg) = self.build_timeout_from_send_packet_event(e, &dst_status)? { - debug!( - "[{}] found a timed-out msg in the op data {}", - self, - odata.info(), - ); + debug!("found a timed-out msg in the op data {}", odata.info(),); timed_out .entry(odata_pos) .or_insert_with(|| { @@ -1394,7 +1356,7 @@ impl RelayPath { } IbcEvent::WriteAcknowledgement(e) => { if self.write_ack_event_handled(e)? { - debug!("[{}] already handled {} write ack ", self, e); + debug!("already handled {} write ack ", e); } else { retain_batch.push(gm.clone()); } @@ -1423,8 +1385,7 @@ impl RelayPath { // Schedule new operational data targeting the source chain for (_, new_od) in timed_out.into_iter() { info!( - "[{}] re-scheduling from new timed-out batch of size {}", - self, + "re-scheduling from new timed-out batch of size {}", new_od.batch.len() ); @@ -1442,15 +1403,15 @@ impl RelayPath { if od.batch.is_empty() { info!( - "[{}] ignoring operational data for {} because it has no messages", - self, od.target + "ignoring operational data for {} because it has no messages", + od.target ); return Ok(()); } // Update clients ahead of scheduling the operational data, if the delays are non-zero. if !self.zero_delay() { - debug!("[{}] connection delay is non-zero: updating client", self); + debug!("connection delay is non-zero: updating client"); let target_height = od.proofs_height.increment(); match od.target { OperationalDataTarget::Source => { @@ -1461,10 +1422,7 @@ impl RelayPath { } }; } else { - debug!( - "[{}] connection delay is zero: client update message will be prepended later", - self - ); + debug!("connection delay is zero: client update message will be prepended later"); } od.scheduled_time = Instant::now(); @@ -1539,15 +1497,13 @@ impl RelayPath { match delay_left { None => info!( - "[{}] ready to fetch a scheduled op. data with batch of size {} targeting {}", - self, + "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), odata.target, ), Some(delay_left) => { info!( - "[{}] waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - self, + "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", delay_left, odata.batch.len(), odata.target, @@ -1580,16 +1536,3 @@ impl RelayPath { ) } } - -impl fmt::Display for RelayPath { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}:{}/{} -> {}", - self.src_chain().id(), - self.src_port_id(), - self.src_channel_id(), - self.dst_chain().id() - ) - } -} From 73d135ed412aba0e3a89bbc218c703be75631bc1 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 12 Jan 2022 17:38:22 +0200 Subject: [PATCH 26/29] Shorten or remove span IDs for supervisor tasks For the batch worker tasks, we should have a span with the task name and other details, so the top-level span is not needed. For other supervisor tasks, the span name is shortened. --- relayer/src/supervisor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index c4002d6203..1dd0b8d025 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -166,7 +166,7 @@ fn spawn_batch_worker( subscriptions: Arc>>, ) -> TaskHandle { spawn_background_task( - error_span!("supervisor_batch"), + tracing::Span::none(), Some(Duration::from_millis(500)), move || -> Result> { if let Some((chain, batch)) = try_recv_multiple(&subscriptions.acquire_read()) { @@ -194,7 +194,7 @@ pub fn spawn_cmd_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("supervisor_cmd"), + error_span!("cmd"), Some(Duration::from_millis(500)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -237,7 +237,7 @@ pub fn spawn_rest_worker( rest_rx: rest::Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("supervisor_rest"), + error_span!("rest"), Some(Duration::from_millis(500)), move || -> Result> { handle_rest_requests( From 6cb9023249fdee04d20c2975dc2f9e411cfaa3f3 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 12 Jan 2022 17:42:17 +0200 Subject: [PATCH 27/29] Erase [rest] prefixes from log messages We have the span now, these are redundant and bad style. --- relayer/src/rest.rs | 24 ++++++++++++------------ relayer/src/supervisor.rs | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/relayer/src/rest.rs b/relayer/src/rest.rs index 8ad9e17486..4f6cb31fce 100644 --- a/relayer/src/rest.rs +++ b/relayer/src/rest.rs @@ -44,48 +44,48 @@ pub fn process_incoming_requests(config: &Config, channel: &Receiver) -> Option< match channel.try_recv() { Ok(request) => match request { Request::Version { reply_to } => { - trace!("[rest] Version"); + trace!("Version"); let v = VersionInfo { name: NAME.to_string(), version: VER.to_string(), }; - reply_to.send(Ok(v)).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply_to + .send(Ok(v)) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::GetChains { reply_to } => { - trace!("[rest] GetChains"); + trace!("GetChains"); reply_to .send(Ok(config.chains.iter().map(|c| c.id.clone()).collect())) - .unwrap_or_else(|e| error!("[rest] error replying to a REST request {}", e)); + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::GetChain { chain_id, reply_to } => { - trace!("[rest] GetChain {}", chain_id); + trace!("GetChain {}", chain_id); let result = config .find_chain(&chain_id) .cloned() .ok_or(RestApiError::ChainConfigNotFound(chain_id)); - reply_to.send(result).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply_to + .send(result) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::State { reply_to } => { - trace!("[rest] State"); + trace!("State"); return Some(Command::DumpState(reply_to)); } }, Err(e) => { if !matches!(e, TryRecvError::Empty) { - error!("[rest] error while waiting for requests: {}", e); + error!("error while waiting for requests: {}", e); } } } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 1dd0b8d025..3189541162 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -560,9 +560,9 @@ fn handle_rest_cmd( match m { rest::Command::DumpState(reply) => { let state = state(registry, workers); - reply.send(Ok(state)).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply + .send(Ok(state)) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } } } From 7ae88800c3dc0d0fb7dd52d4873afcabc59d30b8 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Mon, 17 Jan 2022 14:50:19 +0100 Subject: [PATCH 28/29] Simplification & consolidation w/ Mikhail --- relayer/src/chain/cosmos.rs | 39 ++++++++++++++------------------ relayer/src/worker/channel.rs | 2 +- relayer/src/worker/client.rs | 2 +- relayer/src/worker/connection.rs | 2 +- relayer/src/worker/packet.rs | 4 ++-- 5 files changed, 22 insertions(+), 27 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index b1b50ce512..362eb65fd7 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -301,8 +301,7 @@ impl CosmosSdkChain { account_seq: u64, ) -> Result { debug!( - "[{}] send_tx: sending {} messages using account sequence {}", - self.id(), + "sending {} messages using account sequence {}", proto_msgs.len(), account_seq, ); @@ -311,8 +310,7 @@ impl CosmosSdkChain { let max_fee = self.max_fee(); debug!( - "[{}] send_tx: max fee, for use in tx simulation: {}", - self.id(), + "max fee, for use in tx simulation: {}", PrettyFee(&max_fee) ); @@ -343,8 +341,7 @@ impl CosmosSdkChain { let adjusted_fee = self.fee_with_gas(estimated_gas); debug!( - "[{}] send_tx: using {} gas, fee {}", - self.id(), + "using {} gas, fee {}", estimated_gas, PrettyFee(&adjusted_fee) ); @@ -402,7 +399,7 @@ impl CosmosSdkChain { // and refresh the s.n., to allow proceeding to the other transactions. A separate // retry at the worker-level will handle retrying. Err(e) if mismatching_account_sequence_number(&e) => { - warn!("send_tx failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); + warn!("failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); self.refresh_account()?; // Note: propagating error here can lead to bug & dropped packets: // https://github.com/informalsystems/ibc-rs/issues/1153 @@ -414,7 +411,7 @@ impl CosmosSdkChain { Ok(response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => { if retry_counter < retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY { let retry_counter = retry_counter + 1; - warn!("send_tx failed at broadcast step with incorrect account sequence. retrying ({}/{})", + warn!("failed at broadcast step with incorrect account sequence. retrying ({}/{})", retry_counter, retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY); // Backoff & re-fetch the account s.n. let backoff = (retry_counter as u64) @@ -429,7 +426,7 @@ impl CosmosSdkChain { // we ignore the error and return the original response to downstream. // We do not return an error here, because the current convention // let the caller handle error responses separately. - error!("failed to send_tx due to account sequence errors. the relayer wallet may be used elsewhere concurrently."); + error!("failed due to account sequence errors. the relayer wallet may be used elsewhere concurrently."); Ok(response) } } @@ -440,7 +437,7 @@ impl CosmosSdkChain { // Complete success. match response.code { tendermint::abci::Code::Ok => { - debug!("[{}] send_tx: broadcast_tx_sync: {:?}", self.id(), response); + debug!("broadcast_tx_sync: {:?}", response); self.incr_account_sequence(); Ok(response) @@ -450,8 +447,7 @@ impl CosmosSdkChain { // Avoid increasing the account s.n. if CheckTx failed // Log the error error!( - "[{}] send_tx: broadcast_tx_sync: {:?}: diagnostic: {:?}", - self.id(), + "broadcast_tx_sync: {:?}: diagnostic: {:?}", response, sdk_error_from_tx_sync_error_code(code) ); @@ -468,6 +464,8 @@ impl CosmosSdkChain { fn send_tx(&mut self, proto_msgs: Vec) -> Result { crate::time!("send_tx"); + let _span = span!(Level::ERROR, "send_tx", id = %self.id()).entered(); + self.send_tx_with_account_sequence_retry(proto_msgs, 0) } @@ -481,12 +479,13 @@ impl CosmosSdkChain { /// In this case we use the `default_gas` param. fn estimate_gas(&mut self, tx: Tx) -> Result { let simulated_gas = self.send_tx_simulate(tx).map(|sr| sr.gas_info); + let _span = span!(Level::ERROR, "estimate_gas").entered(); + match simulated_gas { Ok(Some(gas_info)) => { debug!( - "[{}] estimate_gas: tx simulation successful, gas amount used: {:?}", - self.id(), + "tx simulation successful, gas amount used: {:?}", gas_info.gas_used ); @@ -495,8 +494,7 @@ impl CosmosSdkChain { Ok(None) => { warn!( - "[{}] estimate_gas: tx simulation successful but no gas amount used was returned, falling back on default gas: {}", - self.id(), + "tx simulation successful but no gas amount used was returned, falling back on default gas: {}", self.default_gas() ); @@ -508,8 +506,7 @@ impl CosmosSdkChain { // See `can_recover_from_simulation_failure` for more info. Err(e) if can_recover_from_simulation_failure(&e) => { warn!( - "[{}] estimate_gas: failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", - self.id(), + "failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", e.detail() ); @@ -518,8 +515,7 @@ impl CosmosSdkChain { Err(e) => { error!( - "[{}] estimate_gas: failed to simulate tx. propagating error to caller: {}", - self.id(), + "failed to simulate tx. propagating error to caller: {}", e.detail() ); // Propagate the error, the retrying mechanism at caller may catch & retry. @@ -697,8 +693,7 @@ impl CosmosSdkChain { info!( sequence = %account.sequence, number = %account.account_number, - "[{}] refresh: retrieved account", - self.id() + "refresh: retrieved account", ); self.account = Some(account); diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 75abb14ade..c0b9571f38 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("ChannelWorker", channel = %channel.short_name()), + error_span!("channel", channel = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index 2fccbfdf53..2914541e75 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -27,7 +27,7 @@ pub fn spawn_refresh_client( Some(spawn_background_task( span!( tracing::Level::ERROR, - "RefreshClientWorker", + "refresh", client = %client.id, src_chain = %client.src_chain.id(), dst_chain = %client.dst_chain.id(), diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 832a6b166d..1ddf8912f7 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -20,7 +20,7 @@ pub fn spawn_connection_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("ConnectionWorker", connection = %connection.short_name()), + error_span!("connection", connection = %connection.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 5b4aa010e1..dff637422f 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -50,7 +50,7 @@ pub fn spawn_packet_worker( let span = { let relay_path = &link.lock().unwrap().a_to_b; error_span!( - "PacketWorker", + "packet", src_chain = %relay_path.src_chain().id(), src_port = %relay_path.src_port_id(), src_channel = %relay_path.src_channel_id(), @@ -92,7 +92,7 @@ pub fn spawn_packet_cmd_worker( let span = { let relay_path = &link.lock().unwrap().a_to_b; error_span!( - "PacketCmdWorker", + "packet_cmd", src_chain = %relay_path.src_chain().id(), src_port = %relay_path.src_port_id(), src_channel = %relay_path.src_channel_id(), From bfec7d6975bf614ba6030e7c823df397c2e6959c Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 17 Jan 2022 16:10:04 +0200 Subject: [PATCH 29/29] Changelog entry for #1491 --- .../unreleased/improvements/ibc-relayer/1491-structured-logs.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md diff --git a/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md b/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md new file mode 100644 index 0000000000..1f1ae711e9 --- /dev/null +++ b/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md @@ -0,0 +1,2 @@ +- More structural logging in relayer, using tracing spans and key-value pairs. + ([#1491](https://github.com/informalsystems/ibc-rs/pull/1491))