Skip to content

Commit

Permalink
Include shred version in gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed Jan 14, 2020
1 parent c948814 commit 990f3f7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 21 deletions.
45 changes: 30 additions & 15 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
/// The maximum size of a bloom filter
pub const MAX_BLOOM_SIZE: usize = 1030;
pub const MAX_BLOOM_SIZE: usize = 1028;
/// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 202;
const MAX_PROTOCOL_HEADER_SIZE: u64 = 204;

#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ClusterInfo {

let ip_addr = node.gossip.ip();
format!(
"{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}\n",
"{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| v{}\n",
if ContactInfo::is_valid_address(&node.gossip) {
ip_addr.to_string()
} else {
Expand All @@ -290,15 +290,16 @@ impl ClusterInfo {
addr_to_string(&ip_addr, &node.storage_addr),
addr_to_string(&ip_addr, &node.rpc),
addr_to_string(&ip_addr, &node.rpc_pubsub),
node.shred_version,
)
})
.collect();

format!(
"IP Address |Age(ms)| Node identifier \
|Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub\n\
|Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub|ShredVer\n\
------------------+-------+----------------------------------------------+\
------+------+-------+------+-------+------+-------+------+------\n\
------+------+-------+------+-------+------+-------+------+------+--------\n\
{}\
Nodes: {}{}{}",
nodes.join(""),
Expand Down Expand Up @@ -405,13 +406,14 @@ impl ClusterInfo {
}

pub fn rpc_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id;
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me)
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.rpc))
.cloned()
.collect()
Expand Down Expand Up @@ -439,49 +441,54 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me)
/* shred_version not considered for gossip peers (ie, spy nodes do not set
shred_version) */
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.cloned()
.collect()
}

/// all validators that have a valid tvu port.
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id;
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me)
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}

/// all peers that have a valid storage addr
pub fn storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id;
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me)
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}

/// all peers that have a valid tvu
pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id;
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me)
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
.cloned()
Expand All @@ -490,10 +497,11 @@ impl ClusterInfo {

/// all tvu peers with valid gossip addrs that likely have the slot being requested
fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
let me = self.my_data().id;
let me = self.my_data();
ClusterInfo::tvu_peers(self)
.into_iter()
.filter(|x| x.id != me)
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.filter(|x| {
self.get_epoch_state_for_node(&x.id, None)
Expand Down Expand Up @@ -2664,6 +2672,13 @@ mod tests {
cluster_info.insert_info(contact_info);
stakes.insert(id3, 10);

// normal but with incorrect shred version
let id3 = Pubkey::new(&[4u8; 32]);
let mut contact_info = ContactInfo::new_localhost(&id3, timestamp());
contact_info.shred_version = 1;
cluster_info.insert_info(contact_info.clone());
stakes.insert(id3, 10);

let stakes = Arc::new(stakes);
let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(stakes));
assert_eq!(peers.len(), 2);
Expand Down
4 changes: 4 additions & 0 deletions core/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct ContactInfo {
pub rpc_pubsub: SocketAddr,
/// latest wallclock picked
pub wallclock: u64,
/// node shred version
pub shred_version: u16,
}

impl Ord for ContactInfo {
Expand Down Expand Up @@ -84,6 +86,7 @@ impl Default for ContactInfo {
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
wallclock: 0,
shred_version: 0,
}
}
}
Expand Down Expand Up @@ -115,6 +118,7 @@ impl ContactInfo {
rpc,
rpc_pubsub,
wallclock: now,
shred_version: 0,
}
}

Expand Down
11 changes: 5 additions & 6 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ impl Validator {

info!("entrypoint: {:?}", entrypoint_info_option);

Self::print_node_info(&node);

info!("Initializing sigverify, this could take a while...");
sigverify::init();
info!("Done.");
Expand Down Expand Up @@ -177,15 +175,16 @@ impl Validator {
let bank = bank_forks[bank_info.bank_slot].clone();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
// The version used by shreds, derived from genesis
let shred_version = Shred::version_from_hash(&genesis_hash);

let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone();
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));

node.info.wallclock = timestamp();
node.info.shred_version = Shred::version_from_hash(&genesis_hash);
Self::print_node_info(&node);

let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
node.info.clone(),
keypair.clone(),
Expand Down Expand Up @@ -372,7 +371,7 @@ impl Validator {
block_commitment_cache,
config.dev_sigverify_disabled,
config.partition_cfg.clone(),
shred_version,
node.info.shred_version,
transaction_status_sender.clone(),
);

Expand All @@ -392,7 +391,7 @@ impl Validator {
&blockstore,
&config.broadcast_stage_type,
&exit,
shred_version,
node.info.shred_version,
);

datapoint_info!("validator-new", ("id", id.to_string(), String));
Expand Down

0 comments on commit 990f3f7

Please sign in to comment.