Skip to content

Commit

Permalink
Remove init_event_monitor from ChainEndpoint in favor of `subscri…
Browse files Browse the repository at this point in the history
…be` method
  • Loading branch information
romac committed Nov 16, 2022
1 parent 027578f commit cb0c16f
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 217 deletions.
22 changes: 10 additions & 12 deletions crates/relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ use std::thread;

use abscissa_core::clap::Parser;
use abscissa_core::{application::fatal_error, Runnable};
use eyre::eyre;
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info, instrument};

use ibc_relayer::{chain::handle::Subscription, config::ChainConfig, event::monitor::EventMonitor};
use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent};

use eyre::eyre;
use ibc_relayer::{
config::ChainConfig,
event::monitor::{EventMonitor, EventReceiver},
};

use crate::prelude::*;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -115,14 +111,14 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> eyre::Result<()>
thread::spawn(|| event_monitor.run());

while let Ok(event_batch) = rx.recv() {
match event_batch {
match event_batch.as_ref() {
Ok(batch) => {
let _span =
tracing::error_span!("event_batch", batch_height = %batch.height).entered();

let matching_events = batch
.events
.into_iter()
.iter()
.filter(|e| event_match(&e.event, filters))
.collect_vec();

Expand All @@ -148,19 +144,21 @@ fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool {
fn subscribe(
chain_config: &ChainConfig,
rt: Arc<TokioRuntime>,
) -> eyre::Result<(EventMonitor, EventReceiver)> {
let (mut event_monitor, rx, _) = EventMonitor::new(
) -> eyre::Result<(EventMonitor, Subscription)> {
let (mut event_monitor, tx_cmd) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
rt,
)
.map_err(|e| eyre!("could not initialize event monitor: {}", e))?;

event_monitor
.subscribe()
.init_subscriptions()
.map_err(|e| eyre!("could not initialize subscriptions: {}", e))?;

Ok((event_monitor, rx))
let subscription = tx_cmd.subscribe()?;

Ok((event_monitor, subscription))
}

#[cfg(test)]
Expand Down
105 changes: 59 additions & 46 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,12 @@ use core::{
use num_bigint::BigInt;
use std::{cmp::Ordering, thread};

use ibc_proto::protobuf::Protobuf;
use tendermint::block::Height as TmHeight;
use tendermint::node::info::TxIndexStatus;
use tendermint_light_client_verifier::types::LightBlock as TmLightBlock;
use tendermint_rpc::{
abci::Path as TendermintABCIPath, endpoint::broadcast::tx_sync::Response, endpoint::status,
Client, HttpClient, Order,
};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue};
use tracing::{error, instrument, trace, warn};

use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams;
use ibc_proto::protobuf::Protobuf;
use ibc_relayer_types::clients::ics07_tendermint::client_state::{
AllowUpdate, ClientState as TmClientState,
};
Expand Down Expand Up @@ -50,10 +43,19 @@ use ibc_relayer_types::core::ics24_host::{
use ibc_relayer_types::signer::Signer;
use ibc_relayer_types::Height as ICSHeight;

use tendermint::block::Height as TmHeight;
use tendermint::node::info::TxIndexStatus;
use tendermint_light_client_verifier::types::LightBlock as TmLightBlock;
use tendermint_rpc::abci::Path as TendermintABCIPath;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tendermint_rpc::endpoint::status;
use tendermint_rpc::{Client, HttpClient, Order};

use crate::account::Balance;
use crate::chain::client::ClientSettings;
use crate::chain::cosmos::batch::sequential_send_batched_messages_and_wait_commit;
use crate::chain::cosmos::batch::{
send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit,
sequential_send_batched_messages_and_wait_commit,
};
use crate::chain::cosmos::encode::key_entry_to_signer;
use crate::chain::cosmos::fee::maybe_register_counterparty_payee;
Expand All @@ -72,33 +74,23 @@ use crate::chain::cosmos::types::gas::{
default_gas_from_config, gas_multiplier_from_config, max_gas_from_config,
};
use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck};
use crate::chain::requests::{Qualified, QueryPacketEventDataRequest};
use crate::chain::handle::Subscription;
use crate::chain::requests::*;
use crate::chain::tracking::TrackedMsgs;
use crate::client_state::{AnyClientState, IdentifiedAnyClientState};
use crate::config::ChainConfig;
use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight};
use crate::denom::DenomTrace;
use crate::error::Error;
use crate::event::monitor::{EventReceiver, TxMonitorCmd};
use crate::event::monitor::{EventMonitor, TxMonitorCmd};
use crate::event::IbcEventWithHeight;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::{LightClient, Verified};
use crate::misbehaviour::MisbehaviourEvidence;
use crate::util::pretty::{PrettyConsensusStateWithHeight, PrettyIdentifiedChannel};
use crate::util::pretty::{PrettyIdentifiedClientState, PrettyIdentifiedConnection};
use crate::{account::Balance, event::monitor::EventMonitor};

use super::requests::{
IncludeProof, QueryChannelClientStateRequest, QueryChannelRequest, QueryChannelsRequest,
QueryClientConnectionsRequest, QueryClientStateRequest, QueryClientStatesRequest,
QueryConnectionChannelsRequest, QueryConnectionRequest, QueryConnectionsRequest,
QueryConsensusStateRequest, QueryConsensusStatesRequest, QueryHeight,
QueryHostConsensusStateRequest, QueryNextSequenceReceiveRequest,
QueryPacketAcknowledgementRequest, QueryPacketAcknowledgementsRequest,
QueryPacketCommitmentRequest, QueryPacketCommitmentsRequest, QueryPacketReceiptRequest,
QueryTxRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
QueryUpgradedClientStateRequest, QueryUpgradedConsensusStateRequest,
use crate::util::pretty::{
PrettyConsensusStateWithHeight, PrettyIdentifiedChannel, PrettyIdentifiedClientState,
PrettyIdentifiedConnection,
};

pub mod batch;
Expand Down Expand Up @@ -139,8 +131,11 @@ pub struct CosmosSdkChain {
light_client: TmLightClient,
rt: Arc<TokioRuntime>,
keybase: KeyRing,

/// A cached copy of the account information
account: Option<Account>,

tx_monitor_cmd: Option<TxMonitorCmd>,
}

impl CosmosSdkChain {
Expand Down Expand Up @@ -277,6 +272,25 @@ impl CosmosSdkChain {
Ok(())
}

fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, monitor_tx) = EventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.rt.clone(),
)
.map_err(Error::event_monitor)?;

event_monitor
.init_subscriptions()
.map_err(Error::event_monitor)?;

thread::spawn(move || event_monitor.run());

Ok(monitor_tx)
}

/// Query the chain staking parameters
pub fn query_staking_params(&self) -> Result<StakingParams, Error> {
crate::time!("query_staking_params");
Expand Down Expand Up @@ -644,34 +658,19 @@ impl ChainEndpoint for CosmosSdkChain {
light_client,
rt,
keybase,
account: None,
tx_config,
account: None,
tx_monitor_cmd: None,
};

Ok(chain)
}

fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<(EventReceiver, TxMonitorCmd), Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, event_receiver, monitor_tx) = EventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
rt,
)
.map_err(Error::event_monitor)?;

event_monitor.subscribe().map_err(Error::event_monitor)?;

thread::spawn(move || event_monitor.run());

Ok((event_receiver, monitor_tx))
}

