From 55818783eefd44c04a5c510d0779f7e8115ce0d3 Mon Sep 17 00:00:00 2001 From: satan Date: Fri, 24 May 2024 09:52:55 +0200 Subject: [PATCH 01/11] WIP --- crates/node/src/lib.rs | 2 +- crates/node/src/shell/mod.rs | 19 ++++---- crates/node/src/shims/abcipp_shim.rs | 53 ++++++++++++++++++++- crates/node/src/shims/abcipp_shim_types.rs | 23 ++++++++- crates/node/src/storage/mod.rs | 1 + crates/node/src/storage/rocksdb.rs | 55 +++++++++++++++++----- crates/storage/src/db.rs | 6 +++ 7 files changed, 135 insertions(+), 24 deletions(-) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index f210bb8a71..89e9e3fcdb 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -138,7 +138,7 @@ impl Shell { } Request::Commit => { tracing::debug!("Request Commit"); - Ok(Response::Commit(self.commit())) + Ok(self.commit()) } Request::Flush => Ok(Response::Flush), Request::Echo(msg) => Ok(Response::Echo(response::Echo { diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index 2ace0f620a..e40acb63db 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -659,7 +659,7 @@ where /// Commit a block. Persist the application state and return the Merkle root /// hash. 
- pub fn commit(&mut self) -> response::Commit { + pub fn commit(&mut self) -> shim::Response { self.bump_last_processed_eth_block(); self.state @@ -679,13 +679,16 @@ where self.broadcast_queued_txs(); - response::Commit { - // NB: by passing 0, we forbid CometBFT from deleting - // data pertaining to past blocks - retain_height: tendermint::block::Height::from(0_u32), - // NB: current application hash - data: merkle_root.0.to_vec().into(), - } + shim::Response::Commit( + response::Commit { + // NB: by passing 0, we forbid CometBFT from deleting + // data pertaining to past blocks + retain_height: tendermint::block::Height::from(0_u32), + // NB: current application hash + data: merkle_root.0.to_vec().into(), + }, + self.state.db().path().into(), + ) } /// Updates the Ethereum oracle's last processed block. diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index 1ba7b2c50f..55bbcc6c52 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -16,7 +16,9 @@ use tokio::sync::mpsc::UnboundedSender; use tower::Service; use super::abcipp_shim_types::shim::request::{FinalizeBlock, ProcessedTx}; -use super::abcipp_shim_types::shim::{Error, Request, Response, TxBytes}; +use super::abcipp_shim_types::shim::{ + Error, Request, Response, TakeSnapshot, TxBytes, +}; use crate::config; use crate::config::{Action, ActionAtHeight}; use crate::facade::tendermint::v0_37::abci::{ @@ -24,6 +26,7 @@ use crate::facade::tendermint::v0_37::abci::{ }; use crate::facade::tower_abci::BoxError; use crate::shell::{EthereumOracleChannels, Shell}; +use crate::storage::SnapshotCreator; /// The shim wraps the shell, which implements ABCI++. 
/// The shim makes a crude translation between the ABCI interface currently used @@ -37,6 +40,7 @@ pub struct AbcippShim { Req, tokio::sync::oneshot::Sender>, )>, + snapshot_task: Option>>, } impl AbcippShim { @@ -74,6 +78,7 @@ impl AbcippShim { begin_block_request: None, delivered_txs: vec![], shell_recv, + snapshot_task: None, }, AbciService { shell_send, @@ -160,6 +165,14 @@ impl AbcippShim { _ => Err(Error::ConvertResp(res)), }) } + Req::Commit => match self.service.call(Request::Commit) { + Ok(Response::Commit(res, take_snapshot)) => { + self.update_snapshot_task(take_snapshot); + Ok(Resp::Commit(res)) + } + Ok(resp) => Err(Error::ConvertResp(resp)), + Err(e) => Err(Error::Shell(e)), + }, _ => match Request::try_from(req.clone()) { Ok(request) => self .service @@ -177,6 +190,44 @@ impl AbcippShim { } } } + + fn update_snapshot_task(&mut self, take_snapshot: TakeSnapshot) { + if self + .snapshot_task + .as_ref() + .map(|t| t.is_finished()) + .unwrap_or_default() + { + let task = self.snapshot_task.take().unwrap(); + match task.join() { + Ok(Err(e)) => tracing::error!( + "Failed to create snapshot with error: {:?}", + e + ), + Err(e) => tracing::error!( + "Failed to join thread creating snapshot: {:?}", + e + ), + _ => {} + } + + } + let TakeSnapshot::Yes(db_path) = take_snapshot else { + return; + }; + // it's important that this block the thread although it will + // be incredibly fast. + let Ok(snapshot_creator) = SnapshotCreator::new(db_path) else { + tracing::error!("Could not open the DB in order to take a snapshot"); + return + }; + let snapshot_task = std::thread::spawn(move || { + snapshot_creator.write_to_file() + // do stuff with snapshot; + }); + // TODO: What to do if an old snapshot task is still running? 
+ self.snapshot_task.replace(snapshot_task); + } } /// Indicates how [`AbciService`] should diff --git a/crates/node/src/shims/abcipp_shim_types.rs b/crates/node/src/shims/abcipp_shim_types.rs index 6decadeb6b..50a01f3559 100644 --- a/crates/node/src/shims/abcipp_shim_types.rs +++ b/crates/node/src/shims/abcipp_shim_types.rs @@ -1,6 +1,8 @@ use crate::facade::tendermint::v0_37::abci::{Request, Response}; pub mod shim { + use std::fmt::Debug; + use std::path::PathBuf; use thiserror::Error; @@ -30,6 +32,23 @@ pub mod shim { } } + #[derive(Debug, Clone)] + /// Indicate whether a state snapshot should be created + /// at a certain point in time + pub enum TakeSnapshot { + No, + Yes(PathBuf), + } + + impl> From> for TakeSnapshot { + fn from(value: Option) -> Self { + match value { + None => TakeSnapshot::No, + Some(p) => TakeSnapshot::Yes(p.as_ref().to_path_buf()), + } + } + } + #[allow(clippy::large_enum_variant)] /// Our custom request types. It is the duty of the shim to change /// the request types coming from tower-abci to these before forwarding @@ -101,7 +120,7 @@ pub mod shim { RevertProposal(response::RevertProposal), FinalizeBlock(response::FinalizeBlock), EndBlock(tm_response::EndBlock), - Commit(tm_response::Commit), + Commit(tm_response::Commit, TakeSnapshot), Flush, Echo(tm_response::Echo), CheckTx(tm_response::CheckTx), @@ -120,7 +139,7 @@ pub mod shim { Response::InitChain(inner) => Ok(Resp::InitChain(inner)), Response::Info(inner) => Ok(Resp::Info(inner)), Response::Query(inner) => Ok(Resp::Query(inner)), - Response::Commit(inner) => Ok(Resp::Commit(inner)), + Response::Commit(inner, _) => Ok(Resp::Commit(inner)), Response::Flush => Ok(Resp::Flush), Response::Echo(inner) => Ok(Resp::Echo(inner)), Response::CheckTx(inner) => Ok(Resp::CheckTx(inner)), diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs index 49d37c81a8..063ee7f237 100644 --- a/crates/node/src/storage/mod.rs +++ b/crates/node/src/storage/mod.rs @@ -12,6 +12,7 @@ 
use blake2b_rs::{Blake2b, Blake2bBuilder}; use namada::state::StorageHasher; use namada_sdk::state::FullAccessState; pub use rocksdb::RocksDBUpdateVisitor; +pub use rocksdb::SnapshotCreator; #[derive(Default)] pub struct PersistentStorageHasher(Blake2bHasher); diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 36a527fa18..77ffbe7cef 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -121,10 +121,11 @@ pub struct RocksDB(rocksdb::DB); pub struct RocksDBWriteBatch(WriteBatch); /// Open RocksDB for the DB -pub fn open( +fn open( path: impl AsRef, + read_only: bool, cache: Option<&rocksdb::Cache>, -) -> Result { +) -> Result { let logical_cores = num_cpus::get(); let compaction_threads = i32::try_from(num_of_threads( ENV_VAR_ROCKSDB_COMPACTION_THREADS, @@ -218,10 +219,13 @@ pub fn open( REPLAY_PROTECTION_CF, replay_protection_cf_opts, )); - - rocksdb::DB::open_cf_descriptors(&db_opts, path, cfs) - .map(RocksDB) - .map_err(|e| Error::DBError(e.into_string())) + if read_only { + rocksdb::DB::open_cf_descriptors_read_only(&db_opts, path, cfs, false) + .map_err(|e| Error::DBError(e.into_string())) + } else { + rocksdb::DB::open_cf_descriptors(&db_opts, path, cfs) + .map_err(|e| Error::DBError(e.into_string())) + } } impl Drop for RocksDB { @@ -703,7 +707,11 @@ impl DB for RocksDB { db_path: impl AsRef, cache: Option<&Self::Cache>, ) -> Self { - open(db_path, cache).expect("cannot open the DB") + Self(open(db_path, false, cache).expect("cannot open the DB")) + } + + fn path(&self) -> Option<&Path> { + Some(self.0.path()) } fn flush(&self, wait: bool) -> Result<()> { @@ -1431,6 +1439,29 @@ impl DB for RocksDB { } } +pub struct SnapshotCreator<'a> { + db: rocksdb::DB, + snapshot: Option>, +} + +impl<'a> SnapshotCreator<'a> { + /// Creates a new snapshot of RocksDB. 
+ pub fn new(db_path: impl AsRef) -> Result> { + let db = open(db_path, true, None)?; + let mut snap = Self { + db, + snapshot: None + }; + snap.snapshot = Some((&snap.db).snapshot()); + Ok(snap) + } + + pub fn write_to_file(&self) -> std::result::Result<(), std::io::Error> { + + Ok(()) + } +} + /// A struct that can visit a set of updates, /// registering them all in the batch pub struct RocksDBUpdateVisitor<'db> { @@ -1836,7 +1867,7 @@ mod test { #[test] fn test_load_state() { let dir = tempdir().unwrap(); - let db = open(dir.path(), None).unwrap(); + let db = RocksDB::open(dir.path(), None); let mut batch = RocksDB::batch(); let last_height = BlockHeight::default(); @@ -1869,7 +1900,7 @@ mod test { #[test] fn test_read() { let dir = tempdir().unwrap(); - let mut db = open(dir.path(), None).unwrap(); + let mut db = RocksDB::open(dir.path(), None); let key = Key::parse("test").unwrap(); let batch_key = Key::parse("batch").unwrap(); @@ -1971,7 +2002,7 @@ mod test { #[test] fn test_prefix_iter() { let dir = tempdir().unwrap(); - let db = open(dir.path(), None).unwrap(); + let db = RocksDB::open(dir.path(), None); let prefix_0 = Key::parse("0").unwrap(); let key_0_a = prefix_0.push(&"a".to_string()).unwrap(); @@ -2024,7 +2055,7 @@ mod test { println!("Running with persist_diffs: {persist_diffs}"); let dir = tempdir().unwrap(); - let mut db = open(dir.path(), None).unwrap(); + let mut db = RocksDB::open(dir.path(), None); // A key that's gonna be added on a second block let add_key = Key::parse("add").unwrap(); @@ -2185,7 +2216,7 @@ mod test { #[test] fn test_diffs() { let dir = tempdir().unwrap(); - let mut db = open(dir.path(), None).unwrap(); + let mut db = RocksDB::open(dir.path(), None); let key_with_diffs = Key::parse("with_diffs").unwrap(); let key_without_diffs = Key::parse("without_diffs").unwrap(); diff --git a/crates/storage/src/db.rs b/crates/storage/src/db.rs index 2483ee850d..60bb530e86 100644 --- a/crates/storage/src/db.rs +++ b/crates/storage/src/db.rs 
@@ -134,6 +134,12 @@ pub trait DB: Debug { cache: Option<&Self::Cache>, ) -> Self; + /// Get the path to the db in the filesystem, + /// if it exists (the DB may be in-memory only) + fn path(&self) -> Option<&std::path::Path> { + None + } + /// Flush data on the memory to persistent them fn flush(&self, wait: bool) -> Result<()>; From 2cae5c487a4de24f1ddb3d831f473ca25174e524 Mon Sep 17 00:00:00 2001 From: satan Date: Thu, 6 Jun 2024 13:08:45 +0200 Subject: [PATCH 02/11] [feat]: Added ability to create snapshots of rocksDb --- crates/core/src/storage.rs | 13 + crates/node/src/shims/abcipp_shim.rs | 42 +- crates/node/src/storage/mod.rs | 3 +- crates/node/src/storage/rocksdb.rs | 558 +++++++++++++++++++++++---- 4 files changed, 528 insertions(+), 88 deletions(-) diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 424e338d4e..d976398bee 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -128,6 +128,18 @@ impl DbColFam { DbColFam::REPLAYPROT => REPLAY_PROTECTION_CF, } } + + /// Return an array of all column families + pub fn all() -> [&'static str; 6] { + [ + SUBSPACE_CF, + BLOCK_CF, + STATE_CF, + DIFFS_CF, + ROLLBACK_CF, + REPLAY_PROTECTION_CF, + ] + } } impl FromStr for DbColFam { @@ -145,6 +157,7 @@ impl FromStr for DbColFam { } } } + /// Transaction index within block. 
#[derive( Default, diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index 55bbcc6c52..f330a49021 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -8,6 +8,7 @@ use namada::core::hash::Hash; use namada::core::key::tm_raw_hash_to_string; use namada::core::storage::BlockHeight; use namada::proof_of_stake::storage::find_validator_by_raw_hash; +use namada::state::DB; use namada::time::{DateTimeUtc, Utc}; use namada::tx::data::hash_tx; use namada_sdk::migrations::ScheduledMigration; @@ -26,7 +27,7 @@ use crate::facade::tendermint::v0_37::abci::{ }; use crate::facade::tower_abci::BoxError; use crate::shell::{EthereumOracleChannels, Shell}; -use crate::storage::SnapshotCreator; +use crate::storage::DbSnapshot; /// The shim wraps the shell, which implements ABCI++. /// The shim makes a crude translation between the ABCI interface currently used @@ -210,23 +211,40 @@ impl AbcippShim { ), _ => {} } - } let TakeSnapshot::Yes(db_path) = take_snapshot else { return; }; - // it's important that this block the thread although it will - // be incredibly fast. 
- let Ok(snapshot_creator) = SnapshotCreator::new(db_path) else { - tracing::error!("Could not open the DB in order to take a snapshot"); - return - }; + let base_dir = self.service.base_dir.clone(); + + let (snap_send, snap_recv) = tokio::sync::oneshot::channel(); let snapshot_task = std::thread::spawn(move || { - snapshot_creator.write_to_file() - // do stuff with snapshot; + let db = crate::storage::open(db_path, true, None) + .expect("Could not open DB"); + let snapshot = db.snapshot(); + // signal to main thread that the snapshot has finished + snap_send.send(()).unwrap(); + + let last_height = db + .read_last_block() + .expect("Could not read database") + .expect("Last block should exists") + .height; + let cfs = db.column_families(); + let path = DbSnapshot::path(last_height, base_dir.clone()); + + snapshot.write_to_file(cfs, &path)?; + DbSnapshot::cleanup(last_height, &base_dir) }); - // TODO: What to do if an old snapshot task is still running? - self.snapshot_task.replace(snapshot_task); + + // it's important that the thread is + // blocked until the snapshot is created + if snap_recv.blocking_recv().is_err() { + tracing::error!("Failed to start snapshot task.") + } else { + // TODO: What to do if an old snapshot task is still running? 
+ self.snapshot_task.replace(snapshot_task); + } } } diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs index 063ee7f237..fc8dbe5504 100644 --- a/crates/node/src/storage/mod.rs +++ b/crates/node/src/storage/mod.rs @@ -11,8 +11,7 @@ use arse_merkle_tree::H256; use blake2b_rs::{Blake2b, Blake2bBuilder}; use namada::state::StorageHasher; use namada_sdk::state::FullAccessState; -pub use rocksdb::RocksDBUpdateVisitor; -pub use rocksdb::SnapshotCreator; +pub use rocksdb::{open, DbSnapshot, RocksDBUpdateVisitor}; #[derive(Default)] pub struct PersistentStorageHasher(Blake2bHasher); diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 77ffbe7cef..72957dbf1e 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -44,19 +44,22 @@ //! - `current/{hash}`: a hash included in the current block //! - `{hash}`: a hash included in previous blocks +use std::ffi::OsStr; use std::fs::File; use std::io::{BufWriter, Write}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Mutex; use borsh::{BorshDeserialize, BorshSerialize}; +use borsh_ext::BorshSerializeExt; use data_encoding::HEXLOWER; use itertools::Either; use namada::core::collections::HashSet; use namada::core::storage::{BlockHeight, Epoch, Header, Key, KeySeg}; use namada::core::{decode, encode, ethereum_events}; use namada::eth_bridge::storage::proof::BridgePoolRootProof; +use namada::hash::Hash; use namada::ledger::eth_bridge::storage::bridge_pool; use namada::replay_protection; use namada::state::merkle_tree::{ @@ -80,6 +83,7 @@ use rocksdb::{ DBCompressionType, Direction, FlushOptions, IteratorMode, Options, ReadOptions, WriteBatch, }; +use sha2::{Digest, Sha256}; use crate::config::utils::num_of_threads; use crate::storage; @@ -111,21 +115,27 @@ const ADDRESS_GEN_KEY_SEGMENT: &str = "address_gen"; const OLD_DIFF_PREFIX: &str = "old"; const NEW_DIFF_PREFIX: &str = "new"; +const 
MAX_CHUNK_SIZE: usize = 10_000_000; /// RocksDB handle #[derive(Debug)] -pub struct RocksDB(rocksdb::DB); +pub struct RocksDB { + /// Handle to the db + inner: rocksdb::DB, + /// Indicates if read only + read_only: bool, +} /// DB Handle for batch writes. #[derive(Default)] pub struct RocksDBWriteBatch(WriteBatch); /// Open RocksDB for the DB -fn open( +pub fn open( path: impl AsRef, read_only: bool, cache: Option<&rocksdb::Cache>, -) -> Result { +) -> Result { let logical_cores = num_cpus::get(); let compaction_threads = i32::try_from(num_of_threads( ENV_VAR_ROCKSDB_COMPACTION_THREADS, @@ -219,24 +229,34 @@ fn open( REPLAY_PROTECTION_CF, replay_protection_cf_opts, )); - if read_only { - rocksdb::DB::open_cf_descriptors_read_only(&db_opts, path, cfs, false) - .map_err(|e| Error::DBError(e.into_string())) + Ok(if read_only { + RocksDB { + inner: rocksdb::DB::open_cf_descriptors_read_only( + &db_opts, path, cfs, false, + ) + .map_err(|e| Error::DBError(e.into_string()))?, + read_only: true, + } } else { - rocksdb::DB::open_cf_descriptors(&db_opts, path, cfs) - .map_err(|e| Error::DBError(e.into_string())) - } + RocksDB { + inner: rocksdb::DB::open_cf_descriptors(&db_opts, path, cfs) + .map_err(|e| Error::DBError(e.into_string()))?, + read_only: false, + } + }) } impl Drop for RocksDB { fn drop(&mut self) { - self.flush(true).expect("flush failed"); + if !self.read_only { + self.flush(true).expect("flush failed"); + } } } impl RocksDB { fn get_column_family(&self, cf_name: &str) -> Result<&ColumnFamily> { - self.0 + self.inner .cf_handle(cf_name) .ok_or(Error::DBError("No {cf_name} column family".to_string())) } @@ -259,7 +279,7 @@ impl RocksDB { cf: &ColumnFamily, key: impl AsRef, ) -> Result>> { - self.0 + self.inner .get_cf(cf, key.as_ref()) .map_err(|e| Error::DBError(e.into_string())) } @@ -275,7 +295,7 @@ impl RocksDB { T: BorshSerialize, { if let Some(current_value) = self - .0 + .inner .get_cf(cf, key.as_ref()) .map_err(|e| Error::DBError(e.into_string()))? 
{ @@ -457,13 +477,14 @@ impl RocksDB { ) { let read_opts = make_iter_read_opts(prefix.clone()); let iter = if let Some(prefix) = prefix { - self.0.iterator_cf_opt( + self.inner.iterator_cf_opt( cf, read_opts, IteratorMode::From(prefix.as_bytes(), Direction::Forward), ) } else { - self.0.iterator_cf_opt(cf, read_opts, IteratorMode::Start) + self.inner + .iterator_cf_opt(cf, read_opts, IteratorMode::Start) }; let mut buf = BufWriter::new(file); @@ -480,6 +501,10 @@ impl RocksDB { buf.flush().expect("Unable to write to output file"); } + pub fn snapshot(&self) -> DbSnapshot<'_> { + DbSnapshot(self.inner.snapshot()) + } + /// Rollback to previous block. Given the inner working of tendermint /// rollback and of the key structure of Namada, calling rollback more than /// once without restarting the chain results in a single rollback. @@ -653,7 +678,7 @@ impl RocksDB { let prefix = last_block.height.to_string(); let mut delete_keys = |cf: &ColumnFamily| { let read_opts = make_iter_read_opts(Some(prefix.clone())); - let iter = self.0.iterator_cf_opt( + let iter = self.inner.iterator_cf_opt( cf, read_opts, IteratorMode::From(prefix.as_bytes(), Direction::Forward), @@ -676,6 +701,23 @@ impl RocksDB { self.exec_batch(batch) } + #[inline] + pub fn column_families(&self) -> [(&'static str, &ColumnFamily); 6] { + DbColFam::all() + .iter() + .map(|cf| { + ( + *cf, + self.get_column_family(cf) + .expect("Failed to read column family"), + ) + }) + .collect::>() + .try_into() + .map_err(|_| "There should be exactly six column families") + .unwrap() + } + /// Read diffs of non-persisted key-vals that are only kept for rollback of /// one block height. #[cfg(test)] @@ -692,12 +734,148 @@ impl RocksDB { old_and_new_diff_key(key, height)?.1 }; - self.0 + self.inner .get_cf(rollback_cf, key) .map_err(|e| Error::DBError(e.into_string())) } } +pub struct DbSnapshot<'a>(pub rocksdb::Snapshot<'a>); + +impl<'a> DbSnapshot<'a> { + /// Write a snapshot of the database out to file. 
The last line + /// of the file contains metadata about how to break the file into + /// chunks. + pub fn write_to_file( + &self, + cfs: [(&'static str, &'a ColumnFamily); 6], + path: &Path, + ) -> std::io::Result<()> { + let file = File::create(path)?; + let mut buf = BufWriter::new(file); + let mut chunker = Chunker::new(MAX_CHUNK_SIZE); + for (cf_name, cf) in cfs { + let read_opts = make_iter_read_opts(None); + let iter = + self.0.iterator_cf_opt(cf, read_opts, IteratorMode::Start); + + for (key, raw_val, _gas) in PersistentPrefixIterator( + PrefixIterator::new(iter, String::default()), + // Empty string to prevent prefix stripping, the prefix is + // already in the enclosed iterator + ) { + let val = HEXLOWER.encode(&raw_val); + let bytes = format!("{cf_name}:{key}={val}\n"); + chunker.add_line(&bytes); + buf.write_all(bytes.as_bytes())?; + } + buf.flush()?; + } + let chunks = chunker.finalize(); + let val = HEXLOWER.encode(&chunks.serialize_to_vec()); + let bytes = format!("chunks:{val}"); + buf.write_all(bytes.as_bytes())?; + buf.flush()?; + Ok(()) + } + + /// Remove snapshots older than the latest + pub fn cleanup( + latest_height: BlockHeight, + base_dir: &Path, + ) -> std::io::Result<()> { + let toml = OsStr::new("toml"); + for entry in std::fs::read_dir(base_dir)? { + let entry = entry?; + if entry.path().is_file() && Some(toml) == entry.path().extension() + { + if let Some(name) = entry.path().file_name() { + let Some(height) = name + .to_string_lossy() + .strip_prefix("snapshot_") + .and_then(|n| n.strip_suffix(".toml")) + .and_then(|h| BlockHeight::from_str(h).ok()) + else { + continue; + }; + if height < latest_height { + _ = std::fs::remove_file(entry.path()); + } + }; + } + } + Ok(()) + } + + /// Create a path to save a snapshot at a specific block height. + pub fn path(height: BlockHeight, mut base_dir: PathBuf) -> PathBuf { + base_dir.push(format!("snapshot_{}.toml", height)); + base_dir + } +} + +/// A chunk of a snapshot. 
Includes the last line number in the file +/// for this chunk and a hash of the chunk contents. +#[derive( + Debug, Clone, Default, PartialEq, Eq, BorshSerialize, BorshDeserialize, +)] +struct Chunk { + boundary: u64, + hash: Hash, +} + +/// Builds a set of chunks from a stream of lines to be +/// written to a file. +#[derive(Debug, Clone)] +struct Chunker { + chunks: Vec, + max_size: usize, + current_boundary: u64, + current_size: usize, + hasher: Sha256, +} +impl Chunker { + fn new(max_size: usize) -> Self { + Self { + chunks: vec![], + max_size, + current_boundary: 0, + current_size: 0, + hasher: Sha256::default(), + } + } + + fn add_line(&mut self, line: &str) { + if checked!(self.current_size + line.as_bytes().len()).unwrap() + > self.max_size + && self.current_boundary != 0 + { + let mut hasher = Sha256::default(); + std::mem::swap(&mut hasher, &mut self.hasher); + let hash: [u8; 32] = hasher.finalize().into(); + self.chunks.push(Chunk { + boundary: self.current_boundary, + hash: Hash(hash), + }); + self.current_size = 0; + } + + self.current_size = + checked!(self.current_size + line.as_bytes().len()).unwrap(); + self.hasher.update(line.as_bytes()); + self.current_boundary = checked!(self.current_boundary + 1).unwrap(); + } + + fn finalize(mut self) -> Vec { + let hash: [u8; 32] = self.hasher.finalize().into(); + self.chunks.push(Chunk { + boundary: self.current_boundary, + hash: Hash(hash), + }); + self.chunks + } +} + impl DB for RocksDB { type Cache = rocksdb::Cache; type Migrator = DbUpdateType; @@ -707,17 +885,17 @@ impl DB for RocksDB { db_path: impl AsRef, cache: Option<&Self::Cache>, ) -> Self { - Self(open(db_path, false, cache).expect("cannot open the DB")) + open(db_path, false, cache).expect("cannot open the DB") } fn path(&self) -> Option<&Path> { - Some(self.0.path()) + Some(self.inner.path()) } fn flush(&self, wait: bool) -> Result<()> { let mut flush_opts = FlushOptions::default(); flush_opts.set_wait(wait); - self.0 + self.inner 
.flush_opt(&flush_opts) .map_err(|e| Error::DBError(e.into_string())) } @@ -1021,7 +1199,7 @@ impl DB for RocksDB { replay_protection::key(hash), ] { if self - .0 + .inner .get_pinned_cf(replay_protection_cf, key.to_string()) .map_err(|e| Error::DBError(e.into_string()))? .is_some() @@ -1069,7 +1247,7 @@ impl DB for RocksDB { } None => { // If it has an "old" val, it was deleted at this height - if self.0.key_may_exist_cf(diffs_cf, &old_val_key) { + if self.inner.key_may_exist_cf(diffs_cf, &old_val_key) { // check if it actually exists if self.read_value_bytes(diffs_cf, old_val_key)?.is_some() { return Ok(None); @@ -1092,7 +1270,7 @@ impl DB for RocksDB { None => { // Check if the value was created at this height instead, // which would mean that it wasn't present before - if self.0.key_may_exist_cf(diffs_cf, &new_val_key) { + if self.inner.key_may_exist_cf(diffs_cf, &new_val_key) { // check if it actually exists if self .read_value_bytes(diffs_cf, new_val_key)? @@ -1154,7 +1332,7 @@ impl DB for RocksDB { } fn exec_batch(&self, batch: Self::WriteBatch) -> Result<()> { - self.0 + self.inner .write(batch.0) .map_err(|e| Error::DBError(e.into_string())) } @@ -1439,29 +1617,6 @@ impl DB for RocksDB { } } -pub struct SnapshotCreator<'a> { - db: rocksdb::DB, - snapshot: Option>, -} - -impl<'a> SnapshotCreator<'a> { - /// Creates a new snapshot of RocksDB. 
- pub fn new(db_path: impl AsRef) -> Result> { - let db = open(db_path, true, None)?; - let mut snap = Self { - db, - snapshot: None - }; - snap.snapshot = Some((&snap.db).snapshot()); - Ok(snap) - } - - pub fn write_to_file(&self) -> std::result::Result<(), std::io::Error> { - - Ok(()) - } -} - /// A struct that can visit a set of updates, /// registering them all in the batch pub struct RocksDBUpdateVisitor<'db> { @@ -1572,7 +1727,7 @@ impl<'iter> DBIter<'iter> for RocksDB { .get_column_family(BLOCK_CF) .expect("{BLOCK_CF} column family should exist"); let read_opts = make_iter_read_opts(Some(prefix.clone())); - let iter = self.0.iterator_cf_opt( + let iter = self.inner.iterator_cf_opt( block_cf, read_opts, IteratorMode::From(prefix.as_bytes(), Direction::Forward), @@ -1677,7 +1832,7 @@ fn iter_prefix<'a>( _ => stripped_prefix.clone(), }; let read_opts = make_iter_read_opts(Some(prefix.clone())); - let iter = db.0.iterator_cf_opt( + let iter = db.inner.iterator_cf_opt( cf, read_opts, IteratorMode::From(prefix.as_bytes(), Direction::Forward), @@ -1851,6 +2006,7 @@ mod imp { #[cfg(test)] mod test { use namada::address::EstablishedAddressGen; + use namada::core::collections::HashMap; use namada::core::hash::Hash; use namada::core::storage::Epochs; use namada::ledger::storage::ConversionState; @@ -2194,10 +2350,11 @@ mod test { assert_eq!(deleted, Some(to_delete_val)); // Check the conversion state let state_cf = db.get_column_family(STATE_CF).unwrap(); - let conversion_state = - db.0.get_cf(state_cf, "conversion_state".as_bytes()) - .unwrap() - .unwrap(); + let conversion_state = db + .inner + .get_cf(state_cf, "conversion_state".as_bytes()) + .unwrap() + .unwrap(); assert_eq!(conversion_state, encode(&conversion_state_0)); for tx in [b"tx1", b"tx2", b"tx3", b"tx4"] { assert!( @@ -2253,15 +2410,15 @@ mod test { // present let (old_with_h0, new_with_h0) = old_and_new_diff_key(&key_with_diffs, height_0).unwrap(); - assert!(db.0.get_cf(diffs_cf, 
old_with_h0).unwrap().is_none()); - assert!(db.0.get_cf(diffs_cf, new_with_h0).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, old_with_h0).unwrap().is_none()); + assert!(db.inner.get_cf(diffs_cf, new_with_h0).unwrap().is_some()); // Diffs new key for `key_without_diffs` at height_0 must be // present let (old_wo_h0, new_wo_h0) = old_and_new_diff_key(&key_without_diffs, height_0).unwrap(); - assert!(db.0.get_cf(rollback_cf, old_wo_h0).unwrap().is_none()); - assert!(db.0.get_cf(rollback_cf, new_wo_h0).unwrap().is_some()); + assert!(db.inner.get_cf(rollback_cf, old_wo_h0).unwrap().is_none()); + assert!(db.inner.get_cf(rollback_cf, new_wo_h0).unwrap().is_some()); } // Write second block @@ -2293,27 +2450,27 @@ mod test { // Diffs keys for `key_with_diffs` at height_0 must be present let (old_with_h0, new_with_h0) = old_and_new_diff_key(&key_with_diffs, height_0).unwrap(); - assert!(db.0.get_cf(diffs_cf, old_with_h0).unwrap().is_none()); - assert!(db.0.get_cf(diffs_cf, new_with_h0).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, old_with_h0).unwrap().is_none()); + assert!(db.inner.get_cf(diffs_cf, new_with_h0).unwrap().is_some()); // Diffs keys for `key_without_diffs` at height_0 must be gone let (old_wo_h0, new_wo_h0) = old_and_new_diff_key(&key_without_diffs, height_0).unwrap(); - assert!(db.0.get_cf(rollback_cf, old_wo_h0).unwrap().is_none()); - assert!(db.0.get_cf(rollback_cf, new_wo_h0).unwrap().is_none()); + assert!(db.inner.get_cf(rollback_cf, old_wo_h0).unwrap().is_none()); + assert!(db.inner.get_cf(rollback_cf, new_wo_h0).unwrap().is_none()); // Diffs keys for `key_with_diffs` at height_1 must be present let (old_with_h1, new_with_h1) = old_and_new_diff_key(&key_with_diffs, height_1).unwrap(); - assert!(db.0.get_cf(diffs_cf, old_with_h1).unwrap().is_some()); - assert!(db.0.get_cf(diffs_cf, new_with_h1).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, old_with_h1).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, 
new_with_h1).unwrap().is_some()); // Diffs keys for `key_without_diffs` at height_1 must be // present let (old_wo_h1, new_wo_h1) = old_and_new_diff_key(&key_without_diffs, height_1).unwrap(); - assert!(db.0.get_cf(rollback_cf, old_wo_h1).unwrap().is_some()); - assert!(db.0.get_cf(rollback_cf, new_wo_h1).unwrap().is_some()); + assert!(db.inner.get_cf(rollback_cf, old_wo_h1).unwrap().is_some()); + assert!(db.inner.get_cf(rollback_cf, new_wo_h1).unwrap().is_some()); } // Write third block @@ -2345,27 +2502,27 @@ mod test { // Diffs keys for `key_with_diffs` at height_1 must be present let (old_with_h1, new_with_h1) = old_and_new_diff_key(&key_with_diffs, height_1).unwrap(); - assert!(db.0.get_cf(diffs_cf, old_with_h1).unwrap().is_some()); - assert!(db.0.get_cf(diffs_cf, new_with_h1).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, old_with_h1).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, new_with_h1).unwrap().is_some()); // Diffs keys for `key_without_diffs` at height_1 must be gone let (old_wo_h1, new_wo_h1) = old_and_new_diff_key(&key_without_diffs, height_1).unwrap(); - assert!(db.0.get_cf(rollback_cf, old_wo_h1).unwrap().is_none()); - assert!(db.0.get_cf(rollback_cf, new_wo_h1).unwrap().is_none()); + assert!(db.inner.get_cf(rollback_cf, old_wo_h1).unwrap().is_none()); + assert!(db.inner.get_cf(rollback_cf, new_wo_h1).unwrap().is_none()); // Diffs keys for `key_with_diffs` at height_2 must be present let (old_with_h2, new_with_h2) = old_and_new_diff_key(&key_with_diffs, height_2).unwrap(); - assert!(db.0.get_cf(diffs_cf, old_with_h2).unwrap().is_some()); - assert!(db.0.get_cf(diffs_cf, new_with_h2).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, old_with_h2).unwrap().is_some()); + assert!(db.inner.get_cf(diffs_cf, new_with_h2).unwrap().is_some()); // Diffs keys for `key_without_diffs` at height_2 must be // present let (old_wo_h2, new_wo_h2) = old_and_new_diff_key(&key_without_diffs, height_2).unwrap(); - 
assert!(db.0.get_cf(rollback_cf, old_wo_h2).unwrap().is_some()); - assert!(db.0.get_cf(rollback_cf, new_wo_h2).unwrap().is_some()); + assert!(db.inner.get_cf(rollback_cf, old_wo_h2).unwrap().is_some()); + assert!(db.inner.get_cf(rollback_cf, new_wo_h2).unwrap().is_some()); } } @@ -2410,4 +2567,257 @@ mod test { db.add_block_to_batch(block, batch, true) } + + /// Test that we chunk a series of lines + /// up correctly based on a max chunk size. + #[test] + fn test_chunker() { + let mut chunker = Chunker::new(10); + let lines = vec![ + "fffffggggghh", + "aaaa", + "bbbbb", + "fffffggggghh", + "cc", + "dddddddd", + "eeeeeeeeee", + "ab", + ]; + for l in lines { + chunker.add_line(l); + } + let chunks = chunker.finalize(); + let expected = vec![ + Chunk { + boundary: 1, + hash: Hash::sha256("fffffggggghh"), + }, + Chunk { + boundary: 3, + hash: Hash::sha256("aaaabbbbb".as_bytes()), + }, + Chunk { + boundary: 4, + hash: Hash::sha256("fffffggggghh"), + }, + Chunk { + boundary: 6, + hash: Hash::sha256("ccdddddddd".as_bytes()), + }, + Chunk { + boundary: 7, + hash: Hash::sha256("eeeeeeeeee".as_bytes()), + }, + Chunk { + boundary: 8, + hash: Hash::sha256("ab".as_bytes()), + }, + ]; + assert_eq!(expected, chunks); + let mut chunker = Chunker::new(10); + let lines = vec![ + "aaaa", + "bbbbb", + "fffffggggghh", + "cc", + "dddddddd", + "eeeeeeeeee", + "ab", + ]; + for l in lines { + chunker.add_line(l); + } + let chunks = chunker.finalize(); + let expected = vec![ + Chunk { + boundary: 2, + hash: Hash::sha256("aaaabbbbb".as_bytes()), + }, + Chunk { + boundary: 3, + hash: Hash::sha256("fffffggggghh"), + }, + Chunk { + boundary: 5, + hash: Hash::sha256("ccdddddddd".as_bytes()), + }, + Chunk { + boundary: 6, + hash: Hash::sha256("eeeeeeeeee".as_bytes()), + }, + Chunk { + boundary: 7, + hash: Hash::sha256("ab".as_bytes()), + }, + ]; + assert_eq!(expected, chunks); + } + + /// Test that we correctly delete snapshots + /// older than a given block height + #[test] + fn 
test_snapshot_cleanup() { + let temp = tempfile::tempdir().expect("Test failed"); + let base_dir = temp.path().to_path_buf(); + for i in 0..4 { + let mut path = base_dir.clone(); + path.push(format!("snapshot_{}.toml", i)); + _ = File::create(path).expect("Test failed") + } + let mut path = base_dir.clone(); + path.push("snapshot_0_backup.toml"); + _ = File::create(path).expect("Test failed"); + let mut path = base_dir.clone(); + path.push("snapshot_0.bak"); + _ = File::create(path).expect("Test failed"); + DbSnapshot::cleanup(2.into(), &base_dir).expect("Test failed"); + let mut expected = HashSet::from([ + "snapshot_2.toml", + "snapshot_3.toml", + "snapshot_0_backup.toml", + "snapshot_0.bak", + ]); + for entry in std::fs::read_dir(base_dir).expect("Test failed") { + let entry = entry.expect("Test failed"); + assert!(entry.path().is_file()); + let path = entry.path(); + let path = path.file_name().expect("Test failed"); + assert!(expected.swap_remove(path.to_str().unwrap())); + } + assert!(expected.is_empty()); + } + + /// Test that taking a snapshot actually + /// freezes the database in time even if + /// it is written to. 
+ #[test] + fn test_snapshot_creation() { + let temp = tempfile::tempdir().expect("Test failed"); + let mut db = open(&temp, false, None).expect("Test failed"); + db.write_subspace_val( + 1.into(), + &Key::parse("bing/fucking/bong").expect("Test failed"), + [1u8; 64], + false, + ) + .expect("Test failed"); + // we need to persist the changes and restart in read-only mode + // as rocksdb doesn't allow multiple read/write instances + drop(db); + let db = open(&temp, true, None).expect("Test failed"); + // freeze the database at this point in time + let snapshot = db.snapshot(); + + // write a new entry to the db + let mut db2 = open(&temp, false, None).expect("Test failed"); + db2.write_subspace_val( + 2.into(), + &Key::parse("I/AM/BATMAN").expect("Test failed"), + [2u8; 32], + false, + ) + .expect("Test failed"); + // flush the data + drop(db2); + let db2 = open(&temp, false, None).expect("Test failed"); + + // collect all entries in the snapshot + let mut snapshot_entries = HashMap::new(); + for (_, cf) in db.column_families() { + let read_opts = make_iter_read_opts(None); + let iter = + snapshot + .0 + .iterator_cf_opt(cf, read_opts, IteratorMode::Start); + + for (key, raw_val, _gas) in PersistentPrefixIterator( + PrefixIterator::new(iter, String::default()), + // Empty string to prevent prefix stripping, the prefix is + // already in the enclosed iterator + ) { + snapshot_entries.insert(key, raw_val); + } + } + + // collect ALL entries in the db + let mut db_entries = HashMap::new(); + for (_, cf) in db2.column_families() { + let read_opts = make_iter_read_opts(None); + let iter = + db2.inner + .iterator_cf_opt(cf, read_opts, IteratorMode::Start); + + for (key, raw_val, _gas) in PersistentPrefixIterator( + PrefixIterator::new(iter, String::default()), + // Empty string to prevent prefix stripping, the prefix is + // already in the enclosed iterator + ) { + db_entries.insert(key, raw_val); + } + } + + let expected_snap = HashMap::from([ + 
("bing/fucking/bong".to_string(), vec![1u8; 64]), + ( + "0000000000002/new/bing/fucking/bong".to_string(), + vec![1u8; 64], + ), + ]); + assert_eq!(expected_snap, snapshot_entries); + let expected_db = HashMap::from([ + ("bing/fucking/bong".to_string(), vec![1u8; 64]), + ( + "0000000000002/new/bing/fucking/bong".to_string(), + vec![1u8; 64], + ), + ("I/AM/BATMAN".to_string(), vec![2u8; 32]), + ("0000000000004/new/I/AM/BATMAN".to_string(), vec![2u8; 32]), + ]); + assert_eq!(expected_db, db_entries); + } + + /// Test that [`DbSnapshot`] writes a snapshot + /// to disk correctly. + #[test] + fn test_db_snapshot() { + let temp = tempfile::tempdir().expect("Test failed"); + let mut db = open(&temp, false, None).expect("Test failed"); + db.write_subspace_val( + 1.into(), + &Key::parse("bing/fucking/bong").expect("Test failed"), + [1u8; 1], + false, + ) + .expect("Test failed"); + // we need to persist the changes and restart in read-only mode + // as rocksdb doesn't allow multiple read/write instances + drop(db); + let db = open(&temp, true, None).expect("Test failed"); + // freeze the database at this point in time + let snapshot = db.snapshot(); + let mut path = temp.path().to_path_buf(); + path.push("snapshot_0.toml"); + snapshot + .write_to_file(db.column_families(), &path) + .expect("Test failed"); + let snapshot = std::fs::read_to_string(path).expect("Test failed"); + let chunks = vec![Chunk { + boundary: 2, + hash: Hash::sha256( + "subspace:bing/fucking/bong=01\nrollback:0000000000002/new/\ + bing/fucking/bong=01\n" + .as_bytes(), + ), + }]; + let chunk_val = + format!("chunks:{}", HEXLOWER.encode(&chunks.serialize_to_vec())); + let expected = [ + "subspace:bing/fucking/bong=01".to_string(), + "rollback:0000000000002/new/bing/fucking/bong=01".to_string(), + chunk_val, + ]; + + let lines: Vec<&str> = snapshot.split('\n').collect(); + assert_eq!(lines, expected); + } } From bb835a1c4a7d35519bcf5757da0dd7df24f3e329 Mon Sep 17 00:00:00 2001 From: satan Date: Thu, 6 
Jun 2024 14:00:41 +0200 Subject: [PATCH 03/11] [feat]: Added snapshot frequency to config --- crates/apps_lib/src/config/mod.rs | 5 +++++ crates/node/src/shell/mod.rs | 18 +++++++++++++++++- crates/node/src/shims/abcipp_shim.rs | 3 ++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/crates/apps_lib/src/config/mod.rs b/crates/apps_lib/src/config/mod.rs index c6484d10b0..adaf927f17 100644 --- a/crates/apps_lib/src/config/mod.rs +++ b/crates/apps_lib/src/config/mod.rs @@ -7,6 +7,7 @@ pub mod utils; use std::fs::{create_dir_all, File}; use std::io::Write; +use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use directories::ProjectDirs; @@ -119,6 +120,9 @@ pub struct Shell { pub action_at_height: Option, /// Specify if tendermint is started as validator, fullnode or seednode pub tendermint_mode: TendermintMode, + /// When set, indicates after how many blocks a new snapshot + /// will be taken (counting from the first block) + pub blocks_between_snapshots: Option, } impl Ledger { @@ -147,6 +151,7 @@ impl Ledger { cometbft_dir: COMETBFT_DIR.into(), action_at_height: None, tendermint_mode: mode, + blocks_between_snapshots: None, }, cometbft: tendermint_config, ethereum_bridge: ethereum_bridge::ledger::Config::default(), diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index e40acb63db..614711036d 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -25,6 +25,7 @@ mod vote_extensions; use std::cell::RefCell; use std::collections::BTreeSet; +use std::num::NonZeroU64; use std::path::{Path, PathBuf}; #[allow(unused_imports)] use std::rc::Rc; @@ -77,6 +78,7 @@ use crate::facade::tendermint::{self, validator}; use crate::facade::tendermint_proto::v0_37::crypto::public_key; use crate::shims::abcipp_shim_types::shim; use crate::shims::abcipp_shim_types::shim::response::TxResult; +use crate::shims::abcipp_shim_types::shim::TakeSnapshot; use crate::{storage, tendermint_node}; fn key_to_tendermint( @@ -355,6 
+357,9 @@ where event_log: EventLog, /// A migration that can be scheduled at a given block height pub scheduled_migration: Option>, + /// When set, indicates after how many blocks a new snapshot + /// will be taken (counting from the first block) + pub blocks_between_snapshots: Option, } /// Storage key filter to store the diffs into the storage. Return `false` for @@ -548,6 +553,7 @@ where // TODO(namada#3237): config event log params event_log: EventLog::default(), scheduled_migration, + blocks_between_snapshots: config.shell.blocks_between_snapshots, }; shell.update_eth_oracle(&Default::default()); shell @@ -679,6 +685,16 @@ where self.broadcast_queued_txs(); + let take_snapshot = match self.blocks_between_snapshots { + Some(b) => committed_height.0 % b == 0, + _ => false, + }; + let take_snapshot = if take_snapshot { + self.state.db().path().into() + } else { + TakeSnapshot::No + }; + shim::Response::Commit( response::Commit { // NB: by passing 0, we forbid CometBFT from deleting @@ -687,7 +703,7 @@ where // NB: current application hash data: merkle_root.0.to_vec().into(), }, - self.state.db().path().into(), + take_snapshot, ) } diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index f330a49021..5269cc22e1 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -242,7 +242,8 @@ impl AbcippShim { if snap_recv.blocking_recv().is_err() { tracing::error!("Failed to start snapshot task.") } else { - // TODO: What to do if an old snapshot task is still running? + // N.B. If a task is still running, it will continue + // in the background but we will forget about it. 
self.snapshot_task.replace(snapshot_task);
             }
         }

