Skip to content

Commit

Permalink
Merge remote-tracking branch 'namada/yuji/prune_tree_stores' (#1237) …
Browse files Browse the repository at this point in the history
…into maint-0.14

* namada/yuji/prune_tree_stores:
  DB prune function given an epoch
  prune older merkle tree stores
  • Loading branch information
juped committed Mar 23, 2023
2 parents a8d17bb + 12f2207 commit 37e053d
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 4 deletions.
9 changes: 7 additions & 2 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,13 @@ where
.expect("Creating directory for Namada should not fail");
}
// load last state from storage
let mut storage =
Storage::open(db_path, chain_id.clone(), native_token, db_cache);
let mut storage = Storage::open(
db_path,
chain_id.clone(),
native_token,
db_cache,
config.shell.storage_read_past_height_limit,
);
storage
.load_last_state()
.map_err(|e| {
Expand Down
69 changes: 69 additions & 0 deletions apps/src/lib/node/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let key = Key::parse("key").expect("cannot parse the key string");
let value: u64 = 1;
Expand Down Expand Up @@ -121,6 +122,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -149,6 +151,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.load_last_state()
Expand All @@ -172,6 +175,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -216,6 +220,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -279,6 +284,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

// 1. For each `blocks_write_value`, write the current block height if
Expand Down Expand Up @@ -367,6 +373,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

let num_keys = 5;
Expand Down Expand Up @@ -469,6 +476,67 @@ mod tests {
Ok(())
}

/// Test that old Merkle tree stores are pruned once their epoch falls
/// outside of the `storage_read_past_height_limit` window, while stores
/// still inside the window remain restorable.
#[test]
fn test_prune_merkle_tree_stores() {
    let db_path =
        TempDir::new().expect("Unable to create a temporary DB directory");
    // Keep Merkle tree stores for at most 5 heights in the past
    let mut storage = PersistentStorage::open(
        db_path.path(),
        ChainId::default(),
        address::nam(),
        None,
        Some(5),
    );
    storage
        .begin_block(BlockHash::default(), BlockHeight(1))
        .expect("begin_block failed");

    let key = Key::parse("key").expect("cannot parse the key string");
    let value: u64 = 1;
    storage
        .write(&key, types::encode(&value))
        .expect("write failed");

    // Commit height 1 as the start of a new epoch
    storage.block.epoch = storage.block.epoch.next();
    storage.block.pred_epochs.new_epoch(BlockHeight(1), 1000);
    storage.commit_block().expect("commit failed");

    storage
        .begin_block(BlockHash::default(), BlockHeight(6))
        .expect("begin_block failed");

    let key = Key::parse("key2").expect("cannot parse the key string");
    let value: u64 = 2;
    storage
        .write(&key, types::encode(&value))
        .expect("write failed");

    // Commit height 6 as the start of the next epoch
    storage.block.epoch = storage.block.epoch.next();
    storage.block.pred_epochs.new_epoch(BlockHeight(6), 1000);
    storage.commit_block().expect("commit failed");

    // Height 1 is still within the limit (6 - 5 = 1)
    let result = storage.get_merkle_tree(1.into());
    assert!(result.is_ok(), "The tree at Height 1 should be restored");

    // Commit height 11; heights up to 6 fall out of the window
    storage
        .begin_block(BlockHash::default(), BlockHeight(11))
        .expect("begin_block failed");
    storage.block.epoch = storage.block.epoch.next();
    storage.block.pred_epochs.new_epoch(BlockHeight(11), 1000);
    storage.commit_block().expect("commit failed");

    let result = storage.get_merkle_tree(1.into());
    assert!(result.is_err(), "The tree at Height 1 should be pruned");
    // Height 5 belongs to the pruned epoch and has no stored tree either
    let result = storage.get_merkle_tree(5.into());
    assert!(
        result.is_err(),
        "The tree at Height 5 shouldn't be able to be restored"
    );
    // Height 6 starts the epoch containing the oldest restorable height
    let result = storage.get_merkle_tree(6.into());
    assert!(result.is_ok(), "The tree should be restored");
}

/// Test the prefix iterator with RocksDB.
#[test]
fn test_persistent_storage_prefix_iter() {
Expand All @@ -479,6 +547,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let mut storage = WlStorage {
storage,
Expand Down
34 changes: 33 additions & 1 deletion apps/src/lib/node/ledger/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use namada::ledger::storage::{
};
use namada::types::internal::TxQueue;
use namada::types::storage::{
BlockHeight, BlockResults, Epochs, Header, Key, KeySeg,
BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg,
KEY_SEGMENT_SEPARATOR,
};
use namada::types::time::DateTimeUtc;
Expand Down Expand Up @@ -971,6 +971,38 @@ impl DB for RocksDB {

Ok(prev_len)
}

fn prune_merkle_tree_stores(
&mut self,
epoch: Epoch,
pred_epochs: &Epochs,
) -> Result<()> {
match pred_epochs.get_start_height_of_epoch(epoch) {
Some(height) => {
let mut batch = WriteBatch::default();
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
batch.delete(root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
batch.delete(store_key.to_string());
}
}
self.exec_batch(batch)
}
None => Ok(()),
}
}
}

impl<'iter> DBIter<'iter> for RocksDB {
Expand Down
34 changes: 33 additions & 1 deletion core/src/ledger/storage/mockdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::ledger::storage::types::{self, KVBytes, PrefixIterator};
#[cfg(feature = "ferveo-tpke")]
use crate::types::internal::TxQueue;
use crate::types::storage::{
BlockHeight, BlockResults, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR,
BlockHeight, BlockResults, Epoch, Epochs, Header, Key, KeySeg,
KEY_SEGMENT_SEPARATOR,
};
use crate::types::time::DateTimeUtc;

Expand Down Expand Up @@ -441,6 +442,37 @@ impl DB for MockDB {
None => 0,
})
}

fn prune_merkle_tree_stores(
&mut self,
epoch: Epoch,
pred_epochs: &Epochs,
) -> Result<()> {
match pred_epochs.get_start_height_of_epoch(epoch) {
Some(height) => {
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&store_key.to_string());
}
}
Ok(())
}
None => Ok(()),
}
}
}