fn shutdown(self) -> Result<(), Error> {
if let Some(monitor_tx) = self.tx_monitor_cmd {
monitor_tx.shutdown().map_err(Error::event_monitor)?;
}

Ok(())
}

Expand All @@ -683,6 +682,20 @@ impl ChainEndpoint for CosmosSdkChain {
&mut self.keybase
}

fn subscribe(&mut self) -> Result<Subscription, Error> {
let tx_monitor_cmd = match &self.tx_monitor_cmd {
Some(tx_monitor_cmd) => tx_monitor_cmd,
None => {
let tx_monitor_cmd = self.init_event_monitor()?;
self.tx_monitor_cmd = Some(tx_monitor_cmd);
self.tx_monitor_cmd.as_ref().unwrap()
}
};

let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?;
Ok(subscription)
}

/// Does multiple RPC calls to the full node, to check for
/// reachability and some basic APIs are available.
///
Expand Down
16 changes: 4 additions & 12 deletions crates/relayer/src/chain/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tendermint_rpc::endpoint::broadcast::tx_sync::Response as TxResponse;

use crate::account::Balance;
use crate::chain::client::ClientSettings;
use crate::chain::handle::Subscription;
use crate::chain::requests::*;
use crate::chain::tracking::TrackedMsgs;
use crate::client_state::{AnyClientState, IdentifiedAnyClientState};
Expand All @@ -37,17 +38,11 @@ use crate::connection::ConnectionMsgType;
use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight};
use crate::denom::DenomTrace;
use crate::error::{Error, QUERY_PROOF_EXPECT_MSG};
use crate::event::monitor::{EventReceiver, TxMonitorCmd};
use crate::event::IbcEventWithHeight;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::AnyHeader;
use crate::misbehaviour::MisbehaviourEvidence;

use super::requests::{
IncludeProof, QueryHeight, QueryPacketAcknowledgementRequest, QueryPacketCommitmentRequest,
QueryPacketReceiptRequest, QueryTxRequest,
};

/// The result of a health check.
#[derive(Debug)]
pub enum HealthCheck {
Expand Down Expand Up @@ -89,18 +84,15 @@ pub trait ChainEndpoint: Sized {
/// Constructs the chain
fn bootstrap(config: ChainConfig, rt: Arc<TokioRuntime>) -> Result<Self, Error>;

/// Initializes and returns the event monitor (if any) associated with this chain.
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<(EventReceiver, TxMonitorCmd), Error>;

/// Shutdown the chain runtime
fn shutdown(self) -> Result<(), Error>;

/// Perform a health check
fn health_check(&self) -> Result<HealthCheck, Error>;

// Events
fn subscribe(&mut self) -> Result<Subscription, Error>;

// Keyring

/// Returns the chain's keybase
Expand Down
Loading

0 comments on commit cb0c16f

Please sign in to comment.