Skip to content

Commit

Permalink
Fix Callback Ordering (#4036)
Browse files Browse the repository at this point in the history
* Fix Callback Ordering

* Format & Changelog
  • Loading branch information
cwfitzgerald authored Aug 14, 2023
1 parent 406ee5d commit f825ce4
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ By @Valaphee in [#3402](https://github.com/gfx-rs/wgpu/pull/3402)
#### General

- Derive storage bindings via `naga::StorageAccess` instead of `naga::GlobalUse`. By @teoxoy in [#3985](https://github.com/gfx-rs/wgpu/pull/3985).
- `Queue::on_submitted_work_done` callbacks will now always be called after all previous `BufferSlice::map_async` callbacks, even when there are no active submissions. By @cwfitzgerald in [#4036](https://github.com/gfx-rs/wgpu/pull/4036).

#### Vulkan
- Fix enabling `wgpu::Features::PARTIALLY_BOUND_BINDING_ARRAY` not being actually enabled in vulkan backend. By @39ali in [#3772](https://github.com/gfx-rs/wgpu/pull/3772).
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ bytemuck.workspace = true
cfg-if.workspace = true
env_logger.workspace = true
log.workspace = true
parking_lot.workspace = true
png.workspace = true
pollster.workspace = true
wgpu.workspace = true
Expand Down
91 changes: 91 additions & 0 deletions tests/tests/regression/issue_4024.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::sync::Arc;

use parking_lot::Mutex;
use wgpu_test::{initialize_test, TestParameters};

use wasm_bindgen_test::wasm_bindgen_test;
use wgpu::*;

/// Regression test for the relative ordering of `map_async` and `on_submitted_work_done`
/// callbacks.
///
/// The WebGPU specification has very specific requirements here: every `map_async` callback
/// that is initiated before a given `on_submitted_work_done` callback must be invoked before
/// that `on_submitted_work_done` callback is invoked.
///
/// We previously invoked `on_submitted_work_done` callbacks immediately if there was no active
/// submission to add them to. This is incorrect, as we do not immediately invoke `map_async`
/// callbacks.
#[wasm_bindgen_test]
#[test]
fn queue_submitted_callback_ordering() {
    initialize_test(TestParameters::default(), |ctx| {
        /// Bookkeeping shared by both callbacks so each can record when it ran.
        #[derive(Debug)]
        struct OrderingContext {
            /// Bumped every time a callback is invoked, which lets each callback
            /// learn its position in the invocation order.
            counter: u8,
            /// Counter value seen by the map_async callback when it was invoked.
            value_read_map_async: Option<u8>,
            /// Counter value seen by the queue submitted work done callback when it was invoked.
            value_read_queue_submitted: Option<u8>,
        }

        impl OrderingContext {
            /// Returns the current counter value, then advances the counter by one.
            fn take_ticket(&mut self) -> u8 {
                let ticket = self.counter;
                self.counter += 1;
                ticket
            }
        }

        // A small buffer that can be mapped for reading.
        let buffer_desc = BufferDescriptor {
            label: Some("mappable buffer"),
            size: 4,
            usage: BufferUsages::MAP_READ | BufferUsages::COPY_DST,
            mapped_at_creation: false,
        };
        let readback = ctx.device.create_buffer(&buffer_desc);

        // Record some work touching the buffer. What the work does is irrelevant;
        // all that matters is that the buffer is used by a submission.
        let encoder_desc = CommandEncoderDescriptor {
            label: Some("encoder"),
        };
        let mut commands = ctx.device.create_command_encoder(&encoder_desc);
        commands.clear_buffer(&readback, 0, None);

        // Submit the recorded work, then block until it has finished.
        let finished = commands.finish();
        ctx.queue.submit(Some(finished));
        ctx.device.poll(MaintainBase::Wait);

        // One shared ordering context, plus one extra handle for each callback.
        let shared = Arc::new(Mutex::new(OrderingContext {
            counter: 0,
            value_read_map_async: None,
            value_read_queue_submitted: None,
        }));
        let for_map_async = Arc::clone(&shared);
        let for_work_done = Arc::clone(&shared);

        // Register the callbacks: map_async first...
        readback.slice(..).map_async(MapMode::Read, move |_| {
            let mut state = for_map_async.lock();
            let ticket = state.take_ticket();
            state.value_read_map_async = Some(ticket);
        });

        // ...then on_submitted_work_done. If the bug is present, this callback is invoked
        // immediately inside this call, even though a map_async callback is still outstanding.
        ctx.queue.on_submitted_work_done(move || {
            let mut state = for_work_done.lock();
            let ticket = state.take_ticket();
            state.value_read_queue_submitted = Some(ticket);
        });

        // No GPU work is in flight at this point, but callbacks still need to be processed.
        ctx.device.poll(MaintainBase::Poll);

        // Both callback handles must be gone by now, so the context can be taken out of the arc.
        let OrderingContext {
            counter,
            value_read_map_async,
            value_read_queue_submitted,
        } = Arc::try_unwrap(shared).unwrap().into_inner();

        // Exactly two callbacks were invoked.
        assert_eq!(counter, 2);
        // The map_async callback was invoked first.
        assert_eq!(value_read_map_async, Some(0));
        // The queue submitted work done callback was invoked second.
        assert_eq!(value_read_queue_submitted, Some(1));
    })
}
1 change: 1 addition & 0 deletions tests/tests/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use wasm_bindgen_test::wasm_bindgen_test_configure;

mod regression {
mod issue_3457;
mod issue_4024;
}

mod buffer;
Expand Down
26 changes: 17 additions & 9 deletions wgpu-core/src/device/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ struct ActiveSubmission<A: hal::Api> {
mapped: Vec<id::Valid<id::BufferId>>,

encoders: Vec<EncoderInFlight<A>>,

/// List of queue "on_submitted_work_done" closures to be called once this
/// submission has completed.
work_done_closures: SmallVec<[SubmittedWorkDoneClosure; 1]>,
}

Expand Down Expand Up @@ -304,6 +307,12 @@ pub(super) struct LifetimeTracker<A: hal::Api> {
/// Buffers the user has asked us to map, and which are not used by any
/// queue submission still in flight.
ready_to_map: Vec<id::Valid<id::BufferId>>,

/// Queue "on_submitted_work_done" closures that were initiated while there were no
/// currently pending submissions. These cannot be immediately invoked as they
/// must happen _after_ all mapped buffer callbacks are fired, so we defer them
/// here until the next time the device is maintained.
work_done_closures: SmallVec<[SubmittedWorkDoneClosure; 1]>,
}

impl<A: hal::Api> LifetimeTracker<A> {
Expand All @@ -316,6 +325,7 @@ impl<A: hal::Api> LifetimeTracker<A> {
active: Vec::new(),
free_resources: NonReferencedResources::new(),
ready_to_map: Vec::new(),
work_done_closures: SmallVec::new(),
}
}

Expand Down Expand Up @@ -405,7 +415,7 @@ impl<A: hal::Api> LifetimeTracker<A> {
.position(|a| a.index > last_done)
.unwrap_or(self.active.len());

let mut work_done_closures = SmallVec::new();
let mut work_done_closures: SmallVec<_> = self.work_done_closures.drain(..).collect();
for a in self.active.drain(..done_count) {
log::trace!("Active submission {} is done", a.index);
self.free_resources.extend(a.last_resources);
Expand Down Expand Up @@ -445,18 +455,16 @@ impl<A: hal::Api> LifetimeTracker<A> {
}
}

pub fn add_work_done_closure(
&mut self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmittedWorkDoneClosure> {
pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) {
match self.active.last_mut() {
Some(active) => {
active.work_done_closures.push(closure);
None
}
// Note: we can't immediately invoke the closure, since it assumes
// nothing is currently locked in the hubs.
None => Some(closure),
// We must defer the closure until all previously occurring map_async closures
// have fired. This is required by the spec.
None => {
self.work_done_closures.push(closure);
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions wgpu-core/src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ impl UserClosures {
fn fire(self) {
// Note: this logic is specifically moved out of `handle_mapping()` in order to
// have nothing locked by the time we execute users callback code.

// Mappings _must_ be fired before submissions, as the spec requires all mapping callbacks that are registered before
// an on_submitted_work_done callback to be fired before the on_submitted_work_done callback.
for (operation, status) in self.mappings {
operation.callback.call(status);
}
Expand Down
17 changes: 6 additions & 11 deletions wgpu-core/src/device/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1435,17 +1435,12 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
closure: SubmittedWorkDoneClosure,
) -> Result<(), InvalidQueue> {
//TODO: flush pending writes
let closure_opt = {
let hub = A::hub(self);
let mut token = Token::root();
let (device_guard, mut token) = hub.devices.read(&mut token);
match device_guard.get(queue_id) {
Ok(device) => device.lock_life(&mut token).add_work_done_closure(closure),
Err(_) => return Err(InvalidQueue),
}
};
if let Some(closure) = closure_opt {
closure.call();
let hub = A::hub(self);
let mut token = Token::root();
let (device_guard, mut token) = hub.devices.read(&mut token);
match device_guard.get(queue_id) {
Ok(device) => device.lock_life(&mut token).add_work_done_closure(closure),
Err(_) => return Err(InvalidQueue),
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions wgpu/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4580,8 +4580,8 @@ impl Queue {
}

/// Registers a callback when the previous call to submit finishes running on the gpu. This callback
/// being called implies that all mapped buffer callbacks attached to the same submission have also
/// been called.
/// being called implies that all mapped buffer callbacks which were registered before this call will
/// have been called.
///
/// For the callback to complete, either `queue.submit(..)`, `instance.poll_all(..)`, or `device.poll(..)`
/// must be called elsewhere in the runtime, possibly integrated into an event loop or run on a separate thread.
Expand Down

0 comments on commit f825ce4

Please sign in to comment.