Skip to content

Commit

Permalink
Merge branch 'tiago/int-test-services' (#1976)
Browse files Browse the repository at this point in the history
* origin/tiago/int-test-services:
  Changelog for #1976
  Improve reading flow of mock node service defs
  Mock ledger services in integration tests
  • Loading branch information
tzemanovic authored and brentstone committed Nov 11, 2023
2 parents 2abc5cd + 9c69e4b commit 6a06a50
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 108 deletions.
2 changes: 2 additions & 0 deletions .changelog/unreleased/testing/1976-int-test-services.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Mock ledger services in integration tests
([\#1976](https://github.com/anoma/namada/pull/1976))
141 changes: 92 additions & 49 deletions apps/src/lib/node/ledger/ethereum_oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ops::ControlFlow;
use async_trait::async_trait;
use ethabi::Address;
use ethbridge_events::{event_codecs, EventKind};
use itertools::Either;
use namada::core::hints;
use namada::core::types::ethereum_structs;
use namada::eth_bridge::ethers;
Expand Down Expand Up @@ -75,13 +76,6 @@ pub trait RpcClient {
/// Ethereum event log.
type Log: IntoEthAbiLog;

/// Whether we should stop running the Ethereum oracle
/// if a call to [`Self::check_events_in_block`] fails.
///
/// This is only useful for testing purposes. In general,
/// no implementation should override this constant.
const EXIT_ON_EVENTS_FAILURE: bool = true;

/// Instantiate a new client, pointing to the
/// given RPC url.
fn new_client(rpc_url: &str) -> Self
Expand All @@ -108,6 +102,10 @@ pub trait RpcClient {
backoff: Duration,
deadline: Instant,
) -> Result<SyncStatus, Error>;

/// Given its current state, check if this RPC client
/// may recover from the given [`enum@Error`].
fn may_recover(&self, error: &Error) -> bool;
}

#[async_trait(?Send)]
Expand Down Expand Up @@ -172,6 +170,14 @@ impl RpcClient for Provider<Http> {
},
}
}

#[inline(always)]
fn may_recover(&self, error: &Error) -> bool {
!matches!(
error,
Error::Timeout | Error::Channel(_, _) | Error::CheckEvents(_, _, _)
)
}
}

