Skip to content

Commit

Permalink
Convert map_async from being async to being callback based
Browse files Browse the repository at this point in the history
  • Loading branch information
cwfitzgerald committed May 28, 2022
1 parent 9114283 commit 85e7ff7
Show file tree
Hide file tree
Showing 22 changed files with 195 additions and 361 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 13 additions & 22 deletions deno_webgpu/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,36 +83,27 @@ pub async fn op_webgpu_buffer_get_map_async(
.get::<super::WebGpuDevice>(device_rid)?;
device = device_resource.0;

let boxed_sender = Box::new(sender);
let sender_ptr = Box::into_raw(boxed_sender) as *mut u8;

extern "C" fn buffer_map_future_wrapper(
status: wgpu_core::resource::BufferMapAsyncStatus,
user_data: *mut u8,
) {
let sender_ptr = user_data as *mut oneshot::Sender<Result<(), AnyError>>;
let boxed_sender = unsafe { Box::from_raw(sender_ptr) };
boxed_sender
let callback = Box::new(move |status| {
sender
.send(match status {
wgpu_core::resource::BufferMapAsyncStatus::Success => Ok(()),
_ => unreachable!(), // TODO
})
.unwrap();
}
});

// TODO(lucacasonato): error handling
let maybe_err = gfx_select!(buffer => instance.buffer_map_async(
buffer,
offset..(offset + size),
wgpu_core::resource::BufferMapOperation {
host: match mode {
1 => wgpu_core::device::HostMap::Read,
2 => wgpu_core::device::HostMap::Write,
_ => unreachable!(),
},
callback: buffer_map_future_wrapper,
user_data: sender_ptr,
}
buffer,
offset..(offset + size),
wgpu_core::resource::BufferMapOperation {
host: match mode {
1 => wgpu_core::device::HostMap::Read,
2 => wgpu_core::device::HostMap::Write,
_ => unreachable!(),
},
callback: wgpu_core::resource::BufferMapCallback::Rust { callback },
}
))
.err();

Expand Down
9 changes: 5 additions & 4 deletions player/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
fs::{read_to_string, File},
io::{Read, Seek, SeekFrom},
path::{Path, PathBuf},
ptr, slice,
slice,
};

#[derive(serde::Deserialize)]
Expand Down Expand Up @@ -55,7 +55,7 @@ struct Test<'a> {
actions: Vec<wgc::device::trace::Action<'a>>,
}

extern "C" fn map_callback(status: wgc::resource::BufferMapAsyncStatus, _user_data: *mut u8) {
fn map_callback(status: wgc::resource::BufferMapAsyncStatus) {
match status {
wgc::resource::BufferMapAsyncStatus::Success => (),
_ => panic!("Unable to map"),
Expand Down Expand Up @@ -112,8 +112,9 @@ impl Test<'_> {
expect.offset .. expect.offset+expect.data.len() as wgt::BufferAddress,
wgc::resource::BufferMapOperation {
host: wgc::device::HostMap::Read,
callback: map_callback,
user_data: ptr::null_mut(),
callback: wgc::resource::BufferMapCallback::Rust {
callback: Box::new(map_callback),
},
}
))
.unwrap();
Expand Down
9 changes: 6 additions & 3 deletions wgpu-core/src/device/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,18 @@ impl<A: hal::Api> LifetimeTracker<A> {
}
}

pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) -> bool {
pub fn add_work_done_closure(
&mut self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmittedWorkDoneClosure> {
match self.active.last_mut() {
Some(active) => {
active.work_done_closures.push(closure);
true
None
}
// Note: we can't immediately invoke the closure, since it assumes
// nothing is currently locked in the hubs.
None => false,
None => Some(closure),
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions wgpu-core/src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ impl UserClosures {
//Note: this logic is specifically moved out of `handle_mapping()` in order to
// have nothing locked by the time we execute users callback code.
for (operation, status) in self.mappings {
(operation.callback)(status, operation.user_data);
operation.call(status);
}
for closure in self.submissions {
(closure.callback)(closure.user_data);
closure.call();
}
}
}
Expand Down Expand Up @@ -5384,9 +5384,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
//Note: outside inner function so no locks are held when calling the callback
let closure = self.buffer_unmap_inner::<A>(buffer_id)?;
if let Some((operation, status)) = closure {
unsafe {
(operation.callback)(status, operation.user_data);
}
operation.call(status);
}
Ok(())
}
Expand Down
35 changes: 23 additions & 12 deletions wgpu-core/src/device/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,29 @@ use thiserror::Error;
/// without a concrete moment of when it can be cleared.
const WRITE_COMMAND_BUFFERS_PER_POOL: usize = 64;

pub type OnSubmittedWorkDoneCallback = unsafe extern "C" fn(user_data: *mut u8);
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct SubmittedWorkDoneClosure {
pub callback: OnSubmittedWorkDoneCallback,
pub user_data: *mut u8,
pub struct SubmittedWorkDoneClosureC {
callback: unsafe extern "C" fn(user_data: *mut u8),
user_data: *mut u8,
}

unsafe impl Send for SubmittedWorkDoneClosure {}
unsafe impl Sync for SubmittedWorkDoneClosure {}
unsafe impl Send for SubmittedWorkDoneClosureC {}

pub enum SubmittedWorkDoneClosure {
Rust { callback: Box<dyn FnOnce() + Send> },
C { inner: SubmittedWorkDoneClosureC },
}

impl SubmittedWorkDoneClosure {
pub(crate) fn call(self) {
unsafe {
match self {
SubmittedWorkDoneClosure::Rust { callback } => callback(),
SubmittedWorkDoneClosure::C { inner } => (inner.callback)(inner.user_data),
}
}
}
}

struct StagingData<A: hal::Api> {
buffer: A::Buffer,
Expand Down Expand Up @@ -957,7 +970,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
closure: SubmittedWorkDoneClosure,
) -> Result<(), InvalidQueue> {
//TODO: flush pending writes
let added = {
let closure_opt = {
let hub = A::hub(self);
let mut token = Token::root();
let (device_guard, mut token) = hub.devices.read(&mut token);
Expand All @@ -966,10 +979,8 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
Err(_) => return Err(InvalidQueue),
}
};
if !added {
unsafe {
(closure.callback)(closure.user_data);
}
if let Some(closure) = closure_opt {
closure.call();
}
Ok(())
}
Expand Down
39 changes: 26 additions & 13 deletions wgpu-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub enum BufferMapAsyncStatus {
ContextLost,
}

#[derive(Debug)]
pub(crate) enum BufferMapState<A: hal::Api> {
/// Mapped at creation.
Init {
Expand All @@ -46,26 +45,42 @@ pub(crate) enum BufferMapState<A: hal::Api> {
unsafe impl<A: hal::Api> Send for BufferMapState<A> {}
unsafe impl<A: hal::Api> Sync for BufferMapState<A> {}

pub type BufferMapCallback = unsafe extern "C" fn(status: BufferMapAsyncStatus, userdata: *mut u8);
#[repr(C)]
pub struct BufferMapCallbackC {
callback: unsafe extern "C" fn(status: BufferMapAsyncStatus, userdata: *mut u8),
user_data: *mut u8,
}

unsafe impl Send for BufferMapCallbackC {}

pub enum BufferMapCallback {
Rust {
callback: Box<dyn FnOnce(BufferMapAsyncStatus) + Send>,
},
C {
inner: BufferMapCallbackC,
},
}

#[repr(C)]
#[derive(Debug)]
pub struct BufferMapOperation {
pub host: HostMap,
pub callback: BufferMapCallback,
pub user_data: *mut u8,
}

//TODO: clarify if/why this is needed here
unsafe impl Send for BufferMapOperation {}
unsafe impl Sync for BufferMapOperation {}

impl BufferMapOperation {
pub(crate) fn call(self, status: BufferMapAsyncStatus) {
match self.callback {
BufferMapCallback::Rust { callback } => callback(status),
BufferMapCallback::C { inner } => {
unsafe { (inner.callback)(status, inner.user_data) };
}
}
}

pub(crate) fn call_error(self) {
log::error!("wgpu_buffer_map_async failed: buffer mapping is pending");
unsafe {
(self.callback)(BufferMapAsyncStatus::Error, self.user_data);
}
self.call(BufferMapAsyncStatus::Error);
}
}

Expand Down Expand Up @@ -105,7 +120,6 @@ pub enum BufferAccessError {
},
}

#[derive(Debug)]
pub(crate) struct BufferPendingMapping {
pub range: Range<wgt::BufferAddress>,
pub op: BufferMapOperation,
Expand All @@ -115,7 +129,6 @@ pub(crate) struct BufferPendingMapping {

pub type BufferDescriptor<'a> = wgt::BufferDescriptor<Label<'a>>;

#[derive(Debug)]
pub struct Buffer<A: hal::Api> {
pub(crate) raw: Option<A::Buffer>,
pub(crate) device_id: Stored<DeviceId>,
Expand Down
1 change: 1 addition & 0 deletions wgpu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ bitflags = "1"
bytemuck = { version = "1.4", features = ["derive"] }
glam = "0.20.2"
ddsfile = "0.5"
futures-intrusive = "0.4"
log = "0.4"
# Opt out of noise's "default-features" to avoid "image" feature as a dependency count optimization.
# This will not be required in the next release since it has been removed from the default feature in https://github.com/Razaekel/noise-rs/commit/1af9e1522236b2c584fb9a02150c9c67a5e6bb04#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542
Expand Down
15 changes: 7 additions & 8 deletions wgpu/examples/capture/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ async fn create_png(
) {
// Note that we're not calling `.await` here.
let buffer_slice = output_buffer.slice(..);
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
// Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished.
let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel();
buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap());

// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
Expand All @@ -138,7 +140,7 @@ async fn create_png(
return;
}

if let Ok(()) = buffer_future.await {
if let Some(Ok(())) = receiver.receive().await {
let padded_buffer = buffer_slice.get_mapped_range();

let mut png_encoder = png::Encoder::new(
Expand Down Expand Up @@ -214,18 +216,15 @@ mod tests {

#[test]
fn ensure_generated_data_matches_expected() {
pollster::block_on(assert_generated_data_matches_expected());
assert_generated_data_matches_expected();
}

async fn assert_generated_data_matches_expected() {
fn assert_generated_data_matches_expected() {
let (device, output_buffer, dimensions) =
create_red_image_with_dimensions(100usize, 200usize).await;
let buffer_slice = output_buffer.slice(..);
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
buffer_slice.map_async(wgpu::MapMode::Read, |_| ());
device.poll(wgpu::Maintain::Wait);
buffer_future
.await
.expect("failed to map buffer slice for capture test");
let padded_buffer = buffer_slice.get_mapped_range();
let expected_buffer_size = dimensions.padded_bytes_per_row * dimensions.height;
assert_eq!(padded_buffer.len(), expected_buffer_size);
Expand Down
2 changes: 1 addition & 1 deletion wgpu/examples/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ pub fn test<E: Example>(mut params: FrameworkRefTest) {
ctx.queue.submit(Some(cmd_buf.finish()));

let dst_buffer_slice = dst_buffer.slice(..);
let _ = dst_buffer_slice.map_async(wgpu::MapMode::Read);
dst_buffer_slice.map_async(wgpu::MapMode::Read, |_| ());
ctx.device.poll(wgpu::Maintain::Wait);
let bytes = dst_buffer_slice.get_mapped_range().to_vec();

Expand Down
7 changes: 4 additions & 3 deletions wgpu/examples/hello-compute/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,17 @@ async fn execute_gpu_inner(

// Note that we're not calling `.await` here.
let buffer_slice = staging_buffer.slice(..);
// Gets the future representing when `staging_buffer` can be read from
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
// Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished.
let (sender, reciever) = futures_intrusive::channel::shared::oneshot_channel();
buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap());

// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
// be called in an event loop or on another thread.
device.poll(wgpu::Maintain::Wait);

// Awaits until `buffer_future` can be read from
if let Ok(()) = buffer_future.await {
if let Some(Ok(())) = reciever.receive().await {
// Gets contents of buffer
let data = buffer_slice.get_mapped_range();
// Since contents are got in bytes, this converts these bytes back to u32
Expand Down
6 changes: 3 additions & 3 deletions wgpu/examples/mipmap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,11 @@ impl framework::Example for Example {

queue.submit(Some(init_encoder.finish()));
if let Some(ref query_sets) = query_sets {
// We can ignore the future as we're about to wait for the device.
let _ = query_sets
// We can ignore the callback as we're about to wait for the device.
query_sets
.data_buffer
.slice(..)
.map_async(wgpu::MapMode::Read);
.map_async(wgpu::MapMode::Read, |_| ());
// Wait for device to be done rendering mipmaps
device.poll(wgpu::Maintain::Wait);
// This is guaranteed to be ready.
Expand Down
Loading

0 comments on commit 85e7ff7

Please sign in to comment.