Skip to content

Commit

Permalink
calculate per shard merkle root on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 2, 2024
1 parent f0696f0 commit 72c696d
Show file tree
Hide file tree
Showing 28 changed files with 315 additions and 142 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use indexmap::IndexMap;
use log::info;
Expand All @@ -13,10 +13,11 @@ use tari_dan_app_utilities::transaction_executor::TransactionExecutor;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, AtomicDb, StateWriter};
use tari_dan_storage::{
consensus_models::{ExecutedTransaction, TransactionRecord},
consensus_models::{ExecutedTransaction, SubstateLockFlag, TransactionRecord, VersionedSubstateIdLockIntent},
StateStore,
};
use tari_engine_types::{
commit_result::{ExecuteResult, FinalizeResult, RejectReason, TransactionResult},
substate::{Substate, SubstateId},
virtual_substate::{VirtualSubstate, VirtualSubstateId, VirtualSubstates},
};
Expand Down Expand Up @@ -138,10 +139,38 @@ where
store: &PendingSubstateStore<TStateStore>,
current_epoch: Epoch,
) -> Result<ExecutedTransaction, BlockTransactionExecutorError> {
let id: tari_transaction::TransactionId = *transaction.id();
let id = *transaction.id();

// Get the latest input substates
let inputs = self.resolve_substates::<TStateStore>(&transaction, store)?;
let inputs = match self.resolve_substates::<TStateStore>(&transaction, store) {
Ok(inputs) => inputs,
Err(err) => {
// TODO: Hacky - if a transaction uses DOWNed/non-existent inputs we error here. This changes the hard
// error to a propose REJECT. So that we have involved shards, we use the inputs as resolved inputs and
// assume v0 if version is not provided.
let inputs = transaction
.all_inputs_iter()
.map(|input| VersionedSubstateId::new(input.substate_id, input.version.unwrap_or(0)))
.map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockFlag::Write))
.collect();
return Ok(ExecutedTransaction::new(
transaction,
ExecuteResult {
finalize: FinalizeResult {
transaction_hash: id.into_array().into(),
events: vec![],
logs: vec![],
execution_results: vec![],
result: TransactionResult::Reject(RejectReason::ExecutionFailure(err.to_string())),
fee_receipt: Default::default(),
},
},
inputs,
vec![],
Duration::from_secs(0),
));
},
};
info!(target: LOG_TARGET, "Transaction {} executing. Inputs: {:?}", id, inputs);

// Create a memory db with all the input substates, needed for the transaction execution
Expand Down
16 changes: 7 additions & 9 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::convert::{TryFrom, TryInto};

