Skip to content

Commit

Permalink
Merge pull request #1352 from anoma/tiago/ethbridge/oracle-ping
Browse files Browse the repository at this point in the history
Keep oracle running if the Ethereum node is still available
  • Loading branch information
sug0 authored May 12, 2023
2 parents 2e1b7bd + 5dd5bb3 commit ed5e2d9
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 77 deletions.
4 changes: 2 additions & 2 deletions apps/src/lib/client/eth_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time::{Duration, Instant};
use web30::client::Web3;

use crate::cli;
use crate::control_flow::timeouts::TimeoutStrategy;
use crate::control_flow::timeouts::SleepStrategy;
use crate::node::ledger::ethereum_oracle::eth_syncing_status;

/// Arguments to [`block_on_eth_sync`].
Expand All @@ -35,7 +35,7 @@ pub async fn block_on_eth_sync(args: BlockOnEthSync<'_>) {
} = args;
tracing::info!("Attempting to synchronize with the Ethereum network");
let client = Web3::new(url, rpc_timeout);
TimeoutStrategy::LinearBackoff { delta: delta_sleep }
SleepStrategy::LinearBackoff { delta: delta_sleep }
.timeout(deadline, || async {
let local_set = LocalSet::new();
let status_fut = local_set
Expand Down
4 changes: 1 addition & 3 deletions apps/src/lib/client/eth_bridge/validator_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ enum RelayResult {
GovernanceCallError(String),
/// Some nonce related error occurred.
///
/// The following comparison must hold:
///
/// contract + 1 = argument
/// The following comparison must hold: `contract + 1 = argument`.
NonceError {
/// The value of the [`Epoch`] argument passed via CLI.
argument: Epoch,
Expand Down
4 changes: 2 additions & 2 deletions apps/src/lib/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::client::tendermint_rpc_types::TxResponse;
use crate::client::tx::{
Conversions, PinnedBalanceError, TransactionDelta, TransferDelta,
};
use crate::control_flow::timeouts::TimeoutStrategy;
use crate::control_flow::timeouts::SleepStrategy;
use crate::facade::tendermint::merkle::proof::Proof;
use crate::facade::tendermint_config::net::Address as TendermintAddress;
use crate::facade::tendermint_rpc::error::Error as TError;
Expand All @@ -78,7 +78,7 @@ pub async fn query_tx_status(
deadline: Instant,
) -> Event {
let client = HttpClient::new(address).unwrap();
TimeoutStrategy::LinearBackoff {
SleepStrategy::LinearBackoff {
delta: Duration::from_secs(1),
}
.timeout(deadline, || async {
Expand Down
53 changes: 33 additions & 20 deletions apps/src/lib/control_flow/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ use std::ops::ControlFlow;
use tokio::time::error::Elapsed;
use tokio::time::{Duration, Instant};

/// A timeout strategy to
/// A sleep strategy to be applied to fallible runs of arbitrary tasks.
#[derive(Debug, Clone)]
pub enum TimeoutStrategy {
/// A constant timeout strategy.
pub enum SleepStrategy {
/// Constant sleep.
Constant(Duration),
/// A linear timeout strategy.
/// Linear backoff sleep.
LinearBackoff {
/// The amount of time added to each consecutive timeout.
/// The amount of time added to each consecutive run.
delta: Duration,
},
}

impl TimeoutStrategy {
impl SleepStrategy {
/// Sleep and update the `backoff` timeout, if necessary.
async fn sleep_update(&self, backoff: &mut Duration) {
match self {
Expand All @@ -32,31 +32,44 @@ impl TimeoutStrategy {
}
}

/// Drive a retryable task to completion.
///
/// `future_gen` is invoked to build a fresh future for every attempt.
/// An attempt resolving to [`ControlFlow::Break`] ends the loop and its
/// payload is returned; an attempt resolving to [`ControlFlow::Continue`]
/// triggers a sleep governed by the current [`SleepStrategy`] before the
/// next attempt is made.
///
/// There is no deadline here: the loop only ends once an attempt breaks.
pub async fn run<T, F, G>(&self, mut future_gen: G) -> T
where
    F: Future<Output = ControlFlow<T>>,
    G: FnMut() -> F,
{
    // Running sleep duration, handed to `sleep_update` between attempts.
    let mut delay = Duration::ZERO;
    loop {
        if let ControlFlow::Break(output) = future_gen().await {
            return output;
        }
        // The attempt asked to be retried: sleep first, then try again.
        self.sleep_update(&mut delay).await;
    }
}

/// Run a time constrained task until the given deadline.
///
/// Different retries will result in a sleep operation,
/// with the current [`TimeoutStrategy`].
/// with the current [`SleepStrategy`].
pub async fn timeout<T, F, G>(
&self,
deadline: Instant,
mut future_gen: G,
future_gen: G,
) -> Result<T, Elapsed>
where
G: FnMut() -> F,
F: Future<Output = ControlFlow<T>>,
{
tokio::time::timeout_at(deadline, async move {
let mut backoff = Duration::from_secs(0);
loop {
let fut = future_gen();
match fut.await {
ControlFlow::Continue(()) => {
self.sleep_update(&mut backoff).await;
}
ControlFlow::Break(ret) => break ret,
}
}
})
tokio::time::timeout_at(
deadline,
async move { self.run(future_gen).await },
)
.await
}
}
103 changes: 70 additions & 33 deletions apps/src/lib/node/ledger/ethereum_oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;

use clarity::Address;
use ethbridge_events::{event_codecs, EventKind};
use eyre::eyre;
use namada::core::hints;
use namada::core::types::ethereum_structs;
use namada::eth_bridge::oracle::config::Config;
use namada::types::ethereum_events::EthereumEvent;
Expand All @@ -26,7 +26,7 @@ use self::events::PendingEvent;
#[cfg(test)]
use self::test_tools::mock_web3_client::Web3;
use super::abortable::AbortableSpawner;
use crate::control_flow::timeouts::TimeoutStrategy;
use crate::control_flow::timeouts::SleepStrategy;
use crate::node::ledger::oracle::control::Command;

/// The default amount of time the oracle will wait between processing blocks
Expand All @@ -37,12 +37,6 @@ const DEFAULT_CEILING: Duration = std::time::Duration::from_secs(30);
pub enum Error {
#[error("Ethereum node has fallen out of sync")]
FallenBehind,
#[error(
"Couldn't get the latest synced Ethereum block height from the RPC \
endpoint: {0}"
)]
#[allow(dead_code)]
FetchHeight(String),
#[error(
"Couldn't check for events ({0} from {1}) with the RPC endpoint: {2}"
)]
Expand All @@ -53,12 +47,13 @@ pub enum Error {
"Need more confirmations for oracle to continue processing blocks."
)]
MoreConfirmations,
#[error("The Ethereum oracle timed out")]
Timeout,
}

/// The result of querying an Ethereum node's syncing status.
pub enum SyncStatus {
/// The fullnode is syncing.
#[allow(dead_code)]
Syncing,
/// The fullnode is synced up to the given block height.
AtHeight(Uint256),
Expand All @@ -85,6 +80,7 @@ pub struct Oracle {
/// How long the oracle should wait between checking blocks
backoff: Duration,
/// How long the oracle should allow the fullnode to be unresponsive
#[cfg_attr(test, allow(dead_code))]
ceiling: Duration,
/// A channel for controlling and configuring the oracle.
control: control::Receiver,
Expand All @@ -99,15 +95,39 @@ impl Deref for Oracle {
}

/// Fetch the sync status of an Ethereum node.
#[inline]
pub async fn eth_syncing_status(
client: &web30::client::Web3,
) -> Result<SyncStatus, Error> {
match client.eth_block_number().await {
Ok(height) if height == 0u64.into() => Ok(SyncStatus::Syncing),
Ok(height) => Ok(SyncStatus::AtHeight(height)),
Err(Web3Error::SyncingNode(_)) => Ok(SyncStatus::Syncing),
Err(error) => Err(Error::FetchHeight(error.to_string())),
}
eth_syncing_status_timeout(
client,
DEFAULT_BACKOFF,
Instant::now() + DEFAULT_CEILING,
)
.await
}

/// Fetch the sync status of an Ethereum node, with a custom time
/// out duration.
///
/// Queries to the Ethereum node are interspersed with constant backoff
/// sleeps of `backoff_duration`, before ultimately timing out at `deadline`.
pub async fn eth_syncing_status_timeout(
client: &web30::client::Web3,
backoff_duration: Duration,
deadline: Instant,
) -> Result<SyncStatus, Error> {
SleepStrategy::Constant(backoff_duration)
.timeout(deadline, || async {
ControlFlow::Break(match client.eth_block_number().await {
Ok(height) if height == 0u64.into() => SyncStatus::Syncing,
Ok(height) => SyncStatus::AtHeight(height),
Err(Web3Error::SyncingNode(_)) => SyncStatus::Syncing,
Err(_) => return ControlFlow::Continue(()),
})
})
.await
.map_or_else(|_| Err(Error::Timeout), Ok)
}

impl Oracle {
Expand Down Expand Up @@ -140,7 +160,10 @@ impl Oracle {
/// number is 0 or not.
#[cfg(not(test))]
async fn syncing(&self) -> Result<SyncStatus, Error> {
match eth_syncing_status(&self.client).await? {
let deadline = Instant::now() + self.ceiling;
match eth_syncing_status_timeout(&self.client, self.backoff, deadline)
.await?
{
s @ SyncStatus::Syncing => Ok(s),
SyncStatus::AtHeight(height) => {
match &*self.last_processed_block.borrow() {
Expand Down Expand Up @@ -274,17 +297,32 @@ async fn run_oracle_aux(mut oracle: Oracle) {
?next_block_to_process,
"Checking Ethereum block for bridge events"
);
let deadline = Instant::now() + oracle.ceiling;
let res = TimeoutStrategy::Constant(oracle.backoff).timeout(deadline, || async {
let res = SleepStrategy::Constant(oracle.backoff).run(|| async {
tokio::select! {
result = process(&oracle, &config, next_block_to_process.clone()) => {
match result {
Ok(()) => {
ControlFlow::Break(Ok(()))
},
Err(
reason @ (
Error::Timeout
| Error::Channel(_, _)
| Error::CheckEvents(_, _, _)
)
) => {
tracing::error!(
%reason,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ControlFlow::Break(Err(()))
}
Err(error) => {
tracing::warn!(
?error,
// this is a recoverable error, hence the debug log,
// to avoid spamming info logs
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
Expand All @@ -297,25 +335,24 @@ async fn run_oracle_aux(mut oracle: Oracle) {
"Ethereum oracle can not send events to the ledger; the \
receiver has hung up. Shutting down"
);
ControlFlow::Break(Err(eyre!("Shutting down.")))
ControlFlow::Break(Err(()))
}
}
})
.await
.expect("Oracle timed out while trying to communicate with the Ethereum fullnode.");
.await;

if res.is_err() {
if hints::unlikely(res.is_err()) {
break;
} else {
oracle
.last_processed_block
.send_replace(Some(next_block_to_process.clone()));
// check if a new config has been sent.
if let Some(new_config) = oracle.update_config() {
config = new_config;
}
next_block_to_process += 1.into();
}

oracle
.last_processed_block
.send_replace(Some(next_block_to_process.clone()));
// check if a new config has been sent.
if let Some(new_config) = oracle.update_config() {
config = new_config;
}
next_block_to_process += 1.into();
}
}

Expand Down
31 changes: 14 additions & 17 deletions apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,21 @@ pub mod mock_web3_client {

/// Check and apply new incoming commands
fn check_cmd_channel(&self) {
let cmd =
if let Ok(cmd) = self.0.borrow_mut().cmd_channel.try_recv() {
cmd
} else {
return;
};
match cmd {
TestCmd::Normal => self.0.borrow_mut().active = true,
TestCmd::Unresponsive => self.0.borrow_mut().active = false,
TestCmd::NewHeight(height) => {
self.0.borrow_mut().latest_block_height = height
let mut oracle = self.0.borrow_mut();
while let Ok(cmd) = oracle.cmd_channel.try_recv() {
match cmd {
TestCmd::Normal => oracle.active = true,
TestCmd::Unresponsive => oracle.active = false,
TestCmd::NewHeight(height) => {
oracle.latest_block_height = height
}
TestCmd::NewEvent {
event_type: ty,
data,
height,
seen,
} => oracle.events.push((ty, data, height, seen)),
}
TestCmd::NewEvent {
event_type: ty,
data,
height,
seen,
} => self.0.borrow_mut().events.push((ty, data, height, seen)),
}
}

Expand Down

0 comments on commit ed5e2d9

Please sign in to comment.