Skip to content

Commit

Permalink
WIP Use a single task to handle timer events
Browse files Browse the repository at this point in the history
  • Loading branch information
caio committed Jul 11, 2023
1 parent 04fd49c commit f886ea2
Showing 1 changed file with 119 additions and 30 deletions.
149 changes: 119 additions & 30 deletions examples/foca_insecure_udp_agent.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
/* Any copyright is dedicated to the Public Domain.
* https://creativecommons.org/publicdomain/zero/1.0/ */
use std::{
collections::HashMap, fs::File, io::Write, net::SocketAddr, path::Path, str::FromStr,
sync::Arc, time::Duration,
cmp::Reverse,
collections::{BinaryHeap, HashMap},
fs::File,
io::Write,
net::SocketAddr,
path::Path,
str::FromStr,
sync::Arc,
time::Duration,
};

use tracing_subscriber::{
Expand All @@ -14,7 +21,12 @@ use tracing_subscriber::{
use bytes::{BufMut, Bytes, BytesMut};
use clap::{App, Arg};
use rand::{rngs::StdRng, SeedableRng};
use tokio::{net::UdpSocket, sync::mpsc};
use tokio::{
net::UdpSocket,
sync::mpsc::{self, Receiver, UnboundedSender},
task::JoinHandle,
time::{sleep, Instant},
};

use foca::{Config, Foca, Identity, Notification, PostcardCodec, Runtime, Timer};

Expand Down Expand Up @@ -339,10 +351,14 @@ async fn main() -> Result<(), anyhow::Error> {
}
});

// We'll also launch a task to manage Foca. Since there are timers
// involved, one simple way to do it is unifying the input:
// Handle timer events by a single task so that we don't risk
// out of order delivery under load
let (timer_tx, mut timer_rx) = mpsc::channel(10000);
let (scheduler, _handle) = launch_lame_timer(timer_tx).await;

// We'll also launch a task to manage Foca. And receive network
// data
enum Input<T> {
Event(Timer<T>),
Data(Bytes),
Announce(T),
}
Expand All @@ -353,29 +369,36 @@ async fn main() -> Result<(), anyhow::Error> {
// instead.
let mut runtime = AccumulatingRuntime::new();
let mut members = Members::new();
let tx_foca_copy = tx_foca.clone();
tokio::spawn(async move {
while let Some(input) = rx_foca.recv().await {
loop {
debug_assert_eq!(0, runtime.backlog());

let result = match input {
Input::Event(timer) => foca.handle_timer(timer, &mut runtime),
Input::Data(data) => foca.handle_data(&data, &mut runtime),
Input::Announce(dst) => foca.announce(dst, &mut runtime),
};

// Every public foca result yields `()` on success, so there's
// nothing to do with Ok
if let Err(error) = result {
// And we'd decide what to do with each error, but Foca
// is pretty tolerant so we just log them and pretend
// all is fine
eprintln!("Ignored Error: {}", error);
}
tokio::select! {
biased;
Some(event) = timer_rx.recv() => {
foca.handle_timer(event, &mut runtime).expect("error handling");
},
Some(input) = rx_foca.recv() => {
let result = match input {
Input::Data(data) => foca.handle_data(&data, &mut runtime),
Input::Announce(dst) => foca.announce(dst, &mut runtime),
};

// Every public foca result yields `()` on success, so there's
// nothing to do with Ok
if let Err(error) = result {
// And we'd decide what to do with each error, but Foca
// is pretty tolerant so we just log them and pretend
// all is fine
eprintln!("Ignored Error: {}", error);
}

// Now we react to what happened.
// This is how we enable async: buffer one single interaction
// and then drain the runtime.
},
else => {
// any of the channels has been closed
break;
}
}

// First we submit everything that needs to go to the network
while let Some((dst, data)) = runtime.to_send.pop() {
Expand All @@ -385,11 +408,9 @@ async fn main() -> Result<(), anyhow::Error> {

// Then schedule what needs to be scheduled
while let Some((delay, event)) = runtime.to_schedule.pop() {
let own_input_handle = tx_foca_copy.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
let _ignored_send_error = own_input_handle.send(Input::Event(event)).await;
});
scheduler
.send((Instant::now() + delay, event))
.expect("error handling");
}

// And finally react to notifications.
Expand Down Expand Up @@ -448,3 +469,71 @@ async fn main() -> Result<(), anyhow::Error> {
let _ignored_send_error = tx_foca.send(Input::Data(databuf.split().freeze())).await;
}
}

async fn launch_lame_timer(
timer_tx: mpsc::Sender<Timer<ID>>,
) -> (UnboundedSender<(Instant, Timer<ID>)>, JoinHandle<()>) {
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Timer<ID>)>();

let mut queue = BinaryHeap::<Reverse<(Instant, Timer<ID>)>>::new();
let jh = tokio::spawn(async move {
'handler: loop {
let now = Instant::now();
while queue
.peek()
.map(|Reverse((ins, _))| ins < &now)
.unwrap_or(false)
{
let Reverse((ins, event)) = queue.pop().expect("loop condition guarantees Some");
// XXX could monitor for lag here
println!(
"handling event {:?} deadline={:?}, delta={:?}",
event,
ins,
now - ins
);
timer_tx.send(event).await.unwrap(); // FIXME
}

if let Some(Reverse((wake_at, _event))) = queue.peek() {
// wait for input OR sleep
let sleep_fut = tokio::time::sleep_until(*wake_at);
let recv_fut = rx.recv();

tokio::select! {
_ = sleep_fut => {
// woke up after deadline, time to take events
continue 'handler;
},
maybe = recv_fut => {
if maybe.is_none() {
// channel closed
break 'handler;
}
let (when, event) = maybe.unwrap();
if when < now {
timer_tx.send(event).await.unwrap(); // FIXME
} else {
queue.push(Reverse((when, event)));
}
}
};
} else {
// no deadline set at present, just wait for input
if let Some((when, event)) = rx.recv().await {
if when < now {
timer_tx.send(event).await.unwrap(); // FIXME
} else {
queue.push(Reverse((when, event)));
}
} else {
// channel closed
break 'handler;
}
}
}
// input channel closed, future is done
});

(tx, jh)
}

0 comments on commit f886ea2

Please sign in to comment.