Skip to content

Commit

Permalink
calculate per shard merkle root on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 31, 2024
1 parent f0696f0 commit 5c88b46
Show file tree
Hide file tree
Showing 27 changed files with 265 additions and 136 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::convert::{TryFrom, TryInto};

use log::*;
use tari_bor::{decode_exact, encode};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, ShardGroup, SubstateAddress};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, SubstateAddress};
use tari_dan_p2p::{
proto,
proto::rpc::{
Expand Down Expand Up @@ -371,21 +371,19 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {

let (sender, receiver) = mpsc::channel(10);

let last_state_transition_for_chain =
StateTransitionId::new(Epoch(req.start_epoch), Shard::from(req.start_shard), req.start_seq);
let start_epoch = Epoch(req.start_epoch);
let start_shard = Shard::from(req.start_shard);
let last_state_transition_for_chain = StateTransitionId::new(start_epoch, start_shard, req.start_seq);

// TODO: validate that we can provide the required sync data
let current_shard = ShardGroup::decode_from_u32(req.current_shard_group)
.ok_or_else(|| RpcStatus::bad_request("Invalid shard group"))?;
let current_epoch = Epoch(req.current_epoch);
info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({current_epoch}, {current_shard})");
let end_epoch = Epoch(req.current_epoch);
info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({}, {}, seq={}) to {}", start_epoch, start_shard, req.start_seq, end_epoch);

task::spawn(
StateSyncTask::new(
self.shard_state_store.clone(),
sender,
last_state_transition_for_chain,
current_epoch,
end_epoch,
)
.run(),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_dan_common_types::Epoch;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_p2p::proto::rpc::SyncStateResponse;
use tari_dan_storage::{
consensus_models::{StateTransition, StateTransitionId},
Expand Down Expand Up @@ -43,7 +43,7 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
pub async fn run(mut self) -> Result<(), ()> {
let mut buffer = Vec::with_capacity(BATCH_SIZE);
let mut current_state_transition_id = self.start_state_transition_id;
let mut counter = 0;
let mut counter = 0usize;
loop {
match self.fetch_next_batch(&mut buffer, current_state_transition_id) {
Ok(Some(last_state_transition_id)) => {
Expand All @@ -57,6 +57,7 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
// ))))
// .await?;

info!(target: LOG_TARGET, "🌍sync complete ({}). {} update(s) sent.", current_state_transition_id, counter);
// Finished
return Ok(());
},
Expand Down Expand Up @@ -92,7 +93,9 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
) -> Result<Option<StateTransitionId>, StorageError> {
self.store.with_read_tx(|tx| {
let state_transitions =
StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch)?;
StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch)
.optional()?
.unwrap_or_default();

let Some(last) = state_transitions.last() else {
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/common_types/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl PartialEq<Shard> for u32 {

impl Display for Shard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_u32())
write!(f, "Shard({})", self.as_u32())
}
}

Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ impl ProposedBlockChangeSet {
self.quorum_decision = None;
self.block_diff = Vec::new();
self.transaction_changes.clear();
self.state_tree_diffs.clear();
self.substate_locks.clear();
self
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ where TConsensusSpec: ConsensusSpec

// Store used for transactions that have inputs without specific versions.
// It lives through the entire block so multiple transactions can be sequenced together in the same block
// let tree_store = ChainScopedTreeStore::new(block.epoch(), block.shard_group(), tx);
let mut substate_store = PendingSubstateStore::new(tx, *block.parent(), self.config.num_preshards);
let mut proposed_block_change_set = ProposedBlockChangeSet::new(block.as_leaf_block());

Expand Down Expand Up @@ -719,8 +718,7 @@ where TConsensusSpec: ConsensusSpec
block.total_leader_fee(),
total_leader_fee
);
// TODO: investigate
// return Ok(proposed_block_change_set.no_vote());
return Ok(proposed_block_change_set.no_vote());
}

let pending = PendingStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?;
Expand Down Expand Up @@ -883,7 +881,7 @@ where TConsensusSpec: ConsensusSpec
// NOTE: this must happen before we commit the diff because the state transitions use this version
let pending = PendingStateTreeDiff::remove_by_block(tx, block.id())?;
let mut state_tree = ShardedStateTree::new(tx);
state_tree.commit_diff(pending)?;
state_tree.commit_diffs(pending)?;
let tx = state_tree.into_transaction();

let local_diff = diff.into_filtered(local_committee_info);
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/substate_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ mod sharded_store;
pub use error::*;
pub use pending_store::*;
pub use sharded_state_tree::*;
pub use sharded_store::*;
30 changes: 1 addition & 29 deletions dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,35 +151,6 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
Ok(substate.into_substate())
}

// pub fn calculate_jmt_diff_for_block(
// &mut self,
// block: &Block,
// ) -> Result<(FixedHash, StateHashTreeDiff), SubstateStoreError> {
// let current_version = block.justify().block_height().as_u64();
// let next_version = block.height().as_u64();
//
// let pending = PendingStateTreeDiff::get_all_up_to_commit_block(
// self.read_transaction(),
// block.epoch(),
// block.shard_group(),
// block.justify().block_id(),
// )?;
//
// let changes = self.diff.iter().map(|ch| match ch {
// SubstateChange::Up { id, substate, .. } => SubstateTreeChange::Up {
// id: id.substate_id.clone(),
// value_hash: hash_substate(substate.substate_value(), substate.version()),
// },
// SubstateChange::Down { id, .. } => SubstateTreeChange::Down {
// id: id.substate_id.clone(),
// },
// });
// let (state_root, state_tree_diff) =
// calculate_state_merkle_diff(&self.store, current_version, next_version, pending, changes)?;
//
// Ok((state_root, state_tree_diff))
// }

pub fn try_lock_all<I: IntoIterator<Item = VersionedSubstateIdLockIntent>>(
&mut self,
transaction_id: TransactionId,
Expand Down Expand Up @@ -311,6 +282,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
// - it MUST NOT be locked as READ, WRITE or OUTPUT, unless
// - if Same-Transaction OR Local-Only-Rules:
// - it MAY be locked as WRITE or READ
// - it MUST NOT be locked as OUTPUT
SubstateLockFlag::Output => {
if !same_transaction && !has_local_only_rules {
warn!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tari_state_tree::{
JmtStorageError,
SpreadPrefixStateTree,
StagedTreeStore,
StateHashTreeDiff,
StateTreeError,
SubstateTreeChange,
TreeStoreWriter,
Expand All @@ -29,22 +30,26 @@ const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree";
pub struct ShardedStateTree<TTx> {
tx: TTx,
pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>,
current_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
sharded_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
}

impl<TTx> ShardedStateTree<TTx> {
pub fn new(tx: TTx) -> Self {
Self {
tx,
pending_diffs: HashMap::new(),
current_tree_diffs: IndexMap::new(),
sharded_tree_diffs: IndexMap::new(),
}
}

pub fn with_pending_diffs(self, pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>) -> Self {
Self { pending_diffs, ..self }
}

pub fn transaction(&self) -> &TTx {
&self.tx
}

pub fn into_transaction(self) -> TTx {
self.tx
}
Expand All @@ -69,7 +74,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
}

pub fn into_versioned_tree_diffs(self) -> IndexMap<Shard, VersionedStateHashTreeDiff> {
self.current_tree_diffs
self.sharded_tree_diffs
}

pub fn put_substate_tree_changes(
Expand Down Expand Up @@ -104,7 +109,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len());
let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?;
state_roots.update(&state_root);
self.current_tree_diffs
self.sharded_tree_diffs
.insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff()));
}

Expand All @@ -114,30 +119,40 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
}

impl<TTx: StateStoreWriteTransaction> ShardedStateTree<&mut TTx> {
pub fn commit_diff(&mut self, diffs: IndexMap<Shard, Vec<PendingStateTreeDiff>>) -> Result<(), StateTreeError> {
pub fn commit_diffs(&mut self, diffs: IndexMap<Shard, Vec<PendingStateTreeDiff>>) -> Result<(), StateTreeError> {
for (shard, pending_diffs) in diffs {
for pending_diff in pending_diffs {
let version = pending_diff.version;
let diff = pending_diff.diff;
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}
self.commit_diff(shard, version, diff)?;
}
}

for (key, node) in diff.new_nodes {
debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}
Ok(())
}

store.set_version(version)?;
}
pub fn commit_diff(
&mut self,
shard: Shard,
version: Version,
diff: StateHashTreeDiff,
) -> Result<(), StateTreeError> {
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}

for (key, node) in diff.new_nodes {
debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}

store.set_version(version)?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ impl<'a, TTx: StateStoreWriteTransaction> ShardScopedTreeStoreWriter<'a, TTx> {
.state_tree_shard_versions_set(self.shard, version)
.map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))
}

pub fn transaction(&mut self) -> &mut TTx {
self.tx
}
}

impl<'a, TTx> TreeStoreReader<Version> for ShardScopedTreeStoreWriter<'a, TTx>
Expand Down
1 change: 0 additions & 1 deletion dan_layer/p2p/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ message SyncStateRequest {
// The shard in the current shard-epoch that is requested.
// This will limit the state transitions returned to those that fall within this shard-epoch.
uint64 current_epoch = 4;
uint32 current_shard_group = 5;
}

message SyncStateResponse {
Expand Down
1 change: 1 addition & 0 deletions dan_layer/rpc_state_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ tari_state_tree = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
16 changes: 16 additions & 0 deletions dan_layer/rpc_state_sync/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tari_dan_storage::{
};
use tari_epoch_manager::EpochManagerError;
use tari_rpc_framework::{RpcError, RpcStatus};
use tari_state_tree::JmtStorageError;
use tari_validator_node_rpc::ValidatorNodeRpcClientError;

#[derive(Debug, thiserror::Error)]
Expand All @@ -34,12 +35,27 @@ pub enum CommsRpcConsensusSyncError {
StateTreeError(#[from] tari_state_tree::StateTreeError),
}

impl CommsRpcConsensusSyncError {
pub fn error_at_remote(self) -> Result<CommsRpcConsensusSyncError, CommsRpcConsensusSyncError> {
match &self {
CommsRpcConsensusSyncError::InvalidResponse(_) | CommsRpcConsensusSyncError::RpcError(_) => Err(self),
_ => Ok(self),
}
}
}

impl From<CommsRpcConsensusSyncError> for HotStuffError {
fn from(value: CommsRpcConsensusSyncError) -> Self {
HotStuffError::SyncError(value.into())
}
}

impl From<JmtStorageError> for CommsRpcConsensusSyncError {
fn from(value: JmtStorageError) -> Self {
Self::StateTreeError(value.into())
}
}

impl From<RpcStatus> for CommsRpcConsensusSyncError {
fn from(value: RpcStatus) -> Self {
Self::RpcError(value.into())
Expand Down
Loading

0 comments on commit 5c88b46

Please sign in to comment.