Skip to content

Commit

Permalink
asset: use bevy_tasks in AssetServer (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
cart authored Sep 22, 2020
1 parent dd6f0b5 commit 028a22b
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 174 deletions.
1 change: 0 additions & 1 deletion crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ dynamic_plugins = ["libloading"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.2.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
bevy_math = { path = "../bevy_math", version = "0.2.1" }

# other
Expand Down
10 changes: 3 additions & 7 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
use crate::app_builder::AppBuilder;
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

#[allow(clippy::needless_doctest_main)]
Expand Down Expand Up @@ -64,20 +64,16 @@ impl App {
}

pub fn run(mut self) {
// Setup the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);

self.startup_schedule
.initialize(&mut self.world, &mut self.resources);
self.startup_executor.initialize(&mut self.resources);
self.startup_executor.run(
&mut self.startup_schedule,
&mut self.world,
&mut self.resources,
);

self.executor.initialize(&mut self.resources);
let runner = std::mem::replace(&mut self.runner, Box::new(run_once));
(runner)(self);
}
Expand Down
2 changes: 0 additions & 2 deletions crates/bevy_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;

pub use app::*;
pub use app_builder::*;
pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;

pub mod prelude {
pub use crate::{
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_asset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ filesystem_watcher = ["notify"]
# bevy
bevy_app = { path = "../bevy_app", version = "0.2.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
bevy_property = { path = "../bevy_property", version = "0.2.1" }
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }
Expand Down
167 changes: 58 additions & 109 deletions crates/bevy_asset/src/asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use crate::{
};
use anyhow::Result;
use bevy_ecs::{Res, Resource, Resources};
use bevy_tasks::TaskPool;
use bevy_utils::{HashMap, HashSet};
use crossbeam_channel::TryRecvError;
use parking_lot::RwLock;
use std::{
env, fs, io,
path::{Path, PathBuf},
sync::Arc,
thread,
};

use thiserror::Error;
Expand All @@ -38,12 +38,6 @@ pub enum AssetServerError {
AssetWatchError { path: PathBuf },
}

struct LoaderThread {
// NOTE: these must remain private. the LoaderThread Arc counters are used to determine thread liveness
// if there is one reference, the loader thread is dead. if there are two references, the loader thread is active
requests: Arc<RwLock<Vec<LoadRequest>>>,
}

/// Info about a specific asset, such as its path and its current load state
#[derive(Clone, Debug)]
pub struct AssetInfo {
Expand Down Expand Up @@ -73,11 +67,10 @@ impl LoadState {
/// Loads assets from the filesystem on background threads
pub struct AssetServer {
asset_folders: RwLock<Vec<PathBuf>>,
loader_threads: RwLock<Vec<LoaderThread>>,
max_loader_threads: usize,
asset_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
// TODO: this is a hack to enable retrieving generic AssetLoader<T>s. there must be a better way!
loaders: Vec<Resources>,
task_pool: TaskPool,
extension_to_handler_index: HashMap<String, usize>,
extension_to_loader_index: HashMap<String, usize>,
asset_info: RwLock<HashMap<HandleId, AssetInfo>>,
Expand All @@ -86,25 +79,22 @@ pub struct AssetServer {
filesystem_watcher: Arc<RwLock<Option<FilesystemWatcher>>>,
}

impl Default for AssetServer {
fn default() -> Self {
impl AssetServer {
pub fn new(task_pool: TaskPool) -> Self {
AssetServer {
#[cfg(feature = "filesystem_watcher")]
filesystem_watcher: Arc::new(RwLock::new(None)),
max_loader_threads: 4,
asset_folders: Default::default(),
loader_threads: Default::default(),
asset_handlers: Default::default(),
loaders: Default::default(),
extension_to_handler_index: Default::default(),
extension_to_loader_index: Default::default(),
asset_info_paths: Default::default(),
asset_info: Default::default(),
task_pool,
#[cfg(feature = "filesystem_watcher")]
filesystem_watcher: Arc::new(RwLock::new(None)),
}
}
}

impl AssetServer {
pub fn add_handler<T>(&mut self, asset_handler: T)
where
T: AssetLoadRequestHandler,
Expand Down Expand Up @@ -183,46 +173,6 @@ impl AssetServer {
Ok(())
}

#[cfg(feature = "filesystem_watcher")]
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
let mut changed = HashSet::default();

loop {
let result = {
let rwlock_guard = asset_server.filesystem_watcher.read();
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
filesystem_watcher.receiver.try_recv()
} else {
break;
}
};
let event = match result {
Ok(result) => result.unwrap(),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
};
if let notify::event::Event {
kind: notify::event::EventKind::Modify(_),
paths,
..
} = event
{
for path in paths.iter() {
if !changed.contains(path) {
let root_path = asset_server.get_root_path().unwrap();
let relative_path = path.strip_prefix(root_path).unwrap();
match asset_server.load_untyped(relative_path) {
Ok(_) => {}
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
Err(_) => {}
}
}
}
changed.extend(paths);
}
}
}

fn get_root_path(&self) -> Result<PathBuf, AssetServerError> {
if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
Ok(PathBuf::from(manifest_dir))
Expand Down Expand Up @@ -315,12 +265,21 @@ impl AssetServer {
}
};

self.send_request_to_loader_thread(LoadRequest {
let load_request = LoadRequest {
handle_id,
path: path.to_owned(),
handler_index: *index,
version: new_version,
});
};

let asset_handlers = self.asset_handlers.clone();
self.task_pool
.spawn(async move {
let handlers = asset_handlers.read();
let request_handler = &handlers[load_request.handler_index];
request_handler.handle_request(&load_request);
})
.detach();

// TODO: watching each asset explicitly is a simpler implementation, its possible it would be more efficient to watch
// folders instead (when possible)
Expand Down Expand Up @@ -370,56 +329,6 @@ impl AssetServer {
Some(load_state)
}

fn send_request_to_loader_thread(&self, load_request: LoadRequest) {
// NOTE: This lock makes the call to Arc::strong_count safe. Removing (or reordering) it could result in undefined behavior
let mut loader_threads = self.loader_threads.write();
if loader_threads.len() < self.max_loader_threads {
let loader_thread = LoaderThread {
requests: Arc::new(RwLock::new(vec![load_request])),
};
let requests = loader_thread.requests.clone();
loader_threads.push(loader_thread);
Self::start_thread(self.asset_handlers.clone(), requests);
} else {
let most_free_thread = loader_threads
.iter()
.min_by_key(|l| l.requests.read().len())
.unwrap();
let mut requests = most_free_thread.requests.write();
requests.push(load_request);
// if most free thread only has one reference, the thread as spun down. if so, we need to spin it back up!
if Arc::strong_count(&most_free_thread.requests) == 1 {
Self::start_thread(
self.asset_handlers.clone(),
most_free_thread.requests.clone(),
);
}
}
}

fn start_thread(
request_handlers: Arc<RwLock<Vec<Box<dyn AssetLoadRequestHandler>>>>,
requests: Arc<RwLock<Vec<LoadRequest>>>,
) {
thread::spawn(move || {
loop {
let request = {
let mut current_requests = requests.write();
if current_requests.len() == 0 {
// if there are no requests, spin down the thread
break;
}

current_requests.pop().unwrap()
};

let handlers = request_handlers.read();
let request_handler = &handlers[request.handler_index];
request_handler.handle_request(&request);
}
});
}

fn load_assets_in_folder_recursive(
&self,
path: &Path,
Expand Down Expand Up @@ -456,3 +365,43 @@ impl AssetServer {
Ok(handle_ids)
}
}

#[cfg(feature = "filesystem_watcher")]
pub fn filesystem_watcher_system(asset_server: Res<AssetServer>) {
let mut changed = HashSet::default();

loop {
let result = {
let rwlock_guard = asset_server.filesystem_watcher.read();
if let Some(filesystem_watcher) = rwlock_guard.as_ref() {
filesystem_watcher.receiver.try_recv()
} else {
break;
}
};
let event = match result {
Ok(result) => result.unwrap(),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"),
};
if let notify::event::Event {
kind: notify::event::EventKind::Modify(_),
paths,
..
} = event
{
for path in paths.iter() {
if !changed.contains(path) {
let root_path = asset_server.get_root_path().unwrap();
let relative_path = path.strip_prefix(root_path).unwrap();
match asset_server.load_untyped(relative_path) {
Ok(_) => {}
Err(AssetServerError::AssetLoadError(error)) => panic!("{:?}", error),
Err(_) => {}
}
}
}
changed.extend(paths);
}
}
}
11 changes: 9 additions & 2 deletions crates/bevy_asset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod loader;

pub use asset_server::*;
pub use assets::*;
use bevy_tasks::IoTaskPool;
pub use handle::*;
pub use load_request::*;
pub use loader::*;
Expand All @@ -33,15 +34,21 @@ pub struct AssetPlugin;

impl Plugin for AssetPlugin {
fn build(&self, app: &mut AppBuilder) {
let task_pool = app
.resources()
.get::<IoTaskPool>()
.expect("IoTaskPool resource not found")
.0
.clone();
app.add_stage_before(bevy_app::stage::PRE_UPDATE, stage::LOAD_ASSETS)
.add_stage_after(bevy_app::stage::POST_UPDATE, stage::ASSET_EVENTS)
.init_resource::<AssetServer>()
.add_resource(AssetServer::new(task_pool))
.register_property::<HandleId>();

#[cfg(feature = "filesystem_watcher")]
app.add_system_to_stage(
stage::LOAD_ASSETS,
AssetServer::filesystem_watcher_system.system(),
asset_server::filesystem_watcher_system.system(),
);
}
}
3 changes: 3 additions & 0 deletions crates/bevy_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ bevy_property = { path = "../bevy_property", version = "0.2.1" }
bevy_type_registry = { path = "../bevy_type_registry", version = "0.2.1" }
bevy_math = { path = "../bevy_math", version = "0.2.1" }
bevy_utils = { path = "../bevy_utils", version = "0.2.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.2.1" }

log = { version = "0.4", features = ["release_max_level_info"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
instant = "0.1.6"
10 changes: 9 additions & 1 deletion crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
mod bytes;
mod float_ord;
mod label;
mod task_pool_options;
mod time;

pub use bytes::*;
pub use float_ord::*;
pub use label::*;
pub use task_pool_options::DefaultTaskPoolOptions;
pub use time::*;

pub mod prelude {
pub use crate::{EntityLabels, Labels, Time, Timer};
pub use crate::{DefaultTaskPoolOptions, EntityLabels, Labels, Time, Timer};
}

use bevy_app::prelude::*;
Expand All @@ -23,6 +25,12 @@ pub struct CorePlugin;

impl Plugin for CorePlugin {
fn build(&self, app: &mut AppBuilder) {
// Setup the default bevy task pools
app.resources_mut()
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(app.resources_mut());

app.init_resource::<Time>()
.init_resource::<EntityLabels>()
.register_component::<Timer>()
Expand Down
Loading

0 comments on commit 028a22b

Please sign in to comment.