Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp: Set IPV6_V6ONLY for IPv6 listeners. #1555

Merged
merged 3 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions transports/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ get_if_addrs = "0.5.3"
ipnet = "2.0.0"
libp2p-core = { version = "0.18.0", path = "../../core" }
log = "0.4.1"
socket2 = "0.3.12"
tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true }

[features]
Expand Down
162 changes: 94 additions & 68 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ use libp2p_core::{
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use socket2::{Socket, Domain, Type};
use std::{
collections::VecDeque,
convert::TryFrom,
io,
iter::{self, FromIterator},
net::{IpAddr, SocketAddr},
Expand Down Expand Up @@ -108,7 +110,24 @@ impl Transport for $tcp_config {
async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr)
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>, io::Error>, io::Error>>, io::Error>
{
let listener = <$tcp_listener>::bind(&socket_addr).await?;
let socket = if socket_addr.is_ipv4() {
Socket::new(Domain::ipv4(), Type::stream(), Some(socket2::Protocol::tcp()))?
} else {
let s = Socket::new(Domain::ipv6(), Type::stream(), Some(socket2::Protocol::tcp()))?;
s.set_only_v6(true)?;
s
};
if cfg!(target_family = "unix") {
socket.set_reuse_address(true)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that imply that on non-unix platforms one can not use dualstack nor two listeners (ipv4 and ipv6 on the same port)? (If so, not sure whether it would be important.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on the particular platform. On Windows for example IPV6_V6ONLY seems to be true by default and use of SO_REUSEADDR is discouraged if I read this correctly, but frankly I do not know Windows and if someone has ideas how to configure things more optimally on this and other non-unix platforms we can make those changes in subsequent PRs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea of the implications of that change. What are the reasons for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not know why it is restricted to Unix, but again this is what MIO does which in turn does what libstd does.

}
socket.bind(&socket_addr.into())?;
socket.listen(1024)?; // we may want to make this configurable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default in Rust seems to be 128
https://github.com/rust-lang/rust/blob/c06e4aca19046b07d952b16e9f002bfab38fde6b/src/libstd/sys/unix/ext/net.rs#L823

Suggested change
socket.listen(1024)?; // we may want to make this configurable
socket.listen(128)?; // we may want to make this configurable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following MIOs lead here as it is what tokio and async-std use.

mxinden marked this conversation as resolved.
Show resolved Hide resolved

let listener = match <$tcp_listener>::try_from(socket.into_tcp_listener()) {
twittner marked this conversation as resolved.
Show resolved Hide resolved
Ok(listener) => listener,
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e))
};

let local_addr = listener.local_addr()?;
let port = local_addr.port();

Expand Down Expand Up @@ -485,42 +504,45 @@ mod tests {
#[test]
#[cfg(feature = "async-std")]
fn wildcard_expansion() {
let mut listener = TcpConfig::new()
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.expect("listener");

// Get the first address.
let addr = futures::executor::block_on_stream(listener.by_ref())
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");

// Process all initial `NewAddress` events and make sure they
// do not contain wildcard address or port.
let server = listener
.take_while(|event| match event.as_ref().unwrap() {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other)
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
fn test(addr: Multiaddr) {
let mut listener = TcpConfig::new().listen_on(addr).expect("listener");

// Get the first address.
let addr = futures::executor::block_on_stream(listener.by_ref())
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");

// Process all initial `NewAddress` events and make sure they
// do not contain wildcard address or port.
let server = listener
.take_while(|event| match event.as_ref().unwrap() {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other)
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
}
futures::future::ready(true)
}
futures::future::ready(true)
}
_ => futures::future::ready(false)
})
.for_each(|_| futures::future::ready(()));
_ => futures::future::ready(false)
})
.for_each(|_| futures::future::ready(()));

let client = TcpConfig::new().dial(addr).expect("dialer");
async_std::task::block_on(futures::future::join(server, client)).1.unwrap();
let client = TcpConfig::new().dial(addr).expect("dialer");
async_std::task::block_on(futures::future::join(server, client)).1.unwrap();
}

test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}

#[test]
Expand Down Expand Up @@ -575,43 +597,47 @@ mod tests {
#[test]
#[cfg(feature = "async-std")]
fn communicating_between_dialer_and_listener() {
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
let mut ready_tx = Some(ready_tx);

async_std::task::spawn(async move {
let addr = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
let tcp = TcpConfig::new();
let mut listener = tcp.listen_on(addr).unwrap();

loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.take().unwrap().send(listen_addr).unwrap();
},
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
},
_ => unreachable!()
fn test(addr: Multiaddr) {
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
let mut ready_tx = Some(ready_tx);

async_std::task::spawn(async move {
let tcp = TcpConfig::new();
let mut listener = tcp.listen_on(addr).unwrap();

loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.take().unwrap().send(listen_addr).unwrap();
},
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
},
_ => unreachable!()
}
}
}
});
});

async_std::task::block_on(async move {
let addr = ready_rx.await.unwrap();
let tcp = TcpConfig::new();
async_std::task::block_on(async move {
let addr = ready_rx.await.unwrap();
let tcp = TcpConfig::new();

// Obtain a future socket through dialing
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
// Obtain a future socket through dialing
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
});
}

let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
});
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}

#[test]
Expand Down