impl<'iter> DBIter<'iter> for MockDB {
Expand Down
44 changes: 44 additions & 0 deletions core/src/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ where
/// Wrapper txs to be decrypted in the next block proposal
#[cfg(feature = "ferveo-tpke")]
pub tx_queue: TxQueue,
/// How many block heights in the past can the storage be queried
pub storage_read_past_height_limit: Option<u64>,
}

/// The block storage data
Expand Down Expand Up @@ -285,6 +287,13 @@ pub trait DB: std::fmt::Debug {
height: BlockHeight,
key: &Key,
) -> Result<i64>;

/// Prune Merkle tree stores at the given epoch
fn prune_merkle_tree_stores(
&mut self,
pruned_epoch: Epoch,
pred_epochs: &Epochs,
) -> Result<()>;
}

/// A database prefix iterator.
Expand Down Expand Up @@ -334,6 +343,7 @@ where
chain_id: ChainId,
native_token: Address,
cache: Option<&D::Cache>,
storage_read_past_height_limit: Option<u64>,
) -> Self {
let block = BlockStorage {
tree: MerkleTree::default(),
Expand All @@ -360,6 +370,7 @@ where
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
native_token,
storage_read_past_height_limit,
}
}

Expand Down Expand Up @@ -454,6 +465,10 @@ where
self.last_height = self.block.height;
self.last_epoch = self.block.epoch;
self.header = None;
if is_full_commit {
// prune old merkle tree stores
self.prune_merkle_tree_stores()?;
}
Ok(())
}

Expand Down Expand Up @@ -855,6 +870,34 @@ where
self.db
.batch_delete_subspace_val(batch, self.block.height, key)
}

/// Prune old Merkle tree stores. Call after the committed block height
/// has been stored in `self.last_height` during the commit.
fn prune_merkle_tree_stores(&mut self) -> Result<()> {
    // Pruning is disabled when no read-past-height limit is configured
    let limit = match self.storage_read_past_height_limit {
        Some(limit) => limit,
        None => return Ok(()),
    };
    // Nothing is old enough to prune yet
    if self.last_height.0 <= limit {
        return Ok(());
    }

    // The oldest height that must remain restorable
    let min_height = (self.last_height.0 - limit).into();
    if let Some(epoch) = self.block.pred_epochs.get_epoch(min_height) {
        // Prune the epoch before the one containing `min_height`: the
        // Merkle tree stores at the starting height of an epoch are used
        // to restore stores at any later height (> min_height) within
        // that epoch, so that epoch's start must be kept
        if epoch.0 > 0 {
            self.db.prune_merkle_tree_stores(
                epoch.prev(),
                &self.block.pred_epochs,
            )?;
        }
    }

    Ok(())
}
}

impl From<MerkleTreeError> for Error {
Expand Down Expand Up @@ -910,6 +953,7 @@ pub mod testing {
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
native_token: address::nam(),
storage_read_past_height_limit: Some(1000),
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions core/src/types/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,26 @@ impl Epochs {
}
None
}

/// Look-up the starting block height of the given epoch.
///
/// Returns `None` when the epoch predates the first known epoch or is
/// past the last epoch with a recorded start height.
pub fn get_start_height_of_epoch(
    &self,
    epoch: Epoch,
) -> Option<BlockHeight> {
    // Known epochs are consecutive starting from `first_known_epoch`,
    // so the start height of `epoch` sits at its offset from the first
    // known epoch — an O(1) lookup instead of a linear scan.
    // `checked_sub` yields `None` for epochs before the first known one.
    let offset = epoch.0.checked_sub(self.first_known_epoch.0)?;
    let index = usize::try_from(offset).ok()?;
    self.first_block_heights.get(index).copied()
}
}

/// A value of a storage prefix iterator.
Expand Down

0 comments on commit 37e053d

Please sign in to comment.