Skip to content

Commit

Permalink
coredns: work on tcp requests concurrently
Browse files Browse the repository at this point in the history
Right now, for a single network, all requests were processed serially, and
with TCP a caller is able to block us for a long time if it just opens
the connection but sends very little or no data. To avoid this, always
spawn a new task when we accept a new TCP connection.

We could do the same for UDP; however, my testing with contrib/perf/run.sh
has shown that it slows things down, as the overhead of spawning a task
is greater than the few quick, simple map lookups, so we only spawn where
needed. We still have to spawn when forwarding external requests, as this
can take a long time.

Fixes containers#500

Signed-off-by: Paul Holzinger <[email protected]>
  • Loading branch information
Luap99 committed Sep 4, 2024
1 parent aa109bb commit 4a27dcf
Showing 1 changed file with 86 additions and 43 deletions.
129 changes: 86 additions & 43 deletions src/dns/coredns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ use tokio::net::UdpSocket;
const CONTAINER_TTL: u32 = 60;

pub struct CoreDns {
rx: flume::Receiver<()>, // kill switch receiver
inner: CoreDnsData,
}

#[derive(Clone)]
struct CoreDnsData {
network_name: String, // raw network name
backend: &'static ArcSwap<DNSBackend>, // server's data store
rx: flume::Receiver<()>, // kill switch receiver
no_proxy: bool, // do not forward to external resolvers
nameservers: Arc<Mutex<Vec<ScopedIp>>>, // host nameservers from resolv.conf
}
Expand All @@ -57,11 +62,13 @@ impl CoreDns {
nameservers: Arc<Mutex<Vec<ScopedIp>>>,
) -> Self {
CoreDns {
network_name,
backend,
rx,
no_proxy,
nameservers,
inner: CoreDnsData {
network_name,
backend,
no_proxy,
nameservers,
},
}
}

Expand All @@ -87,12 +94,12 @@ impl CoreDns {
continue;
}
};
self.process_message(msg_received, &sender_original, Protocol::Udp);
Self::process_message(&self.inner, msg_received, &sender_original, Protocol::Udp).await;
},
res = tcp_listener.accept() => {
match res {
Ok((sock,addr)) => {
self.process_tcp_stream(sock, addr).await
tokio::spawn(Self::process_tcp_stream(self.inner.clone(), sock, addr));
}
Err(e) => {
error!("Failed to accept new tcp connection: {e}");
Expand All @@ -105,7 +112,11 @@ impl CoreDns {
Ok(())
}

async fn process_tcp_stream(&self, stream: tokio::net::TcpStream, peer: SocketAddr) {
async fn process_tcp_stream(
data: CoreDnsData,
stream: tokio::net::TcpStream,
peer: SocketAddr,
) {
let (mut hickory_stream, sender_original) =
TcpStream::from_stream(AsyncIoTokioAsStd(stream), peer);

Expand All @@ -114,7 +125,7 @@ impl CoreDns {
match tokio::time::timeout(Duration::from_secs(3), hickory_stream.next()).await {
Ok(message) => {
if let Some(msg) = message {
self.process_message(msg, &sender_original, Protocol::Tcp);
Self::process_message(&data, msg, &sender_original, Protocol::Tcp).await;
// The API is a bit strange, first time we call next we get the message,
// but we must call again to send our reply back
hickory_stream.next().await;
Expand All @@ -127,8 +138,8 @@ impl CoreDns {
}
}

fn process_message(
&self,
async fn process_message(
data: &CoreDnsData,
msg_received: Result<SerialMessage, Error>,
sender_original: &BufDnsStreamHandle,
proto: Protocol,
Expand All @@ -140,7 +151,7 @@ impl CoreDns {
return;
}
};
let backend = self.backend.load();
let backend = data.backend.load();
let src_address = msg.addr();
let mut sender = sender_original.with_remote_addr(src_address);
let (request_name, record_type, mut req) = match parse_dns_msg(msg) {
Expand All @@ -153,7 +164,7 @@ impl CoreDns {
let request_name_string = request_name.to_string();

// Create debug and trace info for key parameters.
trace!("server network name: {:?}", self.network_name);
trace!("server network name: {:?}", data.network_name);
debug!("request source address: {:?}", src_address);
trace!("requested record type: {:?}", record_type);
debug!(
Expand All @@ -175,7 +186,7 @@ impl CoreDns {
if let Some(msg) = reply_ip(
&request_name_string,
&request_name,
&self.network_name,
&data.network_name,
record_type,
&backend,
src_address,
Expand All @@ -195,7 +206,7 @@ impl CoreDns {
};

// are we allowed to forward?
if self.no_proxy
if data.no_proxy
|| backend.ctr_is_internal(&src_address.ip())
|| request_name_string.ends_with(&backend.search_domain)
|| request_name_string.matches('.').count() == 1
Expand Down Expand Up @@ -224,41 +235,73 @@ impl CoreDns {
}
// Use host resolvers if no custom resolvers are set for the container.
if nameservers.is_empty() {
nameservers.clone_from(&self.nameservers.lock().expect("lock nameservers"));
nameservers.clone_from(&data.nameservers.lock().expect("lock nameservers"));
}

match proto {
Protocol::Udp => {
tokio::spawn(Self::forward_to_servers(
nameservers,
sender,
src_address,
req,
proto,
));
}
Protocol::Tcp => {
// we already spawned a new future when we read the message so there is no need to spawn another one
Self::forward_to_servers(nameservers, sender, src_address, req, proto).await;
}
}
}
}

tokio::spawn(async move {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = AsyncClient::connect(stream).await?;
let handle = tokio::spawn(bg);
(cl, handle)
async fn forward_to_servers(
nameservers: Vec<ScopedIp>,
mut sender: BufDnsStreamHandle,
src_address: SocketAddr,
req: Message,
proto: Protocol,
) {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = match AsyncClient::connect(stream).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
Protocol::Tcp => {
let (stream, sender) = TcpClientStream::<
AsyncIoTokioAsStd<tokio::net::TcpStream>,
>::new(addr);
let (cl, bg) = AsyncClient::new(stream, sender, None).await?;
let handle = tokio::spawn(bg);
(cl, handle)
};
let handle = tokio::spawn(bg);
(cl, handle)
}
Protocol::Tcp => {
let (stream, sender) =
TcpClientStream::<AsyncIoTokioAsStd<tokio::net::TcpStream>>::new(addr);
let (cl, bg) = match AsyncClient::new(stream, sender, None).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
};
let handle = tokio::spawn(bg);
(cl, handle)
}
};

if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
}
drop(handle);
if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
Ok::<(), std::io::Error>(())
});
}
handle.abort();
}
}
}
Expand Down

0 comments on commit 4a27dcf

Please sign in to comment.