From 25893b2b5e9ac00b705d2ef42d25bcd3af9865ac Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Fri, 27 Oct 2023 13:33:35 +0200 Subject: [PATCH] fix(gossip): convert gossip data to Bytes. This should help avoid potentially costly clones over as it is processed and published --- examples/chat/src/main.rs | 2 +- examples/ipfs-private/src/main.rs | 2 +- protocols/gossipsub/CHANGELOG.md | 1 + protocols/gossipsub/src/behaviour.rs | 12 +++++---- protocols/gossipsub/src/behaviour/tests.rs | 28 ++++++++++----------- protocols/gossipsub/src/config.rs | 3 ++- protocols/gossipsub/src/mcache.rs | 3 ++- protocols/gossipsub/src/peer_score/tests.rs | 4 ++- protocols/gossipsub/src/protocol.rs | 16 ++++++------ protocols/gossipsub/src/transform.rs | 14 +++-------- protocols/gossipsub/src/types.rs | 9 ++++--- 11 files changed, 49 insertions(+), 45 deletions(-) diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index 0a261873f35..8fcf252b83c 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box> { Ok(Some(line)) = stdin.next_line() => { if let Err(e) = swarm .behaviour_mut().gossipsub - .publish(topic.clone(), line.as_bytes()) { + .publish(topic.clone(), line.as_bytes().to_vec()) { println!("Publish error: {e:?}"); } } diff --git a/examples/ipfs-private/src/main.rs b/examples/ipfs-private/src/main.rs index 861648fecdd..87263fe7598 100644 --- a/examples/ipfs-private/src/main.rs +++ b/examples/ipfs-private/src/main.rs @@ -177,7 +177,7 @@ async fn main() -> Result<(), Box> { if let Err(e) = swarm .behaviour_mut() .gossipsub - .publish(gossipsub_topic.clone(), line.as_bytes()) + .publish(gossipsub_topic.clone(), line.as_bytes().to_vec()) { println!("Publish error: {e:?}"); } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b86ec4de6d4..354dd19a929 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,6 @@ ## 0.46.0 - unreleased +- Convert `publish` to require `data: impl Into`. - Remove `fast_message_id_fn` mechanism from `Config`. See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285). - Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`. diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2a3a13ea6e7..993c3b74b2e 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -29,6 +29,7 @@ use std::{ time::Duration, }; +use bytes::Bytes; use futures::StreamExt; use futures_ticker::Ticker; use log::{debug, error, trace, warn}; @@ -605,9 +606,10 @@ where pub fn publish( &mut self, topic: impl Into, - data: impl Into>, + data: impl Into, ) -> Result { - let data = data.into(); + let data = Bytes::from(data.into()); + // Convert the input topic into TopicHash let topic = topic.into(); // Transform the data before building a raw_message. @@ -2734,7 +2736,7 @@ where pub(crate) fn build_raw_message( &mut self, topic: TopicHash, - data: Vec, + data: Bytes, ) -> Result { match &mut self.publish_config { PublishConfig::Signing { @@ -2748,7 +2750,7 @@ where let signature = { let message = proto::Message { from: Some(author.to_bytes()), - data: Some(data.clone()), + data: Some(data.to_vec()), seqno: Some(sequence_number.to_be_bytes().to_vec()), topic: topic.clone().into_string(), signature: None, @@ -3672,7 +3674,7 @@ mod local_test { fn test_message() -> RawMessage { RawMessage { source: Some(PeerId::random()), - data: vec![0; 100], + data: Bytes::from(vec![0; 100]), sequence_number: None, topic: TopicHash::from_raw("test_topic"), signature: None, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index dba5db4c01d..7544527695b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -284,7 +284,7 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { for message in rpc.publish.into_iter() { messages.push(RawMessage { source: message.from.map(|x| PeerId::from_bytes(&x).unwrap()), - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application @@ -1021,7 +1021,7 @@ fn test_handle_iwant_msg_cached() { let raw_message = RawMessage { source: Some(peers[11]), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(1u64), topic: TopicHash::from_raw("topic"), signature: None, @@ -1079,7 +1079,7 @@ fn test_handle_iwant_msg_cached_shifted() { for shift in 1..10 { let raw_message = RawMessage { source: Some(peers[11]), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(shift), topic: TopicHash::from_raw("topic"), signature: None, @@ -1552,7 +1552,7 @@ fn do_forward_messages_to_explicit_peers() { let message = RawMessage { source: Some(peers[1]), - data: vec![12], + data: Bytes::from(vec![12]), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -1696,7 +1696,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { let message = RawMessage { source: Some(peers[1]), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2166,7 +2166,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2211,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2575,7 +2575,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2652,7 +2652,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { // Receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2743,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { //message that other peers have let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2928,7 +2928,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message1 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(1u64), topic: topics[0].clone(), signature: None, @@ -2938,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message2 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5], + data: Bytes::from(vec![1, 2, 3, 4, 5]), sequence_number: Some(2u64), topic: topics[0].clone(), signature: None, @@ -2948,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message3 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5, 6], + data: Bytes::from(vec![1, 2, 3, 4, 5, 6]), sequence_number: Some(3u64), topic: topics[0].clone(), signature: None, @@ -2958,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message4 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5, 6, 7], + data: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7]), sequence_number: Some(4u64), topic: topics[0].clone(), signature: None, diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 7e79912cc4a..47f7160031c 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -862,6 +862,7 @@ mod test { use crate::topic::IdentityHash; use crate::types::PeerKind; use crate::Topic; + use bytes::Bytes; use libp2p_core::UpgradeInfo; use libp2p_swarm::StreamProtocol; use std::collections::hash_map::DefaultHasher; @@ -961,7 +962,7 @@ mod test { fn get_gossipsub_message() -> Message { Message { source: None, - data: vec![12, 34, 56], + data: Bytes::from(vec![12, 34, 56]), sequence_number: None, topic: Topic::::new("test").hash(), } diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index e85a5bf9c6a..bd8bc27ca58 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -224,6 +224,7 @@ mod tests { use super::*; use crate::types::RawMessage; use crate::{IdentTopic as Topic, TopicHash}; + use bytes::Bytes; use libp2p_identity::PeerId; fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) { @@ -235,7 +236,7 @@ mod tests { }; let u8x: u8 = x as u8; let source = Some(PeerId::random()); - let data: Vec = vec![u8x]; + let data = Bytes::from(vec![u8x]); let sequence_number = Some(x); let m = RawMessage { diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 064e277eed7..91e57442714 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use bytes::Bytes; + /// A collection of unit tests mostly ported from the go implementation. use super::*; @@ -36,7 +38,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool { fn make_test_message(seq: u64) -> (MessageId, RawMessage) { let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![12, 34, 56], + data: Bytes::from(vec![12, 34, 56]), sequence_number: Some(seq), topic: Topic::new("test").hash(), signature: None, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 15d2f59755a..df00da87edc 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -28,7 +28,7 @@ use crate::types::{ use crate::ValidationError; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::future; use futures::prelude::*; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; @@ -294,7 +294,7 @@ impl Decoder for GossipsubCodec { if let Some(validation_error) = invalid_kind.take() { let message = RawMessage { source: None, // don't bother inform the application - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number: None, // don't inform the application topic: TopicHash::from_raw(message.topic), signature: None, // don't inform the application @@ -314,7 +314,7 @@ impl Decoder for GossipsubCodec { // and source) let message = RawMessage { source: None, // don't bother inform the application - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number: None, // don't inform the application topic: TopicHash::from_raw(message.topic), signature: None, // don't inform the application @@ -339,7 +339,7 @@ impl Decoder for GossipsubCodec { ); let message = RawMessage { source: None, // don't bother inform the application - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number: None, // don't inform the application topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application @@ -358,7 +358,7 @@ impl Decoder for GossipsubCodec { debug!("Sequence number not present but expected"); let message = RawMessage { source: None, // don't bother inform the application - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number: None, // don't inform the application topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application @@ -384,7 +384,7 @@ impl Decoder for GossipsubCodec { debug!("Message source has an invalid PeerId"); let message = RawMessage { source: None, // don't bother inform the application - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number, topic: TopicHash::from_raw(message.topic), signature: message.signature, // don't inform the application @@ -408,7 +408,7 @@ impl Decoder for GossipsubCodec { // This message has passed all validation, add it to the validated messages. messages.push(RawMessage { source, - data: message.data.unwrap_or_default(), + data: Bytes::from(message.data.unwrap_or_default()), sequence_number, topic: TopicHash::from_raw(message.topic), signature: message.signature, @@ -515,6 +515,7 @@ mod tests { use crate::config::Config; use crate::{Behaviour, ConfigBuilder}; use crate::{IdentTopic as Topic, Version}; + use bytes::Bytes; use libp2p_identity::Keypair; use quickcheck::*; @@ -532,6 +533,7 @@ mod tests { let data = (0..g.gen_range(10..10024u32)) .map(|_| u8::arbitrary(g)) .collect::>(); + let data = Bytes::from(data); let topic_id = TopicId::arbitrary(g).0; Message(gs.build_raw_message(topic_id, data).unwrap()) } diff --git a/protocols/gossipsub/src/transform.rs b/protocols/gossipsub/src/transform.rs index 6f57d9fc46b..7934eecf30f 100644 --- a/protocols/gossipsub/src/transform.rs +++ b/protocols/gossipsub/src/transform.rs @@ -25,6 +25,8 @@ //! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then //! calculated, allowing for applications to employ message-id functions post compression. +use bytes::Bytes; + use crate::{Message, RawMessage, TopicHash}; /// A general trait of transforming a [`RawMessage`] into a [`Message`]. The @@ -41,11 +43,7 @@ pub trait DataTransform { /// Takes the data to be published (a topic and associated data) transforms the data. The /// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers. - fn outbound_transform( - &self, - topic: &TopicHash, - data: Vec, - ) -> Result, std::io::Error>; + fn outbound_transform(&self, topic: &TopicHash, data: Bytes) -> Result; } /// The default transform, the raw data is propagated as is to the application layer gossipsub. @@ -62,11 +60,7 @@ impl DataTransform for IdentityTransform { }) } - fn outbound_transform( - &self, - _topic: &TopicHash, - data: Vec, - ) -> Result, std::io::Error> { + fn outbound_transform(&self, _topic: &TopicHash, data: Bytes) -> Result { Ok(data) } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 196468b8d32..1936610cbe7 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -20,6 +20,7 @@ //! A collection of types using the Gossipsub system. use crate::TopicHash; +use bytes::Bytes; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; @@ -99,7 +100,7 @@ pub struct RawMessage { pub source: Option, /// Content of the message. Its meaning is out of scope of this library. - pub data: Vec, + pub data: Bytes, /// A random sequence number. pub sequence_number: Option, @@ -122,7 +123,7 @@ impl RawMessage { pub fn raw_protobuf_len(&self) -> usize { let message = proto::Message { from: self.source.map(|m| m.to_bytes()), - data: Some(self.data.clone()), + data: Some(self.data.to_vec()), seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()), topic: TopicHash::into_string(self.topic.clone()), signature: self.signature.clone(), @@ -140,7 +141,7 @@ pub struct Message { pub source: Option, /// Content of the message. - pub data: Vec, + pub data: Bytes, /// A random sequence number. pub sequence_number: Option, @@ -248,7 +249,7 @@ impl From for proto::RPC { for message in rpc.messages.into_iter() { let message = proto::Message { from: message.source.map(|m| m.to_bytes()), - data: Some(message.data), + data: Some(message.data.to_vec()), seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()), topic: TopicHash::into_string(message.topic), signature: message.signature,