use log::*;
use tari_bor::{decode_exact, encode};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, ShardGroup, SubstateAddress};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, SubstateAddress};
use tari_dan_p2p::{
proto,
proto::rpc::{
Expand Down Expand Up @@ -371,21 +371,19 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {

let (sender, receiver) = mpsc::channel(10);

let last_state_transition_for_chain =
StateTransitionId::new(Epoch(req.start_epoch), Shard::from(req.start_shard), req.start_seq);
let start_epoch = Epoch(req.start_epoch);
let start_shard = Shard::from(req.start_shard);
let last_state_transition_for_chain = StateTransitionId::new(start_epoch, start_shard, req.start_seq);

// TODO: validate that we can provide the required sync data
let current_shard = ShardGroup::decode_from_u32(req.current_shard_group)
.ok_or_else(|| RpcStatus::bad_request("Invalid shard group"))?;
let current_epoch = Epoch(req.current_epoch);
info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({current_epoch}, {current_shard})");
let end_epoch = Epoch(req.current_epoch);
info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({}, {}, seq={}) to {}", start_epoch, start_shard, req.start_seq, end_epoch);

task::spawn(
StateSyncTask::new(
self.shard_state_store.clone(),
sender,
last_state_transition_for_chain,
current_epoch,
end_epoch,
)
.run(),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_dan_common_types::Epoch;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_p2p::proto::rpc::SyncStateResponse;
use tari_dan_storage::{
consensus_models::{StateTransition, StateTransitionId},
Expand Down Expand Up @@ -43,7 +43,7 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
pub async fn run(mut self) -> Result<(), ()> {
let mut buffer = Vec::with_capacity(BATCH_SIZE);
let mut current_state_transition_id = self.start_state_transition_id;
let mut counter = 0;
let mut counter = 0usize;
loop {
match self.fetch_next_batch(&mut buffer, current_state_transition_id) {
Ok(Some(last_state_transition_id)) => {
Expand All @@ -57,6 +57,7 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
// ))))
// .await?;

info!(target: LOG_TARGET, "🌍sync complete ({}). {} update(s) sent.", current_state_transition_id, counter);
// Finished
return Ok(());
},
Expand Down Expand Up @@ -92,7 +93,9 @@ impl<TStateStore: StateStore> StateSyncTask<TStateStore> {
) -> Result<Option<StateTransitionId>, StorageError> {
self.store.with_read_tx(|tx| {
let state_transitions =
StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch)?;
StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch)
.optional()?
.unwrap_or_default();

let Some(last) = state_transitions.last() else {
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/common_types/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl PartialEq<Shard> for u32 {

impl Display for Shard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_u32())
write!(f, "Shard({})", self.as_u32())
}
}

Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ impl ProposedBlockChangeSet {
self.quorum_decision = None;
self.block_diff = Vec::new();
self.transaction_changes.clear();
self.state_tree_diffs.clear();
self.substate_locks.clear();
self
}

Expand Down
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ where TConsensusSpec: ConsensusSpec
) -> Result<ExecutedTransaction, HotStuffError> {
let transaction = TransactionRecord::get(store.read_transaction(), transaction_id)?;

// TODO: this can fail due to unknown inputs. Need to return an ABORT executed transaction
// TODO: check the failure cases for this. Some failures should not cause consensus to fail
let executed = self
.transaction_executor
.execute(transaction.into_transaction(), store, current_epoch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ where TConsensusSpec: ConsensusSpec

// Store used for transactions that have inputs without specific versions.
// It lives through the entire block so multiple transactions can be sequenced together in the same block
// let tree_store = ChainScopedTreeStore::new(block.epoch(), block.shard_group(), tx);
let mut substate_store = PendingSubstateStore::new(tx, *block.parent(), self.config.num_preshards);
let mut proposed_block_change_set = ProposedBlockChangeSet::new(block.as_leaf_block());

Expand Down Expand Up @@ -719,8 +718,7 @@ where TConsensusSpec: ConsensusSpec
block.total_leader_fee(),
total_leader_fee
);
// TODO: investigate
// return Ok(proposed_block_change_set.no_vote());
return Ok(proposed_block_change_set.no_vote());
}

let pending = PendingStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?;
Expand Down Expand Up @@ -883,7 +881,7 @@ where TConsensusSpec: ConsensusSpec
// NOTE: this must happen before we commit the diff because the state transitions use this version
let pending = PendingStateTreeDiff::remove_by_block(tx, block.id())?;
let mut state_tree = ShardedStateTree::new(tx);
state_tree.commit_diff(pending)?;
state_tree.commit_diffs(pending)?;
let tx = state_tree.into_transaction();

let local_diff = diff.into_filtered(local_committee_info);
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/substate_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ mod sharded_store;
pub use error::*;
pub use pending_store::*;
pub use sharded_state_tree::*;
pub use sharded_store::*;
30 changes: 1 addition & 29 deletions dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,35 +151,6 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
Ok(substate.into_substate())
}

// pub fn calculate_jmt_diff_for_block(
// &mut self,
// block: &Block,
// ) -> Result<(FixedHash, StateHashTreeDiff), SubstateStoreError> {
// let current_version = block.justify().block_height().as_u64();
// let next_version = block.height().as_u64();
//
// let pending = PendingStateTreeDiff::get_all_up_to_commit_block(
// self.read_transaction(),
// block.epoch(),
// block.shard_group(),
// block.justify().block_id(),
// )?;
//
// let changes = self.diff.iter().map(|ch| match ch {
// SubstateChange::Up { id, substate, .. } => SubstateTreeChange::Up {
// id: id.substate_id.clone(),
// value_hash: hash_substate(substate.substate_value(), substate.version()),
// },
// SubstateChange::Down { id, .. } => SubstateTreeChange::Down {
// id: id.substate_id.clone(),
// },
// });
// let (state_root, state_tree_diff) =
// calculate_state_merkle_diff(&self.store, current_version, next_version, pending, changes)?;
//
// Ok((state_root, state_tree_diff))
// }

pub fn try_lock_all<I: IntoIterator<Item = VersionedSubstateIdLockIntent>>(
&mut self,
transaction_id: TransactionId,
Expand Down Expand Up @@ -311,6 +282,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor
// - it MUST NOT be locked as READ, WRITE or OUTPUT, unless
// - if Same-Transaction OR Local-Only-Rules:
// - it MAY be locked as WRITE or READ
// - it MUST NOT be locked as OUTPUT
SubstateLockFlag::Output => {
if !same_transaction && !has_local_only_rules {
warn!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tari_state_tree::{
JmtStorageError,
SpreadPrefixStateTree,
StagedTreeStore,
StateHashTreeDiff,
StateTreeError,
SubstateTreeChange,
TreeStoreWriter,
Expand All @@ -29,22 +30,26 @@ const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree";
pub struct ShardedStateTree<TTx> {
tx: TTx,
pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>,
current_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
sharded_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
}

impl<TTx> ShardedStateTree<TTx> {
pub fn new(tx: TTx) -> Self {
Self {
tx,
pending_diffs: HashMap::new(),
current_tree_diffs: IndexMap::new(),
sharded_tree_diffs: IndexMap::new(),
}
}

pub fn with_pending_diffs(self, pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>) -> Self {
Self { pending_diffs, ..self }
}

pub fn transaction(&self) -> &TTx {
&self.tx
}

pub fn into_transaction(self) -> TTx {
self.tx
}
Expand All @@ -69,7 +74,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
}

pub fn into_versioned_tree_diffs(self) -> IndexMap<Shard, VersionedStateHashTreeDiff> {
self.current_tree_diffs
self.sharded_tree_diffs
}

pub fn put_substate_tree_changes(
Expand Down Expand Up @@ -104,7 +109,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len());
let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?;
state_roots.update(&state_root);
self.current_tree_diffs
self.sharded_tree_diffs
.insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff()));
}

Expand All @@ -114,30 +119,40 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
}

impl<TTx: StateStoreWriteTransaction> ShardedStateTree<&mut TTx> {
pub fn commit_diff(&mut self, diffs: IndexMap<Shard, Vec<PendingStateTreeDiff>>) -> Result<(), StateTreeError> {
pub fn commit_diffs(&mut self, diffs: IndexMap<Shard, Vec<PendingStateTreeDiff>>) -> Result<(), StateTreeError> {
for (shard, pending_diffs) in diffs {
for pending_diff in pending_diffs {
let version = pending_diff.version;
let diff = pending_diff.diff;
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}
self.commit_diff(shard, version, diff)?;
}
}

for (key, node) in diff.new_nodes {
debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}
Ok(())
}

store.set_version(version)?;
}
pub fn commit_diff(
&mut self,
shard: Shard,
version: Version,
diff: StateHashTreeDiff,
) -> Result<(), StateTreeError> {
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}

for (key, node) in diff.new_nodes {
debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}

store.set_version(version)?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ impl<'a, TTx: StateStoreWriteTransaction> ShardScopedTreeStoreWriter<'a, TTx> {
.state_tree_shard_versions_set(self.shard, version)
.map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))
}

pub fn transaction(&mut self) -> &mut TTx {
self.tx
}
}

impl<'a, TTx> TreeStoreReader<Version> for ShardScopedTreeStoreWriter<'a, TTx>
Expand Down
1 change: 0 additions & 1 deletion dan_layer/p2p/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ message SyncStateRequest {
// The shard in the current shard-epoch that is requested.
// This will limit the state transitions returned to those that fall within this shard-epoch.
uint64 current_epoch = 4;
uint32 current_shard_group = 5;
}

message SyncStateResponse {
Expand Down
Loading

0 comments on commit 72c696d

Please sign in to comment.