From 578f9721181609daaa348e65f8f33b0f1fc4bc3d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 5 May 2020 15:42:33 +0200 Subject: [PATCH 01/14] Move the legacy protocol handshake to the legacy substream --- client/network/src/protocol.rs | 182 ++++++------------ .../src/protocol/generic_proto/behaviour.rs | 31 ++- .../protocol/generic_proto/handler/group.rs | 17 +- .../protocol/generic_proto/handler/legacy.rs | 39 ++-- .../src/protocol/generic_proto/tests.rs | 2 +- .../protocol/generic_proto/upgrade/legacy.rs | 83 +++++--- 6 files changed, 183 insertions(+), 171 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 895624f08de6a..21983255edf97 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -126,7 +126,6 @@ mod rep { } struct Metrics { - handshaking_peers: Gauge, obsolete_requests: Gauge, peers: Gauge, queued_blocks: Gauge, @@ -138,10 +137,6 @@ struct Metrics { impl Metrics { fn register(r: &Registry) -> Result { Ok(Metrics { - handshaking_peers: { - let g = Gauge::new("sync_handshaking_peers", "Number of newly connected peers")?; - register(g, r)? - }, obsolete_requests: { let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?; register(g, r)? @@ -197,8 +192,6 @@ pub struct Protocol { /// List of nodes for which we perform additional logging because they are important for the /// user. important_peers: HashSet, - // Connected peers pending Status message. - handshaking_peers: HashMap, /// Used to report reputation changes. peerset_handle: sc_peerset::PeersetHandle, transaction_pool: Arc>, @@ -230,13 +223,6 @@ struct PacketStats { count_in: u64, count_out: u64, } - -/// A peer that we are connected to -/// and from whom we have not yet received a Status message. -struct HandshakingPeer { - timestamp: Instant, -} - /// Peer information #[derive(Debug, Clone)] struct Peer { @@ -317,6 +303,22 @@ impl BlockAnnouncesHandshake { } } +/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol. +fn build_status_message(protocol_config: &ProtocolConfig, chain: &Arc>) -> Vec { + let info = chain.info(); + let status = message::generic::Status { + version: CURRENT_VERSION, + min_supported_version: MIN_VERSION, + genesis_hash: info.genesis_hash, + roles: protocol_config.roles.into(), + best_number: info.best_number, + best_hash: info.best_hash, + chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible + }; + + Message::::Status(status).encode() +} + /// Fallback mechanism to use to send a notification if no substream is open. #[derive(Debug, Clone, PartialEq, Eq)] enum Fallback { @@ -365,7 +367,13 @@ impl Protocol { let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset, queue_size_report); + let mut behaviour = GenericProto::new( + protocol_id.clone(), + versions, + build_status_message(&config, &chain), + peerset, + queue_size_report, + ); let mut legacy_equiv_by_name = HashMap::new(); @@ -402,7 +410,6 @@ impl Protocol { }, genesis_hash: info.genesis_hash, sync, - handshaking_peers: HashMap::new(), important_peers, transaction_pool, finality_proof_provider, @@ -561,7 +568,8 @@ impl Protocol { stats.count_in += 1; match message { - GenericMessage::Status(s) => return self.on_status_message(who, s), + GenericMessage::Status(_) => + warn!(target: "sub-libp2p", "Received unexpected Status"), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { if let Some(request) = self.handle_response(who.clone(), &r) { @@ -660,13 +668,6 @@ impl Protocol { ); } - /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, who: PeerId) { - trace!(target: "sync", "Connecting {}", who); - self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() }); - self.send_status(who); - } - /// Called by peer when it is disconnecting pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome { if self.important_peers.contains(&peer) { @@ -676,11 +677,7 @@ impl Protocol { } // lock all the the peer lists so that add/remove peer events are in order - let removed = { - self.handshaking_peers.remove(&peer); - self.context_data.peers.remove(&peer) - }; - if let Some(_peer_data) = removed { + if let Some(_peer_data) = self.context_data.peers.remove(&peer) { self.sync.peer_disconnected(peer.clone()); // Notify all the notification protocols as closed. @@ -895,16 +892,6 @@ impl Protocol { aborting.push(who.clone()); } } - for (who, _) in self.handshaking_peers.iter() - .filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) - { - log!( - target: "sync", - if self.important_peers.contains(who) { Level::Warn } else { Level::Trace }, - "Handshake timeout {}", who - ); - aborting.push(who.clone()); - } } for p in aborting { @@ -914,7 +901,7 @@ impl Protocol { } /// Called by peer to report status - fn on_status_message(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { + pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); let _protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -986,23 +973,13 @@ impl Protocol { } } - let info = match self.handshaking_peers.remove(&who) { - Some(_handshaking) => { - PeerInfo { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number - } - }, - None => { - error!(target: "sync", "Received status from previously unconnected node {}", who); - return CustomMessageOutcome::None; - }, - }; - let peer = Peer { - info, + info: PeerInfo { + protocol_version: status.version, + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }, block_request: None, known_extrinsics: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_EXTRINSICS) .expect("Constant is nonzero")), @@ -1264,22 +1241,6 @@ impl Protocol { } } - /// Send Status message - fn send_status(&mut self, who: PeerId) { - let info = self.context_data.chain.info(); - let status = message::generic::Status { - version: CURRENT_VERSION, - min_supported_version: MIN_VERSION, - genesis_hash: info.genesis_hash, - roles: self.config.roles.into(), - best_number: info.best_number, - best_hash: info.best_hash, - chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible - }; - - self.send_message(&who, None, GenericMessage::Status(status)) - } - fn on_block_announce( &mut self, who: PeerId, @@ -1365,6 +1326,7 @@ impl Protocol { pub fn on_block_imported(&mut self, header: &B::Header, is_best: bool) { if is_best { self.sync.update_chain_info(header); + self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain)); self.behaviour.set_notif_protocol_handshake( &self.block_announces_protocol, BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() @@ -1780,9 +1742,6 @@ impl Protocol { } metrics.obsolete_requests.set(obsolete_requests); - let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX); - metrics.handshaking_peers.set(n); - let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX); metrics.peers.set(n); @@ -2009,9 +1968,31 @@ impl NetworkBehaviour for Protocol { }; let outcome = match event { - GenericProtoOut::CustomProtocolOpen { peer_id, .. } => { - self.on_peer_connected(peer_id.clone()); - CustomMessageOutcome::None + GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => { + match as Decode>::decode(&mut &received_handshake[..]) { + Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake), + Ok(msg) => { + debug!( + target: "sync", + "Expected Status message from {}, but got {:?}", + peer_id, + msg, + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + Err(err) => { + debug!( + target: "sync", + "Couldn't decode handshake sent by {}: {:?}: {}", + peer_id, + received_handshake, + err.what() + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + } } GenericProtoOut::CustomProtocolClosed { peer_id, .. } => { self.on_peer_disconnected(peer_id.clone()) @@ -2106,48 +2087,3 @@ impl Drop for Protocol { debug!(target: "sync", "Network stats:\n{}", self.format_stats()); } } - -#[cfg(test)] -mod tests { - use crate::PeerId; - use crate::config::EmptyTransactionPool; - use super::{CustomMessageOutcome, Protocol, ProtocolConfig}; - - use sp_consensus::block_validation::DefaultBlockAnnounceValidator; - use std::sync::Arc; - use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt}; - use substrate_test_runtime_client::runtime::{Block, Hash}; - - #[test] - fn no_handshake_no_notif_closed() { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - - let (mut protocol, _) = Protocol::::new( - ProtocolConfig::default(), - client.clone(), - Arc::new(EmptyTransactionPool), - None, - None, - From::from(&b"test"[..]), - sc_peerset::PeersetConfig { - in_peers: 10, - out_peers: 10, - bootnodes: Vec::new(), - reserved_only: false, - priority_groups: Vec::new(), - }, - Box::new(DefaultBlockAnnounceValidator::new(client.clone())), - None, - Default::default(), - true, - None, - ).unwrap(); - - let dummy_peer_id = PeerId::random(); - let _ = protocol.on_peer_connected(dummy_peer_id.clone()); - match protocol.on_peer_disconnected(dummy_peer_id) { - CustomMessageOutcome::None => {}, - _ => panic!() - }; - } -} diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 3ce98dc11ed3c..38796ca167d89 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -275,6 +275,9 @@ pub enum GenericProtoOut { CustomProtocolOpen { /// Id of the peer we are connected to. peer_id: PeerId, + /// Handshake that was sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. + received_handshake: Vec, }, /// Closed a custom protocol with the remote. @@ -323,10 +326,11 @@ impl GenericProto { pub fn new( protocol: impl Into, versions: &[u8], + handshake_message: Vec, peerset: sc_peerset::Peerset, queue_size_report: Option, ) -> Self { - let legacy_protocol = RegisteredProtocol::new(protocol, versions); + let legacy_protocol = RegisteredProtocol::new(protocol, versions, handshake_message); GenericProto { legacy_protocol, @@ -380,6 +384,27 @@ impl GenericProto { } } + /// Modifies the handshake of the legacy protocol. + pub fn set_legacy_handshake_message( + &mut self, + handshake_message: impl Into> + ) { + let handshake_message = handshake_message.into(); + + // Send an event to all the peers we're connected to, updating the handshake message. + for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) { + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::All, + event: NotifsHandlerIn::UpdateLegacyHandshake { + handshake_message: handshake_message.clone(), + }, + }); + } + + self.legacy_protocol.set_handshake_message(handshake_message); + } + /// Returns the number of discovered nodes that we keep in memory. pub fn num_discovered_peers(&self) -> usize { self.peerset.num_discovered_peers() @@ -1158,7 +1183,7 @@ impl NetworkBehaviour for GenericProto { } } - NotifsHandlerOut::Open { endpoint } => { + NotifsHandlerOut::Open { endpoint, received_handshake } => { debug!(target: "sub-libp2p", "Handler({:?}) => Endpoint {:?} open for custom protocols.", source, endpoint); @@ -1181,7 +1206,7 @@ impl NetworkBehaviour for GenericProto { if first { debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = GenericProtoOut::CustomProtocolOpen { peer_id: source }; + let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake }; self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } else { debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 0e453a368c222..ee0e603542815 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -161,6 +161,13 @@ pub enum NotifsHandlerIn { message: Vec, }, + /// Modifies the handshake message of the legacy protocol. + UpdateLegacyHandshake { + /// The new handshake message to send if we open a substream or if the remote opens a + /// substream towards us. + handshake_message: Vec, + }, + /// Modifies the handshake message of a notifications protocol. UpdateHandshake { /// Name of the protocol for the message. @@ -198,6 +205,9 @@ pub enum NotifsHandlerOut { Open { /// The endpoint of the connection that is open for custom protocols. endpoint: ConnectedPoint, + /// Handshake that was sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. + received_handshake: Vec, }, /// The connection is closed for custom protocols. @@ -375,6 +385,9 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::SendLegacy { message } => self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), + NotifsHandlerIn::UpdateLegacyHandshake { handshake_message } => { + self.legacy.set_handshake_message(handshake_message); + } NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => { for (handler, current_handshake) in &mut self.in_handlers { if handler.protocol_name() == &*protocol_name { @@ -490,9 +503,9 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint } + NotifsHandlerOut::Open { endpoint, received_handshake } )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index bc84fd847c9c4..db50bf678504d 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -150,7 +150,8 @@ enum ProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. - substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + /// For each substream, also includes the handshake message that we have received. + substreams: SmallVec<[(RegisteredProtocolSubstream, Vec); 6]>, /// Deadline after which the initialization is abnormally long. init_deadline: Delay, }, @@ -218,6 +219,9 @@ pub enum LegacyProtoHandlerOut { CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, + /// Handshake message that has been sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. + received_handshake: Vec, /// The connected endpoint. endpoint: ConnectedPoint, }, @@ -253,6 +257,11 @@ pub enum LegacyProtoHandlerOut { } impl LegacyProtoHandler { + /// Modifies the handshake message. + pub fn set_handshake_message(&mut self, handshake_message: Vec) { + self.protocol.set_handshake_message(handshake_message); + } + /// Returns true if the legacy substream is currently open. pub fn is_open(&self) -> bool { match &self.state { @@ -274,7 +283,7 @@ impl LegacyProtoHandler { ProtocolState::Poisoned } - ProtocolState::Init { substreams: incoming, .. } => { + ProtocolState::Init { substreams: mut incoming, .. } => { if incoming.is_empty() { if let ConnectedPoint::Dialer { .. } = self.endpoint { self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { @@ -287,12 +296,13 @@ impl LegacyProtoHandler { } } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: incoming[0].protocol_version(), - endpoint: self.endpoint.clone() + version: incoming[0].0.protocol_version(), + endpoint: self.endpoint.clone(), + received_handshake: mem::replace(&mut incoming[0].1, Vec::new()), }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { - substreams: incoming.into_iter().collect(), + substreams: incoming.into_iter().map(|(s, _)| s).collect(), shutdown: SmallVec::new() } } @@ -316,7 +326,8 @@ impl LegacyProtoHandler { ProtocolState::Poisoned } - ProtocolState::Init { substreams: mut shutdown, .. } => { + ProtocolState::Init { substreams: shutdown, .. } => { + let mut shutdown = shutdown.into_iter().map(|(s, _)| s).collect::>(); for s in &mut shutdown { s.shutdown(); } @@ -465,7 +476,8 @@ impl LegacyProtoHandler { /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - mut substream: RegisteredProtocolSubstream + mut substream: RegisteredProtocolSubstream, + received_handshake: Vec, ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -479,14 +491,15 @@ impl LegacyProtoHandler { error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \ initialization", self.remote_peer_id); } - substreams.push(substream); + substreams.push((substream, received_handshake)); ProtocolState::Init { substreams, init_deadline } } ProtocolState::Opening { .. } => { let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: substream.protocol_version(), - endpoint: self.endpoint.clone() + endpoint: self.endpoint.clone(), + received_handshake, }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { @@ -536,17 +549,17 @@ impl ProtocolsHandler for LegacyProtoHandler { fn inject_fully_negotiated_inbound( &mut self, - proto: >::Output + (substream, handshake): >::Output ) { - self.inject_fully_negotiated(proto); + self.inject_fully_negotiated(substream, handshake); } fn inject_fully_negotiated_outbound( &mut self, - proto: >::Output, + (substream, handshake): >::Output, _: Self::OutboundOpenInfo ) { - self.inject_fully_negotiated(proto); + self.inject_fully_negotiated(substream, handshake); } fn inject_event(&mut self, message: LegacyProtoHandlerIn) { diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index 1bc6e745f8876..a2cc385ebfaa6 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(&b"test"[..], &[1], peerset, None), + inner: GenericProto::new(&b"test"[..], &[1], vec![], peerset, None), addrs: addrs .iter() .enumerate() diff --git a/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 311e0b04f9724..9a4eabd7c05ff 100644 --- a/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -36,12 +36,14 @@ pub struct RegisteredProtocol { /// List of protocol versions that we support. /// Ordered in descending order so that the best comes first. supported_versions: Vec, + /// Handshake to send after the substream is open. + handshake_message: Vec, } impl RegisteredProtocol { /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be /// passed inside the `RegisteredProtocolOutput`. - pub fn new(protocol: impl Into, versions: &[u8]) + pub fn new(protocol: impl Into, versions: &[u8], handshake_message: Vec) -> Self { let protocol = protocol.into(); let mut base_name = b"/substrate/".to_vec(); @@ -56,8 +58,14 @@ impl RegisteredProtocol { tmp.sort_unstable_by(|a, b| b.cmp(&a)); tmp }, + handshake_message, } } + + /// Modifies the handshake message. + pub fn set_handshake_message(&mut self, handshake_message: Vec) { + self.handshake_message = handshake_message; + } } impl Clone for RegisteredProtocol { @@ -66,6 +74,7 @@ impl Clone for RegisteredProtocol { id: self.id.clone(), base_name: self.base_name.clone(), supported_versions: self.supported_versions.clone(), + handshake_message: self.handshake_message.clone(), } } } @@ -242,10 +251,10 @@ impl ProtocolName for RegisteredProtocolName { } impl InboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin, +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = RegisteredProtocolSubstream; - type Future = future::Ready>; + type Output = (RegisteredProtocolSubstream, Vec); + type Future = Pin> + Send>>; type Error = io::Error; fn upgrade_inbound( @@ -253,26 +262,32 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, socket: TSubstream, info: Self::Info, ) -> Self::Future { - let framed = { - let mut codec = UviBytes::default(); - codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. - Framed::new(socket, codec) - }; - - future::ok(RegisteredProtocolSubstream { - is_closing: false, - endpoint: Endpoint::Listener, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - protocol_version: info.version, - clogged_fuse: false, + Box::pin(async move { + let mut framed = { + let mut codec = UviBytes::default(); + codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. + Framed::new(socket, codec) + }; + + framed.send(BytesMut::from(&self.handshake_message[..])).await?; + let received_handshake = framed.next().await + .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; + + Ok((RegisteredProtocolSubstream { + is_closing: false, + endpoint: Endpoint::Listener, + send_queue: VecDeque::new(), + requires_poll_flush: false, + inner: framed.fuse(), + protocol_version: info.version, + clogged_fuse: false, + }, received_handshake.to_vec())) }) } } impl OutboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin, +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = >::Output; type Future = >::Future; @@ -283,16 +298,26 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, socket: TSubstream, info: Self::Info, ) -> Self::Future { - let framed = Framed::new(socket, UviBytes::default()); - - future::ok(RegisteredProtocolSubstream { - is_closing: false, - endpoint: Endpoint::Dialer, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - protocol_version: info.version, - clogged_fuse: false, + Box::pin(async move { + let mut framed = { + let mut codec = UviBytes::default(); + codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. + Framed::new(socket, codec) + }; + + framed.send(BytesMut::from(&self.handshake_message[..])).await?; + let received_handshake = framed.next().await + .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; + + Ok((RegisteredProtocolSubstream { + is_closing: false, + endpoint: Endpoint::Dialer, + send_queue: VecDeque::new(), + requires_poll_flush: false, + inner: framed.fuse(), + protocol_version: info.version, + clogged_fuse: false, + }, received_handshake.to_vec())) }) } } From 9ca434b78f9b5a00826e863ccf9dafb1bfbccaa5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 14 May 2020 16:44:08 +0200 Subject: [PATCH 02/14] Fix tests --- client/network/src/protocol.rs | 13 +++++++++++++ client/service/test/src/lib.rs | 1 + 2 files changed, 14 insertions(+) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 4fc6e326e0d23..3ee60eab842fa 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -1903,6 +1903,19 @@ impl NetworkBehaviour for Protocol { type OutEvent = CustomMessageOutcome; fn new_handler(&mut self) -> Self::ProtocolsHandler { + // TODO: https://github.com/paritytech/substrate/issues/6033 + // These functions calls that update the state of the chain are necessary because the test + // infrastructure doesn't call the `on_block_import` method, despite the documentation + // saying that you must call this function after importing a block. Calling these functions + // too often has no downside except degrading performances. + self.behaviour.set_legacy_handshake_message( + build_status_message(&self.config, &self.context_data.chain) + ); + self.behaviour.set_notif_protocol_handshake( + &self.block_announces_protocol, + BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() + ); + self.behaviour.new_handler() } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index fd6774136597c..09eb066eb5185 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -454,6 +454,7 @@ pub fn sync( } make_block_and_import(&first_service.get(), first_user_data); + first_service.0.lock().unwrap().network().on_block_imported(unimplemented!(), true); } network.full_nodes[0].3.clone() }; From 1971d2d944d41c15b1160b92fe1ed4861808f6b8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 14 May 2020 16:50:06 +0200 Subject: [PATCH 03/14] Remove line that wasn't supposed to be committed --- client/service/test/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 09eb066eb5185..fd6774136597c 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -454,7 +454,6 @@ pub fn sync( } make_block_and_import(&first_service.get(), first_user_data); - first_service.0.lock().unwrap().network().on_block_imported(unimplemented!(), true); } network.full_nodes[0].3.clone() }; From 5d41c605e6448576082c20946db370f0dfcdb85c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 18 May 2020 09:56:44 +0200 Subject: [PATCH 04/14] Remove hack --- client/network/src/protocol.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 96fb65ba98dff..b4d13ceb96292 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -1909,19 +1909,6 @@ impl NetworkBehaviour for Protocol { type OutEvent = CustomMessageOutcome; fn new_handler(&mut self) -> Self::ProtocolsHandler { - // TODO: https://github.com/paritytech/substrate/issues/6033 - // These functions calls that update the state of the chain are necessary because the test - // infrastructure doesn't call the `on_block_import` method, despite the documentation - // saying that you must call this function after importing a block. Calling these functions - // too often has no downside except degrading performances. - self.behaviour.set_legacy_handshake_message( - build_status_message(&self.config, &self.context_data.chain) - ); - self.behaviour.set_notif_protocol_handshake( - &self.block_announces_protocol, - BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() - ); - self.behaviour.new_handler() } From 5795d6fa90f57e380c3947a547611e4a12f696a9 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 25 May 2020 16:22:12 +0200 Subject: [PATCH 05/14] Rework how it's done --- .../src/protocol/generic_proto/behaviour.rs | 53 +++---------------- .../protocol/generic_proto/handler/group.rs | 53 ++++--------------- .../protocol/generic_proto/handler/legacy.rs | 5 -- .../protocol/generic_proto/upgrade/legacy.rs | 22 ++++---- 4 files changed, 30 insertions(+), 103 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 81b7db2ef55fb..3c23ba0b26973 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -30,12 +30,13 @@ use libp2p::swarm::{ PollParameters }; use log::{debug, error, trace, warn}; +use parking_lot::RwLock; use prometheus_endpoint::HistogramVec; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; use std::task::{Context, Poll}; use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}}; -use std::{error, mem, pin::Pin, str, time::Duration}; +use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration}; use wasm_timer::Instant; /// Network behaviour that handles opening substreams for custom protocols with other peers. @@ -118,7 +119,7 @@ pub struct GenericProto { /// Notification protocols. Entries are only ever added and not removed. /// Contains, for each protocol, the protocol name and the message to send as part of the /// initial handshake. - notif_protocols: Vec<(Cow<'static, [u8]>, Vec)>, + notif_protocols: Vec<(Cow<'static, [u8]>, Arc>>)>, /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -204,20 +205,6 @@ enum PeerState { } impl PeerState { - /// True if there exists any established connection to the peer. - fn is_connected(&self) -> bool { - match self { - PeerState::Disabled { .. } | - PeerState::DisabledPendingEnable { .. } | - PeerState::Enabled { .. } | - PeerState::PendingRequest { .. } | - PeerState::Requested | - PeerState::Incoming { .. } => true, - PeerState::Poisoned | - PeerState::Banned { .. } => false, - } - } - /// True if there exists an established connection to the peer /// that is open for custom protocol traffic. fn is_open(&self) -> bool { @@ -334,7 +321,8 @@ impl GenericProto { peerset: sc_peerset::Peerset, queue_size_report: Option, ) -> Self { - let legacy_protocol = RegisteredProtocol::new(protocol, versions, handshake_message); + let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); + let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); GenericProto { local_peer_id, @@ -358,7 +346,7 @@ impl GenericProto { protocol_name: impl Into>, handshake_msg: impl Into> ) { - self.notif_protocols.push((protocol_name.into(), handshake_msg.into())); + self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into())))); } /// Modifies the handshake of the given notifications protocol. @@ -371,22 +359,10 @@ impl GenericProto { ) { let handshake_message = handshake_message.into(); if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) { - protocol.1 = handshake_message.clone(); + *protocol.1.write() = handshake_message; } else { return; } - - // Send an event to all the peers we're connected to, updating the handshake message. - for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) { - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::All, - event: NotifsHandlerIn::UpdateHandshake { - protocol_name: Cow::Owned(protocol_name.to_owned()), - handshake_message: handshake_message.clone(), - }, - }); - } } /// Modifies the handshake of the legacy protocol. @@ -394,20 +370,7 @@ impl GenericProto { &mut self, handshake_message: impl Into> ) { - let handshake_message = handshake_message.into(); - - // Send an event to all the peers we're connected to, updating the handshake message. - for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) { - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::All, - event: NotifsHandlerIn::UpdateLegacyHandshake { - handshake_message: handshake_message.clone(), - }, - }); - } - - self.legacy_protocol.set_handshake_message(handshake_message); + *self.legacy_protocol.handshake_message().write() = handshake_message.into(); } /// Returns the number of discovered nodes that we keep in memory. diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index d143a0de06a78..37f7f6126aaf6 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -64,8 +64,9 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{debug, error}; +use parking_lot::RwLock; use prometheus_endpoint::HistogramVec; -use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; +use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -77,10 +78,10 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; pub struct NotifsHandlerProto { /// Prototypes for handlers for inbound substreams, and the message we respond with in the /// handshake. - in_handlers: Vec<(NotifsInHandlerProto, Vec)>, + in_handlers: Vec<(NotifsInHandlerProto, Arc>>)>, /// Prototypes for handlers for outbound substreams, and the initial handshake message we send. - out_handlers: Vec<(NotifsOutHandlerProto, Vec)>, + out_handlers: Vec<(NotifsOutHandlerProto, Arc>>)>, /// Prototype for handler for backwards-compatibility. legacy: LegacyProtoHandlerProto, @@ -91,10 +92,10 @@ pub struct NotifsHandlerProto { /// See the documentation at the module level for more information. pub struct NotifsHandler { /// Handlers for inbound substreams, and the message we respond with in the handshake. - in_handlers: Vec<(NotifsInHandler, Vec)>, + in_handlers: Vec<(NotifsInHandler, Arc>>)>, /// Handlers for outbound substreams, and the initial handshake message we send. - out_handlers: Vec<(NotifsOutHandler, Vec)>, + out_handlers: Vec<(NotifsOutHandler, Arc>>)>, /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, @@ -161,25 +162,6 @@ pub enum NotifsHandlerIn { message: Vec, }, - /// Modifies the handshake message of the legacy protocol. - UpdateLegacyHandshake { - /// The new handshake message to send if we open a substream or if the remote opens a - /// substream towards us. - handshake_message: Vec, - }, - - /// Modifies the handshake message of a notifications protocol. - UpdateHandshake { - /// Name of the protocol for the message. - /// - /// Must match one of the registered protocols. - protocol_name: Cow<'static, [u8]>, - - /// The new handshake message to send if we open a substream or if the remote opens a - /// substream towards us. - handshake_message: Vec, - }, - /// Sends a notifications message. SendNotification { /// Name of the protocol for the message. @@ -263,7 +245,7 @@ impl NotifsHandlerProto { /// messages queue. If passed, it must have one label for the protocol name. pub fn new( legacy: RegisteredProtocol, - list: impl Into, Vec)>>, + list: impl Into, Arc>>)>>, queue_size_report: Option ) -> Self { let list = list.into(); @@ -357,11 +339,11 @@ impl ProtocolsHandler for NotifsHandler { self.legacy.inject_event(LegacyProtoHandlerIn::Enable); for (handler, initial_message) in &mut self.out_handlers { handler.inject_event(NotifsOutHandlerIn::Enable { - initial_message: initial_message.clone(), + initial_message: initial_message.read().clone(), }); } for num in self.pending_in.drain(..) { - let handshake_message = self.in_handlers[num].1.clone(); + let handshake_message = self.in_handlers[num].1.read().clone(); self.in_handlers[num].0 .inject_event(NotifsInHandlerIn::Accept(handshake_message)); } @@ -385,21 +367,6 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::SendLegacy { message } => self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), - NotifsHandlerIn::UpdateLegacyHandshake { handshake_message } => { - self.legacy.set_handshake_message(handshake_message); - } - NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => { - for (handler, current_handshake) in &mut self.in_handlers { - if handler.protocol_name() == &*protocol_name { - *current_handshake = handshake_message.clone(); - } - } - for (handler, current_handshake) in &mut self.out_handlers { - if handler.protocol_name() == &*protocol_name { - *current_handshake = handshake_message.clone(); - } - } - } NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => { for (handler, _) in &mut self.out_handlers { if handler.protocol_name() != &protocol_name[..] { @@ -538,7 +505,7 @@ impl ProtocolsHandler for NotifsHandler { match self.enabled { EnabledState::Initial => self.pending_in.push(handler_num), EnabledState::Enabled => - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())), + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.read().clone())), EnabledState::Disabled => handler.inject_event(NotifsInHandlerIn::Refuse), }, diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 1fe025dddff6e..d2f54fa3aaf9d 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -257,11 +257,6 @@ pub enum LegacyProtoHandlerOut { } impl LegacyProtoHandler { - /// Modifies the handshake message. - pub fn set_handshake_message(&mut self, handshake_message: Vec) { - self.protocol.set_handshake_message(handshake_message); - } - /// Returns true if the legacy substream is currently open. pub fn is_open(&self) -> bool { match &self.state { diff --git a/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 371665c409c35..7a43ee85dcbc3 100644 --- a/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -21,7 +21,8 @@ use bytes::BytesMut; use futures::prelude::*; use futures_codec::Framed; use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; -use std::{collections::VecDeque, io, pin::Pin, vec::IntoIter as VecIntoIter}; +use parking_lot::RwLock; +use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter}; use std::task::{Context, Poll}; use unsigned_varint::codec::UviBytes; @@ -39,13 +40,12 @@ pub struct RegisteredProtocol { /// Ordered in descending order so that the best comes first. supported_versions: Vec, /// Handshake to send after the substream is open. - handshake_message: Vec, + handshake_message: Arc>>, } impl RegisteredProtocol { - /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be - /// passed inside the `RegisteredProtocolOutput`. - pub fn new(protocol: impl Into, versions: &[u8], handshake_message: Vec) + /// Creates a new `RegisteredProtocol`. + pub fn new(protocol: impl Into, versions: &[u8], handshake_message: Arc>>) -> Self { let protocol = protocol.into(); let mut base_name = b"/substrate/".to_vec(); @@ -64,9 +64,9 @@ impl RegisteredProtocol { } } - /// Modifies the handshake message. - pub fn set_handshake_message(&mut self, handshake_message: Vec) { - self.handshake_message = handshake_message; + /// Returns the `Arc` to the handshake message that was passed at initialization. + pub fn handshake_message(&self) -> &Arc>> { + &self.handshake_message } } @@ -271,7 +271,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, Framed::new(socket, codec) }; - framed.send(BytesMut::from(&self.handshake_message[..])).await?; + let handshake = BytesMut::from(&self.handshake_message.read()[..]); + framed.send(handshake).await?; let received_handshake = framed.next().await .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; @@ -307,7 +308,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, Framed::new(socket, codec) }; - framed.send(BytesMut::from(&self.handshake_message[..])).await?; + let handshake = BytesMut::from(&self.handshake_message.read()[..]); + framed.send(handshake).await?; let received_handshake = framed.next().await .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; From 2653c0d2c63273f3fe2d58f339cc2ad165327e81 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 25 May 2020 16:58:49 +0200 Subject: [PATCH 06/14] Some little changes --- .../src/protocol/generic_proto/behaviour.rs | 5 +---- .../src/protocol/generic_proto/handler/group.rs | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 3c23ba0b26973..d5700c16301ce 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -357,11 +357,8 @@ impl GenericProto { protocol_name: &[u8], handshake_message: impl Into> ) { - let handshake_message = handshake_message.into(); if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) { - *protocol.1.write() = handshake_message; - } else { - return; + *protocol.1.write() = handshake_message.into(); } } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 37f7f6126aaf6..535341a6681ce 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -338,11 +338,16 @@ impl ProtocolsHandler for NotifsHandler { self.enabled = EnabledState::Enabled; self.legacy.inject_event(LegacyProtoHandlerIn::Enable); for (handler, initial_message) in &mut self.out_handlers { + // We create `initial_message` on a separate line to be sure that the lock + // is released as soon as possible. + let initial_message = initial_message.read().clone(); handler.inject_event(NotifsOutHandlerIn::Enable { - initial_message: initial_message.read().clone(), + initial_message, }); } for num in self.pending_in.drain(..) { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. let handshake_message = self.in_handlers[num].1.read().clone(); self.in_handlers[num].0 .inject_event(NotifsInHandlerIn::Accept(handshake_message)); @@ -504,8 +509,12 @@ impl ProtocolsHandler for NotifsHandler { ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => match self.enabled { EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.read().clone())), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) + }, EnabledState::Disabled => handler.inject_event(NotifsInHandlerIn::Refuse), }, From 88c4da64478e98650606179179bb36304eff3be1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 25 May 2020 17:59:05 +0200 Subject: [PATCH 07/14] update_chain wasn't doing its thing --- client/network/src/protocol.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index b4ff03e6f5d7e..98ba36ad91539 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -549,6 +549,11 @@ impl Protocol { pub fn update_chain(&mut self) { let info = self.context_data.chain.info(); self.sync.update_chain_info(&info.best_hash, info.best_number); + self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain)); + self.behaviour.set_notif_protocol_handshake( + &self.block_announces_protocol, + BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode() + ); } fn update_peer_info(&mut self, who: &PeerId) { From 6c50854c0b8a8b6e2dc87d521ac0d33127625db7 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Jun 2020 15:29:53 +0200 Subject: [PATCH 08/14] Fix service tests not calling update_chain --- client/network/src/service.rs | 14 ++++++++++++++ client/service/test/src/lib.rs | 1 + 2 files changed, 15 insertions(+) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6256cdd64da4d..c4b6d97909a56 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -713,6 +713,17 @@ impl NetworkService { pub fn num_connected(&self) -> usize { self.num_connected.load(Ordering::Relaxed) } + + /// This function should be called when blocks are added to the chain by something other + /// than the import queue. + /// + /// > **Important**: This function is a hack and can be removed at any time. Do **not** use it. + pub fn update_chain(&self) { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::UpdateChain); + } + } impl sp_consensus::SyncOracle @@ -778,6 +789,7 @@ enum ServiceToWorkerMsg { protocol_name: Cow<'static, [u8]>, }, DisconnectPeer(PeerId), + UpdateChain, } /// Main network worker. Must be polled in order for the network to advance. @@ -1106,6 +1118,8 @@ impl Future for NetworkWorker { }, ServiceToWorkerMsg::DisconnectPeer(who) => this.network_service.user_protocol_mut().disconnect_peer(&who), + ServiceToWorkerMsg::UpdateChain => + this.network_service.user_protocol_mut().update_chain(), } } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 63c7e0795dc1a..206153082505c 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -457,6 +457,7 @@ pub fn sync( make_block_and_import(&first_service.get(), first_user_data); } + (network.full_nodes[0].1).0.lock().unwrap().network().update_chain(); network.full_nodes[0].3.clone() }; From 8a52b8ec0f141550c3e43193491e4304996431fd Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 9 Jun 2020 16:14:29 +0200 Subject: [PATCH 09/14] Update client/network/src/protocol/generic_proto/behaviour.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 16df1a27db443..2ab3616b32266 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -282,7 +282,7 @@ pub enum GenericProtoOut { /// Id of the peer we are connected to. peer_id: PeerId, /// Handshake that was sent to us. - /// This is normally a "Status" message, but this out of the concern of this code. + /// This is normally a "Status" message, but this is out of the concern of this code. received_handshake: Vec, }, From 2b892e6a7637c0b1297e6ecdbb919321c9098ff5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 15 Jun 2020 10:49:41 +0200 Subject: [PATCH 10/14] [WIP] --- .../src/protocol/generic_proto/behaviour.rs | 34 +++++++++++++------ .../protocol/generic_proto/handler/group.rs | 19 +++++++++-- .../protocol/generic_proto/handler/legacy.rs | 15 +++++--- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 2227aad284551..6098e17cb8199 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -688,7 +688,9 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable, + event: NotifsHandlerIn::Enable { + send_legacy_handshake: true, + }, }); *occ_entry.into_mut() = PeerState::Enabled { open }; }, @@ -707,7 +709,9 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable, + event: NotifsHandlerIn::Enable { + send_legacy_handshake: true, + }, }); *occ_entry.into_mut() = PeerState::Enabled { open: SmallVec::new() }; }, @@ -827,7 +831,9 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable, + event: NotifsHandlerIn::Enable { + send_legacy_handshake: true, + }, }); *state = PeerState::Enabled { open: SmallVec::new() }; } @@ -907,7 +913,9 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), - event: NotifsHandlerIn::Enable + event: NotifsHandlerIn::Enable { + send_legacy_handshake: true, + } }); } @@ -967,7 +975,9 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), - event: NotifsHandlerIn::Enable + event: NotifsHandlerIn::Enable { + send_legacy_handshake: false, + } }); } @@ -1409,11 +1419,15 @@ impl NetworkBehaviour for GenericProto { PeerState::DisabledPendingEnable { timer, open, .. } if *timer == delay_id => { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable, - }); + for (index_in_open, open) in open.iter().enumerate() { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::One(*open), + event: NotifsHandlerIn::Enable { + send_legacy_handshake: index_in_open == 0, + }, + }); + } *peer_state = PeerState::Enabled { open: mem::replace(open, Default::default()) }; } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 535341a6681ce..848d670c46340 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -148,7 +148,20 @@ impl IntoProtocolsHandler for NotifsHandlerProto { #[derive(Debug, Clone)] pub enum NotifsHandlerIn { /// The node should start using custom protocols. - Enable, + Enable { + /// In the legacy protocol, the `Status` message (that serves as a handshake) is only ever + /// sent on the first substream that is open with a given peer. If a second substream with + /// that same peer is later opened (no matter whether it is on the same connection or a + /// different one), no `Status` message is sent. + /// + /// This flag indicates whether or not this connection should send a `Status` message. + /// + /// Note that this only concerns the `Status` message that our node sends out. Whatever + /// the value of this flag is, we should always propagate any `Status` sent by the remote, + /// and consider as successful the lack of `Status` message. If it the role of the upper + /// layers to properly enforce the uniqueness of the `Status` message sent by the remote. + send_legacy_handshake: bool, + }, /// The node should stop using custom protocols. Disable, @@ -331,12 +344,12 @@ impl ProtocolsHandler for NotifsHandler { fn inject_event(&mut self, message: NotifsHandlerIn) { match message { - NotifsHandlerIn::Enable => { + NotifsHandlerIn::Enable { send_legacy_handshake } => { if let EnabledState::Enabled = self.enabled { debug!("enabling already-enabled handler"); } self.enabled = EnabledState::Enabled; - self.legacy.inject_event(LegacyProtoHandlerIn::Enable); + self.legacy.inject_event(LegacyProtoHandlerIn::Enable { send_legacy_handshake }); for (handler, initial_message) in &mut self.out_handlers { // We create `initial_message` on a separate line to be sure that the lock // is released as soon as possible. diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index d2f54fa3aaf9d..e934d469a7e38 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -200,7 +200,10 @@ enum ProtocolState { #[derive(Debug)] pub enum LegacyProtoHandlerIn { /// The node should start using custom protocols. - Enable, + Enable { + /// See the documentation of this flag in the parent module. + send_legacy_handshake: bool, + }, /// The node should stop using custom protocols. Disable, @@ -219,8 +222,9 @@ pub enum LegacyProtoHandlerOut { CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, - /// Handshake message that has been sent to us. - /// This is normally a "Status" message, but this out of the concern of this code. + /// First message that has been sent to us. + /// This can be a "Status" message, or a non-"Status" message in case this is not the + /// first substream that's been opened, but this out of the concern of this code. received_handshake: Vec, /// The connected endpoint. endpoint: ConnectedPoint, @@ -270,7 +274,7 @@ impl LegacyProtoHandler { } /// Enables the handler. - fn enable(&mut self) { + fn enable(&mut self, send_legacy_handshake: bool) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -560,7 +564,8 @@ impl ProtocolsHandler for LegacyProtoHandler { fn inject_event(&mut self, message: LegacyProtoHandlerIn) { match message { LegacyProtoHandlerIn::Disable => self.disable(), - LegacyProtoHandlerIn::Enable => self.enable(), + LegacyProtoHandlerIn::Enable { send_legacy_handshake } => + self.enable(send_legacy_handshake), LegacyProtoHandlerIn::SendCustomMessage { message } => self.send_message(message), } From d4ddc963f64294e8bd4cf6ce1f5fe8765012eaaf Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Jun 2020 16:21:53 +0200 Subject: [PATCH 11/14] Revert "[WIP]" This reverts commit 2b892e6a7637c0b1297e6ecdbb919321c9098ff5. --- .../src/protocol/generic_proto/behaviour.rs | 34 ++++++------------- .../protocol/generic_proto/handler/group.rs | 19 ++--------- .../protocol/generic_proto/handler/legacy.rs | 15 +++----- 3 files changed, 18 insertions(+), 50 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 6098e17cb8199..2227aad284551 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -688,9 +688,7 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable { - send_legacy_handshake: true, - }, + event: NotifsHandlerIn::Enable, }); *occ_entry.into_mut() = PeerState::Enabled { open }; }, @@ -709,9 +707,7 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable { - send_legacy_handshake: true, - }, + event: NotifsHandlerIn::Enable, }); *occ_entry.into_mut() = PeerState::Enabled { open: SmallVec::new() }; }, @@ -831,9 +827,7 @@ impl GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::All, - event: NotifsHandlerIn::Enable { - send_legacy_handshake: true, - }, + event: NotifsHandlerIn::Enable, }); *state = PeerState::Enabled { open: SmallVec::new() }; } @@ -913,9 +907,7 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), - event: NotifsHandlerIn::Enable { - send_legacy_handshake: true, - } + event: NotifsHandlerIn::Enable }); } @@ -975,9 +967,7 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), - event: NotifsHandlerIn::Enable { - send_legacy_handshake: false, - } + event: NotifsHandlerIn::Enable }); } @@ -1419,15 +1409,11 @@ impl NetworkBehaviour for GenericProto { PeerState::DisabledPendingEnable { timer, open, .. } if *timer == delay_id => { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); - for (index_in_open, open) in open.iter().enumerate() { - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::One(*open), - event: NotifsHandlerIn::Enable { - send_legacy_handshake: index_in_open == 0, - }, - }); - } + self.events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::All, + event: NotifsHandlerIn::Enable, + }); *peer_state = PeerState::Enabled { open: mem::replace(open, Default::default()) }; } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 848d670c46340..535341a6681ce 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -148,20 +148,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { #[derive(Debug, Clone)] pub enum NotifsHandlerIn { /// The node should start using custom protocols. - Enable { - /// In the legacy protocol, the `Status` message (that serves as a handshake) is only ever - /// sent on the first substream that is open with a given peer. If a second substream with - /// that same peer is later opened (no matter whether it is on the same connection or a - /// different one), no `Status` message is sent. - /// - /// This flag indicates whether or not this connection should send a `Status` message. - /// - /// Note that this only concerns the `Status` message that our node sends out. Whatever - /// the value of this flag is, we should always propagate any `Status` sent by the remote, - /// and consider as successful the lack of `Status` message. If it the role of the upper - /// layers to properly enforce the uniqueness of the `Status` message sent by the remote. - send_legacy_handshake: bool, - }, + Enable, /// The node should stop using custom protocols. Disable, @@ -344,12 +331,12 @@ impl ProtocolsHandler for NotifsHandler { fn inject_event(&mut self, message: NotifsHandlerIn) { match message { - NotifsHandlerIn::Enable { send_legacy_handshake } => { + NotifsHandlerIn::Enable => { if let EnabledState::Enabled = self.enabled { debug!("enabling already-enabled handler"); } self.enabled = EnabledState::Enabled; - self.legacy.inject_event(LegacyProtoHandlerIn::Enable { send_legacy_handshake }); + self.legacy.inject_event(LegacyProtoHandlerIn::Enable); for (handler, initial_message) in &mut self.out_handlers { // We create `initial_message` on a separate line to be sure that the lock // is released as soon as possible. diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index e934d469a7e38..d2f54fa3aaf9d 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -200,10 +200,7 @@ enum ProtocolState { #[derive(Debug)] pub enum LegacyProtoHandlerIn { /// The node should start using custom protocols. - Enable { - /// See the documentation of this flag in the parent module. - send_legacy_handshake: bool, - }, + Enable, /// The node should stop using custom protocols. Disable, @@ -222,9 +219,8 @@ pub enum LegacyProtoHandlerOut { CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, - /// First message that has been sent to us. - /// This can be a "Status" message, or a non-"Status" message in case this is not the - /// first substream that's been opened, but this out of the concern of this code. + /// Handshake message that has been sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. received_handshake: Vec, /// The connected endpoint. endpoint: ConnectedPoint, @@ -274,7 +270,7 @@ impl LegacyProtoHandler { } /// Enables the handler. - fn enable(&mut self, send_legacy_handshake: bool) { + fn enable(&mut self) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -564,8 +560,7 @@ impl ProtocolsHandler for LegacyProtoHandler { fn inject_event(&mut self, message: LegacyProtoHandlerIn) { match message { LegacyProtoHandlerIn::Disable => self.disable(), - LegacyProtoHandlerIn::Enable { send_legacy_handshake } => - self.enable(send_legacy_handshake), + LegacyProtoHandlerIn::Enable => self.enable(), LegacyProtoHandlerIn::SendCustomMessage { message } => self.send_message(message), } From f83e529e5a85cdec9a2260f1e1d1a211c38bf825 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 19 Jun 2020 11:06:51 +0200 Subject: [PATCH 12/14] Update client/network/src/protocol.rs Co-authored-by: Max Inden --- client/network/src/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index e45cf28a7d038..ba46e715de191 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -960,7 +960,7 @@ impl Protocol { } } - /// Called by peer to report status + /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); let _protocol_version = { From a8b03019ebffe8ce2677d2399e83d8b9a4dd64fa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 22 Jun 2020 16:42:35 +0200 Subject: [PATCH 13/14] Fix received message not being handshake --- client/network/src/protocol.rs | 2 +- .../src/protocol/generic_proto/behaviour.rs | 26 ++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 55c690de0bbdd..8d74355e0961f 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -596,7 +596,7 @@ impl Protocol { match message { GenericMessage::Status(_) => - warn!(target: "sub-libp2p", "Received unexpected Status"), + debug!(target: "sub-libp2p", "Received unexpected Status"), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { let outcome = self.on_block_response(who.clone(), r); diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 2227aad284551..71fe8264753bb 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -1280,8 +1280,32 @@ impl NetworkBehaviour for GenericProto { debug!(target: "sub-libp2p", "External API <= Open({:?})", source); let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake }; self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } else { - debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); + // In normal situations, the handshake is supposed to be a Status message, and + // we would discard Status messages received from secondary connections. + // However, in Polkadot 0.8.10 and below, nodes don't send a Status message + // when opening secondary connections and instead directly consider the + // substream as open. When connecting to such a node, the first message sent + // by the remote will always be considered by our local node as the handshake, + // even when it is a regular message. + // In order to maintain backwards compatibility, we therefore report the + // handshake as if it was a regular message, and the upper layer will ignore + // any superfluous Status message. + // The code below should be removed once Polkadot 0.8.10 and below are no + // longer widely in use, and should be replaced with simply printing a log + // entry. + debug!( + target: "sub-libp2p", + "Handler({:?}) => Secondary connection opened custom protocol", + source + ); + trace!(target: "sub-libp2p", "External API <= Message({:?})", source); + let ev = GenericProtoOut::LegacyMessage { + peer_id: source, + message: From::from(&received_handshake[..]), + }; + self.events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } } From 84e785201c56c1ebf59ddd31090422505834bea3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jun 2020 10:31:09 +0200 Subject: [PATCH 14/14] Update client/network/src/protocol/generic_proto/behaviour.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/behaviour.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 71fe8264753bb..4712c244e599d 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -1301,11 +1301,11 @@ impl NetworkBehaviour for GenericProto { source ); trace!(target: "sub-libp2p", "External API <= Message({:?})", source); - let ev = GenericProtoOut::LegacyMessage { + let event = GenericProtoOut::LegacyMessage { peer_id: source, message: From::from(&received_handshake[..]), }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } }