Skip to content

Commit

Permalink
Refine boxing during transport construction. (#1794)
Browse files Browse the repository at this point in the history
* Rework boxing during transport construction.

* Cleanup

* Fix chat-tokio example.

* Update changelogs and versions.
  • Loading branch information
romanb authored Oct 16, 2020
1 parent 2dbf834 commit dc56d44
Show file tree
Hide file tree
Showing 51 changed files with 357 additions and 319 deletions.
30 changes: 15 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ atomic = "0.5.0"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.22.2", path = "core" }
libp2p-core = { version = "0.23.0", path = "core" }
libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.23.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.22.1", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.22.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.23.1", path = "protocols/kad", optional = true }
libp2p-gossipsub = { version = "0.23.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.23.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.24.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.23.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.24.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-noise = { version = "0.25.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.23.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.4.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.22.1", path = "swarm" }
libp2p-uds = { version = "0.22.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.22.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.25.0", path = "muxers/yamux", optional = true }
libp2p-swarm = { version = "0.23.0", path = "swarm" }
libp2p-uds = { version = "0.23.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.23.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.26.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.9.3", path = "misc/multiaddr" }
multihash = "0.11.0"
parking_lot = "0.11.0"
Expand All @@ -86,11 +86,11 @@ smallvec = "1.0"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.22.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.22.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.22.1", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.22.1", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.23.1", path = "transports/websocket", optional = true }
libp2p-deflate = { version = "0.23.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.23.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.23.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.23.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.24.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
6 changes: 5 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# 0.22.2 [unreleased]
# 0.23.0 [unreleased]

- Rework transport boxing and move timeout configuration
to the transport builder.
[PR 1794](https://github.com/libp2p/rust-libp2p/pull/1794).

- Update dependencies.

Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.22.2"
version = "0.23.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
46 changes: 5 additions & 41 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.
use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}};
use crate::ConnectedPoint;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error::Error, fmt};
use std::time::Duration;

pub mod and_then;
pub mod choice;
Expand Down Expand Up @@ -129,24 +128,16 @@ pub trait Transport {
where
Self: Sized;

/// Boxes an authenticated, multiplexed transport, including the
/// `StreamMuxer` and transport errors.
fn boxed<I, M>(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error>
/// Boxes the transport, including custom transport errors.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Self: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
Self: Transport + Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
I: ConnectionInfo,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static

{
boxed::boxed(
self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
boxed::boxed(self)
}

/// Applies a function on the connections created by the transport.
Expand Down Expand Up @@ -198,33 +189,6 @@ pub trait Transport {
and_then::AndThen::new(self, f)
}

/// Adds a timeout to the connection setup (including upgrades) for all
/// inbound and outbound connections established through the transport.
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::new(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all outbound
/// connections established through the transport.
fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// connections established through the transport.
fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}

/// Begins a series of protocol upgrades via an
/// [`upgrade::Builder`](upgrade::Builder).
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
Expand Down
74 changes: 42 additions & 32 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
use crate::transport::{ListenerEvent, Transport, TransportError};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, fmt, pin::Pin, sync::Arc};
use std::{error::Error, fmt, io, pin::Pin, sync::Arc};

/// See the `Transport::boxed` method.
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
/// Creates a new [`Boxed`] transport from the given transport.
pub fn boxed<T>(transport: T) -> Boxed<T::Output>
where
T: Transport + Clone + Send + Sync + 'static,
T::Error: Send + Sync,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
Expand All @@ -36,64 +37,69 @@ where
}
}

/// See the `Transport::boxed` method.
pub struct Boxed<O, E> {
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
/// A `Boxed` transport is a `Transport` whose `Dial`, `Listener`
/// and `ListenerUpgrade` futures are `Box`ed and only the `Output`
/// and `Error` types are captured in type variables.
pub struct Boxed<O> {
inner: Arc<dyn Abstract<O> + Send + Sync>,
}

type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
type Listener<O> = Pin<Box<dyn Stream<Item = io::Result<ListenerEvent<ListenerUpgrade<O>, io::Error>>> + Send>>;
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;

trait Abstract<O, E> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
}

impl<T, O, E> Abstract<O, E> for T
impl<T, O> Abstract<O> for T
where
T: Transport<Output = O, Error = E> + Clone + 'static,
E: error::Error,
T: Transport<Output = O> + Clone + 'static,
T::Error: Send + Sync,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
{
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>> {
let listener = Transport::listen_on(self.clone(), addr)?;
let fut = listener.map_ok(|event| event.map(|upgrade| {
Box::pin(upgrade) as ListenerUpgrade<O, E>
}));
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>> {
let listener = Transport::listen_on(self.clone(), addr).map_err(|e| e.map(box_err))?;
let fut = listener.map_ok(|event|
event.map(|upgrade| {
let up = upgrade.map_err(box_err);
Box::pin(up) as ListenerUpgrade<O>
}).map_err(box_err)
).map_err(box_err);
Ok(Box::pin(fut))
}

fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>> {
let fut = Transport::dial(self.clone(), addr)?;
Ok(Box::pin(fut) as Dial<_, _>)
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial(self.clone(), addr)
.map(|r| r.map_err(box_err))
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}
}

impl<O, E> fmt::Debug for Boxed<O, E> {
impl<O> fmt::Debug for Boxed<O> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BoxedTransport")
}
}

impl<O, E> Clone for Boxed<O, E> {
impl<O> Clone for Boxed<O> {
fn clone(&self) -> Self {
Boxed {
inner: self.inner.clone(),
}
}
}

impl<O, E> Transport for Boxed<O, E>
where E: error::Error,
{
impl<O> Transport for Boxed<O> {
type Output = O;
type Error = E;
type Listener = Listener<O, E>;
type ListenerUpgrade = ListenerUpgrade<O, E>;
type Dial = Dial<O, E>;
type Error = io::Error;
type Listener = Listener<O>;
type ListenerUpgrade = ListenerUpgrade<O>;
type Dial = Dial<O>;

fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.inner.listen_on(addr)
Expand All @@ -103,3 +109,7 @@ where E: error::Error,
self.inner.dial(addr)
}
}

fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
Loading

0 comments on commit dc56d44

Please sign in to comment.