Skip to content

Commit

Permalink
feat(gossipsub): start implementing backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 6, 2023
1 parent 4e1ad09 commit 8d79411
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 214 deletions.
203 changes: 75 additions & 128 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::types::{
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, Rpc};
use crate::types::{PeerConnections, PeerKind, RpcType};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -536,19 +536,15 @@ where
// send subscription request to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Rpc {
messages: Vec::new(),
subscriptions: vec![Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf();
let subscription = Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Subscribe,
};
let event = proto::RPC::from(vec![subscription]);

for peer in peer_list {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
self.send_message(peer, event.clone())
self.send_message(peer, RpcType::Subscription, event.clone())
.map_err(SubscriptionError::PublishError)?;
}
}
Expand Down Expand Up @@ -576,19 +572,15 @@ where
// announce to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Rpc {
messages: Vec::new(),
subscriptions: vec![Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}
.into_protobuf();
let subscription = Subscription {
topic_hash: topic_hash.clone(),
action: SubscriptionAction::Unsubscribe,
};
let event = proto::RPC::from(vec![subscription]);

for peer in peer_list {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
self.send_message(peer, event.clone())?;
self.send_message(peer, RpcType::Subscription, event.clone())?;
}
}

Expand Down Expand Up @@ -624,12 +616,7 @@ where
topic: raw_message.topic.clone(),
});

let event = Rpc {
subscriptions: Vec::new(),
messages: vec![raw_message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf();
let event = proto::RPC::from(vec![raw_message.clone()]);

// check that the size doesn't exceed the max transmission size
if event.get_size() > self.config.max_transmit_size() {
Expand Down Expand Up @@ -746,7 +733,7 @@ where
let msg_bytes = event.get_size();
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, event.clone())?;
self.send_message(*peer_id, RpcType::Publish, event.clone())?;

if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&topic_hash, msg_bytes);
Expand Down Expand Up @@ -1360,16 +1347,14 @@ where
.map(|message| message.topic.clone())
.collect::<HashSet<TopicHash>>();

let message = Rpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
}
.into_protobuf();
let message = proto::RPC::from(message_list);

let msg_bytes = message.get_size();

if self.send_message(*peer_id, message).is_err() {
if self
.send_message(*peer_id, RpcType::Forward, message)
.is_err()
{
tracing::error!("Failed to send cached messages. Messages too large");
} else if let Some(m) = self.metrics.as_mut() {
// Sending of messages succeeded, register them on the internal metrics.
Expand Down Expand Up @@ -1530,7 +1515,7 @@ where
let prune_messages = to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect();
.collect::<Vec<_>>();
// Send the prune messages to the peer
tracing::debug!(
peer=%peer_id,
Expand All @@ -1539,12 +1524,8 @@ where

if let Err(e) = self.send_message(
*peer_id,
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: prune_messages,
}
.into_protobuf(),
RpcType::ControlAction,
proto::RPC::from(prune_messages),
) {
tracing::error!("Failed to send PRUNE: {:?}", e);
}
Expand Down Expand Up @@ -2043,15 +2024,12 @@ where
&& self
.send_message(
*propagation_source,
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect(),
}
.into_protobuf(),
RpcType::ControlAction,
topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect::<Vec<_>>()
.into(),
)
.is_err()
{
Expand Down Expand Up @@ -2612,15 +2590,7 @@ where

// send the control messages
if self
.send_message(
peer,
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs,
}
.into_protobuf(),
)
.send_message(peer, RpcType::ControlAction, control_msgs.into())
.is_err()
{
tracing::error!("Failed to send control messages. Message too large");
Expand Down Expand Up @@ -2652,15 +2622,7 @@ where
}

if self
.send_message(
*peer,
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: remaining_prunes,
}
.into_protobuf(),
)
.send_message(*peer, RpcType::ControlAction, remaining_prunes.into())
.is_err()
{
tracing::error!("Failed to send prune messages. Message too large");
Expand Down Expand Up @@ -2721,17 +2683,12 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
let event = Rpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
control_msgs: Vec::new(),
}
.into_protobuf();
let event = proto::RPC::from(vec![message.clone()]);

