
BUG! Probe cycle finished without running its full course #26

Closed
jeromegn opened this issue Jun 26, 2023 · 15 comments

@jeromegn

I noticed these happening a lot in my new project using foca:

2023-06-26T17:33:00Z app[5918571e9b3e83] waw [info]2023-06-26T17:33:00.162462Z ERROR corro_agent::broadcast: foca: error handling timer: BUG! Probe cycle finished without running its full course
2023-06-26T17:33:00Z app[5918571e9b3e83] waw [info]2023-06-26T17:33:00.162586Z  WARN handle_timer{event=SendIndirectProbe { probed_id: Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }, token: 0 }}: foca: SendIndirectProbe: Member not being probed probed_id=Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }
2023-06-26T17:34:31Z app[178199dc492228] cdg [info]2023-06-26T17:34:31.239548Z ERROR corro_agent::broadcast: foca: error handling timer: BUG! Probe cycle finished without running its full course
2023-06-26T17:34:31Z app[178199dc492228] cdg [info]2023-06-26T17:34:31.239671Z  WARN handle_timer{event=SendIndirectProbe { probed_id: Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }, token: 0 }}: foca: SendIndirectProbe: Member not being probed probed_id=Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }

On that cdg instance, I noticed this happened a few seconds before:

2023-06-26T17:34:10Z app[178199dc492228] cdg [info]2023-06-26T17:34:10.570726Z  INFO handle_timer{event=ProbeRandomMember(0)}: foca: Member failed probe, will declare it down if it doesn't react member_id=Actor { id: ActorId(23ce24c0-4ddf-4e00-8679-eeb40087af20), addr: [fdaa:2:4742:a7b:d5a7:25d0:75f7:2]:8787, bump: 50696 } timeout=30s
2023-06-26T17:34:13Z app[178199dc492228] cdg [info]2023-06-26T17:34:13.369396Z  WARN handle_data{len=167 header=Header { src: Actor { id: ActorId(c5450e0c-52ac-4c33-9e71-fbc64a1c1f77), addr: [fdaa:2:4742:a7b:18e:91b5:604a:2]:8787, bump: 17046 }, src_incarnation: 0, dst: Actor { id: ActorId(341fe755-52b3-4808-888a-0cb45c1ff189), addr: [fdaa:2:4742:a7b:aebf:9821:21ff:2]:8787, bump: 43456 }, message: ForwardedAck { origin: Actor { id: ActorId(23ce24c0-4ddf-4e00-8679-eeb40087af20), addr: [fdaa:2:4742:a7b:d5a7:25d0:75f7:2]:8787, bump: 50696 }, probe_number: 51 } } num_updates=1}: foca: Unexpected ForwardedAck sender
2023-06-26T17:34:13Z app[178199dc492228] cdg [info]2023-06-26T17:34:13.425022Z  WARN handle_data{len=167 header=Header { src: Actor { id: ActorId(339568fc-e676-48eb-9387-c67e381515f6), addr: [fdaa:2:4742:a7b:144:4161:8932:2]:8787, bump: 54737 }, src_incarnation: 0, dst: Actor { id: ActorId(341fe755-52b3-4808-888a-0cb45c1ff189), addr: [fdaa:2:4742:a7b:aebf:9821:21ff:2]:8787, bump: 43456 }, message: ForwardedAck { origin: Actor { id: ActorId(23ce24c0-4ddf-4e00-8679-eeb40087af20), addr: [fdaa:2:4742:a7b:d5a7:25d0:75f7:2]:8787, bump: 50696 }, probe_number: 51 } } num_updates=1}: foca: Unexpected ForwardedAck sender
2023-06-26T17:34:13Z app[178199dc492228] cdg [info]2023-06-26T17:34:13.470475Z  WARN handle_data{len=167 header=Header { src: Actor { id: ActorId(90501bcc-b720-4327-961f-c2fb7f74b01c), addr: [fdaa:2:4742:a7b:104:f3da:d4ad:2]:8787, bump: 9228 }, src_incarnation: 0, dst: Actor { id: ActorId(341fe755-52b3-4808-888a-0cb45c1ff189), addr: [fdaa:2:4742:a7b:aebf:9821:21ff:2]:8787, bump: 43456 }, message: ForwardedAck { origin: Actor { id: ActorId(23ce24c0-4ddf-4e00-8679-eeb40087af20), addr: [fdaa:2:4742:a7b:d5a7:25d0:75f7:2]:8787, bump: 50696 }, probe_number: 51 } } num_updates=1}: foca: Unexpected ForwardedAck sender

(the cdg node's actor_id is 341fe755-52b3-4808-888a-0cb45c1ff189)

I'm not sure what is happening exactly, but I assume it's a bug in foca :) I'm using it in a fairly straightforward way.

@caio
Owner

caio commented Jun 26, 2023

2023-06-26T17:33:00Z app[5918571e9b3e83] waw [info]2023-06-26T17:33:00.162462Z ERROR corro_agent::broadcast: foca: error handling timer: BUG! Probe cycle finished without running its full course
2023-06-26T17:33:00Z app[5918571e9b3e83] waw [info]2023-06-26T17:33:00.162586Z  WARN handle_timer{event=SendIndirectProbe { probed_id: Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }, token: 0 }}: foca: SendIndirectProbe: Member not being probed probed_id=Actor { id: ActorId(0c5dce8c-fd9b-41a1-9b7c-419629aa3e2b), addr: [fdaa:2:4742:a7b:188:5f1f:9d5f:2]:8787, bump: 5213 }

These two logs are suspiciously happening at the same time; however, they are triggered by two distinct timer events: the first by ProbeRandomMember, the second by SendIndirectProbe.

This implies that something scheduled both events to happen at the same time; if the events had happened in the reverse order, the handle_timer warnings wouldn't have appeared.

This is what I think is happening:

  • Your config has probe_period set to the exact same value as probe_rtt
  • You are using something very similar to the example in-memory runtime, where events are queued and then scheduled in reverse order

So whenever a member fails a direct probe, foca is correctly submitting messages as part of the indirect probe cycle, but since rtt and period are the same and the runtime is processing events in reverse, it attempts to start a new cycle without waiting for the indirect probe cycle event (hence the warning).

If that's the case, fixing the config should solve the problem. On foca's side, we could add some basic config validation to prevent the situation (the docs mention that probe_rtt should be strictly smaller than probe_period but the code doesn't really enforce it).
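
For illustration, here's a minimal sketch of the kind of validation being described, assuming probe_period and probe_rtt are the public Duration fields on foca::Config that the docs refer to; the helper name is made up:

// Hypothetical helper, not part of foca: reject configs where the indirect
// probe deadline can collide with the start of the next probe cycle.
fn check_probe_timings(config: &foca::Config) -> Result<(), String> {
    // The docs ask for probe_rtt to be strictly smaller than probe_period;
    // equal values are exactly the collision scenario described above.
    if config.probe_rtt >= config.probe_period {
        return Err(format!(
            "probe_rtt ({:?}) must be strictly smaller than probe_period ({:?})",
            config.probe_rtt, config.probe_period
        ));
    }
    Ok(())
}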

Otherwise, I'll need more info. I'm not sure what exactly will be useful apart from your config values, so anything you can share would be helpful.

@jeromegn
Author

This is how we produce the foca config (dynamic based on cluster size):

fn make_foca_config(cluster_size: NonZeroU32) -> foca::Config {
    let mut config = foca::Config::new_wan(cluster_size);
    config.remove_down_after = Duration::from_secs(2 * 24 * 60 * 60);

    // max payload size for udp over ipv6 wg - 1 for payload type
    config.max_packet_size = EFFECTIVE_CAP.try_into().unwrap();

    config
}

AFAICT, the probe_period is 5 seconds and probe_rtt is 3 seconds when using new_wan.

This is the entirety of our runtime, which is probably modeled after the one in your examples, yes:

pub struct DispatchRuntime<T> {
    pub to_send: Sender<(T, Bytes)>,
    pub to_schedule: Sender<(Duration, Timer<T>)>,
    pub notifications: Sender<Notification<T>>,
    pub active: bool,
}

impl<T: Identity> Runtime<T> for DispatchRuntime<T> {
    fn notify(&mut self, notification: Notification<T>) {
        match &notification {
            Notification::Active => {
                self.active = true;
            }
            Notification::Idle | Notification::Defunct => {
                self.active = false;
            }
            _ => {}
        };
        if let Err(e) = self.notifications.try_send(notification) {
            increment_counter!("corro.channel.error", "type" => "full", "name" => "dispatch.notifications");
            error!("error dispatching notification: {e}");
        }
    }

    fn send_to(&mut self, to: T, data: &[u8]) {
        trace!("cluster send_to {to:?}");
        let packet = data.to_vec();

        if let Err(e) = self.to_send.try_send((to, packet.into())) {
            increment_counter!("corro.channel.error", "type" => "full", "name" => "dispatch.to_send");
            error!("error dispatching broadcast packet: {e}");
        }
    }

    fn submit_after(&mut self, event: Timer<T>, after: Duration) {
        if let Err(e) = self.to_schedule.try_send((after, event)) {
            increment_counter!("corro.channel.error", "type" => "full", "name" => "dispatch.to_schedule");
            error!("error dispatching scheduled event: {e}");
        }
    }
}

impl<T> DispatchRuntime<T> {
    pub fn new(
        to_send: Sender<(T, Bytes)>,
        to_schedule: Sender<(Duration, Timer<T>)>,
        notifications: Sender<Notification<T>>,
    ) -> Self {
        Self {
            to_send,
            to_schedule,
            notifications,
            active: false,
        }
    }
}

@caio
Owner

caio commented Jun 27, 2023

Thank you for the extra info!

I haven't been able to outline a flow of events that would lead to this yet, so I'm running a few instances locally to see if I can trigger it with simulated failures (mostly dropping messages randomly, with one instance consistently dropping ping msgs). But this looks like a sequencing issue, so it's unlikely I'll be able to craft a reproducing scenario without a good guess of what's going on.

The runtime code looks fine. What could lead to issues is the use of multiple channels (could there be a bias on the receiving end? say, prioritizing the data channel over the timer channel) and/or timer events being dropped due to a full buffer (any error submitting a timer is serious and will lead to issues; the only good bias is biasing for timer events). Using a single unbounded channel would make this particular worry disappear.
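
To make that last point concrete, here is a minimal sketch of the scheduling half only (not a full Runtime implementation, and not the project's actual code), assuming tokio's mpsc and foca's Timer type:

use std::time::Duration;

use foca::Timer;
use tokio::sync::mpsc;

pub struct TimerSchedule<T> {
    to_schedule: mpsc::UnboundedSender<(Duration, Timer<T>)>,
}

impl<T> TimerSchedule<T> {
    pub fn submit_after(&self, event: Timer<T>, after: Duration) {
        // An unbounded send only fails when the receiver is gone (a shutdown
        // condition), never because a bounded buffer filled up, so timer
        // events can't be silently dropped under load.
        let _ = self.to_schedule.send((after, event));
    }
}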

It looks like your cluster size is 10ish, so it's very unlikely that the above is a problem, but I wouldn't try to juggle multiple channels nor add any sort of backpressure at this level (it might be worthwhile to do so for the network send/receive part, closer to the actual socket); notifications and timers should be low volume.

Are you able/willing to run a node with full debug logging until this situation arises? It will be a ton of data (shoot it at my email if you prefer), but it might help with figuring out what's going on.

@jeromegn
Author

jeromegn commented Jun 27, 2023 via email

@jeromegn
Author

jeromegn commented Jun 28, 2023

Here are debug-level logs from foca for a single node: https://gist.github.com/jeromegn/b00a4b19097eb1398c5bd04c0b5dbcbb

There's quite a lot in there. They span from the very beginning of the program, where I apply past known member states, up to a few minutes after the "BUG!".

If you need more logs or logs from a different identity, let me know (within 1-2 days, the logs are deleted after that).

The runtime code looks fine. What could lead to issues is the use of multiple channels (could there be a bias on the receiving end? say, prioritizing the data channel over the timer channel) and/or timer events being dropped due to a full buffer (any error submitting a timer is serious and will lead to issues; the only good bias is biasing for timer events). Using a single unbounded channel would make this particular worry disappear.

Interesting. This might be related, yes.

Here's the code where we create the DispatchRuntime and run the timer schedule:

    let (to_schedule_tx, mut to_schedule_rx) = channel(10240);

    let mut runtime: DispatchRuntime<Actor> =
        DispatchRuntime::new(to_send_tx, to_schedule_tx, notifications_tx);

    let (timer_tx, mut timer_rx) = channel(1);
    tokio::spawn(async move {
        while let Some((duration, timer)) = to_schedule_rx.recv().await {
            let timer_tx = timer_tx.clone();
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                timer_tx.send(timer).await.ok();
            });
        }
    });
// ...

tokio::select! {
    biased;
    input = rx_foca.recv() => // ...
    timer = timer_rx.recv() => match timer {
        Some(timer) => {
            Branch::HandleTimer(timer)
        },
        None => {
            warn!("no more foca timers, breaking");
            break;
        }
    },
    // ...

Basically:

  • Bounded channel of 10K capacity for the timer
  • Those are then sent to a channel with capacity 1
  • We're processing them with a tokio::select! biased towards receiving data and such.

Sounds like I should put the timer_rx at the top there so it processes these before other kinds of events. I can't process them directly from the spawned tokio task because I don't have mut access to the Foca instance.

@caio
Owner

caio commented Jun 28, 2023

Thank you! At a glance it does look like things run normally until suddenly the two timer events arrive out of order. I'll be digging further to see if I can spot a bug in foca, but here's my current hand-wavy guess:

You're using a task-based timer (I do it too, it's very easy); even if tokio's timer guarantees scheduling wakes in order after a lag (say, the VM gets suspended for a few seconds and thread time advances), it can't guarantee that the tasks you spawned will be polled in the order in which they went from pending to ready.

To prevent this from happening, I think we'd need every Timer event to arrive at and be handled by a single task (so, likely a timer impl running on top of an OS thread, like what I assume tokio does internally).

I'm not sure it's worth the trouble: since the first time you encountered this error, we've made sure foca can recover from bad sequencing in the probe; so long as things go back to normal, this error popping up under stress is OK.

It is, however, quite annoying. You'd want less noise when dealing with load issues, especially this kind of noise, since it screams BUG at the logs (I'm very proud of this little logic check there 😬).

If it were me, since I know nothing about implementing timers reliably, I'd take a few steps to reduce the chances of this happening. I think these can help:

  • Use a single-threaded event loop if you aren't already (assuming there's a timer per thread in the runtime)
  • Rudimentary out-of-order delivery mitigation:

You now submit timers with a sequence number (a timestamp is easy, but it could just as well be an atomic counter):

tokio::spawn(async move {
    let seqno = Instant::now(); // important: acquired before the sleep
    tokio::time::sleep(duration).await;
    timer_tx.send((timer, seqno)).await.ok();
});

Then in the receive loop you can always drain timer_rx: if there is more than one event in the buffer, you sort them based on seqno before submitting.

This way the window of opportunity for a sequencing issue to appear is way smaller than it currently is (it can still happen, though). Notice that the real impact of these mitigations is reducing log noise and preventing the skipping of a single probe round (we could make foca recover and probe immediately too).


I'll spend some more time staring at the logs and state in the near future to see if I'm missing something; I haven't discarded the possibility of a bug in foca. With my ad-hoc local simulation I haven't triggered anything yet, but I did find a case where I can speed up suspicion refutation, minimizing false positives, so it's a win already in my book :)

@jeromegn
Author

I'm still getting a lot of these BUG! lines, maybe more than before?

I tried sorting w/ the following logic:

// timer spawner
let (timer_tx, mut timer_rx) = channel(10);
tokio::spawn(async move {
    while let Some((duration, timer)) = to_schedule_rx.recv().await {
        let timer_tx = timer_tx.clone();
        let seq = Instant::now();
        tokio::spawn(async move {
            tokio::time::sleep(duration).await;
            timer_tx.send((timer, seq)).await.ok();
        });
    }
});

// loop w/ tokio::select!
loop {
    let branch = tokio::select! {
        biased;
        timer = timer_rx.recv() => match timer {
            Some((timer, seq)) => {
                Branch::HandleTimer(timer, seq)
            },
            None => {
                warn!("no more foca timers, breaking");
                break;
            }
        },
        input = rx_foca.recv() => match input {
            // ...
        },
        //...
    };
}


// ... handling logic
Branch::HandleTimer(timer, seq) => {
    let mut v = vec![(timer, seq)];

    // drain the channel, in case there's a race among timers
    while let Ok((timer, seq)) = timer_rx.try_recv() {
        v.push((timer, seq));
    }

    // sort by instant these were scheduled
    v.sort_by(|a, b| a.1.cmp(&b.1));

    for (timer, _) in v {
        if let Err(e) = foca.handle_timer(timer, &mut runtime) {
            error!("foca: error handling timer: {e}");
        }
    }
}

I noticed I didn't put the seqno instantiation in the same place, but I figured it was maybe a mistake that it was inside the spawn? Maybe not! I'm going to change that and deploy again.

@caio
Owner

caio commented Jun 30, 2023

Oh, my bad! You're seeing it more often indeed, haha. Just using now() (or a simple counter, as I suggested initially) pretty much guarantees that you'll hit the bug in case of a runtime lag :spockfacepalm:

The seqno should be Instant::now() + the timer duration. What we want to guarantee is that the events are processed in the order they should've been fired, not the order in which they were dispatched.
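
Concretely, the earlier spawned-task sketch would change only in how seqno is computed (reusing the timer, duration and timer_tx names from the snippets above; illustrative only):

tokio::spawn(async move {
    // Sequence by the intended firing time, not the dispatch time: two
    // timers dispatched back to back with different durations must sort
    // by when they were supposed to fire.
    let seqno = Instant::now() + duration;
    tokio::time::sleep(duration).await;
    timer_tx.send((timer, seqno)).await.ok();
});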

@caio
Owner

caio commented Jun 30, 2023

Here's what I'll do on foca side soonish (writing it down so I don't forget):

  • (Try to) add total ordering to timer events, so that anybody dealing with this sort of issue can simply .sort_unstable() the events instead of dancing with Instant
  • Speed up suspicion refutation by reacting immediately upon detection
  • Trigger a Ping during the bad sequencing bug, so that there's minimal impact when it happens

@jeromegn
Author

I ended up trying a LocalSet so these tasks are always spawned on the same thread. I think it's working.

#[derive(Clone)]
struct TimerSpawner {
    send: mpsc::UnboundedSender<(Duration, Timer<Actor>)>,
}

impl TimerSpawner {
    pub fn new(timer_tx: mpsc::Sender<(Timer<Actor>, Instant)>) -> Self {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn({
            let timer_tx = timer_tx.clone();
            move || {
                let local = LocalSet::new();

                local.spawn_local(async move {
                    while let Some((duration, timer)) = recv.recv().await {
                        let seq = Instant::now();

                        let timer_tx = timer_tx.clone();
                        tokio::task::spawn_local(async move {
                            tokio::time::sleep(duration).await;
                            timer_tx.send((timer, seq)).await.ok();
                        });
                    }
                    // If the while loop returns, then all the LocalSpawner
                    // objects have been dropped.
                });

                // This will return once all senders are dropped and all
                // spawned tasks have returned.
                rt.block_on(local);
            }
        });

        Self { send }
    }

    pub fn spawn(&self, task: (Duration, Timer<Actor>)) {
        self.send
            .send(task)
            .expect("Thread with LocalSet has shut down.");
    }
}

    let (timer_tx, mut timer_rx) = channel(10);
    let timer_spawner = TimerSpawner::new(timer_tx);

    tokio::spawn(async move {
        while let Some((duration, timer)) = to_schedule_rx.recv().await {
            timer_spawner.spawn((duration, timer));
        }
    });

@caio
Owner

caio commented Jun 30, 2023

Oh, that's interesting. I hadn't stumbled on tokio's LocalSet, but it looks like you get a single thread handling every timer event; even if there's any attempt at fairness in the scheduler, the odds of out-of-order delivery are way smaller. Looks like this is the way to go for timers in user space on tokio, cool stuff!

@caio
Owner

caio commented Jul 9, 2023

I've released v0.13.0 with the changes I mentioned on #26 (comment)

Now you'll be able to simply sort_unstable() the Vec with the piled-up timer events; the first index should be the first to be processed.

If you'd rather keep your current solution, I'd just make sure to adjust seq to be Instant::now() + duration; the reason it works right now is that foca emits the events in the order they need to be fired, but that's an implementation detail.

I couldn't figure out a way to trigger this scenario that didn't involve messing with the timer, so I'll be assuming that was the cause. Closing this ticket; as always, feel free to reopen 😄

caio closed this as completed Jul 9, 2023
@jeromegn
Author

So all I need to do now is keep draining, but sort_unstable the timers? Do you think I still need the dedicated single-threaded runtime?

@caio
Owner

caio commented Jul 11, 2023

Yup, sort_unstable suffices for ordering in case of a pileup; the reason it's this simple is that there's never more than one of each probe-related event, so the sequencing is trivial.

I think you still need the TimerSpawner thing (or something similar) to minimize the chances of out-of-order delivery, but it should be fine to use it on top of a multi-threaded runtime.
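
For reference, a sketch of what the drain-and-sort handling could look like on v0.13.0, reusing the names from the earlier snippets (timer_rx, foca, runtime, Branch), assuming the timer channel now carries plain Timer values without the manual seq and relying on the total ordering mentioned above:

Branch::HandleTimer(timer) => {
    let mut pending = vec![timer];

    // Drain whatever else piled up in the channel buffer.
    while let Ok(timer) = timer_rx.try_recv() {
        pending.push(timer);
    }

    // v0.13.0 gives Timer a total order: after sorting, the first index
    // is the first event that should be processed.
    pending.sort_unstable();

    for timer in pending {
        if let Err(e) = foca.handle_timer(timer, &mut runtime) {
            error!("foca: error handling timer: {e}");
        }
    }
}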

@caio
Owner

caio commented Jul 11, 2023

Hey @jeromegn,

Having to spin up a runtime just to solve this irked me. Here's what I think is better and can be used without any worries about out-of-order delivery:

f886ea2

It's a single task and controls every timer event directly; it shouldn't have any sequencing problems and is definitely less resource-intensive for foca use cases.

It might have bugs and definitely needs error handling and a cleanup, etc., but it seems to work fine with very shallow testing. Gonna leave it at that for now, but I thought you'd be interested in this simpler approach.
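
For anyone reading along, the general shape of such a single-task timer is roughly the following. This is only a sketch of the idea, not the code in f886ea2, and the names are made up:

use std::collections::BTreeMap;

use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration, Instant};

// One task owns every scheduled timer: it sleeps until the earliest
// deadline and forwards events in firing order, so there is no cross-task
// race that could deliver them out of order.
async fn timer_task<T>(
    mut to_schedule: mpsc::UnboundedReceiver<(Duration, T)>,
    fired: mpsc::UnboundedSender<T>,
) {
    // Keyed by (deadline, insertion counter) so same-deadline events keep
    // their submission order.
    let mut queue: BTreeMap<(Instant, u64), T> = BTreeMap::new();
    let mut counter = 0u64;

    loop {
        let next_deadline = queue.keys().next().map(|(at, _)| *at);
        tokio::select! {
            maybe = to_schedule.recv() => match maybe {
                Some((after, event)) => {
                    queue.insert((Instant::now() + after, counter), event);
                    counter += 1;
                }
                None => break, // all senders gone, shut down
            },
            // Disabled while the queue is empty; the placeholder deadline is
            // never awaited in that case.
            _ = sleep_until(next_deadline.unwrap_or_else(|| Instant::now() + Duration::from_secs(3600))),
                if next_deadline.is_some() =>
            {
                if let Some(key) = queue.keys().next().copied() {
                    if let Some(event) = queue.remove(&key) {
                        let _ = fired.send(event);
                    }
                }
            }
        }
    }
}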
