Skip to content

Commit

Permalink
Merge remote-tracking branch 'namada/yuji/prune_tree_stores' (#1237) …
Browse files Browse the repository at this point in the history
…into maint-0.14

* namada/yuji/prune_tree_stores:
  DB prune function given an epoch
  prune older merkle tree stores
  • Loading branch information
juped committed Mar 28, 2023
2 parents a8d17bb + 9b76f07 commit a93a77f
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 42 deletions.
49 changes: 17 additions & 32 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,13 @@ where
.expect("Creating directory for Namada should not fail");
}
// load last state from storage
let mut storage =
Storage::open(db_path, chain_id.clone(), native_token, db_cache);
let mut storage = Storage::open(
db_path,
chain_id.clone(),
native_token,
db_cache,
config.shell.storage_read_past_height_limit,
);
storage
.load_last_state()
.map_err(|e| {
Expand Down Expand Up @@ -770,14 +775,11 @@ mod test_utils {
use std::path::PathBuf;

use namada::ledger::storage::mockdb::MockDB;
use namada::ledger::storage::{BlockStateWrite, MerkleTree, Sha256Hasher};
use namada::types::address::EstablishedAddressGen;
use namada::ledger::storage::{update_allowed_conversions, Sha256Hasher};
use namada::types::chain::ChainId;
use namada::types::hash::Hash;
use namada::types::key::*;
use namada::types::storage::{
BlockHash, BlockResults, Epoch, Epochs, Header,
};
use namada::types::storage::{BlockHash, Epoch, Epochs, Header};
use namada::types::transaction::{Fee, WrapperTx};
use tempfile::tempdir;
use tokio::sync::mpsc::UnboundedReceiver;
Expand Down Expand Up @@ -995,6 +997,11 @@ mod test_utils {
tx_wasm_compilation_cache,
native_token.clone(),
);
shell
.wl_storage
.storage
.begin_block(BlockHash::default(), BlockHeight(1))
.expect("begin_block failed");
let keypair = gen_keypair();
// enqueue a wrapper tx
let tx = Tx::new(
Expand All @@ -1021,33 +1028,11 @@ mod test_utils {
});
// Artificially increase the block height so that chain
// will read the new block when restarted
let merkle_tree = MerkleTree::<Sha256Hasher>::default();
let stores = merkle_tree.stores();
let hash = BlockHash([0; 32]);
let mut pred_epochs: Epochs = Default::default();
pred_epochs.new_epoch(BlockHeight(1), 1000);
let address_gen = EstablishedAddressGen::new("test");
shell
.wl_storage
.storage
.db
.write_block(
BlockStateWrite {
merkle_tree_stores: stores,
header: None,
hash: &hash,
height: BlockHeight(1),
epoch: Epoch(1),
pred_epochs: &pred_epochs,
next_epoch_min_start_height: BlockHeight(3),
next_epoch_min_start_time: DateTimeUtc::now(),
address_gen: &address_gen,
results: &BlockResults::default(),
tx_queue: &shell.wl_storage.storage.tx_queue,
},
true,
)
.expect("Test failed");
update_allowed_conversions(&mut shell.wl_storage)
.expect("update conversions failed");
shell.wl_storage.commit_block().expect("commit failed");

// Drop the shell
std::mem::drop(shell);
Expand Down
92 changes: 84 additions & 8 deletions apps/src/lib/node/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ mod tests {
use std::collections::HashMap;

use itertools::Itertools;
use namada::ledger::storage::{types, WlStorage};
use namada::ledger::storage::write_log::WriteLog;
use namada::ledger::storage::{
types, update_allowed_conversions, WlStorage,
};
use namada::ledger::storage_api::{self, StorageWrite};
use namada::types::chain::ChainId;
use namada::types::storage::{BlockHash, BlockHeight, Key};
Expand All @@ -74,6 +77,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let key = Key::parse("key").expect("cannot parse the key string");
let value: u64 = 1;
Expand Down Expand Up @@ -121,6 +125,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand All @@ -135,20 +140,25 @@ mod tests {
.expect("write failed");
storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(100), 1000);
storage.commit_block().expect("commit failed");

// save the last state and drop the storage
let root = storage.merkle_root().0;
let hash = storage.get_block_hash().0;
let address_gen = storage.address_gen.clone();
drop(storage);
// make wl_storage to update conversion for a new epoch
let mut wl_storage = WlStorage::new(WriteLog::default(), storage);
update_allowed_conversions(&mut wl_storage)
.expect("update conversions failed");
wl_storage.commit_block().expect("commit failed");

// save the last state and the storage
let root = wl_storage.storage.merkle_root().0;
let hash = wl_storage.storage.get_block_hash().0;
let address_gen = wl_storage.storage.address_gen.clone();
drop(wl_storage);

// load the last state
let mut storage = PersistentStorage::open(
db_path.path(),
ChainId::default(),
address::nam(),
None,
None,
);
storage
.load_last_state()
Expand All @@ -172,6 +182,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -216,6 +227,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -279,6 +291,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

// 1. For each `blocks_write_value`, write the current block height if
Expand Down Expand Up @@ -367,6 +380,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

let num_keys = 5;
Expand Down Expand Up @@ -469,6 +483,67 @@ mod tests {
Ok(())
}

/// Test the restore of the merkle tree
#[test]
fn test_prune_merkle_tree_stores() {
let db_path =
TempDir::new().expect("Unable to create a temporary DB directory");
let mut storage = PersistentStorage::open(
db_path.path(),
ChainId::default(),
address::nam(),
None,
Some(5),
);
storage
.begin_block(BlockHash::default(), BlockHeight(1))
.expect("begin_block failed");

let key = Key::parse("key").expect("cannot parse the key string");
let value: u64 = 1;
storage
.write(&key, types::encode(&value))
.expect("write failed");

storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(1), 1000);
storage.commit_block().expect("commit failed");

storage
.begin_block(BlockHash::default(), BlockHeight(6))
.expect("begin_block failed");

let key = Key::parse("key2").expect("cannot parse the key string");
let value: u64 = 2;
storage
.write(&key, types::encode(&value))
.expect("write failed");

storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(6), 1000);
storage.commit_block().expect("commit failed");

let result = storage.get_merkle_tree(1.into());
assert!(result.is_ok(), "The tree at Height 1 should be restored");

storage
.begin_block(BlockHash::default(), BlockHeight(11))
.expect("begin_block failed");
storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(11), 1000);
storage.commit_block().expect("commit failed");

let result = storage.get_merkle_tree(1.into());
assert!(result.is_err(), "The tree at Height 1 should be pruned");
let result = storage.get_merkle_tree(5.into());
assert!(
result.is_err(),
"The tree at Height 5 shouldn't be able to be restored"
);
let result = storage.get_merkle_tree(6.into());
assert!(result.is_ok(), "The tree should be restored");
}

/// Test the prefix iterator with RocksDB.
#[test]
fn test_persistent_storage_prefix_iter() {
Expand All @@ -479,6 +554,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let mut storage = WlStorage {
storage,
Expand Down
34 changes: 33 additions & 1 deletion apps/src/lib/node/ledger/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use namada::ledger::storage::{
};
use namada::types::internal::TxQueue;
use namada::types::storage::{
BlockHeight, BlockResults, Epochs, Header, Key, KeySeg,
BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg,
KEY_SEGMENT_SEPARATOR,
};
use namada::types::time::DateTimeUtc;
Expand Down Expand Up @@ -971,6 +971,38 @@ impl DB for RocksDB {

Ok(prev_len)
}

fn prune_merkle_tree_stores(
&mut self,
epoch: Epoch,
pred_epochs: &Epochs,
) -> Result<()> {
match pred_epochs.get_start_height_of_epoch(epoch) {
Some(height) => {
let mut batch = WriteBatch::default();
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
batch.delete(root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
batch.delete(store_key.to_string());
}
}
self.exec_batch(batch)
}
None => Ok(()),
}
}
}

impl<'iter> DBIter<'iter> for RocksDB {
Expand Down
34 changes: 33 additions & 1 deletion core/src/ledger/storage/mockdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::ledger::storage::types::{self, KVBytes, PrefixIterator};
#[cfg(feature = "ferveo-tpke")]
use crate::types::internal::TxQueue;
use crate::types::storage::{
BlockHeight, BlockResults, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR,
BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg,
KEY_SEGMENT_SEPARATOR,
};
use crate::types::time::DateTimeUtc;

Expand Down Expand Up @@ -441,6 +442,37 @@ impl DB for MockDB {
None => 0,
})
}

fn prune_merkle_tree_stores(
&mut self,
epoch: Epoch,
pred_epochs: &Epochs,
) -> Result<()> {
match pred_epochs.get_start_height_of_epoch(epoch) {
Some(height) => {
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&store_key.to_string());
}
}
Ok(())
}
None => Ok(()),
}
}
}

impl<'iter> DBIter<'iter> for MockDB {
Expand Down
Loading

0 comments on commit a93a77f

Please sign in to comment.