Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make supervisor more resilient to node going down #903

Merged
merged 16 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use tokio::runtime::Runtime as TokioRuntime;
use tendermint_rpc::query::{EventType, Query};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{config::ChainConfig, event::monitor::*};
use ibc_relayer::{
config::ChainConfig,
event::monitor::{Error as EventMonitorError, EventBatch, EventMonitor},
};

use crate::prelude::*;

Expand Down Expand Up @@ -73,7 +76,13 @@ fn subscribe(
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
) -> Result<
(
EventMonitor,
channel::Receiver<Result<EventBatch, EventMonitorError>>,
),
BoxError,
> {
let (mut event_monitor, rx) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
Expand Down
59 changes: 34 additions & 25 deletions relayer-cli/src/commands/misbehaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ibc::ics02_client::events::UpdateClient;
use ibc::ics02_client::height::Height;
use ibc::ics24_host::identifier::{ChainId, ClientId};
use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::event::monitor::UnwrapOrClone;
use ibc_relayer::foreign_client::{ForeignClient, MisbehaviourResults};

use crate::application::CliApp;
Expand Down Expand Up @@ -52,32 +53,40 @@ pub fn monitor_misbehaviour(
let subscription = chain.subscribe()?;

// check previous updates that may have been missed
misbehaviour_handling(chain.clone(), config, client_id, None)?;
misbehaviour_handling(chain.clone(), config, client_id.clone(), None)?;

// process update client events
while let Ok(event_batch) = subscription.recv() {
for event in event_batch.events.iter() {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id(),
Some(update.clone()),
)?;
let event_batch = event_batch.unwrap_or_clone();
match event_batch {
Ok(event_batch) => {
for event in event_batch.events {
match event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
chain.clone(),
config,
update.client_id().clone(),
Some(update),
)?;
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(ref _misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event));
}

_ => {}
}
}

IbcEvent::CreateClient(_create) => {
// TODO - get header from full node, consensus state from chain, compare
}

IbcEvent::ClientMisbehaviour(_misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event.clone()));
}

_ => {}
}
Err(e) => {
dbg!(e);
}
}
}
Expand All @@ -88,11 +97,11 @@ pub fn monitor_misbehaviour(
fn misbehaviour_handling(
chain: Box<dyn ChainHandle>,
config: &config::Reader<CliApp>,
client_id: &ClientId,
client_id: ClientId,
update: Option<UpdateClient>,
) -> Result<(), BoxError> {
let client_state = chain
.query_client_state(client_id, Height::zero())
.query_client_state(&client_id, Height::zero())
.map_err(|e| format!("could not query client state for {}: {}", client_id, e))?;

if client_state.is_frozen() {
Expand All @@ -108,7 +117,7 @@ fn misbehaviour_handling(
)
})?;

let client = ForeignClient::restore(client_id, chain.clone(), counterparty_chain.clone());
let client = ForeignClient::restore(&client_id, chain.clone(), counterparty_chain.clone());
let result = client.detect_misbehaviour_and_submit_evidence(update);
if let MisbehaviourResults::EvidenceSubmitted(events) = result {
info!("evidence submission result {:?}", events);
Expand Down
2 changes: 1 addition & 1 deletion relayer-cli/src/commands/start_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn start_all_connections(config: &Config) -> Result<Output, BoxError> {
});

match result {
Ok(Ok(())) => Ok(Output::success_msg("ok")),
Ok(Ok(())) => Ok(Output::success_msg("supervisor shutdown")),
Ok(Err(e)) => Err(e),
Err(e) => std::panic::resume_unwind(e),
}
Expand Down
12 changes: 2 additions & 10 deletions relayer/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, thread};

use crossbeam_channel as channel;
use prost_types::Any;
use tendermint::block::Height;
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -33,12 +32,11 @@ use ibc_proto::ibc::core::connection::v1::{
QueryClientConnectionsRequest, QueryConnectionsRequest,
};

use crate::config::ChainConfig;
use crate::connection::ConnectionMsgType;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::LightClient;
use crate::{config::ChainConfig, event::monitor::EventReceiver};

