diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 2a47aabb964c0..7a3dc6d94dfb9 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -67,7 +67,7 @@ pub struct App { /// the application's event loop and advancing the [`Schedule`]. /// Typically, it is not configured manually, but set by one of Bevy's built-in plugins. /// See `bevy::winit::WinitPlugin` and [`ScheduleRunnerPlugin`](crate::schedule_runner::ScheduleRunnerPlugin). - pub runner: Box, + pub runner: Box, // Send bound is required to make App Send /// A container of [`Stage`]s set to be run in a linear order. pub schedule: Schedule, sub_apps: HashMap, @@ -87,10 +87,55 @@ impl Debug for App { } } -/// Each `SubApp` has its own [`Schedule`] and [`World`], enabling a separation of concerns. -struct SubApp { - app: App, - extract: Box, +/// A [`SubApp`] contains its own [`Schedule`] and [`World`] separate from the main [`App`]. +/// This is useful for situations where data and data processing should be kept completely separate +/// from the main application. The primary use of this feature in bevy is to enable pipelined rendering. 
+/// +/// # Example +/// +/// ```rust +/// # use bevy_app::{App, AppLabel}; +/// # use bevy_ecs::prelude::*; +/// +/// #[derive(Resource, Default)] +/// struct Val(pub i32); +/// +/// #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, AppLabel)] +/// struct ExampleApp; +/// +/// #[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] +/// struct ExampleStage; +/// +/// let mut app = App::empty(); +/// // initialize the main app with a value of 10 +/// app.insert_resource(Val(10)); +/// +/// // create an app with a resource and a single stage +/// let mut sub_app = App::empty(); +/// sub_app.insert_resource(Val(100)); +/// let mut example_stage = SystemStage::single_threaded(); +/// example_stage.add_system(|counter: Res<Val>| { +/// // since we assigned the value from the main world in extract +/// // we see that value instead of 100 +/// assert_eq!(counter.0, 10); +/// }); +/// sub_app.add_stage(ExampleStage, example_stage); +/// +/// // add the sub_app to the app +/// app.add_sub_app(ExampleApp, sub_app, |main_world, sub_app| { +/// sub_app.world.resource_mut::<Val>().0 = main_world.resource::<Val>().0; +/// }); +/// +/// // This will run the schedules once, since we're using the default runner +/// app.run(); +/// ``` +pub struct SubApp { + /// The [`SubApp`]'s instance of [`App`] + pub app: App, + + /// A function that allows access to both the [`SubApp`] [`World`] and the main [`App`]. This is + /// useful for moving data between the sub app and the main app. + pub extract: Box, } impl SubApp { @@ -161,11 +206,14 @@ impl App { /// /// See [`add_sub_app`](Self::add_sub_app) and [`run_once`](Schedule::run_once) for more details. 
pub fn update(&mut self) { - #[cfg(feature = "trace")] - let _bevy_frame_update_span = info_span!("frame").entered(); - self.schedule.run(&mut self.world); - - for sub_app in self.sub_apps.values_mut() { + { + #[cfg(feature = "trace")] + let _bevy_frame_update_span = info_span!("main app").entered(); + self.schedule.run(&mut self.world); + } + for (_label, sub_app) in self.sub_apps.iter_mut() { + #[cfg(feature = "trace")] + let _sub_app_span = info_span!("sub app", name = ?_label).entered(); sub_app.extract(&mut self.world); sub_app.run(); } @@ -832,7 +880,7 @@ impl App { /// App::new() /// .set_runner(my_runner); /// ``` - pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static) -> &mut Self { + pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static + Send) -> &mut Self { self.runner = Box::new(run_fn); self } @@ -1017,14 +1065,15 @@ impl App { /// Adds an [`App`] as a child of the current one. /// - /// The provided function `sub_app_runner` is called by the [`update`](Self::update) method. The [`World`] + /// The provided function `extract` is normally called by the [`update`](Self::update) method. + /// After extract is called, the [`Schedule`] of the sub app is run. The [`World`] /// parameter represents the main app world, while the [`App`] parameter is just a mutable /// reference to the `SubApp` itself. pub fn add_sub_app( &mut self, label: impl AppLabel, app: App, - extract: impl Fn(&mut World, &mut App) + 'static, + extract: impl Fn(&mut World, &mut App) + 'static + Send, ) -> &mut Self { self.sub_apps.insert( label.as_label(), @@ -1070,6 +1119,16 @@ impl App { } } + /// Inserts an existing sub app into the app + pub fn insert_sub_app(&mut self, label: impl AppLabel, sub_app: SubApp) { + self.sub_apps.insert(label.as_label(), sub_app); + } + + /// Removes a sub app from the app. Returns [`None`] if the label doesn't exist. 
+ pub fn remove_sub_app(&mut self, label: impl AppLabel) -> Option { + self.sub_apps.remove(&label.as_label()) + } + /// Retrieves a `SubApp` inside this [`App`] with the given label, if it exists. Otherwise returns /// an [`Err`] containing the given label. pub fn get_sub_app(&self, label: impl AppLabel) -> Result<&App, impl AppLabel> { diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 68dd1f1ea798d..0db9627633ba1 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,11 +1,15 @@ +use std::sync::Arc; + +use crate as bevy_ecs; use crate::{ archetype::ArchetypeComponentId, query::Access, schedule::{ParallelSystemExecutor, SystemContainer}, + system::Resource, world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use event_listener::Event; @@ -14,6 +18,16 @@ use fixedbitset::FixedBitSet; #[cfg(test)] use scheduling_event::*; +/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +#[derive(Resource, Default, Clone)] +pub struct MainThreadExecutor(pub Arc>); + +impl MainThreadExecutor { + pub fn new() -> Self { + MainThreadExecutor(Arc::new(ThreadExecutor::new())) + } +} + struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. start: Event, @@ -124,40 +138,46 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - ComputeTaskPool::init(TaskPool::default).scope(|scope| { - self.prepare_systems(scope, systems, world); - if self.should_run.count_ones(..) == 0 { - return; - } - let parallel_executor = async { - // All systems have been ran if there are no queued or running systems. - while 0 != self.queued.count_ones(..) + self.running.count_ones(..) 
{ - self.process_queued_systems(); - // Avoid deadlocking if no systems were actually started. - if self.running.count_ones(..) != 0 { - // Wait until at least one system has finished. - let index = self - .finish_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - self.process_finished_system(index); - // Gather other systems than may have finished. - while let Ok(index) = self.finish_receiver.try_recv() { + let thread_executor = world.get_resource::().map(|e| &*e.0); + + ComputeTaskPool::init(TaskPool::default).scope_with_executor( + false, + thread_executor, + |scope| { + self.prepare_systems(scope, systems, world); + if self.should_run.count_ones(..) == 0 { + return; + } + let parallel_executor = async { + // All systems have been ran if there are no queued or running systems. + while 0 != self.queued.count_ones(..) + self.running.count_ones(..) { + self.process_queued_systems(); + // Avoid deadlocking if no systems were actually started. + if self.running.count_ones(..) != 0 { + // Wait until at least one system has finished. + let index = self + .finish_receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); self.process_finished_system(index); + // Gather other systems than may have finished. + while let Ok(index) = self.finish_receiver.try_recv() { + self.process_finished_system(index); + } + // At least one system has finished, so active access is outdated. + self.rebuild_active_access(); } - // At least one system has finished, so active access is outdated. 
- self.rebuild_active_access(); + self.update_counters_and_queue_systems(); } - self.update_counters_and_queue_systems(); - } - }; - #[cfg(feature = "trace")] - let span = bevy_utils::tracing::info_span!("parallel executor"); - #[cfg(feature = "trace")] - let parallel_executor = parallel_executor.instrument(span); - scope.spawn(parallel_executor); - }); + }; + #[cfg(feature = "trace")] + let span = bevy_utils::tracing::info_span!("parallel executor"); + #[cfg(feature = "trace")] + let parallel_executor = parallel_executor.instrument(span); + scope.spawn(parallel_executor); + }, + ); } } diff --git a/crates/bevy_internal/src/default_plugins.rs b/crates/bevy_internal/src/default_plugins.rs index 1c8ee63d53aee..7bd49c761dc05 100644 --- a/crates/bevy_internal/src/default_plugins.rs +++ b/crates/bevy_internal/src/default_plugins.rs @@ -68,6 +68,12 @@ impl PluginGroup for DefaultPlugins { // NOTE: Load this after renderer initialization so that it knows about the supported // compressed texture formats .add(bevy_render::texture::ImagePlugin::default()); + + #[cfg(not(target_arch = "wasm32"))] + { + group = group + .add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default()); + } } #[cfg(feature = "bevy_core_pipeline")] diff --git a/crates/bevy_render/Cargo.toml b/crates/bevy_render/Cargo.toml index eaa4d09af6bac..ceb49a04dff6f 100644 --- a/crates/bevy_render/Cargo.toml +++ b/crates/bevy_render/Cargo.toml @@ -44,6 +44,7 @@ bevy_time = { path = "../bevy_time", version = "0.9.0" } bevy_transform = { path = "../bevy_transform", version = "0.9.0" } bevy_window = { path = "../bevy_window", version = "0.9.0" } bevy_utils = { path = "../bevy_utils", version = "0.9.0" } +bevy_tasks = { path = "../bevy_tasks", version = "0.9.0" } # rendering image = { version = "0.24", default-features = false } @@ -76,3 +77,4 @@ basis-universal = { version = "0.2.0", optional = true } encase = { version = "0.4", features = ["glam"] } # For wgpu profiling using tracing. 
Use `RUST_LOG=info` to also capture the wgpu spans. profiling = { version = "1", features = ["profile-with-tracing"], optional = true } +async-channel = "1.4" diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index bb37fc829a7ad..a799d9546bf46 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -10,6 +10,7 @@ mod extract_param; pub mod extract_resource; pub mod globals; pub mod mesh; +pub mod pipelined_rendering; pub mod primitives; pub mod render_asset; pub mod render_graph; @@ -71,6 +72,9 @@ pub enum RenderStage { /// running the next frame while rendering the current frame. Extract, + /// A stage for applying the commands from the [`Extract`] stage + ExtractCommands, + /// Prepare render resources from the extracted data for the GPU. Prepare, @@ -190,8 +194,14 @@ impl Plugin for RenderPlugin { // after access to the main world is removed // See also https://github.com/bevyengine/bevy/issues/5082 extract_stage.set_apply_buffers(false); + + // This stage applies the commands from the extract stage while the render schedule + // is running in parallel with the main app. 
+ let mut extract_commands_stage = SystemStage::parallel(); + extract_commands_stage.add_system(apply_extract_commands.at_start()); render_app .add_stage(RenderStage::Extract, extract_stage) + .add_stage(RenderStage::ExtractCommands, extract_commands_stage) .add_stage(RenderStage::Prepare, SystemStage::parallel()) .add_stage(RenderStage::Queue, SystemStage::parallel()) .add_stage(RenderStage::PhaseSort, SystemStage::parallel()) @@ -222,7 +232,7 @@ impl Plugin for RenderPlugin { app.add_sub_app(RenderApp, render_app, move |app_world, render_app| { #[cfg(feature = "trace")] - let _render_span = bevy_utils::tracing::info_span!("renderer subapp").entered(); + let _render_span = bevy_utils::tracing::info_span!("extract main app to render subapp").entered(); { #[cfg(feature = "trace")] let _stage_span = @@ -308,10 +318,12 @@ fn extract(app_world: &mut World, render_app: &mut App) { let inserted_world = render_world.remove_resource::().unwrap(); let scratch_world = std::mem::replace(app_world, inserted_world.0); app_world.insert_resource(ScratchMainWorld(scratch_world)); - - // Note: We apply buffers (read, Commands) after the `MainWorld` has been removed from the render app's world - // so that in future, pipelining will be able to do this too without any code relying on it. 
- // see - extract_stage.0.apply_buffers(render_world); }); } + +// system for render app to apply the extract commands +fn apply_extract_commands(world: &mut World) { + world.resource_scope(|world, mut extract_stage: Mut| { + extract_stage.0.apply_buffers(world); + }); +} diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs new file mode 100644 index 0000000000000..63c0a0cfb74b6 --- /dev/null +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -0,0 +1,155 @@ +use async_channel::{Receiver, Sender}; + +use bevy_app::{App, AppLabel, Plugin, SubApp}; +use bevy_ecs::{ + schedule::{MainThreadExecutor, StageLabel, SystemStage}, + system::Resource, + world::{Mut, World}, +}; +use bevy_tasks::ComputeTaskPool; + +use crate::RenderApp; + +/// A Label for the sub app that runs the parts of pipelined rendering that need to run on the main thread. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, AppLabel)] +pub struct RenderExtractApp; + +/// Labels for stages in the [`RenderExtractApp`] sub app. These will run after rendering has started. +#[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] +pub enum RenderExtractStage { + /// When pipelined rendering is enabled this stage runs after the render schedule starts, but + /// before I/O processing and the main app schedule. This can be useful for something like + /// frame pacing. + BeforeIoAfterRenderStart, +} + +/// Channel to send the render app from the main thread to the rendering thread +#[derive(Resource)] +pub struct MainToRenderAppSender(pub Sender); + +/// Channel to send the render app from the render thread to the main thread +#[derive(Resource)] +pub struct RenderToMainAppReceiver(pub Receiver); + +/// The [`PipelinedRenderingPlugin`] can be added to your application to enable pipelined rendering. +/// This moves rendering into a different thread, so that the Nth frame's rendering can +/// be run at the same time as the N + 1 frame's simulation. 
+/// +/// ```text +/// |--------------------|--------------------|--------------------|--------------------| +/// | simulation thread | frame 1 simulation | frame 2 simulation | frame 3 simulation | +/// |--------------------|--------------------|--------------------|--------------------| +/// | rendering thread | | frame 1 rendering | frame 2 rendering | +/// |--------------------|--------------------|--------------------|--------------------| +/// ``` +/// +/// The plugin is dependent on the [`crate::RenderApp`] added by [`crate::RenderPlugin`] and so must +/// be added after that plugin. If it is not added after, the plugin will do nothing. +/// +/// A single frame of execution looks something like below +/// +/// ```text +/// |-------------------------------------------------------------------| +/// | | BeforeIoAfterRenderStart | winit events | main schedule | +/// | extract |---------------------------------------------------------| +/// | | extract commands | rendering schedule | +/// |-------------------------------------------------------------------| +/// ``` +/// +/// - `extract` is the stage where data is copied from the main world to the render world. +/// This is run on the main app's thread. +/// - On the render thread, we first apply the `extract commands`. This is not run during extract, so the +/// main schedule can start sooner. +/// - Then the `rendering schedule` is run. See [`crate::RenderStage`] for the available stages. +/// - In parallel to the rendering thread we first run the [`RenderExtractStage::BeforeIoAfterRenderStart`] stage. By +/// default this stage is empty. But is useful if you need something to run before I/O processing. +/// - Next all the `winit events` are processed. +/// - And finally the `main app schedule` is run. +/// - Once both the `main app schedule` and the `render schedule` are finished running, `extract` is run again. 
+#[derive(Default)] +pub struct PipelinedRenderingPlugin; + +impl Plugin for PipelinedRenderingPlugin { + fn build(&self, app: &mut App) { + // Don't add RenderExtractApp if RenderApp isn't initialized. + if app.get_sub_app(RenderApp).is_err() { + return; + } + app.insert_resource(MainThreadExecutor::new()); + + let mut sub_app = App::empty(); + sub_app.add_stage( + RenderExtractStage::BeforeIoAfterRenderStart, + SystemStage::parallel(), + ); + app.add_sub_app(RenderExtractApp, sub_app, update_rendering); + } + + // Sets up the render thread and inserts resources into the main app used for controlling the render thread. + fn setup(&self, app: &mut App) { + // skip setting up when headless + if app.get_sub_app(RenderExtractApp).is_err() { + return; + } + + let (app_to_render_sender, app_to_render_receiver) = async_channel::bounded::(1); + let (render_to_app_sender, render_to_app_receiver) = async_channel::bounded::(1); + + let mut render_app = app + .remove_sub_app(RenderApp) + .expect("Unable to get RenderApp. 
Another plugin may have removed the RenderApp before PipelinedRenderingPlugin"); + + // clone main thread executor to render world + let executor = app.world.get_resource::().unwrap(); + render_app.app.world.insert_resource(executor.clone()); + + render_to_app_sender.send_blocking(render_app).unwrap(); + + app.insert_resource(MainToRenderAppSender(app_to_render_sender)); + app.insert_resource(RenderToMainAppReceiver(render_to_app_receiver)); + + std::thread::spawn(move || { + #[cfg(feature = "trace")] + let _span = bevy_utils::tracing::info_span!("render thread").entered(); + + loop { + // run a scope here to allow main world to use this thread while it's waiting for the render app + let mut render_app = ComputeTaskPool::get() + .scope(|s| { + s.spawn(async { app_to_render_receiver.recv().await.unwrap() }); + }) + .pop() + .unwrap(); + + #[cfg(feature = "trace")] + let _sub_app_span = + bevy_utils::tracing::info_span!("sub app", name = ?RenderApp).entered(); + render_app.run(); + render_to_app_sender.send_blocking(render_app).unwrap(); + } + }); + } +} + +// This function waits for the rendering world to be received, +// runs extract, and then sends the rendering world back to the render thread. +fn update_rendering(app_world: &mut World, _sub_app: &mut App) { + app_world.resource_scope(|world, main_thread_executor: Mut| { + // we use a scope here to run any main thread tasks that the render world still needs to run + // while we wait for the render world to be received. 
+ let mut render_app = ComputeTaskPool::get() + .scope_with_executor(true, Some(&*main_thread_executor.0), |s| { + s.spawn(async { + let receiver = world.get_resource::().unwrap(); + receiver.0.recv().await.unwrap() + }); + }) + .pop() + .unwrap(); + + render_app.extract(world); + + let sender = world.resource::(); + sender.0.send_blocking(render_app).unwrap(); + }); +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index ae9b0a3dbd8dc..b5e340c2d65ec 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -15,7 +15,7 @@ pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; #[cfg(target_arch = "wasm32")] mod single_threaded_task_pool; #[cfg(target_arch = "wasm32")] -pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder}; +pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; mod usages; #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8fa37f4f2361b..9b77d8fd3bb2c 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -9,6 +9,20 @@ use std::{ #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} +/// This is a dummy struct for wasm support to provide the same api as with the multithreaded +/// task pool. In the case of the multithreaded task pool this struct is used to spawn +/// tasks on a specific thread. But the wasm task pool just calls +/// [`wasm_bindgen_futures::spawn_local`] for spawning which just runs tasks on the main thread +/// and so the [`ThreadExecutor`] does nothing. 
+#[derive(Default)] +pub struct ThreadExecutor<'a>(PhantomData<&'a ()>); +impl<'a> ThreadExecutor<'a> { + /// Creates a new `ThreadExecutor` + pub fn new() -> Self { + Self(PhantomData::default()) + } +} + impl TaskPoolBuilder { /// Creates a new TaskPoolBuilder instance pub fn new() -> Self { @@ -63,6 +77,24 @@ impl TaskPool { /// /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'env, F, T>(&self, f: F) -> Vec + where + F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), + T: Send + 'static, + { + self.scope_with_executor(false, None, f) + } + + /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope_with_executor<'env, F, T>( + &self, + _tick_task_pool_executor: bool, + _thread_executor: Option<&ThreadExecutor>, + f: F, + ) -> Vec where F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), T: Send + 'static, diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 7bca59e7dba67..250bfba91f72c 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,15 +2,19 @@ use std::{ future::Future, marker::PhantomData, mem, + panic::AssertUnwindSafe, sync::Arc, thread::{self, JoinHandle}, }; use async_task::FallibleTask; use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, pin, FutureExt}; +use futures_lite::{future, FutureExt}; -use crate::{thread_executor::ThreadExecutor, Task}; +use crate::{ + thread_executor::{ThreadExecutor, ThreadExecutorTicker}, + Task, +}; struct CallOnDrop(Option>); @@ -266,67 +270,166 @@ impl TaskPool { /// }); /// }); /// } - /// pub fn scope<'env, F, T>(&self, f: F) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, 
T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|thread_executor| { - // SAFETY: This safety comment applies to all references transmuted to 'env. - // Any futures spawned with these references need to return before this function completes. - // This is guaranteed because we drive all the futures spawned onto the Scope - // to completion in this function. However, rust has no way of knowing this so we - // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. - let executor: &async_executor::Executor = &self.executor; - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; - let thread_executor: &'env ThreadExecutor<'env> = - unsafe { mem::transmute(thread_executor) }; - let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); - let spawned_ref: &'env ConcurrentQueue> = - unsafe { mem::transmute(&spawned) }; - - let scope = Scope { - executor, - thread_executor, - spawned: spawned_ref, - scope: PhantomData, - env: PhantomData, - }; - - let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) }; - - f(scope_ref); - - if spawned.is_empty() { - Vec::new() - } else { + Self::THREAD_EXECUTOR + .with(|thread_executor| self.scope_with_executor_inner(true, thread_executor, f)) + } + + /// This allows passing an external executor to spawn tasks on. When you pass an external executor, + /// tasks spawned with [`Scope::spawn_on_scope`] are then run on the thread that [`ThreadExecutor`] is being ticked on. + /// If [`None`] is passed the scope will use a [`ThreadExecutor`] that is ticked on the current thread. + /// + /// When `tick_task_pool_executor` is set to `true`, the multithreaded task stealing executor is ticked on the scope + /// thread. Disabling this can be useful when finishing the scope is latency sensitive. Pulling tasks from + /// the global executor can run tasks unrelated to the scope and delay when the scope returns. 
+ /// + /// See [`Self::scope`] for more details in general about how scopes work. + pub fn scope_with_executor<'env, F, T>( + &self, + tick_task_pool_executor: bool, + thread_executor: Option<&ThreadExecutor>, + f: F, + ) -> Vec + where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), + T: Send + 'static, + { + // If a `thread_executor` is passed use that. Otherwise get the `thread_executor` stored + // in the `THREAD_EXECUTOR` thread local. + if let Some(thread_executor) = thread_executor { + self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) + } else { + Self::THREAD_EXECUTOR.with(|thread_executor| { + self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) + }) + } + } + + fn scope_with_executor_inner<'env, F, T>( + &self, + tick_task_pool_executor: bool, + thread_executor: &ThreadExecutor, + f: F, + ) -> Vec + where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), + T: Send + 'static, + { + // SAFETY: This safety comment applies to all references transmuted to 'env. + // Any futures spawned with these references need to return before this function completes. + // This is guaranteed because we drive all the futures spawned onto the Scope + // to completion in this function. However, rust has no way of knowing this so we + // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. 
+ let executor: &async_executor::Executor = &self.executor; + let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; + let thread_executor: &'env ThreadExecutor<'env> = + unsafe { mem::transmute(thread_executor) }; + let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); + let spawned_ref: &'env ConcurrentQueue> = + unsafe { mem::transmute(&spawned) }; + + let scope = Scope { + executor, + thread_executor, + spawned: spawned_ref, + scope: PhantomData, + env: PhantomData, + }; + + let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) }; + + f(scope_ref); + + if spawned.is_empty() { + Vec::new() + } else { + future::block_on(async move { let get_results = async { let mut results = Vec::with_capacity(spawned_ref.len()); while let Ok(task) = spawned_ref.pop() { results.push(task.await.unwrap()); } - results }; - // Pin the futures on the stack. - pin!(get_results); + let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); + if let Some(thread_ticker) = thread_executor.ticker() { + if tick_task_pool_executor { + Self::execute_local_global(thread_ticker, executor, get_results).await + } else { + Self::execute_local(thread_ticker, get_results).await + } + } else if tick_task_pool_executor { + Self::execute_global(executor, get_results).await + } else { + get_results.await + } + }) + } + } + + #[inline] + async fn execute_local_global<'scope, 'ticker, T>( + thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + executor: &'scope async_executor::Executor<'scope>, + get_results: impl Future>, + ) -> Vec { + // we restart the executors if a task errors. if a scoped + // task errors it will panic the scope on the call to get_results + let execute_forever = async move { + loop { + let tick_forever = async { + loop { + thread_ticker.tick().await; + } + }; + // we don't care if it errors. 
If a scoped task errors it will propagate + // to get_results + let _result = AssertUnwindSafe(executor.run(tick_forever)) + .catch_unwind() + .await + .is_ok(); + } + }; + execute_forever.or(get_results).await + } - let thread_ticker = thread_executor.ticker().unwrap(); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut get_results)) { - break result; - }; + #[inline] + async fn execute_local<'scope, 'ticker, T>( + thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + get_results: impl Future>, + ) -> Vec { + let execute_forever = async { + loop { + let tick_forever = async { + loop { + thread_ticker.tick().await; + } + }; + let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); + } + }; + execute_forever.or(get_results).await + } - std::panic::catch_unwind(|| { - executor.try_tick(); - thread_ticker.try_tick(); - }) - .ok(); - } + #[inline] + async fn execute_global<'scope, T>( + executor: &'scope async_executor::Executor<'scope>, + get_results: impl Future>, + ) -> Vec { + let execute_forever = async { + loop { + let _result = AssertUnwindSafe(executor.run(std::future::pending::<()>())) + .catch_unwind() + .await + .is_ok(); } - }) + }; + execute_forever.or(get_results).await } /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be