Skip to content

Commit

Permalink
Move Gossip values added for wen_retart into restart_crds_values. (so…
Browse files Browse the repository at this point in the history
…lana-labs#34128)

* HvA9J

* Rename file and change orders of definitions.

* Use .from() on u16 to usize which shouldn't fail.

* Update ABI congest.
  • Loading branch information
wen-coding authored Nov 17, 2023
1 parent 45290c4 commit ae4b62c
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 303 deletions.
6 changes: 3 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ use {
},
crds_value::{
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot,
NodeInstance, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
gossip_error::GossipError,
ping_pong::{self, PingCache, Pong},
restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError},
socketaddr, socketaddr_any,
weighted_shuffle::WeightedShuffle,
},
Expand Down Expand Up @@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "FW5Ycg6GXPsY5Ek9b2VjP69toxRb95bSNQRRWLSdKv2Y")]
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down
301 changes: 1 addition & 300 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use {
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
},
bincode::{serialize, serialized_size},
bv::BitVec,
itertools::Itertools,
rand::{CryptoRng, Rng},
serde::de::{Deserialize, Deserializer},
solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::{self, Pubkey},
sanitize::{Sanitize, SanitizeError},
serde_varint,
signature::{Keypair, Signable, Signature, Signer},
timing::timestamp,
transaction::Transaction,
Expand All @@ -29,7 +27,6 @@ use {
collections::{hash_map::Entry, BTreeSet, HashMap},
fmt,
},
thiserror::Error,
};

pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000;
Expand Down Expand Up @@ -494,175 +491,6 @@ impl Sanitize for NodeInstance {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
enum SlotsOffsets {
RunLengthEncoding(RunLengthEncoding),
RawOffsets(RawOffsets),
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
struct U16(#[serde(with = "serde_varint")] u16);

// The vector always starts with 1. Encode number of 1's and 0's consecutively.
// For example, 110000111 is [2, 4, 3].
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
struct RunLengthEncoding(Vec<U16>);

impl RunLengthEncoding {
fn new(bits: &BitVec<u8>) -> Self {
let encoded = (0..bits.len())
.map(|i| bits.get(i))
.dedup_with_count()
.map_while(|(count, _)| u16::try_from(count).ok())
.scan(0, |current_bytes, count| {
*current_bytes += ((u16::BITS - count.leading_zeros() + 6) / 7).max(1) as usize;
(*current_bytes <= RestartLastVotedForkSlots::MAX_BYTES).then_some(U16(count))
})
.collect();
Self(encoded)
}

fn num_encoded_slots(&self) -> usize {
self.0
.iter()
.map(|x| usize::try_from(x.0).unwrap())
.sum::<usize>()
}

fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
let mut slots: Vec<Slot> = self
.0
.iter()
.map_while(|bit_count| usize::try_from(bit_count.0).ok())
.zip([1, 0].iter().cycle())
.flat_map(|(bit_count, bit)| std::iter::repeat(bit).take(bit_count))
.enumerate()
.filter(|(_, bit)| **bit == 1)
.map_while(|(offset, _)| {
let offset = Slot::try_from(offset).ok()?;
last_slot.checked_sub(offset)
})
.take(RestartLastVotedForkSlots::MAX_SLOTS)
.take_while(|slot| *slot >= min_slot)
.collect();
slots.reverse();
slots
}
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
struct RawOffsets(BitVec<u8>);

impl RawOffsets {
fn new(mut bits: BitVec<u8>) -> Self {
bits.truncate(RestartLastVotedForkSlots::MAX_BYTES as u64 * 8);
bits.shrink_to_fit();
Self(bits)
}

fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
let mut slots: Vec<Slot> = (0..self.0.len())
.filter(|index| self.0.get(*index))
.map_while(|offset| last_slot.checked_sub(offset))
.take_while(|slot| *slot >= min_slot)
.collect();
slots.reverse();
slots
}
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartLastVotedForkSlots {
pub from: Pubkey,
pub wallclock: u64,
offsets: SlotsOffsets,
pub last_voted_slot: Slot,
pub last_voted_hash: Hash,
pub shred_version: u16,
}

impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
self.last_voted_hash.sanitize()
}
}

#[derive(Debug, Error)]
pub enum RestartLastVotedForkSlotsError {
#[error("Last voted fork cannot be empty")]
LastVotedForkEmpty,
}

impl RestartLastVotedForkSlots {
// This number is MAX_CRDS_OBJECT_SIZE - empty serialized RestartLastVotedForkSlots.
const MAX_BYTES: usize = 824;

// Per design doc, we should start wen_restart within 7 hours.
pub const MAX_SLOTS: usize = u16::MAX as usize;

pub fn new(
from: Pubkey,
now: u64,
last_voted_fork: &[Slot],
last_voted_hash: Hash,
shred_version: u16,
) -> Result<Self, RestartLastVotedForkSlotsError> {
let Some((&first_voted_slot, &last_voted_slot)) =
last_voted_fork.iter().minmax().into_option()
else {
return Err(RestartLastVotedForkSlotsError::LastVotedForkEmpty);
};
let max_size = last_voted_slot.saturating_sub(first_voted_slot) + 1;
let mut uncompressed_bitvec = BitVec::new_fill(false, max_size);
for slot in last_voted_fork {
uncompressed_bitvec.set(last_voted_slot - *slot, true);
}
let run_length_encoding = RunLengthEncoding::new(&uncompressed_bitvec);
let offsets =
if run_length_encoding.num_encoded_slots() > RestartLastVotedForkSlots::MAX_BYTES * 8 {
SlotsOffsets::RunLengthEncoding(run_length_encoding)
} else {
SlotsOffsets::RawOffsets(RawOffsets::new(uncompressed_bitvec))
};
Ok(Self {
from,
wallclock: now,
offsets,
last_voted_slot,
last_voted_hash,
shred_version,
})
}

/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let num_slots = rng.gen_range(2..20);
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
.take(num_slots)
.collect::<Vec<Slot>>();
RestartLastVotedForkSlots::new(
pubkey,
new_rand_timestamp(rng),
&slots,
Hash::new_unique(),
1,
)
.unwrap()
}

pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
match &self.offsets {
SlotsOffsets::RunLengthEncoding(run_length_encoding) => {
run_length_encoding.to_slots(self.last_voted_slot, min_slot)
}
SlotsOffsets::RawOffsets(raw_offsets) => {
raw_offsets.to_slots(self.last_voted_slot, min_slot)
}
}
}
}

/// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
Expand Down Expand Up @@ -889,7 +717,6 @@ pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> {
mod test {
use {
super::*,
crate::cluster_info::MAX_CRDS_OBJECT_SIZE,
bincode::{deserialize, Options},
rand::SeedableRng,
rand_chacha::ChaChaRng,
Expand Down Expand Up @@ -1262,130 +1089,4 @@ mod test {
assert!(node.should_force_push(&pubkey));
assert!(!node.should_force_push(&Pubkey::new_unique()));
}

fn make_rand_slots<R: Rng>(rng: &mut R) -> impl Iterator<Item = Slot> + '_ {
repeat_with(|| rng.gen_range(1..5)).scan(0, |slot, step| {
*slot += step;
Some(*slot)
})
}

#[test]
fn test_restart_last_voted_fork_slots_max_bytes() {
let keypair = Keypair::new();
let header = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
&[1, 2],
Hash::default(),
0,
)
.unwrap();
// If the following assert fails, please update RestartLastVotedForkSlots::MAX_BYTES
assert_eq!(
RestartLastVotedForkSlots::MAX_BYTES,
MAX_CRDS_OBJECT_SIZE - serialized_size(&header).unwrap() as usize
);

// Create large enough slots to make sure we are discarding some to make slots fit.
let mut rng = rand::thread_rng();
let large_length = 8000;
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
let large_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
&range,
Hash::default(),
0,
)
.unwrap();
assert!(serialized_size(&large_slots).unwrap() <= MAX_CRDS_OBJECT_SIZE as u64);
let retrieved_slots = large_slots.to_slots(0);
assert!(retrieved_slots.len() <= range.len());
assert!(retrieved_slots.last().unwrap() - retrieved_slots.first().unwrap() > 5000);
}

#[test]
fn test_restart_last_voted_fork_slots() {
let keypair = Keypair::new();
let slot = 53;
let slot_parent = slot - 5;
let shred_version = 21;
let original_slots_vec = [slot_parent, slot];
let slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
&original_slots_vec,
Hash::default(),
shred_version,
)
.unwrap();
let value =
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
assert_eq!(value.sanitize(), Ok(()));
let label = value.label();
assert_eq!(
label,
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
);
assert_eq!(label.pubkey(), keypair.pubkey());
assert_eq!(value.wallclock(), slots.wallclock);
let retrieved_slots = slots.to_slots(0);
assert_eq!(retrieved_slots.len(), 2);
assert_eq!(retrieved_slots[0], slot_parent);
assert_eq!(retrieved_slots[1], slot);

let bad_value = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
&[],
Hash::default(),
shred_version,
);
assert!(bad_value.is_err());

let last_slot: Slot = 8000;
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
let large_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
&large_slots_vec,
Hash::default(),
shred_version,
)
.unwrap();
assert!(serialized_size(&large_slots).unwrap() < MAX_CRDS_OBJECT_SIZE as u64);
let retrieved_slots = large_slots.to_slots(0);
assert_eq!(retrieved_slots, large_slots_vec);
}

fn check_run_length_encoding(slots: Vec<Slot>) {
let last_voted_slot = slots[slots.len() - 1];
let mut bitvec = BitVec::new_fill(false, last_voted_slot - slots[0] + 1);
for slot in &slots {
bitvec.set(last_voted_slot - slot, true);
}
let rle = RunLengthEncoding::new(&bitvec);
let retrieved_slots = rle.to_slots(last_voted_slot, 0);
assert_eq!(retrieved_slots, slots);
}

#[test]
fn test_run_length_encoding() {
check_run_length_encoding((1000..16384 + 1000).map(|x| x as Slot).collect_vec());
check_run_length_encoding([1000 as Slot].into());
check_run_length_encoding(
[
1000 as Slot,
RestartLastVotedForkSlots::MAX_SLOTS as Slot + 999,
]
.into(),
);
check_run_length_encoding((1000..1800).step_by(2).map(|x| x as Slot).collect_vec());

let mut rng = rand::thread_rng();
let large_length = 500;
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
check_run_length_encoding(range);
}
}
1 change: 1 addition & 0 deletions gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod legacy_contact_info;
pub mod ping_pong;
mod push_active_set;
mod received_cache;
pub mod restart_crds_values;
pub mod weighted_shuffle;

#[macro_use]
Expand Down
Loading

0 comments on commit ae4b62c

Please sign in to comment.