Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListSnapshots and LoadSnapshotChunk methods #3386

Merged
merged 11 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changelog/unreleased/improvements/3383-add-db-snapshots.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
* Resolves the first two points of Issue [\#3307](https://github.com/anoma/namada/issues/3307):
- Add the ability to create chunkable snapshots to our rocksdb implementation
- Spawn a background task to create snapshots at certain block heights

Specifically adds a config parameter that indicates after how many blocks a
snapshot should be created. If set, then on the corresponding calls to commit,
a background task is spun up that takes a snapshot of RocksDB and writes it
in a convenient format to a file. This file contains metadata describing how
it can be broken up into chunks. Once a new snapshot is created, older snapshots are
removed. ([\#3383](https://github.com/anoma/namada/pull/3383))
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- Addresses the third point and part of the fourth point of Issue
[\#3307](https://github.com/anoma/namada/issues/3307)
* Adds chunking logic to snapshots
* Implements the `ListSnapshots` ABCI call
* Implements the `LoadSnapshotChunk` ABCI call

([\#3386](https://github.com/anoma/namada/pull/3386))
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/apps_lib/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod utils;

use std::fs::{create_dir_all, File};
use std::io::Write;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};

use directories::ProjectDirs;
Expand Down Expand Up @@ -119,6 +120,9 @@ pub struct Shell {
pub action_at_height: Option<ActionAtHeight>,
/// Specify if tendermint is started as validator, fullnode or seednode
pub tendermint_mode: TendermintMode,
/// When set, indicates after how many blocks a new snapshot
/// will be taken (counting from the first block)
pub blocks_between_snapshots: Option<NonZeroU64>,
}

impl Ledger {
Expand Down Expand Up @@ -147,6 +151,7 @@ impl Ledger {
cometbft_dir: COMETBFT_DIR.into(),
action_at_height: None,
tendermint_mode: mode,
blocks_between_snapshots: None,
},
cometbft: tendermint_config,
ethereum_bridge: ethereum_bridge::ledger::Config::default(),
Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ impl DbColFam {
DbColFam::REPLAYPROT => REPLAY_PROTECTION_CF,
}
}

/// Return an array of all column families.
///
/// NOTE(review): the order here appears to mirror the declaration order of
/// the `DbColFam` variants — confirm whether any consumer relies on this
/// ordering before changing it.
pub fn all() -> [&'static str; 6] {
[
SUBSPACE_CF,
BLOCK_CF,
STATE_CF,
DIFFS_CF,
ROLLBACK_CF,
REPLAY_PROTECTION_CF,
]
}
}

impl FromStr for DbColFam {
Expand All @@ -145,6 +157,7 @@ impl FromStr for DbColFam {
}
}
}

/// Transaction index within block.
#[derive(
Default,
Expand Down
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namada_test_utils = {path = "../test_utils", optional = true}

arse-merkle-tree = { workspace = true, features = ["blake2b"] }
async-trait.workspace = true
base64.workspace = true
blake2b-rs.workspace = true
borsh.workspace = true
borsh-ext.workspace = true
Expand Down
10 changes: 5 additions & 5 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Shell {
}
Request::Commit => {
tracing::debug!("Request Commit");
Ok(Response::Commit(self.commit()))
Ok(self.commit())
}
Request::Flush => Ok(Response::Flush),
Request::Echo(msg) => Ok(Response::Echo(response::Echo {
Expand All @@ -153,14 +153,14 @@ impl Shell {
Ok(Response::CheckTx(self.mempool_validate(&tx.tx, r#type)))
}
Request::ListSnapshots => {
Ok(Response::ListSnapshots(Default::default()))
self.list_snapshots().map(Response::ListSnapshots)
}
Request::OfferSnapshot(_) => {
Ok(Response::OfferSnapshot(Default::default()))
}
Request::LoadSnapshotChunk(_) => {
Ok(Response::LoadSnapshotChunk(Default::default()))
}
Request::LoadSnapshotChunk(req) => self
.load_snapshot_chunk(req)
.map(Response::LoadSnapshotChunk),
Request::ApplySnapshotChunk(_) => {
Ok(Response::ApplySnapshotChunk(Default::default()))
}
Expand Down
42 changes: 35 additions & 7 deletions crates/node/src/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod prepare_proposal;
use namada::state::State;
pub mod process_proposal;
pub(super) mod queries;
mod snapshots;
mod stats;
#[cfg(any(test, feature = "testing"))]
#[allow(dead_code)]
Expand All @@ -25,6 +26,7 @@ mod vote_extensions;

use std::cell::RefCell;
use std::collections::BTreeSet;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
#[allow(unused_imports)]
use std::rc::Rc;
Expand Down Expand Up @@ -77,6 +79,7 @@ use crate::facade::tendermint::{self, validator};
use crate::facade::tendermint_proto::v0_37::crypto::public_key;
use crate::shims::abcipp_shim_types::shim;
use crate::shims::abcipp_shim_types::shim::response::TxResult;
use crate::shims::abcipp_shim_types::shim::TakeSnapshot;
use crate::{storage, tendermint_node};

fn key_to_tendermint(
Expand Down Expand Up @@ -120,6 +123,8 @@ pub enum Error {
Storage(#[from] namada::state::StorageError),
#[error("Transaction replay attempt: {0}")]
ReplayAttempt(String),
#[error("Error with snapshots: {0}")]
Snapshot(std::io::Error),
}

impl From<Error> for TxResult {
Expand Down Expand Up @@ -355,6 +360,9 @@ where
event_log: EventLog,
/// A migration that can be scheduled at a given block height
pub scheduled_migration: Option<ScheduledMigration<D::Migrator>>,
/// When set, indicates after how many blocks a new snapshot
/// will be taken (counting from the first block)
pub blocks_between_snapshots: Option<NonZeroU64>,
}

/// Storage key filter to store the diffs into the storage. Return `false` for
Expand Down Expand Up @@ -548,6 +556,7 @@ where
// TODO(namada#3237): config event log params
event_log: EventLog::default(),
scheduled_migration,
blocks_between_snapshots: config.shell.blocks_between_snapshots,
};
shell.update_eth_oracle(&Default::default());
shell
Expand Down Expand Up @@ -659,7 +668,7 @@ where

/// Commit a block. Persist the application state and return the Merkle root
/// hash.
pub fn commit(&mut self) -> response::Commit {
pub fn commit(&mut self) -> shim::Response {
self.bump_last_processed_eth_block();

self.state
Expand All @@ -678,13 +687,32 @@ where
);

self.broadcast_queued_txs();
let take_snapshot = self.check_snapshot_required();

shim::Response::Commit(
response::Commit {
// NB: by passing 0, we forbid CometBFT from deleting
// data pertaining to past blocks
retain_height: tendermint::block::Height::from(0_u32),
// NB: current application hash
data: merkle_root.0.to_vec().into(),
},
take_snapshot,
)
}

response::Commit {
// NB: by passing 0, we forbid CometBFT from deleting
// data pertaining to past blocks
retain_height: tendermint::block::Height::from(0_u32),
// NB: current application hash
data: merkle_root.0.to_vec().into(),
/// Check if we have reached a block height at which we should take a
/// snapshot.
///
/// Returns the DB path to snapshot when the last committed height is a
/// multiple of the configured snapshot interval; otherwise
/// [`TakeSnapshot::No`]. Always `No` when no interval is configured.
fn check_snapshot_required(&self) -> TakeSnapshot {
    let last_committed = self.state.in_mem().get_last_block_height();
    // Snapshot only on heights that are exact multiples of the interval.
    let snapshot_due = self
        .blocks_between_snapshots
        .map_or(false, |interval| last_committed.0 % interval == 0);
    if snapshot_due {
        self.state.db().path().into()
    } else {
        TakeSnapshot::No
    }
}

Expand Down
59 changes: 59 additions & 0 deletions crates/node/src/shell/snapshots.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use borsh_ext::BorshSerializeExt;
use namada::hash::{Hash, Sha256Hasher};
use namada::state::BlockHeight;

use super::{Error, Result};
use crate::facade::tendermint::abci::types::Snapshot;
use crate::facade::tendermint::v0_37::abci::{
request as tm_request, response as tm_response,
};
use crate::shell::Shell;
use crate::storage;
use crate::storage::{DbSnapshot, SnapshotMetadata};

impl Shell<storage::PersistentDB, Sha256Hasher> {
/// List the snapshot files held locally. Furthermore, the number
/// of chunks, a hash of each chunk, and a hash of the chunk
/// metadata are provided so that syncing nodes can verify the
/// snapshots they receive.
///
/// Returns an empty response when snapshots are disabled (i.e. no
/// `blocks_between_snapshots` interval is configured).
pub fn list_snapshots(&self) -> Result<tm_response::ListSnapshots> {
if self.blocks_between_snapshots.is_none() {
Ok(Default::default())
} else {
let snapshots = DbSnapshot::files(&self.base_dir)
.map_err(Error::Snapshot)?
.into_iter()
.map(|SnapshotMetadata { height, chunks, .. }| {
// Hash of the serialized chunk metadata; lets peers verify
// the chunk list advertised for this snapshot.
let hash = Hash::sha256(chunks.serialize_to_vec()).0;
Snapshot {
// NOTE(review): assumes block heights fit in u32 as
// required by the ABCI `Snapshot` type — the unwrap
// panics otherwise. Confirm heights cannot exceed
// u32::MAX in practice.
height: u32::try_from(height.0).unwrap().into(),
format: 0,
#[allow(clippy::cast_possible_truncation)]
chunks: chunks.len() as u32,
hash: hash.into_iter().collect(),
metadata: Default::default(),
}
})
.collect();

Ok(tm_response::ListSnapshots { snapshots })
}
}

/// Load the bytes of a requested chunk and return them
/// to cometbft.
///
/// The chunk is located by the snapshot's block height and the
/// chunk index from the ABCI request; I/O failures are surfaced as
/// [`Error::Snapshot`].
pub fn load_snapshot_chunk(
&self,
req: tm_request::LoadSnapshotChunk,
) -> Result<tm_response::LoadSnapshotChunk> {
let chunk = DbSnapshot::load_chunk(
BlockHeight(req.height.into()),
u64::from(req.chunk),
&self.base_dir,
)
.map_err(Error::Snapshot)?;
Ok(tm_response::LoadSnapshotChunk {
chunk: chunk.into_iter().collect(),
})
}
}
72 changes: 71 additions & 1 deletion crates/node/src/shims/abcipp_shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use namada::core::hash::Hash;
use namada::core::key::tm_raw_hash_to_string;
use namada::core::storage::BlockHeight;
use namada::proof_of_stake::storage::find_validator_by_raw_hash;
use namada::state::DB;
use namada::time::{DateTimeUtc, Utc};
use namada::tx::data::hash_tx;
use namada_sdk::migrations::ScheduledMigration;
Expand All @@ -16,14 +17,17 @@ use tokio::sync::mpsc::UnboundedSender;
use tower::Service;

use super::abcipp_shim_types::shim::request::{FinalizeBlock, ProcessedTx};
use super::abcipp_shim_types::shim::{Error, Request, Response, TxBytes};
use super::abcipp_shim_types::shim::{
Error, Request, Response, TakeSnapshot, TxBytes,
};
use crate::config;
use crate::config::{Action, ActionAtHeight};
use crate::facade::tendermint::v0_37::abci::{
request, Request as Req, Response as Resp,
};
use crate::facade::tower_abci::BoxError;
use crate::shell::{EthereumOracleChannels, Shell};
use crate::storage::DbSnapshot;

/// The shim wraps the shell, which implements ABCI++.
/// The shim makes a crude translation between the ABCI interface currently used
Expand All @@ -37,6 +41,7 @@ pub struct AbcippShim {
Req,
tokio::sync::oneshot::Sender<Result<Resp, BoxError>>,
)>,
snapshot_task: Option<std::thread::JoinHandle<Result<(), std::io::Error>>>,
}

impl AbcippShim {
Expand Down Expand Up @@ -74,6 +79,7 @@ impl AbcippShim {
begin_block_request: None,
delivered_txs: vec![],
shell_recv,
snapshot_task: None,
},
AbciService {
shell_send,
Expand Down Expand Up @@ -160,6 +166,14 @@ impl AbcippShim {
_ => Err(Error::ConvertResp(res)),
})
}
Req::Commit => match self.service.call(Request::Commit) {
Ok(Response::Commit(res, take_snapshot)) => {
self.update_snapshot_task(take_snapshot);
Ok(Resp::Commit(res))
}
Ok(resp) => Err(Error::ConvertResp(resp)),
Err(e) => Err(Error::Shell(e)),
},
_ => match Request::try_from(req.clone()) {
Ok(request) => self
.service
Expand All @@ -177,6 +191,62 @@ impl AbcippShim {
}
}
}

/// Reap a finished snapshot task (logging any failure) and, when
/// `take_snapshot` requests it, spawn a new background thread that
/// snapshots the DB at `db_path` and writes it out in chunkable form.
///
/// The caller is blocked only until the in-memory RocksDB snapshot has
/// been taken (signalled over a oneshot channel), so no writes can land
/// in the DB while the snapshot point is being captured; the expensive
/// file serialization then proceeds in the background.
fn update_snapshot_task(&mut self, take_snapshot: TakeSnapshot) {
    // First, join a previously spawned task if it has finished, so its
    // result (and any error) is not silently dropped.
    let snapshot_taken = self
        .snapshot_task
        .as_ref()
        .map(|t| t.is_finished())
        .unwrap_or_default();
    if snapshot_taken {
        let task = self.snapshot_task.take().unwrap();
        match task.join() {
            Ok(Err(e)) => tracing::error!(
                "Failed to create snapshot with error: {:?}",
                e
            ),
            Err(e) => tracing::error!(
                "Failed to join thread creating snapshot: {:?}",
                e
            ),
            _ => {}
        }
    }
    let TakeSnapshot::Yes(db_path) = take_snapshot else {
        return;
    };
    let base_dir = self.service.base_dir.clone();

    let (snap_send, snap_recv) = tokio::sync::oneshot::channel();
    let snapshot_task = std::thread::spawn(move || {
        // Open a read-only handle to the DB for snapshotting.
        let db = crate::storage::open(db_path, true, None)
            .expect("Could not open DB");
        let snapshot = db.snapshot();
        // signal to main thread that the snapshot has finished
        snap_send.send(()).unwrap();

        let last_height = db
            .read_last_block()
            .expect("Could not read database")
            .expect("Last block should exist")
            .height;
        let cfs = db.column_families();
        snapshot.write_to_file(cfs, base_dir.clone(), last_height)?;
        // Drop snapshots for older heights now that a newer one exists.
        DbSnapshot::cleanup(last_height, &base_dir)
    });

    // it's important that the thread is
    // blocked until the snapshot is created so that no writes
    // happen to the db while snapshotting. We want the db frozen
    // at this specific point in time.
    if snap_recv.blocking_recv().is_err() {
        tracing::error!("Failed to start snapshot task.")
    } else {
        // N.B. If a task is still running, it will continue
        // in the background but we will forget about it.
        self.snapshot_task.replace(snapshot_task);
    }
}
}

/// Indicates how [`AbciService`] should
Expand Down
Loading
Loading