diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index fc2d02a5..f7092f37 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use aquatic_udp_protocol::Response; use derive_more::Constructor; use log::{debug, error, info, trace}; -use ringbuf::traits::{Consumer, Observer, RingBuffer}; +use ringbuf::traits::{Consumer, Observer, Producer}; use ringbuf::StaticRb; use tokio::net::UdpSocket; use tokio::sync::oneshot; @@ -202,11 +202,23 @@ impl Launcher { } } -#[derive(Default)] struct ActiveRequests { rb: StaticRb, // the number of requests we handle at the same time. } +impl ActiveRequests { + /// Creates a new [`ActiveRequests`] filled with finished tasks. + async fn new() -> Self { + let mut rb = StaticRb::default(); + + let () = while rb.try_push(tokio::task::spawn_blocking(|| ()).abort_handle()).is_ok() {}; + + task::yield_now().await; + + Self { rb } + } +} + impl std::fmt::Debug for ActiveRequests { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let (left, right) = &self.rb.as_slices(); @@ -280,15 +292,22 @@ impl Udp { let tracker = tracker.clone(); let socket = socket.clone(); - let reqs = &mut ActiveRequests::default(); + let reqs = &mut ActiveRequests::new().await; - // Main Waiting Loop, awaits on async [`receive_request`]. loop { - if let Some(h) = reqs.rb.push_overwrite( - Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()) - .abort_handle(), - ) { - if !h.is_finished() { + task::yield_now().await; + for h in reqs.rb.iter_mut() { + if h.is_finished() { + std::mem::swap( + h, + &mut Self::spawn_request_processor( + Self::receive_request(socket.clone()).await, + tracker.clone(), + socket.clone(), + ) + .abort_handle(), + ); + } else { // the task is still running, lets yield and give it a chance to flush. tokio::task::yield_now().await; @@ -299,6 +318,9 @@ impl Udp { tracing::span!( target: "UDP TRACKER", tracing::Level::WARN, "request-aborted", server_socket_addr = %server_socket_addr); + + // force-break a single thread, then loop again. + break; } } } @@ -396,13 +418,46 @@ mod tests { use std::sync::Arc; use std::time::Duration; - use tokio::time::sleep; + use ringbuf::traits::{Consumer, Observer, RingBuffer}; use torrust_tracker_test_helpers::configuration::ephemeral_mode_public; + use super::ActiveRequests; use crate::bootstrap::app::initialize_with_configuration; use crate::servers::registar::Registar; use crate::servers::udp::server::{Launcher, UdpServer}; + #[tokio::test] + async fn it_should_return_to_the_start_of_the_ring_buffer() { + let mut a_req = ActiveRequests::new().await; + + tokio::time::sleep(Duration::from_millis(10)).await; + + let mut count: usize = 0; + let cap: usize = a_req.rb.capacity().into(); + + // Add a single pending task to check that the ring-buffer is looping correctly. + a_req + .rb + .push_overwrite(tokio::task::spawn(std::future::pending::<()>()).abort_handle()); + + count += 1; + + for _ in 0..2 { + for h in a_req.rb.iter() { + let first = count % cap; + println!("{count},{first},{}", h.is_finished()); + + if first == 0 { + assert!(!h.is_finished()); + } else { + assert!(h.is_finished()); + } + + count += 1; + } + } + } + #[tokio::test] async fn it_should_be_able_to_start_and_stop() { let cfg = Arc::new(ephemeral_mode_public()); @@ -423,7 +478,7 @@ mod tests { .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); - sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(stopped.state.launcher.bind_to, bind_to); }