diff --git a/apps/src/lib/node/ledger/ethereum_oracle/mod.rs b/apps/src/lib/node/ledger/ethereum_oracle/mod.rs index 6980778c07..6c5a251a4e 100644 --- a/apps/src/lib/node/ledger/ethereum_oracle/mod.rs +++ b/apps/src/lib/node/ledger/ethereum_oracle/mod.rs @@ -7,6 +7,7 @@ use std::ops::ControlFlow; use async_trait::async_trait; use ethabi::Address; use ethbridge_events::{event_codecs, EventKind}; +use itertools::Either; use namada::core::hints; use namada::core::types::ethereum_structs; use namada::eth_bridge::ethers; @@ -75,13 +76,6 @@ pub trait RpcClient { /// Ethereum event log. type Log: IntoEthAbiLog; - /// Whether we should stop running the Ethereum oracle - /// if a call to [`Self::check_events_in_block`] fails. - /// - /// This is only useful for testing purposes. In general, - /// no implementation should override this constant. - const EXIT_ON_EVENTS_FAILURE: bool = true; - /// Instantiate a new client, pointing to the /// given RPC url. fn new_client(rpc_url: &str) -> Self @@ -108,6 +102,10 @@ pub trait RpcClient { backoff: Duration, deadline: Instant, ) -> Result; + + /// Given its current state, check if this RPC client + /// may recover from the given [`enum@Error`]. + fn may_recover(&self, error: &Error) -> bool; } #[async_trait(?Send)] @@ -172,6 +170,14 @@ impl RpcClient for Provider { }, } } + + #[inline(always)] + fn may_recover(&self, error: &Error) -> bool { + !matches!( + error, + Error::Timeout | Error::Channel(_, _) | Error::CheckEvents(_, _, _) + ) + } } /// A client that can talk to geth and parse @@ -197,7 +203,7 @@ impl Oracle { /// Construct a new [`Oracle`]. Note that it can not do anything until it /// has been sent a configuration via the passed in `control` channel. pub fn new( - url: &str, + client_or_url: Either, sender: BoundedSender, last_processed_block: last_processed_block::Sender, backoff: Duration, @@ -205,7 +211,10 @@ impl Oracle { control: control::Receiver, ) -> Self { Self { - client: C::new_client(url), + client: match client_or_url { + Either::Left(client) => client, + Either::Right(url) => C::new_client(url), + }, sender, backoff, ceiling, @@ -275,7 +284,7 @@ pub fn run_oracle( tracing::info!(?url, "Ethereum event oracle is starting"); let oracle = Oracle::::new( - &url, + Either::Right(&url), sender, last_processed_block, DEFAULT_BACKOFF, @@ -300,6 +309,75 @@ pub fn run_oracle( .with_no_cleanup() } +/// Determine what action to take after attempting to +/// process events contained in an Ethereum block. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum ProcessEventAction { + /// No events could be processed at this time, so we must keep + /// polling for new events. + ContinuePollingEvents, + /// Some error occurred while processing Ethereum events in + /// the current height. We must halt the oracle. + HaltOracle, + /// The current Ethereum block height has been processed. + /// We must advance to the next Ethereum height. + ProceedToNextBlock, +} + +impl ProcessEventAction { + /// Check whether the action commands a new block to be processed. + #[inline] + pub fn process_new_block(&self) -> bool { + matches!(self, Self::ProceedToNextBlock) + } +} + +impl ProcessEventAction { + /// Handles the requested oracle action, translating it to a format + /// understood by the set of [`Sleep`] abstractions. + fn handle(self) -> ControlFlow, ()> { + match self { + ProcessEventAction::ContinuePollingEvents => { + ControlFlow::Continue(()) + } + ProcessEventAction::HaltOracle => ControlFlow::Break(Err(())), + ProcessEventAction::ProceedToNextBlock => { + ControlFlow::Break(Ok(())) + } + } + } +} + +/// Tentatively process a batch of Ethereum events. +pub(crate) async fn try_process_eth_events( + oracle: &Oracle, + config: &Config, + next_block_to_process: ðereum_structs::BlockHeight, +) -> ProcessEventAction { + process_events_in_block(next_block_to_process, oracle, config) + .await + .map_or_else( + |error| { + if oracle.client.may_recover(&error) { + tracing::debug!( + %error, + block = ?next_block_to_process, + "Error while trying to process Ethereum block" + ); + ProcessEventAction::ContinuePollingEvents + } else { + tracing::error!( + reason = %error, + block = ?next_block_to_process, + "The Ethereum oracle has disconnected" + ); + ProcessEventAction::HaltOracle + } + }, + |()| ProcessEventAction::ProceedToNextBlock, + ) +} + /// Given an oracle, watch for new Ethereum events, processing /// them into Namada native types. /// @@ -334,43 +412,8 @@ async fn run_oracle_aux(mut oracle: Oracle) { ); let res = Sleep { strategy: Constant(oracle.backoff) }.run(|| async { tokio::select! { - result = process(&oracle, &config, next_block_to_process.clone()) => { - match result { - Ok(()) => { - ControlFlow::Break(Ok(())) - }, - Err( - reason @ ( - Error::Timeout - | Error::Channel(_, _) - | Error::CheckEvents(_, _, _) - ) - ) => { - // the oracle is unresponsive, we don't want the test to end - if !C::EXIT_ON_EVENTS_FAILURE - && matches!(&reason, Error::CheckEvents(_, _, _)) - { - tracing::debug!("Allowing the Ethereum oracle to keep running"); - return ControlFlow::Continue(()); - } - tracing::error!( - %reason, - block = ?next_block_to_process, - "The Ethereum oracle has disconnected" - ); - ControlFlow::Break(Err(())) - } - Err(error) => { - // this is a recoverable error, hence the debug log, - // to avoid spamming info logs - tracing::debug!( - %error, - block = ?next_block_to_process, - "Error while trying to process Ethereum block" - ); - ControlFlow::Continue(()) - } - } + action = try_process_eth_events(&oracle, &config, &next_block_to_process) => { + action.handle() }, _ = oracle.sender.closed() => { tracing::info!( @@ -400,10 +443,10 @@ async fn run_oracle_aux(mut oracle: Oracle) { /// Checks if the given block has any events relating to the bridge, and if so, /// sends them to the oracle's `sender` channel -async fn process( +async fn process_events_in_block( + block_to_process: ðereum_structs::BlockHeight, oracle: &Oracle, config: &Config, - block_to_process: ethereum_structs::BlockHeight, ) -> Result<(), Error> { let mut queue: Vec = vec![]; let pending = &mut queue; diff --git a/apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs b/apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs index 9a2454be17..0479445b7f 100644 --- a/apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs +++ b/apps/src/lib/node/ledger/ethereum_oracle/test_tools/mod.rs @@ -57,7 +57,7 @@ pub mod event_log { } } -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] pub mod mock_web3_client { use std::borrow::Cow; use std::fmt::Debug; @@ -102,7 +102,7 @@ pub mod mock_web3_client { /// reason is for interior mutability. pub struct Web3Client(Arc>); - /// Command sender for [`Web3`] instances. + /// Command sender for [`TestOracle`] instances. pub struct Web3Controller(Arc>); impl Web3Controller { @@ -148,8 +148,6 @@ pub mod mock_web3_client { impl RpcClient for Web3Client { type Log = ethabi::RawLog; - const EXIT_ON_EVENTS_FAILURE: bool = false; - #[cold] fn new_client(_: &str) -> Self where @@ -184,14 +182,15 @@ pub mod mock_web3_client { } if client.last_block_processed.as_ref() < Some(&block_to_check) { - client - .blocks_processed - .send(block_to_check.clone()) - .unwrap(); + _ = client.blocks_processed.send(block_to_check.clone()); client.last_block_processed = Some(block_to_check); } Ok(logs) } else { + tracing::debug!( + "No events to be processed by the Test Ethereum oracle, \ + as it has been artificially set as unresponsive" + ); Err(Error::CheckEvents( ty.into(), addr, @@ -209,6 +208,11 @@ pub mod mock_web3_client { let height = self.0.lock().unwrap().latest_block_height.clone(); Ok(SyncStatus::AtHeight(height)) } + + #[inline(always)] + fn may_recover(&self, _: &Error) -> bool { + true + } } impl Web3Client { diff --git a/apps/src/lib/node/ledger/shell/testing/node.rs b/apps/src/lib/node/ledger/shell/testing/node.rs index 034ac80845..88cb778699 100644 --- a/apps/src/lib/node/ledger/shell/testing/node.rs +++ b/apps/src/lib/node/ledger/shell/testing/node.rs @@ -1,11 +1,16 @@ +use std::future::poll_fn; use std::mem::ManuallyDrop; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; +use std::task::Poll; use color_eyre::eyre::{Report, Result}; use data_encoding::HEXUPPER; +use itertools::Either; use lazy_static::lazy_static; +use namada::core::types::ethereum_structs; +use namada::eth_bridge::oracle::config::Config as OracleConfig; use namada::ledger::events::log::dumb_queries; use namada::ledger::queries::{ EncodedResponseQuery, RequestCtx, RequestQuery, Router, RPC, @@ -23,13 +28,15 @@ use namada::sdk::queries::Client; use namada::tendermint_proto::abci::VoteInfo; use namada::tendermint_rpc::endpoint::abci_info; use namada::tendermint_rpc::SimpleRequest; +use namada::types::control_flow::time::Duration; +use namada::types::ethereum_events::EthereumEvent; use namada::types::hash::Hash; use namada::types::key::tm_consensus_key_raw_hash; use namada::types::storage::{BlockHash, BlockHeight, Epoch, Header}; use namada::types::time::DateTimeUtc; use num_traits::cast::FromPrimitive; use regex::Regex; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc; use crate::facade::tendermint_proto::abci::response_process_proposal::ProposalStatus; use crate::facade::tendermint_proto::abci::{ @@ -38,14 +45,184 @@ use crate::facade::tendermint_proto::abci::{ use crate::facade::tendermint_rpc::endpoint::abci_info::AbciInfo; use crate::facade::tendermint_rpc::error::Error as RpcError; use crate::facade::{tendermint, tendermint_rpc}; +use crate::node::ledger::ethereum_oracle::test_tools::mock_web3_client::{ + TestOracle, Web3Client, Web3Controller, +}; +use crate::node::ledger::ethereum_oracle::{ + control, last_processed_block, try_process_eth_events, +}; use crate::node::ledger::shell::testing::utils::TestDir; -use crate::node::ledger::shell::{ErrorCodes, Shell}; +use crate::node::ledger::shell::{ErrorCodes, EthereumOracleChannels, Shell}; use crate::node::ledger::shims::abcipp_shim_types::shim::request::{ FinalizeBlock, ProcessedTx, }; use crate::node::ledger::shims::abcipp_shim_types::shim::response::TxResult; use crate::node::ledger::storage; +/// Mock services data returned by [`mock_services`]. +pub struct MockServicesPackage { + /// Whether to automatically drive mock services or not. + pub auto_drive_services: bool, + /// Mock services stored by the [`MockNode`]. + pub services: MockServices, + /// Handlers to mock services stored by the [`Shell`]. + pub shell_handlers: MockServiceShellHandlers, + /// Handler to the mock services controller. + pub controller: MockServicesController, +} + +/// Mock services config. +pub struct MockServicesCfg { + /// Whether to automatically drive mock services or not. + pub auto_drive_services: bool, + /// Whether to enable the Ethereum oracle or not. + pub enable_eth_oracle: bool, +} + +/// Instantiate mock services for a node. +pub fn mock_services(cfg: MockServicesCfg) -> MockServicesPackage { + let (_, eth_client) = Web3Client::setup(); + let (eth_sender, eth_receiver) = mpsc::channel(1000); + let (last_processed_block_sender, last_processed_block_receiver) = + last_processed_block::channel(); + let (control_sender, control_receiver) = control::channel(); + let eth_oracle_controller = eth_client.controller(); + let oracle = TestOracle::new( + Either::Left(eth_client), + eth_sender.clone(), + last_processed_block_sender, + Duration::from_millis(5), + Duration::from_secs(30), + control_receiver, + ); + let eth_oracle_channels = EthereumOracleChannels::new( + eth_receiver, + control_sender, + last_processed_block_receiver, + ); + let (tx_broadcaster, tx_receiver) = mpsc::unbounded_channel(); + let ethereum_oracle = MockEthOracle { + oracle, + config: Default::default(), + next_block_to_process: Default::default(), + }; + MockServicesPackage { + auto_drive_services: cfg.auto_drive_services, + services: MockServices { + tx_receiver, + ethereum_oracle, + }, + shell_handlers: MockServiceShellHandlers { + tx_broadcaster: tx_broadcaster.clone(), + eth_oracle_channels: cfg + .enable_eth_oracle + .then_some(eth_oracle_channels), + }, + controller: MockServicesController { + eth_oracle: eth_oracle_controller, + eth_events: eth_sender, + tx_broadcaster, + }, + } +} + +/// Controller of various mock node services. +pub struct MockServicesController { + /// Ethereum oracle controller. + pub eth_oracle: Web3Controller, + /// Handler to the Ethereum oracle sender channel. + /// + /// Bypasses the Ethereum oracle service and sends + /// events directly to the [`Shell`]. + pub eth_events: mpsc::Sender, + /// Transaction broadcaster handle. + pub tx_broadcaster: mpsc::UnboundedSender>, +} + +/// Service handlers to be passed to a [`Shell`], when building +/// a mock node. +pub struct MockServiceShellHandlers { + /// Transaction broadcaster handle. + pub tx_broadcaster: mpsc::UnboundedSender>, + /// Ethereum oracle channel handlers. + pub eth_oracle_channels: Option, +} + +/// Services mocking the operation of the ledger's various async tasks. +pub struct MockServices { + /// Receives transactions that are supposed to be broadcasted + /// to the network. + pub tx_receiver: mpsc::UnboundedReceiver>, + /// Mock Ethereum oracle, that processes blocks from Ethereum + /// in order to find events emitted by a transaction to vote on. + pub ethereum_oracle: MockEthOracle, +} + +/// Actions to be performed by the mock node, as a result +/// of driving [`MockServices`]. +pub enum MockServiceAction { + /// The ledger should broadcast a new transaction. + BroadcastTx(Vec), +} + +impl MockServices { + /// Drive the internal state machine of the mock node's services. + pub async fn drive(&mut self) -> Vec { + let mut actions = vec![]; + + // process new eth events + // NOTE: this may result in a deadlock, if the events + // sent to the shell exceed the capacity of the oracle's + // events channel! + self.ethereum_oracle.drive().await; + + // receive txs from the broadcaster + while let Some(tx) = + poll_fn(|cx| match self.tx_receiver.poll_recv(cx) { + Poll::Pending => Poll::Ready(None), + poll => poll, + }) + .await + { + actions.push(MockServiceAction::BroadcastTx(tx)); + } + + actions + } +} + +/// Mock Ethereum oracle used for testing purposes. +pub struct MockEthOracle { + /// The inner oracle. + pub oracle: TestOracle, + /// The inner oracle's configuration. + pub config: OracleConfig, + /// The inner oracle's next block to process. + pub next_block_to_process: ethereum_structs::BlockHeight, +} + +impl MockEthOracle { + /// Updates the state of the Ethereum oracle. + /// + /// This includes sending any confirmed Ethereum events to + /// the shell and updating the height of the next Ethereum + /// block to process. Upon a successfully processed block, + /// this functions returns `true`. + pub async fn drive(&mut self) -> bool { + let new_block = try_process_eth_events( + &self.oracle, + &self.config, + &self.next_block_to_process, + ) + .await + .process_new_block(); + if new_block { + self.next_block_to_process += 1.into(); + } + new_block + } +} + /// Status of tx #[derive(Debug, Clone, PartialEq, Eq)] pub enum NodeResults { @@ -61,8 +238,9 @@ pub struct MockNode { pub shell: Arc>>, pub test_dir: ManuallyDrop, pub keep_temp: bool, - pub _broadcast_recv: UnboundedReceiver>, pub results: Arc>>, + pub services: Arc>, + pub auto_drive_services: bool, } impl Drop for MockNode { @@ -82,6 +260,26 @@ impl Drop for MockNode { } impl MockNode { + pub async fn handle_service_action(&self, action: MockServiceAction) { + match action { + MockServiceAction::BroadcastTx(tx) => { + _ = self.broadcast_tx_sync(tx.into()).await; + } + } + } + + async fn drive_mock_services(&self) { + if self.auto_drive_services { + let actions = { + let mut services = self.services.lock().await; + services.drive().await + }; + for action in actions { + self.handle_service_action(action).await; + } + } + } + pub fn genesis_dir(&self) -> PathBuf { self.test_dir .path() @@ -179,20 +377,45 @@ impl MockNode { pub fn finalize_and_commit(&self) { let (proposer_address, votes) = self.prepare_request(); - let mut req = FinalizeBlock { - hash: BlockHash([0u8; 32]), - header: Header { - hash: Hash([0; 32]), - time: DateTimeUtc::now(), - next_validators_hash: Hash([0; 32]), - }, - byzantine_validators: vec![], - txs: vec![], - proposer_address, - votes, - }; - req.header.time = DateTimeUtc::now(); let mut locked = self.shell.lock().unwrap(); + + // build finalize block abci request + let req = { + // check if we have protocol txs to be included + // in the finalize block request + let txs = { + let req = RequestPrepareProposal { + proposer_address: proposer_address.clone(), + ..Default::default() + }; + let txs = locked.prepare_proposal(req).txs; + + txs.into_iter() + .map(|tx| ProcessedTx { + tx, + result: TxResult { + code: 0, + info: String::new(), + }, + }) + .collect() + }; + let mut req = FinalizeBlock { + hash: BlockHash([0u8; 32]), + header: Header { + hash: Hash([0; 32]), + time: DateTimeUtc::now(), + next_validators_hash: Hash([0; 32]), + }, + byzantine_validators: vec![], + txs, + proposer_address, + votes, + }; + req.header.time = DateTimeUtc::now(); + req + }; + locked.finalize_block(req).expect("Test failed"); locked.commit(); } @@ -213,19 +436,19 @@ impl MockNode { /// Send a tx through Process Proposal and Finalize Block /// and register the results. - fn submit_tx(&self, tx_bytes: Vec) { - // The block space allocator disallows txs in certain blocks. + fn submit_txs(&self, txs: Vec>) { + // The block space allocator disallows encrypted txs in certain blocks. // Advance to block height that allows txs. self.advance_to_allowed_block(); let (proposer_address, votes) = self.prepare_request(); let req = RequestProcessProposal { - txs: vec![tx_bytes.clone()], + txs: txs.clone(), proposer_address: proposer_address.clone(), ..Default::default() }; let mut locked = self.shell.lock().unwrap(); - let mut result = locked.process_proposal(req); + let result = locked.process_proposal(req); let mut errors: Vec<_> = result .tx_results @@ -252,10 +475,11 @@ impl MockNode { next_validators_hash: Hash([0; 32]), }, byzantine_validators: vec![], - txs: vec![ProcessedTx { - tx: tx_bytes, - result: result.tx_results.remove(0), - }], + txs: txs + .into_iter() + .zip(result.tx_results.into_iter()) + .map(|(tx, result)| ProcessedTx { tx, result }) + .collect(), proposer_address, votes, }; @@ -310,6 +534,7 @@ impl MockNode { } } +// TODO: drive mock services #[cfg_attr(feature = "async-send", async_trait::async_trait)] #[cfg_attr(not(feature = "async-send"), async_trait::async_trait(?Send))] impl<'a> Client for &'a MockNode { @@ -322,6 +547,7 @@ impl<'a> Client for &'a MockNode { height: Option, prove: bool, ) -> std::result::Result { + self.drive_mock_services().await; let rpc = RPC; let data = data.unwrap_or_default(); let latest_height = { @@ -367,6 +593,7 @@ impl<'a> Client for &'a MockNode { /// `/abci_info`: get information about the ABCI application. async fn abci_info(&self) -> Result { + self.drive_mock_services().await; let locked = self.shell.lock().unwrap(); Ok(AbciInfo { data: "Namada".to_string(), @@ -398,6 +625,7 @@ impl<'a> Client for &'a MockNode { tx: namada::tendermint::abci::Transaction, ) -> Result { + self.drive_mock_services().await; let mut resp = tendermint_rpc::endpoint::broadcast::tx_sync::Response { code: Default::default(), data: Default::default(), @@ -405,9 +633,10 @@ impl<'a> Client for &'a MockNode { hash: tendermint::abci::transaction::Hash::new([0; 32]), }; let tx_bytes: Vec = tx.into(); - self.submit_tx(tx_bytes); + self.submit_txs(vec![tx_bytes]); if !self.success() { - resp.code = tendermint::abci::Code::Err(1337); // TODO: submit_tx should return the correct error code + message + // TODO: submit_txs should return the correct error code + message + resp.code = tendermint::abci::Code::Err(1337); return Ok(resp); } else { self.clear_results(); @@ -417,11 +646,13 @@ impl<'a> Client for &'a MockNode { proposer_address, ..Default::default() }; - let tx_bytes = { + let txs = { let locked = self.shell.lock().unwrap(); - locked.prepare_proposal(req).txs.remove(0) + locked.prepare_proposal(req).txs }; - self.submit_tx(tx_bytes); + if !txs.is_empty() { + self.submit_txs(txs); + } Ok(resp) } @@ -434,6 +665,7 @@ impl<'a> Client for &'a MockNode { _order: namada::tendermint_rpc::Order, ) -> Result { + self.drive_mock_services().await; let matcher = parse_tm_query(query); let borrowed = self.shell.lock().unwrap(); // we store an index into the event log as a block @@ -503,6 +735,7 @@ impl<'a> Client for &'a MockNode { where H: Into + Send, { + self.drive_mock_services().await; let height = height.into(); let encoded_event = EncodedEvent(height.value()); let locked = self.shell.lock().unwrap(); @@ -561,6 +794,7 @@ impl<'a> Client for &'a MockNode { /// Returns empty result (200 OK) on success, no response in case of an /// error. async fn health(&self) -> Result<(), RpcError> { + self.drive_mock_services().await; Ok(()) } } diff --git a/tests/src/integration/masp.rs b/tests/src/integration/masp.rs index ecd1b34465..d0a3121ce4 100644 --- a/tests/src/integration/masp.rs +++ b/tests/src/integration/masp.rs @@ -33,7 +33,7 @@ fn masp_incentives() -> Result<()> { // Lengthen epoch to ensure that a transaction can be constructed and // submitted within the same block. Necessary to ensure that conversion is // not invalidated. - let mut node = setup::setup()?; + let (mut node, _services) = setup::setup()?; // Wait till epoch boundary let ep0 = node.next_epoch(); // Send 20 BTC from Albert to PA @@ -767,7 +767,7 @@ fn masp_pinned_txs() -> Result<()> { // Download the shielded pool parameters before starting node let _ = CLIShieldedUtils::new::(PathBuf::new()); - let mut node = setup::setup()?; + let (mut node, _services) = setup::setup()?; // Wait till epoch boundary let _ep0 = node.next_epoch(); @@ -935,7 +935,7 @@ fn masp_txs_and_queries() -> Result<()> { Err(&'static str), } - let mut node = setup::setup()?; + let (mut node, _services) = setup::setup()?; _ = node.next_epoch(); let txs_args = vec![ // 0. Attempt to spend 10 BTC at SK(A) to PA(B) @@ -1230,7 +1230,7 @@ fn masp_txs_and_queries() -> Result<()> { /// 3. Submit a new wrapper with an invalid unshielding tx and assert the /// failure #[test] -fn wrapper_fee_unshielding() { +fn wrapper_fee_unshielding() -> Result<()> { // This address doesn't matter for tests. But an argument is required. let validator_one_rpc = "127.0.0.1:26567"; // Download the shielded pool parameters before starting node @@ -1238,7 +1238,7 @@ fn wrapper_fee_unshielding() { // Lengthen epoch to ensure that a transaction can be constructed and // submitted within the same block. Necessary to ensure that conversion is // not invalidated. - let mut node = setup::setup().unwrap(); + let (mut node, _services) = setup::setup()?; _ = node.next_epoch(); // 1. Shield some tokens @@ -1262,8 +1262,7 @@ fn wrapper_fee_unshielding() { "--ledger-address", validator_one_rpc, ], - ) - .unwrap(); + )?; node.assert_success(); _ = node.next_epoch(); @@ -1288,8 +1287,7 @@ fn wrapper_fee_unshielding() { "--ledger-address", validator_one_rpc, ], - ) - .unwrap(); + )?; node.assert_success(); // 3. Invalid unshielding @@ -1320,4 +1318,5 @@ fn wrapper_fee_unshielding() { .is_err(); assert!(tx_run); + Ok(()) } diff --git a/tests/src/integration/setup.rs b/tests/src/integration/setup.rs index df74c5f6f1..ead4c21a36 100644 --- a/tests/src/integration/setup.rs +++ b/tests/src/integration/setup.rs @@ -11,10 +11,13 @@ use namada_apps::config::genesis::genesis_config::GenesisConfig; use namada_apps::config::TendermintMode; use namada_apps::facade::tendermint::Timeout; use namada_apps::facade::tendermint_proto::google::protobuf::Timestamp; -use namada_apps::node::ledger::shell::testing::node::MockNode; +use namada_apps::node::ledger::shell::testing::node::{ + mock_services, MockNode, MockServicesCfg, MockServicesController, + MockServicesPackage, +}; use namada_apps::node::ledger::shell::testing::utils::TestDir; use namada_apps::node::ledger::shell::Shell; -use namada_core::types::address::Address; +use namada_core::types::address::nam; use namada_core::types::chain::{ChainId, ChainIdPrefix}; use toml::value::Table; @@ -26,14 +29,14 @@ use crate::e2e::setup::{ const ENV_VAR_KEEP_TEMP: &str = "NAMADA_INT_KEEP_TEMP"; /// Setup a network with a single genesis validator node. -pub fn setup() -> Result { +pub fn setup() -> Result<(MockNode, MockServicesController)> { initialize_genesis(|genesis| genesis) } /// Setup folders with genesis, configs, wasm, etc. pub fn initialize_genesis( mut update_genesis: impl FnMut(GenesisConfig) -> GenesisConfig, -) -> Result { +) -> Result<(MockNode, MockServicesController)> { let working_dir = std::fs::canonicalize("..").unwrap(); let keep_temp = match std::env::var(ENV_VAR_KEEP_TEMP) { Ok(val) => val.to_ascii_lowercase() != "false", @@ -81,7 +84,11 @@ pub fn initialize_genesis( }, ); - create_node(test_dir, &genesis, keep_temp) + let services_cfg = MockServicesCfg { + auto_drive_services: true, // TODO: change to false by default + enable_eth_oracle: genesis.ethereum_bridge_params.is_some(), + }; + create_node(test_dir, &genesis, keep_temp, services_cfg) } /// Create a mock ledger node. @@ -89,7 +96,8 @@ fn create_node( base_dir: TestDir, genesis: &GenesisConfig, keep_temp: bool, -) -> Result { + services_cfg: MockServicesCfg, +) -> Result<(MockNode, MockServicesController)> { // look up the chain id from the global file. let chain_id = if let toml::Value::String(chain_id) = toml::from_str::( @@ -119,26 +127,32 @@ fn create_node( ); // instantiate and initialize the ledger node. - let (sender, recv) = tokio::sync::mpsc::unbounded_channel(); + let MockServicesPackage { + auto_drive_services, + services, + shell_handlers, + controller, + } = mock_services(services_cfg); let node = MockNode { shell: Arc::new(Mutex::new(Shell::new( config::Ledger::new( base_dir.path(), chain_id.clone(), - TendermintMode::Validator + TendermintMode::Validator, ), wasm_dir, - sender, - None, + shell_handlers.tx_broadcaster, + shell_handlers.eth_oracle_channels, None, 50 * 1024 * 1024, // 50 kiB 50 * 1024 * 1024, // 50 kiB - Address::from_str("atest1v4ehgw36x3prswzxggunzv6pxqmnvdj9xvcyzvpsggeyvs3cg9qnywf589qnwvfsg5erg3fkl09rg5").unwrap(), + nam(), ))), test_dir: ManuallyDrop::new(base_dir), keep_temp, - _broadcast_recv: recv, + services: Arc::new(tokio::sync::Mutex::new(services)), results: Arc::new(Mutex::new(vec![])), + auto_drive_services, }; let init_req = namada_apps::facade::tower_abci::request::InitChain { time: Some(Timestamp { @@ -159,5 +173,5 @@ fn create_node( locked.commit(); } - Ok(node) + Ok((node, controller)) }