Skip to content

Commit

Permalink
Simplify parallel iteration methods (#8854)
Browse files Browse the repository at this point in the history
# Objective

The `QueryParIter::for_each_mut` function is required when doing
parallel iteration with mutable queries.
This results in an unfortunate stutter:
`query.par_iter_mut().par_for_each_mut()` ('mut' is repeated).

## Solution

- Make `for_each` compatible with mutable queries, and deprecate
`for_each_mut`. In order to prevent `for_each` from being called
multiple times in parallel, we take ownership of the QueryParIter.

---

## Changelog

- `QueryParIter::for_each` is now compatible with mutable queries.
`for_each_mut` has been deprecated as it is now redundant.

## Migration Guide

The method `QueryParIter::for_each_mut` has been deprecated and is no
longer functional. Use `for_each` instead, which now supports mutable
queries.

```rust
// Before:
query.par_iter_mut().for_each_mut(|x| ...);

// After:
query.par_iter_mut().for_each(|x| ...);
```

The method `QueryParIter::for_each` now takes ownership of the
`QueryParIter`, rather than taking a shared reference.

```rust
// Before:
let par_iter = my_query.par_iter().batching_strategy(my_batching_strategy);
par_iter.for_each(|x| {
    // ...Do stuff with x...
    par_iter.for_each(|y| {
        // ...Do nested stuff with y...
    });
});

// After:
my_query.par_iter().batching_strategy(my_batching_strategy).for_each(|x| {
    // ...Do stuff with x...
    my_query.par_iter().batching_strategy(my_batching_strategy).for_each(|y| {
        // ...Do nested stuff with y...
    });
});
```
  • Loading branch information
JoJoJet authored Jul 23, 2023
1 parent 5b0e6a5 commit ddbfa48
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 66 deletions.
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn heavy_compute(c: &mut Criterion) {
}));

fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
query.par_iter_mut().for_each_mut(|(mut pos, mut mat)| {
query.par_iter_mut().for_each(|(mut pos, mut mat)| {
for _ in 0..100 {
mat.0 = mat.0.inverse();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_animation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub fn animation_player(
) {
animation_players
.par_iter_mut()
.for_each_mut(|(root, maybe_parent, mut player)| {
.for_each(|(root, maybe_parent, mut player)| {
update_transitions(&mut player, &time);
run_animation_player(
root,
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ mod tests {
world.spawn((A(1), B(1)));

fn propagate_system(mut query: Query<(&A, &mut B), Changed<A>>) {
query.par_iter_mut().for_each_mut(|(a, mut b)| {
query.par_iter_mut().for_each(|(a, mut b)| {
b.0 = a.0;
});
}
Expand Down
105 changes: 48 additions & 57 deletions crates/bevy_ecs/src/query/par_iter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};
use super::{QueryItem, QueryState, ReadOnlyWorldQuery, WorldQuery};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
Expand Down Expand Up @@ -90,26 +90,6 @@ pub struct QueryParIter<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> {
pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: ReadOnlyWorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::for_each_mut`] for
/// write-queries.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
// SAFETY: query is read only
unsafe {
self.for_each_unchecked(func);
}
}
}

impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// Changes the batching strategy used when iterating.
///
Expand All @@ -123,61 +103,72 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&mut self, func: FN) {
// SAFETY: query has unique world access
unsafe {
self.for_each_unchecked(func);
}
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
/// have unique access to the components they query.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
pub fn for_each<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(self, func: FN) {
#[cfg(any(target = "wasm32", not(feature = "multi-threaded")))]
{
self.state
.for_each_unchecked_manual(self.world, func, self.last_run, self.this_run);
}
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
{
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
// SAFETY:
// This method can only be called once per instance of QueryParIter,
// which ensures that mutable queries cannot be executed multiple times at once.
// Mutable instances of QueryParIter can only be created via an exclusive borrow of a
// Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
// at the same time.
unsafe {
self.state.for_each_unchecked_manual(
self.world,
func,
self.last_run,
self.this_run,
);
}
}
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
{
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
// SAFETY: See the safety comment above.
unsafe {
self.state.for_each_unchecked_manual(
self.world,
func,
self.last_run,
self.this_run,
);
}
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
// SAFETY: See the safety comment above.
unsafe {
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
}
}
}
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
#[deprecated = "use `.for_each(...)` instead."]
pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(self, func: FN) {
self.for_each(func);
}

#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
fn get_batch_size(&self, thread_count: usize) -> usize {
if self.batching_strategy.batch_size_limits.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_render/src/view/visibility/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ pub fn check_visibility(
let view_mask = maybe_view_mask.copied().unwrap_or_default();

visible_entities.entities.clear();
visible_aabb_query.par_iter_mut().for_each_mut(
visible_aabb_query.par_iter_mut().for_each(
|(
entity,
mut computed_visibility,
Expand Down Expand Up @@ -412,7 +412,7 @@ pub fn check_visibility(
},
);

visible_no_aabb_query.par_iter_mut().for_each_mut(
visible_no_aabb_query.par_iter_mut().for_each(
|(entity, mut computed_visibility, maybe_entity_mask)| {
// skip computing visibility for entities that are configured to be hidden. is_visible_in_view has already been set to false
// in visibility_propagate_system
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_transform/src/systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn sync_simple_transforms(
query
.p0()
.par_iter_mut()
.for_each_mut(|(transform, mut global_transform)| {
.for_each(|(transform, mut global_transform)| {
*global_transform = GlobalTransform::from(*transform);
});
// Update orphaned entities.
Expand Down Expand Up @@ -59,7 +59,7 @@ pub fn propagate_transforms(
orphaned_entities.clear();
orphaned_entities.extend(orphaned.iter());
orphaned_entities.sort_unstable();
root_query.par_iter_mut().for_each_mut(
root_query.par_iter_mut().for_each(
|(entity, children, transform, mut global_transform)| {
let changed = transform.is_changed() || global_transform.is_added() || orphaned_entities.binary_search(&entity).is_ok();
if changed {
Expand Down
4 changes: 2 additions & 2 deletions examples/ecs/parallel_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn move_system(mut sprites: Query<(&mut Transform, &Velocity)>) {
// to use or not use ParallelIterator over a normal Iterator.
sprites
.par_iter_mut()
.for_each_mut(|(mut transform, velocity)| {
.for_each(|(mut transform, velocity)| {
transform.translation += velocity.extend(0.0);
});
}
Expand All @@ -54,7 +54,7 @@ fn bounce_system(windows: Query<&Window>, mut sprites: Query<(&Transform, &mut V
sprites
.par_iter_mut()
.batching_strategy(BatchingStrategy::fixed(32))
.for_each_mut(|(transform, mut v)| {
.for_each(|(transform, mut v)| {
if !(left < transform.translation.x
&& transform.translation.x < right
&& bottom < transform.translation.y
Expand Down

0 comments on commit ddbfa48

Please sign in to comment.