From b9518ec3dcf4bb1e5410833cb9986e1a61d3a244 Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 26 Sep 2023 16:11:13 +0300 Subject: [PATCH 01/10] removed Inbound updates --- Cargo.lock | 1 + protocols/identify/Cargo.toml | 1 + protocols/identify/src/handler.rs | 76 ++++++++++++++++++++++-------- protocols/identify/src/protocol.rs | 38 ++------------- 4 files changed, 62 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7f74567ea2..8155a029563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2617,6 +2617,7 @@ dependencies = [ "either", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "libp2p-core", "libp2p-identity", diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 4b56e6c1237..557f5a18736 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" futures = "0.3.28" futures-timer = "3.0.2" +futures-bounded = { workspace = true } libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 162a1e8fb06..c6a2194b170 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,14 +18,15 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError}; +use crate::protocol::{recv_push, Identify, OutboundPush, Push, UpgradeError}; use crate::protocol::{Info, PushInfo}; +use crate::{PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; use futures::future::BoxFuture; use futures::prelude::*; -use futures::stream::FuturesUnordered; +use futures_bounded::Timeout; use futures_timer::Delay; -use libp2p_core::upgrade::SelectUpgrade; +use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_identity::PublicKey; @@ -42,6 +43,9 @@ use smallvec::SmallVec; use std::collections::HashSet; use std::{io, task::Context, task::Poll, time::Duration}; +const STREAM_TIMEOUT: Duration = Duration::from_secs(60); +const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; + /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects @@ -56,7 +60,7 @@ pub struct Handler { >, /// Pending identification replies, awaiting being sent. - pending_replies: FuturesUnordered>>, + pending_replies: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -127,7 +131,10 @@ impl Handler { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), - pending_replies: FuturesUnordered::new(), + pending_replies: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), trigger_next_identify: Delay::new(initial_delay), exchanged_one_periodic_identify: false, interval, @@ -152,14 +159,23 @@ impl Handler { >, ) { match output { - future::Either::Left(substream) => { + future::Either::Left(stream) => { let info = self.build_info(); - self.pending_replies - .push(crate::protocol::send(substream, info).boxed()); + if self + .pending_replies + .try_push(crate::protocol::send(stream, info)) + .is_err() + { + warn!("Dropping inbound stream because we are at capacity"); + } } - future::Either::Right(fut) => { - if self.inbound_identify_push.replace(fut).is_some() { + future::Either::Right(stream) => { + if self + .inbound_identify_push + .replace(recv_push(stream).boxed()) + .is_some() + { warn!( "New inbound identify push stream from {} while still \ upgrading previous one. Replacing previous with new.", @@ -268,13 +284,20 @@ impl ConnectionHandler for Handler { type FromBehaviour = InEvent; type ToBehaviour = Event; type Error = io::Error; - type InboundProtocol = SelectUpgrade>; + type InboundProtocol = + SelectUpgrade, ReadyUpgrade>; type OutboundProtocol = Either>; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ()) + SubstreamProtocol::new( + SelectUpgrade::new( + ReadyUpgrade::new(PROTOCOL_NAME), + ReadyUpgrade::new(PUSH_PROTOCOL_NAME), + ), + (), + ) } fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { @@ -342,14 +365,29 @@ impl ConnectionHandler for Handler { } } - // Check for pending replies to send. - if let Poll::Ready(Some(result)) = self.pending_replies.poll_next_unpin(cx) { - let event = result - .map(|()| Event::Identification) - .unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err))); - self.exchanged_one_periodic_identify = true; + // Check for pending replies. + match self.pending_replies.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => { + self.exchanged_one_periodic_identify = true; + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identification, + )); + } + Poll::Ready(Ok(Err(e))) => { + self.exchanged_one_periodic_identify = true; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + // return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Poll::Ready(Err(Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); + } + Poll::Pending => {} } Poll::Pending diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 989c94a4d67..557af6246f9 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -20,10 +20,10 @@ use crate::proto; use asynchronous_codec::{FramedRead, FramedWrite}; -use futures::{future::BoxFuture, prelude::*}; +use futures::prelude::*; use libp2p_core::{ multiaddr, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, + upgrade::{OutboundUpgrade, UpgradeInfo}, Multiaddr, }; use libp2p_identity as identity; @@ -33,7 +33,6 @@ use log::{debug, trace}; use std::convert::TryFrom; use std::{io, iter, pin::Pin}; use thiserror::Error; -use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; @@ -48,15 +47,8 @@ pub struct Identify; /// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. #[derive(Debug, Clone)] pub struct Push(T); -pub struct InboundPush(); pub struct OutboundPush(Info); -impl Push { - pub fn inbound() -> Self { - Push(InboundPush()) - } -} - impl Push { pub fn outbound(info: Info) -> Self { Push(OutboundPush(info)) @@ -126,16 +118,6 @@ impl UpgradeInfo for Identify { } } -impl InboundUpgrade for Identify { - type Output = C; - type Error = UpgradeError; - type Future = future::Ready>; - - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(socket) - } -} - impl OutboundUpgrade for Identify where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -158,20 +140,6 @@ impl UpgradeInfo for Push { } } -impl InboundUpgrade for Push -where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = BoxFuture<'static, Result>; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - // Lazily upgrade stream, thus allowing upgrade to happen within identify's handler. - future::ok(recv_push(socket).boxed()) - } -} - impl OutboundUpgrade for Push where C: AsyncWrite + Unpin + Send + 'static, @@ -219,7 +187,7 @@ where Ok(()) } -async fn recv_push(socket: T) -> Result +pub(crate) async fn recv_push(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { From 2e9e67ad692515f9639e875920f1845cb598f962 Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 26 Sep 2023 16:44:06 +0300 Subject: [PATCH 02/10] removed Outbound updates --- protocols/identify/src/handler.rs | 137 +++++++++++++++++++++-------- protocols/identify/src/protocol.rs | 69 +-------------- 2 files changed, 102 insertions(+), 104 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index c6a2194b170..61971032bcd 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{recv_push, Identify, OutboundPush, Push, UpgradeError}; +use crate::protocol::{recv_identify, recv_push, send, UpgradeError}; use crate::protocol::{Info, PushInfo}; use crate::{PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; @@ -31,8 +31,7 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_identity::PublicKey; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ProtocolSupport, + ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, ProtocolSupport, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, @@ -56,12 +55,23 @@ pub struct Handler { inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent>, (), Event, io::Error>; 4], + [ConnectionHandlerEvent< + Either, ReadyUpgrade>, + (), + Event, + io::Error, + >; 4], >, /// Pending identification replies, awaiting being sent. pending_replies: futures_bounded::FuturesSet>, + /// Pending identify requests. + outbound_identify_futs: futures_bounded::FuturesSet>, + + /// Pending identify/push requests. + outbound_identify_push_futs: futures_bounded::FuturesSet>, + /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -135,6 +145,14 @@ impl Handler { STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), + outbound_identify_futs: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), + outbound_identify_push_futs: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), trigger_next_identify: Delay::new(initial_delay), exchanged_one_periodic_identify: false, interval, @@ -196,34 +214,29 @@ impl Handler { >, ) { match output { - future::Either::Left(remote_info) => { - self.handle_incoming_info(&remote_info); + future::Either::Left(stream) => { + if self + .outbound_identify_futs + .try_push(recv_identify(stream)) + .is_err() + { + warn!("Dropping outbound identify stream because we are at capacity"); + } + } + future::Either::Right(stream) => { + let info = self.build_info(); - self.events - .push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( - remote_info, - ))); + if self + .outbound_identify_push_futs + .try_push(send(stream, info)) + .is_err() + { + warn!("Dropping outbound identify push stream because we are at capacity"); + } } - future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationPushed, - )), } } - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { error: err, .. }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - let err = err.map_upgrade_err(|e| e.into_inner()); - self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(err), - )); - self.trigger_next_identify.reset(self.interval); - } - fn build_info(&mut self) -> Info { Info { public_key: self.public_key.clone(), @@ -286,7 +299,7 @@ impl ConnectionHandler for Handler { type Error = io::Error; type InboundProtocol = SelectUpgrade, ReadyUpgrade>; - type OutboundProtocol = Either>; + type OutboundProtocol = Either, ReadyUpgrade>; type OutboundOpenInfo = (); type InboundOpenInfo = (); @@ -306,10 +319,12 @@ impl ConnectionHandler for Handler { self.external_addresses = addresses; } InEvent::Push => { - let info = self.build_info(); self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Either::Right(Push::outbound(info)), ()), + protocol: SubstreamProtocol::new( + Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)), + (), + ), }); } } @@ -340,10 +355,13 @@ impl ConnectionHandler for Handler { // Poll the future that fires when we need to identify the node again. if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) { self.trigger_next_identify.reset(self.interval); - let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Either::Left(Identify), ()), + let event = ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), + (), + ), }; - return Poll::Ready(ev); + return Poll::Ready(event); } if let Some(Poll::Ready(res)) = self @@ -377,7 +395,48 @@ impl ConnectionHandler for Handler { Poll::Ready(Ok(Err(e))) => { self.exchanged_one_periodic_identify = true; - // return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Poll::Ready(Err(Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); + } + Poll::Pending => {} + } + + match self.outbound_identify_futs.poll_unpin(cx) { + Poll::Ready(Ok(Ok(remote_info))) => { + self.handle_incoming_info(&remote_info); + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( + remote_info, + ))); + } + Poll::Ready(Ok(Err(e))) => { + self.trigger_next_identify.reset(self.interval); + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Poll::Ready(Err(Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); + } + Poll::Pending => {} + } + + match self.outbound_identify_push_futs.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationPushed, + )); + } + Poll::Ready(Ok(Err(e))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::IdentificationError(StreamUpgradeError::Apply(e)), )); @@ -409,8 +468,11 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) + ConnectionEvent::DialUpgradeError(_dial_upgrade_error) => { + self.events.push(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::NegotiationFailed), + )); + self.trigger_next_identify.reset(self.interval); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) @@ -430,11 +492,10 @@ impl ConnectionHandler for Handler { self.remote_peer_id ); - let info = self.build_info(); self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( - Either::Right(Push::outbound(info)), + Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)), (), ), }); diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 557af6246f9..3430bc336f9 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -21,17 +21,13 @@ use crate::proto; use asynchronous_codec::{FramedRead, FramedWrite}; use futures::prelude::*; -use libp2p_core::{ - multiaddr, - upgrade::{OutboundUpgrade, UpgradeInfo}, - Multiaddr, -}; +use libp2p_core::{multiaddr, Multiaddr}; use libp2p_identity as identity; use libp2p_identity::PublicKey; use libp2p_swarm::StreamProtocol; use log::{debug, trace}; use std::convert::TryFrom; -use std::{io, iter, pin::Pin}; +use std::io; use thiserror::Error; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; @@ -40,21 +36,6 @@ pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0"); pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0"); -/// Substream upgrade protocol for `/ipfs/id/1.0.0`. -#[derive(Debug, Clone)] -pub struct Identify; - -/// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. -#[derive(Debug, Clone)] -pub struct Push(T); -pub struct OutboundPush(Info); - -impl Push { - pub fn outbound(info: Info) -> Self { - Push(OutboundPush(info)) - } -} - /// Identify information of a peer sent in protocol messages. #[derive(Debug, Clone)] pub struct Info { @@ -109,50 +90,6 @@ pub struct PushInfo { pub observed_addr: Option, } -impl UpgradeInfo for Identify { - type Info = StreamProtocol; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(PROTOCOL_NAME) - } -} - -impl OutboundUpgrade for Identify -where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = Info; - type Error = UpgradeError; - type Future = Pin> + Send>>; - - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - recv_identify(socket).boxed() - } -} - -impl UpgradeInfo for Push { - type Info = StreamProtocol; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(PUSH_PROTOCOL_NAME) - } -} - -impl OutboundUpgrade for Push -where - C: AsyncWrite + Unpin + Send + 'static, -{ - type Output = (); - type Error = UpgradeError; - type Future = Pin> + Send>>; - - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - send(socket, self.0 .0).boxed() - } -} - pub(crate) async fn send(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, @@ -198,7 +135,7 @@ where Ok(info) } -async fn recv_identify(socket: T) -> Result +pub(crate) async fn recv_identify(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { From 66d975726a8cd582c0363b97547dd88c6a95e526 Mon Sep 17 00:00:00 2001 From: dgarus Date: Wed, 27 Sep 2023 10:40:12 +0300 Subject: [PATCH 03/10] fix `DialUpgradeError` comment --- protocols/identify/src/handler.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 61971032bcd..d037fbb850e 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -31,7 +31,8 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_identity::PublicKey; use libp2p_swarm::handler::{ - ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, ProtocolSupport, + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ProtocolSupport, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, @@ -468,9 +469,16 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(_dial_upgrade_error) => { + ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { + let upgrade_error = match error { + StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, + StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, + StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e), + StreamUpgradeError::Apply(v) => unreachable!("{}", v), + }; + self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::NegotiationFailed), + Event::IdentificationError(upgrade_error), )); self.trigger_next_identify.reset(self.interval); } From 8fb1ac8dcd1eef5a160a28c5edbb373ac177e70b Mon Sep 17 00:00:00 2001 From: dgarus Date: Wed, 27 Sep 2023 17:09:35 +0300 Subject: [PATCH 04/10] fix review comments --- protocols/identify/src/handler.rs | 137 +++++++---------------------- protocols/identify/src/protocol.rs | 42 +++++++-- 2 files changed, 69 insertions(+), 110 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index d037fbb850e..145f96abc83 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,11 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{recv_identify, recv_push, send, UpgradeError}; -use crate::protocol::{Info, PushInfo}; +use crate::protocol::{ + identify_push_send, recv_identify, recv_push, Info, OperationOkValue, UpgradeError, +}; use crate::{PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; -use futures::future::BoxFuture; use futures::prelude::*; use futures_bounded::Timeout; use futures_timer::Delay; @@ -44,7 +44,7 @@ use std::collections::HashSet; use std::{io, task::Context, task::Poll, time::Duration}; const STREAM_TIMEOUT: Duration = Duration::from_secs(60); -const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; +const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 4 * 10; /// Protocol handler for sending and receiving identification requests. /// @@ -53,7 +53,6 @@ const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; /// permitting the underlying connection to be closed. pub struct Handler { remote_peer_id: PeerId, - inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< [ConnectionHandlerEvent< @@ -64,14 +63,7 @@ pub struct Handler { >; 4], >, - /// Pending identification replies, awaiting being sent. - pending_replies: futures_bounded::FuturesSet>, - - /// Pending identify requests. - outbound_identify_futs: futures_bounded::FuturesSet>, - - /// Pending identify/push requests. - outbound_identify_push_futs: futures_bounded::FuturesSet>, + identify_futs: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -140,17 +132,8 @@ impl Handler { ) -> Self { Self { remote_peer_id, - inbound_identify_push: Default::default(), events: SmallVec::new(), - pending_replies: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, - MAX_CONCURRENT_STREAMS_PER_CONNECTION, - ), - outbound_identify_futs: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, - MAX_CONCURRENT_STREAMS_PER_CONNECTION, - ), - outbound_identify_push_futs: futures_bounded::FuturesSet::new( + identify_futs: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -182,8 +165,8 @@ impl Handler { let info = self.build_info(); if self - .pending_replies - .try_push(crate::protocol::send(stream, info)) + .identify_futs + .try_push(crate::protocol::identify_send(stream, info)) .is_err() { warn!("Dropping inbound stream because we are at capacity"); @@ -191,15 +174,11 @@ impl Handler { } future::Either::Right(stream) => { if self - .inbound_identify_push - .replace(recv_push(stream).boxed()) - .is_some() + .identify_futs + .try_push(recv_push(stream).boxed()) + .is_err() { - warn!( - "New inbound identify push stream from {} while still \ - upgrading previous one. Replacing previous with new.", - self.remote_peer_id, - ); + warn!("Dropping inbound identify push stream because we are at capacity"); } } } @@ -216,11 +195,7 @@ impl Handler { ) { match output { future::Either::Left(stream) => { - if self - .outbound_identify_futs - .try_push(recv_identify(stream)) - .is_err() - { + if self.identify_futs.try_push(recv_identify(stream)).is_err() { warn!("Dropping outbound identify stream because we are at capacity"); } } @@ -228,8 +203,8 @@ impl Handler { let info = self.build_info(); if self - .outbound_identify_push_futs - .try_push(send(stream, info)) + .identify_futs + .try_push(identify_push_send(stream, info)) .is_err() { warn!("Dropping outbound identify push stream because we are at capacity"); @@ -332,11 +307,7 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - if self.inbound_identify_push.is_some() { - return KeepAlive::Yes; - } - - if !self.pending_replies.is_empty() { + if !self.identify_futs.is_empty() { return KeepAlive::Yes; } @@ -365,79 +336,39 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - if let Some(Poll::Ready(res)) = self - .inbound_identify_push - .as_mut() - .map(|f| f.poll_unpin(cx)) - { - self.inbound_identify_push.take(); - - if let Ok(remote_push_info) = res { - if let Some(mut info) = self.remote_info.clone() { - info.merge(remote_push_info); - self.handle_incoming_info(&info); - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identified(info), - )); - }; - } - } - - // Check for pending replies. - match self.pending_replies.poll_unpin(cx) { - Poll::Ready(Ok(Ok(()))) => { - self.exchanged_one_periodic_identify = true; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identification, - )); - } - Poll::Ready(Ok(Err(e))) => { - self.exchanged_one_periodic_identify = true; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Apply(e)), - )); - } - Poll::Ready(Err(Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Timeout), - )); - } - Poll::Pending => {} - } - - match self.outbound_identify_futs.poll_unpin(cx) { - Poll::Ready(Ok(Ok(remote_info))) => { + match self.identify_futs.poll_unpin(cx) { + Poll::Ready(Ok(Ok(OperationOkValue::ReceiveIdentify(remote_info)))) => { self.handle_incoming_info(&remote_info); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( remote_info, ))); } - Poll::Ready(Ok(Err(e))) => { - self.trigger_next_identify.reset(self.interval); - + Poll::Ready(Ok(Ok(OperationOkValue::SendIdentifyPush))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Apply(e)), + Event::IdentificationPushed, )); } - Poll::Ready(Err(Timeout { .. })) => { + Poll::Ready(Ok(Ok(OperationOkValue::SendIdentify))) => { + self.exchanged_one_periodic_identify = true; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Timeout), + Event::Identification, )); } - Poll::Pending => {} - } + Poll::Ready(Ok(Ok(OperationOkValue::ReceiveIdentifyPush(remote_push_info)))) => { + if let Some(mut info) = self.remote_info.clone() { + info.merge(remote_push_info); + self.handle_incoming_info(&info); - match self.outbound_identify_push_futs.poll_unpin(cx) { - Poll::Ready(Ok(Ok(()))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationPushed, - )); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identified(info), + )); + }; } Poll::Ready(Ok(Err(e))) => { + self.trigger_next_identify.reset(self.interval); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::IdentificationError(StreamUpgradeError::Apply(e)), )); diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 3430bc336f9..58583319b2a 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -90,7 +90,28 @@ pub struct PushInfo { pub observed_addr: Option, } -pub(crate) async fn send(io: T, info: Info) -> Result<(), UpgradeError> +pub(crate) async fn identify_send(io: T, info: Info) -> Result +where + T: AsyncWrite + Unpin, +{ + send(io, info).await?; + + Ok(OperationOkValue::SendIdentify) +} + +pub(crate) async fn identify_push_send( + io: T, + info: Info, +) -> Result +where + T: AsyncWrite + Unpin, +{ + send(io, info).await?; + + Ok(OperationOkValue::SendIdentifyPush) +} + +async fn send(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, { @@ -124,26 +145,26 @@ where Ok(()) } -pub(crate) async fn recv_push(socket: T) -> Result +pub(crate) async fn recv_push(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { - let info = recv(socket).await?.try_into()?; + let info: PushInfo = recv(socket).await?.try_into()?; trace!("Received {:?}", info); - Ok(info) + Ok(OperationOkValue::ReceiveIdentifyPush(info)) } -pub(crate) async fn recv_identify(socket: T) -> Result +pub(crate) async fn recv_identify(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { - let info = recv(socket).await?.try_into()?; + let info: Info = recv(socket).await?.try_into()?; trace!("Received {:?}", info); - Ok(info) + Ok(OperationOkValue::ReceiveIdentify(info)) } async fn recv(socket: T) -> Result @@ -254,6 +275,13 @@ impl TryFrom for PushInfo { } } +pub(crate) enum OperationOkValue { + SendIdentify, + ReceiveIdentify(Info), + SendIdentifyPush, + ReceiveIdentifyPush(PushInfo), +} + #[derive(Debug, Error)] pub enum UpgradeError { #[error(transparent)] From 7167162b260f121c2741dfdba4b03f363aa9567c Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 28 Sep 2023 11:09:06 +0300 Subject: [PATCH 05/10] fix review comments --- protocols/identify/src/handler.rs | 37 ++++++++++++++++--------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 145f96abc83..b74eb4c7a5e 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,10 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{ - identify_push_send, recv_identify, recv_push, Info, OperationOkValue, UpgradeError, -}; -use crate::{PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; +use crate::protocol::{Info, OperationOkValue, UpgradeError}; +use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; use futures::prelude::*; use futures_bounded::Timeout; @@ -44,7 +42,7 @@ use std::collections::HashSet; use std::{io, task::Context, task::Poll, time::Duration}; const STREAM_TIMEOUT: Duration = Duration::from_secs(60); -const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 4 * 10; +const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; /// Protocol handler for sending and receiving identification requests. /// @@ -63,7 +61,7 @@ pub struct Handler { >; 4], >, - identify_futs: futures_bounded::FuturesSet>, + active_streams: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -133,7 +131,7 @@ impl Handler { Self { remote_peer_id, events: SmallVec::new(), - identify_futs: futures_bounded::FuturesSet::new( + active_streams: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -165,8 +163,8 @@ impl Handler { let info = self.build_info(); if self - .identify_futs - .try_push(crate::protocol::identify_send(stream, info)) + .active_streams + .try_push(protocol::identify_send(stream, info)) .is_err() { warn!("Dropping inbound stream because we are at capacity"); @@ -174,8 +172,8 @@ impl Handler { } future::Either::Right(stream) => { if self - .identify_futs - .try_push(recv_push(stream).boxed()) + .active_streams + .try_push(protocol::recv_push(stream)) .is_err() { warn!("Dropping inbound identify push stream because we are at capacity"); @@ -195,7 +193,7 @@ impl Handler { ) { match output { future::Either::Left(stream) => { - if self.identify_futs.try_push(recv_identify(stream)).is_err() { + if self.active_streams.try_push(protocol::recv_identify(stream)).is_err() { warn!("Dropping outbound identify stream because we are at capacity"); } } @@ -203,8 +201,8 @@ impl Handler { let info = self.build_info(); if self - .identify_futs - .try_push(identify_push_send(stream, info)) + .active_streams + .try_push(protocol::identify_push_send(stream, info)) .is_err() { warn!("Dropping outbound identify push stream because we are at capacity"); @@ -307,7 +305,7 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - if !self.identify_futs.is_empty() { + if !self.active_streams.is_empty() { return KeepAlive::Yes; } @@ -336,7 +334,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - match self.identify_futs.poll_unpin(cx) { + match self.active_streams.poll_unpin(cx) { Poll::Ready(Ok(Ok(OperationOkValue::ReceiveIdentify(remote_info)))) => { self.handle_incoming_info(&remote_info); @@ -367,7 +365,10 @@ impl ConnectionHandler for Handler { }; } Poll::Ready(Ok(Err(e))) => { - self.trigger_next_identify.reset(self.interval); + // todo + // self.trigger_next_identify.reset(self.interval); //ReceiveIdentify Event::Identified(remote_info) + // or + // self.exchanged_one_periodic_identify = true; //SendIdentify Event::Identification, return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::IdentificationError(StreamUpgradeError::Apply(e)), @@ -405,7 +406,7 @@ impl ConnectionHandler for Handler { StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e), - StreamUpgradeError::Apply(v) => unreachable!("{}", v), + StreamUpgradeError::Apply(v) => void::unreachable(v.into_inner()), }; self.events.push(ConnectionHandlerEvent::NotifyBehaviour( From 2af6d9484f8dafe38e0e9a707716131a45965a9a Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 28 Sep 2023 12:54:47 +0300 Subject: [PATCH 06/10] fix review comments --- protocols/identify/src/handler.rs | 48 +++++++++++++++++++++++------- protocols/identify/src/protocol.rs | 42 +++++--------------------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index b74eb4c7a5e..da27a22ae42 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{Info, OperationOkValue, UpgradeError}; +use crate::protocol::{Info, PushInfo, UpgradeError}; use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; use futures::prelude::*; @@ -61,7 +61,7 @@ pub struct Handler { >; 4], >, - active_streams: futures_bounded::FuturesSet>, + active_streams: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -164,7 +164,11 @@ impl Handler { if self .active_streams - .try_push(protocol::identify_send(stream, info)) + .try_push(async move { + protocol::send_identify(stream, info).await?; + + Ok(Success::SendIdentify) + }) .is_err() { warn!("Dropping inbound stream because we are at capacity"); @@ -173,7 +177,12 @@ impl Handler { future::Either::Right(stream) => { if self .active_streams - .try_push(protocol::recv_push(stream)) + .try_push(async { + let info = protocol::recv_push(stream).await?; + + Ok(Success::ReceiveIdentifyPush(info)) + } + ) .is_err() { warn!("Dropping inbound identify push stream because we are at capacity"); @@ -193,7 +202,13 @@ impl Handler { ) { match output { future::Either::Left(stream) => { - if self.active_streams.try_push(protocol::recv_identify(stream)).is_err() { + if self.active_streams.try_push( + async move { + let info = protocol::recv_identify(stream).await?; + + Ok(Success::ReceiveIdentify(info)) + } + ).is_err() { warn!("Dropping outbound identify stream because we are at capacity"); } } @@ -202,7 +217,13 @@ impl Handler { if self .active_streams - .try_push(protocol::identify_push_send(stream, info)) + .try_push( + async move { + protocol::send_identify(stream, info).await?; + + Ok(Success::SendIdentifyPush) + } + ) .is_err() { warn!("Dropping outbound identify push stream because we are at capacity"); @@ -335,26 +356,26 @@ impl ConnectionHandler for Handler { } match self.active_streams.poll_unpin(cx) { - Poll::Ready(Ok(Ok(OperationOkValue::ReceiveIdentify(remote_info)))) => { + Poll::Ready(Ok(Ok(Success::ReceiveIdentify(remote_info)))) => { self.handle_incoming_info(&remote_info); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( remote_info, ))); } - Poll::Ready(Ok(Ok(OperationOkValue::SendIdentifyPush))) => { + Poll::Ready(Ok(Ok(Success::SendIdentifyPush))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::IdentificationPushed, )); } - Poll::Ready(Ok(Ok(OperationOkValue::SendIdentify))) => { + Poll::Ready(Ok(Ok(Success::SendIdentify))) => { self.exchanged_one_periodic_identify = true; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::Identification, )); } - Poll::Ready(Ok(Ok(OperationOkValue::ReceiveIdentifyPush(remote_push_info)))) => { + Poll::Ready(Ok(Ok(Success::ReceiveIdentifyPush(remote_push_info)))) => { if let Some(mut info) = self.remote_info.clone() { info.merge(remote_push_info); self.handle_incoming_info(&info); @@ -444,3 +465,10 @@ impl ConnectionHandler for Handler { } } } + +pub enum Success { + SendIdentify, + ReceiveIdentify(Info), + SendIdentifyPush, + ReceiveIdentifyPush(PushInfo), +} diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 58583319b2a..5e2891e04e4 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -90,28 +90,7 @@ pub struct PushInfo { pub observed_addr: Option, } -pub(crate) async fn identify_send(io: T, info: Info) -> Result -where - T: AsyncWrite + Unpin, -{ - send(io, info).await?; - - Ok(OperationOkValue::SendIdentify) -} - -pub(crate) async fn identify_push_send( - io: T, - info: Info, -) -> Result -where - T: AsyncWrite + Unpin, -{ - send(io, info).await?; - - Ok(OperationOkValue::SendIdentifyPush) -} - -async fn send(io: T, info: Info) -> Result<(), UpgradeError> +pub(crate) async fn send_identify(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, { @@ -145,26 +124,26 @@ where Ok(()) } -pub(crate) async fn recv_push(socket: T) -> Result +pub(crate) async fn recv_push(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { - let info: PushInfo = recv(socket).await?.try_into()?; + let info = recv(socket).await?.try_into()?; trace!("Received {:?}", info); - Ok(OperationOkValue::ReceiveIdentifyPush(info)) + Ok(info) } -pub(crate) async fn recv_identify(socket: T) -> Result +pub(crate) async fn recv_identify(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { - let info: Info = recv(socket).await?.try_into()?; + let info = recv(socket).await?.try_into()?; trace!("Received {:?}", info); - Ok(OperationOkValue::ReceiveIdentify(info)) + Ok(info) } async fn recv(socket: T) -> Result @@ -275,13 +254,6 @@ impl TryFrom for PushInfo { } } -pub(crate) enum OperationOkValue { - SendIdentify, - ReceiveIdentify(Info), - SendIdentifyPush, - ReceiveIdentifyPush(PushInfo), -} - #[derive(Debug, Error)] pub enum UpgradeError { #[error(transparent)] From 3de42309d0ad2959b2d4a82a992e297b539d14e1 Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 28 Sep 2023 13:32:38 +0300 Subject: [PATCH 07/10] fix review comments --- protocols/identify/src/handler.rs | 51 ++++++++++++------------------- 1 file changed, 19 insertions(+), 32 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index da27a22ae42..06aa1a51da6 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -164,11 +164,9 @@ impl Handler { if self .active_streams - .try_push(async move { - protocol::send_identify(stream, info).await?; - - Ok(Success::SendIdentify) - }) + .try_push( + protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify), + ) .is_err() { warn!("Dropping inbound stream because we are at capacity"); @@ -177,12 +175,7 @@ impl Handler { future::Either::Right(stream) => { if self .active_streams - .try_push(async { - let info = protocol::recv_push(stream).await?; - - Ok(Success::ReceiveIdentifyPush(info)) - } - ) + .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush)) .is_err() { warn!("Dropping inbound identify push stream because we are at capacity"); @@ -202,13 +195,11 @@ impl Handler { ) { match output { future::Either::Left(stream) => { - if self.active_streams.try_push( - async move { - let info = protocol::recv_identify(stream).await?; - - Ok(Success::ReceiveIdentify(info)) - } - ).is_err() { + if self + .active_streams + .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify)) + .is_err() + { warn!("Dropping outbound identify stream because we are at capacity"); } } @@ -218,11 +209,7 @@ impl Handler { if self .active_streams .try_push( - async move { - protocol::send_identify(stream, info).await?; - - Ok(Success::SendIdentifyPush) - } + protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush), ) .is_err() { @@ -356,26 +343,26 @@ impl ConnectionHandler for Handler { } match self.active_streams.poll_unpin(cx) { - Poll::Ready(Ok(Ok(Success::ReceiveIdentify(remote_info)))) => { + Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => { self.handle_incoming_info(&remote_info); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( remote_info, ))); } - Poll::Ready(Ok(Ok(Success::SendIdentifyPush))) => { + Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::IdentificationPushed, )); } - Poll::Ready(Ok(Ok(Success::SendIdentify))) => { + Poll::Ready(Ok(Ok(Success::SentIdentify))) => { self.exchanged_one_periodic_identify = true; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::Identification, )); } - Poll::Ready(Ok(Ok(Success::ReceiveIdentifyPush(remote_push_info)))) => { + Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => { if let Some(mut info) = self.remote_info.clone() { info.merge(remote_push_info); self.handle_incoming_info(&info); @@ -466,9 +453,9 @@ impl ConnectionHandler for Handler { } } -pub enum Success { - SendIdentify, - ReceiveIdentify(Info), - SendIdentifyPush, - ReceiveIdentifyPush(PushInfo), +enum Success { + SentIdentify, + ReceivedIdentify(Info), + SentIdentifyPush, + ReceivedIdentifyPush(PushInfo), } From 5f7ebdc226aaca47bf6ed856bbe7f1c183d35b70 Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 28 Sep 2023 14:48:12 +0300 Subject: [PATCH 08/10] bug fixes --- protocols/identify/src/handler.rs | 141 ++++++++++++++++++++---------- 1 file changed, 97 insertions(+), 44 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 06aa1a51da6..3815c7ebdd8 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -61,7 +61,8 @@ pub struct Handler { >; 4], >, - active_streams: futures_bounded::FuturesSet>, + next_stream_id: u32, + active_streams: futures_bounded::FuturesMap>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -131,7 +132,8 @@ impl Handler { Self { remote_peer_id, events: SmallVec::new(), - active_streams: futures_bounded::FuturesSet::new( + next_stream_id: 0, + active_streams: futures_bounded::FuturesMap::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -149,6 +151,12 @@ impl Handler { } } + fn next_stream_id(&mut self) -> u32 { + self.next_stream_id = self.next_stream_id.wrapping_add(1); + + self.next_stream_id + } + fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { @@ -158,6 +166,8 @@ impl Handler { ::InboundOpenInfo, >, ) { + let stream_id = self.next_stream_id(); + match output { future::Either::Left(stream) => { let info = self.build_info(); @@ -165,6 +175,7 @@ impl Handler { if self .active_streams .try_push( + StreamId::for_sent_identify(stream_id), protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify), ) .is_err() @@ -175,7 +186,10 @@ impl Handler { future::Either::Right(stream) => { if self .active_streams - .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush)) + .try_push( + StreamId::for_receive_identify_push(stream_id), + protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush), + ) .is_err() { warn!("Dropping inbound identify push stream because we are at capacity"); @@ -193,11 +207,15 @@ impl Handler { ::OutboundOpenInfo, >, ) { + let stream_id = self.next_stream_id(); match output { future::Either::Left(stream) => { if self .active_streams - .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify)) + .try_push( + StreamId::for_receive_identify(stream_id), + protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify), + ) .is_err() { warn!("Dropping outbound identify stream because we are at capacity"); @@ -209,6 +227,7 @@ impl Handler { if self .active_streams .try_push( + StreamId::for_sent_identify_push(stream_id), protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush), ) .is_err() @@ -342,52 +361,54 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - match self.active_streams.poll_unpin(cx) { - Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => { - self.handle_incoming_info(&remote_info); - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( - remote_info, - ))); - } - Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationPushed, - )); - } - Poll::Ready(Ok(Ok(Success::SentIdentify))) => { + if let Poll::Ready((id, stream_res)) = self.active_streams.poll_unpin(cx) { + if id.is_sent_identify() { self.exchanged_one_periodic_identify = true; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identification, - )); } - Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => { - if let Some(mut info) = self.remote_info.clone() { - info.merge(remote_push_info); - self.handle_incoming_info(&info); + + match stream_res { + Ok(Ok(Success::ReceivedIdentify(remote_info))) => { + self.handle_incoming_info(&remote_info); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identified(info), + Event::Identified(remote_info), )); - }; - } - Poll::Ready(Ok(Err(e))) => { - // todo - // self.trigger_next_identify.reset(self.interval); //ReceiveIdentify Event::Identified(remote_info) - // or - // self.exchanged_one_periodic_identify = true; //SendIdentify Event::Identification, - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Apply(e)), - )); - } - Poll::Ready(Err(Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Timeout), - )); + } + Ok(Ok(Success::SentIdentifyPush)) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationPushed, + )); + } + Ok(Ok(Success::SentIdentify)) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identification, + )); + } + Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info))) => { + if let Some(mut info) = self.remote_info.clone() { + info.merge(remote_push_info); + self.handle_incoming_info(&info); + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identified(info), + )); + }; + } + Ok(Err(e)) => { + if id.is_receive_identify() { + self.trigger_next_identify.reset(self.interval); + } + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Err(Timeout { .. }) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); + } } - Poll::Pending => {} } Poll::Pending @@ -459,3 +480,35 @@ enum Success { SentIdentifyPush, ReceivedIdentifyPush(PushInfo), } + +#[derive(Clone, Hash, PartialEq, Eq)] +struct StreamId { + st: u8, + id: u32, +} + +impl StreamId { + fn for_sent_identify(id: u32) -> Self { + StreamId { st: 0, id } + } + + fn for_receive_identify(id: u32) -> Self { + StreamId { st: 1, id } + } + + fn for_sent_identify_push(id: u32) -> Self { + StreamId { st: 2, id } + } + + fn for_receive_identify_push(id: u32) -> Self { + StreamId { st: 3, id } + } + + fn is_sent_identify(&self) -> bool { + self.st == 0 + } + + fn is_receive_identify(&self) -> bool { + self.st == 1 + } +} From 8844fa26cf447356d4668e199a16f425d63336ca Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 28 Sep 2023 15:57:38 +0300 Subject: [PATCH 09/10] bug fixes --- protocols/identify/src/handler.rs | 138 +++++++++--------------------- 1 file changed, 40 insertions(+), 98 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 3815c7ebdd8..b2e83dd71bf 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -61,8 +61,7 @@ pub struct Handler { >; 4], >, - next_stream_id: u32, - active_streams: futures_bounded::FuturesMap>, + active_streams: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -132,8 +131,7 @@ impl Handler { Self { remote_peer_id, events: SmallVec::new(), - next_stream_id: 0, - active_streams: futures_bounded::FuturesMap::new( + active_streams: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -151,12 +149,6 @@ impl Handler { } } - fn next_stream_id(&mut self) -> u32 { - self.next_stream_id = self.next_stream_id.wrapping_add(1); - - self.next_stream_id - } - fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { @@ -166,8 +158,6 @@ impl Handler { ::InboundOpenInfo, >, ) { - let stream_id = self.next_stream_id(); - match output { future::Either::Left(stream) => { let info = self.build_info(); @@ -175,21 +165,19 @@ impl Handler { if self .active_streams .try_push( - StreamId::for_sent_identify(stream_id), protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify), ) .is_err() { warn!("Dropping inbound stream because we are at capacity"); + } else { + self.exchanged_one_periodic_identify = true; } } future::Either::Right(stream) => { if self .active_streams - .try_push( - StreamId::for_receive_identify_push(stream_id), - protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush), - ) + .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush)) .is_err() { warn!("Dropping inbound identify push stream because we are at capacity"); @@ -207,15 +195,11 @@ impl Handler { ::OutboundOpenInfo, >, ) { - let stream_id = self.next_stream_id(); match output { future::Either::Left(stream) => { if self .active_streams - .try_push( - StreamId::for_receive_identify(stream_id), - protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify), - ) + .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify)) .is_err() { warn!("Dropping outbound identify stream because we are at capacity"); @@ -227,7 +211,6 @@ impl Handler { if self .active_streams .try_push( - StreamId::for_sent_identify_push(stream_id), protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush), ) .is_err() @@ -361,54 +344,45 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - if let Poll::Ready((id, stream_res)) = self.active_streams.poll_unpin(cx) { - if id.is_sent_identify() { - self.exchanged_one_periodic_identify = true; - } - - match stream_res { - Ok(Ok(Success::ReceivedIdentify(remote_info))) => { - self.handle_incoming_info(&remote_info); + match self.active_streams.poll_unpin(cx) { + Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => { + self.handle_incoming_info(&remote_info); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identified(remote_info), - )); - } - Ok(Ok(Success::SentIdentifyPush)) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationPushed, - )); - } - Ok(Ok(Success::SentIdentify)) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identification, - )); - } - Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info))) => { - if let Some(mut info) = self.remote_info.clone() { - info.merge(remote_push_info); - self.handle_incoming_info(&info); - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::Identified(info), - )); - }; - } - Ok(Err(e)) => { - if id.is_receive_identify() { - self.trigger_next_identify.reset(self.interval); - } + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( + remote_info, + ))); + } + Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationPushed, + )); + } + Poll::Ready(Ok(Ok(Success::SentIdentify))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identification, + )); + } + Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => { + if let Some(mut info) = self.remote_info.clone() { + info.merge(remote_push_info); + self.handle_incoming_info(&info); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Apply(e)), + Event::Identified(info), )); - } - Err(Timeout { .. }) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(StreamUpgradeError::Timeout), - )); - } + }; + } + Poll::Ready(Ok(Err(e))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Poll::Ready(Err(Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); } + Poll::Pending => {} } Poll::Pending @@ -480,35 +454,3 @@ enum Success { SentIdentifyPush, ReceivedIdentifyPush(PushInfo), } - -#[derive(Clone, Hash, PartialEq, Eq)] -struct StreamId { - st: u8, - id: u32, -} - -impl StreamId { - fn for_sent_identify(id: u32) -> Self { - StreamId { st: 0, id } - } - - fn for_receive_identify(id: u32) -> Self { - StreamId { st: 1, id } - } - - fn for_sent_identify_push(id: u32) -> Self { - StreamId { st: 2, id } - } - - fn for_receive_identify_push(id: u32) -> Self { - StreamId { st: 3, id } - } - - fn is_sent_identify(&self) -> bool { - self.st == 0 - } - - fn is_receive_identify(&self) -> bool { - self.st == 1 - } -} From e9293a02a91e64d3baca1ff3c59e81f58dce0003 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 29 Sep 2023 09:11:44 +1000 Subject: [PATCH 10/10] Use `map_upgrade_err` utility --- protocols/identify/src/handler.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index b2e83dd71bf..50b9882f2c5 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -405,15 +405,10 @@ impl ConnectionHandler for Handler { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - let upgrade_error = match error { - StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, - StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, - StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e), - StreamUpgradeError::Apply(v) => void::unreachable(v.into_inner()), - }; - self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(upgrade_error), + Event::IdentificationError( + error.map_upgrade_err(|e| void::unreachable(e.into_inner())), + ), )); self.trigger_next_identify.reset(self.interval); }