diff --git a/.changelog/unreleased/features/1189-stop-at-height.md b/.changelog/unreleased/features/1189-stop-at-height.md new file mode 100644 index 00000000000..d8df5a6ede3 --- /dev/null +++ b/.changelog/unreleased/features/1189-stop-at-height.md @@ -0,0 +1,5 @@ +- Introduced a new ledger sub-command: `run-until`. Then, at the provided block + height, the node will either halt or suspend. If the chain is suspended, only + the consensus connection is suspended. This means that the node can still be + queried. This is useful for debugging purposes. + ([#1189](https://github.com/anoma/namada/pull/1189)) diff --git a/apps/Cargo.toml b/apps/Cargo.toml index f70783ae217..53e83402a7d 100644 --- a/apps/Cargo.toml +++ b/apps/Cargo.toml @@ -165,4 +165,4 @@ test-log = {version = "0.2.7", default-features = false, features = ["trace"]} tokio-test = "0.4.2" [build-dependencies] -git2 = "0.13.25" +git2 = "0.13.25" \ No newline at end of file diff --git a/apps/src/bin/namada-node/cli.rs b/apps/src/bin/namada-node/cli.rs index a7746a0d718..1c5520671d7 100644 --- a/apps/src/bin/namada-node/cli.rs +++ b/apps/src/bin/namada-node/cli.rs @@ -1,7 +1,7 @@ //! Namada node CLI. use eyre::{Context, Result}; -use namada::types::time::Utc; +use namada::types::time::{DateTimeUtc, Utc}; use namada_apps::cli::{self, cmds}; use namada_apps::node::ledger; @@ -14,23 +14,14 @@ pub fn main() -> Result<()> { cmds::NamadaNode::Ledger(sub) => match sub { cmds::Ledger::Run(cmds::LedgerRun(args)) => { let wasm_dir = ctx.wasm_dir(); - - // Sleep until start time if needed - if let Some(time) = args.0 { - if let Ok(sleep_time) = - time.0.signed_duration_since(Utc::now()).to_std() - { - if !sleep_time.is_zero() { - tracing::info!( - "Waiting ledger start time: {:?}, time left: \ - {:?}", - time, - sleep_time - ); - std::thread::sleep(sleep_time) - } - } - } + sleep_until(args.0); + ledger::run(ctx.config.ledger, wasm_dir); + } + cmds::Ledger::RunUntil(cmds::LedgerRunUntil(args)) => { + let wasm_dir = ctx.wasm_dir(); + sleep_until(args.time); + ctx.config.ledger.shell.action_at_height = + Some(args.action_at_height); ledger::run(ctx.config.ledger, wasm_dir); } cmds::Ledger::Reset(_) => { @@ -68,3 +59,22 @@ pub fn main() -> Result<()> { } Ok(()) } + +/// Sleep until the given start time if necessary. +fn sleep_until(time: Option) { + // Sleep until start time if needed + if let Some(time) = time { + if let Ok(sleep_time) = + time.0.signed_duration_since(Utc::now()).to_std() + { + if !sleep_time.is_zero() { + tracing::info!( + "Waiting ledger start time: {:?}, time left: {:?}", + time, + sleep_time + ); + std::thread::sleep(sleep_time) + } + } + } +} diff --git a/apps/src/lib/cli.rs b/apps/src/lib/cli.rs index 2a524fbe587..cdff4bd8b3a 100644 --- a/apps/src/lib/cli.rs +++ b/apps/src/lib/cli.rs @@ -759,6 +759,7 @@ pub mod cmds { #[derive(Clone, Debug)] pub enum Ledger { Run(LedgerRun), + RunUntil(LedgerRunUntil), Reset(LedgerReset), DumpDb(LedgerDumpDb), RollBack(LedgerRollBack), @@ -773,9 +774,11 @@ pub mod cmds { let reset = SubCmd::parse(matches).map(Self::Reset); let dump_db = SubCmd::parse(matches).map(Self::DumpDb); let rollback = SubCmd::parse(matches).map(Self::RollBack); + let run_until = SubCmd::parse(matches).map(Self::RunUntil); run.or(reset) .or(dump_db) .or(rollback) + .or(run_until) // The `run` command is the default if no sub-command given .or(Some(Self::Run(LedgerRun(args::LedgerRun(None))))) }) @@ -788,6 +791,7 @@ pub mod cmds { defaults to run the node.", ) .subcommand(LedgerRun::def()) + .subcommand(LedgerRunUntil::def()) .subcommand(LedgerReset::def()) .subcommand(LedgerDumpDb::def()) .subcommand(LedgerRollBack::def()) @@ -813,6 +817,28 @@ pub mod cmds { } } + #[derive(Clone, Debug)] + pub struct LedgerRunUntil(pub args::LedgerRunUntil); + + impl SubCmd for LedgerRunUntil { + const CMD: &'static str = "run-until"; + + fn parse(matches: &ArgMatches) -> Option { + matches + .subcommand_matches(Self::CMD) + .map(|matches| Self(args::LedgerRunUntil::parse(matches))) + } + + fn def() -> App { + App::new(Self::CMD) + .about( + "Run Namada ledger node until a given height. Then halt \ + or suspend.", + ) + .add_args::() + } + } + #[derive(Clone, Debug)] pub struct LedgerReset; @@ -1595,7 +1621,7 @@ pub mod args { use namada::types::chain::{ChainId, ChainIdPrefix}; use namada::types::key::*; use namada::types::masp::MaspValue; - use namada::types::storage::{self, Epoch}; + use namada::types::storage::{self, BlockHeight, Epoch}; use namada::types::time::DateTimeUtc; use namada::types::token; use namada::types::transaction::GasLimit; @@ -1605,7 +1631,7 @@ pub mod args { use super::utils::*; use super::{ArgGroup, ArgMatches}; use crate::config; - use crate::config::TendermintMode; + use crate::config::{Action, ActionAtHeight, TendermintMode}; use crate::facade::tendermint::Timeout; use crate::facade::tendermint_config::net::Address as TendermintAddress; @@ -1623,6 +1649,7 @@ pub mod args { Err(_) => config::DEFAULT_BASE_DIR.into(), }), ); + const BLOCK_HEIGHT: Arg = arg("block-height"); // const BLOCK_HEIGHT_OPT: ArgOpt = arg_opt("height"); const BROADCAST_ONLY: ArgFlag = flag("broadcast-only"); const CHAIN_ID: Arg = arg("chain-id"); @@ -1654,6 +1681,7 @@ pub mod args { arg_default_from_ctx("gas-token", DefaultFn(|| "NAM".into())); const GENESIS_PATH: Arg = arg("genesis-path"); const GENESIS_VALIDATOR: ArgOpt = arg("genesis-validator").opt(); + const HALT_ACTION: ArgFlag = flag("halt"); const HISTORIC: ArgFlag = flag("historic"); const LEDGER_ADDRESS_ABOUT: &str = "Address of a ledger node as \"{scheme}://{host}:{port}\". If the \ @@ -1703,6 +1731,7 @@ pub mod args { const SOURCE_OPT: ArgOpt = SOURCE.opt(); const STORAGE_KEY: Arg = arg("storage-key"); const SUB_PREFIX: ArgOpt = arg_opt("sub-prefix"); + const SUSPEND_ACTION: ArgFlag = flag("suspend"); const TIMEOUT_HEIGHT: ArgOpt = arg_opt("timeout-height"); const TIMEOUT_SEC_OFFSET: ArgOpt = arg_opt("timeout-sec-offset"); const TOKEN_OPT: ArgOpt = TOKEN.opt(); @@ -1793,6 +1822,48 @@ pub mod args { } } + #[derive(Clone, Debug)] + pub struct LedgerRunUntil { + pub time: Option, + pub action_at_height: ActionAtHeight, + } + + impl Args for LedgerRunUntil { + fn parse(matches: &ArgMatches) -> Self { + Self { + time: NAMADA_START_TIME.parse(matches), + action_at_height: ActionAtHeight { + height: BLOCK_HEIGHT.parse(matches), + action: if HALT_ACTION.parse(matches) { + Action::Halt + } else { + Action::Suspend + }, + }, + } + } + + fn def(app: App) -> App { + app.arg( + NAMADA_START_TIME + .def() + .about("The start time of the ledger."), + ) + .arg(BLOCK_HEIGHT.def().about("The block height to run until.")) + .arg(HALT_ACTION.def().about("Halt at the given block height")) + .arg( + SUSPEND_ACTION + .def() + .about("Suspend consensus at the given block height"), + ) + .group( + ArgGroup::new("find_flags") + .args(&[HALT_ACTION.name, SUSPEND_ACTION.name]) + .required(true), + ) + } + } + #[derive(Clone, Debug)] pub struct LedgerDumpDb { // TODO: allow to specify height diff --git a/apps/src/lib/config/mod.rs b/apps/src/lib/config/mod.rs index 811289c790a..8c1275718f7 100644 --- a/apps/src/lib/config/mod.rs +++ b/apps/src/lib/config/mod.rs @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; use namada::types::chain::ChainId; +use namada::types::storage::BlockHeight; use namada::types::time::Rfc3339String; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -67,6 +68,27 @@ impl From for TendermintMode { } } +/// An action to be performed at a +/// certain block height. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Action { + /// Stop the chain. + Halt, + /// Suspend consensus indefinitely. + Suspend, +} + +/// An action to be performed at a +/// certain block height along with the +/// given height. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActionAtHeight { + /// The height at which to take action. + pub height: BlockHeight, + /// The action to take. + pub action: Action, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Ledger { pub genesis_time: Rfc3339String, @@ -95,6 +117,8 @@ pub struct Shell { db_dir: PathBuf, /// Use the [`Ledger::tendermint_dir()`] method to read the value. tendermint_dir: PathBuf, + /// An optional action to take when a given blockheight is reached. + pub action_at_height: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -142,6 +166,7 @@ impl Ledger { storage_read_past_height_limit: Some(3600), db_dir: DB_DIR.into(), tendermint_dir: TENDERMINT_DIR.into(), + action_at_height: None, }, tendermint: Tendermint { rpc_address: SocketAddr::new( diff --git a/apps/src/lib/node/ledger/mod.rs b/apps/src/lib/node/ledger/mod.rs index 7b7d4c5bcaf..810b2eead8e 100644 --- a/apps/src/lib/node/ledger/mod.rs +++ b/apps/src/lib/node/ledger/mod.rs @@ -458,7 +458,7 @@ fn start_abci_broadcaster_shell( let genesis = genesis::genesis(&config.shell.base_dir, &config.chain_id); #[cfg(feature = "dev")] let genesis = genesis::genesis(1); - let (shell, abci_service) = AbcippShim::new( + let (shell, abci_service, service_handle) = AbcippShim::new( config, wasm_dir, broadcaster_sender, @@ -474,8 +474,13 @@ fn start_abci_broadcaster_shell( // Start the ABCI server let abci = spawner .spawn_abortable("ABCI", move |aborter| async move { - let res = - run_abci(abci_service, ledger_address, abci_abort_recv).await; + let res = run_abci( + abci_service, + service_handle, + ledger_address, + abci_abort_recv, + ) + .await; drop(aborter); res @@ -508,6 +513,7 @@ fn start_abci_broadcaster_shell( /// mempool, snapshot, and info. async fn run_abci( abci_service: AbciService, + service_handle: tokio::sync::broadcast::Sender<()>, ledger_address: SocketAddr, abort_recv: tokio::sync::oneshot::Receiver<()>, ) -> shell::Result<()> { @@ -534,13 +540,13 @@ async fn run_abci( ) .finish() .unwrap(); - tokio::select! { // Run the server with the ABCI service status = server.listen(ledger_address) => { status.map_err(|err| Error::TowerServer(err.to_string())) }, resp_sender = abort_recv => { + _ = service_handle.send(()); match resp_sender { Ok(()) => { tracing::info!("Shutting down ABCI server..."); diff --git a/apps/src/lib/node/ledger/shims/abcipp_shim.rs b/apps/src/lib/node/ledger/shims/abcipp_shim.rs index 8bc45ce9774..dff8ab71362 100644 --- a/apps/src/lib/node/ledger/shims/abcipp_shim.rs +++ b/apps/src/lib/node/ledger/shims/abcipp_shim.rs @@ -10,8 +10,10 @@ use namada::types::address::Address; use namada::types::hash::Hash; #[cfg(not(feature = "abcipp"))] use namada::types::storage::BlockHash; +use namada::types::storage::BlockHeight; #[cfg(not(feature = "abcipp"))] use namada::types::transaction::hash_tx; +use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; use tower::Service; @@ -21,6 +23,7 @@ use super::abcipp_shim_types::shim::request::{FinalizeBlock, ProcessedTx}; use super::abcipp_shim_types::shim::TxBytes; use super::abcipp_shim_types::shim::{Error, Request, Response}; use crate::config; +use crate::config::{Action, ActionAtHeight}; #[cfg(not(feature = "abcipp"))] use crate::facade::tendermint_proto::abci::RequestBeginBlock; use crate::facade::tower_abci::{BoxError, Request as Req, Response as Resp}; @@ -52,10 +55,13 @@ impl AbcippShim { vp_wasm_compilation_cache: u64, tx_wasm_compilation_cache: u64, native_token: Address, - ) -> (Self, AbciService) { + ) -> (Self, AbciService, broadcast::Sender<()>) { // We can use an unbounded channel here, because tower-abci limits the // the number of requests that can come in + let (shell_send, shell_recv) = std::sync::mpsc::channel(); + let (server_shutdown, _) = broadcast::channel::<()>(1); + let action_at_height = config.shell.action_at_height.clone(); ( Self { service: Shell::new( @@ -73,7 +79,13 @@ impl AbcippShim { delivered_txs: vec![], shell_recv, }, - AbciService { shell_send }, + AbciService { + shell_send, + shutdown: server_shutdown.clone(), + action_at_height, + suspended: false, + }, + server_shutdown, ) } @@ -193,12 +205,147 @@ impl AbcippShim { } } +/// Indicates how [`AbciService`] should +/// check whether or not it needs to take +/// action. +#[derive(Debug)] +enum CheckAction { + /// No check necessary. + NoAction, + /// Check a given block height. + Check(i64), + /// The action been taken. + AlreadySuspended, +} + #[derive(Debug)] pub struct AbciService { + /// A channel for forwarding requests to the shell shell_send: std::sync::mpsc::Sender<( Req, tokio::sync::oneshot::Sender>, )>, + /// Indicates if the consensus connection is suspended. + suspended: bool, + /// This resolves the non-completing futures returned to tower-abci + /// during suspension. + shutdown: broadcast::Sender<()>, + /// An action to be taken at a specified block height. + action_at_height: Option, +} + +impl AbciService { + /// Check if we are at a block height with a scheduled action. + /// If so, perform the action. + fn maybe_take_action( + action_at_height: Option, + check: CheckAction, + mut shutdown_recv: broadcast::Receiver<()>, + ) -> (bool, Option<>::Future>) { + let hght = match check { + CheckAction::AlreadySuspended => BlockHeight::from(u64::MAX), + CheckAction::Check(hght) => BlockHeight::from(hght as u64), + CheckAction::NoAction => BlockHeight::default(), + }; + match action_at_height { + Some(ActionAtHeight { + height, + action: Action::Suspend, + }) if height <= hght => { + if height == hght { + tracing::info!( + "Reached block height {}, suspending.", + height + ); + tracing::warn!( + "\x1b[93mThis feature is intended for debugging \ + purposes. Note that on shutdown a spurious panic \ + message will be produced.\x1b[0m" + ) + } + ( + true, + Some( + async move { + shutdown_recv.recv().await.unwrap(); + Err(BoxError::from( + "Not all tendermint responses were processed. \ + If the `--suspended` flag was passed, you \ + may ignore this error.", + )) + } + .boxed(), + ), + ) + } + Some(ActionAtHeight { + height, + action: Action::Halt, + }) if height == hght => { + tracing::info!( + "Reached block height {}, halting the chain.", + height + ); + ( + false, + Some( + async move { + Err(BoxError::from(format!( + "Reached block height {}, halting the chain.", + height + ))) + } + .boxed(), + ), + ) + } + _ => (false, None), + } + } + + /// If we are not taking special action for this request, + /// forward it normally. + fn forward_request(&mut self, req: Req) -> >::Future { + let (resp_send, recv) = tokio::sync::oneshot::channel(); + let result = self.shell_send.send((req, resp_send)); + + async move { + if let Err(err) = result { + // The shell has shut-down + return Err(err.into()); + } + match recv.await { + Ok(resp) => resp, + Err(err) => { + tracing::info!("ABCI response channel didn't respond"); + Err(err.into()) + } + } + } + .boxed() + } + + /// Given the type of request, determine if we need to check + /// to possibly take an action. + fn get_action(&self, req: &Req) -> Option { + match req { + Req::PrepareProposal(req) => Some(CheckAction::Check(req.height)), + Req::ProcessProposal(req) => Some(CheckAction::Check(req.height)), + Req::EndBlock(req) => Some(CheckAction::Check(req.height)), + Req::BeginBlock(_) + | Req::DeliverTx(_) + | Req::InitChain(_) + | Req::CheckTx(_) + | Req::Commit(_) => { + if self.suspended { + Some(CheckAction::AlreadySuspended) + } else { + Some(CheckAction::NoAction) + } + } + _ => None, + } + } } /// The ABCI tower service implementation sends and receives messages to and @@ -218,23 +365,17 @@ impl Service for AbciService { } fn call(&mut self, req: Req) -> Self::Future { - let (resp_send, recv) = tokio::sync::oneshot::channel(); - let result = self.shell_send.send((req, resp_send)); - Box::pin( - async move { - if let Err(err) = result { - // The shell has shut-down - return Err(err.into()); - } - match recv.await { - Ok(resp) => resp, - Err(err) => { - tracing::info!("ABCI response channel didn't respond"); - Err(err.into()) - } - } - } - .boxed(), - ) + let action = self.get_action(&req); + if let Some(action) = action { + let (suspended, fut) = Self::maybe_take_action( + self.action_at_height.clone(), + action, + self.shutdown.subscribe(), + ); + self.suspended = suspended; + fut.unwrap_or_else(|| self.forward_request(req)) + } else { + self.forward_request(req) + } } } diff --git a/tests/src/e2e/ledger_tests.rs b/tests/src/e2e/ledger_tests.rs index 7dc7acd67dd..e566bfd793b 100644 --- a/tests/src/e2e/ledger_tests.rs +++ b/tests/src/e2e/ledger_tests.rs @@ -267,6 +267,75 @@ fn run_ledger_load_state_and_reset() -> Result<()> { Ok(()) } +/// In this test we +/// 1. Run the ledger node until a pre-configured height, +/// at which point it should suspend. +/// 2. Check that we can still query the ledger. +/// 3. Check that we can shutdown the ledger normally afterwards. +#[test] +fn suspend_ledger() -> Result<()> { + let test = setup::single_node_net()?; + // 1. Run the ledger node + let mut ledger = run_as!( + test, + Who::Validator(0), + Bin::Node, + &["ledger", "run-until", "--block-height", "2", "--suspend",], + Some(40) + )?; + + ledger.exp_string("Namada ledger node started")?; + // There should be no previous state + ledger.exp_string("No state could be found")?; + // Wait to commit a block + ledger.exp_regex(r"Committed block hash.*, height: [0-9]+")?; + ledger.exp_string("Reached block height 2, suspending.")?; + let bg_ledger = ledger.background(); + + // 2. Query the ledger + let validator_one_rpc = get_actor_rpc(&test, &Who::Validator(0)); + let mut client = run!( + test, + Bin::Client, + &["epoch", "--ledger-address", &validator_one_rpc], + Some(40) + )?; + client.exp_string("Last committed epoch: 0")?; + + // 3. Shut it down + let mut ledger = bg_ledger.foreground(); + ledger.send_control('c')?; + // Wait for the node to stop running to finish writing the state and tx + // queue + ledger.exp_string("Namada ledger node has shut down.")?; + ledger.exp_eof()?; + Ok(()) +} + +/// Test that if we configure the ledger to +/// halt at a given height, it does indeed halt. +#[test] +fn stop_ledger_at_height() -> Result<()> { + let test = setup::single_node_net()?; + // 1. Run the ledger node + let mut ledger = run_as!( + test, + Who::Validator(0), + Bin::Node, + &["ledger", "run-until", "--block-height", "2", "--halt",], + Some(40) + )?; + + ledger.exp_string("Namada ledger node started")?; + // There should be no previous state + ledger.exp_string("No state could be found")?; + // Wait to commit a block + ledger.exp_regex(r"Committed block hash.*, height: [0-9]+")?; + ledger.exp_string("Reached block height 2, halting the chain.")?; + ledger.exp_eof()?; + Ok(()) +} + /// In this test we: /// 1. Run the ledger node /// 2. Submit a token transfer tx