From 4b9d15eb94cb9aaa6891b41cfb07e5c4122ef1d4 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Tue, 19 Dec 2023 15:21:39 -0800 Subject: [PATCH] udp: Test concurrent recvs on cloned sockets --- tokio/tests/udp.rs | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index eea281c2316..5a9b71746f7 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -643,3 +643,46 @@ async fn poll_ready() { } } } + +#[tokio::test] +async fn concurrent_recv() { + use std::sync::atomic::{AtomicBool, Ordering}; + + // Create two copies of the same server socket + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let server2 = server.try_clone().unwrap(); + let saddr = server.local_addr().unwrap(); + + // Create client + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + const MSG1: [u8; 3] = [1, 2, 3]; + const MSG2: [u8; 3] = [4, 5, 6]; + + let got_msg1 = Arc::new(AtomicBool::new(false)); + let got_msg2 = Arc::new(AtomicBool::new(false)); + + async fn recv(socket: UdpSocket, m1: Arc, m2: Arc) { + let mut buf = [0; 3]; + socket.recv(&mut buf).await.unwrap(); + match buf { + MSG1 => &m1, + MSG2 => &m2, + _ => return, + } + .store(true, Ordering::Relaxed); + } + + // Try to receive a message on each clone of the socket + let recv1 = tokio::spawn(recv(server, got_msg1.clone(), got_msg2.clone())); + let recv2 = tokio::spawn(recv(server2, got_msg1.clone(), got_msg2.clone())); + client.send_to(&MSG1, saddr).await.unwrap(); + client.send_to(&MSG2, saddr).await.unwrap(); + + // Both receivers should wake and read a datagram + recv1.await.unwrap(); + recv2.await.unwrap(); + + // Both messages should have been received + assert!(got_msg1.load(Ordering::Relaxed) && got_msg1.load(Ordering::Relaxed)); +}