From 3939e349344899480f756a4b6c4c8f654366daed Mon Sep 17 00:00:00 2001 From: yito88 Date: Mon, 20 Mar 2023 16:56:00 +0100 Subject: [PATCH 1/3] prune older merkle tree stores --- apps/src/lib/node/ledger/shell/mod.rs | 9 ++- apps/src/lib/node/ledger/storage/mod.rs | 69 +++++++++++++++++++++ apps/src/lib/node/ledger/storage/rocksdb.rs | 24 +++++++ core/src/ledger/storage/mockdb.rs | 22 +++++++ core/src/ledger/storage/mod.rs | 41 ++++++++++++ core/src/types/storage.rs | 20 ++++++ 6 files changed, 183 insertions(+), 2 deletions(-) diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index 35f505897c..f941232c1a 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -254,8 +254,13 @@ where .expect("Creating directory for Namada should not fail"); } // load last state from storage - let mut storage = - Storage::open(db_path, chain_id.clone(), native_token, db_cache); + let mut storage = Storage::open( + db_path, + chain_id.clone(), + native_token, + db_cache, + config.shell.storage_read_past_height_limit, + ); storage .load_last_state() .map_err(|e| { diff --git a/apps/src/lib/node/ledger/storage/mod.rs b/apps/src/lib/node/ledger/storage/mod.rs index f0a82b9cde..739d7cbdfa 100644 --- a/apps/src/lib/node/ledger/storage/mod.rs +++ b/apps/src/lib/node/ledger/storage/mod.rs @@ -74,6 +74,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); let key = Key::parse("key").expect("cannot parse the key string"); let value: u64 = 1; @@ -121,6 +122,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); storage .begin_block(BlockHash::default(), BlockHeight(100)) @@ -149,6 +151,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); storage .load_last_state() @@ -172,6 +175,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); storage .begin_block(BlockHash::default(), BlockHeight(100)) @@ -216,6 +220,7 @@ mod tests { ChainId::default(), 
address::nam(), None, + None, ); storage .begin_block(BlockHash::default(), BlockHeight(100)) @@ -279,6 +284,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); // 1. For each `blocks_write_value`, write the current block height if @@ -367,6 +373,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); let num_keys = 5; @@ -469,6 +476,67 @@ mod tests { Ok(()) } + /// Test the restore of the merkle tree + #[test] + fn test_prune_merkle_tree_stores() { + let db_path = + TempDir::new().expect("Unable to create a temporary DB directory"); + let mut storage = PersistentStorage::open( + db_path.path(), + ChainId::default(), + address::nam(), + None, + Some(5), + ); + storage + .begin_block(BlockHash::default(), BlockHeight(1)) + .expect("begin_block failed"); + + let key = Key::parse("key").expect("cannot parse the key string"); + let value: u64 = 1; + storage + .write(&key, types::encode(&value)) + .expect("write failed"); + + storage.block.epoch = storage.block.epoch.next(); + storage.block.pred_epochs.new_epoch(BlockHeight(1), 1000); + storage.commit_block().expect("commit failed"); + + storage + .begin_block(BlockHash::default(), BlockHeight(6)) + .expect("begin_block failed"); + + let key = Key::parse("key2").expect("cannot parse the key string"); + let value: u64 = 2; + storage + .write(&key, types::encode(&value)) + .expect("write failed"); + + storage.block.epoch = storage.block.epoch.next(); + storage.block.pred_epochs.new_epoch(BlockHeight(6), 1000); + storage.commit_block().expect("commit failed"); + + let result = storage.get_merkle_tree(1.into()); + assert!(result.is_ok(), "The tree at Height 1 should be restored"); + + storage + .begin_block(BlockHash::default(), BlockHeight(11)) + .expect("begin_block failed"); + storage.block.epoch = storage.block.epoch.next(); + storage.block.pred_epochs.new_epoch(BlockHeight(11), 1000); + storage.commit_block().expect("commit failed"); + + let result = storage.get_merkle_tree(1.into()); + 
assert!(result.is_err(), "The tree at Height 1 should be pruned"); + let result = storage.get_merkle_tree(5.into()); + assert!( + result.is_err(), + "The tree at Height 5 shouldn't be able to be restored" + ); + let result = storage.get_merkle_tree(6.into()); + assert!(result.is_ok(), "The tree should be restored"); + } + /// Test the prefix iterator with RocksDB. #[test] fn test_persistent_storage_prefix_iter() { @@ -479,6 +547,7 @@ mod tests { ChainId::default(), address::nam(), None, + None, ); let mut storage = WlStorage { storage, diff --git a/apps/src/lib/node/ledger/storage/rocksdb.rs b/apps/src/lib/node/ledger/storage/rocksdb.rs index 677956f255..88f7a679d2 100644 --- a/apps/src/lib/node/ledger/storage/rocksdb.rs +++ b/apps/src/lib/node/ledger/storage/rocksdb.rs @@ -971,6 +971,30 @@ impl DB for RocksDB { Ok(prev_len) } + + fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> { + let mut batch = WriteBatch::default(); + let prefix_key = Key::from(height.to_db_key()) + .push(&"tree".to_owned()) + .map_err(Error::KeyError)?; + for st in StoreType::iter() { + if *st != StoreType::Base { + let prefix_key = prefix_key + .push(&st.to_string()) + .map_err(Error::KeyError)?; + let root_key = prefix_key + .push(&"root".to_owned()) + .map_err(Error::KeyError)?; + batch.delete(root_key.to_string()); + let store_key = prefix_key + .push(&"store".to_owned()) + .map_err(Error::KeyError)?; + batch.delete(store_key.to_string()); + } + } + + self.exec_batch(batch) + } } impl<'iter> DBIter<'iter> for RocksDB { diff --git a/core/src/ledger/storage/mockdb.rs b/core/src/ledger/storage/mockdb.rs index 47447dc79b..d28d05acda 100644 --- a/core/src/ledger/storage/mockdb.rs +++ b/core/src/ledger/storage/mockdb.rs @@ -441,6 +441,28 @@ impl DB for MockDB { None => 0, }) } + + fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> { + let prefix_key = Key::from(height.to_db_key()) + .push(&"tree".to_owned()) + .map_err(Error::KeyError)?; + for st 
in StoreType::iter() { + if *st != StoreType::Base { + let prefix_key = prefix_key + .push(&st.to_string()) + .map_err(Error::KeyError)?; + let root_key = prefix_key + .push(&"root".to_owned()) + .map_err(Error::KeyError)?; + self.0.borrow_mut().remove(&root_key.to_string()); + let store_key = prefix_key + .push(&"store".to_owned()) + .map_err(Error::KeyError)?; + self.0.borrow_mut().remove(&store_key.to_string()); + } + } + Ok(()) + } } impl<'iter> DBIter<'iter> for MockDB { diff --git a/core/src/ledger/storage/mod.rs b/core/src/ledger/storage/mod.rs index c86f46d27a..2431950ae6 100644 --- a/core/src/ledger/storage/mod.rs +++ b/core/src/ledger/storage/mod.rs @@ -90,6 +90,8 @@ where /// Wrapper txs to be decrypted in the next block proposal #[cfg(feature = "ferveo-tpke")] pub tx_queue: TxQueue, + /// How many block heights in the past can the storage be queried + pub storage_read_past_height_limit: Option<u64>, } /// The block storage data @@ -285,6 +287,9 @@ where height: BlockHeight, key: &Key, ) -> Result<i64>; + + /// Prune old Merkle tree stores + fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()>; } /// A database prefix iterator. @@ -334,6 +339,7 @@ where chain_id: ChainId, native_token: Address, cache: Option<&D::Cache>, + storage_read_past_height_limit: Option<u64>, ) -> Self { let block = BlockStorage { tree: MerkleTree::default(), @@ -360,6 +366,7 @@ where #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), native_token, + storage_read_past_height_limit, } } @@ -454,6 +461,10 @@ where self.last_height = self.block.height; self.last_epoch = self.block.epoch; self.header = None; + if is_full_commit { + // prune old merkle tree stores + self.prune_merkle_tree_stores()?; + } Ok(()) } @@ -889,6 +900,35 @@ where self.db .batch_delete_subspace_val(batch, self.block.height, key) } + + // Prune merkle tree stores. Use after updating self.block.height in the + // commit. 
+ fn prune_merkle_tree_stores(&mut self) -> Result<()> { + if let Some(limit) = self.storage_read_past_height_limit { + if self.last_height.0 <= limit { + return Ok(()); + } + + let min_height = (self.last_height.0 - limit).into(); + if let Some(epoch) = self.block.pred_epochs.get_epoch(min_height) { + if epoch.0 == 0 { + return Ok(()); + } + // get the start height of the previous epoch because the Merkle + // tree stores at the starting height of the epoch would be used + // to restore stores at a height (> min_height) in the epoch + if let Some(pruned_height) = self + .block + .pred_epochs + .get_start_height_of_epoch(epoch.prev()) + { + self.db.prune_merkle_tree_stores(pruned_height)?; + } + } + } + + Ok(()) + } } impl From<MerkleTreeError> for Error { @@ -944,6 +984,7 @@ pub mod testing { #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), native_token: address::nam(), + storage_read_past_height_limit: Some(1000), } } } diff --git a/core/src/types/storage.rs b/core/src/types/storage.rs index 527d9cd777..f6da9a4bdb 100644 --- a/core/src/types/storage.rs +++ b/core/src/types/storage.rs @@ -1120,6 +1120,26 @@ impl Epochs { } None } + + /// Look-up the starting block height of the given epoch + pub fn get_start_height_of_epoch( + &self, + epoch: Epoch, + ) -> Option<BlockHeight> { + if epoch < self.first_known_epoch { + return None; + } + + let mut cur_epoch = self.first_known_epoch; + for height in &self.first_block_heights { + if epoch == cur_epoch { + return Some(*height); + } else { + cur_epoch = cur_epoch.next(); + } + } + None + } } /// A value of a storage prefix iterator. 
From 12f220795e16919da6abbba0f5e91724a4d355ee Mon Sep 17 00:00:00 2001 From: yito88 Date: Wed, 22 Mar 2023 10:32:12 +0100 Subject: [PATCH 2/3] DB prune function given an epoch --- apps/src/lib/node/ledger/storage/rocksdb.rs | 48 ++++++++++++--------- core/src/ledger/storage/mockdb.rs | 46 ++++++++++++-------- core/src/ledger/storage/mod.rs | 27 ++++++------ 3 files changed, 71 insertions(+), 50 deletions(-) diff --git a/apps/src/lib/node/ledger/storage/rocksdb.rs b/apps/src/lib/node/ledger/storage/rocksdb.rs index 88f7a679d2..a358138102 100644 --- a/apps/src/lib/node/ledger/storage/rocksdb.rs +++ b/apps/src/lib/node/ledger/storage/rocksdb.rs @@ -41,7 +41,7 @@ use namada::ledger::storage::{ }; use namada::types::internal::TxQueue; use namada::types::storage::{ - BlockHeight, BlockResults, Epochs, Header, Key, KeySeg, + BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR, }; use namada::types::time::DateTimeUtc; @@ -972,28 +972,36 @@ impl DB for RocksDB { Ok(prev_len) } - fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> { - let mut batch = WriteBatch::default(); - let prefix_key = Key::from(height.to_db_key()) - .push(&"tree".to_owned()) - .map_err(Error::KeyError)?; - for st in StoreType::iter() { - if *st != StoreType::Base { - let prefix_key = prefix_key - .push(&st.to_string()) - .map_err(Error::KeyError)?; - let root_key = prefix_key - .push(&"root".to_owned()) - .map_err(Error::KeyError)?; - batch.delete(root_key.to_string()); - let store_key = prefix_key - .push(&"store".to_owned()) + fn prune_merkle_tree_stores( + &mut self, + epoch: Epoch, + pred_epochs: &Epochs, + ) -> Result<()> { + match pred_epochs.get_start_height_of_epoch(epoch) { + Some(height) => { + let mut batch = WriteBatch::default(); + let prefix_key = Key::from(height.to_db_key()) + .push(&"tree".to_owned()) .map_err(Error::KeyError)?; - batch.delete(store_key.to_string()); + for st in StoreType::iter() { + if *st != StoreType::Base { + 
let prefix_key = prefix_key + .push(&st.to_string()) + .map_err(Error::KeyError)?; + let root_key = prefix_key + .push(&"root".to_owned()) + .map_err(Error::KeyError)?; + batch.delete(root_key.to_string()); + let store_key = prefix_key + .push(&"store".to_owned()) + .map_err(Error::KeyError)?; + batch.delete(store_key.to_string()); + } + } + self.exec_batch(batch) } + None => Ok(()), } - - self.exec_batch(batch) } } diff --git a/core/src/ledger/storage/mockdb.rs b/core/src/ledger/storage/mockdb.rs index d28d05acda..585edb1381 100644 --- a/core/src/ledger/storage/mockdb.rs +++ b/core/src/ledger/storage/mockdb.rs @@ -16,7 +16,8 @@ use crate::ledger::storage::types::{self, KVBytes, PrefixIterator}; #[cfg(feature = "ferveo-tpke")] use crate::types::internal::TxQueue; use crate::types::storage::{ - BlockHeight, BlockResults, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR, + BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg, + KEY_SEGMENT_SEPARATOR, }; use crate::types::time::DateTimeUtc; @@ -442,26 +443,35 @@ impl DB for MockDB { }) } - fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> { - let prefix_key = Key::from(height.to_db_key()) - .push(&"tree".to_owned()) - .map_err(Error::KeyError)?; - for st in StoreType::iter() { - if *st != StoreType::Base { - let prefix_key = prefix_key - .push(&st.to_string()) - .map_err(Error::KeyError)?; - let root_key = prefix_key - .push(&"root".to_owned()) - .map_err(Error::KeyError)?; - self.0.borrow_mut().remove(&root_key.to_string()); - let store_key = prefix_key - .push(&"store".to_owned()) + fn prune_merkle_tree_stores( + &mut self, + epoch: Epoch, + pred_epochs: &Epochs, + ) -> Result<()> { + match pred_epochs.get_start_height_of_epoch(epoch) { + Some(height) => { + let prefix_key = Key::from(height.to_db_key()) + .push(&"tree".to_owned()) .map_err(Error::KeyError)?; - self.0.borrow_mut().remove(&store_key.to_string()); + for st in StoreType::iter() { + if *st != StoreType::Base { + let prefix_key 
= prefix_key + .push(&st.to_string()) + .map_err(Error::KeyError)?; + let root_key = prefix_key + .push(&"root".to_owned()) + .map_err(Error::KeyError)?; + self.0.borrow_mut().remove(&root_key.to_string()); + let store_key = prefix_key + .push(&"store".to_owned()) + .map_err(Error::KeyError)?; + self.0.borrow_mut().remove(&store_key.to_string()); + } + } + Ok(()) } + None => Ok(()), } - Ok(()) } } diff --git a/core/src/ledger/storage/mod.rs b/core/src/ledger/storage/mod.rs index 2431950ae6..6f1d7852fd 100644 --- a/core/src/ledger/storage/mod.rs +++ b/core/src/ledger/storage/mod.rs @@ -288,8 +288,12 @@ pub trait DB: std::fmt::Debug { key: &Key, ) -> Result; - /// Prune old Merkle tree stores - fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()>; + /// Prune Merkle tree stores at the given epoch + fn prune_merkle_tree_stores( + &mut self, + pruned_epoch: Epoch, + pred_epochs: &Epochs, + ) -> Result<()>; } /// A database prefix iterator. @@ -913,16 +917,15 @@ where if let Some(epoch) = self.block.pred_epochs.get_epoch(min_height) { if epoch.0 == 0 { return Ok(()); - } - // get the start height of the previous epoch because the Merkle - // tree stores at the starting height of the epoch would be used - // to restore stores at a height (> min_height) in the epoch - if let Some(pruned_height) = self - .block - .pred_epochs - .get_start_height_of_epoch(epoch.prev()) - { - self.db.prune_merkle_tree_stores(pruned_height)?; + } else { + // get the start height of the previous epoch because the + // Merkle tree stores at the starting + // height of the epoch would be used + // to restore stores at a height (> min_height) in the epoch + self.db.prune_merkle_tree_stores( + epoch.prev(), + &self.block.pred_epochs, + )?; } } } From 9b76f077d9fd5f3f9e79460bc9586d00b51fcbed Mon Sep 17 00:00:00 2001 From: yito88 Date: Mon, 27 Mar 2023 18:08:47 +0200 Subject: [PATCH 3/3] fix tests for new epoch --- apps/src/lib/node/ledger/shell/mod.rs | 40 
+++++++++------------------ apps/src/lib/node/ledger/storage/mod.rs | 23 +++++++++----- 2 files changed, 25 insertions(+), 38 deletions(-) diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index f941232c1a..485d62538b 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -776,14 +776,11 @@ mod test_utils { use std::path::PathBuf; use namada::ledger::storage::mockdb::MockDB; - use namada::ledger::storage::{BlockStateWrite, MerkleTree, Sha256Hasher}; - use namada::types::address::EstablishedAddressGen; + use namada::ledger::storage::{update_allowed_conversions, Sha256Hasher}; use namada::types::chain::ChainId; use namada::types::hash::Hash; use namada::types::key::*; - use namada::types::storage::{ - BlockHash, BlockResults, Epoch, Epochs, Header, - }; + use namada::types::storage::{BlockHash, Epoch, Epochs, Header}; use namada::types::transaction::{Fee, WrapperTx}; use tempfile::tempdir; use tokio::sync::mpsc::UnboundedReceiver; @@ -1001,6 +998,11 @@ mod test_utils { tx_wasm_compilation_cache, native_token.clone(), ); + shell + .wl_storage + .storage + .begin_block(BlockHash::default(), BlockHeight(1)) + .expect("begin_block failed"); let keypair = gen_keypair(); // enqueue a wrapper tx let tx = Tx::new( @@ -1027,33 +1029,11 @@ mod test_utils { }); // Artificially increase the block height so that chain // will read the new block when restarted - let merkle_tree = MerkleTree::<Sha256Hasher>::default(); - let stores = merkle_tree.stores(); - let hash = BlockHash([0; 32]); let mut pred_epochs: Epochs = Default::default(); pred_epochs.new_epoch(BlockHeight(1), 1000); - let address_gen = EstablishedAddressGen::new("test"); - shell - .wl_storage - .storage - .db - .write_block( - BlockStateWrite { - merkle_tree_stores: stores, - header: None, - hash: &hash, - height: BlockHeight(1), - epoch: Epoch(1), - pred_epochs: &pred_epochs, - next_epoch_min_start_height: BlockHeight(3), - 
next_epoch_min_start_time: DateTimeUtc::now(), - address_gen: &address_gen, - results: &BlockResults::default(), - tx_queue: &shell.wl_storage.storage.tx_queue, - }, - true, - ) - .expect("Test failed"); + update_allowed_conversions(&mut shell.wl_storage) + .expect("update conversions failed"); + shell.wl_storage.commit_block().expect("commit failed"); // Drop the shell std::mem::drop(shell); diff --git a/apps/src/lib/node/ledger/storage/mod.rs b/apps/src/lib/node/ledger/storage/mod.rs index 739d7cbdfa..d4c619fe5f 100644 --- a/apps/src/lib/node/ledger/storage/mod.rs +++ b/apps/src/lib/node/ledger/storage/mod.rs @@ -53,7 +53,10 @@ mod tests { use std::collections::HashMap; use itertools::Itertools; - use namada::ledger::storage::{types, WlStorage}; + use namada::ledger::storage::write_log::WriteLog; + use namada::ledger::storage::{ + types, update_allowed_conversions, WlStorage, + }; use namada::ledger::storage_api::{self, StorageWrite}; use namada::types::chain::ChainId; use namada::types::storage::{BlockHash, BlockHeight, Key}; @@ -137,13 +140,17 @@ mod tests { .expect("write failed"); storage.block.epoch = storage.block.epoch.next(); storage.block.pred_epochs.new_epoch(BlockHeight(100), 1000); - storage.commit_block().expect("commit failed"); - - // save the last state and drop the storage - let root = storage.merkle_root().0; - let hash = storage.get_block_hash().0; - let address_gen = storage.address_gen.clone(); - drop(storage); + // make wl_storage to update conversion for a new epoch + let mut wl_storage = WlStorage::new(WriteLog::default(), storage); + update_allowed_conversions(&mut wl_storage) + .expect("update conversions failed"); + wl_storage.commit_block().expect("commit failed"); + + // save the last state and the storage + let root = wl_storage.storage.merkle_root().0; + let hash = wl_storage.storage.get_block_hash().0; + let address_gen = wl_storage.storage.address_gen.clone(); + drop(wl_storage); // load the last state let mut storage = 
PersistentStorage::open(