pub(crate) mod cosmos;
pub mod handle;
Expand Down Expand Up @@ -89,13 +87,7 @@ pub trait Chain: Sized {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
>;
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error>;

/// Returns the chain's identifier
fn id(&self) -> &ChainId;
Expand Down
11 changes: 2 additions & 9 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anomaly::fail;
use bech32::{ToBase32, Variant};
use bitcoin::hashes::hex::ToHex;
use crossbeam_channel as channel;
use prost::Message;
use prost_types::Any;
use tendermint::abci::Path as TendermintABCIPath;
Expand Down Expand Up @@ -65,7 +64,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::QueryResponse;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::{EventBatch, EventMonitor};
use crate::event::monitor::{EventMonitor, EventReceiver};
use crate::keyring::{KeyEntry, KeyRing, Store};
use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::LightClient;
Expand Down Expand Up @@ -361,13 +360,7 @@ impl Chain for CosmosSdkChain {
fn init_event_monitor(
&self,
rt: Arc<TokioRuntime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
crate::time!("init_event_monitor");

let (mut event_monitor, event_receiver) = EventMonitor::new(
Expand Down
11 changes: 7 additions & 4 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest;
use ibc_proto::ibc::core::commitment::v1::MerkleProof;
pub use prod::ProdChainHandle;

use crate::connection::ConnectionMsgType;
use crate::keyring::KeyEntry;
use crate::{error::Error, event::monitor::EventBatch};
use crate::{
connection::ConnectionMsgType,
error::Error,
event::monitor::{Error as EventMonitorError, EventBatch},
keyring::KeyEntry,
};

mod prod;

pub type Subscription = channel::Receiver<Arc<EventBatch>>;
pub type Subscription = channel::Receiver<Arc<Result<EventBatch, EventMonitorError>>>;
romac marked this conversation as resolved.
Show resolved Hide resolved

pub type ReplyTo<T> = channel::Sender<Result<T, Error>>;
pub type Reply<T> = channel::Receiver<Result<T, Error>>;
Expand Down
10 changes: 2 additions & 8 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use ibc_proto::ibc::core::connection::v1::{
use crate::chain::Chain;
use crate::config::ChainConfig;
use crate::error::{Error, Kind};
use crate::event::monitor::EventBatch;
use crate::event::monitor::EventReceiver;
use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::{mock::LightClient as MockLightClient, LightClient};

Expand Down Expand Up @@ -79,13 +79,7 @@ impl Chain for MockChain {
fn init_event_monitor(
&self,
_rt: Arc<Runtime>,
) -> Result<
(
channel::Receiver<EventBatch>,
Option<thread::JoinHandle<()>>,
),
Error,
> {
) -> Result<(EventReceiver, Option<thread::JoinHandle<()>>), Error> {
let (_, rx) = channel::unbounded();
Ok((rx, None))
}
Expand Down
13 changes: 8 additions & 5 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use crate::{
config::ChainConfig,
connection::ConnectionMsgType,
error::{Error, Kind},
event::{bus::EventBus, monitor::EventBatch},
event::{
bus::EventBus,
monitor::{Error as EventMonitorError, EventBatch},
},
keyring::KeyEntry,
light_client::LightClient,
};
Expand Down Expand Up @@ -69,10 +72,10 @@ pub struct ChainRuntime<C: Chain> {
request_receiver: channel::Receiver<ChainRequest>,

/// An event bus, for broadcasting events that this runtime receives (via `event_receiver`) to subscribers
event_bus: EventBus<Arc<EventBatch>>,
event_bus: EventBus<Arc<Result<EventBatch, EventMonitorError>>>,

/// Receiver channel from the event bus
event_receiver: channel::Receiver<EventBatch>,
event_receiver: channel::Receiver<Result<EventBatch, EventMonitorError>>,

/// A handle to the light client
light_client: Box<dyn LightClient<C>>,
Expand Down Expand Up @@ -110,7 +113,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn init(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: channel::Receiver<Result<EventBatch, EventMonitorError>>,
rt: Arc<TokioRuntime>,
) -> (Box<dyn ChainHandle>, thread::JoinHandle<()>) {
let chain_runtime = Self::new(chain, light_client, event_receiver, rt);
Expand All @@ -128,7 +131,7 @@ impl<C: Chain + Send + 'static> ChainRuntime<C> {
fn new(
chain: C,
light_client: Box<dyn LightClient<C>>,
event_receiver: channel::Receiver<EventBatch>,
event_receiver: channel::Receiver<Result<EventBatch, EventMonitorError>>,
rt: Arc<TokioRuntime>,
) -> Self {
let (request_sender, request_receiver) = channel::unbounded::<ChainRequest>();
Expand Down
Loading