diff --git a/crates/relayer-cli/src/commands/listen.rs b/crates/relayer-cli/src/commands/listen.rs index c46e3bd8fe..4c5b8f9ac4 100644 --- a/crates/relayer-cli/src/commands/listen.rs +++ b/crates/relayer-cli/src/commands/listen.rs @@ -8,18 +8,14 @@ use std::thread; use abscissa_core::clap::Parser; use abscissa_core::{application::fatal_error, Runnable}; +use eyre::eyre; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; use tracing::{error, info, instrument}; +use ibc_relayer::{chain::handle::Subscription, config::ChainConfig, event::monitor::EventMonitor}; use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent}; -use eyre::eyre; -use ibc_relayer::{ - config::ChainConfig, - event::monitor::{EventMonitor, EventReceiver}, -}; - use crate::prelude::*; #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -115,14 +111,14 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> eyre::Result<()> thread::spawn(|| event_monitor.run()); while let Ok(event_batch) = rx.recv() { - match event_batch { + match event_batch.as_ref() { Ok(batch) => { let _span = tracing::error_span!("event_batch", batch_height = %batch.height).entered(); let matching_events = batch .events - .into_iter() + .iter() .filter(|e| event_match(&e.event, filters)) .collect_vec(); @@ -148,8 +144,8 @@ fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool { fn subscribe( chain_config: &ChainConfig, rt: Arc, -) -> eyre::Result<(EventMonitor, EventReceiver)> { - let (mut event_monitor, rx, _) = EventMonitor::new( +) -> eyre::Result<(EventMonitor, Subscription)> { + let (mut event_monitor, tx_cmd) = EventMonitor::new( chain_config.id.clone(), chain_config.websocket_addr.clone(), rt, @@ -157,10 +153,12 @@ fn subscribe( .map_err(|e| eyre!("could not initialize event monitor: {}", e))?; event_monitor - .subscribe() + .init_subscriptions() .map_err(|e| eyre!("could not initialize subscriptions: {}", e))?; - Ok((event_monitor, rx)) + let subscription = tx_cmd.subscribe()?; + + Ok((event_monitor, subscription)) } #[cfg(test)] diff --git a/crates/relayer/src/chain/cosmos.rs b/crates/relayer/src/chain/cosmos.rs index 8756873f59..4430bc2870 100644 --- a/crates/relayer/src/chain/cosmos.rs +++ b/crates/relayer/src/chain/cosmos.rs @@ -9,19 +9,12 @@ use core::{ use num_bigint::BigInt; use std::{cmp::Ordering, thread}; -use ibc_proto::protobuf::Protobuf; -use tendermint::block::Height as TmHeight; -use tendermint::node::info::TxIndexStatus; -use tendermint_light_client_verifier::types::LightBlock as TmLightBlock; -use tendermint_rpc::{ - abci::Path as TendermintABCIPath, endpoint::broadcast::tx_sync::Response, endpoint::status, - Client, HttpClient, Order, -}; use tokio::runtime::Runtime as TokioRuntime; use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue}; use tracing::{error, instrument, trace, warn}; use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams; +use ibc_proto::protobuf::Protobuf; use ibc_relayer_types::clients::ics07_tendermint::client_state::{ AllowUpdate, ClientState as TmClientState, }; @@ -50,10 +43,19 @@ use ibc_relayer_types::core::ics24_host::{ use ibc_relayer_types::signer::Signer; use ibc_relayer_types::Height as ICSHeight; +use tendermint::block::Height as TmHeight; +use tendermint::node::info::TxIndexStatus; +use tendermint_light_client_verifier::types::LightBlock as TmLightBlock; +use tendermint_rpc::abci::Path as TendermintABCIPath; +use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tendermint_rpc::endpoint::status; +use tendermint_rpc::{Client, HttpClient, Order}; + +use crate::account::Balance; use crate::chain::client::ClientSettings; -use crate::chain::cosmos::batch::sequential_send_batched_messages_and_wait_commit; use crate::chain::cosmos::batch::{ send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, + sequential_send_batched_messages_and_wait_commit, }; use crate::chain::cosmos::encode::key_entry_to_signer; use crate::chain::cosmos::fee::maybe_register_counterparty_payee; @@ -72,33 +74,23 @@ use crate::chain::cosmos::types::gas::{ default_gas_from_config, gas_multiplier_from_config, max_gas_from_config, }; use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck}; -use crate::chain::requests::{Qualified, QueryPacketEventDataRequest}; +use crate::chain::handle::Subscription; +use crate::chain::requests::*; use crate::chain::tracking::TrackedMsgs; use crate::client_state::{AnyClientState, IdentifiedAnyClientState}; use crate::config::ChainConfig; use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight}; use crate::denom::DenomTrace; use crate::error::Error; -use crate::event::monitor::{EventReceiver, TxMonitorCmd}; +use crate::event::monitor::{EventMonitor, TxMonitorCmd}; use crate::event::IbcEventWithHeight; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::tendermint::LightClient as TmLightClient; use crate::light_client::{LightClient, Verified}; use crate::misbehaviour::MisbehaviourEvidence; -use crate::util::pretty::{PrettyConsensusStateWithHeight, PrettyIdentifiedChannel}; -use crate::util::pretty::{PrettyIdentifiedClientState, PrettyIdentifiedConnection}; -use crate::{account::Balance, event::monitor::EventMonitor}; - -use super::requests::{ - IncludeProof, QueryChannelClientStateRequest, QueryChannelRequest, QueryChannelsRequest, - QueryClientConnectionsRequest, QueryClientStateRequest, QueryClientStatesRequest, - QueryConnectionChannelsRequest, QueryConnectionRequest, QueryConnectionsRequest, - QueryConsensusStateRequest, QueryConsensusStatesRequest, QueryHeight, - QueryHostConsensusStateRequest, QueryNextSequenceReceiveRequest, - QueryPacketAcknowledgementRequest, QueryPacketAcknowledgementsRequest, - QueryPacketCommitmentRequest, QueryPacketCommitmentsRequest, QueryPacketReceiptRequest, - QueryTxRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, - QueryUpgradedClientStateRequest, QueryUpgradedConsensusStateRequest, +use crate::util::pretty::{ + PrettyConsensusStateWithHeight, PrettyIdentifiedChannel, PrettyIdentifiedClientState, + PrettyIdentifiedConnection, }; pub mod batch; @@ -139,8 +131,11 @@ pub struct CosmosSdkChain { light_client: TmLightClient, rt: Arc, keybase: KeyRing, + /// A cached copy of the account information account: Option, + + tx_monitor_cmd: Option, } impl CosmosSdkChain { @@ -277,6 +272,25 @@ impl CosmosSdkChain { Ok(()) } + fn init_event_monitor(&mut self) -> Result { + crate::time!("init_event_monitor"); + + let (mut event_monitor, monitor_tx) = EventMonitor::new( + self.config.id.clone(), + self.config.websocket_addr.clone(), + self.rt.clone(), + ) + .map_err(Error::event_monitor)?; + + event_monitor + .init_subscriptions() + .map_err(Error::event_monitor)?; + + thread::spawn(move || event_monitor.run()); + + Ok(monitor_tx) + } + /// Query the chain staking parameters pub fn query_staking_params(&self) -> Result { crate::time!("query_staking_params"); @@ -644,34 +658,19 @@ impl ChainEndpoint for CosmosSdkChain { light_client, rt, keybase, - account: None, tx_config, + account: None, + tx_monitor_cmd: None, }; Ok(chain) } - fn init_event_monitor( - &self, - rt: Arc, - ) -> Result<(EventReceiver, TxMonitorCmd), Error> { - crate::time!("init_event_monitor"); - - let (mut event_monitor, event_receiver, monitor_tx) = EventMonitor::new( - self.config.id.clone(), - self.config.websocket_addr.clone(), - rt, - ) - .map_err(Error::event_monitor)?; - - event_monitor.subscribe().map_err(Error::event_monitor)?; - - thread::spawn(move || event_monitor.run()); - - Ok((event_receiver, monitor_tx)) - } - fn shutdown(self) -> Result<(), Error> { + if let Some(monitor_tx) = self.tx_monitor_cmd { + monitor_tx.shutdown().map_err(Error::event_monitor)?; + } + Ok(()) } @@ -683,6 +682,20 @@ impl ChainEndpoint for CosmosSdkChain { &mut self.keybase } + fn subscribe(&mut self) -> Result { + let tx_monitor_cmd = match &self.tx_monitor_cmd { + Some(tx_monitor_cmd) => tx_monitor_cmd, + None => { + let tx_monitor_cmd = self.init_event_monitor()?; + self.tx_monitor_cmd = Some(tx_monitor_cmd); + self.tx_monitor_cmd.as_ref().unwrap() + } + }; + + let subscription = tx_monitor_cmd.subscribe().map_err(Error::event_monitor)?; + Ok(subscription) + } + /// Does multiple RPC calls to the full node, to check for /// reachability and some basic APIs are available. /// diff --git a/crates/relayer/src/chain/endpoint.rs b/crates/relayer/src/chain/endpoint.rs index 001588c761..9a473e7a03 100644 --- a/crates/relayer/src/chain/endpoint.rs +++ b/crates/relayer/src/chain/endpoint.rs @@ -29,6 +29,7 @@ use tendermint_rpc::endpoint::broadcast::tx_sync::Response as TxResponse; use crate::account::Balance; use crate::chain::client::ClientSettings; +use crate::chain::handle::Subscription; use crate::chain::requests::*; use crate::chain::tracking::TrackedMsgs; use crate::client_state::{AnyClientState, IdentifiedAnyClientState}; @@ -37,17 +38,11 @@ use crate::connection::ConnectionMsgType; use crate::consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight}; use crate::denom::DenomTrace; use crate::error::{Error, QUERY_PROOF_EXPECT_MSG}; -use crate::event::monitor::{EventReceiver, TxMonitorCmd}; use crate::event::IbcEventWithHeight; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::AnyHeader; use crate::misbehaviour::MisbehaviourEvidence; -use super::requests::{ - IncludeProof, QueryHeight, QueryPacketAcknowledgementRequest, QueryPacketCommitmentRequest, - QueryPacketReceiptRequest, QueryTxRequest, -}; - /// The result of a health check. #[derive(Debug)] pub enum HealthCheck { @@ -89,18 +84,15 @@ pub trait ChainEndpoint: Sized { /// Constructs the chain fn bootstrap(config: ChainConfig, rt: Arc) -> Result; - /// Initializes and returns the event monitor (if any) associated with this chain. - fn init_event_monitor( - &self, - rt: Arc, - ) -> Result<(EventReceiver, TxMonitorCmd), Error>; - /// Shutdown the chain runtime fn shutdown(self) -> Result<(), Error>; /// Perform a health check fn health_check(&self) -> Result; + // Events + fn subscribe(&mut self) -> Result; + // Keyring /// Returns the chain's keybase diff --git a/crates/relayer/src/chain/runtime.rs b/crates/relayer/src/chain/runtime.rs index f24d3f0445..434ee21f80 100644 --- a/crates/relayer/src/chain/runtime.rs +++ b/crates/relayer/src/chain/runtime.rs @@ -24,20 +24,16 @@ use ibc_relayer_types::{ Height, }; -use crate::chain::requests::QueryPacketEventDataRequest; use crate::{ account::Balance, + chain::requests::QueryPacketEventDataRequest, client_state::{AnyClientState, IdentifiedAnyClientState}, config::ChainConfig, connection::ConnectionMsgType, consensus_state::{AnyConsensusState, AnyConsensusStateWithHeight}, denom::DenomTrace, error::Error, - event::{ - bus::EventBus, - monitor::{EventBatch, EventReceiver, MonitorCmd, Result as MonitorResult, TxMonitorCmd}, - IbcEventWithHeight, - }, + event::IbcEventWithHeight, keyring::KeyEntry, light_client::AnyHeader, misbehaviour::MisbehaviourEvidence, @@ -47,17 +43,7 @@ use super::{ client::ClientSettings, endpoint::{ChainEndpoint, ChainStatus, HealthCheck}, handle::{ChainHandle, ChainRequest, ReplyTo, Subscription}, - requests::{ - IncludeProof, QueryChannelClientStateRequest, QueryChannelRequest, QueryChannelsRequest, - QueryClientConnectionsRequest, QueryClientStateRequest, QueryClientStatesRequest, - QueryConnectionChannelsRequest, QueryConnectionRequest, QueryConnectionsRequest, - QueryConsensusStateRequest, QueryConsensusStatesRequest, QueryHostConsensusStateRequest, - QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementRequest, - QueryPacketAcknowledgementsRequest, QueryPacketCommitmentRequest, - QueryPacketCommitmentsRequest, QueryPacketReceiptRequest, QueryTxRequest, - QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, QueryUpgradedClientStateRequest, - QueryUpgradedConsensusStateRequest, - }, + requests::*, tracking::TrackedMsgs, }; @@ -66,64 +52,6 @@ pub struct Threads { pub event_monitor: Option>, } -#[derive(Debug)] -pub enum EventMonitorCtrl { - None { - /// Empty channel for when the None case - never: EventReceiver, - }, - Live { - /// Receiver channel from the event bus - event_receiver: EventReceiver, - - /// Sender channel to terminate the event monitor - tx_monitor_cmd: TxMonitorCmd, - }, -} - -impl EventMonitorCtrl { - pub fn none() -> Self { - Self::None { - never: channel::never(), - } - } - - pub fn live(event_receiver: EventReceiver, tx_monitor_cmd: TxMonitorCmd) -> Self { - Self::Live { - event_receiver, - tx_monitor_cmd, - } - } - - pub fn enable(&mut self, event_receiver: EventReceiver, tx_monitor_cmd: TxMonitorCmd) { - *self = Self::live(event_receiver, tx_monitor_cmd); - } - - pub fn recv(&self) -> &EventReceiver { - match self { - Self::None { ref never } => never, - Self::Live { - ref event_receiver, .. - } => event_receiver, - } - } - - pub fn shutdown(&self) -> Result<(), Error> { - match self { - Self::None { .. } => Ok(()), - Self::Live { - ref tx_monitor_cmd, .. - } => tx_monitor_cmd - .send(MonitorCmd::Shutdown) - .map_err(Error::send), - } - } - - pub fn is_live(&self) -> bool { - matches!(self, Self::Live { .. }) - } -} - pub struct ChainRuntime { /// The specific chain this runtime runs against chain: Endpoint, @@ -136,12 +64,6 @@ pub struct ChainRuntime { /// in through this channel. request_receiver: channel::Receiver<(Span, ChainRequest)>, - /// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers - event_bus: EventBus>>, - - /// Interface to the event monitor - event_monitor_ctrl: EventMonitorCtrl, - #[allow(dead_code)] rt: Arc, // Making this future-proof, so we keep the runtime around. } @@ -194,8 +116,6 @@ where chain, request_sender, request_receiver, - event_bus: EventBus::new(), - event_monitor_ctrl: EventMonitorCtrl::none(), } } @@ -209,18 +129,6 @@ where fn run(mut self) -> Result<(), Error> { loop { channel::select! { - recv(self.event_monitor_ctrl.recv()) -> event_batch => { - match event_batch { - Ok(event_batch) => { - self.event_bus - .broadcast(Arc::new(event_batch)); - }, - Err(e) => { - error!("received error via event bus: {}", e); - return Err(Error::channel_receive(e)); - }, - } - }, recv(self.request_receiver) -> event => { let (span, event) = match event { Ok((span, event)) => (span, event), @@ -234,9 +142,8 @@ where match event { ChainRequest::Shutdown { reply_to } => { - self.event_monitor_ctrl.shutdown()?; - let res = self.chain.shutdown(); + reply_to.send(res) .map_err(Error::send)?; @@ -444,21 +351,8 @@ where } fn subscribe(&mut self, reply_to: ReplyTo) -> Result<(), Error> { - if !self.event_monitor_ctrl.is_live() { - self.enable_event_monitor()?; - } - - let subscription = self.event_bus.subscribe(); - reply_to.send(Ok(subscription)).map_err(Error::send) - } - - fn enable_event_monitor(&mut self) -> Result<(), Error> { - let (event_receiver, tx_monitor_cmd) = self.chain.init_event_monitor(self.rt.clone())?; - - self.event_monitor_ctrl - .enable(event_receiver, tx_monitor_cmd); - - Ok(()) + let subscription = self.chain.subscribe(); + reply_to.send(subscription).map_err(Error::send) } fn send_messages_and_wait_commit( diff --git a/crates/relayer/src/error.rs b/crates/relayer/src/error.rs index bb8bb3ae45..2d36a191b3 100644 --- a/crates/relayer/src/error.rs +++ b/crates/relayer/src/error.rs @@ -300,6 +300,10 @@ define_error! { [ TraceError ] |_| { "internal message-passing failure while receiving inter-thread request/response" }, + ChannelReceiveTimeout + [ TraceError ] + |_| { "timeout when waiting for reponse over inter-thread channel" }, + InvalidInputHeader |_| { "the input header is not recognized as a header for this chain" }, diff --git a/crates/relayer/src/event/monitor.rs b/crates/relayer/src/event/monitor.rs index 6ce7e5f46d..fb7083da1f 100644 --- a/crates/relayer/src/event/monitor.rs +++ b/crates/relayer/src/event/monitor.rs @@ -21,7 +21,7 @@ use ibc_relayer_types::{ }; use crate::{ - chain::tracking::TrackingId, + chain::{handle::Subscription, tracking::TrackingId}, telemetry, util::{ retry::{retry_with_index, RetryResult}, @@ -32,7 +32,7 @@ use crate::{ mod error; pub use error::*; -use super::IbcEventWithHeight; +use super::{bus::EventBus, IbcEventWithHeight}; pub type Result = core::result::Result; @@ -65,11 +65,33 @@ type SubscriptionStream = dyn Stream + Send + Sync + pub type EventSender = channel::Sender>; pub type EventReceiver = channel::Receiver>; -pub type TxMonitorCmd = channel::Sender; + +#[derive(Clone, Debug)] +pub struct TxMonitorCmd(channel::Sender); + +impl TxMonitorCmd { + pub fn shutdown(&self) -> Result<()> { + self.0 + .send(MonitorCmd::Shutdown) + .map_err(|_| Error::channel_send_failed()) + } + + pub fn subscribe(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(1); + self.0 + .send(MonitorCmd::Subscribe(tx)) + .map_err(|_| Error::channel_send_failed())?; + + let subscription = rx.recv().map_err(|_| Error::channel_recv_failed())?; + + Ok(subscription) + } +} #[derive(Debug)] pub enum MonitorCmd { Shutdown, + Subscribe(channel::Sender), } /// Connect to a Tendermint node, subscribe to a set of queries, @@ -85,8 +107,8 @@ pub struct EventMonitor { client: WebSocketClient, /// Async task handle for the WebSocket client's driver driver_handle: JoinHandle<()>, - /// Channel to handler where the monitor for this chain sends the events - tx_batch: channel::Sender>, + /// Event bus for broadcasting events + event_bus: EventBus>>, /// Channel where to receive client driver errors rx_err: mpsc::UnboundedReceiver, /// Channel where to send client driver errors @@ -148,8 +170,8 @@ impl EventMonitor { chain_id: ChainId, node_addr: Url, rt: Arc, - ) -> Result<(Self, EventReceiver, TxMonitorCmd)> { - let (tx_batch, rx_batch) = channel::unbounded(); + ) -> Result<(Self, TxMonitorCmd)> { + let event_bus = EventBus::new(); let (tx_cmd, rx_cmd) = channel::unbounded(); let ws_addr = node_addr.clone(); @@ -158,7 +180,7 @@ impl EventMonitor { .map_err(|_| Error::client_creation_failed(chain_id.clone(), node_addr.clone()))?; let (tx_err, rx_err) = mpsc::unbounded_channel(); - let websocket_driver_handle = rt.spawn(run_driver(driver, tx_err.clone())); + let driver_handle = rt.spawn(run_driver(driver, tx_err.clone())); // TODO: move them to config file(?) let event_queries = queries::all(); @@ -167,9 +189,9 @@ impl EventMonitor { rt, chain_id, client, - driver_handle: websocket_driver_handle, + driver_handle, event_queries, - tx_batch, + event_bus, rx_err, tx_err, rx_cmd, @@ -177,7 +199,7 @@ impl EventMonitor { subscriptions: Box::new(futures::stream::empty()), }; - Ok((monitor, rx_batch, tx_cmd)) + Ok((monitor, TxMonitorCmd(tx_cmd))) } /// The list of [`Query`] that this event monitor is subscribing for. @@ -186,8 +208,8 @@ impl EventMonitor { } /// Clear the current subscriptions, and subscribe again to all queries. - #[instrument(name = "event_monitor.subscribe", skip_all, fields(chain = %self.chain_id))] - pub fn subscribe(&mut self) -> Result<()> { + #[instrument(name = "event_monitor.init_subscriptions", skip_all, fields(chain = %self.chain_id))] + pub fn init_subscriptions(&mut self) -> Result<()> { let mut subscriptions = vec![]; for query in &self.event_queries { @@ -260,7 +282,7 @@ impl EventMonitor { )] fn try_resubscribe(&mut self) -> Result<()> { trace!("trying to resubscribe to events"); - self.subscribe() + self.init_subscriptions() } /// Attempt to reconnect the WebSocket client using the given retry strategy. @@ -348,8 +370,11 @@ impl EventMonitor { let rt = self.rt.clone(); loop { - if let Ok(MonitorCmd::Shutdown) = self.rx_cmd.try_recv() { - return Next::Abort; + if let Ok(cmd) = self.rx_cmd.try_recv() { + match cmd { + MonitorCmd::Shutdown => return Next::Abort, + MonitorCmd::Subscribe(tx) => tx.send(self.event_bus.subscribe()).unwrap(), + } } let result = rt.block_on(async { @@ -366,16 +391,12 @@ impl EventMonitor { } match result { - Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { - error!("error while processing batch: {}", e); - }), + Ok(batch) => self.process_batch(batch), Err(e) => { if let ErrorDetail::SubscriptionCancelled(reason) = e.detail() { error!("subscription cancelled, reason: {}", reason); - self.propagate_error(e).unwrap_or_else(|e| { - error!("{}", e); - }); + self.propagate_error(e); telemetry!(ws_reconnect, &self.chain_id); @@ -413,23 +434,15 @@ impl EventMonitor { /// and to trigger a clearing of packets, as this typically means that we have /// missed a bunch of events which were emitted after the subscription was closed. /// In that case, this error will be handled in [`Supervisor::handle_batch`]. - fn propagate_error(&self, error: Error) -> Result<()> { - self.tx_batch - .send(Err(error)) - .map_err(|_| Error::channel_send_failed())?; - - Ok(()) + fn propagate_error(&mut self, error: Error) { + self.event_bus.broadcast(Arc::new(Err(error))); } /// Collect the IBC events from the subscriptions - fn process_batch(&self, batch: EventBatch) -> Result<()> { + fn process_batch(&mut self, batch: EventBatch) { telemetry!(ws_events, &batch.chain_id, batch.events.len() as u64); - self.tx_batch - .send(Ok(batch)) - .map_err(|_| Error::channel_send_failed())?; - - Ok(()) + self.event_bus.broadcast(Arc::new(Ok(batch))); } } diff --git a/crates/relayer/src/event/monitor/error.rs b/crates/relayer/src/event/monitor/error.rs index c21f1d358e..01486a9670 100644 --- a/crates/relayer/src/event/monitor/error.rs +++ b/crates/relayer/src/event/monitor/error.rs @@ -36,7 +36,10 @@ define_error! { |e| { format!("failed to extract IBC events: {0}", e.reason) }, ChannelSendFailed - |_| { "internal message-passing failure: monitor could not reach chain handler" }, + |_| { "event monitor: internal message-passing failure: could not send message" }, + + ChannelRecvFailed + |_| { "event monitor: internal message-passing failure: could not receive message" }, SubscriptionCancelled [ TraceError ]