From 0b2bcbd3205f80f1c018d2159c69eb9ba3f64c93 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 16 Mar 2020 15:51:39 +0100 Subject: [PATCH 1/8] Add `protocols_handler::multi` module. An implementation of `ProtocolsHandler` that contains multiple other `ProtocolsHandler`s indexed by some key type. --- swarm/src/protocols_handler.rs | 1 + swarm/src/protocols_handler/multi.rs | 260 +++++++++++++++++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 swarm/src/protocols_handler/multi.rs diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 4be0d76c347..160989180a9 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -43,6 +43,7 @@ mod map_out; mod node_handler; mod one_shot; mod select; +pub mod multi; pub use crate::upgrade::{ InboundUpgradeSend, diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs new file mode 100644 index 00000000000..80209d57531 --- /dev/null +++ b/swarm/src/protocols_handler/multi.rs @@ -0,0 +1,260 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A [`ProtocolsHandler`] implementation that combines multiple other `ProtocolsHandler`s +//! indexed by some key. + +use futures::{future::BoxFuture, prelude::*}; +use libp2p_core::upgrade::ProtocolName; +use crate::NegotiatedSubstream; +use crate::protocols_handler::{ + KeepAlive, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + SubstreamProtocol +}; +use crate::upgrade::{ + InboundUpgradeSend, + OutboundUpgradeSend, + UpgradeInfoSend +}; +use std::{collections::HashMap, fmt, hash::Hash, iter::{self, FromIterator}, task::{Context, Poll}}; + +/// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s. +#[derive(Clone)] +pub struct Handler { + handlers: HashMap +} + +impl fmt::Debug for Handler { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("Handler") + } +} + +impl Handler +where + K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, + H: ProtocolsHandler, + H::InboundProtocol: InboundUpgradeSend, + H::OutboundProtocol: OutboundUpgradeSend +{ + /// Create a new empty handler. + pub fn new() -> Self { + Handler { handlers: HashMap::new() } + } + + /// Insert a [`ProtocolsHandler`] at index `key`. + pub fn add(&mut self, key: K, handler: H) { + self.handlers.insert(key, handler); + } +} + +impl FromIterator<(K, H)> for Handler +where + K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, + H: ProtocolsHandler, + H::InboundProtocol: InboundUpgradeSend, + H::OutboundProtocol: OutboundUpgradeSend +{ + fn from_iter(iter: T) -> Self + where + T: IntoIterator + { + Handler { handlers: HashMap::from_iter(iter) } + } +} + +impl ProtocolsHandler for Handler +where + K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, + H: ProtocolsHandler, + H::InboundProtocol: InboundUpgradeSend, + H::OutboundProtocol: OutboundUpgradeSend +{ + type InEvent = (K, ::InEvent); + type OutEvent = (K, ::OutEvent); + type Error = ::Error; + type InboundProtocol = Upgrade::InboundProtocol>; + type OutboundProtocol = ::OutboundProtocol; + type OutboundOpenInfo = (K, ::OutboundOpenInfo); + + fn listen_protocol(&self) -> SubstreamProtocol { + let upgrades = self.handlers.iter() + .map(|(k, h)| (k.clone(), h.listen_protocol().into_upgrade().1)) + .collect(); + SubstreamProtocol::new(Upgrade { upgrades }) + } + + fn inject_fully_negotiated_outbound ( + &mut self, + protocol: ::Output, + (key, arg): Self::OutboundOpenInfo + ) { + if let Some(h) = self.handlers.get_mut(&key) { + h.inject_fully_negotiated_outbound(protocol, arg) + } else { + log::error!("inject_fully_negotiated_outbound: no handler for key {:?}", key) + } + } + + fn inject_fully_negotiated_inbound ( + &mut self, + (key, arg): ::Output + ) { + if let Some(h) = self.handlers.get_mut(&key) { + h.inject_fully_negotiated_inbound(arg) + } else { + log::error!("inject_fully_negotiated_inbound: no handler for key {:?}", key) + } + } + + fn inject_event(&mut self, (key, event): Self::InEvent) { + if let Some(h) = self.handlers.get_mut(&key) { + h.inject_event(event) + } else { + log::error!("inject_event: no handler for key {:?}", key) + } + } + + fn inject_dial_upgrade_error ( + &mut self, + (key, arg): Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr<::Error> + ) { + if let Some(h) = self.handlers.get_mut(&key) { + h.inject_dial_upgrade_error(arg, error) + } else { + log::error!("inject_dial_upgrade_error: no handler for protocol {:?}", key) + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.handlers.values() + .map(|h| h.connection_keep_alive()) + .max() + .unwrap_or(KeepAlive::No) + } + + fn poll(&mut self, cx: &mut Context) + -> Poll> + { + for (k, h) in self.handlers.iter_mut() { + if let Poll::Ready(e) = h.poll(cx) { + let e = e.map_outbound_open_info(|i| (k.clone(), i)).map_custom(|p| (k.clone(), p)); + return Poll::Ready(e) + } + } + Poll::Pending + } +} + +/// Key and protocol name pair used as `UpgradeInfo::Info`. +#[derive(Debug, Clone)] +pub struct KeyedProtoName(K, H); + +impl ProtocolName for KeyedProtoName { + fn protocol_name(&self) -> &[u8] { + self.1.protocol_name() + } +} + +/// Inbound and outbound upgrade for all `ProtocolsHandler`s. +#[derive(Clone)] +pub struct Upgrade { + upgrades: HashMap +} + +impl fmt::Debug for Upgrade { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("Upgrade") + } +} + +impl UpgradeInfoSend for Upgrade +where + K: Hash + Eq + Clone + Send + 'static, + H: UpgradeInfoSend +{ + type Info = KeyedProtoName; + type InfoIter = std::vec::IntoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.upgrades.iter().map(|(k, i)| iter::repeat(k.clone()).zip(i.protocol_info())) + .flatten() + .map(|(k, i)| KeyedProtoName(k, i)) + .collect::>() + .into_iter() + } +} + +impl InboundUpgradeSend for Upgrade +where + H: InboundUpgradeSend, + K: Clone + Hash + Eq + Send + 'static +{ + type Output = (K, ::Output); + type Error = (K, ::Error); + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + let KeyedProtoName(key, info) = info; + let u = self.upgrades.remove(&key).expect( + "`upgrade_inbound` is applied to a key from `protocol_info`, which only contains \ + keys from the same set of upgrades we are searching here, therefore looking for this \ + key is guaranteed to give us a non-empty result; qed" + ); + u.upgrade_inbound(resource, info).map(move |out| { + match out { + Ok(o) => Ok((key, o)), + Err(e) => Err((key, e)) + } + }) + .boxed() + } +} + +impl OutboundUpgradeSend for Upgrade +where + H: OutboundUpgradeSend, + K: Clone + Hash + Eq + Send + 'static +{ + type Output = (K, ::Output); + type Error = (K, ::Error); + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + let KeyedProtoName(key, info) = info; + let u = self.upgrades.remove(&key).expect( + "`upgrade_outbound` is applied to a key from `protocol_info`, which only contains \ + keys from the same set of upgrades we are searching here, therefore looking for this \ + key is guaranteed to give us a non-empty result; qed" + ); + u.upgrade_outbound(resource, info).map(move |out| { + match out { + Ok(o) => Ok((key, o)), + Err(e) => Err((key, e)) + } + }) + .boxed() + } +} + From cb9bbebc2ffdbf7a90ac6f7e8910ea448349fc83 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 17 Mar 2020 11:05:05 +0100 Subject: [PATCH 2/8] Randomise start position of handler polling. --- swarm/Cargo.toml | 1 + swarm/src/protocols_handler/multi.rs | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index ddacec46852..29bccee8239 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.1" libp2p-core = { version = "0.16.0", path = "../core" } log = "0.4" +rand = "0.7" smallvec = "1.0" wasm-timer = "0.2" void = "1" diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 80209d57531..7673af9364e 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -21,8 +21,6 @@ //! A [`ProtocolsHandler`] implementation that combines multiple other `ProtocolsHandler`s //! indexed by some key. -use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::ProtocolName; use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, @@ -36,6 +34,9 @@ use crate::upgrade::{ OutboundUpgradeSend, UpgradeInfoSend }; +use futures::{future::BoxFuture, prelude::*}; +use libp2p_core::upgrade::ProtocolName; +use rand::Rng; use std::{collections::HashMap, fmt, hash::Hash, iter::{self, FromIterator}, task::{Context, Poll}}; /// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s. @@ -66,6 +67,11 @@ where pub fn add(&mut self, key: K, handler: H) { self.handlers.insert(key, handler); } + + /// Remove a [`ProtocolsHandler`] at index `key`. + pub fn del(&mut self, key: &K) { + self.handlers.remove(key); + } } impl FromIterator<(K, H)> for Handler @@ -157,12 +163,23 @@ where fn poll(&mut self, cx: &mut Context) -> Poll> { - for (k, h) in self.handlers.iter_mut() { + // Not always polling handlers in the same order should give anyone the chance to make progress. + let pos = rand::thread_rng().gen_range(0, self.handlers.len()); + + for (k, h) in self.handlers.iter_mut().skip(pos) { if let Poll::Ready(e) = h.poll(cx) { let e = e.map_outbound_open_info(|i| (k.clone(), i)).map_custom(|p| (k.clone(), p)); return Poll::Ready(e) } } + + for (k, h) in self.handlers.iter_mut().take(pos) { + if let Poll::Ready(e) = h.poll(cx) { + let e = e.map_outbound_open_info(|i| (k.clone(), i)).map_custom(|p| (k.clone(), p)); + return Poll::Ready(e) + } + } + Poll::Pending } } From 2bb2f9e0f03aa3999c84a230565af0144c72cf10 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 19 Mar 2020 11:10:47 +0100 Subject: [PATCH 3/8] Address some review concerns. --- swarm/src/protocols_handler/multi.rs | 39 ++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 7673af9364e..894139d31a5 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -45,18 +45,21 @@ pub struct Handler { handlers: HashMap } -impl fmt::Debug for Handler { +impl fmt::Debug for Handler +where + K: fmt::Debug + Eq + Hash, + H: fmt::Debug +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("Handler") + f.debug_struct("Handler") + .field("handlers", &self.handlers) + .finish() } } impl Handler where - K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, - H: ProtocolsHandler, - H::InboundProtocol: InboundUpgradeSend, - H::OutboundProtocol: OutboundUpgradeSend + K: Hash + Eq { /// Create a new empty handler. pub fn new() -> Self { @@ -64,22 +67,14 @@ where } /// Insert a [`ProtocolsHandler`] at index `key`. - pub fn add(&mut self, key: K, handler: H) { - self.handlers.insert(key, handler); - } - - /// Remove a [`ProtocolsHandler`] at index `key`. - pub fn del(&mut self, key: &K) { - self.handlers.remove(key); + pub fn insert(&mut self, key: K, handler: H) -> Option { + self.handlers.insert(key, handler) } } impl FromIterator<(K, H)> for Handler where - K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, - H: ProtocolsHandler, - H::InboundProtocol: InboundUpgradeSend, - H::OutboundProtocol: OutboundUpgradeSend + K: Hash + Eq { fn from_iter(iter: T) -> Self where @@ -200,9 +195,15 @@ pub struct Upgrade { upgrades: HashMap } -impl fmt::Debug for Upgrade { +impl fmt::Debug for Upgrade +where + K: fmt::Debug + Eq + Hash, + H: fmt::Debug +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("Upgrade") + f.debug_struct("Upgrade") + .field("upgrades", &self.upgrades) + .finish() } } From 7fb13f51f32dfe10cc028e01bdd1c61e3e4d2d5c Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 19 Mar 2020 16:34:41 +0100 Subject: [PATCH 4/8] Add `IntoMultiHandler`. --- swarm/src/protocols_handler/multi.rs | 97 +++++++++++++++++++++------- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 894139d31a5..875c3e68bfb 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -24,6 +24,7 @@ use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, + IntoProtocolsHandler, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, @@ -35,44 +36,29 @@ use crate::upgrade::{ UpgradeInfoSend }; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::ProtocolName; +use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName}; use rand::Rng; use std::{collections::HashMap, fmt, hash::Hash, iter::{self, FromIterator}, task::{Context, Poll}}; /// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s. #[derive(Clone)] -pub struct Handler { +pub struct MultiHandler { handlers: HashMap } -impl fmt::Debug for Handler +impl fmt::Debug for MultiHandler where K: fmt::Debug + Eq + Hash, H: fmt::Debug { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Handler") + f.debug_struct("MultiHandler") .field("handlers", &self.handlers) .finish() } } -impl Handler -where - K: Hash + Eq -{ - /// Create a new empty handler. - pub fn new() -> Self { - Handler { handlers: HashMap::new() } - } - - /// Insert a [`ProtocolsHandler`] at index `key`. - pub fn insert(&mut self, key: K, handler: H) -> Option { - self.handlers.insert(key, handler) - } -} - -impl FromIterator<(K, H)> for Handler +impl FromIterator<(K, H)> for MultiHandler where K: Hash + Eq { @@ -80,11 +66,11 @@ where where T: IntoIterator { - Handler { handlers: HashMap::from_iter(iter) } + MultiHandler { handlers: HashMap::from_iter(iter) } } } -impl ProtocolsHandler for Handler +impl ProtocolsHandler for MultiHandler where K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, H: ProtocolsHandler, @@ -179,6 +165,73 @@ where } } +/// A [`IntoProtocolsHandler`] for multiple other `IntoProtocolsHandler`s. +#[derive(Clone)] +pub struct IntoMultiHandler { + handlers: HashMap +} + +impl fmt::Debug for IntoMultiHandler +where + K: fmt::Debug + Eq + Hash, + H: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("IntoMultiHandler") + .field("handlers", &self.handlers) + .finish() + } +} + +impl IntoMultiHandler +where + K: Hash + Eq +{ + /// Create a new empty `IntoMultiHandler` value. + pub fn new() -> Self { + IntoMultiHandler { + handlers: HashMap::new() + } + } + + /// Insert a [`IntoProtocolsHandler`] at index `key`. + pub fn insert(&mut self, key: K, handler: H) -> Option { + self.handlers.insert(key, handler) + } +} + +impl FromIterator<(K, H)> for IntoMultiHandler +where + K: Hash + Eq +{ + fn from_iter(iter: T) -> Self + where + T: IntoIterator + { + IntoMultiHandler { handlers: HashMap::from_iter(iter) } + } +} + +impl IntoProtocolsHandler for IntoMultiHandler +where + K: Clone + fmt::Debug + Eq + Hash + Send + 'static, + H: IntoProtocolsHandler +{ + type Handler = MultiHandler; + + fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler { + MultiHandler::from_iter(self.handlers.into_iter().map(|(k, h)| (k, h.into_handler(p, c)))) + } + + fn inbound_protocol(&self) -> ::InboundProtocol { + Upgrade { + upgrades: self.handlers.iter() + .map(|(k, h)| (k.clone(), h.inbound_protocol())) + .collect() + } + } +} + /// Key and protocol name pair used as `UpgradeInfo::Info`. #[derive(Debug, Clone)] pub struct KeyedProtoName(K, H); From 3361abbd286c2a8065bcd4c4cda1b9ef723dc73c Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 19 Mar 2020 17:41:18 +0100 Subject: [PATCH 5/8] Check protocol names for uniqueness. --- swarm/src/protocols_handler/multi.rs | 85 +++++++++++++++++++--------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 875c3e68bfb..9a5bec5830f 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -58,15 +58,19 @@ where } } -impl FromIterator<(K, H)> for MultiHandler +impl MultiHandler where - K: Hash + Eq + K: Hash + Eq, + H: ProtocolsHandler { - fn from_iter(iter: T) -> Self + /// Create and populate a `MultiHandler` from the given handler iterator. + pub fn try_from_iter(iter: I) -> Result where - T: IntoIterator + I: IntoIterator { - MultiHandler { handlers: HashMap::from_iter(iter) } + let m = MultiHandler { handlers: HashMap::from_iter(iter) }; + uniq_proto_names(m.handlers.values().map(|h| h.listen_protocol().into_upgrade().1))?; + Ok(m) } } @@ -183,32 +187,19 @@ where } } -impl IntoMultiHandler -where - K: Hash + Eq -{ - /// Create a new empty `IntoMultiHandler` value. - pub fn new() -> Self { - IntoMultiHandler { - handlers: HashMap::new() - } - } - - /// Insert a [`IntoProtocolsHandler`] at index `key`. - pub fn insert(&mut self, key: K, handler: H) -> Option { - self.handlers.insert(key, handler) - } -} -impl FromIterator<(K, H)> for IntoMultiHandler +impl IntoMultiHandler where - K: Hash + Eq + K: Hash + Eq, + H: IntoProtocolsHandler { - fn from_iter(iter: T) -> Self + pub fn try_from_iter(iter: I) -> Result where - T: IntoIterator + I: IntoIterator { - IntoMultiHandler { handlers: HashMap::from_iter(iter) } + let m = IntoMultiHandler { handlers: HashMap::from_iter(iter) }; + uniq_proto_names(m.handlers.values().map(|h| h.inbound_protocol()))?; + Ok(m) } } @@ -220,7 +211,11 @@ where type Handler = MultiHandler; fn into_handler(self, p: &PeerId, c: &ConnectedPoint) -> Self::Handler { - MultiHandler::from_iter(self.handlers.into_iter().map(|(k, h)| (k, h.into_handler(p, c)))) + MultiHandler { + handlers: self.handlers.into_iter() + .map(|(k, h)| (k, h.into_handler(p, c))) + .collect() + } } fn inbound_protocol(&self) -> ::InboundProtocol { @@ -329,3 +324,39 @@ where } } +/// Check that no two protocol names are equal. +fn uniq_proto_names(iter: I) -> Result<(), DuplicateProtoname> +where + I: Iterator, + T: UpgradeInfoSend +{ + let mut set = std::collections::HashSet::new(); + for infos in iter { + for i in infos.protocol_info() { + let v = Vec::from(i.protocol_name()); + if set.contains(&v) { + return Err(DuplicateProtoname(v)) + } else { + set.insert(v); + } + } + } + Ok(()) +} + +/// It is an error if two handlers share the same protocol name. +#[derive(Debug, Clone)] +pub struct DuplicateProtoname(pub Vec); + +impl fmt::Display for DuplicateProtoname { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if let Ok(s) = std::str::from_utf8(&self.0) { + write!(f, "duplicate protocol name: {}", s) + } else { + write!(f, "duplicate protocol name: {:?}", self.0) + } + } +} + +impl std::error::Error for DuplicateProtoname {} + From 34641beade36d2a0c07bd6a6c58f6cbb323718e4 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 24 Mar 2020 14:29:01 +0100 Subject: [PATCH 6/8] Changes due to review. - No more `Debug` bound for the key type and more generic log messages. - Additional comments. - Imports instead of fully-qualified use. - Renamed `DuplicateProtoname` to `DuplicateProtonameError`. --- swarm/src/protocols_handler/multi.rs | 49 +++++++++++++++++++--------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 9a5bec5830f..1e359cefa32 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -38,7 +38,14 @@ use crate::upgrade::{ use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName}; use rand::Rng; -use std::{collections::HashMap, fmt, hash::Hash, iter::{self, FromIterator}, task::{Context, Poll}}; +use std::{ + collections::{HashMap, HashSet}, + error, + fmt, + hash::Hash, + iter::{self, FromIterator}, + task::{Context, Poll} +}; /// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s. #[derive(Clone)] @@ -64,7 +71,9 @@ where H: ProtocolsHandler { /// Create and populate a `MultiHandler` from the given handler iterator. - pub fn try_from_iter(iter: I) -> Result + /// + /// It is an error for any two protocols handlers to share the same protocol name. + pub fn try_from_iter(iter: I) -> Result where I: IntoIterator { @@ -76,7 +85,7 @@ where impl ProtocolsHandler for MultiHandler where - K: Clone + std::fmt::Debug + Hash + Eq + Send + 'static, + K: Clone + Hash + Eq + Send + 'static, H: ProtocolsHandler, H::InboundProtocol: InboundUpgradeSend, H::OutboundProtocol: OutboundUpgradeSend @@ -103,7 +112,7 @@ where if let Some(h) = self.handlers.get_mut(&key) { h.inject_fully_negotiated_outbound(protocol, arg) } else { - log::error!("inject_fully_negotiated_outbound: no handler for key {:?}", key) + log::error!("inject_fully_negotiated_outbound: no handler for key") } } @@ -114,7 +123,7 @@ where if let Some(h) = self.handlers.get_mut(&key) { h.inject_fully_negotiated_inbound(arg) } else { - log::error!("inject_fully_negotiated_inbound: no handler for key {:?}", key) + log::error!("inject_fully_negotiated_inbound: no handler for key") } } @@ -122,7 +131,7 @@ where if let Some(h) = self.handlers.get_mut(&key) { h.inject_event(event) } else { - log::error!("inject_event: no handler for key {:?}", key) + log::error!("inject_event: no handler for key") } } @@ -134,7 +143,7 @@ where if let Some(h) = self.handlers.get_mut(&key) { h.inject_dial_upgrade_error(arg, error) } else { - log::error!("inject_dial_upgrade_error: no handler for protocol {:?}", key) + log::error!("inject_dial_upgrade_error: no handler for protocol") } } @@ -193,7 +202,10 @@ where K: Hash + Eq, H: IntoProtocolsHandler { - pub fn try_from_iter(iter: I) -> Result + /// Create and populate an `IntoMultiHandler` from the given iterator. + /// + /// It is an error for any two protocols handlers to share the same protocol name. + pub fn try_from_iter(iter: I) -> Result where I: IntoIterator { @@ -205,7 +217,7 @@ where impl IntoProtocolsHandler for IntoMultiHandler where - K: Clone + fmt::Debug + Eq + Hash + Send + 'static, + K: Clone + Eq + Hash + Send + 'static, H: IntoProtocolsHandler { type Handler = MultiHandler; @@ -325,17 +337,17 @@ where } /// Check that no two protocol names are equal. -fn uniq_proto_names(iter: I) -> Result<(), DuplicateProtoname> +fn uniq_proto_names(iter: I) -> Result<(), DuplicateProtonameError> where I: Iterator, T: UpgradeInfoSend { - let mut set = std::collections::HashSet::new(); + let mut set = HashSet::new(); for infos in iter { for i in infos.protocol_info() { let v = Vec::from(i.protocol_name()); if set.contains(&v) { - return Err(DuplicateProtoname(v)) + return Err(DuplicateProtonameError(v)) } else { set.insert(v); } @@ -346,9 +358,16 @@ where /// It is an error if two handlers share the same protocol name. #[derive(Debug, Clone)] -pub struct DuplicateProtoname(pub Vec); +pub struct DuplicateProtonameError(Vec); + +impl DuplicateProtonameError { + /// The protocol name bytes that occured in more than one handler. + pub fn protocol_name(&self) -> &[u8] { + &self.0 + } +} -impl fmt::Display for DuplicateProtoname { +impl fmt::Display for DuplicateProtonameError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if let Ok(s) = std::str::from_utf8(&self.0) { write!(f, "duplicate protocol name: {}", s) @@ -358,5 +377,5 @@ impl fmt::Display for DuplicateProtoname { } } -impl std::error::Error for DuplicateProtoname {} +impl error::Error for DuplicateProtonameError {} From 7db7c64d3e6493ddea72408646dec76c1a8b220d Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 24 Mar 2020 15:25:38 +0100 Subject: [PATCH 7/8] Replace `HashMap` with `Vec` in `Upgrades`. --- swarm/src/protocols_handler/multi.rs | 60 ++++++++++++++++------------ 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 1e359cefa32..c3ae44342ab 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -252,7 +252,7 @@ impl ProtocolName for KeyedProtoName { /// Inbound and outbound upgrade for all `ProtocolsHandler`s. #[derive(Clone)] pub struct Upgrade { - upgrades: HashMap + upgrades: Vec<(K, H)> } impl fmt::Debug for Upgrade @@ -295,18 +295,23 @@ where fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { let KeyedProtoName(key, info) = info; - let u = self.upgrades.remove(&key).expect( - "`upgrade_inbound` is applied to a key from `protocol_info`, which only contains \ - keys from the same set of upgrades we are searching here, therefore looking for this \ - key is guaranteed to give us a non-empty result; qed" - ); - u.upgrade_inbound(resource, info).map(move |out| { - match out { - Ok(o) => Ok((key, o)), - Err(e) => Err((key, e)) - } - }) - .boxed() + + let i = self.upgrades.iter().position(|(k, _)| k == &key) + .expect( + "`upgrade_inbound` is applied to a key from `protocol_info`, which only \ + contains keys from the same set of upgrades we are searching here, therefore \ + looking for this key is guaranteed to give us a non-empty result; qed" + ); + + self.upgrades.remove(i).1 + .upgrade_inbound(resource, info) + .map(move |out| { + match out { + Ok(o) => Ok((key, o)), + Err(e) => Err((key, e)) + } + }) + .boxed() } } @@ -321,18 +326,23 @@ where fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { let KeyedProtoName(key, info) = info; - let u = self.upgrades.remove(&key).expect( - "`upgrade_outbound` is applied to a key from `protocol_info`, which only contains \ - keys from the same set of upgrades we are searching here, therefore looking for this \ - key is guaranteed to give us a non-empty result; qed" - ); - u.upgrade_outbound(resource, info).map(move |out| { - match out { - Ok(o) => Ok((key, o)), - Err(e) => Err((key, e)) - } - }) - .boxed() + + let i = self.upgrades.iter().position(|(k, _)| k == &key) + .expect( + "`upgrade_outbound` is applied to a key from `protocol_info`, which only \ + contains keys from the same set of upgrades we are searching here, therefore \ + looking for this key is guaranteed to give us a non-empty result; qed" + ); + + self.upgrades.remove(i).1 + .upgrade_outbound(resource, info) + .map(move |out| { + match out { + Ok(o) => Ok((key, o)), + Err(e) => Err((key, e)) + } + }) + .boxed() } } From 1e371eb5954942a26f7a85cd5c2b1cc80fbc7abe Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 24 Mar 2020 16:20:11 +0100 Subject: [PATCH 8/8] Review suggestion. --- swarm/src/protocols_handler/multi.rs | 49 ++++++++++------------------ 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index c3ae44342ab..3ecaab9b0fd 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -239,11 +239,11 @@ where } } -/// Key and protocol name pair used as `UpgradeInfo::Info`. +/// Index and protocol name pair used as `UpgradeInfo::Info`. #[derive(Debug, Clone)] -pub struct KeyedProtoName(K, H); +pub struct IndexedProtoName(usize, H); -impl ProtocolName for KeyedProtoName { +impl ProtocolName for IndexedProtoName { fn protocol_name(&self) -> &[u8] { self.1.protocol_name() } @@ -269,16 +269,17 @@ where impl UpgradeInfoSend for Upgrade where - K: Hash + Eq + Clone + Send + 'static, - H: UpgradeInfoSend + H: UpgradeInfoSend, + K: Send + 'static { - type Info = KeyedProtoName; + type Info = IndexedProtoName; type InfoIter = std::vec::IntoIter; fn protocol_info(&self) -> Self::InfoIter { - self.upgrades.iter().map(|(k, i)| iter::repeat(k.clone()).zip(i.protocol_info())) + self.upgrades.iter().enumerate() + .map(|(i, (_, h))| iter::repeat(i).zip(h.protocol_info())) .flatten() - .map(|(k, i)| KeyedProtoName(k, i)) + .map(|(i, h)| IndexedProtoName(i, h)) .collect::>() .into_iter() } @@ -287,24 +288,16 @@ where impl InboundUpgradeSend for Upgrade where H: InboundUpgradeSend, - K: Clone + Hash + Eq + Send + 'static + K: Send + 'static { type Output = (K, ::Output); type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { - let KeyedProtoName(key, info) = info; - - let i = self.upgrades.iter().position(|(k, _)| k == &key) - .expect( - "`upgrade_inbound` is applied to a key from `protocol_info`, which only \ - contains keys from the same set of upgrades we are searching here, therefore \ - looking for this key is guaranteed to give us a non-empty result; qed" - ); - - self.upgrades.remove(i).1 - .upgrade_inbound(resource, info) + let IndexedProtoName(index, info) = info; + let (key, upgrade) = self.upgrades.remove(index); + upgrade.upgrade_inbound(resource, info) .map(move |out| { match out { Ok(o) => Ok((key, o)), @@ -318,24 +311,16 @@ where impl OutboundUpgradeSend for Upgrade where H: OutboundUpgradeSend, - K: Clone + Hash + Eq + Send + 'static + K: Send + 'static { type Output = (K, ::Output); type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { - let KeyedProtoName(key, info) = info; - - let i = self.upgrades.iter().position(|(k, _)| k == &key) - .expect( - "`upgrade_outbound` is applied to a key from `protocol_info`, which only \ - contains keys from the same set of upgrades we are searching here, therefore \ - looking for this key is guaranteed to give us a non-empty result; qed" - ); - - self.upgrades.remove(i).1 - .upgrade_outbound(resource, info) + let IndexedProtoName(index, info) = info; + let (key, upgrade) = self.upgrades.remove(index); + upgrade.upgrade_outbound(resource, info) .map(move |out| { match out { Ok(o) => Ok((key, o)),