diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 1ebf11b0d2a..bccbd85f86f 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,8 @@ # 0.22.1 [unreleased] +- Respect inbound timeouts and upgrade versions in the `MultiHandler`. + [PR 1786](https://github.com/libp2p/rust-libp2p/pull/1786). + - Instead of iterating each inbound and outbound substream upgrade looking for one to make progress, use a `FuturesUnordered` for both pending inbound and pending outbound upgrades. As a result only those upgrades are polled that are diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 9d4a4fd6ba8..4bc2e04aec5 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -37,18 +37,20 @@ use crate::upgrade::{ }; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; -use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError}; +use libp2p_core::upgrade::{self, ProtocolName, UpgradeError, NegotiationError, ProtocolError}; use rand::Rng; use std::{ + cmp, collections::{HashMap, HashSet}, error, fmt, hash::Hash, iter::{self, FromIterator}, - task::{Context, Poll} + task::{Context, Poll}, + time::Duration }; -/// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s. +/// A [`ProtocolsHandler`] for multiple `ProtocolsHandler`s of the same type. #[derive(Clone)] pub struct MultiHandler { handlers: HashMap @@ -74,6 +76,9 @@ where /// Create and populate a `MultiHandler` from the given handler iterator. /// /// It is an error for any two protocols handlers to share the same protocol name. + /// + /// > **Note**: All handlers should use the same [`upgrade::Version`] for + /// > the inbound and outbound [`SubstreamProtocol`]s. pub fn try_from_iter(iter: I) -> Result where I: IntoIterator @@ -100,17 +105,34 @@ where type OutboundOpenInfo = (K, ::OutboundOpenInfo); fn listen_protocol(&self) -> SubstreamProtocol { - let (upgrade, info) = self.handlers.iter() + let (upgrade, info, timeout, version) = self.handlers.iter() .map(|(k, h)| { - let (_, u, i) = h.listen_protocol().into_upgrade(); - (k.clone(), (u, i)) + let p = h.listen_protocol(); + let t = *p.timeout(); + let (v, u, i) = p.into_upgrade(); + (k.clone(), (v, u, i, t)) }) - .fold((Upgrade::new(), Info::new()), |(mut upg, mut inf), (k, (u, i))| { - upg.upgrades.push((k.clone(), u)); - inf.infos.push((k, i)); - (upg, inf) - }); + .fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None), + |(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| { + upg.upgrades.push((k.clone(), u)); + inf.infos.push((k, i)); + timeout = cmp::max(timeout, t); + version = version.map_or(Some(v), |vv| + if v != vv { + // Different upgrade (i.e. protocol negotiation) protocol + // versions are usually incompatible and not negotiated + // themselves, so a protocol upgrade may fail. + log::warn!("Differing upgrade versions. Defaulting to V1."); + Some(upgrade::Version::V1) + } else { + Some(v) + }); + (upg, inf, timeout, version) + } + ); SubstreamProtocol::new(upgrade, info) + .with_timeout(timeout) + .with_upgrade_protocol(version.unwrap_or(upgrade::Version::V1)) } fn inject_fully_negotiated_outbound ( @@ -293,6 +315,9 @@ where /// Create and populate an `IntoMultiHandler` from the given iterator. /// /// It is an error for any two protocols handlers to share the same protocol name. + /// + /// > **Note**: All handlers should use the same [`upgrade::Version`] for + /// > the inbound and outbound [`SubstreamProtocol`]s. pub fn try_from_iter(iter: I) -> Result where I: IntoIterator