Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Move the legacy protocol handshake to the legacy substream #5938

Merged
30 commits merged into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
578f972
Move the legacy protocol handshake to the legacy substream
tomaka May 5, 2020
0e7d51f
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka May 7, 2020
4b5e146
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka May 12, 2020
6b14a35
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka May 14, 2020
9ca434b
Fix tests
tomaka May 14, 2020
1971d2d
Remove line that wasn't supposed to be committed
tomaka May 14, 2020
b154b80
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka May 18, 2020
5d41c60
Remove hack
tomaka May 18, 2020
7f068c8
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka May 25, 2020
5795d6f
Rework how it's done
tomaka May 25, 2020
2653c0d
Some little changes
tomaka May 25, 2020
88c4da6
update_chain wasn't doing its thing
tomaka May 25, 2020
53cce8d
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 3, 2020
6c50854
Fix service tests not calling update_chain
tomaka Jun 3, 2020
5be320f
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 3, 2020
ecde0df
Merge branch 'fix-service-test-update-chain' into move-legacy-handshake
tomaka Jun 3, 2020
8a52b8e
Update client/network/src/protocol/generic_proto/behaviour.rs
tomaka Jun 9, 2020
b0fe448
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 12, 2020
2b892e6
[WIP]
tomaka Jun 15, 2020
3b89594
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 17, 2020
d4ddc96
Revert "[WIP]"
tomaka Jun 18, 2020
f83e529
Update client/network/src/protocol.rs
tomaka Jun 19, 2020
d16d620
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 22, 2020
a8b0301
Fix received message not being handshake
tomaka Jun 22, 2020
84e7852
Update client/network/src/protocol/generic_proto/behaviour.rs
tomaka Jun 23, 2020
ee09a83
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jun 29, 2020
a82fba7
Merge branch 'master' into move-legacy-handshake
tomaka Jun 30, 2020
bf644f5
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jul 6, 2020
78f4ca6
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jul 13, 2020
81eee50
Merge remote-tracking branch 'upstream/master' into move-legacy-hands…
tomaka Jul 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 37 additions & 108 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ mod rep {
}

struct Metrics {
handshaking_peers: Gauge<U64>,
obsolete_requests: Gauge<U64>,
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
Expand All @@ -151,10 +150,6 @@ struct Metrics {
impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
handshaking_peers: {
let g = Gauge::new("sync_handshaking_peers", "Number of newly connected peers")?;
register(g, r)?
},
obsolete_requests: {
let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?;
register(g, r)?
Expand Down Expand Up @@ -242,8 +237,6 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
// Connected peers pending Status message.
handshaking_peers: HashMap<PeerId, HandshakingPeer>,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
Expand Down Expand Up @@ -272,13 +265,6 @@ struct PacketStats {
count_in: u64,
count_out: u64,
}

/// A peer that we are connected to
/// and from whom we have not yet received a Status message.
struct HandshakingPeer {
timestamp: Instant,
}

/// Peer information
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
Expand Down Expand Up @@ -429,7 +415,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report
queue_size_report,
);

let mut legacy_equiv_by_name = HashMap::new();
Expand Down Expand Up @@ -469,7 +455,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
genesis_hash: info.genesis_hash,
sync,
handshaking_peers: HashMap::new(),
important_peers,
transaction_pool,
finality_proof_provider,
Expand Down Expand Up @@ -619,7 +604,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
stats.count_in += 1;

match message {
GenericMessage::Status(s) => return self.on_status_message(who, s),
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
let outcome = self.on_block_response(who.clone(), r);
Expand Down Expand Up @@ -710,12 +696,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
}

/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
}

/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome<B> {
if self.important_peers.contains(&peer) {
Expand All @@ -724,12 +704,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "{} disconnected", peer);
}

// lock all the the peer lists so that add/remove peer events are in order
let removed = {
self.handshaking_peers.remove(&peer);
self.context_data.peers.remove(&peer)
};
if let Some(_peer_data) = removed {
if let Some(_peer_data) = self.context_data.peers.remove(&peer) {
self.sync.peer_disconnected(&peer);

// Notify all the notification protocols as closed.
Expand Down Expand Up @@ -974,16 +949,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
aborting.push(who.clone());
}
}
for (who, _) in self.handshaking_peers.iter()
.filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC)
{
log!(
target: "sync",
if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
"Handshake timeout {}", who
);
aborting.push(who.clone());
}
}

