diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 95c8145c1..98c4bf726 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -255,24 +255,7 @@ impl Udp { let running = tokio::task::spawn(async move { debug!(target: "UDP TRACKER", "Started: Waiting for packets on socket address: udp://{address} ..."); - - let tracker = tracker.clone(); - let socket = socket.clone(); - - let reqs = &mut ActiveRequests::default(); - - // Main Waiting Loop, awaits on async [`receive_request`]. - loop { - if let Some(h) = reqs.rb.push_overwrite( - Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(), - ) { - if !h.is_finished() { - // the task is still running, lets yield and give it a chance to flush. - tokio::task::yield_now().await; - h.abort(); - } - } - } + Self::run_udp_server(tracker, socket).await; }); tx_start @@ -292,6 +275,27 @@ impl Udp { task::yield_now().await; // lets allow the other threads to complete. } + async fn run_udp_server(tracker: Arc, socket: Arc) { + let tracker = tracker.clone(); + let socket = socket.clone(); + + let reqs = &mut ActiveRequests::default(); + + // Main Waiting Loop, awaits on async [`receive_request`]. + loop { + if let Some(h) = reqs.rb.push_overwrite( + Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()) + .abort_handle(), + ) { + if !h.is_finished() { + // the task is still running, lets yield and give it a chance to flush. + tokio::task::yield_now().await; + h.abort(); + } + } + } + } + async fn receive_request(socket: Arc) -> Result> { // Wait for the socket to be readable socket.readable().await?; @@ -309,26 +313,27 @@ impl Udp { } } - fn do_request( + fn spawn_request_processor( result: Result>, tracker: Arc, socket: Arc, ) -> JoinHandle<()> { - // timeout not needed, as udp is non-blocking. - tokio::task::spawn(async move { - match result { - Ok(udp_request) => { - trace!("Received Request from: {}", udp_request.from); - Self::make_response(tracker.clone(), socket.clone(), udp_request).await; - } - Err(error) => { - debug!("error: {error}"); - } + tokio::task::spawn(Self::process_request(result, tracker, socket)) + } + + async fn process_request(result: Result>, tracker: Arc, socket: Arc) { + match result { + Ok(udp_request) => { + trace!("Received Request from: {}", udp_request.from); + Self::process_valid_request(tracker.clone(), socket.clone(), udp_request).await; } - }) + Err(error) => { + debug!("error: {error}"); + } + } } - async fn make_response(tracker: Arc, socket: Arc, udp_request: UdpRequest) { + async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { trace!("Making Response to {udp_request:?}"); let from = udp_request.from; let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.clone()).await;