Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Merkle tree stores only when a new epoch #1113

Merged
merged 11 commits into from
Mar 28, 2023
2 changes: 2 additions & 0 deletions .changelog/unreleased/improvements/1113-write-tree-stores.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Write Merkle tree stores only when a new epoch starts
([#1113](https://github.com/anoma/namada/issues/1113))
36 changes: 21 additions & 15 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,9 @@ mod test_utils {
use namada::types::chain::ChainId;
use namada::types::hash::Hash;
use namada::types::key::*;
use namada::types::storage::{BlockHash, BlockResults, Epoch, Header};
use namada::types::storage::{
BlockHash, BlockResults, Epoch, Epochs, Header,
};
use namada::types::transaction::{Fee, WrapperTx};
use tempfile::tempdir;
use tokio::sync::mpsc::UnboundedReceiver;
Expand Down Expand Up @@ -1023,25 +1025,29 @@ mod test_utils {
let merkle_tree = MerkleTree::<Sha256Hasher>::default();
let stores = merkle_tree.stores();
let hash = BlockHash([0; 32]);
let pred_epochs = Default::default();
let mut pred_epochs: Epochs = Default::default();
pred_epochs.new_epoch(BlockHeight(1), 1000);
let address_gen = EstablishedAddressGen::new("test");
shell
.wl_storage
.storage
.db
.write_block(BlockStateWrite {
merkle_tree_stores: stores,
header: None,
hash: &hash,
height: BlockHeight(1),
epoch: Epoch(0),
pred_epochs: &pred_epochs,
next_epoch_min_start_height: BlockHeight(3),
next_epoch_min_start_time: DateTimeUtc::now(),
address_gen: &address_gen,
results: &BlockResults::default(),
tx_queue: &shell.wl_storage.storage.tx_queue,
})
.write_block(
BlockStateWrite {
merkle_tree_stores: stores,
header: None,
hash: &hash,
height: BlockHeight(1),
epoch: Epoch(1),
pred_epochs: &pred_epochs,
next_epoch_min_start_height: BlockHeight(3),
next_epoch_min_start_time: DateTimeUtc::now(),
address_gen: &address_gen,
results: &BlockResults::default(),
tx_queue: &shell.wl_storage.storage.tx_queue,
},
true,
)
.expect("Test failed");

// Drop the shell
Expand Down
122 changes: 122 additions & 0 deletions apps/src/lib/node/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ fn new_blake2b() -> Blake2b {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use itertools::Itertools;
use namada::ledger::storage::{types, WlStorage};
use namada::ledger::storage_api::{self, StorageWrite};
Expand Down Expand Up @@ -131,6 +133,8 @@ mod tests {
storage
.write(&key, value_bytes.clone())
.expect("write failed");
storage.block.epoch = storage.block.epoch.next();
storage.block.pred_epochs.new_epoch(BlockHeight(100), 1000);
storage.commit_block().expect("commit failed");

// save the last state and drop the storage
Expand Down Expand Up @@ -246,6 +250,11 @@ mod tests {
fn test_read_with_height(blocks_write_value in vec(any::<bool>(), 20)) {
test_read_with_height_aux(blocks_write_value).unwrap()
}

#[test]
fn test_get_merkle_tree(blocks_write_type in vec(0..5_u64, 50)) {
test_get_merkle_tree_aux(blocks_write_type).unwrap()
}
}

/// Test reads at arbitrary block heights.
Expand Down Expand Up @@ -347,6 +356,119 @@ mod tests {
Ok(())
}

/// Test the restore of the merkle tree
fn test_get_merkle_tree_aux(
blocks_write_type: Vec<u64>,
) -> namada::ledger::storage::Result<()> {
let db_path =
TempDir::new().expect("Unable to create a temporary DB directory");
let mut storage = PersistentStorage::open(
db_path.path(),
ChainId::default(),
address::nam(),
None,
);

let num_keys = 5;
let blocks_write_type = blocks_write_type.into_iter().enumerate().map(
|(index, write_type)| {
// try to update some keys at each height
let height = BlockHeight::from(index as u64 / num_keys + 1);
let key = Key::parse(format!("key{}", index as u64 % num_keys))
.unwrap();
(height, key, write_type)
},
);

let mut roots = HashMap::new();

// write values at Height 0 like init_storage
for i in 0..num_keys {
let key = Key::parse(format!("key{}", i)).unwrap();
let value_bytes = types::encode(&storage.block.height);
storage.write(&key, value_bytes)?;
}

// Update and commit
let hash = BlockHash::default();
storage.begin_block(hash, BlockHeight(1))?;
for (height, key, write_type) in blocks_write_type.clone() {
let mut batch = PersistentStorage::batch();
if height != storage.block.height {
// to check the root later
roots.insert(storage.block.height, storage.merkle_root());
if storage.block.height.0 % 5 == 0 {
// new epoch every 5 heights
storage.block.epoch = storage.block.epoch.next();
storage
.block
.pred_epochs
.new_epoch(storage.block.height, 1000);
}
storage.commit_block()?;
let hash = BlockHash::default();
storage
.begin_block(hash, storage.block.height.next_height())?;
}
match write_type {
0 => {
// no update
}
1 => {
storage.delete(&key)?;
}
2 => {
let value_bytes = types::encode(&storage.block.height);
storage.write(&key, value_bytes)?;
}
3 => {
storage.batch_delete_subspace_val(&mut batch, &key)?;
}
_ => {
let value_bytes = types::encode(&storage.block.height);
storage.batch_write_subspace_val(
&mut batch,
&key,
value_bytes,
)?;
}
}
storage.exec_batch(batch)?;
}
roots.insert(storage.block.height, storage.merkle_root());
storage.commit_block()?;

let mut current_state = HashMap::new();
for i in 0..num_keys {
let key = Key::parse(format!("key{}", i)).unwrap();
current_state.insert(key, true);
}
// Check a Merkle tree
for (height, key, write_type) in blocks_write_type {
let tree = storage.get_merkle_tree(height)?;
assert_eq!(tree.root().0, roots.get(&height).unwrap().0);
match write_type {
0 => {
if *current_state.get(&key).unwrap() {
assert!(tree.has_key(&key)?);
} else {
assert!(!tree.has_key(&key)?);
}
}
1 | 3 => {
assert!(!tree.has_key(&key)?);
current_state.insert(key, false);
}
_ => {
assert!(tree.has_key(&key)?);
current_state.insert(key, true);
}
}
}

Ok(())
}

/// Test the prefix iterator with RocksDB.
#[test]
fn test_persistent_storage_prefix_iter() {
Expand Down
111 changes: 84 additions & 27 deletions apps/src/lib/node/ledger/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use namada::ledger::storage::{
};
use namada::types::internal::TxQueue;
use namada::types::storage::{
BlockHeight, BlockResults, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR,
BlockHeight, BlockResults, Epochs, Header, Key, KeySeg,
KEY_SEGMENT_SEPARATOR,
};
use namada::types::time::DateTimeUtc;
use rocksdb::{
Expand Down Expand Up @@ -488,7 +489,11 @@ impl DB for RocksDB {
}
}

fn write_block(&mut self, state: BlockStateWrite) -> Result<()> {
fn write_block(
&mut self,
state: BlockStateWrite,
is_full_commit: bool,
) -> Result<()> {
let mut batch = WriteBatch::default();
let BlockStateWrite {
merkle_tree_stores,
Expand Down Expand Up @@ -548,23 +553,25 @@ impl DB for RocksDB {
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
for st in StoreType::iter() {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
batch.put(
root_key.to_string(),
types::encode(merkle_tree_stores.root(st)),
);
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
batch.put(
store_key.to_string(),
merkle_tree_stores.store(st).encode(),
);
if *st == StoreType::Base || is_full_commit {
let prefix_key = prefix_key
.push(&st.to_string())
.map_err(Error::KeyError)?;
let root_key = prefix_key
.push(&"root".to_owned())
.map_err(Error::KeyError)?;
batch.put(
root_key.to_string(),
types::encode(merkle_tree_stores.root(st)),
);
let store_key = prefix_key
.push(&"store".to_owned())
.map_err(Error::KeyError)?;
batch.put(
store_key.to_string(),
merkle_tree_stores.store(st).encode(),
);
}
}
}
// Block header
Expand Down Expand Up @@ -644,12 +651,30 @@ impl DB for RocksDB {
fn read_merkle_tree_stores(
&self,
height: BlockHeight,
) -> Result<Option<MerkleTreeStoresRead>> {
let mut merkle_tree_stores = MerkleTreeStoresRead::default();
) -> Result<Option<(BlockHeight, MerkleTreeStoresRead)>> {
// Get the latest height at which the tree stores were written
let height_key = Key::from(height.to_db_key());
let tree_key = height_key
let key = height_key
.push(&"pred_epochs".to_owned())
.expect("Cannot obtain a storage key");
let pred_epochs: Epochs = match self
.0
.get(key.to_string())
.map_err(|e| Error::DBError(e.into_string()))?
{
Some(b) => types::decode(b).map_err(Error::CodingError)?,
None => return Ok(None),
};
// Read the tree at the first height if no epoch update
let stored_height = match pred_epochs.get_epoch_start_height(height) {
Some(BlockHeight(0)) | None => BlockHeight(1),
Some(h) => h,
};

let tree_key = Key::from(stored_height.to_db_key())
.push(&"tree".to_owned())
.map_err(Error::KeyError)?;
let mut merkle_tree_stores = MerkleTreeStoresRead::default();
for st in StoreType::iter() {
let prefix_key =
tree_key.push(&st.to_string()).map_err(Error::KeyError)?;
Expand Down Expand Up @@ -682,7 +707,7 @@ impl DB for RocksDB {
None => return Ok(None),
}
}
Ok(Some(merkle_tree_stores))
Ok(Some((stored_height, merkle_tree_stores)))
}

fn read_subspace_val(&self, key: &Key) -> Result<Option<Vec<u8>>> {
Expand Down Expand Up @@ -923,7 +948,7 @@ impl DB for RocksDB {
// Check the length of previous value, if any
let prev_len = match self
.0
.get(key.to_string())
.get(subspace_key.to_string())
.map_err(|e| Error::DBError(e.into_string()))?
{
Some(prev_value) => {
Expand Down Expand Up @@ -955,7 +980,7 @@ impl<'iter> DBIter<'iter> for RocksDB {
&'iter self,
prefix: &Key,
) -> PersistentPrefixIterator<'iter> {
iter_prefix(self, prefix)
iter_subspace_prefix(self, prefix)
}

fn iter_results(&'iter self) -> PersistentPrefixIterator<'iter> {
Expand All @@ -977,15 +1002,47 @@ impl<'iter> DBIter<'iter> for RocksDB {
);
PersistentPrefixIterator(PrefixIterator::new(iter, db_prefix))
}

fn iter_old_diffs(
&'iter self,
height: BlockHeight,
) -> PersistentPrefixIterator<'iter> {
iter_diffs_prefix(self, height, true)
}

fn iter_new_diffs(
&'iter self,
height: BlockHeight,
) -> PersistentPrefixIterator<'iter> {
iter_diffs_prefix(self, height, false)
}
}

fn iter_prefix<'iter>(
fn iter_subspace_prefix<'iter>(
db: &'iter RocksDB,
prefix: &Key,
) -> PersistentPrefixIterator<'iter> {
let db_prefix = "subspace/".to_owned();
let prefix = format!("{}{}", db_prefix, prefix);
iter_prefix(db, db_prefix, prefix)
}

fn iter_diffs_prefix(
db: &RocksDB,
height: BlockHeight,
is_old: bool,
) -> PersistentPrefixIterator {
let prefix = if is_old { "old" } else { "new" };
let db_prefix = format!("{}/diffs/{}/", height.0.raw(), prefix);
// get keys without a prefix
iter_prefix(db, db_prefix.clone(), db_prefix)
}

fn iter_prefix(
db: &RocksDB,
db_prefix: String,
prefix: String,
) -> PersistentPrefixIterator {
let mut read_opts = ReadOptions::default();
// don't use the prefix bloom filter
read_opts.set_total_order_seek(true);
Expand Down Expand Up @@ -1161,7 +1218,7 @@ mod test {
tx_queue: &tx_queue,
};

db.write_block(block).unwrap();
db.write_block(block, true).unwrap();

let _state = db
.read_last_block()
Expand Down
Loading