diff --git a/Cargo.lock b/Cargo.lock index fe16d68433e5..e997e07bd6ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -573,7 +573,7 @@ checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a" [[package]] name = "hyper" version = "0.14.0-dev" -source = "git+https://github.com/hyperium/hyper.git#1dd761c87de226261599ff2518fe9d231ba1c82d" +source = "git+https://github.com/hyperium/hyper.git#3b3077da1f891b09de18320d9f6ccf94f136943d" dependencies = [ "bytes 0.6.0", "futures-channel", @@ -990,9 +990,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c6d9b8427445284a09c55be860a15855ab580a417ccad9da88f5a06787ced0" +checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" dependencies = [ "cfg-if 1.0.0", "instant", @@ -1526,9 +1526,9 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.2.2" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" dependencies = [ "libc", ] diff --git a/configs/iptables_tproxy.sh b/configs/iptables_tproxy.sh index 7b753a3cf880..2e1889c2b2d2 100644 --- a/configs/iptables_tproxy.sh +++ b/configs/iptables_tproxy.sh @@ -7,9 +7,9 @@ fi # Strategy Route ip -4 route add local 0/0 dev lo table 100 -ip -4 rule add fwmark 0x2333/0x2333 table 100 +ip -4 rule add fwmark 0x2333 table 100 #ip -6 route add local ::/0 dev lo table 100 -#ip -6 rule add fwmark 0x2333/0x2333 table 100 +#ip -6 rule add fwmark 0x2333 table 100 iptables -t mangle -N SS ip6tables -t mangle -N SS @@ -27,10 +27,10 @@ iptables -t mangle -A SS -d 240/4 -j RETURN #ip6tables -t mangle -A SS -d fe80::/10 -j RETURN # TPROXY TCP/UDP mark 0x2333 to port 60080 -iptables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333 -iptables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333 -#ip6tables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333 -#ip6tables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333 +iptables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333 +iptables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333 +#ip6tables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333 +#ip6tables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333 # Apply iptables -t mangle -A PREROUTING -j SS @@ -57,10 +57,10 @@ iptables -t mangle -A SS-MASK -j RETURN -m mark --mark 0xff #ip6tables -t mangle -A SS-MASK -j RETURN -m mark --mark 0xff # Reroute -iptables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333/0x2333 -iptables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333/0x2333 -#ip6tables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333/0x2333 -#ip6tables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333/0x2333 +iptables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333 +iptables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333 +#ip6tables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333 +#ip6tables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333 # Apply iptables -t mangle -A OUTPUT -j SS-MASK diff --git a/crates/shadowsocks-service/src/local/dns/client_cache.rs b/crates/shadowsocks-service/src/local/dns/client_cache.rs index f71e2fbdd8d4..cf6e2fd0f2c9 100644 --- a/crates/shadowsocks-service/src/local/dns/client_cache.rs +++ b/crates/shadowsocks-service/src/local/dns/client_cache.rs @@ -13,11 +13,11 @@ use std::{ use log::trace; use shadowsocks::{config::ServerConfig, net::ConnectOpts, relay::socks5::Address}; use tokio::sync::Mutex; -use trust_dns_proto::op::Message; +use trust_dns_proto::{error::ProtoError, op::Message}; use crate::local::context::ServiceContext; -use super::upstream::{DnsClient, LookupError, ResolveError}; +use super::upstream::DnsClient; #[derive(Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] enum DnsClientKey { @@ -49,9 +49,9 @@ impl DnsClientCache { pub async fn lookup_tcp_local( &self, ns: SocketAddr, - mut msg: Message, + msg: Message, connect_opts: &ConnectOpts, - ) -> Result { + ) -> Result { let mut last_err = None; for _ in 0..self.retry_count { @@ -67,10 +67,9 @@ impl DnsClientCache { } }; - let res = match client.lookup_timeout(msg, self.timeout).await { + let res = match client.lookup_timeout(msg.clone(), self.timeout).await { Ok(msg) => msg, - Err(LookupError { error, msg: omsg }) => { - msg = omsg; + Err(error) => { last_err = Some(error); continue; } @@ -87,9 +86,9 @@ impl DnsClientCache { pub async fn lookup_udp_local( &self, ns: SocketAddr, - mut msg: Message, + msg: Message, connect_opts: &ConnectOpts, - ) -> Result { + ) -> Result { let mut last_err = None; for _ in 0..self.retry_count { @@ -105,10 +104,9 @@ impl DnsClientCache { } }; - let res = match client.lookup_timeout(msg, self.timeout).await { + let res = match client.lookup_timeout(msg.clone(), self.timeout).await { Ok(msg) => msg, - Err(LookupError { error, msg: omsg }) => { - msg = omsg; + Err(error) => { last_err = Some(error); continue; } @@ -123,7 +121,7 @@ impl DnsClientCache { } #[cfg(unix)] - pub async fn lookup_unix_stream>(&self, ns: &P, mut msg: Message) -> Result { + pub async fn lookup_unix_stream>(&self, ns: &P, msg: Message) -> Result { let mut last_err = None; let key = DnsClientKey::UnixStream(ns.as_ref().to_path_buf()); @@ -139,10 +137,9 @@ impl DnsClientCache { } }; - let res = match client.lookup_timeout(msg, self.timeout).await { + let res = match client.lookup_timeout(msg.clone(), self.timeout).await { Ok(msg) => msg, - Err(LookupError { error, msg: omsg }) => { - msg = omsg; + Err(error) => { last_err = Some(error); continue; } @@ -161,8 +158,8 @@ impl DnsClientCache { context: &ServiceContext, svr_cfg: &ServerConfig, ns: &Address, - mut msg: Message, - ) -> Result { + msg: Message, + ) -> Result { let mut last_err = None; let key = DnsClientKey::UdpRemote(ns.clone()); @@ -187,10 +184,9 @@ impl DnsClientCache { } }; - let res = match client.lookup_timeout(msg, self.timeout).await { + let res = match client.lookup_timeout(msg.clone(), self.timeout).await { Ok(msg) => msg, - Err(LookupError { error, msg: omsg }) => { - msg = omsg; + Err(error) => { last_err = Some(error); continue; } @@ -209,8 +205,8 @@ impl DnsClientCache { context: &ServiceContext, svr_cfg: &ServerConfig, ns: &Address, - mut msg: Message, - ) -> Result { + msg: Message, + ) -> Result { let mut last_err = None; let key = DnsClientKey::TcpRemote(ns.clone()); @@ -235,10 +231,9 @@ impl DnsClientCache { } }; - let res = match client.lookup_timeout(msg, self.timeout).await { + let res = match client.lookup_timeout(msg.clone(), self.timeout).await { Ok(msg) => msg, - Err(LookupError { error, msg: omsg }) => { - msg = omsg; + Err(error) => { last_err = Some(error); continue; } diff --git a/crates/shadowsocks-service/src/local/dns/dns_resolver.rs b/crates/shadowsocks-service/src/local/dns/dns_resolver.rs index 6fda5705c1ff..97f25386e4a3 100644 --- a/crates/shadowsocks-service/src/local/dns/dns_resolver.rs +++ b/crates/shadowsocks-service/src/local/dns/dns_resolver.rs @@ -51,7 +51,7 @@ impl DnsResolver { self.connect_opts = connect_opts; } - pub async fn lookup(&self, msg: Message) -> io::Result { + async fn lookup(&self, msg: Message) -> io::Result { let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty"); for _ in 0..self.attempts { diff --git a/crates/shadowsocks-service/src/local/dns/server.rs b/crates/shadowsocks-service/src/local/dns/server.rs index 8fa50db76c06..053e7f80bb17 100644 --- a/crates/shadowsocks-service/src/local/dns/server.rs +++ b/crates/shadowsocks-service/src/local/dns/server.rs @@ -12,7 +12,7 @@ use std::{ use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; -use futures::future; +use futures::future::{self, Either}; use log::{debug, error, info, trace, warn}; use rand::{thread_rng, Rng}; use shadowsocks::{ @@ -555,7 +555,9 @@ impl DnsClient { for _ in 0..self.attempts { match self.lookup_remote_inner(query, remote_addr).await { - Ok(m) => return Ok(m), + Ok(m) => { + return Ok(m); + } Err(err) => last_err = err, } } @@ -563,46 +565,110 @@ impl DnsClient { Err(last_err) } + // async fn lookup_remote_inner(&self, query: &Query, remote_addr: &Address) -> io::Result { + // let mut message = Message::new(); + // message.set_id(thread_rng().gen()); + // message.set_recursion_desired(true); + // message.add_query(query.clone()); + + // // Query UDP then TCP + // let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty"); + + // if self.mode.enable_udp() { + // let server = self.balancer.best_udp_server(); + + // match self + // .client_cache + // .lookup_udp_remote(&self.context, server.server_config(), remote_addr, message.clone()) + // .await + // { + // Ok(msg) => return Ok(msg), + // Err(err) => { + // last_err = err.into(); + // } + // } + // } + + // if self.mode.enable_tcp() { + // let server = self.balancer.best_tcp_server(); + + // match self + // .client_cache + // .lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message) + // .await + // { + // Ok(msg) => return Ok(msg), + // Err(err) => { + // last_err = err.into(); + // } + // } + // } + + // Err(last_err) + // } + async fn lookup_remote_inner(&self, query: &Query, remote_addr: &Address) -> io::Result { let mut message = Message::new(); message.set_id(thread_rng().gen()); message.set_recursion_desired(true); message.add_query(query.clone()); - // Query UDP then TCP - let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty"); - - if self.mode.enable_udp() { - let server = self.balancer.best_udp_server(); + // Query UDP and TCP - match self - .client_cache - .lookup_udp_remote(&self.context, server.server_config(), remote_addr, message.clone()) - .await - { - Ok(msg) => return Ok(msg), - Err(err) => { - last_err = err.into(); - } + match self.mode { + Mode::TcpOnly => { + let server = self.balancer.best_tcp_server(); + self.client_cache + .lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message) + .await + .map_err(From::from) } - } - - if self.mode.enable_tcp() { - let server = self.balancer.best_tcp_server(); + Mode::UdpOnly => { + let server = self.balancer.best_udp_server(); + self.client_cache + .lookup_udp_remote(&self.context, server.server_config(), remote_addr, message) + .await + .map_err(From::from) + } + Mode::TcpAndUdp => { + // Query TCP & UDP simutaneously + + let message2 = message.clone(); + let tcp_fut = async { + // For most cases UDP query will return in 1s, + // Then this future will be disabled and have no effect + // + // Randomly choose from 500ms ~ 1.5s for preventing obvious request pattern + let sleep_time = thread_rng().gen_range(500, 1500); + time::sleep(Duration::from_millis(sleep_time)).await; + + let server = self.balancer.best_tcp_server(); + self.client_cache + .lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message2) + .await + }; + let udp_fut = async { + let server = self.balancer.best_udp_server(); + self.client_cache + .lookup_udp_remote(&self.context, server.server_config(), remote_addr, message) + .await + }; - match self - .client_cache - .lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message) - .await - { - Ok(msg) => return Ok(msg), - Err(err) => { - last_err = err.into(); + tokio::pin!(tcp_fut); + tokio::pin!(udp_fut); + + match future::select(tcp_fut, udp_fut).await { + Either::Left((res, next)) => match res { + Ok(o) => Ok(o), + Err(..) => next.await.map_err(From::from), + }, + Either::Right((res, next)) => match res { + Ok(o) => Ok(o), + Err(..) => next.await.map_err(From::from), + }, } } } - - Err(last_err) } async fn lookup_local(&self, query: &Query, local_addr: &NameServerAddr) -> io::Result { @@ -610,7 +676,9 @@ impl DnsClient { for _ in 0..self.attempts { match self.lookup_local_inner(query, local_addr).await { - Ok(m) => return Ok(m), + Ok(m) => { + return Ok(m); + } Err(err) => last_err = err, } } diff --git a/crates/shadowsocks-service/src/local/dns/upstream.rs b/crates/shadowsocks-service/src/local/dns/upstream.rs index 413d23251414..6b6c88d6d0b8 100644 --- a/crates/shadowsocks-service/src/local/dns/upstream.rs +++ b/crates/shadowsocks-service/src/local/dns/upstream.rs @@ -2,12 +2,7 @@ #[cfg(unix)] use std::path::Path; -use std::{ - io::{self, ErrorKind}, - net::SocketAddr, - sync::Arc, - time::Duration, -}; +use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; @@ -19,7 +14,6 @@ use shadowsocks::{ net::{ConnectOpts, TcpStream as ShadowTcpStream, UdpSocket as ShadowUdpSocket}, relay::{tcprelay::ProxyClientStream, udprelay::ProxySocket, Address}, }; -use thiserror::Error; #[cfg(unix)] use tokio::net::UnixStream; use tokio::{ @@ -27,7 +21,10 @@ use tokio::{ net::{TcpStream, UdpSocket}, time, }; -use trust_dns_proto::{error::ProtoError, op::Message}; +use trust_dns_proto::{ + error::{ProtoError, ProtoErrorKind}, + op::Message, +}; use crate::net::{FlowStat, MonProxySocket, MonProxyStream}; @@ -102,29 +99,20 @@ impl DnsClient { /// Make a DNS lookup #[allow(dead_code)] - pub async fn lookup(&mut self, mut msg: Message) -> Result { - match self.inner_lookup(&mut msg).await { - Ok(msg) => Ok(msg), - Err(error) => Err(LookupError { error, msg }), - } + pub async fn lookup(&mut self, mut msg: Message) -> Result { + self.inner_lookup(&mut msg).await } /// Make a DNS lookup with timeout - pub async fn lookup_timeout(&mut self, mut msg: Message, timeout: Duration) -> Result { + pub async fn lookup_timeout(&mut self, mut msg: Message, timeout: Duration) -> Result { match time::timeout(timeout, self.inner_lookup(&mut msg)).await { Ok(Ok(msg)) => Ok(msg), - Ok(Err(error)) => Err(LookupError { error, msg }), - Err(..) => { - let error: io::Error = ErrorKind::TimedOut.into(); - Err(LookupError { - error: error.into(), - msg, - }) - } + Ok(Err(error)) => Err(error), + Err(..) => Err(ProtoErrorKind::Timeout.into()), } } - async fn inner_lookup(&mut self, msg: &mut Message) -> Result { + async fn inner_lookup(&mut self, msg: &mut Message) -> Result { // Make a random ID msg.set_id(thread_rng().gen()); @@ -139,7 +127,7 @@ impl DnsClient { let mut recv_buf = [0u8; 256]; let n = socket.recv(&mut recv_buf).await?; - Message::from_vec(&recv_buf[..n]).map_err(From::from) + Message::from_vec(&recv_buf[..n]) } #[cfg(unix)] DnsClient::UnixStream { ref mut stream } => stream_query(stream, msg).await, @@ -151,13 +139,13 @@ impl DnsClient { let mut recv_buf = [0u8; 256]; let (n, _) = socket.recv(&mut recv_buf).await?; - Message::from_vec(&recv_buf[..n]).map_err(From::from) + Message::from_vec(&recv_buf[..n]) } } } } -pub async fn stream_query(stream: &mut S, r: &Message) -> Result +pub async fn stream_query(stream: &mut S, r: &Message) -> Result where S: AsyncRead + AsyncWrite + Unpin, { @@ -182,30 +170,5 @@ where } stream.read_exact(&mut rsp_bytes).await?; - Message::from_vec(&rsp_bytes).map_err(From::from) -} - -/// DNS Resolve Error -#[derive(Error, Debug)] -pub enum ResolveError { - #[error("{0}")] - IoError(#[from] io::Error), - #[error("{0}")] - ProtoError(#[from] ProtoError), -} - -impl From for io::Error { - fn from(e: ResolveError) -> io::Error { - match e { - ResolveError::IoError(e) => e, - ResolveError::ProtoError(e) => From::from(e), - } - } -} - -/// `lookup` Error -#[derive(Debug)] -pub struct LookupError { - pub error: ResolveError, - pub msg: Message, + Message::from_vec(&rsp_bytes) }