Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash during initialization of event monitor when node is down #895

Merged
merged 10 commits into from
May 6, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
- Fix pagination in gRPC query for clients ([#811])
- Fix relayer crash when hermes starts in the same time as packets are being sent ([#851])
- Fix missing port information in `hermes query channels` ([#840])
- Fix crash during initialization of event monitor when node is down ([#863])

- [ibc-relayer-cli]
- Fix for `ft-transfer` mismatching arguments ([#869])
Expand All @@ -51,6 +52,7 @@ Jongwhan Lee (@leejw51crypto) ([#878]).
[#851]: https://github.com/informalsystems/ibc-rs/issues/851
[#854]: https://github.com/informalsystems/ibc-rs/issues/854
[#861]: https://github.com/informalsystems/ibc-rs/issues/861
[#863]: https://github.com/informalsystems/ibc-rs/issues/863
[#869]: https://github.com/informalsystems/ibc-rs/issues/869
[#878]: https://github.com/informalsystems/ibc-rs/issues/878

Expand Down
22 changes: 20 additions & 2 deletions relayer-cli/src/commands/start_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,26 @@ fn start_all_connections(config: &Config) -> Result<Output, BoxError> {
conn.a_chain, conn.b_chain
);

let chain_a = registry.get_or_spawn(&conn.a_chain)?;
let chain_b = registry.get_or_spawn(&conn.b_chain)?;
let chain_a = registry.get_or_spawn(&conn.a_chain);
let chain_b = registry.get_or_spawn(&conn.b_chain);

let (chain_a, chain_b) = match (chain_a, chain_b) {
(Ok(a), Ok(b)) => (a, b),
(Err(err), _) => {
error!(
"failed to initialize runtime for chain '{}': {}",
conn.a_chain, err
);
continue;
}
(_, Err(err)) => {
error!(
"failed to initialize runtime for chain '{}': {}",
conn.b_chain, err
);
continue;
}
};

s.spawn(|_| {
let supervisor = Supervisor::spawn(chain_a, chain_b).unwrap();
Expand Down
6 changes: 4 additions & 2 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ impl Chain for CosmosSdkChain {
self.config.id.clone(),
self.config.websocket_addr.clone(),
rt,
)?;
)
.map_err(Kind::EventMonitor)?;

event_monitor.subscribe().map_err(Kind::EventMonitor)?;

event_monitor.subscribe().unwrap();
let monitor_thread = thread::spawn(move || event_monitor.run());

Ok((event_receiver, Some(monitor_thread)))
Expand Down
45 changes: 24 additions & 21 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,38 @@ use std::{sync::Arc, thread};

use crossbeam_channel as channel;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::error;

use ibc::ics02_client::client_consensus::AnyConsensusStateWithHeight;
use ibc::ics02_client::events::UpdateClient;
use ibc::ics02_client::misbehaviour::AnyMisbehaviour;
use ibc::ics04_channel::channel::IdentifiedChannelEnd;
use ibc::{
events::IbcEvent,
ics02_client::{
client_consensus::{AnyConsensusState, ConsensusState},
client_consensus::{AnyConsensusState, AnyConsensusStateWithHeight, ConsensusState},
client_state::{AnyClientState, ClientState},
events::UpdateClient,
header::{AnyHeader, Header},
misbehaviour::AnyMisbehaviour,
},
ics03_connection::{connection::ConnectionEnd, version::Version},
ics04_channel::{
channel::ChannelEnd,
packet::{PacketMsgType, Sequence},
},
ics03_connection::connection::ConnectionEnd,
ics03_connection::version::Version,
ics04_channel::channel::ChannelEnd,
ics04_channel::packet::{PacketMsgType, Sequence},
ics23_commitment::commitment::CommitmentPrefix,
ics24_host::identifier::{ChannelId, ClientId, ConnectionId, PortId},
proofs::Proofs,
query::QueryTxRequest,
signer::Signer,
Height,
};
use ibc_proto::ibc::core::channel::v1::QueryChannelsRequest;
use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest;

use ibc_proto::ibc::core::{
channel::v1::{
PacketState, QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest,
QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
PacketState, QueryChannelsRequest, QueryNextSequenceReceiveRequest,
QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest,
QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest,
},
client::v1::QueryConsensusStatesRequest,
commitment::v1::MerkleProof,
};

Expand Down Expand Up @@ -91,10 +93,10 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
let light_client = chain.init_light_client()?;

// Start the event monitor
let (event_receiver, event_monitor_thread) = chain.init_event_monitor(rt.clone())?;
let (event_batch_rx, event_monitor_thread) = chain.init_event_monitor(rt.clone())?;

// Instantiate & spawn the runtime
let (handle, runtime_thread) = Self::init(chain, light_client, event_receiver, rt);
let (handle, runtime_thread) = Self::init(chain, light_client, event_batch_rx, rt);

let threads = Threads {
chain_runtime: runtime_thread,
Expand Down Expand Up @@ -153,12 +155,13 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
loop {
channel::select! {
recv(self.event_receiver) -> event_batch => {
if let Ok(event_batch) = event_batch {
self.event_bus
.broadcast(Arc::new(event_batch))
.map_err(|e| Kind::Channel.context(e))?;
} else {
// TODO: Handle error
match event_batch {
Ok(event_batch) => {
self.event_bus
.broadcast(Arc::new(event_batch))
.map_err(|e| Kind::Channel.context(e))?;
},
Err(e) => error!("received error via event bus: {}", e),
}
},
recv(self.request_receiver) -> event => {
Expand Down Expand Up @@ -296,7 +299,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
self.query_txs(request, reply_to)?
},

Err(_e) => todo!(), // TODO: Handle error?
Err(e) => error!("received error via chain request channel: {}", e),
}
},
}
Expand Down
4 changes: 4 additions & 0 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub enum Kind {
#[error("Websocket error to endpoint {0}")]
Websocket(tendermint_rpc::Url),

/// Event monitor error
#[error("Event monitor")]
EventMonitor(crate::event::monitor::Error),

/// GRPC error (typically raised by the GRPC client or the GRPC requester)
#[error("GRPC error")]
Grpc,
Expand Down
Loading