/// A client that can talk to geth and parse
Expand All @@ -197,15 +203,18 @@ impl<C: RpcClient> Oracle<C> {
/// Construct a new [`Oracle`]. Note that it can not do anything until it
/// has been sent a configuration via the passed in `control` channel.
pub fn new(
url: &str,
client_or_url: Either<C, &str>,
sender: BoundedSender<EthereumEvent>,
last_processed_block: last_processed_block::Sender,
backoff: Duration,
ceiling: Duration,
control: control::Receiver,
) -> Self {
Self {
client: C::new_client(url),
client: match client_or_url {
Either::Left(client) => client,
Either::Right(url) => C::new_client(url),
},
sender,
backoff,
ceiling,
Expand Down Expand Up @@ -275,7 +284,7 @@ pub fn run_oracle<C: RpcClient>(
tracing::info!(?url, "Ethereum event oracle is starting");

let oracle = Oracle::<C>::new(
&url,
Either::Right(&url),
sender,
last_processed_block,
DEFAULT_BACKOFF,
Expand All @@ -300,6 +309,75 @@ pub fn run_oracle<C: RpcClient>(
.with_no_cleanup()
}

/// Determine what action to take after attempting to
/// process events contained in an Ethereum block.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum ProcessEventAction {
/// No events could be processed at this time, so we must keep
/// polling for new events.
ContinuePollingEvents,
/// Some error occurred while processing Ethereum events in
/// the current height. We must halt the oracle.
HaltOracle,
/// The current Ethereum block height has been processed.
/// We must advance to the next Ethereum height.
ProceedToNextBlock,
}

impl ProcessEventAction {
/// Check whether the action commands a new block to be processed.
#[inline]
pub fn process_new_block(&self) -> bool {
matches!(self, Self::ProceedToNextBlock)
}
}

impl ProcessEventAction {
/// Handles the requested oracle action, translating it to a format
/// understood by the set of [`Sleep`] abstractions.
fn handle(self) -> ControlFlow<Result<(), ()>, ()> {
match self {
ProcessEventAction::ContinuePollingEvents => {
ControlFlow::Continue(())
}
ProcessEventAction::HaltOracle => ControlFlow::Break(Err(())),
ProcessEventAction::ProceedToNextBlock => {
ControlFlow::Break(Ok(()))
}
}
}
}

/// Tentatively process a batch of Ethereum events.
pub(crate) async fn try_process_eth_events<C: RpcClient>(
oracle: &Oracle<C>,
config: &Config,
next_block_to_process: &ethereum_structs::BlockHeight,
) -> ProcessEventAction {
process_events_in_block(next_block_to_process, oracle, config)
.await
.map_or_else(
|error| {
if oracle.client.may_recover(&error) {
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ProcessEventAction::ContinuePollingEvents
} else {
tracing::error!(
reason = %error,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ProcessEventAction::HaltOracle
}
},
|()| ProcessEventAction::ProceedToNextBlock,
)
}

/// Given an oracle, watch for new Ethereum events, processing
/// them into Namada native types.
///
Expand Down Expand Up @@ -334,43 +412,8 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {
);
let res = Sleep { strategy: Constant(oracle.backoff) }.run(|| async {
tokio::select! {
result = process(&oracle, &config, next_block_to_process.clone()) => {
match result {
Ok(()) => {
ControlFlow::Break(Ok(()))
},
Err(
reason @ (
Error::Timeout
| Error::Channel(_, _)
| Error::CheckEvents(_, _, _)
)
) => {
// the oracle is unresponsive, we don't want the test to end
if !C::EXIT_ON_EVENTS_FAILURE
&& matches!(&reason, Error::CheckEvents(_, _, _))
{
tracing::debug!("Allowing the Ethereum oracle to keep running");
return ControlFlow::Continue(());
}
tracing::error!(
%reason,
block = ?next_block_to_process,
"The Ethereum oracle has disconnected"
);
ControlFlow::Break(Err(()))
}
Err(error) => {
// this is a recoverable error, hence the debug log,
// to avoid spamming info logs
tracing::debug!(
%error,
block = ?next_block_to_process,
"Error while trying to process Ethereum block"
);
ControlFlow::Continue(())
}
}
action = try_process_eth_events(&oracle, &config, &next_block_to_process) => {
action.handle()
},
_ = oracle.sender.closed() => {
tracing::info!(
Expand Down Expand Up @@ -400,10 +443,10 @@ async fn run_oracle_aux<C: RpcClient>(mut oracle: Oracle<C>) {

/// Checks if the given block has any events relating to the bridge, and if so,
/// sends them to the oracle's `sender` channel
async fn process<C: RpcClient>(
async fn process_events_in_block<C: RpcClient>(
block_to_process: &ethereum_structs::BlockHeight,
oracle: &Oracle<C>,
config: &Config,
block_to_process: ethereum_structs::BlockHeight,
) -> Result<(), Error> {
let mut queue: Vec<PendingEvent> = vec![];
let pending = &mut queue;
Expand Down
20 changes: 12 additions & 8 deletions apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub mod event_log {
}
}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
pub mod mock_web3_client {
use std::borrow::Cow;
use std::fmt::Debug;
Expand Down Expand Up @@ -102,7 +102,7 @@ pub mod mock_web3_client {
/// reason is for interior mutability.
pub struct Web3Client(Arc<Mutex<Web3ClientInner>>);

/// Command sender for [`Web3`] instances.
/// Command sender for [`TestOracle`] instances.
pub struct Web3Controller(Arc<Mutex<Web3ClientInner>>);

impl Web3Controller {
Expand Down Expand Up @@ -148,8 +148,6 @@ pub mod mock_web3_client {
impl RpcClient for Web3Client {
type Log = ethabi::RawLog;

const EXIT_ON_EVENTS_FAILURE: bool = false;

#[cold]
fn new_client(_: &str) -> Self
where
Expand Down Expand Up @@ -184,14 +182,15 @@ pub mod mock_web3_client {
}
if client.last_block_processed.as_ref() < Some(&block_to_check)
{
client
.blocks_processed
.send(block_to_check.clone())
.unwrap();
_ = client.blocks_processed.send(block_to_check.clone());
client.last_block_processed = Some(block_to_check);
}
Ok(logs)
} else {
tracing::debug!(
"No events to be processed by the Test Ethereum oracle, \
as it has been artificially set as unresponsive"
);
Err(Error::CheckEvents(
ty.into(),
addr,
Expand All @@ -209,6 +208,11 @@ pub mod mock_web3_client {
let height = self.0.lock().unwrap().latest_block_height.clone();
Ok(SyncStatus::AtHeight(height))
}

#[inline(always)]
fn may_recover(&self, _: &Error) -> bool {
true
}
}

impl Web3Client {
Expand Down
Loading

0 comments on commit 6a06a50

Please sign in to comment.