Skip to content

Commit

Permalink
Add missing imports
Browse files Browse the repository at this point in the history
  • Loading branch information
seanchen1991 committed Dec 2, 2022
1 parent e25642b commit 5cab4e8
Showing 1 changed file with 57 additions and 81 deletions.
138 changes: 57 additions & 81 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ use core::{
use num_bigint::BigInt;
use std::{cmp::Ordering, thread};

use ibc_proto::protobuf::Protobuf;
use tendermint::block::Height as TmHeight;
use tendermint::node::info::TxIndexStatus;
use tendermint_light_client_verifier::types::LightBlock as TmLightBlock;
use tendermint_rpc::{
endpoint::broadcast::tx_sync::Response, endpoint::status, Client, HttpClient, Order,
};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue};
use tracing::{error, instrument, trace, warn};

use ibc_proto::cosmos::base::node::v1beta1::ConfigResponse;
use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams;
use ibc_proto::protobuf::Protobuf;
use ibc_relayer_types::clients::ics07_tendermint::client_state::{
AllowUpdate, ClientState as TmClientState,
};
Expand Down Expand Up @@ -44,18 +50,10 @@ use ibc_relayer_types::core::ics24_host::{
use ibc_relayer_types::signer::Signer;
use ibc_relayer_types::Height as ICSHeight;

use tendermint::block::Height as TmHeight;
use tendermint::node::info::TxIndexStatus;
use tendermint_light_client_verifier::types::LightBlock as TmLightBlock;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tendermint_rpc::endpoint::status;
use tendermint_rpc::{Client, HttpClient, Order};

use crate::account::Balance;
use crate::chain::client::ClientSettings;
use crate::chain::cosmos::batch::sequential_send_batched_messages_and_wait_commit;
use crate::chain::cosmos::batch::{
send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit,
sequential_send_batched_messages_and_wait_commit,
};
use crate::chain::cosmos::encode::key_entry_to_signer;
use crate::chain::cosmos::fee::maybe_register_counterparty_payee;
Expand All @@ -74,23 +72,33 @@ use crate::chain::cosmos::types::gas::{
default_gas_from_config, gas_multiplier_from_config, max_gas_from_config,
};
use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck};
use crate::chain::handle::Subscription;
use crate::chain::requests::*;
use crate::chain::requests::{Qualified, QueryPacketEventDataRequest};
use crate::chain::tracking::TrackedMsgs;
use crate::client_state::{AnyClientState, IdentifiedAnyClientState};
use crate::config::ChainConfig;
use crate::config::{parse_gas_prices, ChainConfig, GasPrice};
use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight};
use crate::denom::DenomTrace;
use crate::error::Error;
use crate::event::monitor::{EventMonitor, TxMonitorCmd};
use crate::event::monitor::{EventReceiver, TxMonitorCmd};
use crate::event::IbcEventWithHeight;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::{LightClient, Verified};
use crate::misbehaviour::MisbehaviourEvidence;
use crate::util::pretty::{
PrettyConsensusStateWithHeight, PrettyIdentifiedChannel, PrettyIdentifiedClientState,
PrettyIdentifiedConnection,
use crate::util::pretty::{PrettyConsensusStateWithHeight, PrettyIdentifiedChannel};
use crate::util::pretty::{PrettyIdentifiedClientState, PrettyIdentifiedConnection};
use crate::{account::Balance, event::monitor::EventMonitor};

use super::requests::{
IncludeProof, QueryChannelClientStateRequest, QueryChannelRequest, QueryChannelsRequest,
QueryClientConnectionsRequest, QueryClientStateRequest, QueryClientStatesRequest,
QueryConnectionChannelsRequest, QueryConnectionRequest, QueryConnectionsRequest,
QueryConsensusStateRequest, QueryConsensusStatesRequest, QueryHeight,
QueryHostConsensusStateRequest, QueryNextSequenceReceiveRequest,
QueryPacketAcknowledgementRequest, QueryPacketAcknowledgementsRequest,
QueryPacketCommitmentRequest, QueryPacketCommitmentsRequest, QueryPacketReceiptRequest,
QueryTxRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
QueryUpgradedClientStateRequest, QueryUpgradedConsensusStateRequest,
};

pub mod batch;
Expand Down Expand Up @@ -131,11 +139,8 @@ pub struct CosmosSdkChain {
light_client: TmLightClient,
rt: Arc<TokioRuntime>,
keybase: KeyRing,

/// A cached copy of the account information
account: Option<Account>,

tx_monitor_cmd: Option<TxMonitorCmd>,
}

impl CosmosSdkChain {
Expand Down Expand Up @@ -272,25 +277,6 @@ impl CosmosSdkChain {
Ok(())
}

fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, monitor_tx) = EventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.rt.clone(),
)
.map_err(Error::event_monitor)?;

