Skip to content

Commit

Permalink
Fix crash during initialization of event monitor when node is down (i…
Browse files Browse the repository at this point in the history
…nformalsystems#895)

* Introduce error type for event monitor

* Do not crash if chain runtime fails to subscribe to events

* Gracefully handle runtime init error in `start-multi` command

* Improve worker error context

* Fix chain id in error message if spawning chain runtime fails

* Print channel error when one arises

* Refactor event monitor loop a little bit

* Update changelog
  • Loading branch information
romac authored May 6, 2021
1 parent d2f246f commit d7abc91
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 104 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
- Fix pagination in gRPC query for clients ([#811])
- Fix relayer crash when hermes starts in the same time as packets are being sent ([#851])
- Fix missing port information in `hermes query channels` ([#840])
- Fix crash during initialization of event monitor when node is down ([#863])

- [ibc-relayer-cli]
- Fix for `ft-transfer` mismatching arguments ([#869])
Expand All @@ -51,6 +52,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
[#851]: https://github.com/informalsystems/ibc-rs/issues/851
[#854]: https://github.com/informalsystems/ibc-rs/issues/854
[#861]: https://github.com/informalsystems/ibc-rs/issues/861
[#863]: https://github.com/informalsystems/ibc-rs/issues/863
[#869]: https://github.com/informalsystems/ibc-rs/issues/869
[#878]: https://github.com/informalsystems/ibc-rs/issues/878

Expand Down
22 changes: 20 additions & 2 deletions relayer-cli/src/commands/start_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,26 @@ fn start_all_connections(config: &Config) -> Result<Output, BoxError> {
conn.a_chain, conn.b_chain
);

let chain_a = registry.get_or_spawn(&conn.a_chain)?;
let chain_b = registry.get_or_spawn(&conn.b_chain)?;
let chain_a = registry.get_or_spawn(&conn.a_chain);
let chain_b = registry.get_or_spawn(&conn.b_chain);

let (chain_a, chain_b) = match (chain_a, chain_b) {
(Ok(a), Ok(b)) => (a, b),
(Err(err), _) => {
error!(
"failed to initialize runtime for chain '{}': {}",
conn.a_chain, err
);
continue;
}
(_, Err(err)) => {
error!(
"failed to initialize runtime for chain '{}': {}",
conn.b_chain, err
);
continue;
}
};

s.spawn(|_| {
let supervisor = Supervisor::spawn(chain_a, chain_b).unwrap();
Expand Down
6 changes: 4 additions & 2 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ impl Chain for CosmosSdkChain {
self.config.id.clone(),
self.config.websocket_addr.clone(),
rt,
)?;
)
.map_err(Kind::EventMonitor)?;

event_monitor.subscribe().map_err(Kind::EventMonitor)?;

event_monitor.subscribe().unwrap();
let monitor_thread = thread::spawn(move || event_monitor.run());

Ok((event_receiver, Some(monitor_thread)))
Expand Down
45 changes: 24 additions & 21 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,38 @@ use std::{sync::Arc, thread};

use crossbeam_channel as channel;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::error;

use ibc::ics02_client::client_consensus::AnyConsensusStateWithHeight;
use ibc::ics02_client::events::UpdateClient;
use ibc::ics02_client::misbehaviour::AnyMisbehaviour;
use ibc::ics04_channel::channel::IdentifiedChannelEnd;
use ibc::{
events::IbcEvent,
ics02_client::{
client_consensus::{AnyConsensusState, ConsensusState},
client_consensus::{AnyConsensusState, AnyConsensusStateWithHeight, ConsensusState},
client_state::{AnyClientState, ClientState},
events::UpdateClient,
header::{AnyHeader, Header},
misbehaviour::AnyMisbehaviour,
},
ics03_connection::{connection::ConnectionEnd, version::Version},
ics04_channel::{
channel::ChannelEnd,
packet::{PacketMsgType, Sequence},
},
ics03_connection::connection::ConnectionEnd,
ics03_connection::version::Version,
ics04_channel::channel::ChannelEnd,
ics04_channel::packet::{PacketMsgType, Sequence},
ics23_commitment::commitment::CommitmentPrefix,
ics24_host::identifier::{ChannelId, ClientId, ConnectionId, PortId},
proofs::Proofs,
query::QueryTxRequest,
signer::Signer,
Height,
};
use ibc_proto::ibc::core::channel::v1::QueryChannelsRequest;
use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest;

use ibc_proto::ibc::core::{
channel::v1::{
PacketState, QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest,
QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
PacketState, QueryChannelsRequest, QueryNextSequenceReceiveRequest,
QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest,
QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
},
client::v1::QueryConsensusStatesRequest,
commitment::v1::MerkleProof,
};

Expand Down Expand Up @@ -91,10 +93,10 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
let light_client = chain.init_light_client()?;

// Start the event monitor
let (event_receiver, event_monitor_thread) = chain.init_event_monitor(rt.clone())?;
let (event_batch_rx, event_monitor_thread) = chain.init_event_monitor(rt.clone())?;

// Instantiate & spawn the runtime
let (handle, runtime_thread) = Self::init(chain, light_client, event_receiver, rt);
let (handle, runtime_thread) = Self::init(chain, light_client, event_batch_rx, rt);

let threads = Threads {
chain_runtime: runtime_thread,
Expand Down Expand Up @@ -153,12 +155,13 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
loop {
channel::select! {
recv(self.event_receiver) -> event_batch => {
if let Ok(event_batch) = event_batch {
self.event_bus
.broadcast(Arc::new(event_batch))
.map_err(|e| Kind::Channel.context(e))?;
} else {
// TODO: Handle error
match event_batch {
Ok(event_batch) => {
self.event_bus
.broadcast(Arc::new(event_batch))
.map_err(|e| Kind::Channel.context(e))?;
},
Err(e) => error!("received error via event bus: {}", e),
}
},
recv(self.request_receiver) -> event => {
Expand Down Expand Up @@ -296,7 +299,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
self.query_txs(request, reply_to)?
},

Err(_e) => todo!(), // TODO: Handle error?
Err(e) => error!("received error via chain request channel: {}", e),
}
},
}
Expand Down
4 changes: 4 additions & 0 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub enum Kind {
#[error("Websocket error to endpoint {0}")]
Websocket(tendermint_rpc::Url),

/// Event monitor error
#[error("Event monitor")]
EventMonitor(crate::event::monitor::Error),

/// GRPC error (typically raised by the GRPC client or the GRPC requester)
#[error("GRPC error")]
Grpc,
Expand Down
Loading

0 comments on commit d7abc91

Please sign in to comment.