Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune Merkle tree stores #1237

Merged
merged 3 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,13 @@ where
.expect("Creating directory for Namada should not fail");
}
// load last state from storage
let mut storage =
Storage::open(db_path, chain_id.clone(), native_token, db_cache);
let mut storage = Storage::open(
db_path,
chain_id.clone(),
native_token,
db_cache,
config.shell.storage_read_past_height_limit,
);
storage
.load_last_state()
.map_err(|e| {
Expand Down
69 changes: 69 additions & 0 deletions apps/src/lib/node/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let key = Key::parse("key").expect("cannot parse the key string");
let value: u64 = 1;
Expand Down Expand Up @@ -121,6 +122,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -149,6 +151,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.load_last_state()
Expand All @@ -172,6 +175,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -216,6 +220,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
storage
.begin_block(BlockHash::default(), BlockHeight(100))
Expand Down Expand Up @@ -279,6 +284,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

// 1. For each `blocks_write_value`, write the current block height if
Expand Down Expand Up @@ -367,6 +373,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);

let num_keys = 5;
Expand Down Expand Up @@ -469,6 +476,67 @@ mod tests {
Ok(())
}

/// Test the restore of the merkle tree
#[test]
fn test_prune_merkle_tree_stores() {
let db_path =
TempDir::new().expect("Unable to create a temporary DB directory");
let mut storage = PersistentStorage::open(
db_path.path(),
ChainId::default(),
address::nam(),
None,
Some(5),
);
storage
.begin_block(BlockHash::default(), BlockHeight(1))
.expect("begin_block failed");

let key = Key::parse("key").expect("cannot parse the key string");
let value: u64 = 1;
storage
.write(&key, types::encode(&value))
.expect("write failed");

storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(1), 1000);
storage.commit_block().expect("commit failed");

storage
.begin_block(BlockHash::default(), BlockHeight(6))
.expect("begin_block failed");

let key = Key::parse("key2").expect("cannot parse the key string");
let value: u64 = 2;
storage
.write(&key, types::encode(&value))
.expect("write failed");

storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(6), 1000);
storage.commit_block().expect("commit failed");

let result = storage.get_merkle_tree(1.into());
assert!(result.is_ok(), "The tree at Height 1 should be restored");

storage
.begin_block(BlockHash::default(), BlockHeight(11))
.expect("begin_block failed");
storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(11), 1000);
storage.commit_block().expect("commit failed");

let result = storage.get_merkle_tree(1.into());
assert!(result.is_err(), "The tree at Height 1 should be pruned");
let result = storage.get_merkle_tree(5.into());
assert!(
result.is_err(),
"The tree at Height 5 shouldn't be able to be restored"
);
let result = storage.get_merkle_tree(6.into());
assert!(result.is_ok(), "The tree should be restored");
}

/// Test the prefix iterator with RocksDB.
#[test]
fn test_persistent_storage_prefix_iter() {
Expand All @@ -479,6 +547,7 @@ mod tests {
ChainId::default(),
address::nam(),
None,
None,
);
let mut storage = WlStorage {
storage,
Expand Down
24 changes: 24 additions & 0 deletions apps/src/lib/node/ledger/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,30 @@ impl DB for RocksDB {

Ok(prev_len)
}

fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> {
let mut batch = WriteBatch::default();
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
batch.delete(root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
batch.delete(store_key.to_string());
}
}

self.exec_batch(batch)
}
}

impl<'iter> DBIter<'iter> for RocksDB {
Expand Down
22 changes: 22 additions & 0 deletions core/src/ledger/storage/mockdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,28 @@ impl DB for MockDB {
None => 0,
})
}

fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()> {
let prefix_key = Key::from(height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
if *st != StoreType::Base {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&root_key.to_string());
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
self.0.borrow_mut().remove(&store_key.to_string());
}
}
Ok(())
}
}

impl<'iter> DBIter<'iter> for MockDB {
Expand Down
41 changes: 41 additions & 0 deletions core/src/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ where
/// Wrapper txs to be decrypted in the next block proposal
#[cfg(feature = "ferveo-tpke")]
pub tx_queue: TxQueue,
/// How many block heights in the past can the storage be queried
pub storage_read_past_height_limit: Option<u64>,
}

/// The block storage data
Expand Down Expand Up @@ -285,6 +287,9 @@ pub trait DB: std::fmt::Debug {
height: BlockHeight,
key: &Key,
) -> Result<i64>;

/// Prune old Merkle tree stores
fn prune_merkle_tree_stores(&mut self, height: BlockHeight) -> Result<()>;
}

/// A database prefix iterator.
Expand Down Expand Up @@ -334,6 +339,7 @@ where
chain_id: ChainId,
native_token: Address,
cache: Option<&D::Cache>,
storage_read_past_height_limit: Option<u64>,
) -> Self {
let block = BlockStorage {
tree: MerkleTree::default(),
Expand All @@ -360,6 +366,7 @@ where
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
native_token,
storage_read_past_height_limit,
}
}

Expand Down Expand Up @@ -454,6 +461,10 @@ where
self.last_height = self.block.height;
self.last_epoch = self.block.epoch;
self.header = None;
if is_full_commit {
// prune old merkle tree stores
self.prune_merkle_tree_stores()?;
}
Ok(())
}

Expand Down Expand Up @@ -889,6 +900,35 @@ where
self.db
.batch_delete_subspace_val(batch, self.block.height, key)
}

// Prune merkle tree stores. Use after updating self.block.height in the
// commit.
fn prune_merkle_tree_stores(&mut self) -> Result<()> {
if let Some(limit) = self.storage_read_past_height_limit {
if self.last_height.0 <= limit {
return Ok(());
}

let min_height = (self.last_height.0 - limit).into();
if let Some(epoch) = self.block.pred_epochs.get_epoch(min_height) {
if epoch.0 == 0 {
return Ok(());
}
// get the start height of the previous epoch because the Merkle
// tree stores at the starting height of the epoch would be used
// to restore stores at a height (> min_height) in the epoch
if let Some(pruned_height) = self
.block
.pred_epochs
.get_start_height_of_epoch(epoch.prev())
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it might not be better to have the interface prune by epoch rather than block? i.e., prune_merkle_tree_stores(10) to prune all of epoch 10's blocks (moving the logic in get_start_height_of_epoch() to prune_merkle_tree_stores()).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds better. Thanks!

self.db.prune_merkle_tree_stores(pruned_height)?;
}
}
}

Ok(())
}
}

impl From<MerkleTreeError> for Error {
Expand Down Expand Up @@ -944,6 +984,7 @@ pub mod testing {
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
native_token: address::nam(),
storage_read_past_height_limit: Some(1000),
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions core/src/types/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,26 @@ impl Epochs {
}
None
}

/// Look-up the starting block height of the given epoch
pub fn get_start_height_of_epoch(
&self,
epoch: Epoch,
) -> Option<BlockHeight> {
if epoch < self.first_known_epoch {
return None;
}

let mut cur_epoch = self.first_known_epoch;
for height in &self.first_block_heights {
if epoch == cur_epoch {
return Some(*height);
} else {
cur_epoch = cur_epoch.next();
}
}
None
}
}

/// A value of a storage prefix iterator.
Expand Down