Skip to content

Commit

Permalink
Mock ledger services in integration tests
Browse files Browse the repository at this point in the history
This enables processing Ethereum events, broadcasting protocol
transactions, among other things, while running integration tests.
  • Loading branch information
sug0 committed Oct 9, 2023
1 parent 2368969 commit da706ad
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 108 deletions.
141 changes: 92 additions & 49 deletions apps/src/lib/node/ledger/ethereum_oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ops::ControlFlow;
use async_trait::async_trait;
use ethabi::Address;
use ethbridge_events::{event_codecs, EventKind};
use itertools::Either;
use namada::core::hints;
use namada::core::types::ethereum_structs;
use namada::eth_bridge::ethers;
Expand Down Expand Up @@ -75,13 +76,6 @@ pub trait RpcClient {
/// Ethereum event log.
type Log: IntoEthAbiLog;

/// Whether we should stop running the Ethereum oracle
/// if a call to [`Self::check_events_in_block`] fails.
///
/// This is only useful for testing purposes. In general,
/// no implementation should override this constant.
const EXIT_ON_EVENTS_FAILURE: bool = true;

/// Instantiate a new client, pointing to the
/// given RPC url.
fn new_client(rpc_url: &str) -> Self
Expand All @@ -108,6 +102,10 @@ pub trait RpcClient {
backoff: Duration,
deadline: Instant,
) -> Result<SyncStatus, Error>;

/// Given its current state, check if this RPC client
/// may recover from the given [`enum@Error`].
fn may_recover(&self, error: &Error) -> bool;
}

#[async_trait(?Send)]
Expand Down Expand Up @@ -172,6 +170,14 @@ impl RpcClient for Provider<Http> {
},
}
}

#[inline(always)]
fn may_recover(&self, error: &Error) -> bool {
!matches!(
error,
Error::Timeout | Error::Channel(_, _) | Error::CheckEvents(_, _, _)
)
}
}