event_monitor
.init_subscriptions()
.map_err(Error::event_monitor)?;

thread::spawn(move || event_monitor.run());

Ok(monitor_tx)
}

/// Query the chain staking parameters
pub fn query_staking_params(&self) -> Result<StakingParams, Error> {
crate::time!("query_staking_params");
Expand Down Expand Up @@ -636,7 +622,6 @@ impl CosmosSdkChain {
.map(|ev| IbcEventWithHeight::new(ev, response_height))
.collect(),
);

Ok((begin_block_events, end_block_events))
}

Expand All @@ -650,29 +635,22 @@ impl CosmosSdkChain {
let mut begin_block_events = vec![];
let mut end_block_events = vec![];

for seq in request.sequences.iter().copied() {
for seq in request.sequences.iter() {
let response = self
.block_on(self.rpc_client.block_search(
packet_query(request, seq),
// We only need the first page
packet_query(request, *seq),
1,
// There should only be a single match for this query, but due to
// the fact that the indexer treat the query as a disjunction over
// all events in a block rather than a conjunction over a single event,
// we may end up with partial matches and therefore have to account for
// that by fetching multiple results and filter it down after the fact.
// In the worst case we get N blocks where N is the number of channels,
// but 10 seems to work well enough in practice while keeping the response
// size, and therefore pressure on the node, fairly low.
10,
// We could pick either ordering here, since matching blocks may be at pretty
// much any height relative to the target blocks, so we went with most recent
// blocks first.
Order::Descending,
1, // there should only be a single match for this query
Order::Ascending,
))
.map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?;

for block in response.blocks.into_iter().map(|response| response.block) {
assert!(
response.blocks.len() <= 1,
"block_search: unexpected number of blocks"
);

if let Some(block) = response.blocks.first().map(|first| &first.block) {
let response_height =
ICSHeight::new(self.id().version(), u64::from(block.header.height))
.map_err(|_| Error::invalid_height_no_source())?;
Expand All @@ -683,16 +661,13 @@ impl CosmosSdkChain {
}
}

// `query_packet_from_block` retrieves the begin and end block events
// and filter them to retain only those matching the query
let (new_begin_block_events, new_end_block_events) =
self.query_packet_from_block(request, &[seq], &response_height)?;
self.query_packet_from_block(request, &[*seq], &response_height)?;

begin_block_events.extend(new_begin_block_events);
end_block_events.extend(new_end_block_events);
}
}

Ok((begin_block_events, end_block_events))
}
}
Expand Down Expand Up @@ -727,19 +702,34 @@ impl ChainEndpoint for CosmosSdkChain {
light_client,
rt,
keybase,
tx_config,
account: None,
tx_monitor_cmd: None,
tx_config,
};

Ok(chain)
}

fn shutdown(self) -> Result<(), Error> {
if let Some(monitor_tx) = self.tx_monitor_cmd {
monitor_tx.shutdown().map_err(Error::event_monitor)?;
}
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<(EventReceiver, TxMonitorCmd), Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, event_receiver, monitor_tx) = EventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
rt,
)
.map_err(Error::event_monitor)?;

event_monitor.subscribe().map_err(Error::event_monitor)?;

thread::spawn(move || event_monitor.run());

Ok((event_receiver, monitor_tx))
}

fn shutdown(self) -> Result<(), Error> {
Ok(())
}

Expand All @@ -751,20 +741,6 @@ impl ChainEndpoint for CosmosSdkChain {
&mut self.keybase
}

fn subscribe(&mut self) -> Result<Subscription, Error> {
let tx_monitor_cmd = match &self.tx_monitor_cmd {
Some(tx_monitor_cmd) => tx_monitor_cmd,
None => {
let tx_monitor_cmd = self.init_event_monitor()?;
self.tx_monitor_cmd = Some(tx_monitor_cmd);
self.tx_monitor_cmd.as_ref().unwrap()
}
};

let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?;
Ok(subscription)
}

/// Does multiple RPC calls to the full node, to check for
/// reachability and some basic APIs are available.
///
Expand Down

0 comments on commit 5cab4e8

Please sign in to comment.