Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
chains Merkle shreds in fail-entry-verification broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Feb 6, 2024
1 parent 99760e5 commit 6dc8ecb
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ pub mod layout {
Ok(flags & ShredFlags::SHRED_TICK_REFERENCE_MASK.bits())
}

pub(crate) fn get_merkle_root(shred: &[u8]) -> Option<Hash> {
pub fn get_merkle_root(shred: &[u8]) -> Option<Hash> {
match get_shred_variant(shred).ok()? {
ShredVariant::LegacyCode | ShredVariant::LegacyData => None,
ShredVariant::MerkleCode(proof_size, chained) => {
Expand Down
8 changes: 8 additions & 0 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum Error {
Blockstore(#[from] solana_ledger::blockstore::BlockstoreError),
#[error(transparent)]
ClusterInfo(#[from] solana_gossip::cluster_info::ClusterInfoError),
#[error("Invalid Merkle root, slot: {slot}, index: {index}")]
InvalidMerkleRoot { slot: Slot, index: u64 },
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Expand All @@ -76,8 +78,14 @@ pub enum Error {
Send,
#[error(transparent)]
Serialize(#[from] std::boxed::Box<bincode::ErrorKind>),
#[error("Shred not found, slot: {slot}, index: {index}")]
ShredNotFound { slot: Slot, index: u64 },
#[error(transparent)]
TransportError(#[from] solana_sdk::transport::TransportError),
#[error("Unknown last index, slot: {0}")]
UnknownLastIndex(Slot),
#[error("Unknown slot meta, slot: {0}")]
UnknownSlotMeta(Slot),
}

type Result<T> = std::result::Result<T, Error>;
Expand Down
37 changes: 34 additions & 3 deletions turbine/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use {
super::Result,
super::{Error, Result},
bincode::serialized_size,
crossbeam_channel::Receiver,
solana_entry::entry::Entry,
solana_ledger::shred::ShredData,
solana_ledger::{
blockstore::Blockstore,
shred::{self, ShredData},
},
solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank,
solana_sdk::clock::Slot,
solana_sdk::{clock::Slot, hash::Hash},
std::{
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -96,6 +99,34 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
})
}

// Returns the Merkle root of the last erasure batch of the parent slot.
pub(super) fn get_chained_merkle_root_from_parent(
slot: Slot,
parent: Slot,
blockstore: &Blockstore,
) -> Result<Hash> {
if slot == parent {
debug_assert_eq!(slot, 0u64);
return Ok(Hash::default());
}
debug_assert!(parent < slot, "parent: {parent} >= slot: {slot}");
let index = blockstore
.meta(parent)?
.ok_or_else(|| Error::UnknownSlotMeta(parent))?
.last_index
.ok_or_else(|| Error::UnknownLastIndex(parent))?;
let shred = blockstore
.get_data_shred(parent, index)?
.ok_or(Error::ShredNotFound {
slot: parent,
index,
})?;
shred::layout::get_merkle_root(&shred).ok_or(Error::InvalidMerkleRoot {
slot: parent,
index,
})
}

#[cfg(test)]
mod tests {
use {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
shred_version: u16,
good_shreds: Vec<Shred>,
current_slot: Slot,
chained_merkle_root: Hash,
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
Expand All @@ -31,6 +32,7 @@ impl FailEntryVerificationBroadcastRun {
shred_version,
good_shreds: vec![],
current_slot: 0,
chained_merkle_root: Hash::default(),
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
Expand All @@ -54,6 +56,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let last_tick_height = receive_results.last_tick_height;

if bank.slot() != self.current_slot {
self.chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
bank.slot(),
bank.parent_slot(),
blockstore,
)
.unwrap();
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
Expand Down Expand Up @@ -92,14 +100,17 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

if let Some(shred) = data_shreds.iter().max_by_key(|shred| shred.index()) {
self.chained_merkle_root = shred.merkle_root().unwrap();
}
self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
Expand All @@ -109,7 +120,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
keypair,
&[good_last_entry],
true,
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
Expand All @@ -123,13 +134,15 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
keypair,
&[bad_last_entry],
false,
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
assert_eq!(good_last_data_shred.len(), 1);
self.chained_merkle_root = good_last_data_shred.last().unwrap().merkle_root().unwrap();
self.next_shred_index += 1;
(good_last_data_shred, bad_last_data_shred)
});
Expand Down

0 comments on commit 6dc8ecb

Please sign in to comment.