diff --git a/crates/relayer/src/chain/cosmos.rs b/crates/relayer/src/chain/cosmos.rs index 4c0e0465ab..473e8317bb 100644 --- a/crates/relayer/src/chain/cosmos.rs +++ b/crates/relayer/src/chain/cosmos.rs @@ -9,13 +9,19 @@ use core::{ use num_bigint::BigInt; use std::{cmp::Ordering, thread}; +use ibc_proto::protobuf::Protobuf; +use tendermint::block::Height as TmHeight; +use tendermint::node::info::TxIndexStatus; +use tendermint_light_client_verifier::types::LightBlock as TmLightBlock; +use tendermint_rpc::{ + endpoint::broadcast::tx_sync::Response, endpoint::status, Client, HttpClient, Order, +}; use tokio::runtime::Runtime as TokioRuntime; use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue}; use tracing::{error, instrument, trace, warn}; use ibc_proto::cosmos::base::node::v1beta1::ConfigResponse; use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams; -use ibc_proto::protobuf::Protobuf; use ibc_relayer_types::clients::ics07_tendermint::client_state::{ AllowUpdate, ClientState as TmClientState, }; @@ -44,18 +50,10 @@ use ibc_relayer_types::core::ics24_host::{ use ibc_relayer_types::signer::Signer; use ibc_relayer_types::Height as ICSHeight; -use tendermint::block::Height as TmHeight; -use tendermint::node::info::TxIndexStatus; -use tendermint_light_client_verifier::types::LightBlock as TmLightBlock; -use tendermint_rpc::endpoint::broadcast::tx_sync::Response; -use tendermint_rpc::endpoint::status; -use tendermint_rpc::{Client, HttpClient, Order}; - -use crate::account::Balance; use crate::chain::client::ClientSettings; +use crate::chain::cosmos::batch::sequential_send_batched_messages_and_wait_commit; use crate::chain::cosmos::batch::{ send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, - sequential_send_batched_messages_and_wait_commit, }; use crate::chain::cosmos::encode::key_entry_to_signer; use crate::chain::cosmos::fee::maybe_register_counterparty_payee; @@ -74,23 +72,33 @@ use crate::chain::cosmos::types::gas::{ default_gas_from_config, gas_multiplier_from_config, max_gas_from_config, }; use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck}; -use crate::chain::handle::Subscription; -use crate::chain::requests::*; +use crate::chain::requests::{Qualified, QueryPacketEventDataRequest}; use crate::chain::tracking::TrackedMsgs; use crate::client_state::{AnyClientState, IdentifiedAnyClientState}; -use crate::config::ChainConfig; +use crate::config::{parse_gas_prices, ChainConfig, GasPrice}; use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight}; use crate::denom::DenomTrace; use crate::error::Error; -use crate::event::monitor::{EventMonitor, TxMonitorCmd}; +use crate::event::monitor::{EventReceiver, TxMonitorCmd}; use crate::event::IbcEventWithHeight; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::tendermint::LightClient as TmLightClient; use crate::light_client::{LightClient, Verified}; use crate::misbehaviour::MisbehaviourEvidence; -use crate::util::pretty::{ - PrettyConsensusStateWithHeight, PrettyIdentifiedChannel, PrettyIdentifiedClientState, - PrettyIdentifiedConnection, +use crate::util::pretty::{PrettyConsensusStateWithHeight, PrettyIdentifiedChannel}; +use crate::util::pretty::{PrettyIdentifiedClientState, PrettyIdentifiedConnection}; +use crate::{account::Balance, event::monitor::EventMonitor}; + +use super::requests::{ + IncludeProof, QueryChannelClientStateRequest, QueryChannelRequest, QueryChannelsRequest, + QueryClientConnectionsRequest, QueryClientStateRequest, QueryClientStatesRequest, + QueryConnectionChannelsRequest, QueryConnectionRequest, QueryConnectionsRequest, + QueryConsensusStateRequest, QueryConsensusStatesRequest, QueryHeight, + QueryHostConsensusStateRequest, QueryNextSequenceReceiveRequest, + QueryPacketAcknowledgementRequest, QueryPacketAcknowledgementsRequest, + QueryPacketCommitmentRequest, QueryPacketCommitmentsRequest, QueryPacketReceiptRequest, + QueryTxRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, + QueryUpgradedClientStateRequest, QueryUpgradedConsensusStateRequest, }; pub mod batch; @@ -131,11 +139,8 @@ pub struct CosmosSdkChain { light_client: TmLightClient, rt: Arc, keybase: KeyRing, - /// A cached copy of the account information account: Option, - - tx_monitor_cmd: Option, } impl CosmosSdkChain { @@ -272,25 +277,6 @@ impl CosmosSdkChain { Ok(()) } - fn init_event_monitor(&mut self) -> Result { - crate::time!("init_event_monitor"); - - let (mut event_monitor, monitor_tx) = EventMonitor::new( - self.config.id.clone(), - self.config.websocket_addr.clone(), - self.rt.clone(), - ) - .map_err(Error::event_monitor)?; - - event_monitor - .init_subscriptions() - .map_err(Error::event_monitor)?; - - thread::spawn(move || event_monitor.run()); - - Ok(monitor_tx) - } - /// Query the chain staking parameters pub fn query_staking_params(&self) -> Result { crate::time!("query_staking_params"); @@ -636,7 +622,6 @@ impl CosmosSdkChain { .map(|ev| IbcEventWithHeight::new(ev, response_height)) .collect(), ); - Ok((begin_block_events, end_block_events)) } @@ -650,29 +635,22 @@ impl CosmosSdkChain { let mut begin_block_events = vec![]; let mut end_block_events = vec![]; - for seq in request.sequences.iter().copied() { + for seq in request.sequences.iter() { let response = self .block_on(self.rpc_client.block_search( - packet_query(request, seq), - // We only need the first page + packet_query(request, *seq), 1, - // There should only be a single match for this query, but due to - // the fact that the indexer treat the query as a disjunction over - // all events in a block rather than a conjunction over a single event, - // we may end up with partial matches and therefore have to account for - // that by fetching multiple results and filter it down after the fact. - // In the worst case we get N blocks where N is the number of channels, - // but 10 seems to work well enough in practice while keeping the response - // size, and therefore pressure on the node, fairly low. - 10, - // We could pick either ordering here, since matching blocks may be at pretty - // much any height relative to the target blocks, so we went with most recent - // blocks first. - Order::Descending, + 1, // there should only be a single match for this query + Order::Ascending, )) .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; - for block in response.blocks.into_iter().map(|response| response.block) { + assert!( + response.blocks.len() <= 1, + "block_search: unexpected number of blocks" + ); + + if let Some(block) = response.blocks.first().map(|first| &first.block) { let response_height = ICSHeight::new(self.id().version(), u64::from(block.header.height)) .map_err(|_| Error::invalid_height_no_source())?; @@ -683,16 +661,13 @@ impl CosmosSdkChain { } } - // `query_packet_from_block` retrieves the begin and end block events - // and filter them to retain only those matching the query let (new_begin_block_events, new_end_block_events) = - self.query_packet_from_block(request, &[seq], &response_height)?; + self.query_packet_from_block(request, &[*seq], &response_height)?; begin_block_events.extend(new_begin_block_events); end_block_events.extend(new_end_block_events); } } - Ok((begin_block_events, end_block_events)) } } @@ -727,19 +702,34 @@ impl ChainEndpoint for CosmosSdkChain { light_client, rt, keybase, - tx_config, account: None, - tx_monitor_cmd: None, + tx_config, }; Ok(chain) } - fn shutdown(self) -> Result<(), Error> { - if let Some(monitor_tx) = self.tx_monitor_cmd { - monitor_tx.shutdown().map_err(Error::event_monitor)?; - } + fn init_event_monitor( + &self, + rt: Arc, + ) -> Result<(EventReceiver, TxMonitorCmd), Error> { + crate::time!("init_event_monitor"); + let (mut event_monitor, event_receiver, monitor_tx) = EventMonitor::new( + self.config.id.clone(), + self.config.websocket_addr.clone(), + rt, + ) + .map_err(Error::event_monitor)?; + + event_monitor.subscribe().map_err(Error::event_monitor)?; + + thread::spawn(move || event_monitor.run()); + + Ok((event_receiver, monitor_tx)) + } + + fn shutdown(self) -> Result<(), Error> { Ok(()) } @@ -751,20 +741,6 @@ impl ChainEndpoint for CosmosSdkChain { &mut self.keybase } - fn subscribe(&mut self) -> Result { - let tx_monitor_cmd = match &self.tx_monitor_cmd { - Some(tx_monitor_cmd) => tx_monitor_cmd, - None => { - let tx_monitor_cmd = self.init_event_monitor()?; - self.tx_monitor_cmd = Some(tx_monitor_cmd); - self.tx_monitor_cmd.as_ref().unwrap() - } - }; - - let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?; - Ok(subscription) - } - /// Does multiple RPC calls to the full node, to check for /// reachability and some basic APIs are available. ///