Skip to content

Commit

Permalink
queries UDP & TCP simutaneously
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Dec 23, 2020
1 parent 1af4206 commit 3a1054a
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 125 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions configs/iptables_tproxy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ fi

# Strategy Route
ip -4 route add local 0/0 dev lo table 100
ip -4 rule add fwmark 0x2333/0x2333 table 100
ip -4 rule add fwmark 0x2333 table 100
#ip -6 route add local ::/0 dev lo table 100
#ip -6 rule add fwmark 0x2333/0x2333 table 100
#ip -6 rule add fwmark 0x2333 table 100

iptables -t mangle -N SS
ip6tables -t mangle -N SS
Expand All @@ -27,10 +27,10 @@ iptables -t mangle -A SS -d 240/4 -j RETURN
#ip6tables -t mangle -A SS -d fe80::/10 -j RETURN

# TPROXY TCP/UDP mark 0x2333 to port 60080
iptables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333
iptables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333
#ip6tables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333
#ip6tables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333/0x2333
iptables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333
iptables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333
#ip6tables -t mangle -A SS -p udp -j TPROXY --on-port 60080 --tproxy-mark 0x2333
#ip6tables -t mangle -A SS -p tcp -j TPROXY --on-port 60080 --tproxy-mark 0x2333

# Apply
iptables -t mangle -A PREROUTING -j SS
Expand All @@ -57,10 +57,10 @@ iptables -t mangle -A SS-MASK -j RETURN -m mark --mark 0xff
#ip6tables -t mangle -A SS-MASK -j RETURN -m mark --mark 0xff

# Reroute
iptables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333/0x2333
iptables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333/0x2333
#ip6tables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333/0x2333
#ip6tables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333/0x2333
iptables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333
iptables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333
#ip6tables -t mangle -A SS-MASK -p udp -j MARK --set-mark 0x2333
#ip6tables -t mangle -A SS-MASK -p tcp -j MARK --set-mark 0x2333

