From ff7d5ff44407f5eb97877ea80baaf22995a57c26 Mon Sep 17 00:00:00 2001 From: Mike Date: Fri, 3 Feb 2023 07:16:02 +0000 Subject: [PATCH] Stageless: close the finish channel so executor doesn't deadlock (#7448) # Objective - Fix panic_when_hierachy_cycle test hanging - The problem is that the scope only awaits one task at a time in get_results. In stageless this task is the multithreaded executor. That tasks hangs when a system panics and cannot make anymore progress. This wasn't a problem before because the executor was spawned after all the system tasks had been spawned. But in stageless the executor is spawned before all the system tasks are spawned. ## Solution - We can catch unwind on each system and close the finish channel if one panics. This then causes the receiver end of the finish channel to panic too. - this might have a small perf impact, but when running many_foxes it seems to be within the noise. So less than 40us. ## Other possible solutions - It might be possible to fairly poll all the tasks in get_results in the scope. If we could do that then the scope could panic whenever one of tasks panics. It would require a data structure that we could both poll the futures through a shared ref and also push to it. I tried FuturesUnordered, but it requires an exclusive ref to poll it. - The catch unwind could be moved onto when we create the tasks for scope instead. We would then need something like a oneshot async channel to inform get_results if a task panics. --- .../schedule_v3/executor/multi_threaded.rs | 67 +++++++++++++------ 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs index 0796ce1963b62..84738f4bbe9cb 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -1,3 +1,5 @@ +use std::panic::AssertUnwindSafe; + use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; @@ -175,11 +177,10 @@ impl SystemExecutor for MultiThreadedExecutor { if self.num_running_systems > 0 { // wait for systems to complete - let index = self - .receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + let index = + self.receiver.recv().await.expect( + "A system has panicked so the executor cannot continue.", + ); self.finish_system_and_signal_dependents(index); @@ -429,14 +430,22 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - // SAFETY: access is compatible - unsafe { system.run_unsafe((), world) }; + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + // SAFETY: access is compatible + unsafe { system.run_unsafe((), world) }; + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")] @@ -479,13 +488,21 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - apply_system_buffers(&unapplied_systems, systems, world); + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + apply_system_buffers(&unapplied_systems, systems, world); + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")] @@ -495,13 +512,21 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - system.run((), world); + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + system.run((), world); + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")]