From dd537f6b8d85478878635ab640711265e0a55fee Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 26 Feb 2020 13:28:39 +1100 Subject: [PATCH] Replace the macro in libp2p-tcp with a generic, inner implementation To avoid code duplication, a generic inner implementation is used that does all of the heavy lifting. To abstract over the individual types, we create traits that are private to the module. Consumers of the crate shouldn't be concerned with this because a) none of the generic types are exported b) there are no type parameters in public interfaces --- Cargo.toml | 5 +- core/src/connection/listeners.rs | 2 +- core/tests/network_dial_error.rs | 10 +- examples/ipfs-private.rs | 2 +- muxers/mplex/tests/async_write.rs | 2 +- muxers/mplex/tests/two_peers.rs | 2 +- protocols/deflate/tests/test.rs | 2 +- protocols/identify/src/identify.rs | 2 +- protocols/identify/src/protocol.rs | 2 +- protocols/noise/src/lib.rs | 2 +- protocols/noise/tests/smoke.rs | 2 +- protocols/ping/tests/ping.rs | 2 +- protocols/secio/src/lib.rs | 2 +- src/lib.rs | 6 +- transports/tcp/Cargo.toml | 3 +- transports/tcp/src/async_std.rs | 232 +++++++++++ transports/tcp/src/internal.rs | 455 ++++++++++++++++++++ transports/tcp/src/lib.rs | 642 +---------------------------- transports/tcp/src/macros.rs | 100 +++++ transports/tcp/src/tokio.rs | 44 ++ transports/websocket/src/lib.rs | 2 +- 21 files changed, 866 insertions(+), 655 deletions(-) create mode 100644 transports/tcp/src/async_std.rs create mode 100644 transports/tcp/src/internal.rs create mode 100644 transports/tcp/src/macros.rs create mode 100644 transports/tcp/src/tokio.rs diff --git a/Cargo.toml b/Cargo.toml index c034745bd3d8..f68838fbce8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default = [ "pnet", "secio", "secp256k1", - "tcp", + "tcp-async-std", "uds", "wasm-ext", "websocket", @@ -44,7 +44,8 @@ ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] secio = ["libp2p-secio"] -tcp = ["libp2p-tcp"] +tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] +tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] uds = ["libp2p-uds"] wasm-ext = ["libp2p-wasm-ext"] websocket = ["libp2p-websocket"] diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index b905b44421d1..610f8f94bc10 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -45,7 +45,7 @@ use std::{collections::VecDeque, fmt, pin::Pin}; /// use futures::prelude::*; /// use libp2p_core::connection::{ListenersEvent, ListenersStream}; /// -/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new()); +/// let mut listeners = ListenersStream::new(libp2p_tcp::async_std::TcpConfig::new()); /// /// // Ask the `listeners` to start listening on the given multiaddress. /// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index c01cebddfa2a..1c33641a0cd3 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -45,7 +45,7 @@ fn deny_incoming_connec() { let mut swarm1 = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() + let transport = libp2p_tcp::async_std::TcpConfig::new() .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) @@ -56,7 +56,7 @@ fn deny_incoming_connec() { let mut swarm2 = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() + let transport = libp2p_tcp::async_std::TcpConfig::new() .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) @@ -122,7 +122,7 @@ fn dial_self() { let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() + let transport = libp2p_tcp::async_std::TcpConfig::new() .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) @@ -197,7 +197,7 @@ fn dial_self_by_id() { let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() + let transport = libp2p_tcp::async_std::TcpConfig::new() .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) @@ -216,7 +216,7 @@ fn multiple_addresses_err() { let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let transport = libp2p_tcp::TcpConfig::new() + let transport = libp2p_tcp::async_std::TcpConfig::new() .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 9d3476853f05..c4c495e3775e 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -43,7 +43,7 @@ use libp2p::{ pnet::{PnetConfig, PreSharedKey}, secio::SecioConfig, swarm::NetworkBehaviourEventProcess, - tcp::TcpConfig, + tcp::async_std::TcpConfig, yamux::Config as YamuxConfig, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, }; diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index e0b708e340f9..d64de9dbfc04 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use libp2p_core::{muxing, upgrade, Transport}; -use libp2p_tcp::TcpConfig; +use libp2p_tcp::async_std::TcpConfig; use futures::{prelude::*, channel::oneshot}; use std::sync::Arc; diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 51293a37cfc5..44a1adf00545 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use libp2p_core::{muxing, upgrade, Transport}; -use libp2p_tcp::TcpConfig; +use libp2p_tcp::async_std::TcpConfig; use futures::{channel::oneshot, prelude::*}; use std::sync::Arc; diff --git a/protocols/deflate/tests/test.rs b/protocols/deflate/tests/test.rs index 896fb491349f..09718811558a 100644 --- a/protocols/deflate/tests/test.rs +++ b/protocols/deflate/tests/test.rs @@ -21,7 +21,7 @@ use futures::{future, prelude::*}; use libp2p_core::{transport::Transport, upgrade}; use libp2p_deflate::DeflateConfig; -use libp2p_tcp::TcpConfig; +use libp2p_tcp::async_std::TcpConfig; use quickcheck::{QuickCheck, RngCore, TestResult}; #[test] diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index fe78bc1b4bdb..c14a3c22567c 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -262,7 +262,7 @@ mod tests { Transport, upgrade }; - use libp2p_tcp::TcpConfig; + use libp2p_tcp::async_std::TcpConfig; use libp2p_secio::SecioConfig; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_mplex::MplexConfig; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 79f3bcdd0871..ef726e2e503d 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -207,7 +207,7 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i #[cfg(test)] mod tests { use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig}; - use libp2p_tcp::TcpConfig; + use libp2p_tcp::async_std::TcpConfig; use futures::{prelude::*, channel::oneshot}; use libp2p_core::{ identity, diff --git a/protocols/noise/src/lib.rs b/protocols/noise/src/lib.rs index da9794fc14eb..14c4800d00a1 100644 --- a/protocols/noise/src/lib.rs +++ b/protocols/noise/src/lib.rs @@ -37,7 +37,7 @@ //! //! ``` //! use libp2p_core::{identity, Transport, upgrade}; -//! use libp2p_tcp::TcpConfig; +//! use libp2p_tcp::async_std::TcpConfig; //! use libp2p_noise::{Keypair, X25519, NoiseConfig}; //! //! # fn main() { diff --git a/protocols/noise/tests/smoke.rs b/protocols/noise/tests/smoke.rs index 1ac04491a684..bbfc71768f80 100644 --- a/protocols/noise/tests/smoke.rs +++ b/protocols/noise/tests/smoke.rs @@ -23,7 +23,7 @@ use libp2p_core::identity; use libp2p_core::upgrade::{self, Negotiated, apply_inbound, apply_outbound}; use libp2p_core::transport::{Transport, ListenerEvent}; use libp2p_noise::{Keypair, X25519, NoiseConfig, RemoteIdentity, NoiseError, NoiseOutput}; -use libp2p_tcp::{TcpConfig, TcpTransStream}; +use libp2p_tcp::{async_std::TcpConfig, async_std::TcpTransStream}; use log::info; use quickcheck::QuickCheck; use std::{convert::TryInto, io}; diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 30e8de601ee0..f88deeaeecc8 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -31,7 +31,7 @@ use libp2p_core::{ use libp2p_ping::*; use libp2p_secio::SecioConfig; use libp2p_swarm::Swarm; -use libp2p_tcp::TcpConfig; +use libp2p_tcp::async_std::TcpConfig; use futures::{prelude::*, channel::mpsc}; use std::{io, time::Duration}; diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 8c067f6b3cea..63cc6c9393d0 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -34,7 +34,7 @@ //! use libp2p_core::{PeerId, Multiaddr, identity, upgrade}; //! use libp2p_core::transport::Transport; //! use libp2p_mplex::MplexConfig; -//! use libp2p_tcp::TcpConfig; +//! use libp2p_tcp::async_std::TcpConfig; //! //! // Create a local peer identity. //! let local_keys = identity::Keypair::generate_ed25519(); diff --git a/src/lib.rs b/src/lib.rs index c907a2a73486..897e6233bbff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ //! Example (Dialing a TCP/IP multi-address): //! //! ```rust -//! use libp2p::{Multiaddr, Transport, tcp::TcpConfig}; +//! use libp2p::{Multiaddr, Transport, tcp::async_std::TcpConfig}; //! let tcp = TcpConfig::new(); //! let addr: Multiaddr = "/ip4/98.97.96.95/tcp/20500".parse().expect("invalid multiaddr"); //! let _conn = tcp.dial(addr); @@ -86,7 +86,7 @@ //! //! ```rust //! # #[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), feature = "tcp", feature = "secio", feature = "yamux"))] { -//! use libp2p::{Transport, core::upgrade, tcp::TcpConfig, secio::SecioConfig, identity::Keypair, yamux}; +//! use libp2p::{Transport, core::upgrade, tcp::async_std::TcpConfig, secio::SecioConfig, identity::Keypair, yamux}; //! let tcp = TcpConfig::new(); //! let secio = SecioConfig::new(Keypair::generate_ed25519()); //! let yamux = yamux::Config::default(); @@ -141,7 +141,7 @@ //! [`Keypair`]: identity::Keypair //! [`PublicKey`]: identity::PublicKey //! [`Future`]: futures::Future -//! [`TcpConfig`]: tcp::TcpConfig +//! [`TcpConfig`]: tcp::async_std::TcpConfig //! [`NetworkBehaviour`]: swarm::NetworkBehaviour //! [`StreamMuxer`]: core::muxing::StreamMuxer //! [`Yamux`]: yamux::Yamux diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index b04330daf63f..1166ae7ae8db 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -18,6 +18,7 @@ ipnet = "2.0.0" libp2p-core = { version = "0.16.0", path = "../../core" } log = "0.4.1" tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } +async-trait = "0.1" [features] -default = ["async-std"] +default = [] diff --git a/transports/tcp/src/async_std.rs b/transports/tcp/src/async_std.rs new file mode 100644 index 000000000000..70d9aedfb866 --- /dev/null +++ b/transports/tcp/src/async_std.rs @@ -0,0 +1,232 @@ +use crate::internal; +use async_std::net::{TcpListener, TcpStream}; +use futures::{AsyncRead, AsyncWrite}; +use libp2p_core::{transport::TransportError, Multiaddr, Transport}; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; + +pub type TcpTransStream = internal::TcpTransStream; + +/// Represents the configuration for a TCP/IP transport capability for libp2p. +/// +/// This implementation uses the network futures from async-std. +#[derive(Debug, Clone, Default)] +pub struct TcpConfig { + inner: internal::TcpConfig, +} + +impl TcpConfig { + /// Creates a new configuration object for TCP/IP. + pub fn new() -> Self { + Self::default() + } + + /// Sets the TTL to set for opened sockets. + pub fn ttl(mut self, value: u32) -> Self { + self.inner.ttl = Some(value); + self + } + + /// Sets the `TCP_NODELAY` to set for opened sockets. + pub fn nodelay(mut self, value: bool) -> Self { + self.inner.nodelay = Some(value); + self + } +} + +impl_transport!(TcpConfig, internal::TcpConfig); +impl_internal_traits!(TcpListener, TcpStream); +impl_async_read_and_write!(internal::TcpTransStream); + +#[cfg(test)] +mod tests { + use super::TcpConfig; + use crate::internal::multiaddr_to_socketaddr; + use futures::prelude::*; + use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::ListenerEvent, + Transport, + }; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + #[test] + fn wildcard_expansion() { + let mut listener = TcpConfig::new() + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .expect("listener"); + + // Get the first address. + let addr = futures::executor::block_on_stream(listener.by_ref()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + // Process all initial `NewAddress` events and make sure they + // do not contain wildcard address or port. + let server = listener + .take_while(|event| match event.as_ref().unwrap() { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other), + } + if let Protocol::Tcp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No TCP port in address: {}", a) + } + futures::future::ready(true) + } + _ => futures::future::ready(false), + }) + .for_each(|_| futures::future::ready(())); + + let client = TcpConfig::new().dial(addr).expect("dialer"); + async_std::task::block_on(futures::future::join(server, client)) + .1 + .unwrap(); + } + + #[test] + fn multiaddr_to_tcp_conversion() { + use std::net::Ipv6Addr; + + assert!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + .is_err() + ); + + assert_eq!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::().unwrap()), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/255.255.255.255/tcp/8080" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), + 8080, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::().unwrap()), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + )) + ); + } + + #[test] + fn communicating_between_dialer_and_listener() { + let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); + let mut ready_tx = Some(ready_tx); + + async_std::task::spawn(async move { + let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); + let tcp = TcpConfig::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.take().unwrap().send(listen_addr).unwrap(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + } + _ => unreachable!(), + } + } + }); + + async_std::task::block_on(async move { + let addr = ready_rx.await.unwrap(); + let tcp = TcpConfig::new(); + + // Obtain a future socket through dialing + let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + }); + } + + #[test] + fn replace_port_0_in_returned_multiaddr_ipv4() { + let tcp = TcpConfig::new(); + + let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); + assert!(addr.to_string().contains("tcp/0")); + + let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + assert!(!new_addr.to_string().contains("tcp/0")); + } + + #[test] + fn replace_port_0_in_returned_multiaddr_ipv6() { + let tcp = TcpConfig::new(); + + let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); + assert!(addr.to_string().contains("tcp/0")); + + let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + assert!(!new_addr.to_string().contains("tcp/0")); + } + + #[test] + fn larger_addr_denied() { + let tcp = TcpConfig::new(); + + let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" + .parse::() + .unwrap(); + assert!(tcp.listen_on(addr).is_err()); + } +} diff --git a/transports/tcp/src/internal.rs b/transports/tcp/src/internal.rs new file mode 100644 index 000000000000..5ccd84392834 --- /dev/null +++ b/transports/tcp/src/internal.rs @@ -0,0 +1,455 @@ +use futures::{future, future::Ready, prelude::*, Stream, TryFutureExt}; +use futures_timer::Delay; +use get_if_addrs::{get_if_addrs, IfAddr}; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; +use libp2p_core::{ + multiaddr::Protocol, + transport::{ListenerEvent, TransportError}, + Multiaddr, Transport, +}; +use std::{ + collections::VecDeque, + future::Future, + io, iter, + iter::FromIterator, + marker::PhantomData, + net::{IpAddr, SocketAddr}, + pin::Pin, + time::Duration, +}; + +#[derive(Debug)] +pub struct TcpConfig { + pub sleep_on_error: Duration, + + pub ttl: Option, + + pub nodelay: Option, + + _bind: PhantomData, +} + +impl Default for TcpConfig { + fn default() -> Self { + Self { + sleep_on_error: Duration::from_millis(100), + ttl: Default::default(), + nodelay: Default::default(), + _bind: PhantomData, + } + } +} + +impl Clone for TcpConfig { + fn clone(&self) -> Self { + Self { + sleep_on_error: self.sleep_on_error.clone(), + ttl: self.ttl.clone(), + nodelay: self.nodelay.clone(), + _bind: PhantomData, + } + } +} + +#[async_trait::async_trait] +pub trait TcpListener: Sized + Send + Sync + 'static { + type TcpStream: TcpStream + Send; + + async fn bind(addrs: &SocketAddr) -> io::Result; + async fn accept(&mut self) -> io::Result<(Self::TcpStream, SocketAddr)>; + fn local_addr(&self) -> io::Result; +} + +#[async_trait::async_trait] +pub trait TcpStream: Sized { + async fn connect(addrs: &SocketAddr) -> io::Result; + fn set_ttl(&self, ttl: u32) -> io::Result<()>; + fn set_nodelay(&self, nodelay: bool) -> io::Result<()>; + fn peer_addr(&self) -> io::Result; + fn local_addr(&self) -> io::Result; +} + +impl Transport for TcpConfig +where + L: TcpListener, +{ + type Output = TcpTransStream; + type Error = io::Error; + type Listener = Pin< + Box< + dyn Stream< + Item = Result, Self::Error>, + > + Send, + >, + >; + type ListenerUpgrade = Ready>; + type Dial = + Pin, io::Error>> + Send>>; + + fn listen_on(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) { + sa + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + + Ok(Box::pin(do_listen(self, socket_addr).try_flatten_stream())) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + log::debug!("Instantly refusing dialing {}, as it is invalid", addr); + return Err(TransportError::Other( + io::ErrorKind::ConnectionRefused.into(), + )); + } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + + log::debug!("Dialing {}", addr); + + Ok(Box::pin(do_dial(self, socket_addr))) + } +} + +async fn do_dial( + cfg: TcpConfig, + socket_addr: SocketAddr, +) -> Result, io::Error> +where + L: TcpListener, +{ + let stream = ::connect(&socket_addr).await?; + apply_config(&cfg, &stream)?; + Ok(TcpTransStream { inner: stream }) +} + +async fn do_listen( + cfg: TcpConfig, + socket_addr: SocketAddr, +) -> Result< + impl Stream< + Item = Result< + ListenerEvent, io::Error>>, io::Error>, + io::Error, + >, + > + Send, + io::Error, +> +where + L: TcpListener, +{ + let listener = L::bind(&socket_addr).await?; + let local_addr = listener.local_addr()?; + let port = local_addr.port(); + + let addrs = if socket_addr.ip().is_unspecified() { + let addrs = host_addresses(port)?; + log::debug!( + "Listening on {:?}", + addrs.iter().map(|(_, _, ma)| ma).collect::>() + ); + Addresses::Many(addrs) + } else { + let ma = ip_to_multiaddr(local_addr.ip(), port); + log::debug!("Listening on {:?}", ma); + Addresses::One(ma) + }; + + let pending = match addrs { + Addresses::One(ref ma) => { + let event = ListenerEvent::NewAddress(ma.clone()); + let mut list = VecDeque::new(); + list.push_back(Ok(event)); + list + } + Addresses::Many(ref aa) => aa + .iter() + .map(|(_, _, ma)| ma) + .cloned() + .map(ListenerEvent::NewAddress) + .map(Result::Ok) + .collect::>(), + }; + + let listen_stream = TcpListenStream { + stream: listener, + pause: None, + pause_duration: cfg.sleep_on_error, + port, + addrs, + pending, + config: cfg, + }; + + Ok(stream::unfold(listen_stream, |s| s.next().map(Some))) +} + +pub struct TcpListenStream { + stream: L, + + pause: Option, + + pause_duration: Duration, + + port: u16, + + addrs: Addresses, + + pending: Buffer, + + config: TcpConfig, +} + +impl TcpListenStream> +where + L: TcpListener, +{ + async fn next( + mut self, + ) -> ( + Result< + ListenerEvent, io::Error>>, io::Error>, + io::Error, + >, + Self, + ) { + loop { + if let Some(event) = self.pending.pop_front() { + return (event, self); + } + + if let Some(pause) = self.pause.take() { + let _ = pause.await; + } + + let (sock, _) = match self.stream.accept().await { + Ok(s) => s, + Err(e) => { + log::debug!("error accepting incoming connection: {}", e); + self.pause = Some(Delay::new(self.pause_duration)); + return (Ok(ListenerEvent::Error(e)), self); + } + }; + + let sock_addr = match sock.peer_addr() { + Ok(addr) => addr, + Err(err) => { + log::debug!("Failed to get peer address: {:?}", err); + continue; + } + }; + + let local_addr = match sock.local_addr() { + Ok(sock_addr) => { + if let Addresses::Many(ref mut addrs) = self.addrs { + if let Err(err) = check_for_interface_changes( + &sock_addr, + self.port, + addrs, + &mut self.pending, + ) { + return (Ok(ListenerEvent::Error(err)), self); + } + } + ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) + } + Err(err) => { + log::debug!("Failed to get local address of incoming socket: {:?}", err); + continue; + } + }; + + let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); + + match apply_config(&self.config, &sock) { + Ok(()) => { + log::trace!("Incoming connection from {} at {}", remote_addr, local_addr); + self.pending.push_back(Ok(ListenerEvent::Upgrade { + upgrade: future::ok(TcpTransStream { inner: sock }), + local_addr, + remote_addr, + })) + } + Err(err) => { + log::debug!( + "Error upgrading incoming connection from {}: {:?}", + remote_addr, + err + ); + self.pending.push_back(Ok(ListenerEvent::Upgrade { + upgrade: future::err(err), + local_addr, + remote_addr, + })) + } + } + } + } +} + +#[derive(Debug)] +pub struct TcpTransStream +where + I: TcpStream, +{ + pub inner: I, +} + +impl Drop for TcpTransStream +where + I: TcpStream, +{ + fn drop(&mut self) { + if let Ok(addr) = self.inner.peer_addr() { + log::debug!("Dropped TCP connection to {:?}", addr); + } else { + log::debug!("Dropped TCP connection to undeterminate peer"); + } + } +} + +fn apply_config(config: &TcpConfig, socket: &L::TcpStream) -> Result<(), io::Error> +where + L: TcpListener, +{ + if let Some(ttl) = config.ttl { + socket.set_ttl(ttl)?; + } + + if let Some(nodelay) = config.nodelay { + socket.set_nodelay(nodelay)?; + } + + Ok(()) +} + +// Create a [`Multiaddr`] from the given IP address and port number. +fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { + let proto = match ip { + IpAddr::V4(ip) => Protocol::Ip4(ip), + IpAddr::V6(ip) => Protocol::Ip6(ip), + }; + let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port))); + Multiaddr::from_iter(it) +} + +// Collect all local host addresses and use the provided port number as listen port. +fn host_addresses(port: u16) -> io::Result> { + let mut addrs = Vec::new(); + for iface in get_if_addrs()? { + let ip = iface.ip(); + let ma = ip_to_multiaddr(ip, port); + let ipn = match iface.addr { + IfAddr::V4(ip4) => { + let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); + let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); + IpNet::V4(ipnet) + } + IfAddr::V6(ip6) => { + let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); + let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); + IpNet::V6(ipnet) + } + }; + addrs.push((ip, ipn, ma)) + } + Ok(addrs) +} + +/// Listen address information. +#[derive(Debug)] +enum Addresses { + /// A specific address is used to listen. + One(Multiaddr), + /// A set of addresses is used to listen. + Many(Vec<(IpAddr, IpNet, Multiaddr)>), +} + +type Buffer = VecDeque>, io::Error>, io::Error>>; + +// If we listen on all interfaces, find out to which interface the given +// socket address belongs. In case we think the address is new, check +// all host interfaces again and report new and expired listen addresses. +fn check_for_interface_changes( + socket_addr: &SocketAddr, + listen_port: u16, + listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>, + pending: &mut Buffer, +) -> Result<(), io::Error> { + // Check for exact match: + if listen_addrs + .iter() + .find(|(ip, ..)| ip == &socket_addr.ip()) + .is_some() + { + return Ok(()); + } + + // No exact match => check netmask + if listen_addrs + .iter() + .find(|(_, net, _)| net.contains(&socket_addr.ip())) + .is_some() + { + return Ok(()); + } + + // The local IP address of this socket is new to us. + // We check for changes in the set of host addresses and report new + // and expired addresses. + // + // TODO: We do not detect expired addresses unless there is a new address. + let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?); + + // Check for addresses no longer in use. + for (ip, _, ma) in old_listen_addrs.iter() { + if listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { + log::debug!("Expired listen address: {}", ma); + pending.push_back(Ok(ListenerEvent::AddressExpired(ma.clone()))); + } + } + + // Check for new addresses. + for (ip, _, ma) in listen_addrs.iter() { + if old_listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { + log::debug!("New listen address: {}", ma); + pending.push_back(Ok(ListenerEvent::NewAddress(ma.clone()))); + } + } + + // We should now be able to find the local address, if not something + // is seriously wrong and we report an error. + if listen_addrs + .iter() + .find(|(ip, net, _)| ip == &socket_addr.ip() || net.contains(&socket_addr.ip())) + .is_none() + { + let msg = format!("{} does not match any listen address", socket_addr.ip()); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + Ok(()) +} + +// This type of logic should probably be moved into the multiaddr package +pub fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { + let mut iter = addr.iter(); + let proto1 = iter.next().ok_or(())?; + let proto2 = iter.next().ok_or(())?; + + if iter.next().is_some() { + return Err(()); + } + + match (proto1, proto2) { + (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), + (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), + _ => Err(()), + } +} diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index aaa70f6ed1bf..5f55205eead0 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -22,642 +22,20 @@ //! //! # Usage //! -//! This crate provides two structs, `TcpConfig` and `TokioTcpConfig`, depending on which +//! This crate provides two structs, `async_std::TcpConfig` and `tokio::TcpConfig`, depending on which //! features are enabled. //! -//! Both the `TcpConfig` and `TokioTcpConfig` structs implement the `Transport` trait of the -//! `core` library. See the documentation of `core` and of libp2p in general to learn how to -//! use the `Transport` trait. - -use futures::{future::{self, Ready}, prelude::*}; -use futures_timer::Delay; -use get_if_addrs::{IfAddr, get_if_addrs}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr}, - transport::{ListenerEvent, TransportError} -}; -use log::{debug, trace}; -use std::{ - collections::VecDeque, - io, - iter::{self, FromIterator}, - net::{IpAddr, SocketAddr}, - pin::Pin, - task::{Context, Poll}, - time::Duration -}; - -macro_rules! codegen { - ($feature_name:expr, $tcp_config:ident, $tcp_trans_stream:ident, $tcp_listen_stream:ident, $apply_config:ident, $tcp_stream:ty, $tcp_listener:ty) => { - -/// Represents the configuration for a TCP/IP transport capability for libp2p. -/// -/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams -/// obtained by libp2p through the tokio reactor. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug, Clone, Default)] -pub struct $tcp_config { - /// How long a listener should sleep after receiving an error, before trying again. - sleep_on_error: Duration, - /// TTL to set for opened sockets, or `None` to keep default. - ttl: Option, - /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. - nodelay: Option, -} - -impl $tcp_config { - /// Creates a new configuration object for TCP/IP. - pub fn new() -> $tcp_config { - $tcp_config { - sleep_on_error: Duration::from_millis(100), - ttl: None, - nodelay: None, - } - } - - /// Sets the TTL to set for opened sockets. - pub fn ttl(mut self, value: u32) -> Self { - self.ttl = Some(value); - self - } - - /// Sets the `TCP_NODELAY` to set for opened sockets. - pub fn nodelay(mut self, value: bool) -> Self { - self.nodelay = Some(value); - self - } -} - -impl Transport for $tcp_config { - type Output = $tcp_trans_stream; - type Error = io::Error; - type Listener = Pin, Self::Error>> + Send>>; - type ListenerUpgrade = Ready>; - type Dial = Pin> + Send>>; - - fn listen_on(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(sa) = multiaddr_to_socketaddr(&addr) { - sa - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; - - async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr) - -> Result>, io::Error>, io::Error>>, io::Error> - { - let listener = <$tcp_listener>::bind(&socket_addr).await?; - let local_addr = listener.local_addr()?; - let port = local_addr.port(); - - // Determine all our listen addresses which is either a single local IP address - // or (if a wildcard IP address was used) the addresses of all our interfaces, - // as reported by `get_if_addrs`. - let addrs = - if socket_addr.ip().is_unspecified() { - let addrs = host_addresses(port)?; - debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::>()); - Addresses::Many(addrs) - } else { - let ma = ip_to_multiaddr(local_addr.ip(), port); - debug!("Listening on {:?}", ma); - Addresses::One(ma) - }; - - // Generate `NewAddress` events for each new `Multiaddr`. - let pending = match addrs { - Addresses::One(ref ma) => { - let event = ListenerEvent::NewAddress(ma.clone()); - let mut list = VecDeque::new(); - list.push_back(Ok(event)); - list - } - Addresses::Many(ref aa) => { - aa.iter() - .map(|(_, _, ma)| ma) - .cloned() - .map(ListenerEvent::NewAddress) - .map(Result::Ok) - .collect::>() - } - }; - - let listen_stream = $tcp_listen_stream { - stream: listener, - pause: None, - pause_duration: cfg.sleep_on_error, - port, - addrs, - pending, - config: cfg - }; - - Ok(stream::unfold(listen_stream, |s| s.next().map(Some))) - } - - Ok(Box::pin(do_listen(self, socket_addr).try_flatten_stream())) - } - - fn dial(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - debug!("Instantly refusing dialing {}, as it is invalid", addr); - return Err(TransportError::Other(io::ErrorKind::ConnectionRefused.into())) - } - socket_addr - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; - - debug!("Dialing {}", addr); - - async fn do_dial(cfg: $tcp_config, socket_addr: SocketAddr) -> Result<$tcp_trans_stream, io::Error> { - let stream = <$tcp_stream>::connect(&socket_addr).await?; - $apply_config(&cfg, &stream)?; - Ok($tcp_trans_stream { inner: stream }) - } - - Ok(Box::pin(do_dial(self, socket_addr))) - } -} - -/// Stream that listens on an TCP/IP address. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -pub struct $tcp_listen_stream { - /// The incoming connections. - stream: $tcp_listener, - /// The current pause if any. - pause: Option, - /// How long to pause after an error. - pause_duration: Duration, - /// The port which we use as our listen port in listener event addresses. - port: u16, - /// The set of known addresses. - addrs: Addresses, - /// Temporary buffer of listener events. - pending: Buffer<$tcp_trans_stream>, - /// Original configuration. - config: $tcp_config -} - -impl $tcp_listen_stream { - /// Takes ownership of the listener, and returns the next incoming event and the listener. - async fn next(mut self) -> (Result>, io::Error>, io::Error>, Self) { - loop { - if let Some(event) = self.pending.pop_front() { - return (event, self); - } - - if let Some(pause) = self.pause.take() { - let _ = pause.await; - } - - // TODO: do we get the peer_addr at the same time? - let (sock, _) = match self.stream.accept().await { - Ok(s) => s, - Err(e) => { - debug!("error accepting incoming connection: {}", e); - self.pause = Some(Delay::new(self.pause_duration)); - return (Ok(ListenerEvent::Error(e)), self); - } - }; - - let sock_addr = match sock.peer_addr() { - Ok(addr) => addr, - Err(err) => { - debug!("Failed to get peer address: {:?}", err); - continue - } - }; - - let local_addr = match sock.local_addr() { - Ok(sock_addr) => { - if let Addresses::Many(ref mut addrs) = self.addrs { - if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) { - return (Ok(ListenerEvent::Error(err)), self); - } - } - ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) - } - Err(err) => { - debug!("Failed to get local address of incoming socket: {:?}", err); - continue - } - }; - - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); - - match $apply_config(&self.config, &sock) { - Ok(()) => { - trace!("Incoming connection from {} at {}", remote_addr, local_addr); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::ok($tcp_trans_stream { inner: sock }), - local_addr, - remote_addr - })) - } - Err(err) => { - debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::err(err), - local_addr, - remote_addr - })) - } - } - } - } -} - -/// Wraps around a `TcpStream` and adds logging for important events. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug)] -pub struct $tcp_trans_stream { - inner: $tcp_stream, -} - -impl Drop for $tcp_trans_stream { - fn drop(&mut self) { - if let Ok(addr) = self.inner.peer_addr() { - debug!("Dropped TCP connection to {:?}", addr); - } else { - debug!("Dropped TCP connection to undeterminate peer"); - } - } -} - -/// Applies the socket configuration parameters to a socket. -fn $apply_config(config: &$tcp_config, socket: &$tcp_stream) -> Result<(), io::Error> { - if let Some(ttl) = config.ttl { - socket.set_ttl(ttl)?; - } - - if let Some(nodelay) = config.nodelay { - socket.set_nodelay(nodelay)?; - } - - Ok(()) -} - -}; -} - -#[cfg(feature = "async-std")] -codegen!("async-std", TcpConfig, TcpTransStream, TcpListenStream, apply_config_async_std, async_std::net::TcpStream, async_std::net::TcpListener); - -#[cfg(feature = "tokio")] -codegen!("tokio", TokioTcpConfig, TokioTcpTransStream, TokioTcpListenStream, apply_config_tokio, tokio::net::TcpStream, tokio::net::TcpListener); +//! Both structs structs implement the `Transport` trait of the `core` library. +//! See the documentation of `core` and of libp2p in general to learn how to use the `Transport` +//! trait. +//! -#[cfg(feature = "async-std")] -impl AsyncRead for TcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) - } -} +mod internal; +#[macro_use] +mod macros; #[cfg(feature = "async-std")] -impl AsyncWrite for TcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) - } -} - -#[cfg(feature = "tokio")] -impl AsyncRead for TokioTcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) - } -} +pub mod async_std; #[cfg(feature = "tokio")] -impl AsyncWrite for TokioTcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.inner), cx) - } -} - -// This type of logic should probably be moved into the multiaddr package -fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { - let mut iter = addr.iter(); - let proto1 = iter.next().ok_or(())?; - let proto2 = iter.next().ok_or(())?; - - if iter.next().is_some() { - return Err(()); - } - - match (proto1, proto2) { - (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), - (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), - _ => Err(()), - } -} - -// Create a [`Multiaddr`] from the given IP address and port number. -fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { - let proto = match ip { - IpAddr::V4(ip) => Protocol::Ip4(ip), - IpAddr::V6(ip) => Protocol::Ip6(ip) - }; - let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port))); - Multiaddr::from_iter(it) -} - -// Collect all local host addresses and use the provided port number as listen port. -fn host_addresses(port: u16) -> io::Result> { - let mut addrs = Vec::new(); - for iface in get_if_addrs()? { - let ip = iface.ip(); - let ma = ip_to_multiaddr(ip, port); - let ipn = match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); - IpNet::V6(ipnet) - } - }; - addrs.push((ip, ipn, ma)) - } - Ok(addrs) -} - -/// Listen address information. -#[derive(Debug)] -enum Addresses { - /// A specific address is used to listen. - One(Multiaddr), - /// A set of addresses is used to listen. - Many(Vec<(IpAddr, IpNet, Multiaddr)>) -} - -type Buffer = VecDeque>, io::Error>, io::Error>>; - -// If we listen on all interfaces, find out to which interface the given -// socket address belongs. In case we think the address is new, check -// all host interfaces again and report new and expired listen addresses. -fn check_for_interface_changes( - socket_addr: &SocketAddr, - listen_port: u16, - listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>, - pending: &mut Buffer -) -> Result<(), io::Error> { - // Check for exact match: - if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() { - return Ok(()) - } - - // No exact match => check netmask - if listen_addrs.iter().find(|(_, net, _)| net.contains(&socket_addr.ip())).is_some() { - return Ok(()) - } - - // The local IP address of this socket is new to us. - // We check for changes in the set of host addresses and report new - // and expired addresses. - // - // TODO: We do not detect expired addresses unless there is a new address. - let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?); - - // Check for addresses no longer in use. - for (ip, _, ma) in old_listen_addrs.iter() { - if listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("Expired listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::AddressExpired(ma.clone()))); - } - } - - // Check for new addresses. - for (ip, _, ma) in listen_addrs.iter() { - if old_listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("New listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::NewAddress(ma.clone()))); - } - } - - // We should now be able to find the local address, if not something - // is seriously wrong and we report an error. - if listen_addrs.iter() - .find(|(ip, net, _)| ip == &socket_addr.ip() || net.contains(&socket_addr.ip())) - .is_none() - { - let msg = format!("{} does not match any listen address", socket_addr.ip()); - return Err(io::Error::new(io::ErrorKind::Other, msg)) - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use futures::prelude::*; - use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use super::multiaddr_to_socketaddr; - #[cfg(feature = "async-std")] - use super::TcpConfig; - - #[test] - #[cfg(feature = "async-std")] - fn wildcard_expansion() { - let mut listener = TcpConfig::new() - .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) - .expect("listener"); - - // Get the first address. - let addr = futures::executor::block_on_stream(listener.by_ref()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - // Process all initial `NewAddress` events and make sure they - // do not contain wildcard address or port. - let server = listener - .take_while(|event| match event.as_ref().unwrap() { - ListenerEvent::NewAddress(a) => { - let mut iter = a.iter(); - match iter.next().expect("ip address") { - Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), - Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), - other => panic!("Unexpected protocol: {}", other) - } - if let Protocol::Tcp(port) = iter.next().expect("port") { - assert_ne!(0, port) - } else { - panic!("No TCP port in address: {}", a) - } - futures::future::ready(true) - } - _ => futures::future::ready(false) - }) - .for_each(|_| futures::future::ready(())); - - let client = TcpConfig::new().dial(addr).expect("dialer"); - async_std::task::block_on(futures::future::join(server, client)).1.unwrap(); - } - - #[test] - fn multiaddr_to_tcp_conversion() { - use std::net::Ipv6Addr; - - assert!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) - .is_err() - ); - - assert_eq!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::().unwrap()), - Ok(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 12345, - )) - ); - assert_eq!( - multiaddr_to_socketaddr( - &"/ip4/255.255.255.255/tcp/8080" - .parse::() - .unwrap() - ), - Ok(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), - 8080, - )) - ); - assert_eq!( - multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::().unwrap()), - Ok(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), - 12345, - )) - ); - assert_eq!( - multiaddr_to_socketaddr( - &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" - .parse::() - .unwrap() - ), - Ok(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new( - 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, - )), - 8080, - )) - ); - } - - #[test] - #[cfg(feature = "async-std")] - fn communicating_between_dialer_and_listener() { - let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); - let mut ready_tx = Some(ready_tx); - - async_std::task::spawn(async move { - let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(listen_addr) => { - ready_tx.take().unwrap().send(listen_addr).unwrap(); - }, - ListenerEvent::Upgrade { upgrade, .. } => { - let mut upgrade = upgrade.await.unwrap(); - let mut buf = [0u8; 3]; - upgrade.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - upgrade.write_all(&[4, 5, 6]).await.unwrap(); - }, - _ => unreachable!() - } - } - }); - - async_std::task::block_on(async move { - let addr = ready_rx.await.unwrap(); - let tcp = TcpConfig::new(); - - // Obtain a future socket through dialing - let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); - socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); - - let mut buf = [0u8; 3]; - socket.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [4, 5, 6]); - }); - } - - #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv4() { - let tcp = TcpConfig::new(); - - let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); - assert!(addr.to_string().contains("tcp/0")); - - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - assert!(!new_addr.to_string().contains("tcp/0")); - } - - #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv6() { - let tcp = TcpConfig::new(); - - let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); - assert!(addr.to_string().contains("tcp/0")); - - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - assert!(!new_addr.to_string().contains("tcp/0")); - } - - #[test] - #[cfg(feature = "async-std")] - fn larger_addr_denied() { - let tcp = TcpConfig::new(); - - let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" - .parse::() - .unwrap(); - assert!(tcp.listen_on(addr).is_err()); - } -} +pub mod tokio; diff --git a/transports/tcp/src/macros.rs b/transports/tcp/src/macros.rs new file mode 100644 index 000000000000..7b118a64089d --- /dev/null +++ b/transports/tcp/src/macros.rs @@ -0,0 +1,100 @@ +macro_rules! impl_transport { + ($config:ty,$inner:ty) => { + impl Transport for $config { + type Output = <$inner as Transport>::Output; + type Error = <$inner as Transport>::Error; + type Listener = <$inner as Transport>::Listener; + type ListenerUpgrade = <$inner as Transport>::ListenerUpgrade; + type Dial = <$inner as Transport>::Dial; + + fn listen_on( + self, + addr: Multiaddr, + ) -> Result> { + self.inner.listen_on(addr) + } + + fn dial(self, addr: Multiaddr) -> Result> { + self.inner.dial(addr) + } + } + }; +} + +macro_rules! impl_async_read_and_write { + ($target:ty) => { + impl AsyncRead for $target { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) + } + } + + impl AsyncWrite for $target { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) + } + } + }; +} + +macro_rules! impl_internal_traits { + ($listener:tt,$stream:tt) => { + #[async_trait::async_trait] + impl internal::TcpListener for $listener { + type TcpStream = $stream; + + async fn bind(addrs: &SocketAddr) -> io::Result<$listener> { + <$listener>::bind(addrs).await + } + + async fn accept(&mut self) -> io::Result<($stream, SocketAddr)> { + <$listener>::accept(self).await + } + + fn local_addr(&self) -> io::Result { + <$listener>::local_addr(self) + } + } + + #[async_trait::async_trait] + impl internal::TcpStream for $stream { + async fn connect(addrs: &SocketAddr) -> io::Result<$stream> { + <$stream>::connect(addrs).await + } + fn set_ttl(&self, ttl: u32) -> io::Result<()> { + <$stream>::set_ttl(self, ttl) + } + fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + <$stream>::set_nodelay(self, nodelay) + } + fn peer_addr(&self) -> io::Result { + <$stream>::peer_addr(self) + } + fn local_addr(&self) -> io::Result { + <$stream>::local_addr(self) + } + } + }; +} diff --git a/transports/tcp/src/tokio.rs b/transports/tcp/src/tokio.rs new file mode 100644 index 000000000000..0f1a07d1cf9b --- /dev/null +++ b/transports/tcp/src/tokio.rs @@ -0,0 +1,44 @@ +use crate::internal; +use futures::{AsyncRead, AsyncWrite}; +use libp2p_core::{transport::TransportError, Multiaddr, Transport}; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::net::{TcpListener, TcpStream}; + +pub type TcpTransStream = internal::TcpTransStream; + +/// Represents the configuration for a TCP/IP transport capability for libp2p. +/// +/// This implementation uses the network futures from tokio. As such, a tokio reactor must be +/// spawned to avoid runtime panics. +#[derive(Debug, Clone, Default)] +pub struct TcpConfig { + inner: internal::TcpConfig, +} + +impl TcpConfig { + /// Creates a new configuration object for TCP/IP. + pub fn new() -> Self { + Self::default() + } + + /// Sets the TTL to set for opened sockets. + pub fn ttl(mut self, value: u32) -> Self { + self.inner.ttl = Some(value); + self + } + + /// Sets the `TCP_NODELAY` to set for opened sockets. + pub fn nodelay(mut self, value: bool) -> Self { + self.inner.nodelay = Some(value); + self + } +} + +impl_transport!(TcpConfig, internal::TcpConfig); +impl_internal_traits!(TcpListener, TcpStream); +impl_async_read_and_write!(internal::TcpTransStream); diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index b55dd25851b6..ede64ac5ffa9 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -203,7 +203,7 @@ mod tests { } async fn connect(listen_addr: Multiaddr) { - let ws_config = WsConfig::new(tcp::TcpConfig::new()); + let ws_config = WsConfig::new(tcp::async_std::TcpConfig::new()); let mut listener = ws_config.clone() .listen_on(listen_addr)