for p in aborting {
Expand All @@ -992,8 +957,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Called by peer to report status
fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
Expand Down Expand Up @@ -1060,23 +1025,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

let info = match self.handshaking_peers.remove(&who) {
Some(_handshaking) => {
PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
}
},
None => {
error!(target: "sync", "Received status from previously unconnected node {}", who);
return CustomMessageOutcome::None;
},
};

let peer = Peer {
info,
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
Expand Down Expand Up @@ -1856,9 +1811,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
metrics.obsolete_requests.set(obsolete_requests);

let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.handshaking_peers.set(n);

let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);

Expand Down Expand Up @@ -2061,9 +2013,31 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, .. } => {
self.on_peer_connected(peer_id);
CustomMessageOutcome::None
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake),
Ok(msg) => {
debug!(
target: "sync",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably out of scope for this pull request as it is done everywhere in protocol.rs: This is not restricted to sync only, no?

"Expected Status message from {}, but got {:?}",
peer_id,
msg,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id)
Expand Down Expand Up @@ -2158,48 +2132,3 @@ impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
}
}

#[cfg(test)]
mod tests {
use crate::PeerId;
use crate::config::EmptyTransactionPool;
use super::{CustomMessageOutcome, Protocol, ProtocolConfig};

use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use std::sync::Arc;
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
use substrate_test_runtime_client::runtime::{Block, Hash};

#[test]
fn no_handshake_no_notif_closed() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

let (mut protocol, _) = Protocol::<Block, Hash>::new(
ProtocolConfig::default(),
PeerId::random(),
client.clone(),
Arc::new(EmptyTransactionPool),
None,
None,
From::from(&b"test"[..]),
sc_peerset::PeersetConfig {
in_peers: 10,
out_peers: 10,
bootnodes: Vec::new(),
reserved_only: false,
priority_groups: Vec::new(),
},
Box::new(DefaultBlockAnnounceValidator),
None,
Default::default(),
None,
).unwrap();

let dummy_peer_id = PeerId::random();
let _ = protocol.on_peer_connected(dummy_peer_id.clone());
match protocol.on_peer_disconnected(dummy_peer_id) {
CustomMessageOutcome::None => {},
_ => panic!()
};
}
}
33 changes: 30 additions & 3 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ pub enum GenericProtoOut {
CustomProtocolOpen {
/// Id of the peer we are connected to.
peer_id: PeerId,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
},

/// Closed a custom protocol with the remote.
Expand Down Expand Up @@ -1244,7 +1247,7 @@ impl NetworkBehaviour for GenericProto {
}
}

NotifsHandlerOut::Open { endpoint } => {
NotifsHandlerOut::Open { endpoint, received_handshake } => {
debug!(target: "sub-libp2p",
"Handler({:?}) => Endpoint {:?} open for custom protocols.",
source, endpoint);
Expand Down Expand Up @@ -1275,10 +1278,34 @@ impl NetworkBehaviour for GenericProto {

if first {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source };
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));

} else {
debug!(target: "sub-libp2p", "Secondary connection opened custom protocol.");
// In normal situations, the handshake is supposed to be a Status message, and
// we would discard Status messages received from secondary connections.
// However, in Polkadot 0.8.10 and below, nodes don't send a Status message
// when opening secondary connections and instead directly consider the
// substream as open. When connecting to such a node, the first message sent
// by the remote will always be considered by our local node as the handshake,
// even when it is a regular message.
// In order to maintain backwards compatibility, we therefore report the
// handshake as if it was a regular message, and the upper layer will ignore
// any superfluous Status message.
// The code below should be removed once Polkadot 0.8.10 and below are no
// longer widely in use, and should be replaced with simply printing a log
// entry.
debug!(
target: "sub-libp2p",
"Handler({:?}) => Secondary connection opened custom protocol",
source
);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message: From::from(&received_handshake[..]),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}

Expand Down
7 changes: 5 additions & 2 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ pub enum NotifsHandlerOut {
Open {
/// The endpoint of the connection that is open for custom protocols.
endpoint: ConnectedPoint,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
},

/// The connection is closed for custom protocols.
Expand Down Expand Up @@ -472,9 +475,9 @@ impl ProtocolsHandler for NotifsHandler {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint }
NotifsHandlerOut::Open { endpoint, received_handshake }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
Expand Down
Loading