Skip to content

Commit

Permalink
Merge branch 'bat/feature/stop-at-height' (#1189)
Browse files Browse the repository at this point in the history
* bat/feature/stop-at-height:
  changelog: #1189
  [fix]: Suspending and halting now works for syncing the chain
  [fix]: Addressing review comments
  [fix]: Removed double boxing of futures
  Update apps/src/lib/node/ledger/shims/abcipp_shim.rs
  [feat]: Added e2e tests covering the new features
  [feat]: Fixed graceful shutdown after suspension. Added more logging
  [temp]: Trying to shutdown ledger gracefully post-suspension
  [fix]: Fixed a lifetimes issue in the futures
  [feat]: Refactoring suspend to allow queries other than consensus to remain responsive
  [feat]: Add commands to halt/suspend chain at specified block height
  • Loading branch information
tzemanovic committed Apr 11, 2023
2 parents 2813b24 + f5bd287 commit 5edb5b8
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 45 deletions.
5 changes: 5 additions & 0 deletions .changelog/unreleased/features/1189-stop-at-height.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- Introduced a new ledger sub-command: `run-until`. Then, at the provided block
height, the node will either halt or suspend. If the chain is suspended, only
the consensus connection is suspended. This means that the node can still be
queried. This is useful for debugging purposes.
([#1189](https://github.com/anoma/namada/pull/1189))
2 changes: 1 addition & 1 deletion apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,4 @@ test-log = {version = "0.2.7", default-features = false, features = ["trace"]}
tokio-test = "0.4.2"

[build-dependencies]
git2 = "0.13.25"
git2 = "0.13.25"
46 changes: 28 additions & 18 deletions apps/src/bin/namada-node/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Namada node CLI.
use eyre::{Context, Result};
use namada::types::time::Utc;
use namada::types::time::{DateTimeUtc, Utc};
use namada_apps::cli::{self, cmds};
use namada_apps::node::ledger;

Expand All @@ -14,23 +14,14 @@ pub fn main() -> Result<()> {
cmds::NamadaNode::Ledger(sub) => match sub {
cmds::Ledger::Run(cmds::LedgerRun(args)) => {
let wasm_dir = ctx.wasm_dir();

// Sleep until start time if needed
if let Some(time) = args.0 {
if let Ok(sleep_time) =
time.0.signed_duration_since(Utc::now()).to_std()
{
if !sleep_time.is_zero() {
tracing::info!(
"Waiting ledger start time: {:?}, time left: \
{:?}",
time,
sleep_time
);
std::thread::sleep(sleep_time)
}
}
}
sleep_until(args.0);
ledger::run(ctx.config.ledger, wasm_dir);
}
cmds::Ledger::RunUntil(cmds::LedgerRunUntil(args)) => {
let wasm_dir = ctx.wasm_dir();
sleep_until(args.time);
ctx.config.ledger.shell.action_at_height =
Some(args.action_at_height);
ledger::run(ctx.config.ledger, wasm_dir);
}
cmds::Ledger::Reset(_) => {
Expand Down Expand Up @@ -68,3 +59,22 @@ pub fn main() -> Result<()> {
}
Ok(())
}

/// Sleep until the given start time if necessary.
fn sleep_until(time: Option<DateTimeUtc>) {
// Sleep until start time if needed
if let Some(time) = time {
if let Ok(sleep_time) =
time.0.signed_duration_since(Utc::now()).to_std()
{
if !sleep_time.is_zero() {
tracing::info!(
"Waiting ledger start time: {:?}, time left: {:?}",
time,
sleep_time
);
std::thread::sleep(sleep_time)
}
}
}
}
75 changes: 73 additions & 2 deletions apps/src/lib/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ pub mod cmds {
#[derive(Clone, Debug)]
pub enum Ledger {
Run(LedgerRun),
RunUntil(LedgerRunUntil),
Reset(LedgerReset),
DumpDb(LedgerDumpDb),
RollBack(LedgerRollBack),
Expand All @@ -773,9 +774,11 @@ pub mod cmds {
let reset = SubCmd::parse(matches).map(Self::Reset);
let dump_db = SubCmd::parse(matches).map(Self::DumpDb);
let rollback = SubCmd::parse(matches).map(Self::RollBack);
let run_until = SubCmd::parse(matches).map(Self::RunUntil);
run.or(reset)
.or(dump_db)
.or(rollback)
.or(run_until)
// The `run` command is the default if no sub-command given
.or(Some(Self::Run(LedgerRun(args::LedgerRun(None)))))
})
Expand All @@ -788,6 +791,7 @@ pub mod cmds {
defaults to run the node.",
)
.subcommand(LedgerRun::def())
.subcommand(LedgerRunUntil::def())
.subcommand(LedgerReset::def())
.subcommand(LedgerDumpDb::def())
.subcommand(LedgerRollBack::def())
Expand All @@ -813,6 +817,28 @@ pub mod cmds {
}
}

#[derive(Clone, Debug)]
pub struct LedgerRunUntil(pub args::LedgerRunUntil);

impl SubCmd for LedgerRunUntil {
const CMD: &'static str = "run-until";

fn parse(matches: &ArgMatches) -> Option<Self> {
matches
.subcommand_matches(Self::CMD)
.map(|matches| Self(args::LedgerRunUntil::parse(matches)))
}

fn def() -> App {
App::new(Self::CMD)
.about(
"Run Namada ledger node until a given height. Then halt \
or suspend.",
)
.add_args::<args::LedgerRunUntil>()
}
}

#[derive(Clone, Debug)]
pub struct LedgerReset;

Expand Down Expand Up @@ -1595,7 +1621,7 @@ pub mod args {
use namada::types::chain::{ChainId, ChainIdPrefix};
use namada::types::key::*;
use namada::types::masp::MaspValue;
use namada::types::storage::{self, Epoch};
use namada::types::storage::{self, BlockHeight, Epoch};
use namada::types::time::DateTimeUtc;
use namada::types::token;
use namada::types::transaction::GasLimit;
Expand All @@ -1605,7 +1631,7 @@ pub mod args {
use super::utils::*;
use super::{ArgGroup, ArgMatches};
use crate::config;
use crate::config::TendermintMode;
use crate::config::{Action, ActionAtHeight, TendermintMode};
use crate::facade::tendermint::Timeout;
use crate::facade::tendermint_config::net::Address as TendermintAddress;

Expand All @@ -1623,6 +1649,7 @@ pub mod args {
Err(_) => config::DEFAULT_BASE_DIR.into(),
}),
);
const BLOCK_HEIGHT: Arg<BlockHeight> = arg("block-height");
// const BLOCK_HEIGHT_OPT: ArgOpt<BlockHeight> = arg_opt("height");
const BROADCAST_ONLY: ArgFlag = flag("broadcast-only");
const CHAIN_ID: Arg<ChainId> = arg("chain-id");
Expand Down Expand Up @@ -1654,6 +1681,7 @@ pub mod args {
arg_default_from_ctx("gas-token", DefaultFn(|| "NAM".into()));
const GENESIS_PATH: Arg<PathBuf> = arg("genesis-path");
const GENESIS_VALIDATOR: ArgOpt<String> = arg("genesis-validator").opt();
const HALT_ACTION: ArgFlag = flag("halt");
const HISTORIC: ArgFlag = flag("historic");
const LEDGER_ADDRESS_ABOUT: &str =
"Address of a ledger node as \"{scheme}://{host}:{port}\". If the \
Expand Down Expand Up @@ -1703,6 +1731,7 @@ pub mod args {
const SOURCE_OPT: ArgOpt<WalletAddress> = SOURCE.opt();
const STORAGE_KEY: Arg<storage::Key> = arg("storage-key");
const SUB_PREFIX: ArgOpt<String> = arg_opt("sub-prefix");
const SUSPEND_ACTION: ArgFlag = flag("suspend");
const TIMEOUT_HEIGHT: ArgOpt<u64> = arg_opt("timeout-height");
const TIMEOUT_SEC_OFFSET: ArgOpt<u64> = arg_opt("timeout-sec-offset");
const TOKEN_OPT: ArgOpt<WalletAddress> = TOKEN.opt();
Expand Down Expand Up @@ -1793,6 +1822,48 @@ pub mod args {
}
}

#[derive(Clone, Debug)]
pub struct LedgerRunUntil {
pub time: Option<DateTimeUtc>,
pub action_at_height: ActionAtHeight,
}

impl Args for LedgerRunUntil {
fn parse(matches: &ArgMatches) -> Self {
Self {
time: NAMADA_START_TIME.parse(matches),
action_at_height: ActionAtHeight {
height: BLOCK_HEIGHT.parse(matches),
action: if HALT_ACTION.parse(matches) {
Action::Halt
} else {
Action::Suspend
},
},
}
}

fn def(app: App) -> App {
app.arg(
NAMADA_START_TIME
.def()
.about("The start time of the ledger."),
)
.arg(BLOCK_HEIGHT.def().about("The block height to run until."))
.arg(HALT_ACTION.def().about("Halt at the given block height"))
.arg(
SUSPEND_ACTION
.def()
.about("Suspend consensus at the given block height"),
)
.group(
ArgGroup::new("find_flags")
.args(&[HALT_ACTION.name, SUSPEND_ACTION.name])
.required(true),
)
}
}

#[derive(Clone, Debug)]
pub struct LedgerDumpDb {
// TODO: allow to specify height
Expand Down
25 changes: 25 additions & 0 deletions apps/src/lib/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;

use namada::types::chain::ChainId;
use namada::types::storage::BlockHeight;
use namada::types::time::Rfc3339String;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -67,6 +68,27 @@ impl From<String> for TendermintMode {
}
}

/// An action to be performed at a
/// certain block height.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Action {
/// Stop the chain.
Halt,
/// Suspend consensus indefinitely.
Suspend,
}

/// An action to be performed at a
/// certain block height along with the
/// given height.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActionAtHeight {
/// The height at which to take action.
pub height: BlockHeight,
/// The action to take.
pub action: Action,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Ledger {
pub genesis_time: Rfc3339String,
Expand Down Expand Up @@ -95,6 +117,8 @@ pub struct Shell {
db_dir: PathBuf,
/// Use the [`Ledger::tendermint_dir()`] method to read the value.
tendermint_dir: PathBuf,
/// An optional action to take when a given blockheight is reached.
pub action_at_height: Option<ActionAtHeight>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -142,6 +166,7 @@ impl Ledger {
storage_read_past_height_limit: Some(3600),
db_dir: DB_DIR.into(),
tendermint_dir: TENDERMINT_DIR.into(),
action_at_height: None,
},
tendermint: Tendermint {
rpc_address: SocketAddr::new(
Expand Down
14 changes: 10 additions & 4 deletions apps/src/lib/node/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ fn start_abci_broadcaster_shell(
let genesis = genesis::genesis(&config.shell.base_dir, &config.chain_id);
#[cfg(feature = "dev")]
let genesis = genesis::genesis(1);
let (shell, abci_service) = AbcippShim::new(
let (shell, abci_service, service_handle) = AbcippShim::new(
config,
wasm_dir,
broadcaster_sender,
Expand All @@ -474,8 +474,13 @@ fn start_abci_broadcaster_shell(
// Start the ABCI server
let abci = spawner
.spawn_abortable("ABCI", move |aborter| async move {
let res =
run_abci(abci_service, ledger_address, abci_abort_recv).await;
let res = run_abci(
abci_service,
service_handle,
ledger_address,
abci_abort_recv,
)
.await;

drop(aborter);
res
Expand Down Expand Up @@ -508,6 +513,7 @@ fn start_abci_broadcaster_shell(
/// mempool, snapshot, and info.
async fn run_abci(
abci_service: AbciService,
service_handle: tokio::sync::broadcast::Sender<()>,
ledger_address: SocketAddr,
abort_recv: tokio::sync::oneshot::Receiver<()>,
) -> shell::Result<()> {
Expand All @@ -534,13 +540,13 @@ async fn run_abci(
)
.finish()
.unwrap();

tokio::select! {
// Run the server with the ABCI service
status = server.listen(ledger_address) => {
status.map_err(|err| Error::TowerServer(err.to_string()))
},
resp_sender = abort_recv => {
_ = service_handle.send(());
match resp_sender {
Ok(()) => {
tracing::info!("Shutting down ABCI server...");
Expand Down
Loading

0 comments on commit 5edb5b8

Please sign in to comment.