Skip to content

Commit

Permalink
Add push and get methods for RestartLastVotedForkSlots (solana-labs#3…
Browse files Browse the repository at this point in the history
…3613)

* Add push and get methods for RestartLastVotedForkSlots

* Improve expression format.

* Remove fill() from RestartLastVotedForkSlots and move into constructor.

* Update ABI signature.

* Use flate2 compress directly instead of relying on CompressedSlots.

* Make constructor of RestartLastVotedForkSlots return error if necessary.

* Use minmax and remove unnecessary code.

* Replace flate2 with run-length encoding in RestartLastVotedForkSlots.

* Remove accidentally added file.

* The passed in last_voted_fork don't need to be mutable any more.

* Switch to different type of run-length encoding.

* Fix typo.

* Move constant into RestartLastVotedForkSlots.

* Use BitVec in RawOffsets.

* Remove the unnecessary clone.

* Use iter functions for RLE.

* Use take_while instead of loop.

* Change Run length encoding to iterator implementation.

* Allow one slot in RestartLastVotedForkSlots.

* Various simplifications.

* Fix various errors and use customized error type.

* Various simplifications.

* Return error from push_get_restart_last_voted_fork_slots and
remove unnecessary constraints in to_slots.

* Allow 81k slots on RestartLastVotedForkSlots.

* Limit MAX_SLOTS to 65535 so we can go back to u16.

* Use u16::MAX instead of 65535.
  • Loading branch information
wen-coding authored Nov 16, 2023
1 parent 9a78924 commit 3081b43
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 73 deletions.
112 changes: 110 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use {
},
crds_value::{
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot,
NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
NodeInstance, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -267,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
#[frozen_abi(digest = "HvA9JnnQrJnmkcGxrp8SmTB1b4iSyQ4VK2p6LpSBaoWR")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -962,6 +963,26 @@ impl ClusterInfo {
}
}

pub fn push_restart_last_voted_fork_slots(
&self,
fork: &[Slot],
last_vote_bankhash: Hash,
) -> Result<(), RestartLastVotedForkSlotsError> {
let now = timestamp();
let last_voted_fork_slots = RestartLastVotedForkSlots::new(
self.id(),
now,
fork,
last_vote_bankhash,
self.my_shred_version(),
)?;
self.push_message(CrdsValue::new_signed(
CrdsData::RestartLastVotedForkSlots(last_voted_fork_slots),
&self.keypair(),
));
Ok(())
}

fn time_gossip_read_lock<'a>(
&'a self,
label: &'static str,
Expand Down Expand Up @@ -1214,6 +1235,24 @@ impl ClusterInfo {
.collect()
}

pub fn get_restart_last_voted_fork_slots(
&self,
cursor: &mut Cursor,
) -> Vec<RestartLastVotedForkSlots> {
let self_shred_version = self.my_shred_version();
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_entries(cursor)
.filter_map(|entry| {
let CrdsData::RestartLastVotedForkSlots(slots) = &entry.value.data else {
return None;
};
(slots.shred_version == self_shred_version).then_some(slots)
})
.cloned()
.collect()
}

/// Returns duplicate-shreds inserted since the given cursor.
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
Expand Down Expand Up @@ -4487,4 +4526,73 @@ mod tests {
assert_eq!(shred_data.chunk_index() as usize, i);
}
}

#[test]
fn test_push_restart_last_voted_fork_slots() {
let keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert!(slots.is_empty());
let mut update: Vec<Slot> = vec![0];
for i in 0..81 {
for j in 0..1000 {
update.push(i * 1050 + j);
}
}
assert!(cluster_info
.push_restart_last_voted_fork_slots(&update, Hash::default())
.is_ok());
cluster_info.flush_push_queue();

let mut cursor = Cursor::default();
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
assert_eq!(slots.len(), 1);
let retrieved_slots = slots[0].to_slots(0);
assert!(retrieved_slots[0] < 69000);
assert_eq!(retrieved_slots.last(), Some(84999).as_ref());

let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
assert!(slots.is_empty());

// Test with different shred versions.
let mut rng = rand::thread_rng();
let node_pubkey = Pubkey::new_unique();
let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
node.set_shred_version(42);
let mut slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey));
slots.shred_version = 42;
let entries = vec![
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)),
CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(slots)),
];
{
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries {
assert!(gossip_crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's last-voted-fork-slot because of different
// shred-version.
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert_eq!(slots.len(), 1);
assert_eq!(slots[0].from, cluster_info.id());

// Match shred versions.
{
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
assert!(cluster_info
.push_restart_last_voted_fork_slots(&update, Hash::default())
.is_ok());
cluster_info.flush_push_queue();
// Should now include both slots.
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert_eq!(slots.len(), 2);
assert_eq!(slots[0].from, node_pubkey);
assert_eq!(slots[1].from, cluster_info.id());
}
}
Loading

0 comments on commit 3081b43

Please sign in to comment.