diff --git a/Cargo.lock b/Cargo.lock index 1e2ae3c4c4e..f66040ae51b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,6 +582,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-intrusive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.21" @@ -2039,6 +2050,7 @@ dependencies = [ "console_log", "ddsfile", "env_logger", + "futures-intrusive", "glam", "js-sys", "log", diff --git a/deno_webgpu/src/buffer.rs b/deno_webgpu/src/buffer.rs index 45eab483d15..3155845e6e8 100644 --- a/deno_webgpu/src/buffer.rs +++ b/deno_webgpu/src/buffer.rs @@ -83,36 +83,27 @@ pub async fn op_webgpu_buffer_get_map_async( .get::(device_rid)?; device = device_resource.0; - let boxed_sender = Box::new(sender); - let sender_ptr = Box::into_raw(boxed_sender) as *mut u8; - - extern "C" fn buffer_map_future_wrapper( - status: wgpu_core::resource::BufferMapAsyncStatus, - user_data: *mut u8, - ) { - let sender_ptr = user_data as *mut oneshot::Sender>; - let boxed_sender = unsafe { Box::from_raw(sender_ptr) }; - boxed_sender + let callback = Box::new(move |status| { + sender .send(match status { wgpu_core::resource::BufferMapAsyncStatus::Success => Ok(()), _ => unreachable!(), // TODO }) .unwrap(); - } + }); // TODO(lucacasonato): error handling let maybe_err = gfx_select!(buffer => instance.buffer_map_async( - buffer, - offset..(offset + size), - wgpu_core::resource::BufferMapOperation { - host: match mode { - 1 => wgpu_core::device::HostMap::Read, - 2 => wgpu_core::device::HostMap::Write, - _ => unreachable!(), - }, - callback: buffer_map_future_wrapper, - user_data: sender_ptr, - } + buffer, + offset..(offset + size), + wgpu_core::resource::BufferMapOperation { + host: match mode { + 1 => wgpu_core::device::HostMap::Read, + 2 => wgpu_core::device::HostMap::Write, + _ => unreachable!(), + }, + callback: wgpu_core::resource::BufferMapCallback::Rust { callback }, + } )) .err(); diff --git a/player/tests/test.rs b/player/tests/test.rs index 6fa3395ae21..4174e232593 100644 --- a/player/tests/test.rs +++ b/player/tests/test.rs @@ -14,7 +14,7 @@ use std::{ fs::{read_to_string, File}, io::{Read, Seek, SeekFrom}, path::{Path, PathBuf}, - ptr, slice, + slice, }; #[derive(serde::Deserialize)] @@ -55,7 +55,7 @@ struct Test<'a> { actions: Vec>, } -extern "C" fn map_callback(status: wgc::resource::BufferMapAsyncStatus, _user_data: *mut u8) { +fn map_callback(status: wgc::resource::BufferMapAsyncStatus) { match status { wgc::resource::BufferMapAsyncStatus::Success => (), _ => panic!("Unable to map"), @@ -112,8 +112,9 @@ impl Test<'_> { expect.offset .. expect.offset+expect.data.len() as wgt::BufferAddress, wgc::resource::BufferMapOperation { host: wgc::device::HostMap::Read, - callback: map_callback, - user_data: ptr::null_mut(), + callback: wgc::resource::BufferMapCallback::Rust { + callback: Box::new(map_callback), + }, } )) .unwrap(); diff --git a/wgpu-core/src/device/life.rs b/wgpu-core/src/device/life.rs index 8c6705276f3..5062f9189fc 100644 --- a/wgpu-core/src/device/life.rs +++ b/wgpu-core/src/device/life.rs @@ -440,15 +440,18 @@ impl LifetimeTracker { } } - pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) -> bool { + pub fn add_work_done_closure( + &mut self, + closure: SubmittedWorkDoneClosure, + ) -> Option { match self.active.last_mut() { Some(active) => { active.work_done_closures.push(closure); - true + None } // Note: we can't immediately invoke the closure, since it assumes // nothing is currently locked in the hubs. - None => false, + None => Some(closure), } } } diff --git a/wgpu-core/src/device/mod.rs b/wgpu-core/src/device/mod.rs index 24fa72e5711..49d7cb31251 100644 --- a/wgpu-core/src/device/mod.rs +++ b/wgpu-core/src/device/mod.rs @@ -141,14 +141,14 @@ impl UserClosures { self.submissions.extend(other.submissions); } - unsafe fn fire(self) { - //Note: this logic is specifically moved out of `handle_mapping()` in order to + fn fire(self) { + // Note: this logic is specifically moved out of `handle_mapping()` in order to // have nothing locked by the time we execute users callback code. for (operation, status) in self.mappings { - (operation.callback)(status, operation.user_data); + operation.callback.call(status); } for closure in self.submissions { - (closure.callback)(closure.user_data); + closure.call(); } } } @@ -4978,9 +4978,9 @@ impl Global { .map_err(|_| DeviceError::Invalid)? .maintain(hub, force_wait, &mut token)? }; - unsafe { - closures.fire(); - } + + closures.fire(); + Ok(queue_empty) } @@ -5058,9 +5058,7 @@ impl Global { self.poll_devices::(force_wait, &mut closures)? && all_queue_empty; } - unsafe { - closures.fire(); - } + closures.fire(); Ok(all_queue_empty) } @@ -5167,7 +5165,7 @@ impl Global { return Err(resource::BufferAccessError::AlreadyMapped); } resource::BufferMapState::Waiting(_) => { - op.call_error(); + op.callback.call_error(); return Ok(()); } resource::BufferMapState::Idle => { @@ -5384,9 +5382,7 @@ impl Global { //Note: outside inner function so no locks are held when calling the callback let closure = self.buffer_unmap_inner::(buffer_id)?; if let Some((operation, status)) = closure { - unsafe { - (operation.callback)(status, operation.user_data); - } + operation.callback.call(status); } Ok(()) } diff --git a/wgpu-core/src/device/queue.rs b/wgpu-core/src/device/queue.rs index e76255f772d..83de498ef65 100644 --- a/wgpu-core/src/device/queue.rs +++ b/wgpu-core/src/device/queue.rs @@ -28,16 +28,56 @@ use thiserror::Error; /// without a concrete moment of when it can be cleared. const WRITE_COMMAND_BUFFERS_PER_POOL: usize = 64; -pub type OnSubmittedWorkDoneCallback = unsafe extern "C" fn(user_data: *mut u8); #[repr(C)] -#[derive(Clone, Copy, Debug)] +pub struct SubmittedWorkDoneClosureC { + callback: unsafe extern "C" fn(user_data: *mut u8), + user_data: *mut u8, +} + +unsafe impl Send for SubmittedWorkDoneClosureC {} + pub struct SubmittedWorkDoneClosure { - pub callback: OnSubmittedWorkDoneCallback, - pub user_data: *mut u8, + // We wrap this so creating the enum in the C variant can be unsafe, + // allowing our call function to be safe. + inner: SubmittedWorkDoneClosureInner, +} + +enum SubmittedWorkDoneClosureInner { + Rust { + callback: Box, + }, + C { + inner: SubmittedWorkDoneClosureC, + }, } -unsafe impl Send for SubmittedWorkDoneClosure {} -unsafe impl Sync for SubmittedWorkDoneClosure {} +impl SubmittedWorkDoneClosure { + pub fn from_rust(callback: Box) -> Self { + Self { + inner: SubmittedWorkDoneClosureInner::Rust { callback }, + } + } + + /// # Safety + /// + /// - The callback pointer must be valid to call with the provided user_data pointer. + /// - Both pointers must point to 'static data as the callback may happen at an unspecified time. + pub unsafe fn from_c(inner: SubmittedWorkDoneClosureC) -> Self { + Self { + inner: SubmittedWorkDoneClosureInner::C { inner }, + } + } + + pub(crate) fn call(self) { + match self.inner { + SubmittedWorkDoneClosureInner::Rust { callback } => callback(), + // SAFETY: the contract of the call to from_c says that this unsafe is sound. + SubmittedWorkDoneClosureInner::C { inner } => unsafe { + (inner.callback)(inner.user_data) + }, + } + } +} struct StagingData { buffer: A::Buffer, @@ -932,9 +972,8 @@ impl Global { }; // the closures should execute with nothing locked! - unsafe { - callbacks.fire(); - } + callbacks.fire(); + Ok(()) } @@ -957,7 +996,7 @@ impl Global { closure: SubmittedWorkDoneClosure, ) -> Result<(), InvalidQueue> { //TODO: flush pending writes - let added = { + let closure_opt = { let hub = A::hub(self); let mut token = Token::root(); let (device_guard, mut token) = hub.devices.read(&mut token); @@ -966,10 +1005,8 @@ impl Global { Err(_) => return Err(InvalidQueue), } }; - if !added { - unsafe { - (closure.callback)(closure.user_data); - } + if let Some(closure) = closure_opt { + closure.call(); } Ok(()) } diff --git a/wgpu-core/src/resource.rs b/wgpu-core/src/resource.rs index a47e064f447..7589304340e 100644 --- a/wgpu-core/src/resource.rs +++ b/wgpu-core/src/resource.rs @@ -23,7 +23,6 @@ pub enum BufferMapAsyncStatus { ContextLost, } -#[derive(Debug)] pub(crate) enum BufferMapState { /// Mapped at creation. Init { @@ -46,29 +45,67 @@ pub(crate) enum BufferMapState { unsafe impl Send for BufferMapState {} unsafe impl Sync for BufferMapState {} -pub type BufferMapCallback = unsafe extern "C" fn(status: BufferMapAsyncStatus, userdata: *mut u8); - #[repr(C)] -#[derive(Debug)] -pub struct BufferMapOperation { - pub host: HostMap, - pub callback: BufferMapCallback, - pub user_data: *mut u8, +pub struct BufferMapCallbackC { + callback: unsafe extern "C" fn(status: BufferMapAsyncStatus, user_data: *mut u8), + user_data: *mut u8, } -//TODO: clarify if/why this is needed here -unsafe impl Send for BufferMapOperation {} -unsafe impl Sync for BufferMapOperation {} +unsafe impl Send for BufferMapCallbackC {} + +pub struct BufferMapCallback { + // We wrap this so creating the enum in the C variant can be unsafe, + // allowing our call function to be safe. + inner: BufferMapCallbackInner, +} + +enum BufferMapCallbackInner { + Rust { + callback: Box, + }, + C { + inner: BufferMapCallbackC, + }, +} + +impl BufferMapCallback { + pub fn from_rust(callback: Box) -> Self { + Self { + inner: BufferMapCallbackInner::Rust { callback }, + } + } + + /// # Safety + /// + /// - The callback pointer must be valid to call with the provided user_data pointer. + /// - Both pointers must point to 'static data as the callback may happen at an unspecified time. + pub unsafe fn from_c(inner: BufferMapCallbackC) -> Self { + Self { + inner: BufferMapCallbackInner::C { inner }, + } + } + + pub(crate) fn call(self, status: BufferMapAsyncStatus) { + match self.inner { + BufferMapCallbackInner::Rust { callback } => callback(status), + // SAFETY: the contract of the call to from_c says that this unsafe is sound. + BufferMapCallbackInner::C { inner } => unsafe { + (inner.callback)(status, inner.user_data) + }, + } + } -impl BufferMapOperation { pub(crate) fn call_error(self) { log::error!("wgpu_buffer_map_async failed: buffer mapping is pending"); - unsafe { - (self.callback)(BufferMapAsyncStatus::Error, self.user_data); - } + self.call(BufferMapAsyncStatus::Error); } } +pub struct BufferMapOperation { + pub host: HostMap, + pub callback: BufferMapCallback, +} + #[derive(Clone, Debug, Error)] pub enum BufferAccessError { #[error(transparent)] @@ -105,7 +142,6 @@ pub enum BufferAccessError { }, } -#[derive(Debug)] pub(crate) struct BufferPendingMapping { pub range: Range, pub op: BufferMapOperation, @@ -115,7 +151,6 @@ pub(crate) struct BufferPendingMapping { pub type BufferDescriptor<'a> = wgt::BufferDescriptor>; -#[derive(Debug)] pub struct Buffer { pub(crate) raw: Option, pub(crate) device_id: Stored, diff --git a/wgpu/Cargo.toml b/wgpu/Cargo.toml index 834eeefe875..8a9cc0c4556 100644 --- a/wgpu/Cargo.toml +++ b/wgpu/Cargo.toml @@ -123,6 +123,7 @@ bitflags = "1" bytemuck = { version = "1.4", features = ["derive"] } glam = "0.20.2" ddsfile = "0.5" +futures-intrusive = "0.4" log = "0.4" # Opt out of noise's "default-features" to avoid "image" feature as a dependency count optimization. # This will not be required in the next release since it has been removed from the default feature in https://github.com/Razaekel/noise-rs/commit/1af9e1522236b2c584fb9a02150c9c67a5e6bb04#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542 diff --git a/wgpu/examples/capture/main.rs b/wgpu/examples/capture/main.rs index 2f4b2a56a5c..44d53e8e345 100644 --- a/wgpu/examples/capture/main.rs +++ b/wgpu/examples/capture/main.rs @@ -126,7 +126,9 @@ async fn create_png( ) { // Note that we're not calling `.await` here. let buffer_slice = output_buffer.slice(..); - let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read); + // Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished. + let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel(); + buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap()); // Poll the device in a blocking manner so that our future resolves. // In an actual application, `device.poll(...)` should @@ -138,7 +140,7 @@ async fn create_png( return; } - if let Ok(()) = buffer_future.await { + if let Some(Ok(())) = receiver.receive().await { let padded_buffer = buffer_slice.get_mapped_range(); let mut png_encoder = png::Encoder::new( @@ -214,18 +216,15 @@ mod tests { #[test] fn ensure_generated_data_matches_expected() { - pollster::block_on(assert_generated_data_matches_expected()); + assert_generated_data_matches_expected(); } - async fn assert_generated_data_matches_expected() { + fn assert_generated_data_matches_expected() { let (device, output_buffer, dimensions) = create_red_image_with_dimensions(100usize, 200usize).await; let buffer_slice = output_buffer.slice(..); - let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read); + buffer_slice.map_async(wgpu::MapMode::Read, |_| ()); device.poll(wgpu::Maintain::Wait); - buffer_future - .await - .expect("failed to map buffer slice for capture test"); let padded_buffer = buffer_slice.get_mapped_range(); let expected_buffer_size = dimensions.padded_bytes_per_row * dimensions.height; assert_eq!(padded_buffer.len(), expected_buffer_size); diff --git a/wgpu/examples/framework.rs b/wgpu/examples/framework.rs index 0ed135f1cd9..63c91fd80ee 100644 --- a/wgpu/examples/framework.rs +++ b/wgpu/examples/framework.rs @@ -517,7 +517,7 @@ pub fn test(mut params: FrameworkRefTest) { ctx.queue.submit(Some(cmd_buf.finish())); let dst_buffer_slice = dst_buffer.slice(..); - let _ = dst_buffer_slice.map_async(wgpu::MapMode::Read); + dst_buffer_slice.map_async(wgpu::MapMode::Read, |_| ()); ctx.device.poll(wgpu::Maintain::Wait); let bytes = dst_buffer_slice.get_mapped_range().to_vec(); diff --git a/wgpu/examples/hello-compute/main.rs b/wgpu/examples/hello-compute/main.rs index bbff9130f4e..df157cbfec8 100644 --- a/wgpu/examples/hello-compute/main.rs +++ b/wgpu/examples/hello-compute/main.rs @@ -147,8 +147,9 @@ async fn execute_gpu_inner( // Note that we're not calling `.await` here. let buffer_slice = staging_buffer.slice(..); - // Gets the future representing when `staging_buffer` can be read from - let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read); + // Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished. + let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel(); + buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap()); // Poll the device in a blocking manner so that our future resolves. // In an actual application, `device.poll(...)` should @@ -156,7 +157,7 @@ async fn execute_gpu_inner( device.poll(wgpu::Maintain::Wait); // Awaits until `buffer_future` can be read from - if let Ok(()) = buffer_future.await { + if let Some(Ok(())) = receiver.receive().await { // Gets contents of buffer let data = buffer_slice.get_mapped_range(); // Since contents are got in bytes, this converts these bytes back to u32 diff --git a/wgpu/examples/mipmap/main.rs b/wgpu/examples/mipmap/main.rs index 9603895d575..cd31863b801 100644 --- a/wgpu/examples/mipmap/main.rs +++ b/wgpu/examples/mipmap/main.rs @@ -380,11 +380,11 @@ impl framework::Example for Example { queue.submit(Some(init_encoder.finish())); if let Some(ref query_sets) = query_sets { - // We can ignore the future as we're about to wait for the device. - let _ = query_sets + // We can ignore the callback as we're about to wait for the device. + query_sets .data_buffer .slice(..) - .map_async(wgpu::MapMode::Read); + .map_async(wgpu::MapMode::Read, |_| ()); // Wait for device to be done rendering mipmaps device.poll(wgpu::Maintain::Wait); // This is guaranteed to be ready. diff --git a/wgpu/examples/skybox/main.rs b/wgpu/examples/skybox/main.rs index 9dcd3ece824..2b8f6d8f960 100644 --- a/wgpu/examples/skybox/main.rs +++ b/wgpu/examples/skybox/main.rs @@ -398,7 +398,7 @@ impl framework::Example for Skybox { view: &wgpu::TextureView, device: &wgpu::Device, queue: &wgpu::Queue, - spawner: &framework::Spawner, + _spawner: &framework::Spawner, ) { let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None }); @@ -457,8 +457,7 @@ impl framework::Example for Skybox { queue.submit(std::iter::once(encoder.finish())); - let belt_future = self.staging_belt.recall(); - spawner.spawn_local(belt_future); + self.staging_belt.recall(); } } diff --git a/wgpu/src/backend/direct.rs b/wgpu/src/backend/direct.rs index fdd599e57b4..e9aac2a2893 100644 --- a/wgpu/src/backend/direct.rs +++ b/wgpu/src/backend/direct.rs @@ -1,8 +1,8 @@ use crate::{ - backend::native_gpu_future, AdapterInfo, BindGroupDescriptor, BindGroupLayoutDescriptor, - BindingResource, BufferBinding, CommandEncoderDescriptor, ComputePassDescriptor, - ComputePipelineDescriptor, DownlevelCapabilities, Features, Label, Limits, LoadOp, MapMode, - Operations, PipelineLayoutDescriptor, RenderBundleEncoderDescriptor, RenderPipelineDescriptor, + AdapterInfo, BindGroupDescriptor, BindGroupLayoutDescriptor, BindingResource, BufferBinding, + CommandEncoderDescriptor, ComputePassDescriptor, ComputePipelineDescriptor, + DownlevelCapabilities, Features, Label, Limits, LoadOp, MapMode, Operations, + PipelineLayoutDescriptor, RenderBundleEncoderDescriptor, RenderPipelineDescriptor, SamplerDescriptor, ShaderModuleDescriptor, ShaderModuleDescriptorSpirV, ShaderSource, SurfaceStatus, TextureDescriptor, TextureFormat, TextureViewDescriptor, }; @@ -805,8 +805,6 @@ impl crate::Context for Context { #[allow(clippy::type_complexity)] type RequestDeviceFuture = Ready>; - type MapAsyncFuture = native_gpu_future::GpuFuture>; - type OnSubmittedWorkDoneFuture = native_gpu_future::GpuFuture<()>; type PopErrorScopeFuture = Ready>; fn init(backends: wgt::Backends) -> Self { @@ -1622,28 +1620,20 @@ impl crate::Context for Context { buffer: &Self::BufferId, mode: MapMode, range: Range, - ) -> Self::MapAsyncFuture { - let (future, completion) = native_gpu_future::new_gpu_future(); - - extern "C" fn buffer_map_future_wrapper( - status: wgc::resource::BufferMapAsyncStatus, - user_data: *mut u8, - ) { - let completion = - unsafe { native_gpu_future::GpuFutureCompletion::from_raw(user_data as _) }; - completion.complete(match status { - wgc::resource::BufferMapAsyncStatus::Success => Ok(()), - _ => Err(crate::BufferAsyncError), - }) - } - + callback: impl FnOnce(Result<(), crate::BufferAsyncError>) + Send + 'static, + ) { let operation = wgc::resource::BufferMapOperation { host: match mode { MapMode::Read => wgc::device::HostMap::Read, MapMode::Write => wgc::device::HostMap::Write, }, - callback: buffer_map_future_wrapper, - user_data: completion.into_raw() as _, + callback: wgc::resource::BufferMapCallback::from_rust(Box::new(|status| { + let res = match status { + wgc::resource::BufferMapAsyncStatus::Success => Ok(()), + _ => Err(crate::BufferAsyncError), + }; + callback(res); + })), }; let global = &self.0; @@ -1651,7 +1641,6 @@ impl crate::Context for Context { Ok(()) => (), Err(cause) => self.handle_error_nolabel(&buffer.error_sink, cause, "Buffer::map_async"), } - future } fn buffer_get_mapped_range( @@ -2216,26 +2205,15 @@ impl crate::Context for Context { fn queue_on_submitted_work_done( &self, queue: &Self::QueueId, - ) -> Self::OnSubmittedWorkDoneFuture { - let (future, completion) = native_gpu_future::new_gpu_future(); - - extern "C" fn submitted_work_done_future_wrapper(user_data: *mut u8) { - let completion = - unsafe { native_gpu_future::GpuFutureCompletion::from_raw(user_data as _) }; - completion.complete(()) - } - - let closure = wgc::device::queue::SubmittedWorkDoneClosure { - callback: submitted_work_done_future_wrapper, - user_data: completion.into_raw() as _, - }; + callback: Box, + ) { + let closure = wgc::device::queue::SubmittedWorkDoneClosure::from_rust(callback); let global = &self.0; let res = wgc::gfx_select!(queue => global.queue_on_submitted_work_done(*queue, closure)); if let Err(cause) = res { self.handle_error_fatal(cause, "Queue::on_submitted_work_done"); } - future } fn device_start_capture(&self, device: &Self::DeviceId) { diff --git a/wgpu/src/backend/mod.rs b/wgpu/src/backend/mod.rs index e73d66ad895..abd090e0863 100644 --- a/wgpu/src/backend/mod.rs +++ b/wgpu/src/backend/mod.rs @@ -7,6 +7,3 @@ pub(crate) use web::{BufferMappedRange, Context}; mod direct; #[cfg(any(not(target_arch = "wasm32"), feature = "webgl"))] pub(crate) use direct::{BufferMappedRange, Context}; - -#[cfg(any(not(target_arch = "wasm32"), feature = "webgl"))] -mod native_gpu_future; diff --git a/wgpu/src/backend/native_gpu_future.rs b/wgpu/src/backend/native_gpu_future.rs deleted file mode 100644 index f80fba7c374..00000000000 --- a/wgpu/src/backend/native_gpu_future.rs +++ /dev/null @@ -1,143 +0,0 @@ -//! Futures that can be resolved when the GPU completes a task. -//! -//! This module defines the [`GpuFuture`] and [`GpuFutureCompletion`] -//! types, which `wgpu` uses to communicate to users when GPU -//! operations have completed, and when resources are ready to access. -//! This is only used by the `direct` back end, not on the web. -//! -//! The life cycle of a `GpuFuture` is as follows: -//! -//! - Calling [`new_gpu_future`] constructs a paired `GpuFuture` and -//! `GpuFutureCompletion`. -//! -//! - Calling [`complete(v)`] on a `GpuFutureCompletion` marks its -//! paired `GpuFuture` as ready with value `v`. This also wakes -//! the most recent [`Waker`] the future was polled with, if any. -//! -//! - Polling a `GpuFuture` either returns `v` if it is ready, or -//! saves the `Waker` passed to [`Future::poll`], to be awoken -//! when `complete` is called on the paired `GpuFutureCompletion`. -//! -//! ## Communicating with `wgpu_core` -//! -//! The `wgpu_core` crate uses various specialized callback types, -//! like [`wgpu_core::resource::BufferMapOperation`] for reporting -//! buffers that are ready to map, or -//! [`wgpu_core::device::queue::SubmittedWorkDoneClosure`] for -//! reporting the completion of submitted commands. To support FFI -//! bindings, these are unsafe, low-level structures that usually have -//! a function pointer and a untyped, raw "closure" pointer. -//! -//! Calling [`GpuFutureCompletion::into_raw`] returns a raw opaque -//! pointer suitable for use as the "closure" pointer in `wgpu_core`'s -//! callbacks. The [`GpuFutureCompletion::from_raw`] converts such a -//! raw opaque pointer back into a [`GpuFutureCompletion`]. See the -//! direct back end's implementation of [`Context::buffer_map_async`] -//! for an example of this. -//! -//! [`complete(v)`]: GpuFutureCompletion::complete -//! [`Waker`]: std::task::Waker -//! [`Future::poll`]: std::future::Future::poll -//! [`wgpu_core::resource::BufferMapOperation`]: https://docs.rs/wgpu-core/latest/wgpu_core/resource/struct.BufferMapOperation.html -//! [`wgpu_core::device::queue::SubmittedWorkDoneClosure`]: https://docs.rs/wgpu-core/latest/wgpu_core/device/queue/struct.SubmittedWorkDoneClosure.html -//! [`Context::buffer_map_async`]: crate::Context::buffer_map_async -use parking_lot::Mutex; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll, Waker}; - -/// The current state of a `GpuFuture`. -enum WakerOrResult { - /// The last [`Waker`] used to poll this future, if any. - /// - /// [`Waker`]: std::task::Waker - Waker(Waker), - - /// The value this future resolves to, if it is ready. - Result(T), -} - -/// The shared state of a [`GpuFuture`] and its [`GpuFutureCompletion`]. -/// -/// Polling the future when it is not yet ready stores the [`Waker`] -/// here; completing the future when it has not yet been polled stores -/// the value here. See [`WakerOrResult`] for details. -type GpuFutureData = Mutex>>; - -/// A [`Future`] that will be ready when some sort of GPU activity has finished. -/// -/// Call [`new_gpu_future`] to create a `GpuFuture`, along with a -/// paired `GpuFutureCompletion` that can be used to mark it as ready. -pub struct GpuFuture { - data: Arc>, -} - -/// An opaque type used for pointers to a [`GpuFutureCompletion`]'s guts. -pub enum OpaqueData {} - -//TODO: merge this with `GpuFuture` and avoid `Arc` on the data. -/// A completion handle to set the result on a [`GpuFuture`]. -pub struct GpuFutureCompletion { - data: Arc>, -} - -impl Future for GpuFuture { - type Output = T; - - fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll { - let mut waker_or_result = self.into_ref().get_ref().data.lock(); - - match waker_or_result.take() { - Some(WakerOrResult::Result(res)) => Poll::Ready(res), - _ => { - *waker_or_result = Some(WakerOrResult::Waker(context.waker().clone())); - Poll::Pending - } - } - } -} - -impl GpuFutureCompletion { - /// Mark our paired [`GpuFuture`] as ready, with the given `value`. - pub fn complete(self, value: T) { - let mut waker_or_result = self.data.lock(); - - match waker_or_result.replace(WakerOrResult::Result(value)) { - Some(WakerOrResult::Waker(waker)) => waker.wake(), - None => {} - Some(WakerOrResult::Result(_)) => { - // Drop before panicking. Not sure if this is necessary, but it makes me feel better. - drop(waker_or_result); - unreachable!() - } - }; - } - - /// Convert this `GpuFutureCompletion` into a raw pointer for `wgpu_core` to hold. - pub(crate) fn into_raw(self) -> *mut OpaqueData { - Arc::into_raw(self.data) as _ - } - - /// Convert a raw pointer returned by [`into_raw`] back into a `GpuFutureCompletion`. - /// - /// [`into_raw`]: GpuFutureCompletion::into_raw - pub(crate) unsafe fn from_raw(this: *mut OpaqueData) -> Self { - Self { - data: Arc::from_raw(this as _), - } - } -} - -/// Construct a fresh [`GpuFuture`] and a paired [`GpuFutureCompletion`]. -/// -/// See the module docs for details. -pub(crate) fn new_gpu_future() -> (GpuFuture, GpuFutureCompletion) { - let data = Arc::new(Mutex::new(None)); - ( - GpuFuture { - data: Arc::clone(&data), - }, - GpuFutureCompletion { data }, - ) -} diff --git a/wgpu/src/backend/web.rs b/wgpu/src/backend/web.rs index 18b84cb25cf..0d2c3c7f3bc 100644 --- a/wgpu/src/backend/web.rs +++ b/wgpu/src/backend/web.rs @@ -1,10 +1,12 @@ #![allow(clippy::type_complexity)] use std::{ + cell::RefCell, fmt, future::Future, ops::Range, pin::Pin, + rc::Rc, task::{self, Poll}, }; use wasm_bindgen::{prelude::*, JsCast}; @@ -933,10 +935,6 @@ fn future_request_device( .map_err(|_| crate::RequestDeviceError) } -fn future_map_async(result: JsFutureResult) -> Result<(), crate::BufferAsyncError> { - result.map(|_| ()).map_err(|_| crate::BufferAsyncError) -} - fn future_pop_error_scope(result: JsFutureResult) -> Option { match result { Ok(js_value) if js_value.is_object() => { @@ -1004,12 +1002,6 @@ impl crate::Context for Context { wasm_bindgen_futures::JsFuture, fn(JsFutureResult) -> Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>, >; - type MapAsyncFuture = MakeSendFuture< - wasm_bindgen_futures::JsFuture, - fn(JsFutureResult) -> Result<(), crate::BufferAsyncError>, - >; - type OnSubmittedWorkDoneFuture = - MakeSendFuture ()>; type PopErrorScopeFuture = MakeSendFuture Option>; @@ -1735,17 +1727,30 @@ impl crate::Context for Context { buffer: &Self::BufferId, mode: crate::MapMode, range: Range, - ) -> Self::MapAsyncFuture { + callback: impl FnOnce(Result<(), crate::BufferAsyncError>) + Send + 'static, + ) { let map_promise = buffer.0.map_async_with_f64_and_f64( map_map_mode(mode), range.start as f64, (range.end - range.start) as f64, ); - MakeSendFuture::new( - wasm_bindgen_futures::JsFuture::from(map_promise), - future_map_async, - ) + // Both the 'success' and 'rejected' closures need access to callback, but only one + // of them will ever run. We have them both hold a reference to a `Rc>>`, + // and then take ownership of callback when invoked. + // + // We also only need Rc's because these will only ever be called on our thread. + let rc_callback = Rc::new(RefCell::new(Some(callback))); + + let rc_callback_clone = rc_callback.clone(); + let closure_success = wasm_bindgen::closure::Closure::once(move |_| { + rc_callback.borrow_mut().take().unwrap()(Ok(())) + }); + let closure_rejected = wasm_bindgen::closure::Closure::once(move |_| { + rc_callback_clone.borrow_mut().take().unwrap()(Err(crate::BufferAsyncError)) + }); + + let _ = map_promise.then2(&closure_success, &closure_rejected); } fn buffer_get_mapped_range( @@ -2213,7 +2218,8 @@ impl crate::Context for Context { fn queue_on_submitted_work_done( &self, _queue: &Self::QueueId, - ) -> Self::OnSubmittedWorkDoneFuture { + _callback: Box, + ) { unimplemented!() } diff --git a/wgpu/src/lib.rs b/wgpu/src/lib.rs index 69cb5ecaed2..28818f8a045 100644 --- a/wgpu/src/lib.rs +++ b/wgpu/src/lib.rs @@ -191,8 +191,6 @@ trait Context: Debug + Send + Sized + Sync { type RequestAdapterFuture: Future> + Send; type RequestDeviceFuture: Future> + Send; - type MapAsyncFuture: Future> + Send; - type OnSubmittedWorkDoneFuture: Future + Send; type PopErrorScopeFuture: Future> + Send; fn init(backends: Backends) -> Self; @@ -336,7 +334,11 @@ trait Context: Debug + Send + Sized + Sync { buffer: &Self::BufferId, mode: MapMode, range: Range, - ) -> Self::MapAsyncFuture; + // Note: we keep this as an `impl` through the context because the native backend + // needs to wrap it with a wrapping closure. queue_on_submitted_work_done doesn't + // need this wrapping closure, so can be made a Box immediately. + callback: impl FnOnce(Result<(), BufferAsyncError>) + Send + 'static, + ); fn buffer_get_mapped_range( &self, buffer: &Self::BufferId, @@ -495,7 +497,12 @@ trait Context: Debug + Send + Sized + Sync { fn queue_on_submitted_work_done( &self, queue: &Self::QueueId, - ) -> Self::OnSubmittedWorkDoneFuture; + // Note: we force the caller to box this because neither backend needs to + // wrap the callback and this prevents us from needing to make more functions + // generic than we have to. `buffer_map_async` needs to be wrapped on the native + // backend, so we don't box until after it has been wrapped. + callback: Box, + ); fn device_start_capture(&self, device: &Self::DeviceId); fn device_stop_capture(&self, device: &Self::DeviceId); @@ -2377,20 +2384,20 @@ impl Buffer { } impl<'a> BufferSlice<'a> { - //TODO: fn slice(&self) -> Self - - /// Map the buffer. Buffer is ready to map once the future is resolved. + /// Map the buffer. Buffer is ready to map once the callback is called. /// - /// For the future to complete, `device.poll(...)` must be called elsewhere in the runtime, possibly integrated - /// into an event loop, run on a separate thread, or continually polled in the same task runtime that this - /// future will be run on. + /// For the callback to complete, either `queue.submit(..)`, `instance.poll_all(..)`, or `device.poll(..)` + /// must be called elsewhere in the runtime, possibly integrated into an event loop or run on a separate thread. /// - /// It's expected that wgpu will eventually supply its own event loop infrastructure that will be easy to integrate - /// into other event loops, like winit's. + /// The callback will be called on the thread that first calls the above functions after the gpu work + /// has completed. There are no restrictions on the code you can run in the callback, however on native the + /// call to the function will not complete until the callback returns, so prefer keeping callbacks short + /// and used to set flags, send messages, etc. pub fn map_async( &self, mode: MapMode, - ) -> impl Future> + Send { + callback: impl FnOnce(Result<(), BufferAsyncError>) + Send + 'static, + ) { let mut mc = self.buffer.map_context.lock(); assert_eq!( mc.initial_range, @@ -2409,6 +2416,7 @@ impl<'a> BufferSlice<'a> { &self.buffer.id, mode, self.offset..end, + callback, ) } @@ -3381,10 +3389,19 @@ impl Queue { Context::queue_get_timestamp_period(&*self.context, &self.id) } - /// Returns a future that resolves once all the work submitted by this point - /// is done processing on GPU. - pub fn on_submitted_work_done(&self) -> impl Future + Send { - Context::queue_on_submitted_work_done(&*self.context, &self.id) + /// Registers a callback when the previous call to submit finishes running on the gpu. This callback + /// being called implies that all mapped buffer callbacks attached to the same submission have also + /// been called. + /// + /// For the callback to complete, either `queue.submit(..)`, `instance.poll_all(..)`, or `device.poll(..)` + /// must be called elsewhere in the runtime, possibly integrated into an event loop or run on a separate thread. + /// + /// The callback will be called on the thread that first calls the above functions after the gpu work + /// has completed. There are no restrictions on the code you can run in the callback, however on native the + /// call to the function will not complete until the callback returns, so prefer keeping callbacks short + /// and used to set flags, send messages, etc. + pub fn on_submitted_work_done(&self, callback: impl FnOnce() + Send + 'static) { + Context::queue_on_submitted_work_done(&*self.context, &self.id, Box::new(callback)) } } diff --git a/wgpu/src/util/belt.rs b/wgpu/src/util/belt.rs index efcbe7affaf..e65a3a92a37 100644 --- a/wgpu/src/util/belt.rs +++ b/wgpu/src/util/belt.rs @@ -3,44 +3,10 @@ use crate::{ CommandEncoder, Device, MapMode, }; use std::fmt; -use std::pin::Pin; -use std::task::{self, Poll}; -use std::{future::Future, sync::mpsc}; - -// Given a vector of futures, poll each in parallel until all are ready. -struct Join { - futures: Vec>, -} - -impl> Future for Join { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { - // This is safe because we have no Drop implementation to violate the Pin requirements and - // do not provide any means of moving the inner futures. - let all_ready = unsafe { - // Poll all remaining futures, removing all that are ready - self.get_unchecked_mut().futures.iter_mut().all(|opt| { - if let Some(future) = opt { - if Pin::new_unchecked(future).poll(cx) == Poll::Ready(()) { - *opt = None; - } - } - - opt.is_none() - }) - }; - - if all_ready { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} +use std::sync::{mpsc, Arc}; struct Chunk { - buffer: Buffer, + buffer: Arc, size: BufferAddress, offset: BufferAddress, } @@ -116,12 +82,12 @@ impl StagingBelt { } else { let size = self.chunk_size.max(size.get()); Chunk { - buffer: device.create_buffer(&BufferDescriptor { + buffer: Arc::new(device.create_buffer(&BufferDescriptor { label: Some("(wgpu internal) StagingBelt staging buffer"), size, usage: BufferUsages::MAP_WRITE | BufferUsages::COPY_SRC, mapped_at_creation: true, - }), + })), size, offset: 0, } @@ -158,31 +124,23 @@ impl StagingBelt { /// Recall all of the closed buffers back to be reused. /// /// This has to be called after the command encoders written to `write_buffer` are submitted! - pub fn recall(&mut self) -> impl Future + Send { + pub fn recall(&mut self) { while let Ok(mut chunk) = self.receiver.try_recv() { chunk.offset = 0; self.free_chunks.push(chunk); } let sender = &self.sender; - let futures = self - .closed_chunks - .drain(..) - .map(|chunk| { - let sender = sender.clone(); - let async_buffer = chunk.buffer.slice(..).map_async(MapMode::Write); - - Some(async move { - // The result is ignored - async_buffer.await.ok(); - - // The only possible error is the other side disconnecting, which is fine + for chunk in self.closed_chunks.drain(..) { + let sender = sender.clone(); + chunk + .buffer + .clone() + .slice(..) + .map_async(MapMode::Write, move |_| { let _ = sender.send(chunk); - }) - }) - .collect::>(); - - Join { futures } + }); + } } } diff --git a/wgpu/src/util/mod.rs b/wgpu/src/util/mod.rs index 0a8820a3012..3ce386f07bc 100644 --- a/wgpu/src/util/mod.rs +++ b/wgpu/src/util/mod.rs @@ -6,7 +6,7 @@ mod encoder; mod indirect; mod init; -use std::future::Future; +use std::sync::Arc; use std::{ borrow::Cow, mem::{align_of, size_of}, @@ -70,7 +70,7 @@ pub fn make_spirv_raw(data: &[u8]) -> Cow<[u32]> { } /// CPU accessible buffer used to download data back from the GPU. -pub struct DownloadBuffer(super::Buffer, super::BufferMappedRange); +pub struct DownloadBuffer(Arc, super::BufferMappedRange); impl DownloadBuffer { /// Asynchronously read the contents of a buffer. @@ -78,18 +78,19 @@ impl DownloadBuffer { device: &super::Device, queue: &super::Queue, buffer: &super::BufferSlice, - ) -> impl Future> + Send { + callback: impl FnOnce(Result) + Send + 'static, + ) { let size = match buffer.size { Some(size) => size.into(), None => buffer.buffer.map_context.lock().total_size - buffer.offset, }; - let download = device.create_buffer(&super::BufferDescriptor { + let download = Arc::new(device.create_buffer(&super::BufferDescriptor { size, usage: super::BufferUsages::COPY_DST | super::BufferUsages::MAP_READ, mapped_at_creation: false, label: None, - }); + })); let mut encoder = device.create_command_encoder(&super::CommandEncoderDescriptor { label: None }); @@ -97,13 +98,22 @@ impl DownloadBuffer { let command_buffer: super::CommandBuffer = encoder.finish(); queue.submit(Some(command_buffer)); - let fut = download.slice(..).map_async(super::MapMode::Read); - async move { - fut.await?; - let mapped_range = - super::Context::buffer_get_mapped_range(&*download.context, &download.id, 0..size); - Ok(Self(download, mapped_range)) - } + download + .clone() + .slice(..) + .map_async(super::MapMode::Read, move |result| { + if let Err(e) = result { + callback(Err(e)); + return; + } + + let mapped_range = super::Context::buffer_get_mapped_range( + &*download.context, + &download.id, + 0..size, + ); + callback(Ok(Self(download, mapped_range))); + }); } } diff --git a/wgpu/tests/vertex_indices/mod.rs b/wgpu/tests/vertex_indices/mod.rs index fa85ae62d95..7c99ca1e252 100644 --- a/wgpu/tests/vertex_indices/mod.rs +++ b/wgpu/tests/vertex_indices/mod.rs @@ -123,7 +123,7 @@ fn pulling_common( ctx.queue.submit(Some(encoder.finish())); let slice = buffer.slice(..); - let _ = slice.map_async(wgpu::MapMode::Read); + slice.map_async(wgpu::MapMode::Read, |_| ()); ctx.device.poll(wgpu::Maintain::Wait); let data: Vec = bytemuck::cast_slice(&*slice.get_mapped_range()).to_vec(); diff --git a/wgpu/tests/zero_init_texture_after_discard.rs b/wgpu/tests/zero_init_texture_after_discard.rs index ec77e909f0b..2a7bd8786af 100644 --- a/wgpu/tests/zero_init_texture_after_discard.rs +++ b/wgpu/tests/zero_init_texture_after_discard.rs @@ -282,7 +282,7 @@ fn copy_texture_to_buffer( fn assert_buffer_is_zero(readback_buffer: &wgpu::Buffer, device: &wgpu::Device) { { let buffer_slice = readback_buffer.slice(..); - let _ = buffer_slice.map_async(wgpu::MapMode::Read); + buffer_slice.map_async(wgpu::MapMode::Read, |_| ()); device.poll(wgpu::Maintain::Wait); let buffer_view = buffer_slice.get_mapped_range();