Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{core,swarm}: Remove Network abstraction #2492

Merged
merged 10 commits into from
Feb 13, 2022
5 changes: 5 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 0.32.0 [unreleased]

- Remove `Network`. `libp2p-core` is from now on an auxiliary crate only. Users
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting take (the "auxiliary" part).

My PoV is that libp2p-core is / should remain a standalone foundation that should make it possible to write wire-compatible libp2p applications. i.e. the entire connection upgrade process etc that is currently in there is pretty foundational. Perhaps we agree on this and "auxiliary" was just not the best word to use here but if we don't agree then I'd like to discuss it :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think we are on the same page. "Foundational" would have been a better word here.

that have previously used `Network` only, will need to use `Swarm` instead. See
[PR 2492].

- Update to `multiaddr` `v0.14.0`.

- Update to `multihash` `v0.16.0`.
Expand All @@ -10,6 +14,7 @@

[PR 2456]: https://github.com/libp2p/rust-libp2p/pull/2456
[RUSTSEC-2022-0009]: https://rustsec.org/advisories/RUSTSEC-2022-0009.html
[PR 2492]: https://github.com/libp2p/rust-libp2p/pull/2492

# 0.31.0 [2022-01-27]

Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ criterion = "0.3"
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" }
libp2p-tcp = { path = "../transports/tcp" }
serde_json = "1.0"
rmp-serde = "1.0"
multihash = { version = "0.16", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
rand07 = { package = "rand", version = "0.7" }
rmp-serde = "1.0"
serde_json = "1.0"

[build-dependencies]
prost-build = "0.9"
Expand Down
229 changes: 28 additions & 201 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod error;
pub(crate) mod handler;
mod listeners;
mod substream;

pub(crate) mod pool;

pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersEvent, ListenersStream};
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint};

use crate::multiaddr::{Multiaddr, Protocol};
use crate::muxing::StreamMuxer;
use crate::PeerId;
use std::hash::Hash;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent};

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -53,7 +31,34 @@ impl ConnectionId {
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(id)
Self(id)
}
}

impl std::ops::Add<usize> for ConnectionId {
type Output = Self;

fn add(self, other: usize) -> Self {
Self(self.0 + other)
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

Expand Down Expand Up @@ -236,181 +241,3 @@ impl ConnectedPoint {
}
}
}

/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected {
/// The connected endpoint, including network address information.
pub endpoint: ConnectedPoint,
/// Information obtained from the transport.
pub peer_id: PeerId,
}

/// Event generated by a [`Connection`].
#[derive(Debug, Clone)]
pub enum Event<T> {
/// Event generated by the [`ConnectionHandler`].
Handler(T),
/// Address of the remote has changed.
AddressChange(Multiaddr),
}

/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
pub struct Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Node that handles the muxing.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
/// Handler that processes substreams.
handler: THandler,
}

impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("muxing", &self.muxing)
.field("handler", &self.handler)
.finish()
}
}

impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}

impl<TMuxer, THandler> Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Builds a new `Connection` from the given substream multiplexer
/// and connection handler.
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
Connection {
muxing: Muxing::new(muxer),
handler,
}
}

/// Returns a reference to the `ConnectionHandler`
pub fn handler(&self) -> &THandler {
&self.handler
}

/// Returns a mutable reference to the `ConnectionHandler`
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}

/// Notifies the connection handler of an event.
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}

/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop {
let mut io_pending = false;

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => self
.handler
.inject_substream(substream, SubstreamEndpoint::Listener),
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data,
substream,
})) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

// Poll the handler for new events.
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
return Poll::Pending; // Nothing to do
}
}
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
}
}

/// Borrowed information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
/// Local connection address.
pub local_addr: &'a Multiaddr,
/// Address used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
}

impl<'a> IncomingInfo<'a> {
/// Builds the [`PendingPoint`] corresponding to the incoming connection.
pub fn to_pending_point(&self) -> PendingPoint {
PendingPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
/// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}

/// Information about a connection limit.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: u32,
/// The current number of connections.
pub current: u32,
}

impl fmt::Display for ConnectionLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.current, self.limit)
}
}

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}
4 changes: 1 addition & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,16 @@ pub mod connection;
pub mod either;
pub mod identity;
pub mod muxing;
pub mod network;
pub mod peer_record;
pub mod signed_envelope;
pub mod transport;
pub mod upgrade;

pub use connection::{Connected, ConnectedPoint, Endpoint};
pub use connection::{ConnectedPoint, Endpoint};
pub use identity::PublicKey;
pub use multiaddr::Multiaddr;
pub use multihash;
pub use muxing::StreamMuxer;
pub use network::{DialOpts, Network};
pub use peer_id::PeerId;
pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
Expand Down
Loading