From f886ea2e7d707e3f7530bce6da89eaea3e846b19 Mon Sep 17 00:00:00 2001 From: Caio Date: Tue, 11 Jul 2023 19:49:08 +0200 Subject: [PATCH] WIP Use a single task to handle timer events --- examples/foca_insecure_udp_agent.rs | 149 ++++++++++++++++++++++------ 1 file changed, 119 insertions(+), 30 deletions(-) diff --git a/examples/foca_insecure_udp_agent.rs b/examples/foca_insecure_udp_agent.rs index 3b73ea3..c918d3c 100644 --- a/examples/foca_insecure_udp_agent.rs +++ b/examples/foca_insecure_udp_agent.rs @@ -1,8 +1,15 @@ /* Any copyright is dedicated to the Public Domain. * https://creativecommons.org/publicdomain/zero/1.0/ */ use std::{ - collections::HashMap, fs::File, io::Write, net::SocketAddr, path::Path, str::FromStr, - sync::Arc, time::Duration, + cmp::Reverse, + collections::{BinaryHeap, HashMap}, + fs::File, + io::Write, + net::SocketAddr, + path::Path, + str::FromStr, + sync::Arc, + time::Duration, }; use tracing_subscriber::{ @@ -14,7 +21,12 @@ use tracing_subscriber::{ use bytes::{BufMut, Bytes, BytesMut}; use clap::{App, Arg}; use rand::{rngs::StdRng, SeedableRng}; -use tokio::{net::UdpSocket, sync::mpsc}; +use tokio::{ + net::UdpSocket, + sync::mpsc::{self, Receiver, UnboundedSender}, + task::JoinHandle, + time::{sleep, Instant}, +}; use foca::{Config, Foca, Identity, Notification, PostcardCodec, Runtime, Timer}; @@ -339,10 +351,14 @@ async fn main() -> Result<(), anyhow::Error> { } }); - // We'll also launch a task to manage Foca. Since there are timers - // involved, one simple way to do it is unifying the input: + // Handle timer events by a single task so that we don't risk + // out of order delivery under load + let (timer_tx, mut timer_rx) = mpsc::channel(10000); + let (scheduler, _handle) = launch_lame_timer(timer_tx).await; + + // We'll also launch a task to manage Foca. And receive network + // data enum Input { - Event(Timer), Data(Bytes), Announce(T), } @@ -353,29 +369,36 @@ async fn main() -> Result<(), anyhow::Error> { // instead. let mut runtime = AccumulatingRuntime::new(); let mut members = Members::new(); - let tx_foca_copy = tx_foca.clone(); tokio::spawn(async move { - while let Some(input) = rx_foca.recv().await { + loop { debug_assert_eq!(0, runtime.backlog()); - let result = match input { - Input::Event(timer) => foca.handle_timer(timer, &mut runtime), - Input::Data(data) => foca.handle_data(&data, &mut runtime), - Input::Announce(dst) => foca.announce(dst, &mut runtime), - }; - - // Every public foca result yields `()` on success, so there's - // nothing to do with Ok - if let Err(error) = result { - // And we'd decide what to do with each error, but Foca - // is pretty tolerant so we just log them and pretend - // all is fine - eprintln!("Ignored Error: {}", error); - } + tokio::select! { + biased; + Some(event) = timer_rx.recv() => { + foca.handle_timer(event, &mut runtime).expect("error handling"); + }, + Some(input) = rx_foca.recv() => { + let result = match input { + Input::Data(data) => foca.handle_data(&data, &mut runtime), + Input::Announce(dst) => foca.announce(dst, &mut runtime), + }; + + // Every public foca result yields `()` on success, so there's + // nothing to do with Ok + if let Err(error) = result { + // And we'd decide what to do with each error, but Foca + // is pretty tolerant so we just log them and pretend + // all is fine + eprintln!("Ignored Error: {}", error); + } - // Now we react to what happened. - // This is how we enable async: buffer one single interaction - // and then drain the runtime. + }, + else => { + // any of the channels has been closed + break; + } + } // First we submit everything that needs to go to the network while let Some((dst, data)) = runtime.to_send.pop() { @@ -385,11 +408,9 @@ async fn main() -> Result<(), anyhow::Error> { // Then schedule what needs to be scheduled while let Some((delay, event)) = runtime.to_schedule.pop() { - let own_input_handle = tx_foca_copy.clone(); - tokio::spawn(async move { - tokio::time::sleep(delay).await; - let _ignored_send_error = own_input_handle.send(Input::Event(event)).await; - }); + scheduler + .send((Instant::now() + delay, event)) + .expect("error handling"); } // And finally react to notifications. @@ -448,3 +469,71 @@ async fn main() -> Result<(), anyhow::Error> { let _ignored_send_error = tx_foca.send(Input::Data(databuf.split().freeze())).await; } } + +async fn launch_lame_timer( + timer_tx: mpsc::Sender>, +) -> (UnboundedSender<(Instant, Timer)>, JoinHandle<()>) { + let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Timer)>(); + + let mut queue = BinaryHeap::)>>::new(); + let jh = tokio::spawn(async move { + 'handler: loop { + let now = Instant::now(); + while queue + .peek() + .map(|Reverse((ins, _))| ins < &now) + .unwrap_or(false) + { + let Reverse((ins, event)) = queue.pop().expect("loop condition guarantees Some"); + // XXX could monitor for lag here + println!( + "handling event {:?} deadline={:?}, delta={:?}", + event, + ins, + now - ins + ); + timer_tx.send(event).await.unwrap(); // FIXME + } + + if let Some(Reverse((wake_at, _event))) = queue.peek() { + // wait for input OR sleep + let sleep_fut = tokio::time::sleep_until(*wake_at); + let recv_fut = rx.recv(); + + tokio::select! { + _ = sleep_fut => { + // woke up after deadline, time to take events + continue 'handler; + }, + maybe = recv_fut => { + if maybe.is_none() { + // channel closed + break 'handler; + } + let (when, event) = maybe.unwrap(); + if when < now { + timer_tx.send(event).await.unwrap(); // FIXME + } else { + queue.push(Reverse((when, event))); + } + } + }; + } else { + // no deadline set at present, just wait for input + if let Some((when, event)) = rx.recv().await { + if when < now { + timer_tx.send(event).await.unwrap(); // FIXME + } else { + queue.push(Reverse((when, event))); + } + } else { + // channel closed + break 'handler; + } + } + } + // input channel closed, future is done + }); + + (tx, jh) +}