diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index a6bf498c3364..6af70648be1f 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -37,10 +37,16 @@ jobs: components: clippy - name: Build & Test (Default) run: cargo test --verbose --no-fail-fast + - name: Build & Test (Default) - shadowsocks + run: cargo test --manifest-path ./crates/shadowsocks/Cargo.toml --verbose --no-fail-fast - name: Build & Test (--no-default-features) run: cargo test --verbose --no-default-features --no-fail-fast + - name: Build & Test (--no-default-features) - shadowsocks + run: cargo test --manifest-path ./crates/shadowsocks/Cargo.toml --verbose --no-default-features --no-fail-fast - name: Build with All Features Enabled run: cargo build --verbose --features "local-http-rustls local-redir local-dns dns-over-tls dns-over-https stream-cipher" + - name: Build with All Features Enabled - shadowsocks + run: cargo build --manifest-path ./crates/shadowsocks/Cargo.toml --verbose --features "stream-cipher" - name: Clippy Check uses: actions-rs/clippy-check@v1 with: diff --git a/Cargo.lock b/Cargo.lock index 3acd732f7e2b..e655d9911c18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1470,7 +1470,7 @@ dependencies = [ [[package]] name = "shadowsocks" -version = "1.10.3" +version = "1.11.0" dependencies = [ "arc-swap 1.2.0", "async-trait", @@ -1514,7 +1514,7 @@ dependencies = [ [[package]] name = "shadowsocks-rust" -version = "1.10.9" +version = "1.11.0" dependencies = [ "byte_string", "byteorder", @@ -1535,7 +1535,7 @@ dependencies = [ [[package]] name = "shadowsocks-service" -version = "1.10.6" +version = "1.11.0" dependencies = [ "async-trait", "byte_string", diff --git a/Cargo.toml b/Cargo.toml index da16bcd6d3ba..16aaba1b8fcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks-rust" -version = "1.10.9" +version = "1.11.0" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" diff --git a/bin/sslocal.rs b/bin/sslocal.rs index 4119249a2173..33513d62111d 100644 --- a/bin/sslocal.rs +++ b/bin/sslocal.rs @@ -66,11 +66,11 @@ fn main() { (@arg DNS: --dns +takes_value "DNS nameservers, formatted like [(tcp|udp)://]host[:port][,host[:port]]..., or unix:///path/to/dns, or predefined keys like \"google\", \"cloudflare\"") (@arg TCP_NO_DELAY: --("tcp-no-delay") !takes_value alias("no-delay") "Set TCP_NODELAY option for socket") + (@arg TCP_FAST_OPEN: --("tcp-fast-open") !takes_value alias("fast-open") "Enable TCP Fast Open (TFO)") (@arg UDP_TIMEOUT: --("udp-timeout") +takes_value {validator::validate_u64} "Timeout seconds for UDP relay") (@arg UDP_MAX_ASSOCIATIONS: --("udp-max-associations") +takes_value {validator::validate_u64} "Maximum associations to be kept simultaneously for UDP relay") - (@arg INBOUND_SEND_BUFFER_SIZE: --("inbound-send-buffer-size") +takes_value {validator::validate_u32} "Set inbound sockets' SO_SNDBUF option") (@arg INBOUND_RECV_BUFFER_SIZE: --("inbound-recv-buffer-size") +takes_value {validator::validate_u32} "Set inbound sockets' SO_RCVBUF option") (@arg OUTBOUND_SEND_BUFFER_SIZE: --("outbound-send-buffer-size") +takes_value {validator::validate_u32} "Set outbound sockets' SO_SNDBUF option") @@ -346,6 +346,10 @@ fn main() { config.no_delay = true; } + if matches.is_present("TCP_FAST_OPEN") { + config.fast_open = true; + } + #[cfg(any(target_os = "linux", target_os = "android"))] if let Some(mark) = matches.value_of("OUTBOUND_FWMARK") { config.outbound_fwmark = Some(mark.parse::().expect("an unsigned integer for `outbound-fwmark`")); diff --git a/bin/ssmanager.rs b/bin/ssmanager.rs index 4d5405f38096..2476057f9c0c 100644 --- a/bin/ssmanager.rs +++ b/bin/ssmanager.rs @@ -51,7 +51,6 @@ fn main() { (@arg BIND_ADDR: -b --("bind-addr") +takes_value {validator::validate_ip_addr} "Bind address, outbound socket will bind this address") (@arg SERVER_HOST: -s --("server-host") +takes_value "Host name or IP address of your remote server") - (@arg MANAGER_ADDRESS: --("manager-address") +takes_value {validator::validate_manager_addr} "ShadowSocks Manager (ssmgr) address, could be ip:port, domain:port or /path/to/unix.sock") (@arg ENCRYPT_METHOD: -m --("encrypt-method") +takes_value possible_values(available_ciphers()) "Default encryption method") (@arg TIMEOUT: --timeout +takes_value {validator::validate_u64} "Default timeout seconds for TCP relay") @@ -60,6 +59,7 @@ fn main() { (@arg DNS: --dns +takes_value "DNS nameservers, formatted like [(tcp|udp)://]host[:port][,host[:port]]..., or unix:///path/to/dns, or predefined keys like \"google\", \"cloudflare\"") (@arg TCP_NO_DELAY: --("tcp-no-delay") !takes_value alias("no-delay") "Set TCP_NODELAY option for socket") + (@arg TCP_FAST_OPEN: --("tcp-fast-open") !takes_value alias("fast-open") "Enable TCP Fast Open (TFO)") (@arg UDP_TIMEOUT: --("udp-timeout") +takes_value {validator::validate_u64} "Timeout seconds for UDP relay") (@arg UDP_MAX_ASSOCIATIONS: --("udp-max-associations") +takes_value {validator::validate_u64} "Maximum associations to be kept simultaneously for UDP relay") @@ -154,6 +154,10 @@ fn main() { config.no_delay = true; } + if matches.is_present("TCP_FAST_OPEN") { + config.fast_open = true; + } + #[cfg(any(target_os = "linux", target_os = "android"))] if let Some(mark) = matches.value_of("OUTBOUND_FWMARK") { config.outbound_fwmark = Some(mark.parse::().expect("an unsigned integer for `outbound-fwmark`")); diff --git a/bin/ssserver.rs b/bin/ssserver.rs index b3104b0a507f..144d6f2a35e8 100644 --- a/bin/ssserver.rs +++ b/bin/ssserver.rs @@ -61,6 +61,7 @@ fn main() { (@arg DNS: --dns +takes_value "DNS nameservers, formatted like [(tcp|udp)://]host[:port][,host[:port]]..., or unix:///path/to/dns, or predefined keys like \"google\", \"cloudflare\"") (@arg TCP_NO_DELAY: --("tcp-no-delay") !takes_value alias("no-delay") "Set TCP_NODELAY option for socket") + (@arg TCP_FAST_OPEN: --("tcp-fast-open") !takes_value alias("fast-open") "Enable TCP Fast Open (TFO)") (@arg UDP_TIMEOUT: --("udp-timeout") +takes_value {validator::validate_u64} "Timeout seconds for UDP relay") (@arg UDP_MAX_ASSOCIATIONS: --("udp-max-associations") +takes_value {validator::validate_u64} "Maximum associations to be kept simultaneously for UDP relay") @@ -194,6 +195,10 @@ fn main() { config.no_delay = true; } + if matches.is_present("TCP_FAST_OPEN") { + config.fast_open = true; + } + #[cfg(any(target_os = "linux", target_os = "android"))] if let Some(mark) = matches.value_of("OUTBOUND_FWMARK") { config.outbound_fwmark = Some(mark.parse::().expect("an unsigned integer for `outbound-fwmark`")); diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index 0c2d785de167..d15c20d36636 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks-service" -version = "1.10.6" +version = "1.11.0" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index 7f904656559c..b2a17fbfb705 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -41,8 +41,6 @@ //! //! These defined server will be used with a load balancing algorithm. -#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] -use std::ffi::OsString; #[cfg(any(unix, target_os = "android", feature = "local-flow-stat"))] use std::path::PathBuf; use std::{ @@ -130,6 +128,8 @@ struct SSConfig { nofile: Option, #[serde(skip_serializing_if = "Option::is_none")] ipv6_first: Option, + #[serde(skip_serializing_if = "Option::is_none")] + fast_open: Option, } #[derive(Serialize, Deserialize, Debug, Default)] @@ -720,6 +720,8 @@ pub struct Config { /// Set `TCP_NODELAY` socket option pub no_delay: bool, + /// Set `TCP_FASTOPEN` socket option + pub fast_open: bool, /// `RLIMIT_NOFILE` option for *nix systems #[cfg(all(unix, not(target_os = "android")))] pub nofile: Option, @@ -729,7 +731,7 @@ pub struct Config { pub outbound_fwmark: Option, /// Set `SO_BINDTODEVICE` socket option for outbound sockets #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] - pub outbound_bind_interface: Option, + pub outbound_bind_interface: Option, /// Path to protect callback unix address, only for Android #[cfg(target_os = "android")] pub outbound_vpn_protect_path: Option, @@ -833,6 +835,7 @@ impl Config { ipv6_first: false, no_delay: false, + fast_open: false, #[cfg(all(unix, not(target_os = "android")))] nofile: None, @@ -1280,6 +1283,11 @@ impl Config { nconfig.no_delay = b; } + // TCP fast open + if let Some(b) = config.fast_open { + nconfig.fast_open = b; + } + // UDP nconfig.udp_timeout = config.udp_timeout.map(Duration::from_secs); @@ -1755,6 +1763,10 @@ impl fmt::Display for Config { jconf.no_delay = Some(self.no_delay); } + if self.fast_open { + jconf.fast_open = Some(self.fast_open); + } + match self.dns { DnsConfig::System => {} #[cfg(feature = "trust-dns")] diff --git a/crates/shadowsocks-service/src/local/dns/upstream.rs b/crates/shadowsocks-service/src/local/dns/upstream.rs index c6595f620c07..ab2b6dc52a2c 100644 --- a/crates/shadowsocks-service/src/local/dns/upstream.rs +++ b/crates/shadowsocks-service/src/local/dns/upstream.rs @@ -18,7 +18,7 @@ use shadowsocks::{ use tokio::net::UnixStream; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - net::{TcpStream, UdpSocket}, + net::UdpSocket, time, }; use trust_dns_resolver::proto::{ @@ -32,7 +32,7 @@ use crate::net::{FlowStat, MonProxySocket, MonProxyStream}; #[allow(clippy::large_enum_variant)] pub enum DnsClient { TcpLocal { - stream: TcpStream, + stream: ShadowTcpStream, }, UdpLocal { socket: UdpSocket, @@ -43,7 +43,7 @@ pub enum DnsClient { stream: UnixStream, }, TcpRemote { - stream: ProxyClientStream>, + stream: ProxyClientStream>, }, UdpRemote { socket: MonProxySocket, @@ -54,7 +54,7 @@ pub enum DnsClient { impl DnsClient { /// Connect to local provided TCP DNS server pub async fn connect_tcp_local(ns: SocketAddr, connect_opts: &ConnectOpts) -> io::Result { - let stream = ShadowTcpStream::connect_with_opts(&ns, connect_opts).await?.into(); + let stream = ShadowTcpStream::connect_with_opts(&ns, connect_opts).await?; Ok(DnsClient::TcpLocal { stream }) } diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index b204b7c88d4d..a2098fff8bc8 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -9,7 +9,7 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, FutureExt, }; -use log::{error, trace, warn}; +use log::{error, trace}; use shadowsocks::{ config::Mode, net::{AcceptOpts, ConnectOpts}, @@ -53,7 +53,7 @@ pub async fn run(mut config: Config) -> io::Result<()> { #[cfg(feature = "stream-cipher")] for server in config.server.iter() { if server.method().is_stream() { - warn!("stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \ + log::warn!("stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \ DO NOT USE. It will be removed in the future.", server.method(), server.addr()); } } @@ -62,7 +62,7 @@ pub async fn run(mut config: Config) -> io::Result<()> { if let Some(nofile) = config.nofile { use crate::sys::set_nofile; if let Err(err) = set_nofile(nofile) { - warn!("set_nofile {} failed, error: {}", nofile, err); + log::warn!("set_nofile {} failed, error: {}", nofile, err); } } @@ -82,12 +82,14 @@ pub async fn run(mut config: Config) -> io::Result<()> { }; connect_opts.tcp.send_buffer_size = config.outbound_send_buffer_size; connect_opts.tcp.recv_buffer_size = config.outbound_recv_buffer_size; + connect_opts.tcp.fastopen = config.fast_open; context.set_connect_opts(connect_opts); let mut accept_opts = AcceptOpts::default(); accept_opts.tcp.send_buffer_size = config.inbound_send_buffer_size; accept_opts.tcp.recv_buffer_size = config.inbound_recv_buffer_size; accept_opts.tcp.nodelay = config.no_delay; + accept_opts.tcp.fastopen = config.fast_open; if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, context.connect_opts_ref()).await { context.set_dns_resolver(Arc::new(resolver)); diff --git a/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs index 5430d5a34367..00d08782c95a 100644 --- a/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs +++ b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs @@ -16,13 +16,7 @@ use shadowsocks::{ tcprelay::proxy_stream::{ProxyClientStream, ProxyClientStreamReadHalf, ProxyClientStreamWriteHalf}, }, }; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream as TokioTcpStream, - }, -}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf}; use crate::{ local::{context::ServiceContext, loadbalancing::ServerIdent}, @@ -34,8 +28,8 @@ use super::auto_proxy_io::AutoProxyIo; /// Unified stream for bypassed and proxied connections #[pin_project(project = AutoProxyClientStreamProj)] pub enum AutoProxyClientStream { - Proxied(#[pin] ProxyClientStream>), - Bypassed(#[pin] TokioTcpStream), + Proxied(#[pin] ProxyClientStream>), + Bypassed(#[pin] TcpStream), } impl AutoProxyClientStream { @@ -160,8 +154,8 @@ impl AsyncWrite for AutoProxyClientStream { } } -impl From>> for AutoProxyClientStream { - fn from(s: ProxyClientStream>) -> Self { +impl From>> for AutoProxyClientStream { + fn from(s: ProxyClientStream>) -> Self { AutoProxyClientStream::Proxied(s) } } @@ -177,7 +171,7 @@ impl AutoProxyClientStream { ) } AutoProxyClientStream::Bypassed(s) => { - let (r, w) = s.into_split(); + let (r, w) = tokio::io::split(s); ( AutoProxyClientStreamReadHalf::Bypassed(r), AutoProxyClientStreamWriteHalf::Bypassed(w), @@ -189,8 +183,8 @@ impl AutoProxyClientStream { #[pin_project(project = AutoProxyClientStreamReadHalfProj)] pub enum AutoProxyClientStreamReadHalf { - Proxied(#[pin] ProxyClientStreamReadHalf>), - Bypassed(#[pin] OwnedReadHalf), + Proxied(#[pin] ProxyClientStreamReadHalf>), + Bypassed(#[pin] ReadHalf), } impl AutoProxyIo for AutoProxyClientStreamReadHalf { @@ -210,8 +204,8 @@ impl AsyncRead for AutoProxyClientStreamReadHalf { #[pin_project(project = AutoProxyClientStreamWriteHalfProj)] pub enum AutoProxyClientStreamWriteHalf { - Proxied(#[pin] ProxyClientStreamWriteHalf>), - Bypassed(#[pin] OwnedWriteHalf), + Proxied(#[pin] ProxyClientStreamWriteHalf>), + Bypassed(#[pin] WriteHalf), } impl AutoProxyIo for AutoProxyClientStreamWriteHalf { diff --git a/crates/shadowsocks-service/src/manager/mod.rs b/crates/shadowsocks-service/src/manager/mod.rs index 5067c9a0cc17..54c5b3a62ca2 100644 --- a/crates/shadowsocks-service/src/manager/mod.rs +++ b/crates/shadowsocks-service/src/manager/mod.rs @@ -50,11 +50,13 @@ pub async fn run(config: Config) -> io::Result<()> { connect_opts.tcp.send_buffer_size = config.outbound_send_buffer_size; connect_opts.tcp.recv_buffer_size = config.outbound_recv_buffer_size; connect_opts.tcp.nodelay = config.no_delay; + connect_opts.tcp.fastopen = config.fast_open; let mut accept_opts = AcceptOpts::default(); accept_opts.tcp.send_buffer_size = config.inbound_send_buffer_size; accept_opts.tcp.recv_buffer_size = config.inbound_recv_buffer_size; accept_opts.tcp.nodelay = config.no_delay; + accept_opts.tcp.fastopen = config.fast_open; if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts).await { manager.set_dns_resolver(Arc::new(resolver)); diff --git a/crates/shadowsocks-service/src/server/mod.rs b/crates/shadowsocks-service/src/server/mod.rs index 5a56f4246e43..0c09510b5466 100644 --- a/crates/shadowsocks-service/src/server/mod.rs +++ b/crates/shadowsocks-service/src/server/mod.rs @@ -3,7 +3,7 @@ use std::{io, sync::Arc}; use futures::{future, FutureExt}; -use log::{trace, warn}; +use log::trace; use shadowsocks::net::{AcceptOpts, ConnectOpts}; use crate::{ @@ -30,7 +30,7 @@ pub async fn run(config: Config) -> io::Result<()> { #[cfg(feature = "stream-cipher")] for server in config.server.iter() { if server.method().is_stream() { - warn!("stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \ + log::warn!("stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \ DO NOT USE. It will be removed in the future.", server.method(), server.addr()); } } @@ -39,7 +39,7 @@ pub async fn run(config: Config) -> io::Result<()> { if let Some(nofile) = config.nofile { use crate::sys::set_nofile; if let Err(err) = set_nofile(nofile) { - warn!("set_nofile {} failed, error: {}", nofile, err); + log::warn!("set_nofile {} failed, error: {}", nofile, err); } } @@ -63,11 +63,13 @@ pub async fn run(config: Config) -> io::Result<()> { connect_opts.tcp.send_buffer_size = config.outbound_send_buffer_size; connect_opts.tcp.recv_buffer_size = config.outbound_recv_buffer_size; connect_opts.tcp.nodelay = config.no_delay; + connect_opts.tcp.fastopen = config.fast_open; let mut accept_opts = AcceptOpts::default(); accept_opts.tcp.send_buffer_size = config.inbound_send_buffer_size; accept_opts.tcp.recv_buffer_size = config.inbound_recv_buffer_size; accept_opts.tcp.nodelay = config.no_delay; + accept_opts.tcp.fastopen = config.fast_open; let resolver = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts) .await diff --git a/crates/shadowsocks/Cargo.toml b/crates/shadowsocks/Cargo.toml index da8660e5b1b8..0f0d23de6a39 100644 --- a/crates/shadowsocks/Cargo.toml +++ b/crates/shadowsocks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shadowsocks" -version = "1.10.3" +version = "1.11.0" authors = ["Shadowsocks Contributors"] description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls." repository = "https://github.com/shadowsocks/shadowsocks-rust" @@ -55,7 +55,7 @@ async-trait = "0.1" mio = "0.7" socket2 = "0.4" -tokio = { version = "1", features = ["io-util", "macros", "net", "parking_lot", "process", "rt", "sync"] } +tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "process", "rt", "sync", "time"] } trust-dns-resolver = { version = "0.20", optional = true } arc-swap = { version = "1.2", optional = true } diff --git a/crates/shadowsocks/src/dns_resolver/resolver.rs b/crates/shadowsocks/src/dns_resolver/resolver.rs index 8475d7e72442..9dc7d9a7c4e4 100644 --- a/crates/shadowsocks/src/dns_resolver/resolver.rs +++ b/crates/shadowsocks/src/dns_resolver/resolver.rs @@ -12,9 +12,12 @@ use std::{ #[cfg(feature = "trust-dns")] use arc_swap::ArcSwap; use async_trait::async_trait; +use cfg_if::cfg_if; #[cfg(feature = "trust-dns")] use futures::future::{self, AbortHandle}; -use log::{error, log_enabled, trace, Level}; +#[cfg(feature = "trust-dns")] +use log::error; +use log::{log_enabled, trace, Level}; use tokio::net::lookup_host; #[cfg(feature = "trust-dns")] use trust_dns_resolver::{config::ResolverConfig, TokioAsyncResolver}; @@ -78,39 +81,54 @@ impl Drop for DnsResolver { } } -struct EmptyResolveResult; - -impl Iterator for EmptyResolveResult { - type Item = SocketAddr; - - fn next(&mut self) -> Option { - None - } -} - -// Resolved result -enum EitherResolved { - Tokio(A), - TrustDnsSystem(B), - TrustDns(C), - Custom(D), -} +cfg_if! { + if #[cfg(feature = "trust-dns")] { + /// Resolved result + enum EitherResolved { + Tokio(A), + TrustDnsSystem(B), + TrustDns(C), + Custom(D), + } -impl Iterator for EitherResolved -where - A: Iterator, - B: Iterator, - C: Iterator, - D: Iterator, -{ - type Item = SocketAddr; + impl Iterator for EitherResolved + where + A: Iterator, + B: Iterator, + C: Iterator, + D: Iterator, + { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + match *self { + EitherResolved::Tokio(ref mut a) => a.next(), + EitherResolved::TrustDnsSystem(ref mut b) => b.next(), + EitherResolved::TrustDns(ref mut c) => c.next(), + EitherResolved::Custom(ref mut d) => d.next(), + } + } + } + } else { + /// Resolved result + enum EitherResolved { + Tokio(A), + Custom(D), + } - fn next(&mut self) -> Option { - match *self { - EitherResolved::Tokio(ref mut a) => a.next(), - EitherResolved::TrustDnsSystem(ref mut b) => b.next(), - EitherResolved::TrustDns(ref mut c) => c.next(), - EitherResolved::Custom(ref mut d) => d.next(), + impl Iterator for EitherResolved + where + A: Iterator, + D: Iterator, + { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + match *self { + EitherResolved::Tokio(ref mut a) => a.next(), + EitherResolved::Custom(ref mut d) => d.next(), + } + } } } } diff --git a/crates/shadowsocks/src/net/mod.rs b/crates/shadowsocks/src/net/mod.rs index e135a72ddbbf..8fc6d776dfa5 100644 --- a/crates/shadowsocks/src/net/mod.rs +++ b/crates/shadowsocks/src/net/mod.rs @@ -9,6 +9,7 @@ pub use self::{ }; mod option; +mod sys; pub mod tcp; pub mod udp; diff --git a/crates/shadowsocks/src/net/option.rs b/crates/shadowsocks/src/net/option.rs index befa57992b48..4cca34e94c16 100644 --- a/crates/shadowsocks/src/net/option.rs +++ b/crates/shadowsocks/src/net/option.rs @@ -1,7 +1,5 @@ //! Options for connecting to remote server -#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] -use std::ffi::OsString; use std::net::IpAddr; /// Options for connecting to TCP remote server @@ -15,6 +13,9 @@ pub struct TcpSocketOpts { /// `TCP_NODELAY` pub nodelay: bool, + + /// `TCP_FASTOPEN`, enables TFO + pub fastopen: bool, } impl Default for TcpSocketOpts { @@ -23,6 +24,7 @@ impl Default for TcpSocketOpts { send_buffer_size: None, recv_buffer_size: None, nodelay: false, + fastopen: false, } } } @@ -47,7 +49,7 @@ pub struct ConnectOpts { /// Outbound socket binds to interface #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] - pub bind_interface: Option, + pub bind_interface: Option, /// TCP options pub tcp: TcpSocketOpts, diff --git a/crates/shadowsocks/src/net/sys/mod.rs b/crates/shadowsocks/src/net/sys/mod.rs new file mode 100644 index 000000000000..7c9fcb79e491 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/mod.rs @@ -0,0 +1,54 @@ +use std::{ + io, + net::{IpAddr, SocketAddr}, +}; + +use cfg_if::cfg_if; +use tokio::net::TcpSocket; + +use super::ConnectOpts; + +cfg_if! { + if #[cfg(unix)] { + mod unix; + pub use self::unix::*; + } else if #[cfg(windows)] { + mod windows; + pub use self::windows::*; + } +} + +fn set_common_sockopt_for_connect(addr: SocketAddr, socket: &TcpSocket, opts: &ConnectOpts) -> io::Result<()> { + // Binds to IP address + if let Some(ip) = opts.bind_local_addr { + match (ip, addr.ip()) { + (IpAddr::V4(..), IpAddr::V4(..)) => { + socket.bind(SocketAddr::new(ip, 0))?; + } + (IpAddr::V6(..), IpAddr::V6(..)) => { + socket.bind(SocketAddr::new(ip, 0))?; + } + _ => {} + } + } + + // Set `SO_SNDBUF` + if let Some(buf_size) = opts.tcp.send_buffer_size { + socket.set_send_buffer_size(buf_size)?; + } + + // Set `SO_RCVBUF` + if let Some(buf_size) = opts.tcp.recv_buffer_size { + socket.set_recv_buffer_size(buf_size)?; + } + + Ok(()) +} + +fn set_common_sockopt_after_connect(stream: &tokio::net::TcpStream, opts: &ConnectOpts) -> io::Result<()> { + if opts.tcp.nodelay { + stream.set_nodelay(true)?; + } + + Ok(()) +} diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs new file mode 100644 index 000000000000..8c8177068209 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs @@ -0,0 +1,198 @@ +use std::{ + io::{self, ErrorKind}, + mem, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream}, + ops::{Deref, DerefMut}, + os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}, + pin::Pin, + task::{self, Poll}, +}; + +use futures::ready; +use log::error; +use pin_project::pin_project; +use socket2::SockAddr; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket}, +}; + +use crate::net::{ + sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect}, + AddrFamily, + ConnectOpts, +}; + +enum TcpStreamState { + Connected, + FastOpenConnect(SocketAddr), +} + +/// A `TcpStream` that supports TFO (TCP Fast Open) +#[pin_project] +pub struct TcpStream { + #[pin] + inner: TokioTcpStream, + state: TcpStreamState, +} + +impl TcpStream { + pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result { + let socket = match addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + set_common_sockopt_for_connect(addr, &socket, opts)?; + + if !opts.tcp.fastopen { + // If TFO is not enabled, it just works like a normal TcpStream + let stream = socket.connect(addr).await?; + set_common_sockopt_after_connect(&stream, opts)?; + + return Ok(TcpStream { + inner: stream, + state: TcpStreamState::Connected, + }); + } + + unsafe { + let enable: libc::c_int = 1; + + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN, + &enable as *const _ as *const libc::c_void, + mem::size_of_val(&enable) as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + } + + let stream = TokioTcpStream::from_std(unsafe { StdTcpStream::from_raw_fd(socket.into_raw_fd()) })?; + set_common_sockopt_after_connect(&stream, opts)?; + + Ok(TcpStream { + inner: stream, + state: TcpStreamState::FastOpenConnect(addr), + }) + } +} + +impl Deref for TcpStream { + type Target = TokioTcpStream; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsyncRead for TcpStream { + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + + if let TcpStreamState::FastOpenConnect(addr) = this.state { + loop { + // TCP_FASTOPEN was supported since FreeBSD 12.0 + // + // Example program: + // + + // Wait until socket is writable + ready!(this.inner.poll_write_ready(cx))?; + + unsafe { + let saddr = SockAddr::from(*addr); + + let ret = libc::sendto( + this.inner.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + 0, // Yes, BSD doesn't need MSG_FASTOPEN + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + // Connect successfully. + *(this.state) = TcpStreamState::Connected; + return Ok(ret as usize).into(); + } else { + // Error occurs + let err = io::Error::last_os_error(); + if err.kind() != ErrorKind::WouldBlock { + return Err(err).into(); + } + } + } + } + } else { + this.inner.poll_write(cx, buf) + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} + +/// Enable `TCP_FASTOPEN` +/// +/// TCP_FASTOPEN was supported since FreeBSD 12.0 +/// +/// Example program: +pub fn set_tcp_fastopen(socket: &S) -> io::Result<()> { + let enable: libc::c_int = 1; + + unsafe { + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN, + &enable as *const _ as *const libc::c_void, + mem::size_of_val(&enable) as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + } + + Ok(()) +} + +/// Create a `UdpSocket` for connecting to `addr` +#[inline(always)] +pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) -> io::Result { + let bind_addr = match (af, config.bind_local_addr) { + (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), + (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), + }; + + UdpSocket::bind(bind_addr).await +} diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs new file mode 100644 index 000000000000..cc6719121cb4 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs @@ -0,0 +1,210 @@ +use std::{ + io::{self, ErrorKind}, + mem, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream}, + ops::{Deref, DerefMut}, + os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}, + pin::Pin, + ptr, + task::{self, Poll}, +}; + +use log::error; +use pin_project::pin_project; +use socket2::SockAddr; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket}, +}; + +use crate::net::{ + sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect}, + AddrFamily, + ConnectOpts, +}; + +/// A `TcpStream` that supports TFO (TCP Fast Open) +#[pin_project] +pub struct TcpStream(#[pin] TokioTcpStream); + +impl TcpStream { + pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result { + let socket = match addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + // Binds to a specific network interface (device) + if let Some(ref iface) = opts.bind_interface { + set_ip_bound_if(&socket, addr, iface)?; + } + + set_common_sockopt_for_connect(addr, &socket, opts)?; + + if !opts.tcp.fastopen { + // If TFO is not enabled, it just works like a normal TcpStream + let stream = socket.connect(addr).await?; + set_common_sockopt_after_connect(&stream, opts)?; + return Ok(TcpStream(stream)); + } + + // TFO in macos uses connectx + + unsafe { + let raddr = SockAddr::from(addr); + + let mut endpoints: libc::sa_endpoints_t = mem::zeroed(); + endpoints.sae_dstaddr = raddr.as_ptr(); + endpoints.sae_dstaddrlen = raddr.len(); + + let ret = libc::connectx( + socket.as_raw_fd(), + &endpoints as *const _, + libc::SAE_ASSOCID_ANY, + libc::CONNECT_DATA_IDEMPOTENT /* Enable TFO */ | libc::CONNECT_RESUME_ON_READ_WRITE, /* Send SYN with subsequence send/recv */ + ptr::null(), + 0, + ptr::null_mut(), + ptr::null_mut(), + ); + + if ret != 0 { + return Err(io::Error::last_os_error()); + } + } + + let stream = TokioTcpStream::from_std(unsafe { StdTcpStream::from_raw_fd(socket.into_raw_fd()) })?; + set_common_sockopt_after_connect(&stream, opts)?; + + Ok(TcpStream(stream)) + } +} + +impl Deref for TcpStream { + type Target = TokioTcpStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl AsyncRead for TcpStream { + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().0.poll_read(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + self.project().0.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().0.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().0.poll_shutdown(cx) + } +} + +/// Enable `TCP_FASTOPEN` +/// +/// `TCP_FASTOPEN` was supported since +/// macosx(10.11), ios(9.0), tvos(9.0), watchos(2.0) +pub fn set_tcp_fastopen(socket: &S) -> io::Result<()> { + let enable: libc::c_int = 1; + + unsafe { + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN, + &enable as *const _ as *const libc::c_void, + mem::size_of_val(&enable) as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + } + + Ok(()) +} + +fn set_ip_bound_if(socket: &S, addr: SocketAddr, iface: &str) -> io::Result<()> { + const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h + const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h + + unsafe { + let mut ciface = [0u8; libc::IFNAMSIZ]; + if iface.len() >= ciface.len() { + return Err(ErrorKind::InvalidInput.into()); + } + + let iface_bytes = iface.as_bytes(); + ptr::copy_nonoverlapping(iface_bytes.as_ptr(), ciface.as_mut_ptr(), iface_bytes.len()); + + let index = libc::if_nametoindex(ciface.as_ptr() as *const libc::c_char); + if index == 0 { + let err = io::Error::last_os_error(); + error!("if_nametoindex ifname: {} error: {}", iface, err); + return Err(err); + } + + let ret = match addr { + SocketAddr::V4(..) => libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_IP, + IP_BOUND_IF, + &index as *const _ as *const _, + mem::size_of_val(&index) as libc::socklen_t, + ), + SocketAddr::V6(..) => libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_IPV6, + IPV6_BOUND_IF, + &index as *const _ as *const _, + mem::size_of_val(&index) as libc::socklen_t, + ), + }; + + if ret < 0 { + let err = io::Error::last_os_error(); + error!( + "set IF_BOUND_IF/IPV6_BOUND_IF ifname: {} ifindex: {} error: {}", + iface, index, err + ); + return Err(err); + } + } + + Ok(()) +} + +/// Create a `UdpSocket` for connecting to `addr` +pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) -> io::Result { + let bind_addr = match (af, config.bind_local_addr) { + (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), + (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), + }; + + let socket = UdpSocket::bind(bind_addr).await?; + + // Set IP_BOUND_IF for BSD-like + if let Some(ref iface) = config.bind_interface { + set_ip_bound_if(&socket, bind_addr, iface)?; + } + + Ok(socket) +} diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/mod.rs b/crates/shadowsocks/src/net/sys/unix/bsd/mod.rs new file mode 100644 index 000000000000..6a9e2f5bc94a --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/bsd/mod.rs @@ -0,0 +1,14 @@ +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(target_os = "freebsd")] { + mod freebsd; + pub use self::freebsd::*; + } else if #[cfg(any(target_os = "macos", target_os = "ios", target_os = "watchos", target_os = "tvos"))] { + mod macos; + pub use self::macos::*; + } else { + mod others; + pub use self::others::*; + } +} diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/others.rs b/crates/shadowsocks/src/net/sys/unix/bsd/others.rs new file mode 100644 index 000000000000..944f0456f7fb --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/bsd/others.rs @@ -0,0 +1 @@ +include!("../others.rs"); diff --git a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs new file mode 100644 index 000000000000..a009c9805237 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs @@ -0,0 +1,366 @@ +use std::{ + io::{self, ErrorKind}, + mem, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream}, + ops::{Deref, DerefMut}, + os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{self, Poll}, +}; + +use cfg_if::cfg_if; +use futures::ready; +use log::error; +use pin_project::pin_project; +use socket2::SockAddr; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket}, +}; + +use crate::net::{ + sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect}, + AddrFamily, + ConnectOpts, +}; + +enum TcpStreamState { + Connected, + FastOpenConnect(SocketAddr), +} + +/// A `TcpStream` that supports TFO (TCP Fast Open) +#[pin_project] +pub struct TcpStream { + #[pin] + inner: TokioTcpStream, + state: TcpStreamState, +} + +impl TcpStream { + pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result { + let socket = match addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + // Any traffic to localhost should not be protected + // This is a workaround for VPNService + #[cfg(target_os = "android")] + if !addr.ip().is_loopback() { + use std::time::Duration; + use tokio::time; + + if let Some(ref path) = opts.vpn_protect_path { + // RPC calls to `VpnService.protect()` + // Timeout in 3 seconds like shadowsocks-libev + match time::timeout(Duration::from_secs(3), vpn_protect(path, socket.as_raw_fd())).await { + Ok(Ok(..)) => {} + Ok(Err(err)) => return Err(err), + Err(..) => return Err(io::Error::new(ErrorKind::TimedOut, "protect() timeout")), + } + } + } + + // Set SO_MARK for mark-based routing on Linux (since 2.6.25) + // NOTE: This will require CAP_NET_ADMIN capability (root in most cases) + if let Some(mark) = opts.fwmark { + let ret = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_MARK, + &mark as *const _ as *const _, + mem::size_of_val(&mark) as libc::socklen_t, + ) + }; + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set SO_MARK error: {}", err); + return Err(err); + } + } + + // Set SO_BINDTODEVICE for binding to a specific interface + if let Some(ref iface) = opts.bind_interface { + set_bindtodevice(&socket, iface)?; + } + + set_common_sockopt_for_connect(addr, &socket, opts)?; + + if !opts.tcp.fastopen { + // If TFO is not enabled, it just works like a normal TcpStream + let stream = socket.connect(addr).await?; + set_common_sockopt_after_connect(&stream, opts)?; + + return Ok(TcpStream { + inner: stream, + state: TcpStreamState::Connected, + }); + } + + let mut connected = false; + + // TFO in Linux was supported since 3.7 + // + // But TCP_FASTOPEN_CONNECT was supported since 4.1, so we have to be compatible with it + static SUPPORT_TCP_FASTOPEN_CONNECT: AtomicBool = AtomicBool::new(true); + if SUPPORT_TCP_FASTOPEN_CONNECT.load(Ordering::Relaxed) { + unsafe { + let enable: libc::c_int = 1; + + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN_CONNECT, + &enable as *const _ as *const libc::c_void, + mem::size_of_val(&enable) as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + if let Some(libc::ENOPROTOOPT) = err.raw_os_error() { + // `TCP_FASTOPEN_CONNECT` is not supported, maybe kernel version < 4.11 + // Fallback to `sendto` with `MSG_FASTOPEN` (Supported after 3.7) + SUPPORT_TCP_FASTOPEN_CONNECT.store(false, Ordering::Relaxed); + } else { + error!("set TCP_FASTOPEN_CONNECT error: {}", err); + return Err(err); + } + } else { + connected = true; + } + } + } + + let stream = if connected { + // call connect() if TCP_FASTOPEN_CONNECT is set + socket.connect(addr).await? + } else { + // call sendto() with MSG_FASTOPEN in poll_read + TokioTcpStream::from_std(unsafe { StdTcpStream::from_raw_fd(socket.into_raw_fd()) })? + }; + + set_common_sockopt_after_connect(&stream, opts)?; + + Ok(TcpStream { + inner: stream, + state: if connected { + TcpStreamState::Connected + } else { + TcpStreamState::FastOpenConnect(addr) + }, + }) + } +} + +impl Deref for TcpStream { + type Target = TokioTcpStream; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsyncRead for TcpStream { + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + + if let TcpStreamState::FastOpenConnect(addr) = this.state { + loop { + // Fallback mode. Must be kernal < 4.11 + // + // Uses sendto as BSD-like systems + + // Wait until socket is writable + ready!(this.inner.poll_write_ready(cx))?; + + unsafe { + let saddr = SockAddr::from(*addr); + + let ret = libc::sendto( + this.inner.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + libc::MSG_FASTOPEN, + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + // Connect successfully. + *(this.state) = TcpStreamState::Connected; + return Ok(ret as usize).into(); + } else { + // Error occurs + let err = io::Error::last_os_error(); + if err.kind() != ErrorKind::WouldBlock { + return Err(err).into(); + } + } + } + } + } else { + this.inner.poll_write(cx, buf) + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} + +/// Enable `TCP_FASTOPEN` +/// +/// `TCP_FASTOPEN` was supported since Linux 3.7 +pub fn set_tcp_fastopen(socket: &S) -> io::Result<()> { + let queue: libc::c_int = 5; + + unsafe { + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN, + &queue as *const _ as *const libc::c_void, + mem::size_of_val(&queue) as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + } + + Ok(()) +} + +/// Create a `UdpSocket` for connecting to `addr` +pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) -> io::Result { + let bind_addr = match (af, config.bind_local_addr) { + (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), + (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), + }; + + let socket = UdpSocket::bind(bind_addr).await?; + + // Any traffic except localhost should be protected + // This is a workaround for VPNService + #[cfg(target_os = "android")] + { + use std::time::Duration; + use tokio::time; + + if let Some(ref path) = config.vpn_protect_path { + // RPC calls to `VpnService.protect()` + // Timeout in 3 seconds like shadowsocks-libev + match time::timeout(Duration::from_secs(3), vpn_protect(path, socket.as_raw_fd())).await { + Ok(Ok(..)) => {} + Ok(Err(err)) => return Err(err), + Err(..) => return Err(io::Error::new(ErrorKind::TimedOut, "protect() timeout")), + } + } + } + + // Set SO_MARK for mark-based routing on Linux (since 2.6.25) + // NOTE: This will require CAP_NET_ADMIN capability (root in most cases) + if let Some(mark) = config.fwmark { + let ret = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_MARK, + &mark as *const _ as *const _, + mem::size_of_val(&mark) as libc::socklen_t, + ) + }; + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set SO_MARK error: {}", err); + return Err(err); + } + } + + // Set SO_BINDTODEVICE for binding to a specific interface + if let Some(ref iface) = config.bind_interface { + set_bindtodevice(&socket, iface)?; + } + + Ok(socket) +} + +fn set_bindtodevice(socket: &S, iface: &str) -> io::Result<()> { + let iface_bytes = iface.as_bytes(); + + unsafe { + let ret = libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface_bytes.as_ptr() as *const _ as *const libc::c_void, + iface_bytes.len() as libc::socklen_t, + ); + + if ret != 0 { + let err = io::Error::last_os_error(); + error!("set SO_BINDTODEVICE error: {}", err); + return Err(err); + } + } + + Ok(()) +} + +cfg_if! { + if #[cfg(target_os = "android")] { + use std::path::Path; + use std::os::unix::io::RawFd; + + mod uds; + + /// This is a RPC for Android to `protect()` socket for connecting to remote servers + /// + /// https://developer.android.com/reference/android/net/VpnService#protect(java.net.Socket) + /// + /// More detail could be found in [shadowsocks-android](https://github.com/shadowsocks/shadowsocks-android) project. + async fn vpn_protect>(protect_path: P, fd: RawFd) -> io::Result<()> { + use tokio::io::AsyncReadExt; + + let mut stream = self::uds::UnixStream::connect(protect_path).await?; + + // send fds + let dummy: [u8; 1] = [1]; + let fds: [RawFd; 1] = [fd]; + stream.send_with_fd(&dummy, &fds).await?; + + // receive the return value + let mut response = [0; 1]; + stream.read_exact(&mut response).await?; + + if response[0] == 0xFF { + return Err(io::Error::new(ErrorKind::Other, "protect() failed")); + } + + Ok(()) + } + } +} diff --git a/crates/shadowsocks/src/relay/sys/unix/uds.rs b/crates/shadowsocks/src/net/sys/unix/linux/uds.rs similarity index 100% rename from crates/shadowsocks/src/relay/sys/unix/uds.rs rename to crates/shadowsocks/src/net/sys/unix/linux/uds.rs diff --git a/crates/shadowsocks/src/net/sys/unix/mod.rs b/crates/shadowsocks/src/net/sys/unix/mod.rs new file mode 100644 index 000000000000..425c82f18679 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/mod.rs @@ -0,0 +1,75 @@ +use std::{ + io::{self, ErrorKind}, + net::SocketAddr, +}; + +use cfg_if::cfg_if; +use log::{debug, warn}; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; +use tokio::net::UdpSocket; + +cfg_if! { + if #[cfg(any(target_os = "linux", target_os = "android"))] { + mod linux; + pub use self::linux::*; + } else if #[cfg(any(target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "macos", + target_os = "ios", + target_os = "watchos", + target_os = "tvos"))] { + mod bsd; + pub use self::bsd::*; + } else { + mod others; + pub use self::others::*; + } +} + +/// Create a `UdpSocket` binded to `addr` +pub async fn create_inbound_udp_socket(addr: &SocketAddr) -> io::Result { + let set_dual_stack = if let SocketAddr::V6(ref v6) = *addr { + v6.ip().is_unspecified() + } else { + false + }; + + if !set_dual_stack { + UdpSocket::bind(addr).await + } else { + let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP))?; + + if let Err(err) = socket.set_only_v6(false) { + warn!("failed to set IPV6_V6ONLY: false for listener, error: {}", err); + + // This is not a fatal error, just warn and skip + } + + let saddr = SockAddr::from(*addr); + + match socket.bind(&saddr) { + Ok(..) => {} + Err(ref err) if err.kind() == ErrorKind::AddrInUse => { + // This is probably 0.0.0.0 with the same port has already been occupied + debug!( + "0.0.0.0:{} may have already been occupied, retry with IPV6_V6ONLY", + addr.port() + ); + + if let Err(err) = socket.set_only_v6(true) { + warn!("failed to set IPV6_V6ONLY: true for listener, error: {}", err); + + // This is not a fatal error, just warn and skip + } + socket.bind(&saddr)?; + } + Err(err) => return Err(err), + } + + // UdpSocket::from_std requires socket to be non-blocked + socket.set_nonblocking(true)?; + UdpSocket::from_std(socket.into()) + } +} diff --git a/crates/shadowsocks/src/net/sys/unix/others.rs b/crates/shadowsocks/src/net/sys/unix/others.rs new file mode 100644 index 000000000000..8913040c5d50 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/unix/others.rs @@ -0,0 +1,90 @@ +use std::{ + io, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + ops::{Deref, DerefMut}, + pin::Pin, + task::{self, Poll}, +}; + +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket}, +}; + +use crate::net::{ + sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect}, + AddrFamily, + ConnectOpts, +}; + +/// A wrapper of `TcpStream` +#[pin_project] +pub struct TcpStream(#[pin] TokioTcpStream); + +impl TcpStream { + pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result { + let socket = match addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + set_common_sockopt_for_connect(addr, &socket, opts)?; + + let stream = socket.connect(addr).await?; + set_common_sockopt_after_connect(&stream, opts)?; + Ok(TcpStream(stream)) + } +} + +impl Deref for TcpStream { + type Target = TokioTcpStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl AsyncRead for TcpStream { + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().0.poll_read(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + self.project().0.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().0.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().0.poll_shutdown(cx) + } +} + +/// Create a `UdpSocket` for connecting to `addr` +#[inline(always)] +pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) -> io::Result { + let bind_addr = match (af, config.bind_local_addr) { + (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), + (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), + }; + + UdpSocket::bind(bind_addr).await +} + +pub fn set_tcp_fastopen(_: &S) -> io::Result<()> { + let err = io::Error::new(ErrorKind::Other, "TFO is not supported in this platform"); + Err(err) +} diff --git a/crates/shadowsocks/src/net/sys/windows/mod.rs b/crates/shadowsocks/src/net/sys/windows/mod.rs new file mode 100644 index 000000000000..96d272b5f247 --- /dev/null +++ b/crates/shadowsocks/src/net/sys/windows/mod.rs @@ -0,0 +1,471 @@ +use std::{ + io::{self, ErrorKind}, + mem, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream}, + ops::{Deref, DerefMut}, + os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket}, + pin::Pin, + ptr, + task::{self, Poll}, +}; + +use futures::ready; +use log::{debug, error, warn}; +use once_cell::sync::Lazy; +use pin_project::pin_project; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket}, +}; +use winapi::{ + ctypes::{c_char, c_int}, + shared::{ + minwindef::{BOOL, DWORD, FALSE, LPDWORD, LPVOID, TRUE}, + winerror::ERROR_IO_PENDING, + ws2def::{ + ADDRESS_FAMILY, + AF_INET, + AF_INET6, + IPPROTO_TCP, + SIO_GET_EXTENSION_FUNCTION_POINTER, + SOCKADDR, + SOCKADDR_IN, + }, + }, + um::{ + minwinbase::{LPOVERLAPPED, OVERLAPPED}, + mswsock::{LPFN_CONNECTEX, SIO_UDP_CONNRESET, SO_UPDATE_CONNECT_CONTEXT, WSAID_CONNECTEX}, + winnt::PVOID, + winsock2::{ + bind, + closesocket, + setsockopt, + socket, + WSAGetLastError, + WSAGetOverlappedResult, + WSAIoctl, + INVALID_SOCKET, + SOCKET, + SOCKET_ERROR, + SOCK_STREAM, + SOL_SOCKET, + WSA_IO_INCOMPLETE, + }, + }, +}; + +use crate::net::{ + sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect}, + AddrFamily, + ConnectOpts, +}; + +// ws2ipdef.h +// FIXME: Use winapi's definition if issue resolved +// https://github.com/retep998/winapi-rs/issues/856 +const TCP_FASTOPEN: DWORD = 15; + +static PFN_CONNECTEX_OPT: Lazy = Lazy::new(|| unsafe { + let socket = socket(AF_INET, SOCK_STREAM, 0); + if socket == INVALID_SOCKET { + return None; + } + + let mut guid = WSAID_CONNECTEX; + let mut num_bytes: DWORD = 0; + + let mut connectex: LPFN_CONNECTEX = None; + + let ret = WSAIoctl( + socket, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &mut guid as *mut _ as LPVOID, + mem::size_of_val(&guid) as DWORD, + &mut connectex as *mut _ as LPVOID, + mem::size_of_val(&connectex) as DWORD, + &mut num_bytes as *mut _, + ptr::null_mut(), + None, + ); + + if ret != 0 { + let err = WSAGetLastError(); + let e = io::Error::from_raw_os_error(err); + + warn!("Failed to get ConnectEx function from WSA extension, error: {}", e); + } + + let _ = closesocket(socket); + + connectex +}); + +enum TcpStreamState { + Connected, + FastOpenConnect(SocketAddr), + FastOpenConnecting(Box), +} + +// unsafe: OVERLAPPED can be sent between threads +unsafe impl Send for TcpStreamState {} +unsafe impl Sync for TcpStreamState {} + +/// A `TcpStream` that supports TFO (TCP Fast Open) +#[pin_project] +pub struct TcpStream { + #[pin] + inner: TokioTcpStream, + state: TcpStreamState, +} + +impl TcpStream { + pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result { + let socket = match addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + set_common_sockopt_for_connect(addr, &socket, opts)?; + + if !opts.tcp.fastopen { + // If TFO is not enabled, it just works like a normal TcpStream + let stream = socket.connect(addr).await?; + set_common_sockopt_after_connect(&stream, opts)?; + + return Ok(TcpStream { + inner: stream, + state: TcpStreamState::Connected, + }); + } + + let sock = socket.as_raw_socket() as SOCKET; + + unsafe { + // TCP_FASTOPEN was supported since Windows 10 + + // Enable TCP_FASTOPEN option + + let enable: DWORD = 1; + + let ret = setsockopt( + sock, + IPPROTO_TCP as c_int, + TCP_FASTOPEN as c_int, + &enable as *const _ as *const c_char, + mem::size_of_val(&enable) as c_int, + ); + + if ret == SOCKET_ERROR { + let err = io::Error::from_raw_os_error(WSAGetLastError()); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + + if opts.bind_local_addr.is_none() { + // Bind to a dummy address (required) + let mut dummy_addr: SOCKADDR_IN = mem::zeroed(); + match addr.ip() { + IpAddr::V4(..) => dummy_addr.sin_family = AF_INET as ADDRESS_FAMILY, + IpAddr::V6(..) => dummy_addr.sin_family = AF_INET6 as ADDRESS_FAMILY, + } + + let ret = bind( + sock, + &dummy_addr as *const _ as *const SOCKADDR, + mem::size_of_val(&dummy_addr) as c_int, + ); + + if ret == SOCKET_ERROR { + let err = WSAGetLastError(); + return Err(io::Error::from_raw_os_error(err)); + } + } + } + + let stream = TokioTcpStream::from_std(unsafe { StdTcpStream::from_raw_socket(socket.into_raw_socket()) })?; + set_common_sockopt_after_connect(&stream, opts)?; + + Ok(TcpStream { + inner: stream, + state: TcpStreamState::FastOpenConnect(addr), + }) + } +} + +impl Deref for TcpStream { + type Target = TokioTcpStream; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsyncRead for TcpStream { + fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +fn set_update_connect_context(sock: SOCKET) -> io::Result<()> { + unsafe { + // Make getpeername work + // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex + let ret = setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, ptr::null(), 0); + if ret == SOCKET_ERROR { + let err = WSAGetLastError(); + return Err(io::Error::from_raw_os_error(err)); + } + } + + Ok(()) +} + +impl AsyncWrite for TcpStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + + loop { + match this.state { + TcpStreamState::Connected => { + return this.inner.poll_write(cx, buf); + } + TcpStreamState::FastOpenConnect(addr) => { + unsafe { + // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex + let connect_ex = PFN_CONNECTEX_OPT + .expect("LPFN_CONNECTEX function doesn't exist. It is only supported after Windows 10"); + let saddr = SockAddr::from(*addr); + + let sock = this.inner.as_raw_socket() as SOCKET; + + let mut overlapped: Box = Box::new(mem::zeroed()); + + let mut bytes_sent: DWORD = 0; + let ret: BOOL = connect_ex( + sock, + saddr.as_ptr(), + saddr.len() as c_int, + buf.as_ptr() as PVOID, + buf.len() as DWORD, + &mut bytes_sent as LPDWORD, + overlapped.as_mut() as LPOVERLAPPED, + ); + + if ret == TRUE { + // Connected successfully. + + // Make getpeername() works + set_update_connect_context(sock)?; + + debug_assert!(bytes_sent as usize <= buf.len()); + + *(this.state) = TcpStreamState::Connected; + return Ok(bytes_sent as usize).into(); + } + + let err = WSAGetLastError(); + if err != ERROR_IO_PENDING as c_int { + return Err(io::Error::from_raw_os_error(err)).into(); + } + + // ConnectEx pending (ERROR_IO_PENDING), check later in FastOpenConnecting + *(this.state) = TcpStreamState::FastOpenConnecting(overlapped); + } + } + TcpStreamState::FastOpenConnecting(ref mut overlapped) => { + // Wait until socket is writable + ready!(this.inner.poll_write_ready(cx))?; + + unsafe { + let sock = this.inner.as_raw_socket() as SOCKET; + + let mut bytes_sent: DWORD = 0; + let mut flags: DWORD = 0; + + // Fetch ConnectEx's result in a non-blocking way. + let ret: BOOL = WSAGetOverlappedResult( + sock, + overlapped.as_mut() as LPOVERLAPPED, + &mut bytes_sent as LPDWORD, + FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE + &mut flags as LPDWORD, + ); + + if ret == TRUE { + // Get ConnectEx's result successfully. Socket is connected + + // Make getpeername() works + set_update_connect_context(sock)?; + + debug_assert!(bytes_sent as usize <= buf.len()); + + *(this.state) = TcpStreamState::Connected; + return Ok(bytes_sent as usize).into(); + } + + let err = WSAGetLastError(); + if err == WSA_IO_INCOMPLETE { + // ConnectEx is still not connected. Wait for the next round + } else { + return Err(io::Error::from_raw_os_error(err)).into(); + } + } + } + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} + +/// Enable `TCP_FASTOPEN` +/// +/// Program borrowed from +/// https://social.msdn.microsoft.com/Forums/en-US/94d1fe8e-4f17-4b28-89eb-1ac776a2e134/how-to-create-tcp-fast-open-connections-with-winsock-?forum=windowsgeneraldevelopmentissues +/// +/// TCP_FASTOPEN document +/// https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-tcp-socket-options +/// +/// TCP_FASTOPEN is supported since Windows 10 +pub fn set_tcp_fastopen(socket: &S) -> io::Result<()> { + let enable: DWORD = 1; + + unsafe { + let ret = setsockopt( + socket.as_raw_socket() as SOCKET, + IPPROTO_TCP as c_int, + TCP_FASTOPEN as c_int, + &enable as *const _ as *const c_char, + mem::size_of_val(&enable) as c_int, + ); + + if ret == SOCKET_ERROR { + let err = io::Error::from_raw_os_error(WSAGetLastError()); + error!("set TCP_FASTOPEN error: {}", err); + return Err(err); + } + } + + Ok(()) +} + +fn disable_connection_reset(socket: &UdpSocket) -> io::Result<()> { + let handle = socket.as_raw_socket() as SOCKET; + + unsafe { + // Ignoring UdpSocket's WSAECONNRESET error + // https://github.com/shadowsocks/shadowsocks-rust/issues/179 + // https://stackoverflow.com/questions/30749423/is-winsock-error-10054-wsaeconnreset-normal-with-udp-to-from-localhost + // + // This is because `UdpSocket::recv_from` may return WSAECONNRESET + // if you called `UdpSocket::send_to` a destination that is not existed (may be closed). + // + // It is not an error. Could be ignored completely. + // We have to ignore it here because it will crash the server. + + let mut bytes_returned: DWORD = 0; + let mut enable: BOOL = FALSE; + + let ret = WSAIoctl( + handle, + SIO_UDP_CONNRESET, + &mut enable as *mut _ as LPVOID, + mem::size_of_val(&enable) as DWORD, + ptr::null_mut(), + 0, + &mut bytes_returned as *mut _ as LPDWORD, + ptr::null_mut(), + None, + ); + + if ret == SOCKET_ERROR { + use std::io::Error; + + // Error occurs + let err_code = WSAGetLastError(); + return Err(Error::from_raw_os_error(err_code)); + } + } + + Ok(()) +} + +/// Create a `UdpSocket` binded to `addr` +/// +/// It also disables `WSAECONNRESET` for UDP socket +pub async fn create_inbound_udp_socket(addr: &SocketAddr) -> io::Result { + let set_dual_stack = if let SocketAddr::V6(ref v6) = *addr { + v6.ip().is_unspecified() + } else { + false + }; + + let socket = if !set_dual_stack { + UdpSocket::bind(addr).await? + } else { + let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP))?; + + if let Err(err) = socket.set_only_v6(false) { + warn!("failed to set IPV6_V6ONLY: false for listener, error: {}", err); + + // This is not a fatal error, just warn and skip + } + + let saddr = SockAddr::from(*addr); + + match socket.bind(&saddr) { + Ok(..) => {} + Err(ref err) if err.kind() == ErrorKind::AddrInUse => { + // This is probably 0.0.0.0 with the same port has already been occupied + debug!( + "0.0.0.0:{} may have already been occupied, retry with IPV6_V6ONLY", + addr.port() + ); + + if let Err(err) = socket.set_only_v6(true) { + warn!("failed to set IPV6_V6ONLY: true for listener, error: {}", err); + + // This is not a fatal error, just warn and skip + } + socket.bind(&saddr)?; + } + Err(err) => return Err(err), + } + + // UdpSocket::from_std requires socket to be non-blocked + socket.set_nonblocking(true)?; + UdpSocket::from_std(socket.into())? + }; + + disable_connection_reset(&socket)?; + Ok(socket) +} + +/// Create a `UdpSocket` for connecting to `addr` +#[inline(always)] +pub async fn create_outbound_udp_socket(af: AddrFamily, opts: &ConnectOpts) -> io::Result { + let bind_addr = match (af, opts.bind_local_addr) { + (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), + (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), + (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), + }; + + let socket = UdpSocket::bind(bind_addr).await?; + disable_connection_reset(&socket)?; + + Ok(socket) +} diff --git a/crates/shadowsocks/src/net/tcp.rs b/crates/shadowsocks/src/net/tcp.rs index 1fcf566c3558..e210553743fb 100644 --- a/crates/shadowsocks/src/net/tcp.rs +++ b/crates/shadowsocks/src/net/tcp.rs @@ -21,22 +21,23 @@ use tokio::{ net::{TcpListener as TokioTcpListener, TcpSocket, TcpStream as TokioTcpStream}, }; -use crate::{ - context::Context, - relay::{socks5::Address, sys::tcp_stream_connect}, - ServerAddr, -}; +use crate::{context::Context, relay::socks5::Address, ServerAddr}; -use super::{AcceptOpts, ConnectOpts}; +use super::{ + sys::{set_tcp_fastopen, TcpStream as SysTcpStream}, + AcceptOpts, + ConnectOpts, +}; /// TcpStream for outbound connections #[pin_project] -pub struct TcpStream(#[pin] TokioTcpStream); +pub struct TcpStream(#[pin] SysTcpStream); impl TcpStream { /// Connects to address pub async fn connect_with_opts(addr: &SocketAddr, opts: &ConnectOpts) -> io::Result { - tcp_stream_connect(addr, opts).await.map(TcpStream) + // tcp_stream_connect(addr, opts).await.map(TcpStream) + SysTcpStream::connect(*addr, opts).await.map(TcpStream) } /// Connects shadowsocks server @@ -46,10 +47,10 @@ impl TcpStream { opts: &ConnectOpts, ) -> io::Result { let stream = match *addr { - ServerAddr::SocketAddr(ref addr) => tcp_stream_connect(addr, opts).await?, + ServerAddr::SocketAddr(ref addr) => SysTcpStream::connect(*addr, opts).await?, ServerAddr::DomainName(ref domain, port) => { lookup_then!(&context, &domain, port, |addr| { - tcp_stream_connect(&addr, opts).await + SysTcpStream::connect(addr, opts).await })? .1 } @@ -65,10 +66,10 @@ impl TcpStream { opts: &ConnectOpts, ) -> io::Result { let stream = match *addr { - Address::SocketAddress(ref addr) => tcp_stream_connect(addr, opts).await?, + Address::SocketAddress(ref addr) => SysTcpStream::connect(*addr, opts).await?, Address::DomainNameAddress(ref domain, port) => { lookup_then!(&context, &domain, port, |addr| { - tcp_stream_connect(&addr, opts).await + SysTcpStream::connect(addr, opts).await })? .1 } @@ -112,18 +113,6 @@ impl AsyncWrite for TcpStream { } } -impl From for TcpStream { - fn from(s: TokioTcpStream) -> TcpStream { - TcpStream(s) - } -} - -impl From for TokioTcpStream { - fn from(s: TcpStream) -> TokioTcpStream { - s.0 - } -} - /// `TcpListener` for accepting inbound connections pub struct TcpListener { inner: TokioTcpListener, @@ -133,25 +122,28 @@ pub struct TcpListener { impl TcpListener { /// Creates a new TcpListener, which will be bound to the specified address. pub async fn bind_with_opts(addr: &SocketAddr, accept_opts: AcceptOpts) -> io::Result { + let socket = match *addr { + SocketAddr::V4(..) => TcpSocket::new_v4()?, + SocketAddr::V6(..) => TcpSocket::new_v6()?, + }; + + // On platforms with Berkeley-derived sockets, this allows to quickly + // rebind a socket, without needing to wait for the OS to clean up the + // previous one. + // + // On Windows, this allows rebinding sockets which are actively in use, + // which allows “socket hijacking”, so we explicitly don't set it here. + // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse + #[cfg(not(windows))] + socket.set_reuseaddr(true)?; + let set_dual_stack = if let SocketAddr::V6(ref v6) = *addr { v6.ip().is_unspecified() } else { false }; - if !set_dual_stack { - let inner = TokioTcpListener::bind(addr).await?; - Ok(TcpListener { inner, accept_opts }) - } else { - let socket = match *addr { - SocketAddr::V4(..) => TcpSocket::new_v4()?, - SocketAddr::V6(..) => TcpSocket::new_v6()?, - }; - - // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse - #[cfg(not(windows))] - socket.set_reuseaddr(true)?; - + if set_dual_stack { // Set to DUAL STACK mode by default. // WARNING: This would fail if you want to start another program listening on the same port. // @@ -191,11 +183,20 @@ impl TcpListener { } Err(err) => return Err(err), } + } else { + socket.bind(*addr)?; + } - // mio's default backlog is 1024 - let inner = socket.listen(1024)?; - Ok(TcpListener { inner, accept_opts }) + // mio's default backlog is 1024 + let inner = socket.listen(1024)?; + + // Enable TFO if supported + // macos requires TCP_FASTOPEN to be set after listen(), but other platform doesn't have this constraint + if accept_opts.tcp.fastopen { + set_tcp_fastopen(&inner)?; } + + Ok(TcpListener { inner, accept_opts }) } /// Create a `TcpListener` from tokio's `TcpListener` diff --git a/crates/shadowsocks/src/net/udp.rs b/crates/shadowsocks/src/net/udp.rs index 0e77c15a2bed..769ffb90e6b2 100644 --- a/crates/shadowsocks/src/net/udp.rs +++ b/crates/shadowsocks/src/net/udp.rs @@ -8,16 +8,13 @@ use std::{ use pin_project::pin_project; -use crate::{ - context::Context, - relay::{ - socks5::Address, - sys::{create_inbound_udp_socket, create_outbound_udp_socket}, - }, - ServerAddr, -}; +use crate::{context::Context, relay::socks5::Address, ServerAddr}; -use super::{AddrFamily, ConnectOpts}; +use super::{ + sys::{create_inbound_udp_socket, create_outbound_udp_socket}, + AddrFamily, + ConnectOpts, +}; /// Wrappers for outbound `UdpSocket` #[pin_project] diff --git a/crates/shadowsocks/src/relay/mod.rs b/crates/shadowsocks/src/relay/mod.rs index 1b203f1e0583..cac1b18ce51f 100644 --- a/crates/shadowsocks/src/relay/mod.rs +++ b/crates/shadowsocks/src/relay/mod.rs @@ -3,6 +3,5 @@ pub use self::socks5::Address; pub mod socks5; -pub(crate) mod sys; pub mod tcprelay; pub mod udprelay; diff --git a/crates/shadowsocks/src/relay/sys/mod.rs b/crates/shadowsocks/src/relay/sys/mod.rs deleted file mode 100644 index 0f9a1a2ac768..000000000000 --- a/crates/shadowsocks/src/relay/sys/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -use cfg_if::cfg_if; - -cfg_if! { - if #[cfg(unix)] { - mod unix; - pub use self::unix::*; - } else if #[cfg(windows)] { - mod windows; - pub use self::windows::*; - } -} diff --git a/crates/shadowsocks/src/relay/sys/unix/mod.rs b/crates/shadowsocks/src/relay/sys/unix/mod.rs deleted file mode 100644 index 99e12f49f69b..000000000000 --- a/crates/shadowsocks/src/relay/sys/unix/mod.rs +++ /dev/null @@ -1,335 +0,0 @@ -#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] -use std::os::unix::io::AsRawFd; -use std::{ - io::{self, Error, ErrorKind}, - mem, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, -}; -#[cfg(any(target_os = "android"))] -use std::{os::unix::io::RawFd, path::Path}; -#[cfg(any(target_os = "macos", target_os = "ios"))] -use std::{os::unix::prelude::OsStrExt, ptr}; - -use cfg_if::cfg_if; -use log::{debug, warn}; -use socket2::{Domain, Protocol, SockAddr, Socket, Type}; -use tokio::net::{TcpSocket, TcpStream, UdpSocket}; - -use crate::net::{AddrFamily, ConnectOpts}; - -cfg_if! { - if #[cfg(target_os = "android")] { - mod uds; - - /// This is a RPC for Android to `protect()` socket for connecting to remote servers - /// - /// https://developer.android.com/reference/android/net/VpnService#protect(java.net.Socket) - /// - /// More detail could be found in [shadowsocks-android](https://github.com/shadowsocks/shadowsocks-android) project. - async fn protect>(protect_path: P, fd: RawFd) -> io::Result<()> { - use tokio::io::AsyncReadExt; - - let mut stream = self::uds::UnixStream::connect(protect_path).await?; - - // send fds - let dummy: [u8; 1] = [1]; - let fds: [RawFd; 1] = [fd]; - stream.send_with_fd(&dummy, &fds).await?; - - // receive the return value - let mut response = [0; 1]; - stream.read_exact(&mut response).await?; - - if response[0] == 0xFF { - return Err(Error::new(ErrorKind::Other, "protect() failed")); - } - - Ok(()) - } - } -} - -/// create a new TCP stream -#[inline(always)] -#[allow(unused_variables)] -pub async fn tcp_stream_connect(saddr: &SocketAddr, config: &ConnectOpts) -> io::Result { - let socket = match *saddr { - SocketAddr::V4(..) => TcpSocket::new_v4()?, - SocketAddr::V6(..) => TcpSocket::new_v6()?, - }; - - // Any traffic to localhost should not be protected - // This is a workaround for VPNService - #[cfg(target_os = "android")] - if !saddr.ip().is_loopback() { - use std::time::Duration; - use tokio::time; - - if let Some(ref path) = config.vpn_protect_path { - // RPC calls to `VpnService.protect()` - // Timeout in 3 seconds like shadowsocks-libev - match time::timeout(Duration::from_secs(3), protect(path, socket.as_raw_fd())).await { - Ok(Ok(..)) => {} - Ok(Err(err)) => return Err(err), - Err(..) => return Err(Error::new(ErrorKind::TimedOut, "protect() timeout")), - } - } - } - - // Set SO_MARK for mark-based routing on Linux (since 2.6.25) - // NOTE: This will require CAP_NET_ADMIN capability (root in most cases) - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(mark) = config.fwmark { - let ret = unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_MARK, - &mark as *const _ as *const _, - mem::size_of_val(&mark) as libc::socklen_t, - ) - }; - if ret != 0 { - return Err(Error::last_os_error()); - } - } - - // Set SO_BINDTODEVICE for binding to a specific interface - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(ref iface) = config.bind_interface { - use nix::sys::socket::{setsockopt, sockopt::BindToDevice}; - if let Err(err) = setsockopt::(socket.as_raw_fd(), BindToDevice, iface) { - return match err.as_errno() { - Some(errno) => Err(errno.into()), - None => Err(Error::new(ErrorKind::Other, err)), - }; - } - } - - // Set IP_BOUND_IF for BSD-like - #[cfg(any(target_os = "macos", target_os = "ios"))] - if let Some(ref iface) = config.bind_interface { - const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h - const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h - - unsafe { - let mut ciface = [0u8; libc::IFNAMSIZ]; - if iface.len() >= ciface.len() { - return Err(ErrorKind::InvalidInput.into()); - } - - ptr::copy_nonoverlapping(iface.as_bytes().as_ptr(), ciface.as_mut_ptr(), iface.len()); - - let index = libc::if_nametoindex(ciface.as_ptr() as *const libc::c_char); - if index == 0 { - return Err(Error::last_os_error()); - } - - let ret = match *saddr { - SocketAddr::V4(..) => libc::setsockopt( - socket.as_raw_fd(), - libc::IPPROTO_IP, - IP_BOUND_IF, - &index as *const _ as *const _, - mem::size_of_val(&index) as libc::socklen_t, - ), - SocketAddr::V6(..) => libc::setsockopt( - socket.as_raw_fd(), - libc::IPPROTO_IPV6, - IPV6_BOUND_IF, - &index as *const _ as *const _, - mem::size_of_val(&index) as libc::socklen_t, - ), - }; - - if ret < 0 { - return Err(Error::last_os_error()); - } - } - } - - // Binds to IP address - if let Some(ip) = config.bind_local_addr { - match (ip, saddr.ip()) { - (IpAddr::V4(..), IpAddr::V4(..)) => { - socket.bind(SocketAddr::new(ip, 0))?; - } - (IpAddr::V6(..), IpAddr::V6(..)) => { - socket.bind(SocketAddr::new(ip, 0))?; - } - _ => {} - } - } - - // Set `SO_SNDBUF` - if let Some(buf_size) = config.tcp.send_buffer_size { - socket.set_send_buffer_size(buf_size)?; - } - - // Set `SO_RCVBUF` - if let Some(buf_size) = config.tcp.recv_buffer_size { - socket.set_recv_buffer_size(buf_size)?; - } - - // it's important that the socket is protected before connecting - let stream = socket.connect(*saddr).await?; - - if config.tcp.nodelay { - stream.set_nodelay(true)?; - } - - Ok(stream) -} - -/// Create a `UdpSocket` for connecting to `addr` -#[inline(always)] -#[allow(unused_variables)] -pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) -> io::Result { - let bind_addr = match (af, config.bind_local_addr) { - (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), - (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), - (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), - (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), - }; - - let socket = UdpSocket::bind(bind_addr).await?; - - // Any traffic except localhost should be protected - // This is a workaround for VPNService - #[cfg(target_os = "android")] - { - use std::time::Duration; - use tokio::time; - - if let Some(ref path) = config.vpn_protect_path { - // RPC calls to `VpnService.protect()` - // Timeout in 3 seconds like shadowsocks-libev - match time::timeout(Duration::from_secs(3), protect(path, socket.as_raw_fd())).await { - Ok(Ok(..)) => {} - Ok(Err(err)) => return Err(err), - Err(..) => return Err(Error::new(ErrorKind::TimedOut, "protect() timeout")), - } - } - } - - // Set SO_MARK for mark-based routing on Linux (since 2.6.25) - // NOTE: This will require CAP_NET_ADMIN capability (root in most cases) - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(mark) = config.fwmark { - let ret = unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_MARK, - &mark as *const _ as *const _, - mem::size_of_val(&mark) as libc::socklen_t, - ) - }; - if ret != 0 { - return Err(Error::last_os_error()); - } - } - - // Set SO_BINDTODEVICE for binding to a specific interface - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(ref iface) = config.bind_interface { - use nix::sys::socket::{setsockopt, sockopt::BindToDevice}; - if let Err(err) = setsockopt::(socket.as_raw_fd(), BindToDevice, iface) { - return match err.as_errno() { - Some(errno) => Err(errno.into()), - None => Err(Error::new(ErrorKind::Other, err)), - }; - } - } - - // Set IP_BOUND_IF for BSD-like - #[cfg(any(target_os = "macos", target_os = "ios"))] - if let Some(ref iface) = config.bind_interface { - const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h - const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h - - unsafe { - let mut ciface = [0u8; libc::IFNAMSIZ]; - if iface.len() >= ciface.len() { - return Err(ErrorKind::InvalidInput.into()); - } - - ptr::copy_nonoverlapping(iface.as_bytes().as_ptr(), ciface.as_mut_ptr(), iface.len()); - - let index = libc::if_nametoindex(ciface.as_ptr() as *const libc::c_char); - if index == 0 { - return Err(Error::last_os_error()); - } - - let ret = match bind_addr { - SocketAddr::V4(..) => libc::setsockopt( - socket.as_raw_fd(), - libc::IPPROTO_IP, - IP_BOUND_IF, - &index as *const _ as *const _, - mem::size_of_val(&index) as libc::socklen_t, - ), - SocketAddr::V6(..) => libc::setsockopt( - socket.as_raw_fd(), - libc::IPPROTO_IPV6, - IPV6_BOUND_IF, - &index as *const _ as *const _, - mem::size_of_val(&index) as libc::socklen_t, - ), - }; - - if ret < 0 { - return Err(Error::last_os_error()); - } - } - } - - Ok(socket) -} - -/// Create a `UdpSocket` binded to `addr` -#[inline(always)] -pub async fn create_inbound_udp_socket(addr: &SocketAddr) -> io::Result { - let set_dual_stack = if let SocketAddr::V6(ref v6) = *addr { - v6.ip().is_unspecified() - } else { - false - }; - - if !set_dual_stack { - UdpSocket::bind(addr).await - } else { - let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP))?; - - if let Err(err) = socket.set_only_v6(false) { - warn!("failed to set IPV6_V6ONLY: false for listener, error: {}", err); - - // This is not a fatal error, just warn and skip - } - - let saddr = SockAddr::from(*addr); - - match socket.bind(&saddr) { - Ok(..) => {} - Err(ref err) if err.kind() == ErrorKind::AddrInUse => { - // This is probably 0.0.0.0 with the same port has already been occupied - debug!( - "0.0.0.0:{} may have already been occupied, retry with IPV6_V6ONLY", - addr.port() - ); - - if let Err(err) = socket.set_only_v6(true) { - warn!("failed to set IPV6_V6ONLY: true for listener, error: {}", err); - - // This is not a fatal error, just warn and skip - } - socket.bind(&saddr)?; - } - Err(err) => return Err(err), - } - - // UdpSocket::from_std requires socket to be non-blocked - socket.set_nonblocking(true)?; - UdpSocket::from_std(socket.into()) - } -} diff --git a/crates/shadowsocks/src/relay/sys/windows/mod.rs b/crates/shadowsocks/src/relay/sys/windows/mod.rs deleted file mode 100644 index a49429b91094..000000000000 --- a/crates/shadowsocks/src/relay/sys/windows/mod.rs +++ /dev/null @@ -1,161 +0,0 @@ -use std::{ - io::{self, ErrorKind}, - mem, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, - os::windows::io::AsRawSocket, - ptr, -}; - -use log::{debug, warn}; -use socket2::{Domain, Protocol, SockAddr, Socket, Type}; -use tokio::net::{TcpSocket, TcpStream, UdpSocket}; -use winapi::{ - shared::minwindef::{BOOL, DWORD, FALSE, LPDWORD, LPVOID}, - um::{ - mswsock::SIO_UDP_CONNRESET, - winsock2::{WSAGetLastError, WSAIoctl, SOCKET, SOCKET_ERROR}, - }, -}; - -use crate::net::{AddrFamily, ConnectOpts}; - -fn disable_connection_reset(socket: &UdpSocket) -> io::Result<()> { - let handle = socket.as_raw_socket() as SOCKET; - - unsafe { - // Ignoring UdpSocket's WSAECONNRESET error - // https://github.com/shadowsocks/shadowsocks-rust/issues/179 - // https://stackoverflow.com/questions/30749423/is-winsock-error-10054-wsaeconnreset-normal-with-udp-to-from-localhost - // - // This is because `UdpSocket::recv_from` may return WSAECONNRESET - // if you called `UdpSocket::send_to` a destination that is not existed (may be closed). - // - // It is not an error. Could be ignored completely. - // We have to ignore it here because it will crash the server. - - let mut bytes_returned: DWORD = 0; - let mut enable: BOOL = FALSE; - - let ret = WSAIoctl( - handle, - SIO_UDP_CONNRESET, - &mut enable as *mut _ as LPVOID, - mem::size_of_val(&enable) as DWORD, - ptr::null_mut(), - 0, - &mut bytes_returned as *mut _ as LPDWORD, - ptr::null_mut(), - None, - ); - - if ret == SOCKET_ERROR { - use std::io::Error; - - // Error occurs - let err_code = WSAGetLastError(); - return Err(Error::from_raw_os_error(err_code)); - } - } - - Ok(()) -} - -/// Create a `UdpSocket` binded to `addr` -/// -/// It also disables `WSAECONNRESET` for UDP socket -pub async fn create_inbound_udp_socket(addr: &SocketAddr) -> io::Result { - let set_dual_stack = if let SocketAddr::V6(ref v6) = *addr { - v6.ip().is_unspecified() - } else { - false - }; - - let socket = if !set_dual_stack { - UdpSocket::bind(addr).await? - } else { - let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP))?; - - if let Err(err) = socket.set_only_v6(false) { - warn!("failed to set IPV6_V6ONLY: false for listener, error: {}", err); - - // This is not a fatal error, just warn and skip - } - - let saddr = SockAddr::from(*addr); - - match socket.bind(&saddr) { - Ok(..) => {} - Err(ref err) if err.kind() == ErrorKind::AddrInUse => { - // This is probably 0.0.0.0 with the same port has already been occupied - debug!( - "0.0.0.0:{} may have already been occupied, retry with IPV6_V6ONLY", - addr.port() - ); - - if let Err(err) = socket.set_only_v6(true) { - warn!("failed to set IPV6_V6ONLY: true for listener, error: {}", err); - - // This is not a fatal error, just warn and skip - } - socket.bind(&saddr)?; - } - Err(err) => return Err(err), - } - - // UdpSocket::from_std requires socket to be non-blocked - socket.set_nonblocking(true)?; - UdpSocket::from_std(socket.into())? - }; - - disable_connection_reset(&socket)?; - Ok(socket) -} - -/// create a new TCP stream -#[inline(always)] -pub async fn tcp_stream_connect(saddr: &SocketAddr, opts: &ConnectOpts) -> io::Result { - let stream = if let Some(ip) = opts.bind_local_addr { - let socket = match *saddr { - SocketAddr::V4(..) => TcpSocket::new_v4()?, - SocketAddr::V6(..) => TcpSocket::new_v6()?, - }; - - // Binds to IP address - match (ip, saddr.ip()) { - (IpAddr::V4(..), IpAddr::V4(..)) => { - socket.bind(SocketAddr::new(ip, 0))?; - } - (IpAddr::V6(..), IpAddr::V6(..)) => { - socket.bind(SocketAddr::new(ip, 0))?; - } - _ => {} - } - - // it's important that the socket is binded before connecting - socket.connect(*saddr).await? - } else { - TcpStream::connect(saddr).await? - }; - - if opts.tcp.nodelay { - stream.set_nodelay(true)?; - } - - Ok(stream) -} - -/// Create a `UdpSocket` for connecting to `addr` -#[inline(always)] -pub async fn create_outbound_udp_socket(af: AddrFamily, opts: &ConnectOpts) -> io::Result { - let bind_addr = match (af, opts.bind_local_addr) { - (AddrFamily::Ipv4, Some(IpAddr::V4(ip))) => SocketAddr::new(ip.into(), 0), - (AddrFamily::Ipv6, Some(IpAddr::V6(ip))) => SocketAddr::new(ip.into(), 0), - (AddrFamily::Ipv4, ..) => SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0), - (AddrFamily::Ipv6, ..) => SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), - }; - - let socket = UdpSocket::bind(bind_addr).await?; - disable_connection_reset(&socket)?; - - Ok(socket) -} diff --git a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs index 2ebdd00cd0d8..8b55e775c550 100644 --- a/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs +++ b/crates/shadowsocks/src/relay/tcprelay/proxy_stream/client.rs @@ -13,7 +13,6 @@ use once_cell::sync::Lazy; use pin_project::pin_project; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, - net::TcpStream, time, }; @@ -44,13 +43,13 @@ pub struct ProxyClientStream { static DEFAULT_CONNECT_OPTS: Lazy = Lazy::new(Default::default); -impl ProxyClientStream { +impl ProxyClientStream { /// Connect to target `addr` via shadowsocks' server configured by `svr_cfg` pub async fn connect( context: SharedContext, svr_cfg: &ServerConfig, addr: A, - ) -> io::Result> + ) -> io::Result> where A: Into
, { @@ -63,7 +62,7 @@ impl ProxyClientStream { svr_cfg: &ServerConfig, addr: A, opts: &ConnectOpts, - ) -> io::Result> + ) -> io::Result> where A: Into
, { @@ -84,7 +83,7 @@ where ) -> io::Result> where A: Into
, - F: FnOnce(TcpStream) -> S, + F: FnOnce(OutboundTcpStream) -> S, { ProxyClientStream::connect_with_opts_map(context, svr_cfg, addr, &DEFAULT_CONNECT_OPTS, map_fn).await } @@ -99,7 +98,7 @@ where ) -> io::Result> where A: Into
, - F: FnOnce(TcpStream) -> S, + F: FnOnce(OutboundTcpStream) -> S, { let stream = match svr_cfg.timeout() { Some(d) => { @@ -129,12 +128,7 @@ where opts ); - Ok(ProxyClientStream::from_stream( - context, - map_fn(stream.into()), - svr_cfg, - addr, - )) + Ok(ProxyClientStream::from_stream(context, map_fn(stream), svr_cfg, addr)) } /// Create a `ProxyClientStream` with a connected `stream` to a shadowsocks' server diff --git a/crates/shadowsocks/tests/tcp_tfo.rs b/crates/shadowsocks/tests/tcp_tfo.rs new file mode 100644 index 000000000000..058a01cd51ea --- /dev/null +++ b/crates/shadowsocks/tests/tcp_tfo.rs @@ -0,0 +1,104 @@ +#![cfg(any( + windows, + target_os = "linux", + target_os = "android", + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd" +))] + +use byte_string::ByteStr; +use futures::future; +use log::debug; +use shadowsocks::{ + config::ServerType, + context::Context, + crypto::v1::CipherKind, + net::{AcceptOpts, ConnectOpts}, + relay::{ + socks5::Address, + tcprelay::utils::{copy_from_encrypted, copy_to_encrypted}, + }, + ProxyClientStream, + ProxyListener, + ServerConfig, +}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::TcpStream, +}; + +#[tokio::test] +async fn tcp_tunnel_tfo() { + let _ = env_logger::try_init(); + + let svr_cfg = ServerConfig::new(("127.0.0.1", 41000), "?", CipherKind::NONE); + let svr_cfg_client = svr_cfg.clone(); + + tokio::spawn(async move { + let context = Context::new_shared(ServerType::Server); + + let mut accept_opts = AcceptOpts::default(); + accept_opts.tcp.fastopen = true; + + let listener = ProxyListener::bind_with_opts(context, &svr_cfg, accept_opts) + .await + .unwrap(); + + while let Ok((mut stream, peer_addr)) = listener.accept().await { + debug!("accepted {}", peer_addr); + + tokio::spawn(async move { + let addr = Address::read_from(&mut stream).await.unwrap(); + let remote = match addr { + Address::SocketAddress(a) => TcpStream::connect(a).await.unwrap(), + Address::DomainNameAddress(name, port) => TcpStream::connect((name.as_str(), port)).await.unwrap(), + }; + + let (mut lr, mut lw) = stream.into_split(); + let (mut rr, mut rw) = remote.into_split(); + + let l2r = copy_from_encrypted(CipherKind::NONE, &mut lr, &mut rw); + let r2l = copy_to_encrypted(CipherKind::NONE, &mut rr, &mut lw); + + tokio::pin!(l2r); + tokio::pin!(r2l); + + let _ = future::select(l2r, r2l).await; + }); + } + }); + + tokio::task::yield_now().await; + + let context = Context::new_shared(ServerType::Local); + + let mut connect_opts = ConnectOpts::default(); + connect_opts.tcp.fastopen = true; + + let mut client = ProxyClientStream::connect_with_opts( + context, + &svr_cfg_client, + ("www.example.com".to_owned(), 80), + &connect_opts, + ) + .await + .unwrap(); + + client + .write_all(b"GET / HTTP/1.0\r\nHost: www.example.com\r\nAccept: */*\r\nConnection: close\r\n\r\n") + .await + .unwrap(); + + let mut reader = BufReader::new(client); + + let mut buffer = Vec::new(); + reader.read_until(b'\n', &mut buffer).await.unwrap(); + + println!("{:?}", ByteStr::new(&buffer)); + + static HTTP_RESPONSE_STATUS: &[u8] = b"HTTP/1.0 200 OK\r\n"; + assert!(buffer.starts_with(HTTP_RESPONSE_STATUS)); +}