/// A client that can talk to geth and parse
Expand All @@ -197,15 +203,18 @@ impl<C: RpcClient> Oracle<C> {
/// Construct a new [`Oracle`]. Note that it can not do anything until it
/// has been sent a configuration via the passed in `control` channel.
pub fn new(
url: &str,
client_or_url: Either<C, &str>,
sender: BoundedSender<EthereumEvent>,
last_processed_block: last_processed_block::Sender,
backoff: Duration,
ceiling: Duration,
control: control::Receiver,
) -> Self {
Self {
client: C::new_client(url),
client: match client_or_url {
Either::Left(client) => client,
Either::Right(url) => C::new_client(url),
},
sender,
backoff,
ceiling,
Expand Down Expand Up @@ -275,7 +284,7 @@ pub fn run_oracle<C: RpcClient>(
tracing::info!(?url, "Ethereum event oracle is starting");

let oracle = Oracle::<C>::new(
&url,
Either::Right(&url),
sender,
last_processed_block,
DEFAULT_BACKOFF,
Expand All @@ -300,6 +309,75 @@ pub fn run_oracle<C: RpcClient>(
.with_no_cleanup()
}

/// Determine what action to take after attempting to
/// process events contained in an Ethereum block.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum ProcessEventAction {
/// No events could be processed at this time, so we must keep
/// polling for new events.
ContinuePollingEvents,
/// Some error occurred while processing Ethereum events in
/// the current height. We must halt the oracle.
HaltOracle,
/// The current Ethereum block height has been processed.
/// We must advance to the next Ethereum height.
ProceedToNextBlock,
}

impl ProcessEventAction {
/// Check whether the action commands a new block to be processed.
#[inline]
pub fn process_new_block(&self) -> bool {
matches!(self, Self::ProceedToNextBlock)
}
}

impl ProcessEventAction {
/// Handles the requested oracle action, translating it to a format
/// understood by the set of [`Sleep`] abstractions.
fn handle(self) -> ControlFlow<Result<(), ()>, ()> {
match self {
ProcessEventAction::ContinuePollingEvents => {
ControlFlow::Continue(())
}
ProcessEventAction::HaltOracle => ControlFlow::Break(Err(())),
ProcessEventAction::ProceedToNextBlock => {
ControlFlow::Break(Ok(()))
}
}
}
}

/// Tentatively process a batch of Ethereum events.
pub(crate) async fn try_process_eth_events<C: RpcClient>(
oracle: &Oracle<C>,
config: &Config,
next_block_to_process: &ethereum_structs::BlockHeight,
) -> ProcessEventAction {
process_events_in_block(next_block_to_process, oracle, config)
.await
.map_or_else(
|error| {
if oracle.client.may_recover(&error) {
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ProcessEventAction::ContinuePollingEvents
} else {
tracing::error!(
reason = %error,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ProcessEventAction::HaltOracle
}
},
|()| ProcessEventAction::ProceedToNextBlock,
)
}

/// Given an oracle, watch for new Ethereum events, processing
/// them into Namada native types.
///
Expand Down Expand Up @@ -334,43 +412,8 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {
);
let res = Sleep { strategy: Constant(oracle.backoff) }.run(|| async {
tokio::select! {
result = process(&oracle, &config, next_block_to_process.clone()) => {
match result {
Ok(()) => {
ControlFlow::Break(Ok(()))
},
Err(
reason @ (
Error::Timeout
| Error::Channel(_, _)
| Error::CheckEvents(_, _, _)
)
) => {
// the oracle is unresponsive, we don't want the test to end
if !C::EXIT_ON_EVENTS_FAILURE
&& matches!(&reason, Error::CheckEvents(_, _, _))
{
tracing::debug!("Allowing the Ethereum oracle to keep running");
return ControlFlow::Continue(());
}
tracing::error!(
%reason,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ControlFlow::Break(Err(()))
}
Err(error) => {
// this is a recoverable error, hence the debug log,
// to avoid spamming info logs
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ControlFlow::Continue(())
}
}
action = try_process_eth_events(&oracle, &config, &next_block_to_process) => {
action.handle()
},
_ = oracle.sender.closed() => {
tracing::info!(
Expand Down Expand Up @@ -400,10 +443,10 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {

/// Checks if the given block has any events relating to the bridge, and if so,
/// sends them to the oracle's `sender` channel
async fn process<C: RpcClient>(
async fn process_events_in_block<C: RpcClient>(
block_to_process: &ethereum_structs::BlockHeight,
oracle: &Oracle<C>,
config: &Config,
block_to_process: ethereum_structs::BlockHeight,
) -> Result<(), Error> {
let mut queue: Vec<PendingEvent> = vec![];
let pending = &mut queue;
Expand Down
20 changes: 12 additions & 8 deletions apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub mod event_log {
}
}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
pub mod mock_web3_client {
use std::borrow::Cow;
use std::fmt::Debug;
Expand Down Expand Up @@ -102,7 +102,7 @@ pub mod mock_web3_client {
/// reason is for interior mutability.
pub struct Web3Client(Arc<Mutex<Web3ClientInner>>);

/// Command sender for [`Web3`] instances.
/// Command sender for [`TestOracle`] instances.
pub struct Web3Controller(Arc<Mutex<Web3ClientInner>>);

impl Web3Controller {
Expand Down Expand Up @@ -148,8 +148,6 @@ pub mod mock_web3_client {
impl RpcClient for Web3Client {
type Log = ethabi::RawLog;

const EXIT_ON_EVENTS_FAILURE: bool = false;

#[cold]
fn new_client(_: &str) -> Self
where
Expand Down Expand Up @@ -184,14 +182,15 @@ pub mod mock_web3_client {
}
if client.last_block_processed.as_ref() < Some(&block_to_check)
{
client
.blocks_processed
.send(block_to_check.clone())
.unwrap();
_ = client.blocks_processed.send(block_to_check.clone());
client.last_block_processed = Some(block_to_check);
}
Ok(logs)
} else {
tracing::debug!(
"No events to be processed by the Test Ethereum oracle, \
as it has been artificially set as unresponsive"
);
Err(Error::CheckEvents(
ty.into(),
addr,
Expand All @@ -209,6 +208,11 @@ pub mod mock_web3_client {
let height = self.0.lock().unwrap().latest_block_height.clone();
Ok(SyncStatus::AtHeight(height))
}

#[inline(always)]
fn may_recover(&self, _: &Error) -> bool {
true
}
}

impl Web3Client {
Expand Down
Loading

0 comments on commit da706ad

Please sign in to comment.