diff --git a/applications/tari_dan_app_utilities/src/consensus_constants.rs b/applications/tari_dan_app_utilities/src/consensus_constants.rs index ca050872c..6d797f6a0 100644 --- a/applications/tari_dan_app_utilities/src/consensus_constants.rs +++ b/applications/tari_dan_app_utilities/src/consensus_constants.rs @@ -42,7 +42,7 @@ impl ConsensusConstants { committee_size: 7, max_base_layer_blocks_ahead: 5, max_base_layer_blocks_behind: 5, - num_preshards: NumPreshards::SixtyFour, + num_preshards: NumPreshards::P64, pacemaker_max_base_time: Duration::from_secs(10), } } diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index a865f28fe..7ac5befc7 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -343,26 +343,15 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { return Err(RpcStatus::not_found("Cannot generate checkpoint for genesis epoch")); } - let Some(local_committee_info) = self - .epoch_manager - .get_local_committee_info(prev_epoch) - .await - .optional() - .map_err(RpcStatus::log_internal_error(LOG_TARGET))? - else { - return Err(RpcStatus::bad_request(format!( - "This validator node is not registered for the previous epoch {prev_epoch}" - ))); - }; - let checkpoint = self .shard_state_store - .with_read_tx(|tx| EpochCheckpoint::generate(tx, prev_epoch, local_committee_info.shard_group())) + .with_read_tx(|tx| EpochCheckpoint::get(tx, prev_epoch)) .optional() - .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found("Epoch checkpoint not found"))?; Ok(Response::new(GetCheckpointResponse { - checkpoint: checkpoint.map(Into::into), + checkpoint: Some(checkpoint.into()), })) } diff --git a/dan_layer/common_types/src/hashing.rs b/dan_layer/common_types/src/hashing.rs index efaaa5846..5254f498d 100644 --- a/dan_layer/common_types/src/hashing.rs +++ b/dan_layer/common_types/src/hashing.rs @@ -36,10 +36,6 @@ pub fn block_hasher() -> TariHasher { dan_hasher("Block") } -pub fn state_root_hasher() -> TariHasher { - dan_hasher("JmtStateRoots") -} - pub fn quorum_certificate_hasher() -> TariHasher { dan_hasher("QuorumCertificate") } diff --git a/dan_layer/common_types/src/num_preshards.rs b/dan_layer/common_types/src/num_preshards.rs index 9d72a1ef1..b5835a071 100644 --- a/dan_layer/common_types/src/num_preshards.rs +++ b/dan_layer/common_types/src/num_preshards.rs @@ -12,26 +12,26 @@ use serde::{Deserialize, Serialize}; )] #[derive(Clone, Debug, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] pub enum NumPreshards { - One = 1, - Two = 2, - Four = 4, - Eight = 8, - Sixteen = 16, - ThirtyTwo = 32, - SixtyFour = 64, - OneTwentyEight = 128, - TwoFiftySix = 256, + P1 = 1, + P2 = 2, + P4 = 4, + P8 = 8, + P16 = 16, + P32 = 32, + P64 = 64, + P128 = 128, + P256 = 256, } impl NumPreshards { - pub const MAX: Self = Self::TwoFiftySix; + pub const MAX: Self = Self::P256; pub fn as_u32(self) -> u32 { self as u32 } pub fn is_one(self) -> bool { - self == Self::One + self == Self::P1 } } @@ -40,15 +40,15 @@ impl TryFrom for NumPreshards { fn try_from(value: u32) -> Result { match value { - 1 => Ok(Self::One), - 2 => Ok(Self::Two), - 4 => Ok(Self::Four), - 8 => Ok(Self::Eight), - 16 => Ok(Self::Sixteen), - 32 => Ok(Self::ThirtyTwo), - 64 => Ok(Self::SixtyFour), - 128 => Ok(Self::OneTwentyEight), - 256 => Ok(Self::TwoFiftySix), + 
1 => Ok(Self::P1), + 2 => Ok(Self::P2), + 4 => Ok(Self::P4), + 8 => Ok(Self::P8), + 16 => Ok(Self::P16), + 32 => Ok(Self::P32), + 64 => Ok(Self::P64), + 128 => Ok(Self::P128), + 256 => Ok(Self::P256), _ => Err(InvalidNumPreshards(value)), } } diff --git a/dan_layer/common_types/src/shard_group.rs b/dan_layer/common_types/src/shard_group.rs index e5e1bbd4d..b895b908b 100644 --- a/dan_layer/common_types/src/shard_group.rs +++ b/dan_layer/common_types/src/shard_group.rs @@ -142,7 +142,7 @@ mod tests { #[test] fn to_substate_address_range() { let sg = ShardGroup::new(0, 63); - let range = sg.to_substate_address_range(NumPreshards::SixtyFour); + let range = sg.to_substate_address_range(NumPreshards::P64); assert_eq!(*range.start(), SubstateAddress::zero()); assert_eq!(*range.end(), SubstateAddress::max()); } diff --git a/dan_layer/common_types/src/substate_address.rs b/dan_layer/common_types/src/substate_address.rs index c4b799c24..280329328 100644 --- a/dan_layer/common_types/src/substate_address.rs +++ b/dan_layer/common_types/src/substate_address.rs @@ -305,20 +305,20 @@ mod tests { #[test] fn to_committee_shard_and_shard_range_match() { let address = address_at(1, 8); - let shard = address.to_shard(NumPreshards::Eight); + let shard = address.to_shard(NumPreshards::P8); assert_eq!(shard, 1); - let range = Shard::from(0).to_substate_address_range(NumPreshards::Two); + let range = Shard::from(0).to_substate_address_range(NumPreshards::P2); assert_range(range, SubstateAddress::zero()..address_at(1, 2)); - let range = Shard::from(1).to_substate_address_range(NumPreshards::Two); + let range = Shard::from(1).to_substate_address_range(NumPreshards::P2); assert_range(range, address_at(1, 2)..=SubstateAddress::max()); for n in 0..7 { - let range = Shard::from(n).to_substate_address_range(NumPreshards::Eight); + let range = Shard::from(n).to_substate_address_range(NumPreshards::P8); assert_range(range, address_at(n, 8)..address_at(n + 1, 8)); } - let range = Shard::from(7).to_substate_address_range(NumPreshards::Eight); + let range = Shard::from(7).to_substate_address_range(NumPreshards::P8); assert_range(range, address_at(7, 8)..=address_at(8, 8)); } @@ -381,28 +381,28 @@ mod tests { #[test] fn to_shard() { - let shard = SubstateAddress::zero().to_shard(NumPreshards::Two); + let shard = SubstateAddress::zero().to_shard(NumPreshards::P2); assert_eq!(shard, 0); - let shard = address_at(1, 2).to_shard(NumPreshards::Two); + let shard = address_at(1, 2).to_shard(NumPreshards::P2); assert_eq!(shard, 1); - let shard = plus_one(address_at(1, 2)).to_shard(NumPreshards::Two); + let shard = plus_one(address_at(1, 2)).to_shard(NumPreshards::P2); assert_eq!(shard, 1); - let shard = SubstateAddress::max().to_shard(NumPreshards::Two); + let shard = SubstateAddress::max().to_shard(NumPreshards::P2); assert_eq!(shard, 1); for i in 0..=32 { - let shard = divide_shard_space(i, 32).to_shard(NumPreshards::One); + let shard = divide_shard_space(i, 32).to_shard(NumPreshards::P1); assert_eq!(shard, 0); } // 2 shards, exactly half of the physical shard space for i in 0..=8 { - let shard = divide_shard_space(i, 16).to_shard(NumPreshards::Two); + let shard = divide_shard_space(i, 16).to_shard(NumPreshards::P2); assert_eq!(shard, 0, "{shard} is not 0 for i: {i}"); } for i in 9..16 { - let shard = divide_shard_space(i, 16).to_shard(NumPreshards::Two); + let shard = divide_shard_space(i, 16).to_shard(NumPreshards::P2); assert_eq!(shard, 1, "{shard} is not 1 for i: {i}"); } @@ -423,7 +423,7 @@ mod tests { } } - let shard = 
divide_floor(SubstateAddress::max(), 2).to_shard(NumPreshards::TwoFiftySix); + let shard = divide_floor(SubstateAddress::max(), 2).to_shard(NumPreshards::P256); assert_eq!(shard, 128); } @@ -503,56 +503,56 @@ mod tests { #[test] fn it_returns_the_correct_shard_group() { - let group = SubstateAddress::zero().to_shard_group(NumPreshards::Four, 2); + let group = SubstateAddress::zero().to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(1)); - let group = plus_one(address_at(0, 4)).to_shard_group(NumPreshards::Four, 2); + let group = plus_one(address_at(0, 4)).to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(1)); - let group = address_at(1, 4).to_shard_group(NumPreshards::Four, 2); + let group = address_at(1, 4).to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(1)); - let group = address_at(2, 4).to_shard_group(NumPreshards::Four, 2); + let group = address_at(2, 4).to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(2)..=Shard::from(3)); - let group = address_at(3, 4).to_shard_group(NumPreshards::Four, 2); + let group = address_at(3, 4).to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(2)..=Shard::from(3)); - let group = SubstateAddress::max().to_shard_group(NumPreshards::Four, 2); + let group = SubstateAddress::max().to_shard_group(NumPreshards::P4, 2); assert_eq!(group.as_range(), Shard::from(2)..=Shard::from(3)); - let group = minus_one(address_at(1, 64)).to_shard_group(NumPreshards::SixtyFour, 16); + let group = minus_one(address_at(1, 64)).to_shard_group(NumPreshards::P64, 16); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(3)); - let group = address_at(4, 64).to_shard_group(NumPreshards::SixtyFour, 16); + let group = address_at(4, 64).to_shard_group(NumPreshards::P64, 16); assert_eq!(group.as_range(), Shard::from(4)..=Shard::from(7)); - let group = address_at(8, 64).to_shard_group(NumPreshards::SixtyFour, 2); + let group = address_at(8, 64).to_shard_group(NumPreshards::P64, 2); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(31)); - let group = address_at(5, 8).to_shard_group(NumPreshards::SixtyFour, 2); + let group = address_at(5, 8).to_shard_group(NumPreshards::P64, 2); assert_eq!(group.as_range(), Shard::from(32)..=Shard::from(63)); // On boundary - let group = address_at(0, 8).to_shard_group(NumPreshards::SixtyFour, 2); + let group = address_at(0, 8).to_shard_group(NumPreshards::P64, 2); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(31)); - let group = address_at(4, 8).to_shard_group(NumPreshards::SixtyFour, 2); + let group = address_at(4, 8).to_shard_group(NumPreshards::P64, 2); assert_eq!(group.as_range(), Shard::from(32)..=Shard::from(63)); - let group = address_at(8, 8).to_shard_group(NumPreshards::SixtyFour, 2); + let group = address_at(8, 8).to_shard_group(NumPreshards::P64, 2); assert_eq!(group.as_range(), Shard::from(32)..=Shard::from(63)); - let group = plus_one(address_at(3, 64)).to_shard_group(NumPreshards::SixtyFour, 32); + let group = plus_one(address_at(3, 64)).to_shard_group(NumPreshards::P64, 32); assert_eq!(group.as_range(), Shard::from(2)..=Shard::from(3)); - let group = plus_one(address_at(3, 64)).to_shard_group(NumPreshards::SixtyFour, 32); + let group = plus_one(address_at(3, 64)).to_shard_group(NumPreshards::P64, 32); assert_eq!(group.as_range(), Shard::from(2)..=Shard::from(3)); - let group = address_at(16, 
64).to_shard_group(NumPreshards::SixtyFour, 32); + let group = address_at(16, 64).to_shard_group(NumPreshards::P64, 32); assert_eq!(group.as_range(), Shard::from(16)..=Shard::from(17)); - let group = minus_one(address_at(1, 4)).to_shard_group(NumPreshards::SixtyFour, 64); + let group = minus_one(address_at(1, 4)).to_shard_group(NumPreshards::P64, 64); assert_eq!(group.as_range(), Shard::from(16)..=Shard::from(16)); - let group = address_at(66, 256).to_shard_group(NumPreshards::SixtyFour, 16); + let group = address_at(66, 256).to_shard_group(NumPreshards::P64, 16); assert_eq!(group.as_range(), Shard::from(16)..=Shard::from(19)); } @@ -588,66 +588,66 @@ mod tests { fn it_returns_the_correct_shard_group_for_odd_num_committees() { // All shard groups except the last have 3 shards each - let group = address_at(0, 64).to_shard_group(NumPreshards::SixtyFour, 3); + let group = address_at(0, 64).to_shard_group(NumPreshards::P64, 3); // First shard group gets an extra shard to cover the remainder assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(21)); assert_eq!(group.len(), 22); - let group = address_at(31, 64).to_shard_group(NumPreshards::SixtyFour, 3); + let group = address_at(31, 64).to_shard_group(NumPreshards::P64, 3); assert_eq!(group.as_range(), Shard::from(22)..=Shard::from(42)); assert_eq!(group.len(), 21); - let group = address_at(50, 64).to_shard_group(NumPreshards::SixtyFour, 3); + let group = address_at(50, 64).to_shard_group(NumPreshards::P64, 3); assert_eq!(group.as_range(), Shard::from(43)..=Shard::from(63)); assert_eq!(group.len(), 21); - let group = address_at(3, 64).to_shard_group(NumPreshards::SixtyFour, 7); + let group = address_at(3, 64).to_shard_group(NumPreshards::P64, 7); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(9)); assert_eq!(group.len(), 10); - let group = address_at(11, 64).to_shard_group(NumPreshards::SixtyFour, 7); + let group = address_at(11, 64).to_shard_group(NumPreshards::P64, 7); assert_eq!(group.as_range(), Shard::from(10)..=Shard::from(18)); assert_eq!(group.len(), 9); - let group = address_at(22, 64).to_shard_group(NumPreshards::SixtyFour, 7); + let group = address_at(22, 64).to_shard_group(NumPreshards::P64, 7); assert_eq!(group.as_range(), Shard::from(19)..=Shard::from(27)); assert_eq!(group.len(), 9); - let group = address_at(60, 64).to_shard_group(NumPreshards::SixtyFour, 7); + let group = address_at(60, 64).to_shard_group(NumPreshards::P64, 7); assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63)); assert_eq!(group.len(), 9); - let group = address_at(64, 64).to_shard_group(NumPreshards::SixtyFour, 7); + let group = address_at(64, 64).to_shard_group(NumPreshards::P64, 7); assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63)); assert_eq!(group.len(), 9); - let group = SubstateAddress::zero().to_shard_group(NumPreshards::Eight, 3); + let group = SubstateAddress::zero().to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); - let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(1, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); - let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(1, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); - let group = address_at(3, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(3, 8).to_shard_group(NumPreshards::P8, 3); 
assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); - let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(4, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); - let group = address_at(5, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(5, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); // - let group = address_at(6, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(6, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); - let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(7, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); - let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(8, 8).to_shard_group(NumPreshards::P8, 3); assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); // Committee = 5 - let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 5); + let group = address_at(4, 8).to_shard_group(NumPreshards::P8, 5); assert_eq!(group.as_range(), Shard::from(4)..=Shard::from(5)); - let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 5); + let group = address_at(7, 8).to_shard_group(NumPreshards::P8, 5); assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7)); - let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 5); + let group = address_at(8, 8).to_shard_group(NumPreshards::P8, 5); assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7)); } } @@ -657,17 +657,17 @@ mod tests { #[test] fn it_works() { - let range = ShardGroup::new(0, 9).to_substate_address_range(NumPreshards::Sixteen); + let range = ShardGroup::new(0, 9).to_substate_address_range(NumPreshards::P16); assert_range(range, SubstateAddress::zero()..address_at(10, 16)); - let range = ShardGroup::new(1, 15).to_substate_address_range(NumPreshards::Sixteen); + let range = ShardGroup::new(1, 15).to_substate_address_range(NumPreshards::P16); // Last shard always includes SubstateAddress::max assert_range(range, address_at(1, 16)..=address_at(16, 16)); - let range = ShardGroup::new(1, 8).to_substate_address_range(NumPreshards::Sixteen); + let range = ShardGroup::new(1, 8).to_substate_address_range(NumPreshards::P16); assert_range(range, address_at(1, 16)..address_at(9, 16)); - let range = ShardGroup::new(8, 15).to_substate_address_range(NumPreshards::Sixteen); + let range = ShardGroup::new(8, 15).to_substate_address_range(NumPreshards::P16); assert_range(range, address_at(8, 16)..=address_at(16, 16)); } } diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index fa7a55326..fce50dc36 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -11,7 +11,7 @@ use tari_dan_storage::{ BlockDiff, LeafBlock, LockedSubstate, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QuorumDecision, SubstateChange, SubstateRecord, @@ -62,9 +62,9 @@ impl ProposedBlockChangeSet { pub fn no_vote(mut self) -> Self { self.quorum_decision = None; self.block_diff = Vec::new(); - self.transaction_changes.clear(); - self.state_tree_diffs.clear(); - self.substate_locks.clear(); + self.transaction_changes = IndexMap::new(); + self.state_tree_diffs = IndexMap::new(); + self.substate_locks = 
IndexMap::new(); self } @@ -142,8 +142,9 @@ impl ProposedBlockChangeSet { block_diff.insert(tx)?; // Store the tree diffs for each effected shard - for (shard, diff) in self.state_tree_diffs { - PendingStateTreeDiff::create(tx, *self.block.block_id(), shard, diff)?; + let shard_tree_diffs = self.state_tree_diffs; + for (shard, diff) in shard_tree_diffs { + PendingShardStateTreeDiff::create(tx, *self.block.block_id(), shard, diff)?; } // Save locks diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index a95b61411..28200cdb0 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -1,7 +1,10 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{collections::HashMap, ops::ControlFlow}; +use std::{ + collections::HashMap, + ops::{ControlFlow, Deref}, +}; use indexmap::IndexMap; use log::*; @@ -11,17 +14,26 @@ use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeAddre use tari_dan_storage::{ consensus_models::{ Block, + EpochCheckpoint, LeafBlock, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QuorumCertificate, SubstateChange, VersionedStateHashTreeDiff, }, StateStoreReadTransaction, + StateStoreWriteTransaction, + StorageError, }; -use tari_state_tree::{Hash, StateTreeError}; +use tari_state_tree::{Hash, JellyfishMerkleTree, StateTreeError}; -use crate::{hotstuff::substate_store::ShardedStateTree, traits::LeaderStrategy}; +use crate::{ + hotstuff::{ + substate_store::{ShardScopedTreeStoreReader, ShardedStateTree}, + HotStuffError, + }, + traits::LeaderStrategy, +}; const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::common"; @@ -169,22 +181,63 @@ fn with_dummy_blocks( } } -pub fn calculate_state_merkle_root( +pub fn calculate_state_merkle_root<'a, TTx: StateStoreReadTransaction, I: IntoIterator>( tx: &TTx, - local_shard_group: ShardGroup, - pending_tree_diffs: HashMap>, - changes: &[SubstateChange], + shard_group: ShardGroup, + pending_tree_diffs: HashMap>, + changes: I, ) -> Result<(Hash, IndexMap), StateTreeError> { - let mut change_map = IndexMap::with_capacity(changes.len()); - - changes - .iter() - .filter(|ch| local_shard_group.contains(&ch.shard())) - .for_each(|ch| { - change_map.entry(ch.shard()).or_insert_with(Vec::new).push(ch.into()); - }); + let mut change_map = IndexMap::new(); + changes.into_iter().for_each(|ch| { + // Group by shard + change_map.entry(ch.shard()).or_insert_with(Vec::new).push(ch.into()); + }); let mut sharded_tree = ShardedStateTree::new(tx).with_pending_diffs(pending_tree_diffs); - let state_root = sharded_tree.put_substate_tree_changes(change_map)?; - Ok((state_root, sharded_tree.into_versioned_tree_diffs())) + let root_hash = sharded_tree.put_substate_tree_changes(shard_group, change_map)?; + + Ok((root_hash, sharded_tree.into_shard_tree_diffs())) +} + +pub(crate) fn create_epoch_checkpoint( + tx: &mut TTx, + epoch: Epoch, + shard_group: ShardGroup, +) -> Result +where + TTx: StateStoreWriteTransaction + Deref, + TTx::Target: StateStoreReadTransaction, +{ + // Get the last 3 blocks in the previous epoch. These blocks should end the epoch. 
+ let mut blocks = Block::get_last_n_in_epoch(&**tx, 3, epoch)?; + if blocks.is_empty() { + return Err(HotStuffError::StorageError(StorageError::NotFound { + item: "Block::get_last_n_in_epoch".to_string(), + key: epoch.to_string(), + })); + } + + let commit_block = blocks.pop().unwrap(); + let qcs = blocks.into_iter().map(|b| b.into_justify()).collect(); + + // Fetch the state roots of the shards in the shard group + let mut shard_roots = IndexMap::with_capacity(shard_group.len()); + for shard in shard_group.shard_iter() { + let Some(version) = tx.state_tree_versions_get_latest(shard)? else { + // At v0 there have been no state changes + continue; + }; + + let scoped_store = ShardScopedTreeStoreReader::new(&**tx, shard); + let jmt = JellyfishMerkleTree::new(&scoped_store); + let root_hash = jmt + .get_root_hash(version) + .map_err(|e| HotStuffError::StateTreeError(e.into()))?; + + shard_roots.insert(shard, root_hash); + } + let checkpoint = EpochCheckpoint::new(commit_block, qcs, shard_roots); + checkpoint.save(tx)?; + + Ok(checkpoint) } diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 4c890731e..0cc6e4383 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -28,7 +28,7 @@ use tari_dan_storage::{ LastProposed, LeafBlock, LockedBlock, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QuorumCertificate, SubstateChange, SubstateLockFlag, @@ -515,13 +515,16 @@ where TConsensusSpec: ConsensusSpec commands.iter().map(|c| c.to_string()).collect::>().join(",") ); - let pending_tree_diffs = PendingStateTreeDiff::get_all_up_to_commit_block(tx, high_qc.block_id())?; + let pending_tree_diffs = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, high_qc.block_id())?; let (state_root, _) = calculate_state_merkle_root( tx, local_committee_info.shard_group(), pending_tree_diffs, - substate_store.diff(), + substate_store + .diff() + .iter() + .filter(|ch| local_committee_info.shard_group().contains(&ch.shard())), )?; let non_local_shards = get_non_local_shards(substate_store.diff(), local_committee_info); diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 3883cc483..4f735b6a7 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -16,7 +16,7 @@ use tari_dan_storage::{ LastExecuted, LastVoted, LockedBlock, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QuorumDecision, SubstateLockFlag, TransactionAtom, @@ -721,9 +721,17 @@ where TConsensusSpec: ConsensusSpec return Ok(proposed_block_change_set.no_vote()); } - let pending = PendingStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?; - let (expected_merkle_root, tree_diffs) = - calculate_state_merkle_root(tx, block.shard_group(), pending, substate_store.diff())?; + let pending = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?; + let (expected_merkle_root, tree_diffs) = calculate_state_merkle_root( + tx, + block.shard_group(), + pending, + substate_store + .diff() + .iter() + // Calculate for local shards only + .filter(|ch| block.shard_group().contains(&ch.shard())), + )?; if expected_merkle_root != *block.merkle_root() { warn!( target: LOG_TARGET, @@ -878,8 +886,8 @@ where TConsensusSpec: ConsensusSpec "🌳 Committing block {} with {} substate 
change(s)", block, diff.len() ); - // NOTE: this must happen before we commit the diff because the state transitions use this version - let pending = PendingStateTreeDiff::remove_by_block(tx, block.id())?; + // NOTE: this must happen before we commit the substate diff because the state transitions use this version + let pending = PendingShardStateTreeDiff::remove_by_block(tx, block.id())?; let mut state_tree = ShardedStateTree::new(tx); state_tree.commit_diffs(pending)?; let tx = state_tree.into_transaction(); diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 1585f6c6e..a9d9a908f 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -29,6 +29,7 @@ use super::proposer::Proposer; use crate::{ hotstuff::{ calculate_dummy_blocks, + create_epoch_checkpoint, error::HotStuffError, on_ready_to_vote_on_local_block::OnReadyToVoteOnLocalBlock, pacemaker_handle::PaceMakerHandle, @@ -212,6 +213,10 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler(()) })?; // Set the pacemaker to next epoch diff --git a/dan_layer/consensus/src/hotstuff/substate_store/mod.rs b/dan_layer/consensus/src/hotstuff/substate_store/mod.rs index b3831edd3..74a2ff41d 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/mod.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/mod.rs @@ -3,10 +3,10 @@ mod error; mod pending_store; +mod shard_state_store; mod sharded_state_tree; -mod sharded_store; pub use error::*; pub use pending_store::*; +pub use shard_state_store::*; pub use sharded_state_tree::*; -pub use sharded_store::*; diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/shard_state_store.rs similarity index 97% rename from dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs rename to dan_layer/consensus/src/hotstuff/substate_store/shard_state_store.rs index 2fa8a54c0..630aa732f 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/shard_state_store.rs @@ -75,7 +75,7 @@ impl<'a, TTx: StateStoreWriteTransaction> TreeStoreWriter for ShardScop fn record_stale_tree_node(&mut self, node: StaleTreeNode) -> Result<(), tari_state_tree::JmtStorageError> { self.tx - .state_tree_nodes_mark_stale_tree_node(self.shard, node) + .state_tree_nodes_record_stale_tree_node(self.shard, node) .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string())) } } diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs index 6f23872f3..c0159b176 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs @@ -5,15 +5,17 @@ use std::collections::HashMap; use indexmap::IndexMap; use log::debug; -use tari_dan_common_types::{hashing::state_root_hasher, shard::Shard}; +use tari_dan_common_types::{shard::Shard, ShardGroup}; use tari_dan_storage::{ - consensus_models::{PendingStateTreeDiff, VersionedStateHashTreeDiff}, + consensus_models::{PendingShardStateTreeDiff, VersionedStateHashTreeDiff}, StateStoreReadTransaction, StateStoreWriteTransaction, }; use tari_state_tree::{ + memory_store::MemoryTreeStore, Hash, JmtStorageError, + RootStateTree, SpreadPrefixStateTree, 
StagedTreeStore, StateHashTreeDiff, @@ -21,16 +23,17 @@ use tari_state_tree::{ SubstateTreeChange, TreeStoreWriter, Version, + SPARSE_MERKLE_PLACEHOLDER_HASH, }; -use crate::hotstuff::substate_store::sharded_store::{ShardScopedTreeStoreReader, ShardScopedTreeStoreWriter}; +use crate::hotstuff::substate_store::shard_state_store::{ShardScopedTreeStoreReader, ShardScopedTreeStoreWriter}; const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree"; pub struct ShardedStateTree { tx: TTx, - pending_diffs: HashMap>, - sharded_tree_diffs: IndexMap, + pending_diffs: HashMap>, + shard_tree_diffs: IndexMap, } impl ShardedStateTree { @@ -38,11 +41,11 @@ impl ShardedStateTree { Self { tx, pending_diffs: HashMap::new(), - sharded_tree_diffs: IndexMap::new(), + shard_tree_diffs: IndexMap::new(), } } - pub fn with_pending_diffs(self, pending_diffs: HashMap>) -> Self { + pub fn with_pending_diffs(self, pending_diffs: HashMap>) -> Self { Self { pending_diffs, ..self } } @@ -69,25 +72,20 @@ impl ShardedStateTree<&TTx> { let maybe_version = self .tx .state_tree_versions_get_latest(shard) - .map_err(|e| StateTreeError::StorageError(JmtStorageError::UnexpectedError(e.to_string())))?; + .map_err(|e| StateTreeError::JmtStorageError(JmtStorageError::UnexpectedError(e.to_string())))?; Ok(maybe_version) } - pub fn into_versioned_tree_diffs(self) -> IndexMap { - self.sharded_tree_diffs + pub fn into_shard_tree_diffs(self) -> IndexMap { + self.shard_tree_diffs } pub fn put_substate_tree_changes( &mut self, + shard_group: ShardGroup, changes: IndexMap>, ) -> Result { - // This is here so that the state merkle root is all zeros for no changes (instead of being - // state_root_hasher().result()). - if changes.is_empty() { - return Ok(Hash::zero()); - } - - let mut state_roots = state_root_hasher(); + let mut shard_state_roots = HashMap::with_capacity(changes.len()); for (shard, changes) in changes { let current_version = self.get_current_version(shard)?; let next_version = current_version.unwrap_or(0) + 1; @@ -97,34 +95,74 @@ impl ShardedStateTree<&TTx> { // Staged store that tracks changes to the state tree let mut store = StagedTreeStore::new(&scoped_store); // Apply pending (not yet committed) diffs to the staged store - if let Some(diffs) = self.pending_diffs.remove(&shard) { + if let Some(diffs) = self.pending_diffs.get(&shard) { debug!(target: LOG_TARGET, "Applying {num_diffs} pending diff(s) to shard {shard} (version={version})", num_diffs = diffs.len(), version = diffs.last().map(|d| d.version).unwrap_or(0)); for diff in diffs { - store.apply_pending_diff(diff.diff); + store.apply_pending_diff(diff.diff.clone()); } } // Apply state updates to the state tree that is backed by the staged shard-scoped store let mut state_tree = SpreadPrefixStateTree::new(&mut store); debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len()); - let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?; - state_roots.update(&state_root); - self.sharded_tree_diffs + let shard_state_hash = state_tree.put_substate_changes(current_version, next_version, changes)?; + shard_state_roots.insert(shard, shard_state_hash); + self.shard_tree_diffs .insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff())); } - // TODO: use a Merkle tree to generate a root for these hashes - Ok(state_roots.result()) + let root_hash = self.get_shard_group_root(shard_group, shard_state_roots)?; + Ok(root_hash) + } + + fn get_shard_group_root( + &self, + 
shard_group: ShardGroup, + mut shard_state_roots: HashMap, + ) -> Result { + let mut mem_store = MemoryTreeStore::new(); + let mut root_tree = RootStateTree::new(&mut mem_store); + let mut hashes = Vec::with_capacity(shard_group.len()); + for shard in shard_group.shard_iter() { + match shard_state_roots.remove(&shard) { + Some(r) => hashes.push(r), + None => { + let hash = self.get_state_root_for_shard(shard)?; + hashes.push(hash); + }, + }; + } + root_tree.put_root_hash_changes(None, 1, hashes) + } + + fn get_state_root_for_shard(&self, shard: Shard) -> Result { + let Some(version) = self.get_current_version(shard)? else { + // At v0 there have been no state changes + return Ok(SPARSE_MERKLE_PLACEHOLDER_HASH); + }; + + let scoped_store = ShardScopedTreeStoreReader::new(self.tx, shard); + let mut store = StagedTreeStore::new(&scoped_store); + if let Some(diffs) = self.pending_diffs.get(&shard) { + for diff in diffs { + store.apply_pending_diff(diff.diff.clone()); + } + } + let state_tree = SpreadPrefixStateTree::new(&mut store); + + let root_hash = state_tree.get_root_hash(version)?; + Ok(root_hash) } } impl ShardedStateTree<&mut TTx> { - pub fn commit_diffs(&mut self, diffs: IndexMap>) -> Result<(), StateTreeError> { + pub fn commit_diffs( + &mut self, + diffs: IndexMap>, + ) -> Result<(), StateTreeError> { for (shard, pending_diffs) in diffs { for pending_diff in pending_diffs { - let version = pending_diff.version; - let diff = pending_diff.diff; - self.commit_diff(shard, version, diff)?; + self.commit_diff(shard, pending_diff.version, pending_diff.diff)?; } } @@ -135,7 +173,7 @@ impl ShardedStateTree<&mut TTx> { &mut self, shard: Shard, version: Version, - diff: StateHashTreeDiff, + diff: StateHashTreeDiff, ) -> Result<(), StateTreeError> { let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); diff --git a/dan_layer/consensus_tests/src/support/mod.rs b/dan_layer/consensus_tests/src/support/mod.rs index b9add837f..2dc3bcca2 100644 --- a/dan_layer/consensus_tests/src/support/mod.rs +++ b/dan_layer/consensus_tests/src/support/mod.rs @@ -4,7 +4,7 @@ // TODO: use all functions // #![allow(dead_code)] -pub const TEST_NUM_PRESHARDS: NumPreshards = NumPreshards::SixtyFour; +pub const TEST_NUM_PRESHARDS: NumPreshards = NumPreshards::P64; mod address; mod epoch_manager; diff --git a/dan_layer/p2p/proto/rpc.proto b/dan_layer/p2p/proto/rpc.proto index 49503860c..0339037eb 100644 --- a/dan_layer/p2p/proto/rpc.proto +++ b/dan_layer/p2p/proto/rpc.proto @@ -231,6 +231,7 @@ message GetCheckpointResponse { message EpochCheckpoint { tari.dan.consensus.Block block = 1; repeated tari.dan.consensus.QuorumCertificate qcs = 2; + map shard_roots = 3; } message SyncStateRequest { diff --git a/dan_layer/p2p/src/conversions/rpc.rs b/dan_layer/p2p/src/conversions/rpc.rs index 4b52d7a35..e3b6d2e05 100644 --- a/dan_layer/p2p/src/conversions/rpc.rs +++ b/dan_layer/p2p/src/conversions/rpc.rs @@ -4,6 +4,7 @@ use std::convert::{TryFrom, TryInto}; use anyhow::anyhow; +use tari_common_types::types::FixedHash; use tari_dan_common_types::{shard::Shard, Epoch}; use tari_dan_storage::consensus_models::{ EpochCheckpoint, @@ -180,9 +181,20 @@ impl TryFrom for EpochCheckpoint { type Error = anyhow::Error; fn try_from(value: proto::rpc::EpochCheckpoint) -> Result { + if value.shard_roots.len() > 256 { + return Err(anyhow!("too many shard roots")); + } + + let shard_roots = value + .shard_roots + .into_iter() + .map(|(k, v)| FixedHash::try_from(v).map(|h| (Shard::from(k), h))) + .collect::>()?; + Ok(Self::new( 
value.block.ok_or_else(|| anyhow!("block not provided"))?.try_into()?, value.qcs.into_iter().map(TryInto::try_into).collect::>()?, + shard_roots, )) } } @@ -192,6 +204,11 @@ impl From for proto::rpc::EpochCheckpoint { Self { block: Some(value.block().into()), qcs: value.qcs().iter().map(Into::into).collect(), + shard_roots: value + .shard_roots() + .iter() + .map(|(k, v)| (k.as_u32(), v.to_vec())) + .collect(), } } } diff --git a/dan_layer/rpc_state_sync/src/error.rs b/dan_layer/rpc_state_sync/src/error.rs index e98542bdb..27173b24f 100644 --- a/dan_layer/rpc_state_sync/src/error.rs +++ b/dan_layer/rpc_state_sync/src/error.rs @@ -8,7 +8,7 @@ use tari_dan_storage::{ }; use tari_epoch_manager::EpochManagerError; use tari_rpc_framework::{RpcError, RpcStatus}; -use tari_state_tree::JmtStorageError; +use tari_state_tree::{Hash, JmtStorageError}; use tari_validator_node_rpc::ValidatorNodeRpcClientError; #[derive(Debug, thiserror::Error)] @@ -33,6 +33,8 @@ pub enum CommsRpcConsensusSyncError { ProposalValidationError(#[from] ProposalValidationError), #[error("State tree error: {0}")] StateTreeError(#[from] tari_state_tree::StateTreeError), + #[error("State root mismatch. Expected: {expected}, actual: {actual}")] + StateRootMismatch { expected: Hash, actual: Hash }, } impl CommsRpcConsensusSyncError { diff --git a/dan_layer/rpc_state_sync/src/manager.rs b/dan_layer/rpc_state_sync/src/manager.rs index 91f484d39..5cf927027 100644 --- a/dan_layer/rpc_state_sync/src/manager.rs +++ b/dan_layer/rpc_state_sync/src/manager.rs @@ -33,7 +33,15 @@ use tari_dan_storage::{ }; use tari_engine_types::substate::hash_substate; use tari_epoch_manager::EpochManagerReader; -use tari_state_tree::{Hash, SpreadPrefixStateTree, SubstateTreeChange, Version}; +use tari_state_tree::{ + memory_store::MemoryTreeStore, + Hash, + RootStateTree, + SpreadPrefixStateTree, + SubstateTreeChange, + Version, + SPARSE_MERKLE_PLACEHOLDER_HASH, +}; use tari_transaction::VersionedSubstateId; use tari_validator_node_rpc::{ client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory}, @@ -101,8 +109,8 @@ where TConsensusSpec: ConsensusSpec &self, client: &mut ValidatorNodeRpcClient, shard: Shard, - checkpoint: EpochCheckpoint, - ) -> Result { + checkpoint: &EpochCheckpoint, + ) -> Result, CommsRpcConsensusSyncError> { let current_epoch = self.epoch_manager.current_epoch().await?; let last_state_transition_id = self @@ -113,8 +121,7 @@ where TConsensusSpec: ConsensusSpec let persisted_version = self .state_store - .with_read_tx(|tx| tx.state_tree_versions_get_latest(shard))? - .unwrap_or(0); + .with_read_tx(|tx| tx.state_tree_versions_get_latest(shard))?; if current_epoch == last_state_transition_id.epoch() { info!(target: LOG_TARGET, "🛜Already up to date. 
No need to sync."); @@ -168,7 +175,7 @@ where TConsensusSpec: ConsensusSpec target: LOG_TARGET, "🛜 Next state updates batch of size {} (v{}-v{})", msg.transitions.len(), - current_version, + current_version.unwrap_or(0), msg.transitions.last().unwrap().state_tree_version, ); @@ -187,12 +194,12 @@ where TConsensusSpec: ConsensusSpec ))); } - if transition.state_tree_version < current_version { + if current_version.map_or(false, |v| transition.state_tree_version < v) { return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( "Received state transition with version {} that is not monotonically increasing (expected \ >= {})", transition.state_tree_version, - persisted_version + persisted_version.unwrap_or(0) ))); } @@ -220,26 +227,26 @@ where TConsensusSpec: ConsensusSpec }, }; - info!(target: LOG_TARGET, "🛜 Applying state update {transition} (v{} to v{})", current_version, transition.state_tree_version); + info!(target: LOG_TARGET, "🛜 Applying state update {transition} (v{} to v{})", current_version.unwrap_or(0), transition.state_tree_version); if next_version != transition.state_tree_version { let mut state_tree = SpreadPrefixStateTree::new(&mut store); - state_tree.put_substate_changes(Some(current_version).filter(|v| *v > 0), next_version, tree_changes.drain(..))?; - current_version = next_version; + state_tree.put_substate_changes(current_version, next_version, tree_changes.drain(..))?; + current_version = Some(next_version); next_version = transition.state_tree_version; } tree_changes.push(change); - self.commit_update(store.transaction(), &checkpoint, transition)?; + self.commit_update(store.transaction(), checkpoint, transition)?; } if !tree_changes.is_empty() { let mut state_tree = SpreadPrefixStateTree::new(&mut store); - state_tree.put_substate_changes(Some(current_version).filter(|v| *v > 0), next_version, tree_changes.drain(..))?; + state_tree.put_substate_changes(current_version, next_version, tree_changes.drain(..))?; } - current_version = next_version; + current_version = Some(next_version); - if current_version > 0 { - store.set_version(current_version)?; + if let Some(v) = current_version { + store.set_version(v)?; } Ok::<_, CommsRpcConsensusSyncError>(()) @@ -249,7 +256,15 @@ where TConsensusSpec: ConsensusSpec Ok(current_version) } - fn get_state_root_for_shard(&self, shard: Shard, version: Version) -> Result { + fn get_state_root_for_shard( + &self, + shard: Shard, + version: Option, + ) -> Result { + let Some(version) = version else { + return Ok(SPARSE_MERKLE_PLACEHOLDER_HASH); + }; + self.state_store.with_read_tx(|tx| { let mut store = ShardScopedTreeStoreReader::new(tx, shard); let state_tree = SpreadPrefixStateTree::new(&mut store); @@ -322,6 +337,26 @@ where TConsensusSpec: ConsensusSpec committees.sort_by_key(|(k, _)| *k); Ok(committees) } + + fn validate_checkpoint(&self, checkpoint: &EpochCheckpoint) -> Result<(), CommsRpcConsensusSyncError> { + // TODO: validate checkpoint + + // Check the merkle root matches the provided shard roots + let mut mem_store = MemoryTreeStore::new(); + let mut root_tree = RootStateTree::new(&mut mem_store); + let shard_group = checkpoint.block().shard_group(); + let hashes = shard_group.shard_iter().map(|shard| checkpoint.get_shard_root(shard)); + let calculated_root = root_tree.put_root_hash_changes(None, 1, hashes)?; + if calculated_root != *checkpoint.block().merkle_root() { + return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( + "Checkpoint merkle root mismatch. 
Expected {expected} but got {actual}", + expected = checkpoint.block().merkle_root(), + actual = calculated_root, + ))); + } + + Ok(()) + } } #[async_trait] @@ -362,8 +397,10 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static // NOTE: we don't have to worry about substates in address range because shard boundaries are fixed. for (shard, mut committee) in prev_epoch_committees { info!(target: LOG_TARGET, "🛜Syncing state for {shard} and {}", current_epoch.saturating_sub(Epoch(1))); + let mut remaining_members = committee.len(); committee.shuffle(); for (addr, public_key) in committee { + remaining_members = remaining_members.saturating_sub(1); if our_vn.public_key == public_key { continue; } @@ -374,6 +411,9 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static target: LOG_TARGET, "Failed to establish RPC session with vn {addr}: {err}. Attempting another VN if available" ); + if remaining_members == 0 { + return Err(err); + } last_error = Some(err); continue; }, @@ -395,23 +435,51 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static target: LOG_TARGET, "⚠️Failed to fetch checkpoint from {addr}: {err}. Attempting another peer if available" ); + if remaining_members == 0 { + return Err(err); + } last_error = Some(err); continue; }, }; info!(target: LOG_TARGET, "🛜 Checkpoint: {checkpoint}"); - match self.start_state_sync(&mut client, shard, checkpoint).await { + self.validate_checkpoint(&checkpoint)?; + + match self.start_state_sync(&mut client, shard, &checkpoint).await { Ok(current_version) => { - // TODO: validate this MR against the checkpoint - let merkle_root = self.get_state_root_for_shard(shard, current_version)?; - info!(target: LOG_TARGET, "🛜Synced state for {shard} to v{current_version} with root {merkle_root}"); + let state_root = self.get_state_root_for_shard(shard, current_version)?; + + if state_root != checkpoint.get_shard_root(shard) { + error!( + target: LOG_TARGET, + "❌State root mismatch for {shard}. Expected {expected} but got {actual}", + expected = checkpoint.get_shard_root(shard), + actual = state_root, + ); + last_error = Some(CommsRpcConsensusSyncError::StateRootMismatch { + expected: *checkpoint.block().merkle_root(), + actual: state_root, + }); + // TODO: rollback state + if remaining_members == 0 { + return Err(last_error.unwrap()); + } + + continue; + } + + info!(target: LOG_TARGET, "🛜Synced state for {shard} to v{} with root {state_root}", current_version.unwrap_or(0)); }, Err(err) => { warn!( target: LOG_TARGET, "⚠️Failed to sync state from {addr}: {err}. 
Attempting another peer if available" ); + + if remaining_members == 0 { + return Err(err); + } last_error = Some(err); continue; }, @@ -420,6 +488,10 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static } } - last_error.map(Err).unwrap_or(Ok(())) + if let Some(err) = last_error { + return Err(err); + } + + Ok(()) } } diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 2913003cc..78d5c5cec 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -359,22 +359,6 @@ create table state_tree_shard_versions created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); -CREATE TABLE shard_group_state_tree -( - id integer not NULL primary key AUTOINCREMENT, - epoch bigint not NULL, - key text not NULL, - node text not NULL, - is_stale boolean not null default '0' -); - --- Scoping by shard -CREATE INDEX shard_group_state_tree_idx_shard_key on shard_group_state_tree (epoch) WHERE is_stale = false; --- Duplicate keys are not allowed -CREATE UNIQUE INDEX shard_group_state_tree_uniq_idx_key on shard_group_state_tree (epoch, key) WHERE is_stale = false; --- filtering out or by is_stale is used in every query -CREATE INDEX shard_group_state_tree_idx_is_stale on shard_group_state_tree (is_stale); - -- One entry per shard CREATE UNIQUE INDEX state_tree_uniq_shard_versions_shard on state_tree_shard_versions (shard); @@ -392,6 +376,17 @@ CREATE TABLE pending_state_tree_diffs CREATE UNIQUE INDEX pending_state_tree_diffs_uniq_idx_block_id_shard on pending_state_tree_diffs (block_id, shard); + +CREATE TABLE epoch_checkpoints +( + id integer not NULL primary key AUTOINCREMENT, + epoch bigint not NULL, + commit_block text not NULL, + qcs text not NULL, + shard_roots text not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP +); + -- An append-only store of state transitions CREATE TABLE state_transitions ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 1b120aefb..d6d56b23d 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -3,7 +3,6 @@ use std::{ borrow::Borrow, - cmp, collections::{HashMap, HashSet}, marker::PhantomData, ops::RangeInclusive, @@ -38,6 +37,7 @@ use tari_dan_storage::{ BlockDiff, BlockId, Command, + EpochCheckpoint, ForeignProposal, ForeignProposalState, ForeignReceiveCounters, @@ -50,7 +50,7 @@ use tari_dan_storage::{ LeafBlock, LockedBlock, LockedSubstate, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QcId, QuorumCertificate, StateTransition, @@ -717,19 +717,13 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor block.try_convert(qc) } - fn blocks_get_last_n_in_epoch( - &self, - n: usize, - epoch: Epoch, - shard_group: ShardGroup, - ) -> Result, StorageError> { + fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError> { use crate::schema::{blocks, quorum_certificates}; let blocks = blocks::table .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id))) .select((blocks::all_columns, quorum_certificates::all_columns.nullable())) .filter(blocks::epoch.eq(epoch.as_u64() as i64)) - .filter(blocks::shard_group.eq(shard_group.encode_as_u32() as i32)) .filter(blocks::is_committed.eq(true)) 
.order_by(blocks::height.desc()) .limit(n as i64) @@ -1953,25 +1947,10 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor lock.try_into_substate_lock() } - fn pending_state_tree_diffs_exists_for_block(&self, block_id: &BlockId) -> Result { - use crate::schema::pending_state_tree_diffs; - - let count = pending_state_tree_diffs::table - .count() - .filter(pending_state_tree_diffs::block_id.eq(serialize_hex(block_id))) - .first::(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "pending_state_tree_diffs_exists_for_block", - source: e, - })?; - - Ok(count > 0) - } - fn pending_state_tree_diffs_get_all_up_to_commit_block( &self, block_id: &BlockId, - ) -> Result>, StorageError> { + ) -> Result>, StorageError> { use crate::schema::pending_state_tree_diffs; // Get the last committed block @@ -1995,7 +1974,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor let mut diffs = HashMap::new(); for diff in diff_recs { let shard = Shard::from(diff.shard as u32); - let diff = PendingStateTreeDiff::try_from(diff)?; + let diff = PendingShardStateTreeDiff::try_from(diff)?; diffs .entry(shard) .or_insert_with(Vec::new)//PendingStateTreeDiff::default) @@ -2018,25 +1997,11 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor // Never return epoch 0 state transitions let min_epoch = Some(id.epoch().as_u64()).filter(|e| *e > 0).unwrap_or(1) as i64; - let (start_id, seq) = state_transitions::table - .select((state_transitions::id, state_transitions::seq)) - .filter(state_transitions::epoch.ge(min_epoch)) - .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) - .order_by(state_transitions::seq.asc()) - .first::<(i32, i64)>(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "state_transitions_get_n_after", - source: e, - })?; - - let offset = cmp::max(id.seq() as i64, seq); - let start_id = - start_id + i32::try_from(offset).expect("(likely invalid) seq no for transition is too large for SQLite"); - let transitions = state_transitions::table .left_join(substates::table.on(state_transitions::substate_address.eq(substates::address))) .select((state_transitions::all_columns, substates::all_columns.nullable())) - .filter(state_transitions::id.ge(start_id)) + .filter(state_transitions::seq.ge(id.seq() as i64)) + .filter(state_transitions::epoch.ge(min_epoch)) .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) .limit(n as i64) @@ -2092,7 +2057,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor source: e, })?; - let node = serde_json::from_str::(&node).map_err(|e| StorageError::DataInconsistency { + let node = serde_json::from_str::>(&node).map_err(|e| StorageError::DataInconsistency { details: format!("Failed to deserialize state tree node: {}", e), })?; @@ -2115,6 +2080,20 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor Ok(version.map(|v| v as Version)) } + + fn epoch_checkpoint_get(&self, epoch: Epoch) -> Result { + use crate::schema::epoch_checkpoints; + + let checkpoint = epoch_checkpoints::table + .filter(epoch_checkpoints::epoch.eq(epoch.as_u64() as i64)) + .first::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "epoch_checkpoint_get", + source: e, + })?; + + checkpoint.try_into() + } } #[derive(QueryableByName)] diff --git a/dan_layer/state_store_sqlite/src/schema.rs 
b/dan_layer/state_store_sqlite/src/schema.rs index c69b9ff95..b75542ec2 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -42,6 +42,17 @@ diesel::table! { } } +diesel::table! { + epoch_checkpoints (id) { + id -> Integer, + epoch -> BigInt, + commit_block -> Text, + qcs -> Text, + shard_roots -> Text, + created_at -> Timestamp, + } +} + diesel::table! { foreign_proposals (id) { id -> Integer, @@ -382,6 +393,7 @@ diesel::table! { diesel::allow_tables_to_appear_in_same_query!( block_diffs, blocks, + epoch_checkpoints, foreign_proposals, foreign_receive_counters, foreign_send_counters, diff --git a/dan_layer/state_store_sqlite/src/sql_models/epoch_checkpoint.rs b/dan_layer/state_store_sqlite/src/sql_models/epoch_checkpoint.rs new file mode 100644 index 000000000..3600896ec --- /dev/null +++ b/dan_layer/state_store_sqlite/src/sql_models/epoch_checkpoint.rs @@ -0,0 +1,30 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use diesel::Queryable; +use tari_dan_storage::{consensus_models, StorageError}; +use time::PrimitiveDateTime; + +use crate::serialization::deserialize_json; + +#[derive(Debug, Clone, Queryable)] +pub struct EpochCheckpoint { + pub id: i32, + pub epoch: i64, + pub commit_block: String, + pub qcs: String, + pub shard_roots: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::EpochCheckpoint { + type Error = StorageError; + + fn try_from(value: EpochCheckpoint) -> Result { + let commit_block = deserialize_json(&value.commit_block)?; + let qcs = deserialize_json(&value.qcs)?; + let shard_roots = deserialize_json(&value.shard_roots)?; + + Ok(Self::new(commit_block, qcs, shard_roots)) + } +} diff --git a/dan_layer/state_store_sqlite/src/sql_models/mod.rs b/dan_layer/state_store_sqlite/src/sql_models/mod.rs index 77778e603..8546516af 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/mod.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/mod.rs @@ -4,6 +4,7 @@ mod block; mod block_diff; mod bookkeeping; +mod epoch_checkpoint; mod leaf_block; mod pending_state_tree_diff; mod quorum_certificate; @@ -18,6 +19,7 @@ mod vote; pub use block::*; pub use block_diff::*; pub use bookkeeping::*; +pub use epoch_checkpoint::*; pub use leaf_block::*; pub use pending_state_tree_diff::*; pub use quorum_certificate::*; diff --git a/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs b/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs index ab5afafa0..420475c51 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs @@ -18,7 +18,7 @@ pub struct PendingStateTreeDiff { pub created_at: PrimitiveDateTime, } -impl TryFrom for consensus_models::PendingStateTreeDiff { +impl TryFrom for consensus_models::PendingShardStateTreeDiff { type Error = StorageError; fn try_from(value: PendingStateTreeDiff) -> Result { diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 5ba56000c..349e9074d 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -23,6 +23,7 @@ use tari_dan_storage::{ BlockDiff, BlockId, Decision, + EpochCheckpoint, Evidence, ForeignProposal, ForeignReceiveCounters, @@ -35,7 +36,7 @@ use tari_dan_storage::{ LeafBlock, LockedBlock, LockedSubstate, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QcId, QuorumCertificate, 
SubstateRecord, @@ -1454,7 +1455,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta fn pending_state_tree_diffs_remove_by_block( &mut self, block_id: &BlockId, - ) -> Result>, StorageError> { + ) -> Result>, StorageError> { use crate::schema::pending_state_tree_diffs; let diff_recs = pending_state_tree_diffs::table @@ -1477,7 +1478,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let mut diffs = IndexMap::new(); for diff in diff_recs { let shard = Shard::from(diff.shard as u32); - let diff = PendingStateTreeDiff::try_from(diff)?; + let diff = PendingShardStateTreeDiff::try_from(diff)?; diffs.entry(shard).or_insert_with(Vec::new).push(diff); } @@ -1531,21 +1532,19 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta diesel::insert_into(state_tree::table) .values(&values) .execute(self.connection()) - .map_err(|e| { - SqliteStorageError::DbInconsistency { - operation: "state_tree_nodes_insert", - details: format!("Failed to insert node for key: {shard} - {key} {e}"), - } - // SqliteStorageError::DieselError { - // operation: "state_tree_nodes_insert", - // source: e, - // } + .map_err(|e| SqliteStorageError::DieselError { + operation: "state_tree_nodes_insert", + source: e, })?; Ok(()) } - fn state_tree_nodes_mark_stale_tree_node(&mut self, shard: Shard, node: StaleTreeNode) -> Result<(), StorageError> { + fn state_tree_nodes_record_stale_tree_node( + &mut self, + shard: Shard, + node: StaleTreeNode, + ) -> Result<(), StorageError> { use crate::schema::state_tree; let key = node.as_node_key(); @@ -1590,6 +1589,27 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(()) } + + fn epoch_checkpoint_save(&mut self, checkpoint: &EpochCheckpoint) -> Result<(), StorageError> { + use crate::schema::epoch_checkpoints; + + let values = ( + epoch_checkpoints::epoch.eq(checkpoint.block().epoch().as_u64() as i64), + epoch_checkpoints::commit_block.eq(serialize_json(checkpoint.block())?), + epoch_checkpoints::qcs.eq(serialize_json(checkpoint.qcs())?), + epoch_checkpoints::shard_roots.eq(serialize_json(checkpoint.shard_roots())?), + ); + + diesel::insert_into(epoch_checkpoints::table) + .values(values) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "epoch_checkpoint_save", + source: e, + })?; + + Ok(()) + } } impl<'a, TAddr> Deref for SqliteStateStoreWriteTransaction<'a, TAddr> { diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index e970ed2de..a559b0392 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -47,7 +47,7 @@ mod confirm_all_transitions { let atom3 = create_tx_atom(); let network = Default::default(); - let zero_block = Block::zero_block(network, NumPreshards::SixtyFour); + let zero_block = Block::zero_block(network, NumPreshards::P64); zero_block.insert(&mut tx).unwrap(); let block1 = Block::new( network, @@ -55,7 +55,7 @@ mod confirm_all_transitions { zero_block.justify().clone(), NodeHeight(1), Epoch(0), - ShardGroup::all_shards(NumPreshards::SixtyFour), + ShardGroup::all_shards(NumPreshards::P64), Default::default(), // Need to have a command in, otherwise this block will not be included internally in the query because it // cannot cause a state change without any commands diff --git a/dan_layer/state_tree/src/error.rs b/dan_layer/state_tree/src/error.rs index 0744fc9c0..e911e256c 100644 --- 
a/dan_layer/state_tree/src/error.rs +++ b/dan_layer/state_tree/src/error.rs @@ -1,10 +1,21 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use tari_dan_common_types::optional::IsNotFoundError; + use crate::jellyfish::JmtStorageError; #[derive(Debug, thiserror::Error)] pub enum StateTreeError { - #[error("Storage error: {0}")] - StorageError(#[from] JmtStorageError), + #[error("JMT Storage error: {0}")] + JmtStorageError(#[from] JmtStorageError), +} + +impl IsNotFoundError for StateTreeError { + fn is_not_found_error(&self) -> bool { + #[allow(clippy::single_match)] + match self { + StateTreeError::JmtStorageError(err) => err.is_not_found_error(), + } + } } diff --git a/dan_layer/state_tree/src/jellyfish/store.rs b/dan_layer/state_tree/src/jellyfish/store.rs index 65d1c78f3..7fa23663c 100644 --- a/dan_layer/state_tree/src/jellyfish/store.rs +++ b/dan_layer/state_tree/src/jellyfish/store.rs @@ -3,10 +3,7 @@ use serde::{Deserialize, Serialize}; -use crate::{ - jellyfish::{JmtStorageError, Node, NodeKey}, - Version, -}; +use crate::jellyfish::{JmtStorageError, Node, NodeKey}; /// Implementers are able to read nodes from a tree store. pub trait TreeStoreReader

<P> { @@ -52,26 +49,26 @@ impl StaleTreeNode { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum TreeNode { - V1(Node<Version>), +pub enum TreeNode<P> { + V1(Node<P>), } -impl TreeNode { - pub fn new_latest(node: Node<Version>) -> Self { +impl<P> TreeNode<P> { + pub fn new_latest(node: Node<P>) -> Self { Self::new_v1(node) } - pub fn new_v1(node: Node<Version>) -> Self { + pub fn new_v1(node: Node<P>) -> Self { Self::V1(node) } - pub fn as_node(&self) -> &Node<Version> { + pub fn as_node(&self) -> &Node<P> { match self { Self::V1(node) => node, } } - pub fn into_node(self) -> Node<Version> { + pub fn into_node(self) -> Node<P>
{ match self { Self::V1(node) => node, } diff --git a/dan_layer/state_tree/src/jellyfish/tree.rs b/dan_layer/state_tree/src/jellyfish/tree.rs index 7448afa70..70ee10c45 100644 --- a/dan_layer/state_tree/src/jellyfish/tree.rs +++ b/dan_layer/state_tree/src/jellyfish/tree.rs @@ -86,8 +86,6 @@ use std::{ marker::PhantomData, }; -use tari_dan_common_types::optional::Optional; - use super::{ store::TreeStoreReader, types::{ @@ -187,19 +185,15 @@ impl<'a, R: 'a + TreeStoreReader

<P>, P: Clone> JellyfishMerkleTree<'a, R, P> { /// the returned batch, the state `S_{i+1}` is ready to be read from the tree by calling /// [`get_with_proof`](struct.JellyfishMerkleTree.html#method.get_with_proof). Anything inside /// the batch is not reachable from public interfaces before being committed. - pub fn batch_put_value_set( + pub fn batch_put_value_set<I: IntoIterator<Item = (LeafKey, Option<(Hash, P)>)>>( &self, - value_set: Vec<(LeafKey, Option<(Hash, P)>)>, + value_set: I, node_hashes: Option<&HashMap>, persisted_version: Option<Version>, version: Version, ) -> Result<(Hash, TreeUpdateBatch<P>), JmtStorageError> { - let deduped_and_sorted_kvs = value_set - .iter() - .map(|(k, v)| (k, v.as_ref())) - .collect::<BTreeMap<_, _>>() - .into_iter() - .collect::<Vec<_>>(); + let value_set = value_set.into_iter().collect::<Vec<_>>(); + let deduped_and_sorted_kvs = value_set.iter().map(|(k, v)| (k, v.as_ref())).collect::<BTreeMap<_, _>>(); let mut batch = TreeUpdateBatch::new(); let root_node_opt = if let Some(persisted_version) = persisted_version { @@ -609,11 +603,7 @@ impl<'a, R: 'a + TreeStoreReader<P>
, P: Clone> JellyfishMerkleTree<'a, R, P> { } pub fn get_root_hash(&self, version: Version) -> Result { - Ok(self - .get_root_node(version) - .map(|n| n.hash()) - .optional()? - .unwrap_or(SPARSE_MERKLE_PLACEHOLDER_HASH)) + self.get_root_node(version).map(|n| n.hash()) } pub fn get_leaf_count(&self, version: Version) -> Result { diff --git a/dan_layer/state_tree/src/jellyfish/types.rs b/dan_layer/state_tree/src/jellyfish/types.rs index b54ee3009..6e6f2a418 100644 --- a/dan_layer/state_tree/src/jellyfish/types.rs +++ b/dan_layer/state_tree/src/jellyfish/types.rs @@ -702,17 +702,18 @@ pub struct LeafKey { /// becomes unspecified. /// All leaf keys must be evenly distributed across their space - otherwise the tree's /// performance degrades. + /// TARI: always a hash, so replaced heap-allocated Vec with a Hash #[serde(with = "serde_with::hex")] - pub bytes: Vec, + pub bytes: Hash, } impl LeafKey { - pub fn new(bytes: Vec) -> Self { + pub fn new(bytes: Hash) -> Self { Self { bytes } } pub fn as_ref(&self) -> LeafKeyRef<'_> { - LeafKeyRef::new(&self.bytes) + LeafKeyRef::new(self.bytes.as_slice()) } } @@ -736,7 +737,7 @@ impl<'a> LeafKeyRef<'a> { impl PartialEq for LeafKeyRef<'_> { fn eq(&self, other: &LeafKey) -> bool { - self.bytes == other.bytes + self.bytes == other.bytes.as_slice() } } diff --git a/dan_layer/state_tree/src/key_mapper.rs b/dan_layer/state_tree/src/key_mapper.rs index 60ac6b0b0..88f574d75 100644 --- a/dan_layer/state_tree/src/key_mapper.rs +++ b/dan_layer/state_tree/src/key_mapper.rs @@ -3,17 +3,25 @@ use tari_engine_types::substate::SubstateId; -use crate::jellyfish::LeafKey; +use crate::{jellyfish::LeafKey, Hash}; -pub trait DbKeyMapper { - fn map_to_leaf_key(id: &SubstateId) -> LeafKey; +pub trait DbKeyMapper { + fn map_to_leaf_key(id: &T) -> LeafKey; } pub struct SpreadPrefixKeyMapper; -impl DbKeyMapper for SpreadPrefixKeyMapper { +impl DbKeyMapper for SpreadPrefixKeyMapper { fn map_to_leaf_key(id: &SubstateId) -> LeafKey { let hash = crate::jellyfish::jmt_node_hash(id); - LeafKey::new(hash.to_vec()) + LeafKey::new(hash) + } +} + +pub struct HashIdentityKeyMapper; + +impl DbKeyMapper for HashIdentityKeyMapper { + fn map_to_leaf_key(hash: &Hash) -> LeafKey { + LeafKey::new(*hash) } } diff --git a/dan_layer/state_tree/src/memory_store.rs b/dan_layer/state_tree/src/memory_store.rs index 2948cbb5e..367cd3a08 100644 --- a/dan_layer/state_tree/src/memory_store.rs +++ b/dan_layer/state_tree/src/memory_store.rs @@ -1,26 +1,17 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{collections::HashMap, fmt}; +use std::{collections::HashMap, fmt, fmt::Debug}; -use crate::jellyfish::{ - JmtStorageError, - Node, - NodeKey, - StaleTreeNode, - TreeNode, - TreeStoreReader, - TreeStoreWriter, - Version, -}; +use crate::jellyfish::{JmtStorageError, Node, NodeKey, StaleTreeNode, TreeNode, TreeStoreReader, TreeStoreWriter}; #[derive(Debug, Default)] -pub struct MemoryTreeStore { - pub nodes: HashMap, +pub struct MemoryTreeStore

<P> { + pub nodes: HashMap<NodeKey, TreeNode<P>>, pub stale_nodes: Vec<StaleTreeNode>, } -impl MemoryTreeStore { +impl<P> MemoryTreeStore<P> { pub fn new() -> Self { Self { nodes: HashMap::new(), @@ -35,8 +26,8 @@ impl MemoryTreeStore { } } -impl TreeStoreReader<Version> for MemoryTreeStore { - fn get_node(&self, key: &NodeKey) -> Result<Node<Version>, JmtStorageError> { +impl<P: Clone> TreeStoreReader<P> for MemoryTreeStore<P> { + fn get_node(&self, key: &NodeKey) -> Result<Node<P>, JmtStorageError> { self.nodes .get(key) .map(|node| node.clone().into_node()) @@ -44,8 +35,8 @@ impl TreeStoreReader for MemoryTreeStore { } } -impl TreeStoreWriter<Version> for MemoryTreeStore { - fn insert_node(&mut self, key: NodeKey, node: Node<Version>) -> Result<(), JmtStorageError> { +impl<P> TreeStoreWriter<P> for MemoryTreeStore<P> { + fn insert_node(&mut self, key: NodeKey, node: Node<P>) -> Result<(), JmtStorageError> { let node = TreeNode::new_latest(node); self.nodes.insert(key, node); Ok(()) @@ -57,7 +48,7 @@ impl TreeStoreWriter for MemoryTreeStore { } } -impl fmt::Display for MemoryTreeStore { +impl<P: Debug> fmt::Display for MemoryTreeStore<P>
{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "MemoryTreeStore")?; writeln!(f, " Nodes:")?; diff --git a/dan_layer/state_tree/src/staged_store.rs b/dan_layer/state_tree/src/staged_store.rs index 084bce300..fd7e91475 100644 --- a/dan_layer/state_tree/src/staged_store.rs +++ b/dan_layer/state_tree/src/staged_store.rs @@ -5,27 +5,18 @@ use std::collections::{HashMap, VecDeque}; use log::debug; -use crate::{ - JmtStorageError, - Node, - NodeKey, - StaleTreeNode, - StateHashTreeDiff, - TreeStoreReader, - TreeStoreWriter, - Version, -}; +use crate::{JmtStorageError, Node, NodeKey, StaleTreeNode, StateHashTreeDiff, TreeStoreReader, TreeStoreWriter}; const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree"; -pub struct StagedTreeStore<'s, S> { +pub struct StagedTreeStore<'s, S, P> { readable_store: &'s S, - preceding_pending_state: HashMap>, - new_tree_nodes: HashMap>, + preceding_pending_state: HashMap>, + new_tree_nodes: HashMap>, new_stale_nodes: Vec, } -impl<'s, S: TreeStoreReader> StagedTreeStore<'s, S> { +impl<'s, S: TreeStoreReader

<P>, P> StagedTreeStore<'s, S, P> { pub fn new(readable_store: &'s S) -> Self { Self { readable_store, @@ -35,7 +26,7 @@ impl<'s, S: TreeStoreReader> StagedTreeStore<'s, S> { } } - pub fn apply_pending_diff(&mut self, diff: StateHashTreeDiff) { + pub fn apply_pending_diff(&mut self, diff: StateHashTreeDiff<P>) { self.preceding_pending_state.reserve(diff.new_nodes.len()); for (key, node) in diff.new_nodes { debug!(target: LOG_TARGET, "PENDING INSERT: node {}", key); @@ -50,7 +41,7 @@ impl<'s, S: TreeStoreReader> StagedTreeStore<'s, S> { } } - pub fn into_diff(self) -> StateHashTreeDiff { + pub fn into_diff(self) -> StateHashTreeDiff<P> { StateHashTreeDiff { new_nodes: self.new_tree_nodes.into_iter().collect(), stale_tree_nodes: self.new_stale_nodes, } } -impl<'s, S: TreeStoreReader<Version>> TreeStoreReader<Version> for StagedTreeStore<'s, S> { - fn get_node(&self, key: &NodeKey) -> Result<Node<Version>, JmtStorageError> { +impl<'s, S: TreeStoreReader<P>, P: Clone> TreeStoreReader<P> for StagedTreeStore<'s, S, P> { + fn get_node(&self, key: &NodeKey) -> Result<Node<P>, JmtStorageError> { if let Some(node) = self.new_tree_nodes.get(key).cloned() { return Ok(node); } @@ -71,8 +62,8 @@ impl<'s, S: TreeStoreReader> TreeStoreReader for StagedTreeSto } } -impl<'s, S> TreeStoreWriter<Version> for StagedTreeStore<'s, S> { - fn insert_node(&mut self, key: NodeKey, node: Node<Version>) -> Result<(), JmtStorageError> { +impl<'s, S, P> TreeStoreWriter<P> for StagedTreeStore<'s, S, P> { + fn insert_node(&mut self, key: NodeKey, node: Node<P>
) -> Result<(), JmtStorageError> { if self.new_tree_nodes.insert(key.clone(), node).is_some() { return Err(JmtStorageError::Conflict(key)); } diff --git a/dan_layer/state_tree/src/tree.rs b/dan_layer/state_tree/src/tree.rs index fa58817a2..3c89df7fc 100644 --- a/dan_layer/state_tree/src/tree.rs +++ b/dan_layer/state_tree/src/tree.rs @@ -9,7 +9,7 @@ use tari_engine_types::substate::SubstateId; use crate::{ error::StateTreeError, jellyfish::{Hash, JellyfishMerkleTree, SparseMerkleProofExt, TreeStore, Version}, - key_mapper::{DbKeyMapper, SpreadPrefixKeyMapper}, + key_mapper::{DbKeyMapper, HashIdentityKeyMapper, SpreadPrefixKeyMapper}, Node, NodeKey, ProofValue, @@ -19,6 +19,7 @@ use crate::{ }; pub type SpreadPrefixStateTree<'a, S> = StateTree<'a, S, SpreadPrefixKeyMapper>; +pub type RootStateTree<'a, S> = StateTree<'a, S, HashIdentityKeyMapper>; pub struct StateTree<'a, S, M> { store: &'a mut S, @@ -34,14 +35,14 @@ impl<'a, S, M> StateTree<'a, S, M> { } } -impl<'a, S: TreeStoreReader, M: DbKeyMapper> StateTree<'a, S, M> { +impl<'a, S: TreeStoreReader, M: DbKeyMapper> StateTree<'a, S, M> { pub fn get_proof( &self, version: Version, - substate_id: &SubstateId, + key: &SubstateId, ) -> Result<(Option>, SparseMerkleProofExt), StateTreeError> { let smt = JellyfishMerkleTree::new(self.store); - let key = M::map_to_leaf_key(substate_id); + let key = M::map_to_leaf_key(key); let (maybe_value, proof) = smt.get_with_proof_ext(key.as_ref(), version)?; Ok((maybe_value, proof)) } @@ -53,22 +54,31 @@ impl<'a, S: TreeStoreReader, M: DbKeyMapper> StateTree<'a, S, M> { } } -impl<'a, S: TreeStore, M: DbKeyMapper> StateTree<'a, S, M> { - /// Stores the substate changes in the state tree and returns the new root hash. - pub fn put_substate_changes>( +impl<'a, S: TreeStore, M: DbKeyMapper> StateTree<'a, S, M> { + fn calculate_substate_changes>( &mut self, current_version: Option, next_version: Version, changes: I, - ) -> Result { + ) -> Result<(Hash, StateHashTreeDiff), StateTreeError> { let (root_hash, update_batch) = calculate_substate_changes::<_, M, _>(self.store, current_version, next_version, changes)?; + Ok((root_hash, update_batch.into())) + } - self.commit_diff(update_batch.into())?; + /// Stores the substate changes in the state tree and returns the new root hash. 
+ pub fn put_substate_changes>( + &mut self, + current_version: Option, + next_version: Version, + changes: I, + ) -> Result { + let (root_hash, update_batch) = self.calculate_substate_changes(current_version, next_version, changes)?; + self.commit_diff(update_batch)?; Ok(root_hash) } - pub fn commit_diff(&mut self, diff: StateHashTreeDiff) -> Result<(), StateTreeError> { + pub fn commit_diff(&mut self, diff: StateHashTreeDiff) -> Result<(), StateTreeError> { for (key, node) in diff.new_nodes { log::debug!("Inserting node: {}", key); self.store.insert_node(key, node)?; @@ -83,10 +93,38 @@ impl<'a, S: TreeStore, M: DbKeyMapper> StateTree<'a, S, M> { } } +impl<'a, S: TreeStore<()>, M: DbKeyMapper> StateTree<'a, S, M> { + pub fn put_root_hash_changes>( + &mut self, + current_version: Option, + next_version: Version, + changes: I, + ) -> Result { + let jmt = JellyfishMerkleTree::<_, ()>::new(self.store); + + let changes = changes + .into_iter() + .map(|hash| (M::map_to_leaf_key(&hash), Some((hash, ())))); + + let (root_hash, update_result) = jmt.batch_put_value_set(changes, None, current_version, next_version)?; + + for (k, node) in update_result.node_batch { + self.store.insert_node(k, node)?; + } + + for stale_tree_node in update_result.stale_node_index_batch { + self.store + .record_stale_tree_node(StaleTreeNode::Node(stale_tree_node.node_key))?; + } + + Ok(root_hash) + } +} + /// Calculates the new root hash and tree updates for the given substate changes. fn calculate_substate_changes< S: TreeStoreReader, - M: DbKeyMapper, + M: DbKeyMapper, I: IntoIterator, >( store: &mut S, @@ -96,13 +134,10 @@ fn calculate_substate_changes< ) -> Result<(Hash, TreeUpdateBatch), StateTreeError> { let jmt = JellyfishMerkleTree::new(store); - let changes = changes - .into_iter() - .map(|ch| match ch { - SubstateTreeChange::Up { id, value_hash } => (M::map_to_leaf_key(&id), Some((value_hash, next_version))), - SubstateTreeChange::Down { id } => (M::map_to_leaf_key(&id), None), - }) - .collect::>(); + let changes = changes.into_iter().map(|ch| match ch { + SubstateTreeChange::Up { id, value_hash } => (M::map_to_leaf_key(&id), Some((value_hash, next_version))), + SubstateTreeChange::Down { id } => (M::map_to_leaf_key(&id), None), + }); let (root_hash, update_result) = jmt.batch_put_value_set(changes, None, current_version, next_version)?; @@ -124,12 +159,12 @@ impl SubstateTreeChange { } #[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct StateHashTreeDiff { - pub new_nodes: Vec<(NodeKey, Node)>, +pub struct StateHashTreeDiff

<P> { + pub new_nodes: Vec<(NodeKey, Node<P>)>, pub stale_tree_nodes: Vec<StaleTreeNode>, } -impl StateHashTreeDiff { +impl<P> StateHashTreeDiff<P> { pub fn new() -> Self { Self { new_nodes: Vec::new(), @@ -138,8 +173,8 @@ impl StateHashTreeDiff { } } -impl From<TreeUpdateBatch<Version>> for StateHashTreeDiff { - fn from(batch: TreeUpdateBatch<Version>) -> Self { +impl<P> From<TreeUpdateBatch<P>> for StateHashTreeDiff<P> { + fn from(batch: TreeUpdateBatch<P>
) -> Self { Self { new_nodes: batch.node_batch, stale_tree_nodes: batch diff --git a/dan_layer/state_tree/tests/support.rs b/dan_layer/state_tree/tests/support.rs index 7dcf31c83..88adf30d3 100644 --- a/dan_layer/state_tree/tests/support.rs +++ b/dan_layer/state_tree/tests/support.rs @@ -60,33 +60,32 @@ impl> HashTreeTester { fn apply_database_updates(&mut self, changes: impl IntoIterator) -> Hash { let next_version = self.current_version.unwrap_or(0) + 1; let current_version = self.current_version.replace(next_version); - StateTree::<_, IdentityMapper>::new(&mut self.tree_store) - .put_substate_changes(current_version, next_version, changes) - .unwrap() + self.put_changes_at_version(current_version, next_version, changes) } - pub fn put_changes_at_version(&mut self, changes: impl IntoIterator) -> Hash { - let next_version = self - .current_version - .expect("call put_changes_at_version with None version"); - let current_version = self.current_version.unwrap(); - StateTree::<_, IdentityMapper>::new(&mut self.tree_store) - .put_substate_changes(Some(current_version), next_version, changes) + pub fn put_changes_at_version( + &mut self, + current_version: Option, + next_version: Version, + changes: impl IntoIterator, + ) -> Hash { + StateTree::<_, TestMapper>::new(&mut self.tree_store) + .put_substate_changes(current_version, next_version, changes) .unwrap() } } -impl HashTreeTester { +impl HashTreeTester> { pub fn new_empty() -> Self { Self::new(MemoryTreeStore::new(), None) } } -pub struct IdentityMapper; +pub struct TestMapper; -impl DbKeyMapper for IdentityMapper { +impl DbKeyMapper for TestMapper { fn map_to_leaf_key(id: &SubstateId) -> LeafKey { - LeafKey::new(test_hasher32().chain(&id).result().to_vec()) + LeafKey::new(test_hasher32().chain(&id).result().into_array().into()) } } diff --git a/dan_layer/state_tree/tests/test.rs b/dan_layer/state_tree/tests/test.rs index 7fb093d3f..b48685064 100644 --- a/dan_layer/state_tree/tests/test.rs +++ b/dan_layer/state_tree/tests/test.rs @@ -95,12 +95,23 @@ fn hash_computed_consistently_after_adding_higher_tier_sibling() { #[test] fn hash_allows_putting_in_same_version() { let mut tester_1 = HashTreeTester::new_empty(); - tester_1.put_substate_changes(vec![change(1, Some(30))]); - tester_1.put_substate_changes(vec![change(2, Some(31))]); - // Append another change to the same version - let hash_1 = tester_1.put_changes_at_version(vec![change(3, Some(32))]); + tester_1.put_changes_at_version(None, 1, vec![change(1, Some(30))]); + tester_1.put_changes_at_version(Some(1), 1, vec![change(2, Some(31))]); + tester_1.put_changes_at_version(Some(1), 1, vec![change(3, Some(32))]); + tester_1.put_changes_at_version(Some(1), 1, vec![change(4, Some(33))]); + tester_1.put_changes_at_version(Some(1), 1, vec![change(5, Some(34))]); + let hash_1 = tester_1.put_changes_at_version(Some(1), 1, vec![change(6, Some(35))]); let mut tester_2 = HashTreeTester::new_empty(); - let hash_2 = tester_2.put_substate_changes(vec![change(1, Some(30)), change(2, Some(31)), change(3, Some(32))]); + tester_2.put_changes_at_version(None, 1, vec![ + change(1, Some(30)), + change(2, Some(31)), + change(3, Some(32)), + ]); + let hash_2 = tester_2.put_changes_at_version(Some(1), 2, vec![ + change(4, Some(33)), + change(5, Some(34)), + change(6, Some(35)), + ]); assert_eq!(hash_1, hash_2); } diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index d3d5d1cff..d00ba3539 100644 --- a/dan_layer/storage/src/consensus_models/block.rs 
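Note: the key_mapper.rs and tree.rs changes above make DbKeyMapper generic over its input so that the new RootStateTree can key leaves by an existing 32-byte hash (HashIdentityKeyMapper) instead of hashing a SubstateId (SpreadPrefixKeyMapper). A simplified, std-only sketch of that distinction, with a toy digest in place of jmt_node_hash and plain placeholder types, might be:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    type Hash32 = [u8; 32];

    struct LeafKey(Hash32);

    trait DbKeyMapper<T> {
        fn map_to_leaf_key(value: &T) -> LeafKey;
    }

    struct SpreadPrefixKeyMapper;
    struct HashIdentityKeyMapper;

    // Toy stand-in for jmt_node_hash: any deterministic 32-byte digest works for this sketch.
    fn toy_hash(id: &str) -> Hash32 {
        let mut hasher = DefaultHasher::new();
        id.hash(&mut hasher);
        let mut out = [0u8; 32];
        out[..8].copy_from_slice(&hasher.finish().to_le_bytes());
        out
    }

    impl DbKeyMapper<String> for SpreadPrefixKeyMapper {
        // Substate IDs are spread across the key space by hashing them first.
        fn map_to_leaf_key(id: &String) -> LeafKey {
            LeafKey(toy_hash(id))
        }
    }

    impl DbKeyMapper<Hash32> for HashIdentityKeyMapper {
        // Shard roots are already uniformly distributed hashes, so they are used as-is.
        fn map_to_leaf_key(hash: &Hash32) -> LeafKey {
            LeafKey(*hash)
        }
    }

    fn main() {
        let substate_leaf = SpreadPrefixKeyMapper::map_to_leaf_key(&"component_abc".to_string());
        let shard_root = toy_hash("shard_0_root");
        let root_leaf = HashIdentityKeyMapper::map_to_leaf_key(&shard_root);
        println!("{:02x?} vs {:02x?}", &substate_leaf.0[..4], &root_leaf.0[..4]);
    }
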
+++ b/dan_layer/storage/src/consensus_models/block.rs @@ -534,6 +534,14 @@ impl Block { ) } + pub fn get_last_n_in_epoch( + tx: &TTx, + n: usize, + epoch: Epoch, + ) -> Result, StorageError> { + tx.blocks_get_last_n_in_epoch(n, epoch) + } + pub fn exists(&self, tx: &TTx) -> Result { Self::record_exists(tx, self.id()) } diff --git a/dan_layer/storage/src/consensus_models/epoch_checkpoint.rs b/dan_layer/storage/src/consensus_models/epoch_checkpoint.rs index 6b444fe7b..ef886ebc5 100644 --- a/dan_layer/storage/src/consensus_models/epoch_checkpoint.rs +++ b/dan_layer/storage/src/consensus_models/epoch_checkpoint.rs @@ -3,55 +3,60 @@ use std::fmt::Display; -use tari_dan_common_types::{Epoch, ShardGroup}; +use indexmap::IndexMap; +use tari_dan_common_types::{shard::Shard, Epoch}; +use tari_state_tree::{Hash, SPARSE_MERKLE_PLACEHOLDER_HASH}; use crate::{ consensus_models::{Block, QuorumCertificate}, StateStoreReadTransaction, + StateStoreWriteTransaction, StorageError, }; #[derive(Debug, Clone)] pub struct EpochCheckpoint { block: Block, - qcs: Vec, + linked_qcs: Vec, + shard_roots: IndexMap, } impl EpochCheckpoint { - pub fn new(block: Block, qcs: Vec) -> Self { - Self { block, qcs } + pub fn new(block: Block, linked_qcs: Vec, shard_roots: IndexMap) -> Self { + Self { + block, + linked_qcs, + shard_roots, + } } pub fn qcs(&self) -> &[QuorumCertificate] { - &self.qcs + &self.linked_qcs } pub fn block(&self) -> &Block { &self.block } + + pub fn shard_roots(&self) -> &IndexMap { + &self.shard_roots + } + + pub fn get_shard_root(&self, shard: Shard) -> Hash { + self.shard_roots + .get(&shard) + .copied() + .unwrap_or(SPARSE_MERKLE_PLACEHOLDER_HASH) + } } impl EpochCheckpoint { - pub fn generate( - tx: &TTx, - epoch: Epoch, - shard_group: ShardGroup, - ) -> Result { - let mut blocks = tx.blocks_get_last_n_in_epoch(3, epoch, shard_group)?; - if blocks.is_empty() { - return Err(StorageError::NotFound { - item: format!("EpochCheckpoint: No blocks found for epoch {epoch}, shard group {shard_group}"), - key: epoch.to_string(), - }); - } - - let commit_block = blocks.pop().unwrap(); - let qcs = blocks.into_iter().map(|b| b.into_justify()).collect(); + pub fn get(tx: &TTx, epoch: Epoch) -> Result { + tx.epoch_checkpoint_get(epoch) + } - Ok(Self { - block: commit_block, - qcs, - }) + pub fn save(&self, tx: &mut TTx) -> Result<(), StorageError> { + tx.epoch_checkpoint_save(self) } } diff --git a/dan_layer/storage/src/consensus_models/state_tree_diff.rs b/dan_layer/storage/src/consensus_models/state_tree_diff.rs index 59d55dc84..47effa39b 100644 --- a/dan_layer/storage/src/consensus_models/state_tree_diff.rs +++ b/dan_layer/storage/src/consensus_models/state_tree_diff.rs @@ -13,18 +13,18 @@ use tari_state_tree::{StateHashTreeDiff, Version}; use crate::{consensus_models::BlockId, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; #[derive(Debug, Clone, Default)] -pub struct PendingStateTreeDiff { +pub struct PendingShardStateTreeDiff { pub version: Version, - pub diff: StateHashTreeDiff, + pub diff: StateHashTreeDiff, } -impl PendingStateTreeDiff { - pub fn load(version: Version, diff: StateHashTreeDiff) -> Self { +impl PendingShardStateTreeDiff { + pub fn load(version: Version, diff: StateHashTreeDiff) -> Self { Self { version, diff } } } -impl PendingStateTreeDiff { +impl PendingShardStateTreeDiff { /// Returns all pending state tree diffs from the last committed block (exclusive) to the given block (inclusive). 
pub fn get_all_up_to_commit_block( tx: &TTx, @@ -54,24 +54,18 @@ impl PendingStateTreeDiff { TTx: Deref + StateStoreWriteTransaction, TTx::Target: StateStoreReadTransaction, { - // if tx.pending_state_tree_diffs_exists_for_block(&block_id)? { - // Ok(false) - // } else { - tx.pending_state_tree_diffs_insert(block_id, shard, diff)?; - // Ok(true) - // } - Ok(()) + tx.pending_state_tree_diffs_insert(block_id, shard, diff) } } #[derive(Debug, Clone)] pub struct VersionedStateHashTreeDiff { pub version: Version, - pub diff: StateHashTreeDiff, + pub diff: StateHashTreeDiff, } impl VersionedStateHashTreeDiff { - pub fn new(version: Version, diff: StateHashTreeDiff) -> Self { + pub fn new(version: Version, diff: StateHashTreeDiff) -> Self { Self { version, diff } } } diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 14a848002..6bf381218 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -23,6 +23,7 @@ use crate::{ BlockDiff, BlockId, Decision, + EpochCheckpoint, Evidence, ForeignProposal, ForeignReceiveCounters, @@ -35,7 +36,7 @@ use crate::{ LeafBlock, LockedBlock, LockedSubstate, - PendingStateTreeDiff, + PendingShardStateTreeDiff, QcId, QuorumCertificate, StateTransition, @@ -136,12 +137,7 @@ pub trait StateStoreReadTransaction: Sized { from_block_id: &BlockId, ) -> Result; fn blocks_get(&self, block_id: &BlockId) -> Result; - fn blocks_get_last_n_in_epoch( - &self, - n: usize, - epoch: Epoch, - shard_group: ShardGroup, - ) -> Result, StorageError>; + fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError>; /// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive) fn blocks_get_all_between( &self, @@ -277,11 +273,10 @@ pub trait StateStoreReadTransaction: Sized { fn substate_locks_get_latest_for_substate(&self, substate_id: &SubstateId) -> Result; - fn pending_state_tree_diffs_exists_for_block(&self, block_id: &BlockId) -> Result; fn pending_state_tree_diffs_get_all_up_to_commit_block( &self, block_id: &BlockId, - ) -> Result>, StorageError>; + ) -> Result>, StorageError>; fn state_transitions_get_n_after( &self, @@ -294,6 +289,9 @@ pub trait StateStoreReadTransaction: Sized { fn state_tree_nodes_get(&self, shard: Shard, key: &NodeKey) -> Result, StorageError>; fn state_tree_versions_get_latest(&self, shard: Shard) -> Result, StorageError>; + + // -------------------------------- Epoch checkpoint -------------------------------- // + fn epoch_checkpoint_get(&self, epoch: Epoch) -> Result; } pub trait StateStoreWriteTransaction { @@ -439,13 +437,20 @@ pub trait StateStoreWriteTransaction { fn pending_state_tree_diffs_remove_by_block( &mut self, block_id: &BlockId, - ) -> Result>, StorageError>; + ) -> Result>, StorageError>; //---------------------------------- State tree --------------------------------------------// fn state_tree_nodes_insert(&mut self, shard: Shard, key: NodeKey, node: Node) -> Result<(), StorageError>; - fn state_tree_nodes_mark_stale_tree_node(&mut self, shard: Shard, node: StaleTreeNode) -> Result<(), StorageError>; + fn state_tree_nodes_record_stale_tree_node( + &mut self, + shard: Shard, + node: StaleTreeNode, + ) -> Result<(), StorageError>; fn state_tree_shard_versions_set(&mut self, shard: Shard, version: Version) -> Result<(), StorageError>; + + // -------------------------------- Epoch checkpoint -------------------------------- // + fn epoch_checkpoint_save(&mut self, checkpoint: 
&EpochCheckpoint) -> Result<(), StorageError>; } #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
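Note: with checkpoints now carrying per-shard roots, EpochCheckpoint::get_shard_root falls back to the sparse-Merkle placeholder (empty tree) root for any shard that recorded no state in the epoch. A minimal illustration of that fallback, with a stand-in Shard type and placeholder constant (the real constant lives in tari_state_tree and is not all zeros), could be:

    use std::collections::BTreeMap;

    type Hash32 = [u8; 32];
    type Shard = u32;

    // Stand-in value for illustration only.
    const SPARSE_MERKLE_PLACEHOLDER_HASH: Hash32 = [0u8; 32];

    struct EpochCheckpoint {
        shard_roots: BTreeMap<Shard, Hash32>,
    }

    impl EpochCheckpoint {
        fn get_shard_root(&self, shard: Shard) -> Hash32 {
            self.shard_roots
                .get(&shard)
                .copied()
                .unwrap_or(SPARSE_MERKLE_PLACEHOLDER_HASH)
        }
    }

    fn main() {
        let cp = EpochCheckpoint {
            shard_roots: BTreeMap::from([(0, [1u8; 32])]),
        };
        assert_eq!(cp.get_shard_root(0), [1u8; 32]);
        // Shard 7 produced no state in this epoch, so callers see the empty-tree root.
        assert_eq!(cp.get_shard_root(7), SPARSE_MERKLE_PLACEHOLDER_HASH);
        println!("ok");
    }
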