From df8c15abd9a78f80dc8da7870624e2c895c7b3ba Mon Sep 17 00:00:00 2001
From: satan
Date: Fri, 7 Jun 2024 12:10:51 +0200
Subject: [PATCH 1/3] Added changelog

---
 .../3386-listsnapshots-loadsnapshotchunk.md   |   7 +
 crates/node/src/lib.rs                        |   8 +-
 crates/node/src/shell/mod.rs                  |   3 +
 crates/node/src/shell/snapshots.rs            |  61 +++++
 crates/node/src/shims/abcipp_shim.rs          |   4 +-
 crates/node/src/storage/mod.rs                |   2 +-
 crates/node/src/storage/rocksdb.rs            | 255 +++++++++++++++---
 7 files changed, 301 insertions(+), 39 deletions(-)
 create mode 100644 .changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md
 create mode 100644 crates/node/src/shell/snapshots.rs

diff --git a/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md b/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md
new file mode 100644
index 0000000000..24fb15fb6d
--- /dev/null
+++ b/.changelog/unreleased/improvements/3386-listsnapshots-loadsnapshotchunk.md
@@ -0,0 +1,7 @@
+- Addresses the third point and part of the fourth point of Issue
+  [\#3307](https://github.com/anoma/namada/issues/3307)
+  * Adds chunking logic to snapshots
+  * Implements the `ListSnapshots` ABCI call
+  * Implements the `LoadSnapshotChunk` ABCI call
+
+  ([\#3386](https://github.com/anoma/namada/pull/3386))
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 89e9e3fcdb..aaf4b4be0d 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -153,14 +153,14 @@ impl Shell {
                 Ok(Response::CheckTx(self.mempool_validate(&tx.tx, r#type)))
             }
             Request::ListSnapshots => {
-                Ok(Response::ListSnapshots(Default::default()))
+                self.list_snapshots().map(Response::ListSnapshots)
             }
             Request::OfferSnapshot(_) => {
                 Ok(Response::OfferSnapshot(Default::default()))
             }
-            Request::LoadSnapshotChunk(_) => {
-                Ok(Response::LoadSnapshotChunk(Default::default()))
-            }
+            Request::LoadSnapshotChunk(req) => self
+                .load_snapshot_chunk(req)
+                .map(Response::LoadSnapshotChunk),
             Request::ApplySnapshotChunk(_) => {
                 Ok(Response::ApplySnapshotChunk(Default::default()))
             }
diff --git a/crates/node/src/shell/mod.rs b/crates/node/src/shell/mod.rs
index 06c9b0ac59..4f9eae585e 100644
--- a/crates/node/src/shell/mod.rs
+++ b/crates/node/src/shell/mod.rs
@@ -16,6 +16,7 @@ pub mod prepare_proposal;
 use namada::state::State;
 pub mod process_proposal;
 pub(super) mod queries;
+mod snapshots;
 mod stats;
 #[cfg(any(test, feature = "testing"))]
 #[allow(dead_code)]
@@ -122,6 +123,8 @@ pub enum Error {
     Storage(#[from] namada::state::StorageError),
     #[error("Transaction replay attempt: {0}")]
     ReplayAttempt(String),
+    #[error("Error with snapshots: {0}")]
+    Snapshot(std::io::Error),
 }
 
 impl From<Error> for TxResult {
diff --git a/crates/node/src/shell/snapshots.rs b/crates/node/src/shell/snapshots.rs
new file mode 100644
index 0000000000..a05d7ed813
--- /dev/null
+++ b/crates/node/src/shell/snapshots.rs
@@ -0,0 +1,61 @@
+use borsh_ext::BorshSerializeExt;
+use namada::hash::{Hash, Sha256Hasher};
+use namada::state::BlockHeight;
+
+use super::{Error, Result};
+use crate::facade::tendermint::abci::types::Snapshot;
+use crate::facade::tendermint::v0_37::abci::{
+    request as tm_request, response as tm_response,
+};
+// use crate::facade::tendermint_proto::v0_37::abci::Snapshot;
+use crate::shell::Shell;
+use crate::storage;
+use crate::storage::{DbSnapshot, SnapshotMetadata};
+
+impl Shell {
+    /// List the snapshot files held locally. Furthermore, the number
+    /// of chunks, a hash of each chunk, and a hash of the chunk
+    /// metadata are provided so that syncing nodes can verify the
+    /// snapshots they receive.
+    pub fn list_snapshots(&self) -> Result<tm_response::ListSnapshots> {
+        if self.blocks_between_snapshots.is_none() {
+            Ok(Default::default())
+        } else {
+            let snapshots = DbSnapshot::files(&self.base_dir)
+                .map_err(Error::Snapshot)?
+                .into_iter()
+                .map(|SnapshotMetadata { height, chunks, .. }| {
+                    let hash = Hash::sha256(chunks.serialize_to_vec()).0;
+                    Snapshot {
+                        #[allow(clippy::cast_possible_truncation)]
+                        height: (height.0 as u32).into(),
+                        format: 0,
+                        #[allow(clippy::cast_possible_truncation)]
+                        chunks: chunks.len() as u32,
+                        hash: hash.into_iter().collect(),
+                        metadata: Default::default(),
+                    }
+                })
+                .collect();
+
+            Ok(tm_response::ListSnapshots { snapshots })
+        }
+    }
+
+    /// Load the bytes of a requested chunk and return them
+    /// to CometBFT.
+    pub fn load_snapshot_chunk(
+        &self,
+        req: tm_request::LoadSnapshotChunk,
+    ) -> Result<tm_response::LoadSnapshotChunk> {
+        let chunk = DbSnapshot::load_chunk(
+            BlockHeight(req.height.into()),
+            u64::from(req.chunk),
+            &self.base_dir,
+        )
+        .map_err(Error::Snapshot)?;
+        Ok(tm_response::LoadSnapshotChunk {
+            chunk: chunk.into_iter().collect(),
+        })
+    }
+}
diff --git a/crates/node/src/shims/abcipp_shim.rs b/crates/node/src/shims/abcipp_shim.rs
index a490b043a5..998292b2c9 100644
--- a/crates/node/src/shims/abcipp_shim.rs
+++ b/crates/node/src/shims/abcipp_shim.rs
@@ -231,9 +231,7 @@ impl AbcippShim {
                     .expect("Last block should exists")
                     .height;
                 let cfs = db.column_families();
-                let path = DbSnapshot::path(last_height, base_dir.clone());
-
-                snapshot.write_to_file(cfs, &path)?;
+                snapshot.write_to_file(cfs, base_dir.clone(), last_height)?;
                 DbSnapshot::cleanup(last_height, &base_dir)
             });
diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs
index fc8dbe5504..faeb01fb69 100644
--- a/crates/node/src/storage/mod.rs
+++ b/crates/node/src/storage/mod.rs
@@ -11,7 +11,7 @@ use arse_merkle_tree::H256;
 use blake2b_rs::{Blake2b, Blake2bBuilder};
 use namada::state::StorageHasher;
 use namada_sdk::state::FullAccessState;
-pub use rocksdb::{open, DbSnapshot, RocksDBUpdateVisitor};
+pub use rocksdb::{open, DbSnapshot, RocksDBUpdateVisitor, SnapshotMetadata};
 
 #[derive(Default)]
 pub struct PersistentStorageHasher(Blake2bHasher);
diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs
index 8f4ce3b6db..e0495a7030 100644
--- a/crates/node/src/storage/rocksdb.rs
+++ b/crates/node/src/storage/rocksdb.rs
@@ -46,7 +46,7 @@
 use std::ffi::OsStr;
 use std::fs::File;
-use std::io::{BufWriter, Write};
+use std::io::{BufRead, BufReader, BufWriter, ErrorKind, Write};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Mutex;
@@ -55,7 +55,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
 use borsh_ext::BorshSerializeExt;
 use data_encoding::HEXLOWER;
 use itertools::Either;
-use namada::core::collections::HashSet;
+use namada::core::collections::{HashMap, HashSet};
 use namada::core::storage::{BlockHeight, Epoch, Header, Key, KeySeg};
 use namada::core::{decode, encode, ethereum_events};
 use namada::eth_bridge::storage::proof::BridgePoolRootProof;
@@ -740,6 +740,19 @@ impl RocksDB {
     }
 }
 
+/// Information about a particular snapshot
+/// owned by a node
+pub struct SnapshotMetadata {
+    /// The height at which the snapshot was taken
+    pub height: BlockHeight,
+    /// The stem of the paths to the snapshot file and its
+    /// metadata file, without extensions
+    pub path_stem: String,
+    /// Data about the chunks that the snapshot is
+    /// partitioned into
+    pub chunks: Vec<Chunk>,
+}
+
 pub struct DbSnapshot<'a>(pub rocksdb::Snapshot<'a>);
 
 impl<'a> DbSnapshot<'a> {
@@ -749,9 +762,11 @@ impl<'a> DbSnapshot<'a> {
     pub fn write_to_file(
         &self,
         cfs: [(&'static str, &'a ColumnFamily); 6],
-        path: &Path,
+        base_dir: PathBuf,
+        height: BlockHeight,
     ) -> std::io::Result<()> {
-        let file = File::create(path)?;
+        let [snap_file, metadata_file] = Self::paths(height, base_dir);
+        let file = File::create(snap_file)?;
         let mut buf = BufWriter::new(file);
         let mut chunker = Chunker::new(MAX_CHUNK_SIZE);
         for (cf_name, cf) in cfs {
@@ -771,11 +786,10 @@ impl<'a> DbSnapshot<'a> {
             }
             buf.flush()?;
         }
-        let chunks = chunker.finalize();
-        let val = base64::encode(chunks.serialize_to_vec());
-        let bytes = format!("chunks:{val}");
-        buf.write_all(bytes.as_bytes())?;
         buf.flush()?;
+        let chunks = chunker.finalize();
+        let metadata = HEXLOWER.encode(&chunks.serialize_to_vec());
+        std::fs::write(metadata_file, metadata.as_bytes())?;
         Ok(())
     }
 
@@ -784,44 +798,154 @@ impl<'a> DbSnapshot<'a> {
     pub fn cleanup(
         latest_height: BlockHeight,
         base_dir: &Path,
     ) -> std::io::Result<()> {
-        let toml = OsStr::new("toml");
+        for SnapshotMetadata {
+            height, path_stem, ..
+        } in Self::files(base_dir)?
+        {
+            if height < latest_height {
+                let path = PathBuf::from(path_stem);
+                _ = std::fs::remove_file(&path.with_extension("snap"));
+                _ = std::fs::remove_file(path.with_extension("meta"));
+            }
+        }
+        Ok(())
+    }
+
+    /// List all snapshot files along with the block height at which
+    /// they were created and their chunks.
+    pub fn files(base_dir: &Path) -> std::io::Result<Vec<SnapshotMetadata>> {
+        let snap = OsStr::new("snap");
+        let meta = OsStr::new("meta");
+        let mut files =
+            HashMap::<BlockHeight, (Option<String>, Option<Vec<Chunk>>)>::new();
         for entry in std::fs::read_dir(base_dir)? {
             let entry = entry?;
-            if entry.path().is_file() && Some(toml) == entry.path().extension()
+            let entry_path = entry.path();
+            let entry_ext = entry_path.extension();
+            if entry_path.is_file()
+                && (Some(snap) == entry_ext || Some(meta) == entry_ext)
             {
                 if let Some(name) = entry.path().file_name() {
+                    // Extract the block height from the file name
+                    // (assuming the file name is of the correct format)
                     let Some(height) = name
                         .to_string_lossy()
                         .strip_prefix("snapshot_")
-                        .and_then(|n| n.strip_suffix(".toml"))
+                        .and_then(|n| {
+                            n.strip_suffix(".meta").or(n.strip_suffix(".snap"))
+                        })
                        .and_then(|h| BlockHeight::from_str(h).ok())
                     else {
                         continue;
                     };
-                    if height < latest_height {
-                        _ = std::fs::remove_file(entry.path());
+                    // check if we have found the metadata file or snapshot file
+                    // for a given block height
+                    if entry_ext == Some(meta) {
+                        let metadata = std::fs::read_to_string(entry_path)?;
+                        let metadata_bytes = HEXLOWER
+                            .decode(metadata.as_bytes())
+                            .map_err(|e| {
+                                std::io::Error::new(ErrorKind::InvalidData, e)
+                            })?;
+                        let chunks: Vec<Chunk> =
+                            BorshDeserialize::try_from_slice(
+                                &metadata_bytes[..],
+                            )?;
+                        files.entry(height).or_default().1 = Some(chunks);
+                    } else {
+                        files.entry(height).or_default().0 = Some(
+                            base_dir
+                                .join(format!("snapshot_{}", height))
+                                .to_string_lossy()
+                                .into(),
+                        );
                     }
                 };
             }
         }
-        Ok(())
+        let mut res = Vec::with_capacity(files.len());
+        for (height, (path, chunks)) in files {
+            // only include snapshots which have both a .snap and .meta file.
+            if let Some((path_stem, chunks)) = path.zip(chunks) {
+                res.push(SnapshotMetadata {
+                    height,
+                    path_stem,
+                    chunks,
+                });
+            }
+        }
+        Ok(res)
     }
 
-    /// Create a path to save a snapshot at a specific block height.
+    /// Create the paths to a snapshot file and its metadata file at a block height.
-    pub fn path(height: BlockHeight, mut base_dir: PathBuf) -> PathBuf {
-        base_dir.push(format!("snapshot_{}.toml", height));
-        base_dir
+    pub fn paths(height: BlockHeight, base_dir: PathBuf) -> [PathBuf; 2] {
+        let snap_file = base_dir.join(format!("snapshot_{}.snap", height));
+        let metadata_file = base_dir.join(format!("snapshot_{}.meta", height));
+        [snap_file, metadata_file]
+    }
+
+    /// Load the specified chunk of a snapshot at the given block height
+    pub fn load_chunk(
+        height: BlockHeight,
+        chunk: u64,
+        base_dir: &Path,
+    ) -> std::io::Result<Vec<u8>> {
+        let files = Self::files(base_dir)?;
+        let Some(metadata) = files.into_iter().find(|m| m.height == height)
+        else {
+            return Err(std::io::Error::new(
+                ErrorKind::NotFound,
+                format!(
+                    "Could not find the metadata file for the snapshot at \
+                     height {}",
+                    height,
+                ),
+            ));
+        };
+        let chunk_end = metadata
+            .chunks
+            .get(usize::try_from(chunk).unwrap())
+            .ok_or_else(|| {
+                std::io::Error::new(
+                    ErrorKind::InvalidInput,
+                    format!("Chunk {} not found", chunk),
+                )
+            })?
+            .boundary;
+        let chunk_end = usize::try_from(chunk_end).unwrap();
+        let chunk_start = if chunk == 0 {
+            0usize
+        } else {
+            let prev = checked!(usize::try_from(chunk).unwrap() - 1).unwrap();
+            usize::try_from(metadata.chunks[prev].boundary).unwrap()
+        };
+
+        let file = File::open(
+            PathBuf::from(metadata.path_stem).with_extension("snap"),
+        )?;
+        let reader = BufReader::new(file);
+        let mut bytes: Vec<u8> = vec![];
+        for line in reader
+            .lines()
+            .skip(chunk_start)
+            .take(checked!(chunk_end - chunk_start).unwrap())
+        {
+            bytes.extend(line?.as_bytes());
+        }
+        Ok(bytes)
     }
 }
 
 /// A chunk of a snapshot. Includes the last line number in the file
 /// for this chunk and a hash of the chunk contents.
 #[derive(
-    Debug, Clone, Default, PartialEq, Eq, BorshSerialize, BorshDeserialize,
+    Debug, Clone, Default, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash,
 )]
-struct Chunk {
-    boundary: u64,
-    hash: Hash,
+pub struct Chunk {
+    /// The line number ending the chunk
+    pub boundary: u64,
+    /// Sha256 hash of the chunk
+    pub hash: Hash,
 }
 
 /// Builds a set of chunks from a stream of lines to be
@@ -2658,22 +2782,33 @@ mod test {
     fn test_snapshot_cleanup() {
         let temp = tempfile::tempdir().expect("Test failed");
         let base_dir = temp.path().to_path_buf();
+        let chunks = vec![Chunk::default()];
+        let chunk_bytes = HEXLOWER.encode(&chunks.serialize_to_vec());
         for i in 0..4 {
             let mut path = base_dir.clone();
-            path.push(format!("snapshot_{}.toml", i));
-            _ = File::create(path).expect("Test failed")
+            path.push(format!("snapshot_{}.snap", i));
+            _ = File::create(path).expect("Test failed");
+            let mut path = base_dir.clone();
+            path.push(format!("snapshot_{}.meta", i));
+            std::fs::write(&path, chunk_bytes.as_bytes()).expect("Test failed");
         }
         let mut path = base_dir.clone();
-        path.push("snapshot_0_backup.toml");
+        path.push("snapshot_0_backup.snap");
+        _ = File::create(path).expect("Test failed");
+        let mut path = base_dir.clone();
+        path.push("snapshot_0_backup.meta");
         _ = File::create(path).expect("Test failed");
         let mut path = base_dir.clone();
         path.push("snapshot_0.bak");
         _ = File::create(path).expect("Test failed");
         DbSnapshot::cleanup(2.into(), &base_dir).expect("Test failed");
         let mut expected = HashSet::from([
-            "snapshot_2.toml",
-            "snapshot_3.toml",
-            "snapshot_0_backup.toml",
+            "snapshot_2.snap",
+            "snapshot_2.meta",
+            "snapshot_3.snap",
+            "snapshot_3.meta",
+            "snapshot_0_backup.snap",
+            "snapshot_0_backup.meta",
             "snapshot_0.bak",
         ]);
         for entry in std::fs::read_dir(base_dir).expect("Test failed") {
@@ -2794,12 +2929,14 @@ mod test {
         let db = open(&temp, true, None).expect("Test failed");
         // freeze the database at this point in time
         let snapshot = db.snapshot();
-        let mut path = temp.path().to_path_buf();
-        path.push("snapshot_0.toml");
+        let path = temp.path().to_path_buf();
+
         snapshot
-            .write_to_file(db.column_families(), &path)
+            .write_to_file(db.column_families(), path.clone(), 0.into())
             .expect("Test failed");
-        let snapshot = std::fs::read_to_string(path).expect("Test failed");
+        let snapshot =
+            std::fs::read_to_string(path.clone().join("snapshot_0.snap"))
+                .expect("Test failed");
         let chunks = vec![Chunk {
             boundary: 2,
             hash: Hash::sha256(
@@ -2818,5 +2955,61 @@ mod test {
 
         let lines: Vec<&str> = snapshot.split('\n').collect();
         assert_eq!(lines, expected);
+        let metadata = std::fs::read_to_string(path.join("snapshot_0.meta"))
+            .expect("Test failed");
+        assert_eq!(metadata, chunk_val);
+    }
+
+    /// Test that we load chunks correctly
+    /// from the snapshot file
+    #[test]
+    fn test_load_chunks() {
+        let temp = tempfile::tempdir().expect("Test failed");
+        let mut chunker = Chunker::new(10);
+        let lines = vec!["fffffggggghh", "aaaa", "bbbbb", "cc", "dddddddd"];
+        for l in lines {
+            chunker.add_line(l);
+        }
+        let chunks = chunker.finalize();
+        let expected = vec![
+            Chunk {
+                boundary: 1,
+                hash: Hash::sha256("fffffggggghh".as_bytes()),
+            },
+            Chunk {
+                boundary: 3,
+                hash: Hash::sha256("aaaabbbbb".as_bytes()),
+            },
+            Chunk {
+                boundary: 5,
+                hash: Hash::sha256("ccdddddddd".as_bytes()),
+            },
+        ];
+        assert_eq!(chunks, expected);
+        let [snap_file, meta_file] =
+            DbSnapshot::paths(1.into(), temp.path().to_path_buf());
+        std::fs::write(
+            &snap_file,
+            "fffffggggghh\naaaa\nbbbbb\ncc\ndddddddd".as_bytes(),
+        )
+        .expect("Test failed");
+        std::fs::write(meta_file, HEXLOWER.encode(&chunks.serialize_to_vec()))
+            .expect("Test failed");
+        let chunks: Vec<_> = (0..3)
+            .filter_map(|i| {
+                DbSnapshot::load_chunk(1.into(), i, temp.path()).ok()
+            })
+            .collect();
+        let expected = vec![
+            "fffffggggghh".as_bytes().to_vec(),
+            "aaaabbbbb".as_bytes().to_vec(),
+            "ccdddddddd".as_bytes().to_vec(),
+        ];
+        assert_eq!(chunks, expected);
+
+        assert!(DbSnapshot::load_chunk(0.into(), 0, temp.path()).is_err());
+        assert!(DbSnapshot::load_chunk(1.into(), 4, temp.path()).is_err());
+        std::fs::remove_file(snap_file).unwrap();
+        assert!(DbSnapshot::load_chunk(1.into(), 0, temp.path()).is_err());
+    }
 }

From 594d26cc077ff1ebab7c9a039f8f941544fd3235 Mon Sep 17 00:00:00 2001
From: satan
Date: Sat, 8 Jun 2024 12:25:38 +0200
Subject: [PATCH 2/3] tinies

---
 crates/node/src/shell/snapshots.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/node/src/shell/snapshots.rs b/crates/node/src/shell/snapshots.rs
index a05d7ed813..2e0ea0f36e 100644
--- a/crates/node/src/shell/snapshots.rs
+++ b/crates/node/src/shell/snapshots.rs
@@ -7,7 +7,6 @@ use crate::facade::tendermint::abci::types::Snapshot;
 use crate::facade::tendermint::v0_37::abci::{
     request as tm_request, response as tm_response,
 };
-// use crate::facade::tendermint_proto::v0_37::abci::Snapshot;
 use crate::shell::Shell;
 use crate::storage;
 use crate::storage::{DbSnapshot, SnapshotMetadata};
@@ -27,8 +26,7 @@ impl Shell {
                 .map(|SnapshotMetadata { height, chunks, .. }| {
                     let hash = Hash::sha256(chunks.serialize_to_vec()).0;
                     Snapshot {
-                        #[allow(clippy::cast_possible_truncation)]
-                        height: (height.0 as u32).into(),
+                        height: u32::try_from(height.0).unwrap().into(),
                         format: 0,
                         #[allow(clippy::cast_possible_truncation)]
                         chunks: chunks.len() as u32,

From f79afd12290b5815c0a39a7b290350c2d18e77a5 Mon Sep 17 00:00:00 2001
From: satan
Date: Wed, 12 Jun 2024 10:26:20 +0200
Subject: [PATCH 3/3] rebasing

---
 crates/node/src/storage/rocksdb.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/crates/node/src/storage/rocksdb.rs b/crates/node/src/storage/rocksdb.rs
index e0495a7030..3091526672 100644
--- a/crates/node/src/storage/rocksdb.rs
+++ b/crates/node/src/storage/rocksdb.rs
@@ -2945,12 +2945,11 @@ mod test {
             .as_bytes(),
             ),
         }];
-        let chunk_val =
-            format!("chunks:{}", base64::encode(chunks.serialize_to_vec()));
+        let chunk_val = HEXLOWER.encode(&chunks.serialize_to_vec());
         let expected = [
             "subspace:bing/fucking/bong=AQ==".to_string(),
             "rollback:0000000000002/new/bing/fucking/bong=AQ==".to_string(),
-            chunk_val,
+            "".to_string(),
         ];
 
         let lines: Vec<&str> = snapshot.split('\n').collect();
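
Illustrative sketch, not part of the patch series: the metadata exposed by
`ListSnapshots` is what lets a state-syncing peer check each chunk it fetches
via `LoadSnapshotChunk`, since every `Chunk` carries a SHA-256 digest of its
contents. A minimal standalone version of that check, assuming the `sha2`
crate; `ChunkMeta` and `verify_chunk` are hypothetical names that mirror the
`Chunk` type added above:

    use sha2::{Digest, Sha256};

    /// Mirrors `Chunk` from rocksdb.rs: the line boundary ending the chunk
    /// plus a SHA-256 digest of the chunk's concatenated line contents.
    struct ChunkMeta {
        boundary: u64,
        hash: [u8; 32],
    }

    /// Compare a chunk received via `LoadSnapshotChunk` against the digest
    /// advertised in the snapshot metadata.
    fn verify_chunk(received: &[u8], expected: &ChunkMeta) -> bool {
        let digest: [u8; 32] = Sha256::digest(received).into();
        digest == expected.hash
    }

    fn main() {
        // `load_chunk` returns the chunk's lines concatenated with their
        // trailing newlines stripped, e.g. the lines "aaaa" and "bbbbb".
        let chunk = b"aaaabbbbb";
        let meta = ChunkMeta {
            boundary: 3,
            hash: Sha256::digest(chunk).into(),
        };
        assert!(verify_chunk(chunk, &meta));
    }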
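
A second illustrative sketch of the boundary arithmetic `load_chunk` relies
on: chunk 0 starts at line 0, and chunk `i` covers the half-open line range
`[boundary[i-1], boundary[i])`. The helper name below is hypothetical; it
returns `None` for an out-of-range chunk index, matching the error path in
the patched `load_chunk`:

    fn chunk_line_range(boundaries: &[u64], chunk: usize) -> Option<(usize, usize)> {
        // The chunk's end boundary; `None` if the index is out of range.
        let end = usize::try_from(*boundaries.get(chunk)?).ok()?;
        // Chunk 0 starts at the first line; chunk i starts where i-1 ended.
        let start = if chunk == 0 {
            0
        } else {
            usize::try_from(boundaries[chunk - 1]).ok()?
        };
        Some((start, end))
    }

    fn main() {
        // Boundaries as produced by `Chunker::finalize` in `test_load_chunks`:
        // three chunks ending after lines 1, 3 and 5.
        let boundaries = [1u64, 3, 5];
        assert_eq!(chunk_line_range(&boundaries, 0), Some((0, 1)));
        assert_eq!(chunk_line_range(&boundaries, 1), Some((1, 3)));
        assert_eq!(chunk_line_range(&boundaries, 2), Some((3, 5)));
        assert_eq!(chunk_line_range(&boundaries, 3), None);
    }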