Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[swarm] Permit configuration override for the substream upgrade protocol to use. #1858

Merged
merged 4 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 0.25.0 [unreleased]

- Permit a configuration override for the substream upgrade protocol
to use for all (outbound) substreams.
[PR 1858](https://github.com/libp2p/rust-libp2p/pull/1858).

- Changed parameters for connection limits from `usize` to `u32`.
Connection limits are now configured via `SwarmBuilder::connection_limits()`.

Expand Down
43 changes: 35 additions & 8 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ use libp2p_core::{
NetworkConfig,
peer::ConnectedPeer,
},
upgrade::ProtocolName,
upgrade::{ProtocolName},
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
Expand Down Expand Up @@ -286,7 +286,10 @@ where
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
Expand Down Expand Up @@ -357,8 +360,10 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,

/// Initiates a new dialing attempt to the given address.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
let handler = me.behaviour.new_handler();
me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ())
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.dial(&addr, handler).map(|_id| ())
}

/// Initiates a new dialing attempt to the given peer.
Expand All @@ -375,7 +380,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,

let result =
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.peer(peer_id.clone())
.dial(first, addrs, handler)
.map(|_| ())
Expand Down Expand Up @@ -546,10 +553,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
});
},
Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
let handler = this.behaviour.new_handler();
let handler = this.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
let local_addr = connection.local_addr.clone();
let send_back_addr = connection.send_back_addr.clone();
if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) {
if let Err(e) = this.network.accept(connection, handler) {
log::warn!("Incoming connection rejected: {:?}", e);
}
return Poll::Ready(SwarmEvent::IncomingConnection {
Expand Down Expand Up @@ -962,6 +971,7 @@ pub struct SwarmBuilder<TBehaviour> {
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
network_config: NetworkConfig,
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour> SwarmBuilder<TBehaviour>
Expand All @@ -980,6 +990,7 @@ where TBehaviour: NetworkBehaviour,
transport: transport,
behaviour,
network_config: Default::default(),
substream_upgrade_protocol_override: None,
}
}

Expand Down Expand Up @@ -1040,6 +1051,21 @@ where TBehaviour: NetworkBehaviour,
self
}

/// Configures an override for the substream upgrade protocol to use.
///
/// The subtream upgrade protocol is the multistream-select protocol
/// used for protocol negotiation on substreams. Since a listener
/// supports all existing versions, the choice of upgrade protocol
/// only effects the "dialer", i.e. the peer opening a substream.
///
/// > **Note**: If configured, specific upgrade protocols for
/// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
/// > are ignored.
pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
self.substream_upgrade_protocol_override = Some(v);
self
}

/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self.behaviour
Expand Down Expand Up @@ -1075,7 +1101,8 @@ where TBehaviour: NetworkBehaviour,
listened_addrs: SmallVec::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
pending_event: None
pending_event: None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion swarm/src/protocols_handler/node_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use wasm_timer::{Delay, Instant};
pub struct NodeHandlerWrapperBuilder<TIntoProtoHandler> {
/// The underlying handler.
handler: TIntoProtoHandler,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
Expand All @@ -59,8 +61,17 @@ where
pub(crate) fn new(handler: TIntoProtoHandler) -> Self {
NodeHandlerWrapperBuilder {
handler,
substream_upgrade_protocol_override: None,
}
}

pub(crate) fn with_substream_upgrade_protocol_override(
mut self,
version: Option<upgrade::Version>
) -> Self {
self.substream_upgrade_protocol_override = version;
self
}
}

impl<TIntoProtoHandler, TProtoHandler> IntoConnectionHandler
Expand All @@ -79,6 +90,7 @@ where
queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
}
}
Expand Down Expand Up @@ -109,6 +121,8 @@ where
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
shutdown: Shutdown,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

struct SubstreamUpgrade<UserData, Upgrade> {
Expand Down Expand Up @@ -254,7 +268,13 @@ where
}
};

let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos);
let (_, (mut version, upgrade)) = self.queued_dial_upgrades.remove(pos);
if let Some(v) = self.substream_upgrade_protocol_override {
if v != version {
log::debug!("Substream upgrade protocol override: {:?} -> {:?}", version, v);
version = v;
}
}
let upgrade = upgrade::apply_outbound(substream, upgrade, version);
let timeout = Delay::new(timeout);
self.negotiating_out.push(SubstreamUpgrade {
Expand Down