Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple connections per peer #1440

Merged
merged 34 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4fa8360
Allow multiple connections per peer in libp2p-core.
Jan 16, 2020
5f182f0
Fix intra-rustdoc links.
Feb 13, 2020
66566e4
Update core/src/connection/pool.rs
romanb Feb 14, 2020
51b7102
Address some review feedback and fix doc links.
Feb 13, 2020
ae6bfe9
Allow responses to be sent on the same connection.
Feb 16, 2020
df6c55a
Remove unnecessary remainders of inject_replaced.
Feb 16, 2020
0ae2400
Update swarm/src/behaviour.rs
romanb Feb 17, 2020
1b3c9cf
Update swarm/src/lib.rs
romanb Feb 17, 2020
76fd5d9
Update core/src/connection/manager.rs
romanb Feb 17, 2020
4c81357
Update core/src/connection/manager.rs
romanb Feb 17, 2020
0e836fc
Update core/src/connection/pool.rs
romanb Feb 17, 2020
38f283d
Incorporate more review feedback.
Feb 17, 2020
f3bfe54
Move module declaration below imports.
Feb 17, 2020
b0b39fb
Update core/src/connection/manager.rs
romanb Feb 17, 2020
53810e6
Update core/src/connection/manager.rs
romanb Feb 17, 2020
017d0e9
Simplify as per review.
Feb 18, 2020
f95a1dd
Fix rustoc link.
Feb 18, 2020
21e9919
Add try_notify_handler and simplify.
Feb 18, 2020
0a34b93
Relocate DialingConnection and DialingAttempt.
Feb 18, 2020
efa5fde
Small cleanup.
Feb 19, 2020
73d6241
Merge branch 'master' into multicon
Feb 20, 2020
92d20e8
Small cleanup. More robust EstablishedConnectionIter.
Feb 23, 2020
3540588
Clarify semantics of `DialingPeer::connect`.
Feb 23, 2020
3838c5e
Don't call inject_disconnected on InvalidPeerId.
Feb 24, 2020
d9938b8
Provide public ConnectionId constructor.
Feb 24, 2020
1d251d6
Move the established connection limit check to the right place.
Feb 24, 2020
6161f1c
Clean up connection error handling.
Feb 24, 2020
1a5b184
Revert change in log level and clarify an invariant.
Feb 25, 2020
df58cde
Remove inject_replaced entirely.
Feb 25, 2020
abf249b
Allow notifying all connection handlers.
Feb 25, 2020
b7d6c58
Merge branch 'master' into multicon
Feb 26, 2020
7fe86db
Merge branch 'master' into multicon
Feb 28, 2020
3879b65
Finishing touches.
Feb 29, 2020
86cf85b
Merge branch 'master' into multicon
tomaka Mar 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"]
asn1_der = "0.6.1"
bs58 = "0.3.0"
ed25519-dalek = "1.0.0-pre.3"
either = "1.5"
fnv = "1.0"
futures = { version = "0.3.1", features = ["compat", "io-compat", "executor", "thread-pool"] }
futures-timer = "3"
Expand Down
331 changes: 331 additions & 0 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod error;
mod handler;
mod listeners;
mod substream;

pub(crate) mod manager;
pub(crate) mod pool;

pub use error::ConnectionError;
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
/// The socket comes from a dialer.
Dialer,
/// The socket comes from a listener.
Listener,
}

impl std::ops::Not for Endpoint {
type Output = Endpoint;

fn not(self) -> Self::Output {
match self {
Endpoint::Dialer => Endpoint::Listener,
Endpoint::Listener => Endpoint::Dialer
}
}
}

impl Endpoint {
/// Is this endpoint a dialer?
pub fn is_dialer(self) -> bool {
if let Endpoint::Dialer = self {
true
} else {
false
}
}

/// Is this endpoint a listener?
pub fn is_listener(self) -> bool {
if let Endpoint::Listener = self {
true
} else {
false
}
}
}