# Apply
iptables -t mangle -A OUTPUT -j SS-MASK
Expand Down
47 changes: 21 additions & 26 deletions crates/shadowsocks-service/src/local/dns/client_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use std::{
use log::trace;
use shadowsocks::{config::ServerConfig, net::ConnectOpts, relay::socks5::Address};
use tokio::sync::Mutex;
use trust_dns_proto::op::Message;
use trust_dns_proto::{error::ProtoError, op::Message};

use crate::local::context::ServiceContext;

use super::upstream::{DnsClient, LookupError, ResolveError};
use super::upstream::DnsClient;

#[derive(Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
enum DnsClientKey {
Expand Down Expand Up @@ -49,9 +49,9 @@ impl DnsClientCache {
pub async fn lookup_tcp_local(
&self,
ns: SocketAddr,
mut msg: Message,
msg: Message,
connect_opts: &ConnectOpts,
) -> Result<Message, ResolveError> {
) -> Result<Message, ProtoError> {
let mut last_err = None;

for _ in 0..self.retry_count {
Expand All @@ -67,10 +67,9 @@ impl DnsClientCache {
}
};

let res = match client.lookup_timeout(msg, self.timeout).await {
let res = match client.lookup_timeout(msg.clone(), self.timeout).await {
Ok(msg) => msg,
Err(LookupError { error, msg: omsg }) => {
msg = omsg;
Err(error) => {
last_err = Some(error);
continue;
}
Expand All @@ -87,9 +86,9 @@ impl DnsClientCache {
pub async fn lookup_udp_local(
&self,
ns: SocketAddr,
mut msg: Message,
msg: Message,
connect_opts: &ConnectOpts,
) -> Result<Message, ResolveError> {
) -> Result<Message, ProtoError> {
let mut last_err = None;

for _ in 0..self.retry_count {
Expand All @@ -105,10 +104,9 @@ impl DnsClientCache {
}
};

let res = match client.lookup_timeout(msg, self.timeout).await {
let res = match client.lookup_timeout(msg.clone(), self.timeout).await {
Ok(msg) => msg,
Err(LookupError { error, msg: omsg }) => {
msg = omsg;
Err(error) => {
last_err = Some(error);
continue;
}
Expand All @@ -123,7 +121,7 @@ impl DnsClientCache {
}

#[cfg(unix)]
pub async fn lookup_unix_stream<P: AsRef<Path>>(&self, ns: &P, mut msg: Message) -> Result<Message, ResolveError> {
pub async fn lookup_unix_stream<P: AsRef<Path>>(&self, ns: &P, msg: Message) -> Result<Message, ProtoError> {
let mut last_err = None;

let key = DnsClientKey::UnixStream(ns.as_ref().to_path_buf());
Expand All @@ -139,10 +137,9 @@ impl DnsClientCache {
}
};

let res = match client.lookup_timeout(msg, self.timeout).await {
let res = match client.lookup_timeout(msg.clone(), self.timeout).await {
Ok(msg) => msg,
Err(LookupError { error, msg: omsg }) => {
msg = omsg;
Err(error) => {
last_err = Some(error);
continue;
}
Expand All @@ -161,8 +158,8 @@ impl DnsClientCache {
context: &ServiceContext,
svr_cfg: &ServerConfig,
ns: &Address,
mut msg: Message,
) -> Result<Message, ResolveError> {
msg: Message,
) -> Result<Message, ProtoError> {
let mut last_err = None;

let key = DnsClientKey::UdpRemote(ns.clone());
Expand All @@ -187,10 +184,9 @@ impl DnsClientCache {
}
};

let res = match client.lookup_timeout(msg, self.timeout).await {
let res = match client.lookup_timeout(msg.clone(), self.timeout).await {
Ok(msg) => msg,
Err(LookupError { error, msg: omsg }) => {
msg = omsg;
Err(error) => {
last_err = Some(error);
continue;
}
Expand All @@ -209,8 +205,8 @@ impl DnsClientCache {
context: &ServiceContext,
svr_cfg: &ServerConfig,
ns: &Address,
mut msg: Message,
) -> Result<Message, ResolveError> {
msg: Message,
) -> Result<Message, ProtoError> {
let mut last_err = None;

let key = DnsClientKey::TcpRemote(ns.clone());
Expand All @@ -235,10 +231,9 @@ impl DnsClientCache {
}
};

let res = match client.lookup_timeout(msg, self.timeout).await {
let res = match client.lookup_timeout(msg.clone(), self.timeout).await {
Ok(msg) => msg,
Err(LookupError { error, msg: omsg }) => {
msg = omsg;
Err(error) => {
last_err = Some(error);
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/shadowsocks-service/src/local/dns/dns_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl DnsResolver {
self.connect_opts = connect_opts;
}

pub async fn lookup(&self, msg: Message) -> io::Result<Message> {
async fn lookup(&self, msg: Message) -> io::Result<Message> {
let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty");

for _ in 0..self.attempts {
Expand Down
130 changes: 99 additions & 31 deletions crates/shadowsocks-service/src/local/dns/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{

use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use futures::future;
use futures::future::{self, Either};
use log::{debug, error, info, trace, warn};
use rand::{thread_rng, Rng};
use shadowsocks::{
Expand Down Expand Up @@ -555,62 +555,130 @@ impl DnsClient {

for _ in 0..self.attempts {
match self.lookup_remote_inner(query, remote_addr).await {
Ok(m) => return Ok(m),
Ok(m) => {
return Ok(m);
}
Err(err) => last_err = err,
}
}

Err(last_err)
}

// async fn lookup_remote_inner(&self, query: &Query, remote_addr: &Address) -> io::Result<Message> {
// let mut message = Message::new();
// message.set_id(thread_rng().gen());
// message.set_recursion_desired(true);
// message.add_query(query.clone());

// // Query UDP then TCP
// let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty");

// if self.mode.enable_udp() {
// let server = self.balancer.best_udp_server();

// match self
// .client_cache
// .lookup_udp_remote(&self.context, server.server_config(), remote_addr, message.clone())
// .await
// {
// Ok(msg) => return Ok(msg),
// Err(err) => {
// last_err = err.into();
// }
// }
// }

// if self.mode.enable_tcp() {
// let server = self.balancer.best_tcp_server();

// match self
// .client_cache
// .lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message)
// .await
// {
// Ok(msg) => return Ok(msg),
// Err(err) => {
// last_err = err.into();
// }
// }
// }

// Err(last_err)
// }

async fn lookup_remote_inner(&self, query: &Query, remote_addr: &Address) -> io::Result<Message> {
let mut message = Message::new();
message.set_id(thread_rng().gen());
message.set_recursion_desired(true);
message.add_query(query.clone());

// Query UDP then TCP
let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty");

if self.mode.enable_udp() {
let server = self.balancer.best_udp_server();
// Query UDP and TCP

match self
.client_cache
.lookup_udp_remote(&self.context, server.server_config(), remote_addr, message.clone())
.await
{
Ok(msg) => return Ok(msg),
Err(err) => {
last_err = err.into();
}
match self.mode {
Mode::TcpOnly => {
let server = self.balancer.best_tcp_server();
self.client_cache
.lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message)
.await
.map_err(From::from)
}
}

if self.mode.enable_tcp() {
let server = self.balancer.best_tcp_server();
Mode::UdpOnly => {
let server = self.balancer.best_udp_server();
self.client_cache
.lookup_udp_remote(&self.context, server.server_config(), remote_addr, message)
.await
.map_err(From::from)
}
Mode::TcpAndUdp => {
// Query TCP & UDP simutaneously

let message2 = message.clone();
let tcp_fut = async {
// For most cases UDP query will return in 1s,
// Then this future will be disabled and have no effect
//
// Randomly choose from 500ms ~ 1.5s for preventing obvious request pattern
let sleep_time = thread_rng().gen_range(500, 1500);
time::sleep(Duration::from_millis(sleep_time)).await;

let server = self.balancer.best_tcp_server();
self.client_cache
.lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message2)
.await
};
let udp_fut = async {
let server = self.balancer.best_udp_server();
self.client_cache
.lookup_udp_remote(&self.context, server.server_config(), remote_addr, message)
.await
};

match self
.client_cache
.lookup_tcp_remote(&self.context, server.server_config(), remote_addr, message)
.await
{
Ok(msg) => return Ok(msg),
Err(err) => {
last_err = err.into();
tokio::pin!(tcp_fut);
tokio::pin!(udp_fut);

match future::select(tcp_fut, udp_fut).await {
Either::Left((res, next)) => match res {
Ok(o) => Ok(o),
Err(..) => next.await.map_err(From::from),
},
Either::Right((res, next)) => match res {
Ok(o) => Ok(o),
Err(..) => next.await.map_err(From::from),
},
}
}
}

Err(last_err)
}

async fn lookup_local(&self, query: &Query, local_addr: &NameServerAddr) -> io::Result<Message> {
let mut last_err = io::Error::new(ErrorKind::InvalidData, "resolve empty");

for _ in 0..self.attempts {
match self.lookup_local_inner(query, local_addr).await {
Ok(m) => return Ok(m),
Ok(m) => {
return Ok(m);
}
Err(err) => last_err = err,
}
}
Expand Down
Loading

0 comments on commit 3a1054a

Please sign in to comment.