From 8969ea06b1bfe209f76f38669997190c537e7bfd Mon Sep 17 00:00:00 2001
From: satan 
Date: Thu, 6 Jun 2024 14:11:23 +0200
Subject: [PATCH 04/11] changelog

---
 .../unreleased/improvements/3383-add-db-snapshots.md   | 10 ++++++++++
 1 file changed, 10 insertions(+)
 create mode 100644 .changelog/unreleased/improvements/3383-add-db-snapshots.md

diff --git a/.changelog/unreleased/improvements/3383-add-db-snapshots.md b/.changelog/unreleased/improvements/3383-add-db-snapshots.md
new file mode 100644
index 0000000000..ca678f1245
--- /dev/null
+++ b/.changelog/unreleased/improvements/3383-add-db-snapshots.md
@@ -0,0 +1,10 @@
+* Resolves the first two points of Issue [\#3307](https://github.com/anoma/namada/issues/3307):
+  - Add the ability to create chunkable snapshots to our rocksdb implementation
+  - Spawn a background task to create snapshots at certain blockheights
+
+  Specifically adds a config parameter that indicates after how many blocks a
+  snapshot should be created. If set, then on the corresponding calls to commit,
+  a background task is spun up that takes a snapshot of rocksDB and writes it
+  in a convenient format to a file. This file contains metadata on how it can be
+  broken up into chunks. Once a new snapshot is created, older snapshots are
+  removed.
([\#3383](https://github.com/anoma/namada/pull/3383)) \ No newline at end of file From 743dfec97e5222baad6342687db696f69dcef374 Mon Sep 17 00:00:00 2001 From: Jacob Turner Date: Wed, 12 Jun 2024 10:01:33 +0200 Subject: [PATCH 05/11] Update crates/node/src/storage/rocksdb.rs Co-authored-by: Tiago Carvalho --- crates/node/src/storage/rocksdb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 72957dbf1e..53d360e861 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -863,7 +863,7 @@ impl Chunker { self.current_size = checked!(self.current_size + line.as_bytes().len()).unwrap(); self.hasher.update(line.as_bytes()); - self.current_boundary = checked!(self.current_boundary + 1).unwrap(); + checked!(self.current_boundary += 1).unwrap(); } fn finalize(mut self) -> Vec { From 06d79002d395c0881bb46aa0f700e11ffd68fb1b Mon Sep 17 00:00:00 2001 From: Jacob Turner Date: Wed, 12 Jun 2024 10:01:39 +0200 Subject: [PATCH 06/11] Update crates/node/src/storage/rocksdb.rs Co-authored-by: Tiago Carvalho --- crates/node/src/storage/rocksdb.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 53d360e861..1a29846ffd 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -860,8 +860,7 @@ impl Chunker { self.current_size = 0; } - self.current_size = - checked!(self.current_size + line.as_bytes().len()).unwrap(); + checked!(self.current_size += line.as_bytes().len()).unwrap(); self.hasher.update(line.as_bytes()); checked!(self.current_boundary += 1).unwrap(); } From a6b0a295bbab6816c3219d4a8d9ccbaf976975e6 Mon Sep 17 00:00:00 2001 From: satan Date: Wed, 12 Jun 2024 10:15:09 +0200 Subject: [PATCH 07/11] review comments --- Cargo.lock | 1 + crates/node/Cargo.toml | 1 + crates/node/src/shell/mod.rs | 26 +- 
crates/node/src/shims/abcipp_shim.rs | 4 +- crates/node/src/storage/rocksdb.rs | 14 +- crates/sdk/src/masp/utils.rs | 395 +++++++++++++++++++++++++++ 6 files changed, 423 insertions(+), 18 deletions(-) create mode 100644 crates/sdk/src/masp/utils.rs diff --git a/Cargo.lock b/Cargo.lock index c5f83204df..8b0db320df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5118,6 +5118,7 @@ version = "0.39.0" dependencies = [ "assert_matches", "async-trait", + "base64 0.13.1", "blake2b-rs", "borsh 1.2.1", "borsh-ext", diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 7e4d13b9c5..108d109b89 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -52,6 +52,7 @@ namada_test_utils = {path = "../test_utils", optional = true} arse-merkle-tree = { workspace = true, features = ["blake2b"] } async-trait.workspace = true +base64.workspace = true blake2b-rs.workspace = true borsh.workspace = true borsh-ext.workspace = true diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index 614711036d..06c9b0ac59 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -684,16 +684,7 @@ where ); self.broadcast_queued_txs(); - - let take_snapshot = match self.blocks_between_snapshots { - Some(b) => committed_height.0 % b == 0, - _ => false, - }; - let take_snapshot = if take_snapshot { - self.state.db().path().into() - } else { - TakeSnapshot::No - }; + let take_snapshot = self.check_snapshot_required(); shim::Response::Commit( response::Commit { @@ -707,6 +698,21 @@ where ) } + /// Check if we have reached a block height at which we should take a + /// snapshot + fn check_snapshot_required(&self) -> TakeSnapshot { + let committed_height = self.state.in_mem().get_last_block_height(); + let take_snapshot = match self.blocks_between_snapshots { + Some(b) => committed_height.0 % b == 0, + _ => false, + }; + if take_snapshot { + self.state.db().path().into() + } else { + TakeSnapshot::No + } + } + /// Updates the Ethereum oracle's 
last processed block. #[inline] fn bump_last_processed_eth_block(&mut self) { diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index 5269cc22e1..c68f2d2a88 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -238,7 +238,9 @@ impl AbcippShim { }); // it's important that the thread is - // blocked until the snapshot is created + // blocked until the snapshot is created so that no writes + // happen to the db while snapshotting. We want the db frozen + // at this specific point in time. if snap_recv.blocking_recv().is_err() { tracing::error!("Failed to start snapshot task.") } else { diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 1a29846ffd..8f4ce3b6db 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -764,7 +764,7 @@ impl<'a> DbSnapshot<'a> { // Empty string to prevent prefix stripping, the prefix is // already in the enclosed iterator ) { - let val = HEXLOWER.encode(&raw_val); + let val = base64::encode(raw_val); let bytes = format!("{cf_name}:{key}={val}\n"); chunker.add_line(&bytes); buf.write_all(bytes.as_bytes())?; @@ -772,7 +772,7 @@ impl<'a> DbSnapshot<'a> { buf.flush()?; } let chunks = chunker.finalize(); - let val = HEXLOWER.encode(&chunks.serialize_to_vec()); + let val = base64::encode(chunks.serialize_to_vec()); let bytes = format!("chunks:{val}"); buf.write_all(bytes.as_bytes())?; buf.flush()?; @@ -2803,16 +2803,16 @@ mod test { let chunks = vec![Chunk { boundary: 2, hash: Hash::sha256( - "subspace:bing/fucking/bong=01\nrollback:0000000000002/new/\ - bing/fucking/bong=01\n" + "subspace:bing/fucking/bong=AQ==\nrollback:0000000000002/new/\ + bing/fucking/bong=AQ==\n" .as_bytes(), ), }]; let chunk_val = - format!("chunks:{}", HEXLOWER.encode(&chunks.serialize_to_vec())); + format!("chunks:{}", base64::encode(chunks.serialize_to_vec())); let expected = [ - 
"subspace:bing/fucking/bong=01".to_string(), - "rollback:0000000000002/new/bing/fucking/bong=01".to_string(), + "subspace:bing/fucking/bong=AQ==".to_string(), + "rollback:0000000000002/new/bing/fucking/bong=AQ==".to_string(), chunk_val, ]; diff --git a/crates/sdk/src/masp/utils.rs b/crates/sdk/src/masp/utils.rs new file mode 100644 index 0000000000..6dc740abab --- /dev/null +++ b/crates/sdk/src/masp/utils.rs @@ -0,0 +1,395 @@ +//! Helper functions and types + +use std::sync::{Arc, Mutex}; +use namada_core::storage::BlockHeight; +use namada_tx::{IndexedTx, Tx}; + +use crate::error::{Error, QueryError}; +use crate::io::Io; +use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height, IndexedNoteEntry, Unscanned}; +use crate::queries::Client; + +/// When retrying to fetch all notes in a +/// loop, this dictates the strategy for +/// how many attempts should be made. +pub enum RetryStrategy { + /// Always retry + Forever, + /// Limit number of retries to a fixed number + Times(u64), +} + +impl Iterator for RetryStrategy { + type Item = (); + + fn next(&mut self) -> Option { + match self { + Self::Forever => Some(()), + Self::Times(ref mut count) => { + if *count == 0 { + None + } else { + *count -= 1; + Some(()) + } + } + } + } +} + +/// This abstracts away the implementation details +/// of how shielded-sync fetches the necessary data +/// from a remote server. +pub trait MaspClient<'client, C: Client> { + /// Create a new [`MaspClient`] given an rpc client. + fn new(client: &'client C) -> Self + where + Self: 'client; + + /// Fetches shielded transfers + #[allow(async_fn_in_trait)] + async fn fetch_shielded_transfer( + &self, + progress: &impl ProgressTracker, + tx_sender: FetchQueueSender, + from: u64, + to: u64, + ) -> Result<(), Error>; +} + +/// An inefficient MASP client which simply uses a +/// client to the blockchain to query it directly. 
+pub struct LedgerMaspClient<'client, C: Client> { + client: &'client C, +} + +#[cfg(not(target_family = "wasm"))] +impl<'client, C: Client + Sync> MaspClient<'client, C> for LedgerMaspClient<'client, C> + where + LedgerMaspClient<'client, C>: 'client, +{ + fn new(client: &'client C) -> Self + where + Self: 'client, + { + Self { client } + } + + + async fn fetch_shielded_transfer( + &self, + progress: &impl ProgressTracker, + mut tx_sender: FetchQueueSender, + from: u64, + to: u64, + ) -> Result<(), Error> { + // Fetch all the transactions we do not have yet + let mut fetch_iter = progress.fetch(from..=to); + + while let Some(height) = fetch_iter.peek() { + let height = *height; + if tx_sender.contains_height(height) { + fetch_iter.next(); + continue; + } + + let txs_results = match get_indexed_masp_events_at_height( + self.client, + height.into(), + None, + ) + .await? + { + Some(events) => events, + None => { + fetch_iter.next(); + continue; + } + }; + + // Query the actual block to get the txs bytes. If we only need one + // tx it might be slightly better to query the /tx endpoint to + // reduce the amount of data sent over the network, but this is a + // minimal improvement and it's even hard to tell how many times + // we'd need a single masp tx to make this worth it + let block = self + .client + .block(height as u32) + .await + .map_err(|e| Error::from(QueryError::General(e.to_string())))? + .block + .data; + + for (idx, masp_sections_refs) in txs_results { + let tx = Tx::try_from(block[idx.0 as usize].as_ref()) + .map_err(|e| Error::Other(e.to_string()))?; + let extracted_masp_txs = + extract_masp_tx(&tx, &masp_sections_refs).await?; + + tx_sender.send(( + IndexedTx { + height: height.into(), + index: idx, + }, + extracted_masp_txs, + )); + } + fetch_iter.next(); + } + + Ok(()) + } +} + + + +/// A channel-like struct for "sending" newly fetched blocks +/// to the scanning algorithm. +/// +/// Holds a pointer to the unscanned cache which it can append to. 
+/// Furthermore, has an actual channel for keeping track if +/// 1. The process in possession of the channel is still alive +/// 2. Quickly updating the latest block height scanned. +#[derive(Clone)] +pub struct FetchQueueSender { + cache: Unscanned, + last_fetched: flume::Sender, +} + +/// A channel-like struct for "receiving" new fetched +/// blocks for the scanning algorithm. +/// +/// This is implemented as an iterator for the scanning +/// algorithm. This receiver pulls from the cache until +/// it is empty. It then waits until new entries appear +/// in the cache or the sender hangs up. +#[derive(Clone)] +pub(super) struct FetchQueueReceiver { + cache: Unscanned, + last_fetched: flume::Receiver, +} + +impl FetchQueueReceiver { + /// Check if the sender has hung up. + fn sender_alive(&self) -> bool { + self.last_fetched.sender_count() > 0 + } +} + +impl Iterator for FetchQueueReceiver { + type Item = IndexedNoteEntry; + + fn next(&mut self) -> Option { + if let Some(entry) = self.cache.pop_first() { + Some(entry) + } else { + while self.sender_alive() { + if let Some(entry) = self.cache.pop_first() { + return Some(entry); + } + } + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let size = self.last_fetched.len(); + (size, Some(size)) + } +} + +impl FetchQueueSender { + /// Checks if the channel is already populated for the given block height + pub(super) fn contains_height(&self, height: u64) -> bool { + self.cache.contains_height(height) + } + + /// Send a new value of the channel + pub(super) fn send(&mut self, data: IndexedNoteEntry) { + self.last_fetched.send(data.0.height).unwrap(); + self.cache.insert(data); + } +} + +/// A convenience for creating a channel for fetching blocks. 
+pub mod fetch_channel { + + use super::{FetchQueueReceiver, FetchQueueSender, Unscanned}; + pub(in super::super) fn new( + cache: Unscanned, + ) -> (FetchQueueSender, FetchQueueReceiver) { + let (fetch_send, fetch_recv) = flume::unbounded(); + ( + FetchQueueSender { + cache: cache.clone(), + last_fetched: fetch_send, + }, + FetchQueueReceiver { + cache: cache.clone(), + last_fetched: fetch_recv, + }, + ) + } +} + +/// An enum to indicate how to track progress depending on +/// whether sync is currently fetch or scanning blocks. +#[derive(Debug, Copy, Clone)] +pub enum ProgressType { + /// Fetch + Fetch, + /// Scan + Scan, +} + +/// A peekable iterator interface +pub trait PeekableIter { + /// Peek at next element + fn peek(&mut self) -> Option<&I>; + + /// get next element + fn next(&mut self) -> Option; +} + +impl PeekableIter for std::iter::Peekable + where + I: Iterator, +{ + fn peek(&mut self) -> Option<&J> { + self.peek() + } + + fn next(&mut self) -> Option { + ::next(self) + } +} + +/// This trait keeps track of how much progress the +/// shielded sync algorithm has made relative to the inputs. +/// +/// It should track how much has been fetched and scanned and +/// whether the fetching has been finished. +/// +/// Additionally, it has access to IO in case the struct implementing +/// this trait wishes to log this progress. +pub trait ProgressTracker { + /// Get an IO handle + fn io(&self) -> &IO; + + /// Return an iterator to fetched shielded transfers + fn fetch(&self, items: I) -> impl PeekableIter + where + I: Iterator; + + /// Return an iterator over MASP transactions to be scanned + fn scan( + &self, + items: I, + ) -> impl Iterator + Send + where + I: Iterator + Send; + + /// The number of blocks that need to be fetched + fn left_to_fetch(&self) -> usize; +} + +/// The default type for tracking the progress of shielded-sync. 
+#[derive(Debug, Clone)] +pub struct DefaultTracker<'io, IO: Io> { + io: &'io IO, + progress: Arc>, +} + +impl<'io, IO: Io> DefaultTracker<'io, IO> { + /// New [`DefaultTracker`] + pub fn new(io: &'io IO) -> Self { + Self { + io, + progress: Arc::new(Mutex::new(Default::default())), + } + } +} + +#[derive(Default, Copy, Clone, Debug)] +pub(super) struct IterProgress { + pub index: usize, + pub length: usize, +} + +pub(super) struct DefaultFetchIterator + where + I: Iterator, +{ + pub inner: I, + pub progress: Arc>, + pub peeked: Option, +} + +impl PeekableIter for DefaultFetchIterator + where + I: Iterator, +{ + fn peek(&mut self) -> Option<&u64> { + if self.peeked.is_none() { + self.peeked = self.inner.next(); + } + self.peeked.as_ref() + } + + fn next(&mut self) -> Option { + self.peek(); + let item = self.peeked.take()?; + let mut locked = self.progress.lock().unwrap(); + locked.index += 1; + Some(item) + } +} + +impl<'io, IO: Io> ProgressTracker for DefaultTracker<'io, IO> { + fn io(&self) -> &IO { + self.io + } + + fn fetch(&self, items: I) -> impl PeekableIter + where + I: Iterator, + { + { + let mut locked = self.progress.lock().unwrap(); + locked.length = items.size_hint().0; + } + DefaultFetchIterator { + inner: items, + progress: self.progress.clone(), + peeked: None, + } + } + + fn scan(&self, items: I) -> impl Iterator + Send + where + I: IntoIterator, + { + let items: Vec<_> = items.into_iter().collect(); + items.into_iter() + } + + fn left_to_fetch(&self) -> usize { + let locked = self.progress.lock().unwrap(); + locked.length - locked.index + } +} + +#[cfg(test)] +mod util_tests { + use crate::masp::utils::RetryStrategy; + + #[test] + fn test_retry_strategy() { + let strategy = RetryStrategy::Times(3); + let mut counter = 0; + for _ in strategy { + counter += 1; + } + assert_eq!(counter, 3); + } +} From f768ef9a3468c388623846df9e4e142d33e649b2 Mon Sep 17 00:00:00 2001 From: satan Date: Thu, 13 Jun 2024 10:28:30 +0200 Subject: [PATCH 08/11] addressing 
review comment --- crates/node/src/shims/abcipp_shim.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index c68f2d2a88..a490b043a5 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -193,12 +193,12 @@ impl AbcippShim { } fn update_snapshot_task(&mut self, take_snapshot: TakeSnapshot) { - if self + let snapshot_taken = self .snapshot_task .as_ref() .map(|t| t.is_finished()) - .unwrap_or_default() - { + .unwrap_or_default(); + if snapshot_taken { let task = self.snapshot_task.take().unwrap(); match task.join() { Ok(Err(e)) => tracing::error!( From df8c15abd9a78f80dc8da7870624e2c895c7b3ba Mon Sep 17 00:00:00 2001 From: satan Date: Fri, 7 Jun 2024 12:10:51 +0200 Subject: [PATCH 09/11] Added changelog --- .../3386-listsnapshots-loadsnapshotchunk.md | 7 + crates/node/src/lib.rs | 8 +- crates/node/src/shell/mod.rs | 3 + crates/node/src/shell/snapshots.rs | 61 +++++ crates/node/src/shims/abcipp_shim.rs | 4 +- crates/node/src/storage/mod.rs | 2 +- crates/node/src/storage/rocksdb.rs | 255 +++++++++++++++--- 7 files changed, 301 insertions(+), 39 deletions(-) create mode 100644 .changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md create mode 100644 crates/node/src/shell/snapshots.rs diff --git a/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md b/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md new file mode 100644 index 0000000000..24fb15fb6d --- /dev/null +++ b/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md @@ -0,0 +1,7 @@ + - Addresses the third point and part of the fourth point of Issue + [\#3307](https://github.com/anoma/namada/issues/3307) + * Adds chunking logic to snapshots + * Implements the `ListSnapshots` ABCI call + * Implements the `LoadSnapshotChunk` ABCI call + + 
([\#3386](https://github.com/anoma/namada/pull/3386)) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 89e9e3fcdb..aaf4b4be0d 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -153,14 +153,14 @@ impl Shell { Ok(Response::CheckTx(self.mempool_validate(&tx.tx, r#type))) } Request::ListSnapshots => { - Ok(Response::ListSnapshots(Default::default())) + self.list_snapshots().map(Response::ListSnapshots) } Request::OfferSnapshot(_) => { Ok(Response::OfferSnapshot(Default::default())) } - Request::LoadSnapshotChunk(_) => { - Ok(Response::LoadSnapshotChunk(Default::default())) - } + Request::LoadSnapshotChunk(req) => self + .load_snapshot_chunk(req) + .map(Response::LoadSnapshotChunk), Request::ApplySnapshotChunk(_) => { Ok(Response::ApplySnapshotChunk(Default::default())) } diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs index 06c9b0ac59..4f9eae585e 100644 --- a/crates/node/src/shell/mod.rs +++ b/crates/node/src/shell/mod.rs @@ -16,6 +16,7 @@ pub mod prepare_proposal; use namada::state::State; pub mod process_proposal; pub(super) mod queries; +mod snapshots; mod stats; #[cfg(any(test, feature = "testing"))] #[allow(dead_code)] @@ -122,6 +123,8 @@ pub enum Error { Storage(#[from] namada::state::StorageError), #[error("Transaction replay attempt: {0}")] ReplayAttempt(String), + #[error("Error with snapshots: {0}")] + Snapshot(std::io::Error), } impl From for TxResult { diff --git a/crates/node/src/shell/snapshots.rs b/crates/node/src/shell/snapshots.rs new file mode 100644 index 0000000000..a05d7ed813 --- /dev/null +++ b/crates/node/src/shell/snapshots.rs @@ -0,0 +1,61 @@ +use borsh_ext::BorshSerializeExt; +use namada::hash::{Hash, Sha256Hasher}; +use namada::state::BlockHeight; + +use super::{Error, Result}; +use crate::facade::tendermint::abci::types::Snapshot; +use crate::facade::tendermint::v0_37::abci::{ + request as tm_request, response as tm_response, +}; +// use 
crate::facade::tendermint_proto::v0_37::abci::Snapshot;
+use crate::shell::Shell;
+use crate::storage;
+use crate::storage::{DbSnapshot, SnapshotMetadata};
+
+impl Shell {
+    /// List the snapshot files held locally. Furthermore, the number
+    /// of chunks, a hash of each chunk, and a hash of the chunk
+    /// metadata are provided so that syncing nodes can verify
+    /// snapshots they receive.
+    pub fn list_snapshots(&self) -> Result {
+        if self.blocks_between_snapshots.is_none() {
+            Ok(Default::default())
+        } else {
+            let snapshots = DbSnapshot::files(&self.base_dir)
+                .map_err(Error::Snapshot)?
+                .into_iter()
+                .map(|SnapshotMetadata { height, chunks, .. }| {
+                    let hash = Hash::sha256(chunks.serialize_to_vec()).0;
+                    Snapshot {
+                        #[allow(clippy::cast_possible_truncation)]
+                        height: (height.0 as u32).into(),
+                        format: 0,
+                        #[allow(clippy::cast_possible_truncation)]
+                        chunks: chunks.len() as u32,
+                        hash: hash.into_iter().collect(),
+                        metadata: Default::default(),
+                    }
+                })
+                .collect();
+
+            Ok(tm_response::ListSnapshots { snapshots })
+        }
+    }
+
+    /// Load the bytes of a requested chunk and return them
+    /// to cometbft.
+ pub fn load_snapshot_chunk( + &self, + req: tm_request::LoadSnapshotChunk, + ) -> Result { + let chunk = DbSnapshot::load_chunk( + BlockHeight(req.height.into()), + u64::from(req.chunk), + &self.base_dir, + ) + .map_err(Error::Snapshot)?; + Ok(tm_response::LoadSnapshotChunk { + chunk: chunk.into_iter().collect(), + }) + } +} diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs index a490b043a5..998292b2c9 100644 --- a/crates/node/src/shims/abcipp_shim.rs +++ b/crates/node/src/shims/abcipp_shim.rs @@ -231,9 +231,7 @@ impl AbcippShim { .expect("Last block should exists") .height; let cfs = db.column_families(); - let path = DbSnapshot::path(last_height, base_dir.clone()); - - snapshot.write_to_file(cfs, &path)?; + snapshot.write_to_file(cfs, base_dir.clone(), last_height)?; DbSnapshot::cleanup(last_height, &base_dir) }); diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs index fc8dbe5504..faeb01fb69 100644 --- a/crates/node/src/storage/mod.rs +++ b/crates/node/src/storage/mod.rs @@ -11,7 +11,7 @@ use arse_merkle_tree::H256; use blake2b_rs::{Blake2b, Blake2bBuilder}; use namada::state::StorageHasher; use namada_sdk::state::FullAccessState; -pub use rocksdb::{open, DbSnapshot, RocksDBUpdateVisitor}; +pub use rocksdb::{open, DbSnapshot, RocksDBUpdateVisitor, SnapshotMetadata}; #[derive(Default)] pub struct PersistentStorageHasher(Blake2bHasher); diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index 8f4ce3b6db..e0495a7030 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -46,7 +46,7 @@ use std::ffi::OsStr; use std::fs::File; -use std::io::{BufWriter, Write}; +use std::io::{BufRead, BufReader, BufWriter, ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Mutex; @@ -55,7 +55,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use borsh_ext::BorshSerializeExt; use data_encoding::HEXLOWER; use 
itertools::Either; -use namada::core::collections::HashSet; +use namada::core::collections::{HashMap, HashSet}; use namada::core::storage::{BlockHeight, Epoch, Header, Key, KeySeg}; use namada::core::{decode, encode, ethereum_events}; use namada::eth_bridge::storage::proof::BridgePoolRootProof; @@ -740,6 +740,19 @@ impl RocksDB { } } +/// Information about a particular snapshot +/// owned by a node +pub struct SnapshotMetadata { + /// The height at which the snapshot was taken + pub height: BlockHeight, + /// The name of the paths to the file and metadata + /// holding the snapshot minus extensions + pub path_stem: String, + /// Data about the chunks that the snapshot is + /// partitioned into + pub chunks: Vec, +} + pub struct DbSnapshot<'a>(pub rocksdb::Snapshot<'a>); impl<'a> DbSnapshot<'a> { @@ -749,9 +762,11 @@ impl<'a> DbSnapshot<'a> { pub fn write_to_file( &self, cfs: [(&'static str, &'a ColumnFamily); 6], - path: &Path, + base_dir: PathBuf, + height: BlockHeight, ) -> std::io::Result<()> { - let file = File::create(path)?; + let [snap_file, metadata_file] = Self::paths(height, base_dir); + let file = File::create(snap_file)?; let mut buf = BufWriter::new(file); let mut chunker = Chunker::new(MAX_CHUNK_SIZE); for (cf_name, cf) in cfs { @@ -771,11 +786,10 @@ impl<'a> DbSnapshot<'a> { } buf.flush()?; } - let chunks = chunker.finalize(); - let val = base64::encode(chunks.serialize_to_vec()); - let bytes = format!("chunks:{val}"); - buf.write_all(bytes.as_bytes())?; buf.flush()?; + let chunks = chunker.finalize(); + let metadata = base64::encode(chunks.serialize_to_vec()); + std::fs::write(metadata_file, metadata.as_bytes())?; Ok(()) } @@ -784,44 +798,154 @@ impl<'a> DbSnapshot<'a> { latest_height: BlockHeight, base_dir: &Path, ) -> std::io::Result<()> { - let toml = OsStr::new("toml"); + for SnapshotMetadata { + height, path_stem, .. + } in Self::files(base_dir)? 
+ { + if height < latest_height { + let path = PathBuf::from(path_stem); + _ = std::fs::remove_file(&path.with_extension("snap")); + _ = std::fs::remove_file(path.with_extension("meta")); + } + } + Ok(()) + } + + /// List all snapshot files along with the block height at which + /// they were created and their chunks. + pub fn files(base_dir: &Path) -> std::io::Result> { + let snap = OsStr::new("snap"); + let meta = OsStr::new("meta"); + let mut files = + HashMap::, Option>)>::new(); for entry in std::fs::read_dir(base_dir)? { let entry = entry?; - if entry.path().is_file() && Some(toml) == entry.path().extension() + let entry_path = entry.path(); + let entry_ext = entry_path.extension(); + if entry_path.is_file() + && (Some(snap) == entry_ext || Some(meta) == entry_ext) { if let Some(name) = entry.path().file_name() { + // Extract the block height from the file name + // (assuming the file name is of the correct format) let Some(height) = name .to_string_lossy() .strip_prefix("snapshot_") - .and_then(|n| n.strip_suffix(".toml")) + .and_then(|n| { + n.strip_suffix(".meta").or(n.strip_suffix(".snap")) + }) .and_then(|h| BlockHeight::from_str(h).ok()) else { continue; }; - if height < latest_height { - _ = std::fs::remove_file(entry.path()); + // check if we have found the metadata file or snapshot file + // for a given block height + if entry_ext == Some(meta) { + let metadata = std::fs::read_to_string(entry_path)?; + let metadata_bytes = HEXLOWER + .decode(metadata.as_bytes()) + .map_err(|e| { + std::io::Error::new(ErrorKind::InvalidData, e) + })?; + let chunks: Vec = + BorshDeserialize::try_from_slice( + &metadata_bytes[..], + )?; + files.entry(height).or_default().1 = Some(chunks); + } else { + files.entry(height).or_default().0 = Some( + base_dir + .join(format!("snapshot_{}", height)) + .to_string_lossy() + .into(), + ); } }; } } - Ok(()) + let mut res = Vec::with_capacity(files.len()); + for (height, (path, chunks)) in files { + // only include snapshots which 
have both a .snap and .meta file. + if let Some((path_stem, chunks)) = path.zip(chunks) { + res.push(SnapshotMetadata { + height, + path_stem, + chunks, + }); + } + } + Ok(res) } /// Create a path to save a snapshot at a specific block height. - pub fn path(height: BlockHeight, mut base_dir: PathBuf) -> PathBuf { - base_dir.push(format!("snapshot_{}.toml", height)); - base_dir + pub fn paths(height: BlockHeight, base_dir: PathBuf) -> [PathBuf; 2] { + let snap_file = base_dir.join(format!("snapshot_{}.snap", height)); + let metadata_file = base_dir.join(format!("snapshot_{}.meta", height)); + [snap_file, metadata_file] + } + + /// Load the specified chunk of a snapshot at the given block height + pub fn load_chunk( + height: BlockHeight, + chunk: u64, + base_dir: &Path, + ) -> std::io::Result> { + let files = Self::files(base_dir)?; + let Some(metadata) = files.into_iter().find(|m| m.height == height) + else { + return Err(std::io::Error::new( + ErrorKind::NotFound, + format!( + "Could not find the metadata file for the snapshot at \ + height {}", + height, + ), + )); + }; + let chunk_start = if chunk == 0 { + 0usize + } else { + let prev = checked!(usize::try_from(chunk).unwrap() - 1).unwrap(); + usize::try_from(metadata.chunks[prev].boundary).unwrap() + }; + let chunk_end = metadata + .chunks + .get(usize::try_from(chunk).unwrap()) + .ok_or_else(|| { + std::io::Error::new( + ErrorKind::InvalidInput, + format!("Chunk {} not found", chunk), + ) + })? + .boundary; + let chunk_end = usize::try_from(chunk_end).unwrap(); + + let file = File::open( + PathBuf::from(metadata.path_stem).with_extension("snap"), + )?; + let reader = BufReader::new(file); + let mut bytes: Vec = vec![]; + for line in reader + .lines() + .skip(chunk_start) + .take(checked!(chunk_end - chunk_start).unwrap()) + { + bytes.extend(line?.as_bytes()); + } + Ok(bytes) } } /// A chunk of a snapshot. Includes the last line number in the file /// for this chunk and a hash of the chunk contents. 
#[derive( - Debug, Clone, Default, PartialEq, Eq, BorshSerialize, BorshDeserialize, + Debug, Clone, Default, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash, )] -struct Chunk { - boundary: u64, - hash: Hash, +pub struct Chunk { + /// The line number ending the chunk + pub boundary: u64, + /// Sha256 hash of the chunk + pub hash: Hash, } /// Builds a set of chunks from a stream of lines to be @@ -2658,22 +2782,33 @@ mod test { fn test_snapshot_cleanup() { let temp = tempfile::tempdir().expect("Test failed"); let base_dir = temp.path().to_path_buf(); + let chunks = vec![Chunk::default()]; + let chunk_bytes = HEXLOWER.encode(&chunks.serialize_to_vec()); for i in 0..4 { let mut path = base_dir.clone(); - path.push(format!("snapshot_{}.toml", i)); - _ = File::create(path).expect("Test failed") + path.push(format!("snapshot_{}.snap", i)); + _ = File::create(path).expect("Test failed"); + let mut path = base_dir.clone(); + path.push(format!("snapshot_{}.meta", i)); + std::fs::write(&path, chunk_bytes.as_bytes()).expect("Test failed"); } let mut path = base_dir.clone(); - path.push("snapshot_0_backup.toml"); + path.push("snapshot_0_backup.snap"); + _ = File::create(path).expect("Test failed"); + let mut path = base_dir.clone(); + path.push("snapshot_0_backup.meta"); _ = File::create(path).expect("Test failed"); let mut path = base_dir.clone(); path.push("snapshot_0.bak"); _ = File::create(path).expect("Test failed"); DbSnapshot::cleanup(2.into(), &base_dir).expect("Test failed"); let mut expected = HashSet::from([ - "snapshot_2.toml", - "snapshot_3.toml", - "snapshot_0_backup.toml", + "snapshot_2.snap", + "snapshot_2.meta", + "snapshot_3.snap", + "snapshot_3.meta", + "snapshot_0_backup.snap", + "snapshot_0_backup.meta", "snapshot_0.bak", ]); for entry in std::fs::read_dir(base_dir).expect("Test failed") { @@ -2794,12 +2929,14 @@ mod test { let db = open(&temp, true, None).expect("Test failed"); // freeze the database at this point in time let snapshot = 
db.snapshot(); - let mut path = temp.path().to_path_buf(); - path.push("snapshot_0.toml"); + let path = temp.path().to_path_buf(); + snapshot - .write_to_file(db.column_families(), &path) + .write_to_file(db.column_families(), path.clone(), 0.into()) .expect("Test failed"); - let snapshot = std::fs::read_to_string(path).expect("Test failed"); + let snapshot = + std::fs::read_to_string(path.clone().join("snapshot_0.snap")) + .expect("Test failed"); let chunks = vec![Chunk { boundary: 2, hash: Hash::sha256( @@ -2818,5 +2955,61 @@ mod test { let lines: Vec<&str> = snapshot.split('\n').collect(); assert_eq!(lines, expected); + let metadata = std::fs::read_to_string(path.join("snapshot_0.meta")) + .expect("Test failed"); + assert_eq!(metadata, chunk_val); + } + + /// Test that we load chunks correctly + /// from the snapshot file + #[test] + fn test_load_chunks() { + let temp = tempfile::tempdir().expect("Test failed"); + let mut chunker = Chunker::new(10); + let lines = vec!["fffffggggghh", "aaaa", "bbbbb", "cc", "dddddddd"]; + for l in lines { + chunker.add_line(l); + } + let chunks = chunker.finalize(); + let expected = vec![ + Chunk { + boundary: 1, + hash: Hash::sha256("fffffggggghh"), + }, + Chunk { + boundary: 3, + hash: Hash::sha256("aaaabbbbb".as_bytes()), + }, + Chunk { + boundary: 5, + hash: Hash::sha256("ccdddddddd".as_bytes()), + }, + ]; + assert_eq!(chunks, expected); + let [snap_file, meta_file] = + DbSnapshot::paths(1.into(), temp.path().to_path_buf()); + std::fs::write( + &snap_file, + "fffffggggghh\naaaa\nbbbbb\ncc\ndddddddd".as_bytes(), + ) + .expect("Test failed"); + std::fs::write(meta_file, HEXLOWER.encode(&chunks.serialize_to_vec())) + .expect("Test failed"); + let chunks: Vec<_> = (0..3) + .filter_map(|i| { + DbSnapshot::load_chunk(1.into(), i, temp.path()).ok() + }) + .collect(); + let expected = vec![ + "fffffggggghh".as_bytes().to_vec(), + "aaaabbbbb".as_bytes().to_vec(), + "ccdddddddd".as_bytes().to_vec(), + ]; + assert_eq!(chunks, expected); 
+ + assert!(DbSnapshot::load_chunk(0.into(), 0, temp.path()).is_err()); + assert!(DbSnapshot::load_chunk(0.into(), 4, temp.path()).is_err()); + std::fs::remove_file(snap_file).unwrap(); + assert!(DbSnapshot::load_chunk(0.into(), 0, temp.path()).is_err()); } } From 594d26cc077ff1ebab7c9a039f8f941544fd3235 Mon Sep 17 00:00:00 2001 From: satan Date: Sat, 8 Jun 2024 12:25:38 +0200 Subject: [PATCH 10/11] tinies --- crates/node/src/shell/snapshots.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/node/src/shell/snapshots.rs b/crates/node/src/shell/snapshots.rs index a05d7ed813..2e0ea0f36e 100644 --- a/crates/node/src/shell/snapshots.rs +++ b/crates/node/src/shell/snapshots.rs @@ -7,7 +7,6 @@ use crate::facade::tendermint::abci::types::Snapshot; use crate::facade::tendermint::v0_37::abci::{ request as tm_request, response as tm_response, }; -// use crate::facade::tendermint_proto::v0_37::abci::Snapshot; use crate::shell::Shell; use crate::storage; use crate::storage::{DbSnapshot, SnapshotMetadata}; @@ -27,8 +26,7 @@ impl Shell { .map(|SnapshotMetadata { height, chunks, .. 
}| { let hash = Hash::sha256(chunks.serialize_to_vec()).0; Snapshot { - #[allow(clippy::cast_possible_truncation)] - height: (height.0 as u32).into(), + height: u32::try_from(height.0).unwrap().into(), format: 0, #[allow(clippy::cast_possible_truncation)] chunks: chunks.len() as u32, From f79afd12290b5815c0a39a7b290350c2d18e77a5 Mon Sep 17 00:00:00 2001 From: satan Date: Wed, 12 Jun 2024 10:26:20 +0200 Subject: [PATCH 11/11] rebasing --- crates/node/src/storage/rocksdb.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs index e0495a7030..3091526672 100644 --- a/crates/node/src/storage/rocksdb.rs +++ b/crates/node/src/storage/rocksdb.rs @@ -2945,12 +2945,11 @@ mod test { .as_bytes(), ), }]; - let chunk_val = - format!("chunks:{}", base64::encode(chunks.serialize_to_vec())); + let chunk_val = base64::encode(chunks.serialize_to_vec()); let expected = [ "subspace:bing/fucking/bong=AQ==".to_string(), "rollback:0000000000002/new/bing/fucking/bong=AQ==".to_string(), - chunk_val, + "".to_string(), ]; let lines: Vec<&str> = snapshot.split('\n').collect();