let msg_bytes = event.get_size();
for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
self.send_message(*peer, event.clone())?;
self.send_message(*peer, RpcType::Forward, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&message.topic, msg_bytes);
}
Expand Down Expand Up @@ -2848,15 +2805,7 @@ where
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
if self
.send_message(
peer,
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: controls,
}
.into_protobuf(),
)
.send_message(peer, RpcType::ControlAction, controls.into())
.is_err()
{
tracing::error!("Failed to flush control pool. Message too large");
Expand All @@ -2869,17 +2818,22 @@ where

/// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it

Check failure on line 2819 in protocols/gossipsub/src/behaviour.rs

View workflow job for this annotation

GitHub Actions / Check rustdoc intra-doc links

unresolved link to `Rpc`
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, message: proto::RPC) -> Result<(), PublishError> {
fn send_message(
&mut self,
peer_id: PeerId,
rpc_type: RpcType,
proto: proto::RPC,
) -> Result<(), PublishError> {
// If the message is oversized, try and fragment it. If it cannot be fragmented, log an
// error and drop the message (all individual messages should be small enough to fit in the
// max_transmit_size)

let messages = self.fragment_message(message)?;
let fragments = self.fragment_message(proto)?;

for message in messages {
for proto in fragments {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(message),
event: HandlerIn::Message { rpc_type, proto },
handler: NotifyHandler::Any,
})
}
Expand Down Expand Up @@ -3071,15 +3025,7 @@ where
if !subscriptions.is_empty() {
// send our subscriptions to the peer
if self
.send_message(
peer_id,
Rpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
}
.into_protobuf(),
)
.send_message(peer_id, RpcType::Subscription, subscriptions.into())
.is_err()
{
tracing::error!("Failed to send subscriptions, message too large");
Expand Down Expand Up @@ -3668,14 +3614,6 @@ mod local_test {
use asynchronous_codec::Encoder;
use quickcheck::*;

fn empty_rpc() -> Rpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: Vec::new(),
}
}

fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
Expand All @@ -3702,20 +3640,32 @@ mod local_test {
}
}

impl Arbitrary for Rpc {
impl Arbitrary for proto::RPC {
fn arbitrary(g: &mut Gen) -> Self {
let mut rpc = empty_rpc();

for _ in 0..g.gen_range(0..10u8) {
rpc.subscriptions.push(test_subscription());
}
for _ in 0..g.gen_range(0..10u8) {
rpc.messages.push(test_message());
}
for _ in 0..g.gen_range(0..10u8) {
rpc.control_msgs.push(test_control());
match u8::arbitrary(g) % 3 {
0 => {
let mut subscriptions = vec![];
for _ in 0..g.gen_range(0..10u8) {
subscriptions.push(test_subscription());
}
subscriptions.into()
}
1 => {
let mut messages = vec![];
for _ in 0..g.gen_range(0..10u8) {
messages.push(test_subscription());
}
messages.into()
}
2 => {
let mut control = vec![];
for _ in 0..g.gen_range(0..10u8) {
control.push(test_control());
}
control.into()
}
_ => panic!("outside range"),
}
rpc
}
}

Expand All @@ -3731,10 +3681,8 @@ mod local_test {
let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap();

// Message under the limit should be fine.
let mut rpc = empty_rpc();
rpc.messages.push(test_message());

let mut rpc_proto = rpc.clone().into_protobuf();
let mut messages = vec![test_message()];
let mut rpc_proto = proto::RPC::from(messages.clone());
let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap();
assert_eq!(
fragmented_messages,
Expand All @@ -3745,8 +3693,8 @@ mod local_test {
// Messages over the limit should be split

while rpc_proto.get_size() < max_transmit_size {
rpc.messages.push(test_message());
rpc_proto = rpc.clone().into_protobuf();
messages.push(test_message());
rpc_proto = messages.clone().into();
}

let fragmented_messages = gs
Expand All @@ -3769,7 +3717,7 @@ mod local_test {

#[test]
fn test_message_fragmentation() {
fn prop(rpc: Rpc) {
fn prop(rpc: proto::RPC) {
let max_transmit_size = 500;
let config = crate::config::ConfigBuilder::default()
.max_transmit_size(max_transmit_size)
Expand All @@ -3783,12 +3731,11 @@ mod local_test {
let mut codec =
crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);

let rpc_proto = rpc.into_protobuf();
let fragmented_messages = gs
.fragment_message(rpc_proto.clone())
.fragment_message(rpc.clone())
.expect("Messages must be valid");

if rpc_proto.get_size() < max_transmit_size {
if rpc.get_size() < max_transmit_size {
assert_eq!(
fragmented_messages.len(),
1,
Expand Down
Loading

0 comments on commit 8d79411

Please sign in to comment.