Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Move the legacy protocol handshake to the legacy substream (#5938)
Browse files Browse the repository at this point in the history
* Move the legacy protocol handshake to the legacy substream

* Fix tests

* Remove line that wasn't supposed to be committed

* Remove hack

* Rework how it's done

* Some little changes

* update_chain wasn't doing its thing

* Fix service tests not calling update_chain

* Update client/network/src/protocol/generic_proto/behaviour.rs

Co-authored-by: Max Inden <[email protected]>

* [WIP]

* Revert "[WIP]"

This reverts commit 2b892e6.

* Update client/network/src/protocol.rs

Co-authored-by: Max Inden <[email protected]>

* Fix received message not being handshake

* Update client/network/src/protocol/generic_proto/behaviour.rs

Co-authored-by: Max Inden <[email protected]>

Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
tomaka and mxinden authored Jul 15, 2020
1 parent b34c1ef commit f4031f6
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 131 deletions.
145 changes: 37 additions & 108 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ mod rep {
}

struct Metrics {
handshaking_peers: Gauge<U64>,
obsolete_requests: Gauge<U64>,
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
Expand All @@ -148,10 +147,6 @@ struct Metrics {
impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
handshaking_peers: {
let g = Gauge::new("sync_handshaking_peers", "Number of newly connected peers")?;
register(g, r)?
},
obsolete_requests: {
let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?;
register(g, r)?
Expand Down Expand Up @@ -239,8 +234,6 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
// Connected peers pending Status message.
handshaking_peers: HashMap<PeerId, HandshakingPeer>,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
Expand Down Expand Up @@ -269,13 +262,6 @@ struct PacketStats {
count_in: u64,
count_out: u64,
}

/// A peer that we are connected to
/// and from whom we have not yet received a Status message.
struct HandshakingPeer {
timestamp: Instant,
}

/// Peer information
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
Expand Down Expand Up @@ -426,7 +412,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report
queue_size_report,
);

let mut legacy_equiv_by_name = HashMap::new();
Expand Down Expand Up @@ -466,7 +452,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
genesis_hash: info.genesis_hash,
sync,
handshaking_peers: HashMap::new(),
important_peers,
transaction_pool,
finality_proof_provider,
Expand Down Expand Up @@ -616,7 +601,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
stats.count_in += 1;

match message {
GenericMessage::Status(s) => return self.on_status_message(who, s),
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
let outcome = self.on_block_response(who.clone(), r);
Expand Down Expand Up @@ -707,12 +693,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
}

/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
}

/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome<B> {
if self.important_peers.contains(&peer) {
Expand All @@ -721,12 +701,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "{} disconnected", peer);
}

// lock all the the peer lists so that add/remove peer events are in order
let removed = {
self.handshaking_peers.remove(&peer);
self.context_data.peers.remove(&peer)
};
if let Some(_peer_data) = removed {
if let Some(_peer_data) = self.context_data.peers.remove(&peer) {
self.sync.peer_disconnected(&peer);

// Notify all the notification protocols as closed.
Expand Down Expand Up @@ -955,16 +930,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
aborting.push(who.clone());
}
}
for (who, _) in self.handshaking_peers.iter()
.filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC)
{
log!(
target: "sync",
if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
"Handshake timeout {}", who
);
aborting.push(who.clone());
}
}

for p in aborting {
Expand All @@ -973,8 +938,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Called by peer to report status
fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
Expand Down Expand Up @@ -1041,23 +1006,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

let info = match self.handshaking_peers.remove(&who) {
Some(_handshaking) => {
PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
}
},
None => {
error!(target: "sync", "Received status from previously unconnected node {}", who);
return CustomMessageOutcome::None;
},
};

let peer = Peer {
info,
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
Expand Down Expand Up @@ -1837,9 +1792,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
metrics.obsolete_requests.set(obsolete_requests);

let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.handshaking_peers.set(n);

let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);

Expand Down Expand Up @@ -2042,9 +1994,31 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, .. } => {
self.on_peer_connected(peer_id);
CustomMessageOutcome::None
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake),
Ok(msg) => {
debug!(
target: "sync",
"Expected Status message from {}, but got {:?}",
peer_id,
msg,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id)
Expand Down Expand Up @@ -2130,48 +2104,3 @@ impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
}
}

#[cfg(test)]
mod tests {
use crate::PeerId;
use crate::config::EmptyTransactionPool;
use super::{CustomMessageOutcome, Protocol, ProtocolConfig};

use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use std::sync::Arc;
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
use substrate_test_runtime_client::runtime::{Block, Hash};

#[test]
fn no_handshake_no_notif_closed() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

let (mut protocol, _) = Protocol::<Block, Hash>::new(
ProtocolConfig::default(),
PeerId::random(),
client.clone(),
Arc::new(EmptyTransactionPool),
None,
None,
From::from(&b"test"[..]),
sc_peerset::PeersetConfig {
in_peers: 10,
out_peers: 10,
bootnodes: Vec::new(),
reserved_only: false,
priority_groups: Vec::new(),
},
Box::new(DefaultBlockAnnounceValidator),
None,
Default::default(),
None,
).unwrap();

let dummy_peer_id = PeerId::random();
let _ = protocol.on_peer_connected(dummy_peer_id.clone());
match protocol.on_peer_disconnected(dummy_peer_id) {
CustomMessageOutcome::None => {},
_ => panic!()
};
}
}
33 changes: 30 additions & 3 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ pub enum GenericProtoOut {
CustomProtocolOpen {
/// Id of the peer we are connected to.
peer_id: PeerId,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
},

/// Closed a custom protocol with the remote.
Expand Down Expand Up @@ -1235,7 +1238,7 @@ impl NetworkBehaviour for GenericProto {
}
}

NotifsHandlerOut::Open { endpoint } => {
NotifsHandlerOut::Open { endpoint, received_handshake } => {
debug!(target: "sub-libp2p",
"Handler({:?}) => Endpoint {:?} open for custom protocols.",
source, endpoint);
Expand Down Expand Up @@ -1266,10 +1269,34 @@ impl NetworkBehaviour for GenericProto {

if first {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source };
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));

} else {
debug!(target: "sub-libp2p", "Secondary connection opened custom protocol.");
// In normal situations, the handshake is supposed to be a Status message, and
// we would discard Status messages received from secondary connections.
// However, in Polkadot 0.8.10 and below, nodes don't send a Status message
// when opening secondary connections and instead directly consider the
// substream as open. When connecting to such a node, the first message sent
// by the remote will always be considered by our local node as the handshake,
// even when it is a regular message.
// In order to maintain backwards compatibility, we therefore report the
// handshake as if it was a regular message, and the upper layer will ignore
// any superfluous Status message.
// The code below should be removed once Polkadot 0.8.10 and below are no
// longer widely in use, and should be replaced with simply printing a log
// entry.
debug!(
target: "sub-libp2p",
"Handler({:?}) => Secondary connection opened custom protocol",
source
);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message: From::from(&received_handshake[..]),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}

Expand Down
7 changes: 5 additions & 2 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ pub enum NotifsHandlerOut {
Open {
/// The endpoint of the connection that is open for custom protocols.
endpoint: ConnectedPoint,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
},

/// The connection is closed for custom protocols.
Expand Down Expand Up @@ -465,9 +468,9 @@ impl ProtocolsHandler for NotifsHandler {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint }
NotifsHandlerOut::Open { endpoint, received_handshake }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
Expand Down
Loading

0 comments on commit f4031f6

Please sign in to comment.