Skip to content

Commit

Permalink
send RpcOut to connection handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 10, 2023
1 parent f5739b8 commit 838b14f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 137 deletions.
66 changes: 29 additions & 37 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,7 @@ where
let event = RpcOut::Subscriptions(vec![Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Subscribe,
}])
.into_protobuf();
}]);

for peer in peer_list {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
Expand Down Expand Up @@ -575,8 +574,7 @@ where
let event = RpcOut::Subscriptions(vec![Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Unsubscribe,
}])
.into_protobuf();
}]);

for peer in peer_list {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
Expand Down Expand Up @@ -616,13 +614,13 @@ where
topic: raw_message.topic.clone(),
});

let event = RpcOut::Publish(raw_message.clone()).into_protobuf();

// check that the size doesn't exceed the max transmission size
if event.get_size() > self.config.max_transmit_size() {
if raw_message.raw_protobuf_len() > self.config.max_transmit_size() {
return Err(PublishError::MessageTooLarge);
}

let event = RpcOut::Publish(raw_message.clone());

// Check the if the message has been published before
if self.duplicate_cache.contains(&msg_id) {
// This message has already been seen. We don't re-publish messages that have already
Expand All @@ -638,10 +636,6 @@ where

let topic_hash = raw_message.topic.clone();

// If we are not flood publishing forward the message to mesh peers.
let mesh_peers_sent = !self.config.flood_publish()
&& self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?;

let mut recipient_peers = HashSet::new();
if let Some(set) = self.topic_peers.get(&topic_hash) {
if self.config.flood_publish() {
Expand All @@ -655,6 +649,13 @@ where
.cloned(),
);
} else {
// Mesh peers
if let Some(mesh_peers) = self.mesh.get(&raw_message.topic) {
for peer_id in mesh_peers {
recipient_peers.insert(*peer_id);
}
}

// Explicit peers
for peer in &self.explicit_peers {
if set.contains(peer) {
Expand Down Expand Up @@ -712,13 +713,14 @@ where
}
}

if recipient_peers.is_empty() && !mesh_peers_sent {
if recipient_peers.is_empty() {
return Err(PublishError::InsufficientPeers);
}

// If the message isn't a duplicate and we have sent it to some peers add it to the
// duplicate cache and memcache.
self.duplicate_cache.insert(msg_id.clone());
let msg_bytes = raw_message.raw_protobuf_len();
self.mcache.put(&msg_id, raw_message);

// If the message is anonymous or has a random author add it to the published message ids
Expand All @@ -730,7 +732,6 @@ where
}

// Send to peers we know are subscribed to the topic.
let msg_bytes = event.get_size();
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, event.clone())?;
Expand Down Expand Up @@ -1347,9 +1348,11 @@ where
.map(|message| message.topic.clone())
.collect::<HashSet<TopicHash>>();

let message = RpcOut::Forward(message_list).into_protobuf();
let msg_bytes = message_list
.iter()
.fold(0, |acc, m| acc + m.raw_protobuf_len());

let msg_bytes = message.get_size();
let message = RpcOut::Forward(message_list);

if self.send_message(*peer_id, message).is_err() {
tracing::error!("Failed to send cached messages. Messages too large");
Expand Down Expand Up @@ -1519,9 +1522,7 @@ where
"GRAFT: Not subscribed to topics - Sending PRUNE to peer"
);

if let Err(e) =
self.send_message(*peer_id, RpcOut::Control(prune_messages).into_protobuf())
{
if let Err(e) = self.send_message(*peer_id, RpcOut::Control(prune_messages)) {
tracing::error!("Failed to send PRUNE: {:?}", e);
}
}
Expand Down Expand Up @@ -2024,8 +2025,7 @@ where
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect(),
)
.into_protobuf(),
),
)
.is_err()
{
Expand Down Expand Up @@ -2586,7 +2586,7 @@ where

// send the control messages
if self
.send_message(peer, RpcOut::Control(control_msgs).into_protobuf())
.send_message(peer, RpcOut::Control(control_msgs))
.is_err()
{
tracing::error!("Failed to send control messages. Message too large");
Expand Down Expand Up @@ -2618,7 +2618,7 @@ where
}

if self
.send_message(*peer, RpcOut::Control(remaining_prunes).into_protobuf())
.send_message(*peer, RpcOut::Control(remaining_prunes))
.is_err()
{
tracing::error!("Failed to send prune messages. Message too large");
Expand Down Expand Up @@ -2679,9 +2679,9 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
let event = RpcOut::Forward(vec![message.clone()]).into_protobuf();
let msg_bytes = message.raw_protobuf_len();
let event = RpcOut::Forward(vec![message.clone()]);

let msg_bytes = event.get_size();
for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
self.send_message(*peer, event.clone())?;
Expand Down Expand Up @@ -2800,10 +2800,7 @@ where
/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
if self
.send_message(peer, RpcOut::Control(controls).into_protobuf())
.is_err()
{
if self.send_message(peer, RpcOut::Control(controls)).is_err() {
tracing::error!("Failed to flush control pool. Message too large");
}
}
Expand All @@ -2812,9 +2809,9 @@ where
self.pending_iwant_msgs.clear();
}

/// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it
/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, message: proto::RPC) -> Result<(), PublishError> {
fn send_message(&mut self, peer_id: PeerId, message: RpcOut) -> Result<(), PublishError> {
// If the message is oversized, try and fragment it. If it cannot be fragmented, log an
// error and drop the message (all individual messages should be small enough to fit in the
// max_transmit_size)
Expand Down Expand Up @@ -2891,10 +2888,7 @@ where
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
peer_id,
RpcOut::Subscriptions(subscriptions).into_protobuf(),
)
.send_message(peer_id, RpcOut::Subscriptions(subscriptions))
.is_err()
{
tracing::error!("Failed to send subscriptions, message too large");
Expand Down Expand Up @@ -3195,9 +3189,7 @@ where
// Handle messages
for (count, raw_message) in rpc.messages.into_iter().enumerate() {
// Only process the amount of messages the configuration allows.
if self.config.max_messages_per_rpc().is_some()
&& Some(count) >= self.config.max_messages_per_rpc()
{
if Some(count) >= self.config.max_messages_per_rpc() {
tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
break;
}
Expand Down
Loading

0 comments on commit 838b14f

Please sign in to comment.