diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml index b50acf709ea7a..49066100019c5 100644 --- a/crates/bevy_app/Cargo.toml +++ b/crates/bevy_app/Cargo.toml @@ -19,7 +19,6 @@ dynamic_plugins = ["libloading"] # bevy bevy_derive = { path = "../bevy_derive", version = "0.2.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" } -bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" } bevy_math = { path = "../bevy_math", version = "0.2.1" } # other diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 76798d06809f2..3a6bb066cfe8e 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1,4 +1,4 @@ -use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions}; +use crate::app_builder::AppBuilder; use bevy_ecs::{ParallelExecutor, Resources, Schedule, World}; #[allow(clippy::needless_doctest_main)] @@ -64,20 +64,16 @@ impl App { } pub fn run(mut self) { - // Setup the default bevy task pools - self.resources - .get_cloned::() - .unwrap_or_else(DefaultTaskPoolOptions::default) - .create_default_pools(&mut self.resources); - self.startup_schedule .initialize(&mut self.world, &mut self.resources); + self.startup_executor.initialize(&mut self.resources); self.startup_executor.run( &mut self.startup_schedule, &mut self.world, &mut self.resources, ); + self.executor.initialize(&mut self.resources); let runner = std::mem::replace(&mut self.runner, Box::new(run_once)); (runner)(self); } diff --git a/crates/bevy_app/src/lib.rs b/crates/bevy_app/src/lib.rs index a409edbfafae3..d2e241616b55d 100644 --- a/crates/bevy_app/src/lib.rs +++ b/crates/bevy_app/src/lib.rs @@ -8,7 +8,6 @@ mod app_builder; mod event; mod plugin; mod schedule_runner; -mod task_pool_options; pub use app::*; pub use app_builder::*; @@ -16,7 +15,6 @@ pub use bevy_derive::DynamicPlugin; pub use event::*; pub use plugin::*; pub use schedule_runner::*; -pub use task_pool_options::*; pub mod prelude { pub use crate::{ diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index a74abe54a8d1c..1a183273b3163 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -20,6 +20,7 @@ filesystem_watcher = ["notify"] # bevy bevy_app = { path = "../bevy_app", version = "0.2.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" } +bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" } bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" } bevy_property = { path = "../bevy_property", version = "0.2.1" } bevy_utils = { path = "../bevy_utils", version = "0.2.1" } diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index e473476acf0a7..210343a2fd55b 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -4,6 +4,7 @@ use crate::{ }; use anyhow::Result; use bevy_ecs::{Res, Resource, Resources}; +use bevy_tasks::TaskPool; use bevy_utils::{HashMap, HashSet}; use crossbeam_channel::TryRecvError; use parking_lot::RwLock; @@ -11,7 +12,6 @@ use std::{ env, fs, io, path::{Path, PathBuf}, sync::Arc, - thread, }; use thiserror::Error; @@ -38,12 +38,6 @@ pub enum AssetServerError { AssetWatchError { path: PathBuf }, } -struct LoaderThread { - // NOTE: these must remain private. the LoaderThread Arc counters are used to determine thread liveness - // if there is one reference, the loader thread is dead. if there are two references, the loader thread is active - requests: Arc>>, -} - /// Info about a specific asset, such as its path and its current load state #[derive(Clone, Debug)] pub struct AssetInfo { @@ -73,11 +67,10 @@ impl LoadState { /// Loads assets from the filesystem on background threads pub struct AssetServer { asset_folders: RwLock>, - loader_threads: RwLock>, - max_loader_threads: usize, asset_handlers: Arc>>>, // TODO: this is a hack to enable retrieving generic AssetLoaders. there must be a better way! loaders: Vec, + task_pool: TaskPool, extension_to_handler_index: HashMap, extension_to_loader_index: HashMap, asset_info: RwLock>, @@ -86,25 +79,22 @@ pub struct AssetServer { filesystem_watcher: Arc>>, } -impl Default for AssetServer { - fn default() -> Self { +impl AssetServer { + pub fn new(task_pool: TaskPool) -> Self { AssetServer { - #[cfg(feature = "filesystem_watcher")] - filesystem_watcher: Arc::new(RwLock::new(None)), - max_loader_threads: 4, asset_folders: Default::default(), - loader_threads: Default::default(), asset_handlers: Default::default(), loaders: Default::default(), extension_to_handler_index: Default::default(), extension_to_loader_index: Default::default(), asset_info_paths: Default::default(), asset_info: Default::default(), + task_pool, + #[cfg(feature = "filesystem_watcher")] + filesystem_watcher: Arc::new(RwLock::new(None)), } } -} -impl AssetServer { pub fn add_handler(&mut self, asset_handler: T) where T: AssetLoadRequestHandler, @@ -183,46 +173,6 @@ impl AssetServer { Ok(()) } - #[cfg(feature = "filesystem_watcher")] - pub fn filesystem_watcher_system(asset_server: Res) { - let mut changed = HashSet::default(); - - loop { - let result = { - let rwlock_guard = asset_server.filesystem_watcher.read(); - if let Some(filesystem_watcher) = rwlock_guard.as_ref() { - filesystem_watcher.receiver.try_recv() - } else { - break; - } - }; - let event = match result { - Ok(result) => result.unwrap(), - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"), - }; - if let notify::event::Event { - kind: notify::event::EventKind::Modify(_), - paths, - .. - } = event - { - for path in paths.iter() { - if !changed.contains(path) { - let root_path = asset_server.get_root_path().unwrap(); - let relative_path = path.strip_prefix(root_path).unwrap(); - match asset_server.load_untyped(relative_path) { - Ok(_) => {} - Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error), - Err(_) => {} - } - } - } - changed.extend(paths); - } - } - } - fn get_root_path(&self) -> Result { if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") { Ok(PathBuf::from(manifest_dir)) @@ -315,12 +265,21 @@ impl AssetServer { } }; - self.send_request_to_loader_thread(LoadRequest { + let load_request = LoadRequest { handle_id, path: path.to_owned(), handler_index: *index, version: new_version, - }); + }; + + let asset_handlers = self.asset_handlers.clone(); + self.task_pool + .spawn(async move { + let handlers = asset_handlers.read(); + let request_handler = &handlers[load_request.handler_index]; + request_handler.handle_request(&load_request); + }) + .detach(); // TODO: watching each asset explicitly is a simpler implementation, its possible it would be more efficient to watch // folders instead (when possible) @@ -370,56 +329,6 @@ impl AssetServer { Some(load_state) } - fn send_request_to_loader_thread(&self, load_request: LoadRequest) { - // NOTE: This lock makes the call to Arc::strong_count safe. Removing (or reordering) it could result in undefined behavior - let mut loader_threads = self.loader_threads.write(); - if loader_threads.len() < self.max_loader_threads { - let loader_thread = LoaderThread { - requests: Arc::new(RwLock::new(vec![load_request])), - }; - let requests = loader_thread.requests.clone(); - loader_threads.push(loader_thread); - Self::start_thread(self.asset_handlers.clone(), requests); - } else { - let most_free_thread = loader_threads - .iter() - .min_by_key(|l| l.requests.read().len()) - .unwrap(); - let mut requests = most_free_thread.requests.write(); - requests.push(load_request); - // if most free thread only has one reference, the thread as spun down. if so, we need to spin it back up! - if Arc::strong_count(&most_free_thread.requests) == 1 { - Self::start_thread( - self.asset_handlers.clone(), - most_free_thread.requests.clone(), - ); - } - } - } - - fn start_thread( - request_handlers: Arc>>>, - requests: Arc>>, - ) { - thread::spawn(move || { - loop { - let request = { - let mut current_requests = requests.write(); - if current_requests.len() == 0 { - // if there are no requests, spin down the thread - break; - } - - current_requests.pop().unwrap() - }; - - let handlers = request_handlers.read(); - let request_handler = &handlers[request.handler_index]; - request_handler.handle_request(&request); - } - }); - } - fn load_assets_in_folder_recursive( &self, path: &Path, @@ -456,3 +365,43 @@ impl AssetServer { Ok(handle_ids) } } + +#[cfg(feature = "filesystem_watcher")] +pub fn filesystem_watcher_system(asset_server: Res) { + let mut changed = HashSet::default(); + + loop { + let result = { + let rwlock_guard = asset_server.filesystem_watcher.read(); + if let Some(filesystem_watcher) = rwlock_guard.as_ref() { + filesystem_watcher.receiver.try_recv() + } else { + break; + } + }; + let event = match result { + Ok(result) => result.unwrap(), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"), + }; + if let notify::event::Event { + kind: notify::event::EventKind::Modify(_), + paths, + .. + } = event + { + for path in paths.iter() { + if !changed.contains(path) { + let root_path = asset_server.get_root_path().unwrap(); + let relative_path = path.strip_prefix(root_path).unwrap(); + match asset_server.load_untyped(relative_path) { + Ok(_) => {} + Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error), + Err(_) => {} + } + } + } + changed.extend(paths); + } + } +} diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index b2751a37fee1e..01186472482f2 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -8,6 +8,7 @@ mod loader; pub use asset_server::*; pub use assets::*; +use bevy_tasks::IoTaskPool; pub use handle::*; pub use load_request::*; pub use loader::*; @@ -33,15 +34,21 @@ pub struct AssetPlugin; impl Plugin for AssetPlugin { fn build(&self, app: &mut AppBuilder) { + let task_pool = app + .resources() + .get::() + .expect("IoTaskPool resource not found") + .0 + .clone(); app.add_stage_before(bevy_app::stage::PRE_UPDATE, stage::LOAD_ASSETS) .add_stage_after(bevy_app::stage::POST_UPDATE, stage::ASSET_EVENTS) - .init_resource::() + .add_resource(AssetServer::new(task_pool)) .register_property::(); #[cfg(feature = "filesystem_watcher")] app.add_system_to_stage( stage::LOAD_ASSETS, - AssetServer::filesystem_watcher_system.system(), + asset_server::filesystem_watcher_system.system(), ); } } diff --git a/crates/bevy_core/Cargo.toml b/crates/bevy_core/Cargo.toml index 061e8050797b3..79f9b0a332586 100644 --- a/crates/bevy_core/Cargo.toml +++ b/crates/bevy_core/Cargo.toml @@ -26,6 +26,9 @@ bevy_property = { path = "../bevy_property", version = "0.2.1" } bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" } bevy_math = { path = "../bevy_math", version = "0.2.1" } bevy_utils = { path = "../bevy_utils", version = "0.2.1" } +bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" } + +log = { version = "0.4", features = ["release_max_level_info"] } [target.'cfg(target_arch = "wasm32")'.dependencies] instant = "0.1.6" diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index c1e8f7169f708..be901f68140a0 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -1,15 +1,17 @@ mod bytes; mod float_ord; mod label; +mod task_pool_options; mod time; pub use bytes::*; pub use float_ord::*; pub use label::*; +pub use task_pool_options::DefaultTaskPoolOptions; pub use time::*; pub mod prelude { - pub use crate::{EntityLabels, Labels, Time, Timer}; + pub use crate::{DefaultTaskPoolOptions, EntityLabels, Labels, Time, Timer}; } use bevy_app::prelude::*; @@ -23,6 +25,12 @@ pub struct CorePlugin; impl Plugin for CorePlugin { fn build(&self, app: &mut AppBuilder) { + // Setup the default bevy task pools + app.resources_mut() + .get_cloned::() + .unwrap_or_else(DefaultTaskPoolOptions::default) + .create_default_pools(app.resources_mut()); + app.init_resource::