Skip to content

Commit

Permalink
[swarm] MultiHandler: Respect inbound timeouts and upgrade versions. (#…
Browse files Browse the repository at this point in the history
…1786)

* Respect inbound timeouts and upgrade versions.

* Update CHANGELOG
  • Loading branch information
romanb authored Oct 12, 2020
1 parent 6528114 commit 6ed92ab
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
3 changes: 3 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 0.22.1 [unreleased]

- Respect inbound timeouts and upgrade versions in the `MultiHandler`.
[PR 1786](https://github.com/libp2p/rust-libp2p/pull/1786).

- Instead of iterating each inbound and outbound substream upgrade looking for
one to make progress, use a `FuturesUnordered` for both pending inbound and
pending outbound upgrades. As a result only those upgrades are polled that are
Expand Down
47 changes: 36 additions & 11 deletions swarm/src/protocols_handler/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ use crate::upgrade::{
};
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use libp2p_core::upgrade::{self, ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use rand::Rng;
use std::{
cmp,
collections::{HashMap, HashSet},
error,
fmt,
hash::Hash,
iter::{self, FromIterator},
task::{Context, Poll}
task::{Context, Poll},
time::Duration
};

/// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s.
/// A [`ProtocolsHandler`] for multiple `ProtocolsHandler`s of the same type.
#[derive(Clone)]
pub struct MultiHandler<K, H> {
handlers: HashMap<K, H>
Expand All @@ -74,6 +76,9 @@ where
/// Create and populate a `MultiHandler` from the given handler iterator.
///
/// It is an error for any two protocols handlers to share the same protocol name.
///
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
/// > the inbound and outbound [`SubstreamProtocol`]s.
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
where
I: IntoIterator<Item = (K, H)>
Expand All @@ -100,17 +105,34 @@ where
type OutboundOpenInfo = (K, <H as ProtocolsHandler>::OutboundOpenInfo);

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
let (upgrade, info) = self.handlers.iter()
let (upgrade, info, timeout, version) = self.handlers.iter()
.map(|(k, h)| {
let (_, u, i) = h.listen_protocol().into_upgrade();
(k.clone(), (u, i))
let p = h.listen_protocol();
let t = *p.timeout();
let (v, u, i) = p.into_upgrade();
(k.clone(), (v, u, i, t))
})
.fold((Upgrade::new(), Info::new()), |(mut upg, mut inf), (k, (u, i))| {
upg.upgrades.push((k.clone(), u));
inf.infos.push((k, i));
(upg, inf)
});
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None),
|(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| {
upg.upgrades.push((k.clone(), u));
inf.infos.push((k, i));
timeout = cmp::max(timeout, t);
version = version.map_or(Some(v), |vv|
if v != vv {
// Different upgrade (i.e. protocol negotiation) protocol
// versions are usually incompatible and not negotiated
// themselves, so a protocol upgrade may fail.
log::warn!("Differing upgrade versions. Defaulting to V1.");
Some(upgrade::Version::V1)
} else {
Some(v)
});
(upg, inf, timeout, version)
}
);
SubstreamProtocol::new(upgrade, info)
.with_timeout(timeout)
.with_upgrade_protocol(version.unwrap_or(upgrade::Version::V1))
}

fn inject_fully_negotiated_outbound (
Expand Down Expand Up @@ -293,6 +315,9 @@ where
/// Create and populate an `IntoMultiHandler` from the given iterator.
///
/// It is an error for any two protocols handlers to share the same protocol name.
///
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
/// > the inbound and outbound [`SubstreamProtocol`]s.
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
where
I: IntoIterator<Item = (K, H)>
Expand Down

0 comments on commit 6ed92ab

Please sign in to comment.