Commit d1296f4
Merge remote-tracking branch 'namada/yuji/write_tree_stores_less_often' into draft-0.15

* namada/yuji/write_tree_stores_less_often:
  fix comments
  retry CI
  [ci] wasm checksums update
  fix to rebuild merkle tree before read
  fix the unit test
  compare keys as string
  fix for the first height
  [ci] wasm checksums update
  add changelog
  write Merkle tree stores less often
juped committed Mar 16, 2023
2 parents a745ace + a674174 commit d1296f4
Showing 9 changed files with 433 additions and 81 deletions.
2 changes: 2 additions & 0 deletions .changelog/unreleased/improvements/1113-write-tree-stores.md
@@ -0,0 +1,2 @@
+- Write Merkle tree stores only when a new epoch
+  ([#1113](https://github.com/anoma/namada/issues/1113))
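
The core idea of the change: only the Base Merkle tree store is persisted at every block, while the sub-tree stores are persisted only on a full commit, i.e. when the block starts a new epoch. Below is a minimal self-contained sketch of that gating logic with toy types; the `StoreType::Base` and `is_full_commit` names mirror the diff that follows, but everything else is illustrative rather than Namada's actual API.

```rust
use std::collections::HashMap;

/// Toy stand-ins for Namada's Merkle tree store types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum StoreType {
    Base,
    Account,
    Ibc,
    PoS,
}

/// Persist the tree stores for a block. Sub-tree stores are written only on
/// a full commit (an epoch boundary); the Base store is written at every
/// height so the block's root stays recoverable.
fn write_tree_stores(
    db: &mut HashMap<(u64, StoreType), Vec<u8>>,
    height: u64,
    stores: &HashMap<StoreType, Vec<u8>>,
    is_full_commit: bool,
) {
    for (st, bytes) in stores {
        if *st == StoreType::Base || is_full_commit {
            db.insert((height, *st), bytes.clone());
        }
    }
}

fn main() {
    let mut db = HashMap::new();
    let stores: HashMap<StoreType, Vec<u8>> =
        [(StoreType::Base, vec![0]), (StoreType::Account, vec![1])]
            .into_iter()
            .collect();
    write_tree_stores(&mut db, 1, &stores, true); // epoch start: 2 writes
    write_tree_stores(&mut db, 2, &stores, false); // mid-epoch: 1 write
    assert_eq!(db.len(), 3);
}
```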
36 changes: 21 additions & 15 deletions apps/src/lib/node/ledger/shell/mod.rs
@@ -775,7 +775,9 @@ mod test_utils {
     use namada::types::chain::ChainId;
     use namada::types::hash::Hash;
     use namada::types::key::*;
-    use namada::types::storage::{BlockHash, BlockResults, Epoch, Header};
+    use namada::types::storage::{
+        BlockHash, BlockResults, Epoch, Epochs, Header,
+    };
     use namada::types::transaction::{Fee, WrapperTx};
     use tempfile::tempdir;
     use tokio::sync::mpsc::UnboundedReceiver;
@@ -1022,25 +1024,29 @@ mod test_utils {
         let merkle_tree = MerkleTree::<Sha256Hasher>::default();
         let stores = merkle_tree.stores();
         let hash = BlockHash([0; 32]);
-        let pred_epochs = Default::default();
+        let mut pred_epochs: Epochs = Default::default();
+        pred_epochs.new_epoch(BlockHeight(1), 1000);
         let address_gen = EstablishedAddressGen::new("test");
         shell
             .wl_storage
             .storage
             .db
-            .write_block(BlockStateWrite {
-                merkle_tree_stores: stores,
-                header: None,
-                hash: &hash,
-                height: BlockHeight(1),
-                epoch: Epoch(0),
-                pred_epochs: &pred_epochs,
-                next_epoch_min_start_height: BlockHeight(3),
-                next_epoch_min_start_time: DateTimeUtc::now(),
-                address_gen: &address_gen,
-                results: &BlockResults::default(),
-                tx_queue: &shell.wl_storage.storage.tx_queue,
-            })
+            .write_block(
+                BlockStateWrite {
+                    merkle_tree_stores: stores,
+                    header: None,
+                    hash: &hash,
+                    height: BlockHeight(1),
+                    epoch: Epoch(1),
+                    pred_epochs: &pred_epochs,
+                    next_epoch_min_start_height: BlockHeight(3),
+                    next_epoch_min_start_time: DateTimeUtc::now(),
+                    address_gen: &address_gen,
+                    results: &BlockResults::default(),
+                    tx_queue: &shell.wl_storage.storage.tx_queue,
+                },
+                true,
+            )
             .expect("Test failed");

         // Drop the shell
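
The test above now opens a new epoch at height 1 and passes `true` for the new `is_full_commit` parameter of `DB::write_block` (introduced in the rocksdb.rs diff below), since mid-epoch commits no longer persist the full set of tree stores.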
122 changes: 122 additions & 0 deletions apps/src/lib/node/ledger/storage/mod.rs
@@ -50,6 +50,8 @@ fn new_blake2b() -> Blake2b {

 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+
     use itertools::Itertools;
     use namada::ledger::storage::{types, WlStorage};
     use namada::ledger::storage_api::{self, StorageWrite};
@@ -131,6 +133,8 @@ mod tests {
         storage
             .write(&key, value_bytes.clone())
             .expect("write failed");
+        storage.block.epoch = storage.block.epoch.next();
+        storage.block.pred_epochs.new_epoch(BlockHeight(100), 1000);
         storage.commit_block().expect("commit failed");

         // save the last state and drop the storage
@@ -246,6 +250,11 @@ mod tests {
         fn test_read_with_height(blocks_write_value in vec(any::<bool>(), 20)) {
             test_read_with_height_aux(blocks_write_value).unwrap()
         }
+
+        #[test]
+        fn test_get_merkle_tree(blocks_write_type in vec(0..5_u64, 50)) {
+            test_get_merkle_tree_aux(blocks_write_type).unwrap()
+        }
     }

     /// Test reads at arbitrary block heights.
@@ -347,6 +356,119 @@ mod tests {
         Ok(())
     }

+    /// Test the restore of the merkle tree
+    fn test_get_merkle_tree_aux(
+        blocks_write_type: Vec<u64>,
+    ) -> namada::ledger::storage::Result<()> {
+        let db_path =
+            TempDir::new().expect("Unable to create a temporary DB directory");
+        let mut storage = PersistentStorage::open(
+            db_path.path(),
+            ChainId::default(),
+            address::nam(),
+            None,
+        );
+
+        let num_keys = 5;
+        let blocks_write_type = blocks_write_type.into_iter().enumerate().map(
+            |(index, write_type)| {
+                // try to update some keys at each height
+                let height = BlockHeight::from(index as u64 / num_keys + 1);
+                let key = Key::parse(format!("key{}", index as u64 % num_keys))
+                    .unwrap();
+                (height, key, write_type)
+            },
+        );
+
+        let mut roots = HashMap::new();
+
+        // write values at Height 0 like init_storage
+        for i in 0..num_keys {
+            let key = Key::parse(format!("key{}", i)).unwrap();
+            let value_bytes = types::encode(&storage.block.height);
+            storage.write(&key, value_bytes)?;
+        }
+
+        // Update and commit
+        let hash = BlockHash::default();
+        storage.begin_block(hash, BlockHeight(1))?;
+        for (height, key, write_type) in blocks_write_type.clone() {
+            let mut batch = PersistentStorage::batch();
+            if height != storage.block.height {
+                // to check the root later
+                roots.insert(storage.block.height, storage.merkle_root());
+                if storage.block.height.0 % 5 == 0 {
+                    // new epoch every 5 heights
+                    storage.block.epoch = storage.block.epoch.next();
+                    storage
+                        .block
+                        .pred_epochs
+                        .new_epoch(storage.block.height, 1000);
+                }
+                storage.commit_block()?;
+                let hash = BlockHash::default();
+                storage
+                    .begin_block(hash, storage.block.height.next_height())?;
+            }
+            match write_type {
+                0 => {
+                    // no update
+                }
+                1 => {
+                    storage.delete(&key)?;
+                }
+                2 => {
+                    let value_bytes = types::encode(&storage.block.height);
+                    storage.write(&key, value_bytes)?;
+                }
+                3 => {
+                    storage.batch_delete_subspace_val(&mut batch, &key)?;
+                }
+                _ => {
+                    let value_bytes = types::encode(&storage.block.height);
+                    storage.batch_write_subspace_val(
+                        &mut batch,
+                        &key,
+                        value_bytes,
+                    )?;
+                }
+            }
+            storage.exec_batch(batch)?;
+        }
+        roots.insert(storage.block.height, storage.merkle_root());
+        storage.commit_block()?;
+
+        let mut current_state = HashMap::new();
+        for i in 0..num_keys {
+            let key = Key::parse(format!("key{}", i)).unwrap();
+            current_state.insert(key, true);
+        }
+        // Check a Merkle tree
+        for (height, key, write_type) in blocks_write_type {
+            let tree = storage.get_merkle_tree(height)?;
+            assert_eq!(tree.root().0, roots.get(&height).unwrap().0);
+            match write_type {
+                0 => {
+                    if *current_state.get(&key).unwrap() {
+                        assert!(tree.has_key(&key)?);
+                    } else {
+                        assert!(!tree.has_key(&key)?);
+                    }
+                }
+                1 | 3 => {
+                    assert!(!tree.has_key(&key)?);
+                    current_state.insert(key, false);
+                }
+                _ => {
+                    assert!(tree.has_key(&key)?);
+                    current_state.insert(key, true);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Test the prefix iterator with RocksDB.
     #[test]
     fn test_persistent_storage_prefix_iter() {
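
The property test drives 50 randomized operations (no-op, delete, write, batch delete, batch write) over five keys across ten block heights, opening a new epoch every five heights, and then checks that `get_merkle_tree` reproduces both the historical root and per-key membership at every height, including heights whose tree stores were never written in full.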
111 changes: 84 additions & 27 deletions apps/src/lib/node/ledger/storage/rocksdb.rs
@@ -41,7 +41,8 @@ use namada::ledger::storage::{
 };
 use namada::types::internal::TxQueue;
 use namada::types::storage::{
-    BlockHeight, BlockResults, Header, Key, KeySeg, KEY_SEGMENT_SEPARATOR,
+    BlockHeight, BlockResults, Epochs, Header, Key, KeySeg,
+    KEY_SEGMENT_SEPARATOR,
 };
 use namada::types::time::DateTimeUtc;
 use rocksdb::{
@@ -488,7 +489,11 @@ impl DB for RocksDB {
         }
     }

-    fn write_block(&mut self, state: BlockStateWrite) -> Result<()> {
+    fn write_block(
+        &mut self,
+        state: BlockStateWrite,
+        is_full_commit: bool,
+    ) -> Result<()> {
         let mut batch = WriteBatch::default();
         let BlockStateWrite {
             merkle_tree_stores,
@@ -548,23 +553,25 @@ impl DB for RocksDB {
                 .push(&"tree".to_owned())
                 .map_err(Error::KeyError)?;
             for st in StoreType::iter() {
-                let prefix_key = prefix_key
-                    .push(&st.to_string())
-                    .map_err(Error::KeyError)?;
-                let root_key = prefix_key
-                    .push(&"root".to_owned())
-                    .map_err(Error::KeyError)?;
-                batch.put(
-                    root_key.to_string(),
-                    types::encode(merkle_tree_stores.root(st)),
-                );
-                let store_key = prefix_key
-                    .push(&"store".to_owned())
-                    .map_err(Error::KeyError)?;
-                batch.put(
-                    store_key.to_string(),
-                    merkle_tree_stores.store(st).encode(),
-                );
+                if *st == StoreType::Base || is_full_commit {
+                    let prefix_key = prefix_key
+                        .push(&st.to_string())
+                        .map_err(Error::KeyError)?;
+                    let root_key = prefix_key
+                        .push(&"root".to_owned())
+                        .map_err(Error::KeyError)?;
+                    batch.put(
+                        root_key.to_string(),
+                        types::encode(merkle_tree_stores.root(st)),
+                    );
+                    let store_key = prefix_key
+                        .push(&"store".to_owned())
+                        .map_err(Error::KeyError)?;
+                    batch.put(
+                        store_key.to_string(),
+                        merkle_tree_stores.store(st).encode(),
+                    );
+                }
             }
         }
         // Block header
@@ -644,12 +651,30 @@ impl DB for RocksDB {
     fn read_merkle_tree_stores(
         &self,
         height: BlockHeight,
-    ) -> Result<Option<MerkleTreeStoresRead>> {
-        let mut merkle_tree_stores = MerkleTreeStoresRead::default();
+    ) -> Result<Option<(BlockHeight, MerkleTreeStoresRead)>> {
+        // Get the latest height at which the tree stores were written
         let height_key = Key::from(height.to_db_key());
-        let tree_key = height_key
+        let key = height_key
+            .push(&"pred_epochs".to_owned())
+            .expect("Cannot obtain a storage key");
+        let pred_epochs: Epochs = match self
+            .0
+            .get(key.to_string())
+            .map_err(|e| Error::DBError(e.into_string()))?
+        {
+            Some(b) => types::decode(b).map_err(Error::CodingError)?,
+            None => return Ok(None),
+        };
+        // Read the tree at the first height if no epoch update
+        let stored_height = match pred_epochs.get_epoch_start_height(height) {
+            Some(BlockHeight(0)) | None => BlockHeight(1),
+            Some(h) => h,
+        };
+
+        let tree_key = Key::from(stored_height.to_db_key())
             .push(&"tree".to_owned())
             .map_err(Error::KeyError)?;
+        let mut merkle_tree_stores = MerkleTreeStoresRead::default();
         for st in StoreType::iter() {
             let prefix_key =
                 tree_key.push(&st.to_string()).map_err(Error::KeyError)?;
@@ -682,7 +707,7 @@ impl DB for RocksDB {
                 None => return Ok(None),
             }
         }
-        Ok(Some(merkle_tree_stores))
+        Ok(Some((stored_height, merkle_tree_stores)))
     }

     fn read_subspace_val(&self, key: &Key) -> Result<Option<Vec<u8>>> {
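
`read_merkle_tree_stores` now returns the height whose stores were actually loaded (the epoch's first height) alongside the stores themselves. Rebuilding the tree at the requested height from that snapshot happens in the storage layer's `get_merkle_tree` (in a file not shown in this diff), presumably by replaying the per-height key diffs. The following is a rough self-contained sketch of such a replay, with a toy map standing in for the real tree; both the map and `new_diffs_at` are illustrative assumptions, not Namada's actual types.

```rust
use std::collections::BTreeMap;

/// Sketch of restoring a tree at `height` from stores persisted at an
/// earlier `stored_height` (the epoch start), replaying per-height diffs.
/// `tree` is a toy key/value map standing in for the real Merkle tree, and
/// `new_diffs_at(h)` stands in for iterating the "new" diffs written at
/// height `h`, where `None` means the key was deleted at that height.
fn rebuild_tree(
    mut tree: BTreeMap<String, Vec<u8>>,
    stored_height: u64,
    height: u64,
    new_diffs_at: impl Fn(u64) -> Vec<(String, Option<Vec<u8>>)>,
) -> BTreeMap<String, Vec<u8>> {
    for h in (stored_height + 1)..=height {
        for (key, value) in new_diffs_at(h) {
            match value {
                // the key was written (or overwritten) at height `h`
                Some(v) => {
                    tree.insert(key, v);
                }
                // the key was deleted at height `h`
                None => {
                    tree.remove(&key);
                }
            }
        }
    }
    tree
}
```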
@@ -923,7 +948,7 @@ impl DB for RocksDB {
         // Check the length of previous value, if any
         let prev_len = match self
             .0
-            .get(key.to_string())
+            .get(subspace_key.to_string())
             .map_err(|e| Error::DBError(e.into_string()))?
         {
             Some(prev_value) => {
@@ -955,7 +980,7 @@ impl<'iter> DBIter<'iter> for RocksDB {
         &'iter self,
         prefix: &Key,
     ) -> PersistentPrefixIterator<'iter> {
-        iter_prefix(self, prefix)
+        iter_subspace_prefix(self, prefix)
     }

     fn iter_results(&'iter self) -> PersistentPrefixIterator<'iter> {
@@ -977,15 +1002,47 @@
         );
         PersistentPrefixIterator(PrefixIterator::new(iter, db_prefix))
     }
+
+    fn iter_old_diffs(
+        &'iter self,
+        height: BlockHeight,
+    ) -> PersistentPrefixIterator<'iter> {
+        iter_diffs_prefix(self, height, true)
+    }
+
+    fn iter_new_diffs(
+        &'iter self,
+        height: BlockHeight,
+    ) -> PersistentPrefixIterator<'iter> {
+        iter_diffs_prefix(self, height, false)
+    }
 }

-fn iter_prefix<'iter>(
+fn iter_subspace_prefix<'iter>(
     db: &'iter RocksDB,
     prefix: &Key,
 ) -> PersistentPrefixIterator<'iter> {
     let db_prefix = "subspace/".to_owned();
     let prefix = format!("{}{}", db_prefix, prefix);
+    iter_prefix(db, db_prefix, prefix)
+}
+
+fn iter_diffs_prefix(
+    db: &RocksDB,
+    height: BlockHeight,
+    is_old: bool,
+) -> PersistentPrefixIterator {
+    let prefix = if is_old { "old" } else { "new" };
+    let db_prefix = format!("{}/diffs/{}/", height.0.raw(), prefix);
+    // get keys without a prefix
+    iter_prefix(db, db_prefix.clone(), db_prefix)
+}
+
+fn iter_prefix(
+    db: &RocksDB,
+    db_prefix: String,
+    prefix: String,
+) -> PersistentPrefixIterator {
     let mut read_opts = ReadOptions::default();
     // don't use the prefix bloom filter
     read_opts.set_total_order_seek(true);
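
The two new iterators walk the key-value diffs that the storage layer already records per height, under `{height}/diffs/old/...` (pre-state) and `{height}/diffs/new/...` (post-state) as implied by the `db_prefix` format string above. These diffs are what allow a tree restored at an epoch's first height to be rolled forward to any later height.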
@@ -1161,7 +1218,7 @@ mod test {
             tx_queue: &tx_queue,
         };

-        db.write_block(block).unwrap();
+        db.write_block(block, true).unwrap();

         let _state = db
             .read_last_block()
(Diffs for the remaining changed files were not loaded.)
