address review
jxs committed Nov 14, 2023
1 parent f2c5edd commit b375b2f
Showing 3 changed files with 319 additions and 236 deletions.
219 changes: 99 additions & 120 deletions protocols/gossipsub/src/behaviour.rs
@@ -534,14 +534,10 @@ where
}

// send subscription request to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
for peer in self.peer_topics.keys().cloned().collect::<Vec<_>>() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());

for peer in peer_list {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
self.send_message(peer, event.clone());
}
self.send_message(peer, event);
}

// call JOIN(topic)
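For context, this hunk drops the pre-collected peer_list and builds a fresh RpcOut::Subscribe inside the loop, so nothing but the topic hash is cloned per iteration. A minimal standalone sketch of the pattern, assuming trimmed stand-in types rather than the real libp2p-gossipsub items:

#[derive(Clone, Debug)]
struct TopicHash(String);

#[derive(Debug)]
enum RpcOut {
    Subscribe(TopicHash),
}

// Stand-in for Behaviour::send_message, which queues one RPC frame for one peer.
fn send_message(peer: &str, rpc: RpcOut) {
    println!("-> {peer}: {rpc:?}");
}

fn main() {
    let peers = ["peer-a", "peer-b", "peer-c"];
    let topic_hash = TopicHash("my-topic".to_owned());

    // One SUBSCRIBE frame per peer; an empty peer set simply means no sends,
    // so the old is_empty() guard is unnecessary.
    for peer in peers {
        send_message(peer, RpcOut::Subscribe(topic_hash.clone()));
    }
}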
@@ -608,8 +604,6 @@ where
return Err(PublishError::MessageTooLarge);
}

let event = RpcOut::Publish(raw_message.clone());

// Check if the message has been published before
if self.duplicate_cache.contains(&msg_id) {
// This message has already been seen. We don't re-publish messages that have already
@@ -638,10 +632,48 @@ where
.cloned(),
);
} else {
// Mesh peers
if let Some(mesh_peers) = self.mesh.get(&raw_message.topic) {
for peer_id in mesh_peers {
recipient_peers.insert(*peer_id);
match self.mesh.get(&raw_message.topic) {
// Mesh peers
Some(mesh_peers) => {
recipient_peers.extend(mesh_peers);
}
// Gossipsub peers
None => {
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
// If we have fanout peers add them to the map.
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(*peer);
}
} else {
// We have no fanout peers, select mesh_n of them and add them to the fanout
let mesh_n = self.config.mesh_n();
let new_peers = get_random_peers(
&self.topic_peers,
&self.connected_peers,
&topic_hash,
mesh_n,
{
|p| {
!self.explicit_peers.contains(p)
&& !self
.score_below_threshold(p, |pst| {
pst.publish_threshold
})
.0
}
},
);
// Add the new peers to the fanout and recipient peers
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
tracing::debug!(%peer, "Peer added to fanout");
recipient_peers.insert(peer);
}
}
// We are publishing to fanout peers - update the time we published
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
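The new match above selects the recipients for a published message: mesh peers when we are in the topic mesh, otherwise fanout peers, which are picked on demand and timestamped through fanout_last_pub. A rough standalone model of that selection, assuming a simplified Publisher struct and a hypothetical pick_random helper in place of get_random_peers and its publish_threshold score filter:

use std::collections::{HashMap, HashSet};
use std::time::Instant;

type PeerId = u32;
type TopicHash = String;

// Trimmed-down view of the publish-side state.
struct Publisher {
    mesh: HashMap<TopicHash, HashSet<PeerId>>,
    fanout: HashMap<TopicHash, HashSet<PeerId>>,
    fanout_last_pub: HashMap<TopicHash, Instant>,
    known_peers: HashSet<PeerId>,
    mesh_n: usize,
}

impl Publisher {
    // Hypothetical stand-in for get_random_peers plus the publish_threshold filter.
    fn pick_random(&self, n: usize) -> HashSet<PeerId> {
        self.known_peers.iter().copied().take(n).collect()
    }

    fn select_recipients(&mut self, topic: &TopicHash) -> HashSet<PeerId> {
        match self.mesh.get(topic) {
            // In the mesh for this topic: publish to the mesh peers.
            Some(mesh_peers) => mesh_peers.clone(),
            // Not in the mesh: publish to fanout peers, selecting them first if needed.
            None => {
                if !self.fanout.contains_key(topic) {
                    let new_peers = self.pick_random(self.mesh_n);
                    self.fanout.insert(topic.clone(), new_peers);
                }
                // We are publishing via fanout, so record when we last did.
                self.fanout_last_pub.insert(topic.clone(), Instant::now());
                self.fanout[topic].clone()
            }
        }
    }
}

fn main() {
    let mut publisher = Publisher {
        mesh: HashMap::new(),
        fanout: HashMap::new(),
        fanout_last_pub: HashMap::new(),
        known_peers: (1u32..=10).collect(),
        mesh_n: 6,
    };
    let recipients = publisher.select_recipients(&"demo-topic".to_string());
    println!("publishing to {} fanout peers", recipients.len());
}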

@@ -662,43 +694,6 @@ where
recipient_peers.insert(*peer);
}
}

// Gossipsub peers
if self.mesh.get(&topic_hash).is_none() {
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
// If we have fanout peers add them to the map.
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(*peer);
}
} else {
// We have no fanout peers, select mesh_n of them and add them to the fanout
let mesh_n = self.config.mesh_n();
let new_peers = get_random_peers(
&self.topic_peers,
&self.connected_peers,
&topic_hash,
mesh_n,
{
|p| {
!self.explicit_peers.contains(p)
&& !self
.score_below_threshold(p, |pst| pst.publish_threshold)
.0
}
},
);
// Add the new peers to the fanout and recipient peers
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
tracing::debug!(%peer, "Peer added to fanout");
recipient_peers.insert(peer);
}
}
// We are publishing to fanout peers - update the time we published
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
}

@@ -709,7 +704,7 @@ where
// If the message isn't a duplicate and we have sent it to some peers add it to the
// duplicate cache and memcache.
self.duplicate_cache.insert(msg_id.clone());
self.mcache.put(&msg_id, raw_message);
self.mcache.put(&msg_id, raw_message.clone());

// If the message is anonymous or has a random author add it to the published message ids
// cache.
@@ -722,7 +717,7 @@ where
// Send to peers we know are subscribed to the topic.
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, event.clone());
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
}

tracing::debug!(message=%msg_id, "Published message");
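Because the Publish frame is now built per recipient from raw_message, the message cache takes raw_message.clone() a few lines above instead of the value itself. A compact standalone sketch of this tail end of publish, covering the duplicate cache, the message cache, and the per-peer send, with simplified stand-in types throughout:

use std::collections::{HashMap, HashSet};

type PeerId = u32;
type MessageId = String;

#[derive(Clone, Debug)]
struct RawMessage {
    topic: String,
    data: Vec<u8>,
}

#[derive(Debug)]
enum RpcOut {
    Publish(RawMessage),
}

// Stand-in for Behaviour::send_message.
fn send_message(peer: PeerId, rpc: RpcOut) {
    println!("-> {peer}: {rpc:?}");
}

fn main() {
    let raw_message = RawMessage { topic: "demo-topic".into(), data: vec![1, 2, 3] };
    let msg_id: MessageId = "msg-1".into();
    let recipient_peers: HashSet<PeerId> = [1, 2, 3].into_iter().collect();

    // Local bookkeeping first: both caches need the message, so it is cloned
    // into the message cache before any frames are built.
    let mut duplicate_cache: HashSet<MessageId> = HashSet::new();
    let mut mcache: HashMap<MessageId, RawMessage> = HashMap::new();
    duplicate_cache.insert(msg_id.clone());
    mcache.insert(msg_id.clone(), raw_message.clone());

    // A fresh Publish frame per recipient, rather than one pre-built event
    // cloned inside the loop.
    for peer_id in recipient_peers {
        send_message(peer_id, RpcOut::Publish(raw_message.clone()));
    }
    println!("published {msg_id}");
}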
@@ -1303,30 +1298,27 @@ where
}

tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
// build a hashmap of available messages
let mut cached_messages = HashMap::new();

for id in iwant_msgs {
// If we have it and the IHAVE count is not above the threshold, add it do the
// cached_messages mapping
if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
// If we have it and the IHAVE count is not above the threshold,
// forward the message.
if let Some((msg, count)) = self
.mcache
.get_with_iwant_counts(&id, peer_id)
.map(|(msg, count)| (msg.clone(), count))
{
if count > self.config.gossip_retransimission() {
tracing::debug!(
peer=%peer_id,
message=%id,
"IWANT: Peer has asked for message too many times; ignoring request"
);
} else {
cached_messages.insert(id.clone(), msg.clone());
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
self.send_message(*peer_id, RpcOut::Forward(msg));
}
}
}

// Forward cached messages.
for message in cached_messages.into_iter().map(|entry| entry.1) {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
self.send_message(*peer_id, RpcOut::Forward(message));
}
tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
}
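The IWANT handler now forwards each cached message as it is retrieved, rather than first collecting everything into a temporary HashMap. A simplified standalone model of the retransmission cap, assuming a stand-in MessageCache in place of the real MessageCache::get_with_iwant_counts and Config::gossip_retransimission:

use std::collections::HashMap;

type PeerId = &'static str;
type MessageId = String;

#[derive(Clone, Debug)]
struct RawMessage(Vec<u8>);

// Stand-in cache: message bytes plus a per-(message, peer) IWANT counter.
struct MessageCache {
    msgs: HashMap<MessageId, RawMessage>,
    iwant_counts: HashMap<(MessageId, PeerId), u32>,
}

impl MessageCache {
    // Returns the cached message and how many times `peer` has now asked for it.
    fn get_with_iwant_counts(&mut self, id: &MessageId, peer: PeerId) -> Option<(RawMessage, u32)> {
        let msg = self.msgs.get(id)?.clone();
        let count = self.iwant_counts.entry((id.clone(), peer)).or_insert(0);
        *count += 1;
        Some((msg, *count))
    }
}

fn main() {
    let gossip_retransmission = 3; // stand-in for Config::gossip_retransimission()
    let mut mcache = MessageCache {
        msgs: HashMap::from([("m1".to_string(), RawMessage(vec![0xde, 0xad]))]),
        iwant_counts: HashMap::new(),
    };

    // The same peer asks for the same message five times; only the first three
    // requests are answered, the rest are ignored.
    for _ in 0..5 {
        if let Some((msg, count)) = mcache.get_with_iwant_counts(&"m1".to_string(), "peer-a") {
            if count > gossip_retransmission {
                println!("ignoring request, peer asked {count} times");
            } else {
                println!("forwarding {msg:?} (request #{count})");
            }
        }
    }
}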

@@ -1476,16 +1468,18 @@ where
if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let prune_messages = to_prune_topics
for action in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect();
.collect::<Vec<_>>()
{
self.send_message(*peer_id, RpcOut::Control(action));
}
// Send the prune messages to the peer
tracing::debug!(
peer=%peer_id,
"GRAFT: Not subscribed to topics - Sending PRUNE to peer"
);
self.send_message(*peer_id, RpcOut::Control(prune_messages));
}
tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
}
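This hunk reflects the commit's central change: RpcOut::Control now wraps a single ControlAction instead of a Vec, so each PRUNE is sent as its own control frame. A trimmed-down sketch of that per-action send, with simplified stand-in enums for the gossipsub types:

type TopicHash = String;
type PeerId = &'static str;

#[derive(Debug)]
enum ControlAction {
    Prune { topic_hash: TopicHash },
}

#[derive(Debug)]
enum RpcOut {
    // One control action per frame (previously Vec<ControlAction>).
    Control(ControlAction),
}

// Stand-in for Behaviour::make_prune, ignoring peer exchange and backoff.
fn make_prune(topic_hash: &TopicHash) -> ControlAction {
    ControlAction::Prune { topic_hash: topic_hash.clone() }
}

fn send_message(peer: PeerId, rpc: RpcOut) {
    println!("-> {peer}: {rpc:?}");
}

fn main() {
    let peer: PeerId = "peer-a";
    // Topics the peer tried to GRAFT us into although we are not subscribed.
    let to_prune_topics: Vec<TopicHash> = vec!["t1".into(), "t2".into()];

    // Each prune goes out in its own RpcOut::Control rather than one batched message.
    for topic in &to_prune_topics {
        send_message(peer, RpcOut::Control(make_prune(topic)));
    }
}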
@@ -1977,16 +1971,12 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
if !topics_to_graft.is_empty() {
self.send_message(
*propagation_source,
RpcOut::Control(
topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect(),
),
)
for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect::<Vec<_>>()
{
self.send_message(*propagation_source, RpcOut::Control(action))
}

// Notify the application of the subscriptions
@@ -2512,52 +2502,47 @@ where
&self.connected_peers,
);
}
let mut control_msgs: Vec<ControlAction> = topics
.iter()
.map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
})
.collect();
let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
});

// If there are prunes associated with the same peer add them.
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
if let Some(topics) = to_prune.remove(&peer) {
let mut prunes = topics
.iter()
.map(|topic_hash| {
self.make_prune(
topic_hash,
&peer,
self.config.do_px() && !no_px.contains(&peer),
on_unsubscribe,
)
})
.collect::<Vec<_>>();
control_msgs.append(&mut prunes);
}
let prunes = to_prune
.remove(&peer)
.into_iter()
.flatten()
.map(|topic_hash| {
self.make_prune(
&topic_hash,
&peer,
self.config.do_px() && !no_px.contains(&peer),
false,
)
});

// send the control messages
self.send_message(peer, RpcOut::Control(control_msgs));
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(msg));
}
}

// handle the remaining prunes
// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
for (peer, topics) in to_prune.iter() {
let mut remaining_prunes = Vec::new();
for topic_hash in topics {
let prune = self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
on_unsubscribe,
false,
);
remaining_prunes.push(prune);
self.send_message(*peer, RpcOut::Control(prune));

// inform the handler
peer_removed_from_mesh(
*peer,
@@ -2568,8 +2553,6 @@ where
&self.connected_peers,
);
}

self.send_message(*peer, RpcOut::Control(remaining_prunes))
}
}
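In the heartbeat changes above, the GRAFTs and any PRUNEs destined for the same peer are built as iterators, chained, and sent one control frame at a time (the real code collects into a Vec first to release the borrow on self before calling send_message). A simplified standalone version of the chaining, assuming stand-in types and omitting peer exchange, backoff and scoring:

use std::collections::HashMap;

type TopicHash = String;
type PeerId = &'static str;

#[derive(Debug)]
enum ControlAction {
    Graft { topic_hash: TopicHash },
    Prune { topic_hash: TopicHash },
}

#[derive(Debug)]
enum RpcOut {
    Control(ControlAction),
}

fn send_message(peer: PeerId, rpc: RpcOut) {
    println!("-> {peer}: {rpc:?}");
}

fn main() {
    let peer: PeerId = "peer-a";
    let graft_topics: Vec<TopicHash> = vec!["t1".into(), "t2".into()];
    let mut to_prune: HashMap<PeerId, Vec<TopicHash>> =
        HashMap::from([(peer, vec!["t3".into()])]);

    // GRAFTs for the topics this peer was just added to...
    let control_msgs = graft_topics
        .iter()
        .map(|topic_hash| ControlAction::Graft { topic_hash: topic_hash.clone() });

    // ...chained with any PRUNEs for the same peer (empty if the map has no entry).
    let prunes = to_prune
        .remove(&peer)
        .into_iter()
        .flatten()
        .map(|topic_hash| ControlAction::Prune { topic_hash });

    // Each action is its own control frame.
    for msg in control_msgs.chain(prunes) {
        send_message(peer, RpcOut::Control(msg));
    }
}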

@@ -2743,7 +2726,9 @@ where
/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(controls));
for msg in controls {
self.send_message(peer, RpcOut::Control(msg));
}
}

// This clears all pending IWANT messages
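flush_control_pool drains the control actions pooled per peer during the heartbeat and, after this change, emits one RpcOut::Control per action. A minimal standalone model of the drain-and-send loop, assuming simplified stand-in types for the pool and the RPC enum:

use std::collections::HashMap;

type PeerId = &'static str;

#[derive(Debug)]
enum ControlAction {
    IHave { message_ids: Vec<String> },
    Graft { topic_hash: String },
}

#[derive(Debug)]
enum RpcOut {
    Control(ControlAction),
}

fn send_message(peer: PeerId, rpc: RpcOut) {
    println!("-> {peer}: {rpc:?}");
}

fn main() {
    // Control actions accumulated for each peer since the last flush.
    let mut control_pool: HashMap<PeerId, Vec<ControlAction>> = HashMap::from([
        (
            "peer-a",
            vec![
                ControlAction::IHave { message_ids: vec!["m1".into(), "m2".into()] },
                ControlAction::Graft { topic_hash: "t1".into() },
            ],
        ),
        ("peer-b", vec![ControlAction::Graft { topic_hash: "t2".into() }]),
    ]);

    // Drain the pool and send each pooled action as its own control frame.
    for (peer, controls) in control_pool.drain() {
        for msg in controls {
            send_message(peer, RpcOut::Control(msg));
        }
    }
}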
Expand Down Expand Up @@ -2817,6 +2802,13 @@ where
return; // Not our first connection to this peer, hence nothing to do.
}

// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(peer_id, Default::default());

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(peer_id);
}

// Ignore connections from blacklisted peers.
if self.blacklisted_peers.contains(&peer_id) {
tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
@@ -2828,13 +2820,6 @@ where
for topic_hash in self.mesh.keys().cloned().collect::<Vec<_>>() {
self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
}

// Insert an empty set of the topics of this peer until known.
self.peer_topics.insert(peer_id, Default::default());

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.add_peer(peer_id);
}
}

fn on_connection_closed(
@@ -3434,13 +3419,7 @@ mod local_test {
1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()),
2 => RpcOut::Publish(test_message()),
3 => RpcOut::Forward(test_message()),
4 => {
let mut control = Vec::new();
for _ in 0..g.gen_range(0..10u8) {
control.push(test_control());
}
RpcOut::Control(control)
}
4 => RpcOut::Control(test_control()),
_ => panic!("outside range"),
}
}