diff --git a/core/network/src/legacy_proto/behaviour.rs b/core/network/src/legacy_proto/behaviour.rs index 1c83329ba0e10..d1d378174a21b 100644 --- a/core/network/src/legacy_proto/behaviour.rs +++ b/core/network/src/legacy_proto/behaviour.rs @@ -17,7 +17,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::legacy_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::legacy_proto::upgrade::RegisteredProtocol; -use crate::protocol::message::Message; +use bytes::BytesMut; use fnv::FnvHashMap; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _}; @@ -25,7 +25,6 @@ use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use log::{debug, error, trace, warn}; use rand::distributions::{Distribution as _, Uniform}; -use sr_primitives::traits::Block as BlockT; use smallvec::SmallVec; use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin}; use std::time::{Duration, Instant}; @@ -61,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to /// us, we accept the connection. The "banning" system is only about delaying dialing attempts. /// -pub struct LegacyProto { +pub struct LegacyProto< TSubstream> { /// List of protocols to open with peers. Never modified. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. peerset: peerset::Peerset, @@ -80,7 +79,7 @@ pub struct LegacyProto { next_incoming_index: peerset::IncomingIndex, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction, LegacyProtoOut>; 4]>, + events: SmallVec<[NetworkBehaviourAction; 4]>, /// Marker to pin the generics. marker: PhantomData, @@ -189,7 +188,7 @@ struct IncomingPeer { /// Event that can be emitted by the `LegacyProto`. #[derive(Debug)] -pub enum LegacyProtoOut { +pub enum LegacyProtoOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -213,7 +212,7 @@ pub enum LegacyProtoOut { /// Id of the peer the message came from. peer_id: PeerId, /// Message that has been received. - message: Message, + message: BytesMut, }, /// The substream used by the protocol is pretty large. We should print avoid sending more @@ -222,11 +221,11 @@ pub enum LegacyProtoOut { /// Id of the peer which is clogged. peer_id: PeerId, /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, + messages: Vec>, }, } -impl LegacyProto { +impl LegacyProto { /// Creates a `CustomProtos`. pub fn new( protocol: impl Into, @@ -350,8 +349,7 @@ impl LegacyProto { /// /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Message) - where B: BlockT { + pub fn send_packet(&mut self, target: &PeerId, message: Vec) { if !self.is_open(target) { return; } @@ -607,7 +605,7 @@ impl LegacyProto { } } -impl DiscoveryNetBehaviour for LegacyProto { +impl DiscoveryNetBehaviour for LegacyProto { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); @@ -616,13 +614,12 @@ impl DiscoveryNetBehaviour for LegacyProto } } -impl NetworkBehaviour for LegacyProto +impl NetworkBehaviour for LegacyProto where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { - type ProtocolsHandler = CustomProtoHandlerProto; - type OutEvent = LegacyProtoOut; + type ProtocolsHandler = CustomProtoHandlerProto; + type OutEvent = LegacyProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { CustomProtoHandlerProto::new(self.protocol.clone()) @@ -825,7 +822,7 @@ where fn inject_node_event( &mut self, source: PeerId, - event: CustomProtoHandlerOut, + event: CustomProtoHandlerOut, ) { match event { CustomProtoHandlerOut::CustomProtocolClosed { reason } => { @@ -954,7 +951,7 @@ where _params: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< - CustomProtoHandlerIn, + CustomProtoHandlerIn, Self::OutEvent, >, > { diff --git a/core/network/src/legacy_proto/handler.rs b/core/network/src/legacy_proto/handler.rs index 3fe88d3cfd410..7bdbe4a31ff7c 100644 --- a/core/network/src/legacy_proto/handler.rs +++ b/core/network/src/legacy_proto/handler.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use crate::legacy_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream}; -use crate::protocol::message::Message; +use bytes::BytesMut; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _}; use futures_timer::Delay; @@ -29,7 +29,6 @@ use libp2p::swarm::{ SubstreamProtocol, }; use log::{debug, error}; -use sr_primitives::traits::Block as BlockT; use smallvec::{smallvec, SmallVec}; use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -88,21 +87,20 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// We consider that we are now "closed" if the remote closes all the existing substreams. /// Re-opening it can then be performed by closing all active substream and re-opening one. /// -pub struct CustomProtoHandlerProto { +pub struct CustomProtoHandlerProto { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// Marker to pin the generic type. marker: PhantomData, } -impl CustomProtoHandlerProto +impl CustomProtoHandlerProto where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { /// Builds a new `CustomProtoHandlerProto`. - pub fn new(protocol: RegisteredProtocol) -> Self { + pub fn new(protocol: RegisteredProtocol) -> Self { CustomProtoHandlerProto { protocol, marker: PhantomData, @@ -110,14 +108,13 @@ where } } -impl IntoProtocolsHandler for CustomProtoHandlerProto +impl IntoProtocolsHandler for CustomProtoHandlerProto where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { - type Handler = CustomProtoHandler; + type Handler = CustomProtoHandler; - fn inbound_protocol(&self) -> RegisteredProtocol { + fn inbound_protocol(&self) -> RegisteredProtocol { self.protocol.clone() } @@ -136,12 +133,12 @@ where } /// The actual handler once the connection has been established. -pub struct CustomProtoHandler { +pub struct CustomProtoHandler { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol, + protocol: RegisteredProtocol, /// State of the communications with the remote. - state: ProtocolState, + state: ProtocolState, /// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have /// any influence on the behaviour. @@ -155,15 +152,15 @@ pub struct CustomProtoHandler { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent, (), CustomProtoHandlerOut>; 16]>, + events_queue: SmallVec<[ProtocolsHandlerEvent; 16]>, } /// State of the handler. -enum ProtocolState { +enum ProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. - substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, /// Deadline after which the initialization is abnormally long. init_deadline: Compat, }, @@ -179,9 +176,9 @@ enum ProtocolState { /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. Normal { /// The substreams where bidirectional communications happen. - substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, + substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, /// Contains substreams which are being shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, + shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, }, /// We are disabled. Contains substreams that are being closed. @@ -189,7 +186,7 @@ enum ProtocolState { /// outside or we have never sent any `CustomProtocolOpen` in the first place. Disabled { /// List of substreams to shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream; 6]>, + shutdown: SmallVec<[RegisteredProtocolSubstream; 6]>, /// If true, we should reactivate the handler after all the substreams in `shutdown` have /// been closed. @@ -210,7 +207,7 @@ enum ProtocolState { /// Event that can be received by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerIn { +pub enum CustomProtoHandlerIn { /// The node should start using custom protocols. Enable, @@ -220,13 +217,13 @@ pub enum CustomProtoHandlerIn { /// Sends a message through a custom protocol substream. SendCustomMessage { /// The message to send. - message: Message, + message: Vec, }, } /// Event that can be emitted by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerOut { +pub enum CustomProtoHandlerOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -242,14 +239,14 @@ pub enum CustomProtoHandlerOut { /// Receives a message on a custom protocol substream. CustomMessage { /// Message that has been received. - message: Message, + message: BytesMut, }, /// A substream to the remote is clogged. The send buffer is very large, and we should print /// a diagnostic message and/or avoid sending more data. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, + messages: Vec>, }, /// An error has happened on the protocol level with this node. @@ -261,10 +258,9 @@ pub enum CustomProtoHandlerOut { }, } -impl CustomProtoHandler +impl CustomProtoHandler where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { /// Enables the handler. fn enable(&mut self) { @@ -342,7 +338,7 @@ where /// Polls the state for events. Optionally returns an event to produce. #[must_use] fn poll_state(&mut self) - -> Option, (), CustomProtoHandlerOut>> { + -> Option> { match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -471,7 +467,7 @@ where /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - mut substream: RegisteredProtocolSubstream + mut substream: RegisteredProtocolSubstream ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -516,7 +512,7 @@ where } /// Sends a message to the remote. - fn send_message(&mut self, message: Message) { + fn send_message(&mut self, message: Vec) { match self.state { ProtocolState::Normal { ref mut substreams, .. } => substreams[0].send_message(message), @@ -527,14 +523,14 @@ where } } -impl ProtocolsHandler for CustomProtoHandler -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { - type InEvent = CustomProtoHandlerIn; - type OutEvent = CustomProtoHandlerOut; +impl ProtocolsHandler for CustomProtoHandler +where TSubstream: AsyncRead + AsyncWrite { + type InEvent = CustomProtoHandlerIn; + type OutEvent = CustomProtoHandlerOut; type Substream = TSubstream; type Error = ConnectionKillError; - type InboundProtocol = RegisteredProtocol; - type OutboundProtocol = RegisteredProtocol; + type InboundProtocol = RegisteredProtocol; + type OutboundProtocol = RegisteredProtocol; type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -556,7 +552,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { self.inject_fully_negotiated(proto); } - fn inject_event(&mut self, message: CustomProtoHandlerIn) { + fn inject_event(&mut self, message: CustomProtoHandlerIn) { match message { CustomProtoHandlerIn::Disable => self.disable(), CustomProtoHandlerIn::Enable => self.enable(), @@ -613,7 +609,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { } } -impl fmt::Debug for CustomProtoHandler +impl fmt::Debug for CustomProtoHandler where TSubstream: AsyncRead + AsyncWrite, { @@ -625,9 +621,9 @@ where /// Given a list of substreams, tries to shut them down. The substreams that have been successfully /// shut down are removed from the list. -fn shutdown_list - (list: &mut SmallVec>>) -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { +fn shutdown_list + (list: &mut SmallVec>>) +where TSubstream: AsyncRead + AsyncWrite { 'outer: for n in (0..list.len()).rev() { let mut substream = list.swap_remove(n); loop { diff --git a/core/network/src/legacy_proto/tests.rs b/core/network/src/legacy_proto/tests.rs index 8fd47843df2e5..49ab38e3b7e06 100644 --- a/core/network/src/legacy_proto/tests.rs +++ b/core/network/src/legacy_proto/tests.rs @@ -17,6 +17,7 @@ #![cfg(test)] use futures::{future, prelude::*, try_ready}; +use codec::{Encode, Decode}; use libp2p::core::nodes::Substream; use libp2p::core::{ConnectedPoint, transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler}; @@ -24,9 +25,9 @@ use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{io, time::Duration, time::Instant}; -use test_client::runtime::Block; -use crate::message::generic::Message; +use crate::message::Message; use crate::legacy_proto::{LegacyProto, LegacyProtoOut}; +use test_client::runtime::Block; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -101,12 +102,12 @@ fn build_nodes() /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. struct CustomProtoWithAddr { - inner: LegacyProto>, + inner: LegacyProto>, addrs: Vec<(PeerId, Multiaddr)>, } impl std::ops::Deref for CustomProtoWithAddr { - type Target = LegacyProto>; + type Target = LegacyProto>; fn deref(&self) -> &Self::Target { &self.inner @@ -121,8 +122,8 @@ impl std::ops::DerefMut for CustomProtoWithAddr { impl NetworkBehaviour for CustomProtoWithAddr { type ProtocolsHandler = - > as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = > as NetworkBehaviour>::OutEvent; + > as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = > as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { self.inner.new_handler() @@ -209,7 +210,7 @@ fn two_nodes_transfer_lots_of_packets() { for n in 0 .. NUM_PACKETS { service1.send_packet( &peer_id, - Message::ChainSpecific(vec![(n % 256) as u8]) + Message::::ChainSpecific(vec![(n % 256) as u8]).encode() ); } }, @@ -223,11 +224,16 @@ fn two_nodes_transfer_lots_of_packets() { loop { match try_ready!(service2.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, - Some(LegacyProtoOut::CustomMessage { message: Message::ChainSpecific(message), .. }) => { - assert_eq!(message.len(), 1); - packet_counter += 1; - if packet_counter == NUM_PACKETS { - return Ok(Async::Ready(())) + Some(LegacyProtoOut::CustomMessage { message, .. }) => { + match Message::::decode(&mut &message[..]).unwrap() { + Message::::ChainSpecific(message) => { + assert_eq!(message.len(), 1); + packet_counter += 1; + if packet_counter == NUM_PACKETS { + return Ok(Async::Ready(())) + } + }, + _ => panic!(), } } _ => panic!(), @@ -248,7 +254,7 @@ fn basic_two_nodes_requests_in_parallel() { let mut to_send = Vec::new(); for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. let msg = (0..10).map(|_| rand::random::()).collect::>(); - to_send.push(Message::ChainSpecific(msg)); + to_send.push(Message::::ChainSpecific(msg)); } to_send }; @@ -263,7 +269,7 @@ fn basic_two_nodes_requests_in_parallel() { match try_ready!(service1.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => { for msg in to_send.drain(..) { - service1.send_packet(&peer_id, msg); + service1.send_packet(&peer_id, msg.encode()); } }, _ => panic!(), @@ -276,7 +282,7 @@ fn basic_two_nodes_requests_in_parallel() { match try_ready!(service2.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, Some(LegacyProtoOut::CustomMessage { message, .. }) => { - let pos = to_receive.iter().position(|m| *m == message).unwrap(); + let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); to_receive.remove(pos); if to_receive.is_empty() { return Ok(Async::Ready(())) diff --git a/core/network/src/legacy_proto/upgrade.rs b/core/network/src/legacy_proto/upgrade.rs index 8831d16f91636..fdf23ec351c8a 100644 --- a/core/network/src/legacy_proto/upgrade.rs +++ b/core/network/src/legacy_proto/upgrade.rs @@ -15,15 +15,11 @@ // along with Substrate. If not, see . use crate::config::ProtocolId; -use crate::protocol::message::Message; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; -use log::debug; -use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter}; +use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream}; -use codec::{Decode, Encode}; -use sr_primitives::traits::Block as BlockT; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -31,7 +27,7 @@ use unsigned_varint::codec::UviBytes; /// /// Note that "a single protocol" here refers to `par` for example. However /// each protocol can have multiple different versions for networking purposes. -pub struct RegisteredProtocol { +pub struct RegisteredProtocol { /// Id of the protocol for API purposes. id: ProtocolId, /// Base name of the protocol as advertised on the network. @@ -40,11 +36,9 @@ pub struct RegisteredProtocol { /// List of protocol versions that we support. /// Ordered in descending order so that the best comes first. supported_versions: Vec, - /// Marker to pin the generic. - marker: PhantomData, } -impl RegisteredProtocol { +impl RegisteredProtocol { /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be /// passed inside the `RegisteredProtocolOutput`. pub fn new(protocol: impl Into, versions: &[u8]) @@ -62,24 +56,22 @@ impl RegisteredProtocol { tmp.sort_unstable_by(|a, b| b.cmp(&a)); tmp }, - marker: PhantomData, } } } -impl Clone for RegisteredProtocol { +impl Clone for RegisteredProtocol { fn clone(&self) -> Self { RegisteredProtocol { id: self.id.clone(), base_name: self.base_name.clone(), supported_versions: self.supported_versions.clone(), - marker: PhantomData, } } } /// Output of a `RegisteredProtocol` upgrade. -pub struct RegisteredProtocolSubstream { +pub struct RegisteredProtocolSubstream { /// If true, we are in the process of closing the sink. is_closing: bool, /// Whether the local node opened this substream (dialer), or we received this substream from @@ -96,11 +88,9 @@ pub struct RegisteredProtocolSubstream { /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one /// unless the buffer empties then fills itself again. clogged_fuse: bool, - /// Marker to pin the generic. - marker: PhantomData, } -impl RegisteredProtocolSubstream { +impl RegisteredProtocolSubstream { /// Returns the version of the protocol that was negotiated. pub fn protocol_version(&self) -> u8 { self.protocol_version @@ -124,33 +114,32 @@ impl RegisteredProtocolSubstream { } /// Sends a message to the substream. - pub fn send_message(&mut self, data: Message) - where B: BlockT { + pub fn send_message(&mut self, data: Vec) { if self.is_closing { return } - self.send_queue.push_back(data.encode()); + self.send_queue.push_back(data); } } /// Event produced by the `RegisteredProtocolSubstream`. #[derive(Debug, Clone)] -pub enum RegisteredProtocolEvent { +pub enum RegisteredProtocolEvent { /// Received a message from the remote. - Message(Message), + Message(BytesMut), /// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// many messages to it. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, + messages: Vec>, }, } -impl Stream for RegisteredProtocolSubstream -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { - type Item = RegisteredProtocolEvent; +impl Stream for RegisteredProtocolSubstream +where TSubstream: AsyncRead + AsyncWrite { + type Item = RegisteredProtocolEvent; type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -179,8 +168,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { self.clogged_fuse = true; return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages: self.send_queue.iter() - .map(|m| Decode::decode(&mut &m[..])) - .filter_map(Result::ok) + .map(|m| m.clone()) .collect(), }))) } @@ -199,15 +187,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { // Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever. match self.inner.poll()? { Async::Ready(Some(data)) => { - let message = as Decode>::decode(&mut &data[..]) - .map_err(|err| { - debug!( - target: "sub-libp2p", - "Couldn't decode packet sent by the remote: {:?}: {}", data, err.what(), - ); - io::ErrorKind::InvalidData - })?; - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) } Async::Ready(None) => if !self.requires_poll_complete && self.send_queue.is_empty() { @@ -220,7 +200,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { } } -impl UpgradeInfo for RegisteredProtocol { +impl UpgradeInfo for RegisteredProtocol { type Info = RegisteredProtocolName; type InfoIter = VecIntoIter; @@ -255,10 +235,10 @@ impl ProtocolName for RegisteredProtocolName { } } -impl InboundUpgrade for RegisteredProtocol +impl InboundUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { - type Output = RegisteredProtocolSubstream; + type Output = RegisteredProtocolSubstream; type Future = future::FutureResult; type Error = io::Error; @@ -281,12 +261,11 @@ where TSubstream: AsyncRead + AsyncWrite, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - marker: PhantomData, }) } } -impl OutboundUpgrade for RegisteredProtocol +impl OutboundUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { type Output = >::Output; @@ -308,7 +287,6 @@ where TSubstream: AsyncRead + AsyncWrite, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - marker: PhantomData, }) } } diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 6afb4c8b116ae..8b3b5a49e40ec 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -16,6 +16,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::legacy_proto::{LegacyProto, LegacyProtoOut}; +use bytes::BytesMut; use futures::prelude::*; use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::{Multiaddr, PeerId}; @@ -28,6 +29,7 @@ use consensus::{ block_validation::BlockAnnounceValidator, import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin} }; +use codec::{Decode, Encode}; use sr_primitives::{generic::BlockId, ConsensusEngineId, Justification}; use sr_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, @@ -44,6 +46,7 @@ use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::fmt::Write; use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; @@ -90,6 +93,8 @@ const PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE: i32 = -(1 << 8); const NEW_EXTRINSIC_REPUTATION_CHANGE: i32 = 1 << 7; /// We sent an RPC query to the given node, but it failed. const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); +/// We received a message that failed to decode. +const BAD_MESSAGE_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol, H: ExHashT> { @@ -113,7 +118,15 @@ pub struct Protocol, H: ExHashT> { /// When asked for a proof of finality, we use this struct to build one. finality_proof_provider: Option>>, /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: LegacyProto>, + behaviour: LegacyProto>, +} + +#[derive(Default)] +struct PacketStats { + bytes_in: u64, + bytes_out: u64, + count_in: u64, + count_out: u64, } /// A peer that we are connected to @@ -151,12 +164,12 @@ pub struct PeerInfo { pub best_number: ::Number, } -struct LightDispatchIn<'a, B: BlockT> { - behaviour: &'a mut LegacyProto>, +struct LightDispatchIn<'a> { + behaviour: &'a mut LegacyProto>, peerset: peerset::PeersetHandle, } -impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { +impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { self.peerset.report_peer(who.clone(), reputation) } @@ -166,12 +179,12 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { } fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { - let message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { + let message: Message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { id, block, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_read_request( @@ -181,13 +194,13 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { block: ::Hash, keys: Vec>, ) { - let message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { + let message: Message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { id, block, keys, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_read_child_request( @@ -198,14 +211,14 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { storage_key: Vec, keys: Vec>, ) { - let message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { + let message: Message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { id, block, storage_key, keys, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_call_request( @@ -216,14 +229,14 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { method: String, data: Vec ) { - let message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { + let message: Message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { id, block, method, data, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_changes_request( @@ -237,7 +250,7 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { storage_key: Option>, key: Vec, ) { - let message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { + let message: Message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { id, first, last, @@ -247,7 +260,7 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { key, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_body_request( @@ -260,7 +273,7 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { direction: Direction, max: Option ) { - let message = message::generic::Message::BlockRequest(message::BlockRequest:: { + let message: Message = message::generic::Message::BlockRequest(message::BlockRequest:: { id, fields, from, @@ -269,7 +282,7 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a, B> { max, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } } @@ -291,7 +304,7 @@ pub trait Context { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - behaviour: &'a mut LegacyProto>, + behaviour: &'a mut LegacyProto>, context_data: &'a mut ContextData, peerset_handle: &'a peerset::PeersetHandle, } @@ -299,7 +312,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { fn new( context_data: &'a mut ContextData, - behaviour: &'a mut LegacyProto>, + behaviour: &'a mut LegacyProto>, peerset_handle: &'a peerset::PeersetHandle, ) -> Self { ProtocolContext { context_data, peerset_handle, behaviour } @@ -316,19 +329,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { - send_message( + send_message:: ( self.behaviour, - &mut self.context_data.peers, - who, + &mut self.context_data.stats, + &who, GenericMessage::Consensus(consensus) ) } fn send_chain_specific(&mut self, who: PeerId, message: Vec) { - send_message( + send_message:: ( self.behaviour, - &mut self.context_data.peers, - who, + &mut self.context_data.stats, + &who, GenericMessage::ChainSpecific(message) ) } @@ -338,6 +351,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, struct ContextData { // All connected peers peers: HashMap>, + stats: HashMap<&'static str, PacketStats>, pub chain: Arc>, } @@ -388,6 +402,7 @@ impl, H: ExHashT> Protocol { config, context_data: ContextData { peers: HashMap::new(), + stats: HashMap::new(), chain, }, light_dispatch: LightDispatch::new(checker), @@ -517,8 +532,22 @@ impl, H: ExHashT> Protocol { pub fn on_custom_message( &mut self, who: PeerId, - message: Message, + data: BytesMut, ) -> CustomMessageOutcome { + + let message = match as Decode>::decode(&mut &data[..]) { + Ok(message) => message, + Err(err) => { + debug!(target: "sync", "Couldn't decode packet sent by {}: {:?}: {}", who, data, err.what()); + self.peerset_handle.report_peer(who.clone(), BAD_MESSAGE_REPUTATION_CHANGE); + return CustomMessageOutcome::None; + } + }; + + let mut stats = self.context_data.stats.entry(message.id()).or_default(); + stats.bytes_in += data.len() as u64; + stats.count_in += 1; + match message { GenericMessage::Status(s) => self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), @@ -581,15 +610,25 @@ impl, H: ExHashT> Protocol { CustomMessageOutcome::None } - fn send_message(&mut self, who: PeerId, message: Message) { - send_message::( + fn send_request(&mut self, who: &PeerId, message: Message) { + send_request::( &mut self.behaviour, + &mut self.context_data.stats, &mut self.context_data.peers, who, message, ); } + fn send_message(&mut self, who: &PeerId, message: Message) { + send_message::( + &mut self.behaviour, + &mut self.context_data.stats, + who, + message, + ); + } + /// Locks `self` and returns a context plus the `ConsensusGossip` struct. pub fn consensus_gossip_lock<'a>( &'a mut self, @@ -622,7 +661,7 @@ impl, H: ExHashT> Protocol { GossipMessageRecipient::BroadcastNew => self.consensus_gossip.multicast(&mut context, topic, message, false), GossipMessageRecipient::Peer(who) => - self.send_message(who, GenericMessage::Consensus(message)), + self.send_message(&who, GenericMessage::Consensus(message)), } } @@ -745,7 +784,7 @@ impl, H: ExHashT> Protocol { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(peer, GenericMessage::BlockResponse(response)) + self.send_message(&peer, GenericMessage::BlockResponse(response)) } /// Adjusts the reputation of a node. @@ -792,7 +831,7 @@ impl, H: ExHashT> Protocol { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_message(peer, GenericMessage::BlockRequest(req)); + self.send_request(&peer, GenericMessage::BlockRequest(req)); CustomMessageOutcome::None } Err(sync::BadPeer(id, repu)) => { @@ -939,7 +978,7 @@ impl, H: ExHashT> Protocol { }, who.clone(), status.roles, status.best_number); match self.sync.new_peer(who.clone(), info) { Ok(None) => (), - Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)), + Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)), Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); self.peerset_handle.report_peer(id, repu) @@ -1020,7 +1059,12 @@ impl, H: ExHashT> Protocol { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) + send_message:: ( + &mut self.behaviour, + &mut self.context_data.stats, + &who, + GenericMessage::Transactions(to_send) + ) } } @@ -1061,7 +1105,7 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); let inserted = peer.known_blocks.insert(hash); if inserted || force { - let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { + let message: Message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone(), state: if peer.info.protocol_version >= 4 { if is_best { @@ -1079,7 +1123,12 @@ impl, H: ExHashT> Protocol { }, }); - self.behaviour.send_packet(who, message) + send_message:: ( + &mut self.behaviour, + &mut self.context_data.stats, + &who, + message, + ) } } } @@ -1097,7 +1146,7 @@ impl, H: ExHashT> Protocol { chain_status: self.specialization.status(), }; - self.send_message(who, GenericMessage::Status(status)) + self.send_message(&who, GenericMessage::Status(status)) } fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce) -> CustomMessageOutcome { @@ -1157,7 +1206,7 @@ impl, H: ExHashT> Protocol { match blocks_to_import { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_message(peer, GenericMessage::BlockRequest(req)); + self.send_request(&peer, GenericMessage::BlockRequest(req)); CustomMessageOutcome::None } Err(sync::BadPeer(id, repu)) => { @@ -1226,7 +1275,7 @@ impl, H: ExHashT> Protocol { }; self.send_message( - who, + &who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, proof, @@ -1269,7 +1318,13 @@ impl, H: ExHashT> Protocol { match result { Ok((id, req)) => { let msg = GenericMessage::BlockRequest(req); - send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + msg + ) } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); @@ -1342,7 +1397,7 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - who, + &who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, proof, @@ -1385,7 +1440,7 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - who, + &who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, proof, @@ -1425,7 +1480,7 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - who, + &who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { id: request.id, header, @@ -1495,7 +1550,7 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - who, + &who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { id: request.id, max: proof.max_block, @@ -1545,7 +1600,7 @@ impl, H: ExHashT> Protocol { }, }; self.send_message( - who, + &who, GenericMessage::FinalityProofResponse(message::FinalityProofResponse { id: 0, block: request.block, @@ -1582,6 +1637,22 @@ impl, H: ExHashT> Protocol { peerset: self.peerset_handle.clone(), }, peer, response); } + + fn format_stats(&self) -> String { + let mut out = String::new(); + for (id, stats) in &self.context_data.stats { + let _ = writeln!( + &mut out, + "{}: In: {} bytes ({}), Out: {} bytes ({})", + id, + stats.bytes_in, + stats.count_in, + stats.bytes_out, + stats.count_out, + ); + } + out + } } /// Outcome of an incoming custom message. @@ -1593,14 +1664,15 @@ pub enum CustomMessageOutcome { None, } -fn send_message( - behaviour: &mut LegacyProto>, +fn send_request( + behaviour: &mut LegacyProto>, + stats: &mut HashMap<&'static str, PacketStats>, peers: &mut HashMap>, - who: PeerId, + who: &PeerId, mut message: Message, ) { if let GenericMessage::BlockRequest(ref mut r) = message { - if let Some(ref mut peer) = peers.get_mut(&who) { + if let Some(ref mut peer) = peers.get_mut(who) { r.id = peer.next_request_id; peer.next_request_id = peer.next_request_id + 1; if let Some((timestamp, request)) = peer.block_request.take() { @@ -1610,12 +1682,25 @@ fn send_message( peer.block_request = Some((time::Instant::now(), r.clone())); } } - behaviour.send_packet(&who, message); + send_message::(behaviour, stats, who, message) +} + +fn send_message( + behaviour: &mut LegacyProto>, + stats: &mut HashMap<&'static str, PacketStats>, + who: &PeerId, + message: Message, +) { + let encoded = message.encode(); + let mut stats = stats.entry(message.id()).or_default(); + stats.bytes_out += encoded.len() as u64; + stats.count_out += 1; + behaviour.send_packet(who, encoded); } impl, H: ExHashT> NetworkBehaviour for Protocol { - type ProtocolsHandler = > as NetworkBehaviour>::ProtocolsHandler; + type ProtocolsHandler = > as NetworkBehaviour>::ProtocolsHandler; type OutEvent = CustomMessageOutcome; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -1660,13 +1745,30 @@ Protocol { } for (id, r) in self.sync.block_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) } for (id, r) in self.sync.justification_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) } for (id, r) in self.sync.finality_proof_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::FinalityProofRequest(r)) } let event = match self.behaviour.poll(params) { @@ -1700,8 +1802,9 @@ Protocol { LegacyProtoOut::Clogged { peer_id, messages } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - self.on_clogged_peer(peer_id.clone(), Some(msg)); + let message: Option> = Decode::decode(&mut &msg[..]).ok(); + debug!(target: "sync", "{:?}", message); + self.on_clogged_peer(peer_id.clone(), message); } CustomMessageOutcome::None } @@ -1749,3 +1852,9 @@ impl, H: ExHashT> DiscoveryNetBehaviour f self.behaviour.add_discovered_nodes(peer_ids) } } + +impl, H: ExHashT> Drop for Protocol { + fn drop(&mut self) { + debug!(target: "sync", "Network stats:\n{}", self.format_stats()); + } +} diff --git a/core/network/src/protocol/message.rs b/core/network/src/protocol/message.rs index ef5e4dbb1db0f..22fb35806da86 100644 --- a/core/network/src/protocol/message.rs +++ b/core/network/src/protocol/message.rs @@ -222,6 +222,32 @@ pub mod generic { ChainSpecific(Vec), } + impl Message { + /// Message id useful for logging. + pub fn id(&self) -> &'static str { + match self { + Message::Status(_) => "Status", + Message::BlockRequest(_) => "BlockRequest", + Message::BlockResponse(_) => "BlockResponse", + Message::BlockAnnounce(_) => "BlockAnnounce", + Message::Transactions(_) => "Transactions", + Message::Consensus(_) => "Consensus", + Message::RemoteCallRequest(_) => "RemoteCallRequest", + Message::RemoteCallResponse(_) => "RemoteCallResponse", + Message::RemoteReadRequest(_) => "RemoteReadRequest", + Message::RemoteReadResponse(_) => "RemoteReadResponse", + Message::RemoteHeaderRequest(_) => "RemoteHeaderRequest", + Message::RemoteHeaderResponse(_) => "RemoteHeaderResponse", + Message::RemoteChangesRequest(_) => "RemoteChangesRequest", + Message::RemoteChangesResponse(_) => "RemoteChangesResponse", + Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest", + Message::FinalityProofRequest(_) => "FinalityProofRequest", + Message::FinalityProofResponse(_) => "FinalityProofResponse", + Message::ChainSpecific(_) => "ChainSpecific", + } + } + } + /// Status sent on connection. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct Status {