/// The endpoint roles associated with a peer-to-peer connection.
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
pub enum ConnectedPoint {
/// We dialed the node.
Dialer {
/// Multiaddress that was successfully dialed.
address: Multiaddr,
},
/// We received the node.
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Stack of protocols used to send back data to the remote.
send_back_addr: Multiaddr,
}
}

impl From<&'_ ConnectedPoint> for Endpoint {
fn from(endpoint: &'_ ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}

impl From<ConnectedPoint> for Endpoint {
fn from(endpoint: ConnectedPoint) -> Endpoint {
endpoint.to_endpoint()
}
}

impl ConnectedPoint {
/// Turns the `ConnectedPoint` into the corresponding `Endpoint`.
pub fn to_endpoint(&self) -> Endpoint {
match self {
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
ConnectedPoint::Listener { .. } => Endpoint::Listener
}
}

/// Returns true if we are `Dialer`.
pub fn is_dialer(&self) -> bool {
match self {
ConnectedPoint::Dialer { .. } => true,
ConnectedPoint::Listener { .. } => false
}
}

/// Returns true if we are `Listener`.
pub fn is_listener(&self) -> bool {
match self {
ConnectedPoint::Dialer { .. } => false,
ConnectedPoint::Listener { .. } => true
}
}
}

/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected<I> {
/// The connected endpoint, including network address information.
pub endpoint: ConnectedPoint,
/// Information obtained from the transport.
pub info: I,
}

impl<I> Connected<I>
where
I: ConnectionInfo
{
pub fn peer_id(&self) -> &I::PeerId {
self.info.peer_id()
}
}

/// Information about a connection.
pub trait ConnectionInfo {
/// Identity of the node we are connected to.
type PeerId: Eq + Hash;

/// Returns the identity of the node we are connected to on this connection.
fn peer_id(&self) -> &Self::PeerId;
}

impl ConnectionInfo for PeerId {
type PeerId = PeerId;

fn peer_id(&self) -> &PeerId {
self
}
}

/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
pub struct Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Node that handles the muxing.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
/// Handler that processes substreams.
handler: THandler,
}

impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("muxing", &self.muxing)
.field("handler", &self.handler)
.finish()
}
}

impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}

impl<TMuxer, THandler> Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Builds a new `Connection` from the given substream multiplexer
/// and connection handler.
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
Connection {
muxing: Muxing::new(muxer),
handler,
}
}

/// Returns a reference to the `ConnectionHandler`
pub fn handler(&self) -> &THandler {
&self.handler
}

/// Returns a mutable reference to the `ConnectionHandler`
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}

/// Notifies the connection handler of an event.
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}

/// Returns `true` if the remote has shown any sign of activity
/// since the connection has been established.
///
/// See also [`StreamMuxer::is_remote_acknowledged`].
pub fn is_remote_acknowledged(&self) -> bool {
self.muxing.is_remote_acknowledged()
}

/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
}

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll<TTransErr>(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Result<THandler::OutEvent, ConnectionError<THandler::Error, TTransErr>>>
{
loop {
let mut io_pending = false;

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

// Poll the handler for new events.
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
return Poll::Pending // Nothing to do
}
}
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(event));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
}
}

/// Borrowed information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
/// Local connection address.
pub local_addr: &'a Multiaddr,
/// Stack of protocols used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
}

impl<'a> IncomingInfo<'a> {
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}

/// Borrowed information about an outgoing connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a, TPeerId> {
pub address: &'a Multiaddr,
pub peer_id: Option<&'a TPeerId>,
}

impl<'a, TPeerId> OutgoingInfo<'a, TPeerId> {
/// Builds a `ConnectedPoint` corresponding to the outgoing connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Dialer {
address: self.address.clone()
}
}
}

/// Information about a connection limit.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: usize,
/// The current number of connections.
pub current: usize,
}

Loading