Skip to content

Commit

Permalink
Fix bug where events where not being dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
Dig-Doug committed Jan 27, 2024
1 parent b592a72 commit 53717a2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 13 deletions.
8 changes: 7 additions & 1 deletion crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{First, Main, MainSchedulePlugin, Plugin, Plugins, StateTransition};
use crate::{First, FixedPostUpdate, Main, MainSchedulePlugin, Plugin, Plugins, StateTransition};
pub use bevy_derive::AppLabel;
use bevy_ecs::{
prelude::*,
Expand Down Expand Up @@ -213,6 +213,7 @@ pub enum PluginsState {

// Dummy plugin used to temporary hold the place in the plugin registry
struct PlaceholderPlugin;

impl Plugin for PlaceholderPlugin {
fn build(&self, _app: &mut App) {}
}
Expand Down Expand Up @@ -507,6 +508,11 @@ impl App {
bevy_ecs::event::event_update_system::<T>
.run_if(bevy_ecs::event::event_update_condition::<T>),
);
self.init_resource::<bevy_ecs::event::EventUpdateSignal<T>>()
.add_systems(
FixedPostUpdate,
bevy_ecs::event::event_queue_update_system::<T>,
);
}
self
}
Expand Down
45 changes: 36 additions & 9 deletions crates/bevy_ecs/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate as bevy_ecs;
use crate::system::{Local, Res, ResMut, Resource, SystemParam};
pub use bevy_ecs_macros::Event;
use bevy_utils::detailed_trace;
use bevy_utils::tracing::error;
use std::ops::{Deref, DerefMut};
use std::{
cmp::Ordering,
Expand All @@ -13,6 +14,7 @@ use std::{
marker::PhantomData,
slice::Iter,
};

/// A type that can be stored in an [`Events<E>`] resource
/// You can conveniently access events using the [`EventReader`] and [`EventWriter`] system parameter.
///
Expand All @@ -33,6 +35,7 @@ pub struct EventId<E: Event> {
}

impl<E: Event> Copy for EventId<E> {}

impl<E: Event> Clone for EventId<E> {
fn clone(&self) -> Self {
*self
Expand Down Expand Up @@ -747,27 +750,51 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
}
}

#[doc(hidden)]
#[derive(Resource)]
pub struct EventUpdateSignal<T: Event> {
signal: bool,
_marker: PhantomData<fn() -> T>,
}

impl<T: Event> Default for EventUpdateSignal<T> {
fn default() -> Self {
Self {
signal: false,
_marker: PhantomData,
}
}
}

#[doc(hidden)]
#[derive(Resource, Default)]
pub struct EventUpdateSignal(bool);
pub struct EventUpdateShouldWaitForFixedUpdate;

/// A system that queues a call to [`Events::update`].
pub fn event_queue_update_system(signal: Option<ResMut<EventUpdateSignal>>) {
pub fn event_queue_update_system<T: Event>(signal: Option<ResMut<EventUpdateSignal<T>>>) {
if let Some(mut s) = signal {
s.0 = true;
s.signal = true;
}
}

/// A system that calls [`Events::update`].
pub fn event_update_system<T: Event>(
signal: Option<ResMut<EventUpdateSignal>>,
should_wait: Option<Res<EventUpdateShouldWaitForFixedUpdate>>,
should_update: Option<ResMut<EventUpdateSignal<T>>>,
mut events: ResMut<Events<T>>,
) {
if let Some(mut s) = signal {
// If we haven't got a signal to update the events, but we *could* get such a signal
// return early and update the events later.
if !std::mem::replace(&mut s.0, false) {
return;
if should_wait.is_some() {
match should_update {
Some(mut should_update) => {
// If we haven't got a signal to update the events, but we *could* get such a signal
// return early and update the events later.
if !std::mem::replace(&mut should_update.signal, false) {
return;
}
}
None => {
error!("EventUpdateSignal<{0}> resource not found but fixed update systems are active. Please add EventUpdateSignal<{0}> as a resource", std::any::type_name::<T>());
}
}
}

Expand Down
65 changes: 62 additions & 3 deletions crates/bevy_time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod prelude {
}

use bevy_app::{prelude::*, RunFixedMainLoop};
use bevy_ecs::event::{event_queue_update_system, EventUpdateSignal};
use bevy_ecs::event::EventUpdateShouldWaitForFixedUpdate;
use bevy_ecs::prelude::*;
use bevy_utils::{tracing::warn, Duration, Instant};
pub use crossbeam_channel::TrySendError;
Expand Down Expand Up @@ -60,8 +60,7 @@ impl Plugin for TimePlugin {
.add_systems(RunFixedMainLoop, run_fixed_main_schedule);

// ensure the events are not dropped until `FixedMain` systems can observe them
app.init_resource::<EventUpdateSignal>()
.add_systems(FixedPostUpdate, event_queue_update_system);
app.init_resource::<EventUpdateShouldWaitForFixedUpdate>();

#[cfg(feature = "bevy_ci_testing")]
if let Some(ci_testing_config) = app
Expand Down Expand Up @@ -142,3 +141,63 @@ fn time_system(
TimeUpdateStrategy::ManualDuration(duration) => time.update_with_duration(*duration),
}
}

#[cfg(test)]
mod tests {
use crate::{Fixed, Time, TimePlugin, TimeUpdateStrategy};
use bevy_app::{App, Startup, Update};
use bevy_ecs::event::{Event, EventReader, EventWriter};
use std::error::Error;
use std::time::Duration;

#[derive(Event)]
struct TestEvent<T: Default> {
sender: std::sync::mpsc::Sender<T>,
}

impl<T: Default> Drop for TestEvent<T> {
fn drop(&mut self) {
self.sender
.send(T::default())
.expect("Failed to send drop signal");
}
}

#[test]
fn events_get_dropped_regression_test_11528() -> Result<(), impl Error> {
let (tx1, rx1) = std::sync::mpsc::channel();
let (tx2, rx2) = std::sync::mpsc::channel();
let mut app = App::new();
app.add_plugins(TimePlugin::default())
.add_event::<TestEvent<i32>>()
.add_event::<TestEvent<()>>()
.add_systems(Startup, move |mut ev2: EventWriter<TestEvent<()>>| {
ev2.send(TestEvent {
sender: tx2.clone(),
});
})
.add_systems(Update, move |mut ev1: EventWriter<TestEvent<i32>>| {
// Keep adding events so this event type is processed every update
ev1.send(TestEvent {
sender: tx1.clone(),
});
})
.add_systems(
Update,
|mut ev1: EventReader<TestEvent<i32>>, mut ev2: EventReader<TestEvent<()>>| {
// Read events so they can be dropped
for _ in ev1.read() {}
for _ in ev2.read() {}
},
)
.insert_resource(TimeUpdateStrategy::ManualDuration(
Time::<Fixed>::default().timestep(),
));

for _ in 0..10 {
app.update();
}

rx2.recv_timeout(Duration::from_millis(1000))
}
}

0 comments on commit 53717a2

Please sign in to comment.