diff --git a/Cargo.toml b/Cargo.toml index 95141021a8077..b09e5fdc6530d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -259,6 +259,10 @@ path = "examples/ecs/change_detection.rs" name = "event" path = "examples/ecs/event.rs" +[[example]] +name = "event_consumer" +path = "examples/ecs/event_consumer.rs" + [[example]] name = "fixed_timestep" path = "examples/ecs/fixed_timestep.rs" diff --git a/crates/bevy_ecs/src/event.rs b/crates/bevy_ecs/src/event.rs index 889d9d22e8b4c..f0ed70ac2381e 100644 --- a/crates/bevy_ecs/src/event.rs +++ b/crates/bevy_ecs/src/event.rs @@ -51,7 +51,7 @@ struct EventInstance { } #[derive(Debug)] -enum State { +enum BufferState { A, B, } @@ -125,7 +125,7 @@ pub struct Events { a_start_event_count: usize, b_start_event_count: usize, event_count: usize, - state: State, + state: BufferState, } impl Default for Events { @@ -136,69 +136,132 @@ impl Default for Events { event_count: 0, events_a: Vec::new(), events_b: Vec::new(), - state: State::A, + state: BufferState::A, } } } -fn map_instance_event_with_id(event_instance: &EventInstance) -> (&T, EventId) { - (&event_instance.event, event_instance.event_id) -} - -fn map_instance_event(event_instance: &EventInstance) -> &T { - &event_instance.event -} +impl Events { + /// "Sends" an `event` by writing it to the current event buffer. [EventReader]s can then read + /// the event. + pub fn send(&mut self, event: T) { + let event_id = EventId { + id: self.event_count, + _marker: PhantomData, + }; + trace!("Events::send() -> {}", event_id); -/// Reads events of type `T` in order and tracks which events have already been read. -#[derive(SystemParam)] -pub struct EventReader<'a, T: Component> { - last_event_count: Local<'a, (usize, PhantomData)>, - events: Res<'a, Events>, -} + let event_instance = EventInstance { event_id, event }; -/// Sends events of type `T`. -#[derive(SystemParam)] -pub struct EventWriter<'a, T: Component> { - events: ResMut<'a, Events>, -} + match self.state { + BufferState::A => self.events_a.push(event_instance), + BufferState::B => self.events_b.push(event_instance), + } -impl<'a, T: Component> EventWriter<'a, T> { - pub fn send(&mut self, event: T) { - self.events.send(event); + self.event_count += 1; } - pub fn send_batch(&mut self, events: impl Iterator) { - self.events.extend(events); + /// Gets a new [ManualEventReader]. This will include all events already in the event buffers. + pub fn get_reader(&self) -> ManualEventReader { + ManualEventReader { + last_event_count: 0, + _marker: PhantomData, + } } -} - -pub struct ManualEventReader { - last_event_count: usize, - _marker: PhantomData, -} -impl Default for ManualEventReader { - fn default() -> Self { + /// Gets a new [ManualEventReader]. This will ignore all events already in the event buffers. It + /// will read all future events. + pub fn get_reader_current(&self) -> ManualEventReader { ManualEventReader { - last_event_count: 0, - _marker: Default::default(), + last_event_count: self.event_count, + _marker: PhantomData, } } -} -impl ManualEventReader { - /// See [`EventReader::iter`] - pub fn iter<'a>(&mut self, events: &'a Events) -> impl DoubleEndedIterator { - internal_event_reader(&mut self.last_event_count, events).map(|(e, _)| e) + /// Swaps the event buffers and clears the oldest event buffer. In general, this should be + /// called once per frame/update. + pub fn update(&mut self) { + match self.state { + BufferState::A => { + self.events_b = Vec::new(); + self.state = BufferState::B; + self.b_start_event_count = self.event_count; + } + BufferState::B => { + self.events_a = Vec::new(); + self.state = BufferState::A; + self.a_start_event_count = self.event_count; + } + } } - /// See [`EventReader::iter_with_id`] - pub fn iter_with_id<'a>( - &mut self, - events: &'a Events, - ) -> impl DoubleEndedIterator)> { - internal_event_reader(&mut self.last_event_count, events) + /// A system that calls [Events::update] once per frame. + pub fn update_system(mut events: ResMut) { + events.update(); + } + + /// Removes all events. + pub fn clear(&mut self) { + self.events_a.clear(); + self.events_b.clear(); + } + + /// Creates a draining iterator that removes all events. + pub fn drain(&mut self) -> impl DoubleEndedIterator + '_ { + self.drain_with_id().map(|(e, _)| e) + } + + /// Creates a draining iterator that returns both events and their ids + pub fn drain_with_id(&mut self) -> impl DoubleEndedIterator)> + '_ { + let event_instances = match self.state { + BufferState::A => self.events_b.drain(..).chain(self.events_a.drain(..)), + BufferState::B => self.events_a.drain(..).chain(self.events_b.drain(..)), + }; + + // We must reset these values to 0 as all events have been drained + self.a_start_event_count = 0; + self.b_start_event_count = 0; + self.event_count = 0; + // Reset to A for consistency with Self::default() + self.state = BufferState::A; + + event_instances.map(|instance| { + trace!("Events::drain_with_id -> {}", instance.event_id); + (instance.event, instance.event_id) + }) } + + pub fn extend(&mut self, events: I) + where + I: Iterator, + { + for event in events { + self.send(event); + } + } + + /// Iterates over events that happened since the last "update" call. + /// WARNING: You probably don't want to use this call. In most cases you should use an + /// `EventReader`. You should only use this if you know you only need to consume events + /// between the last `update()` call and your call to `iter_current_update_events`. + /// If events happen outside that window, they will not be handled. For example, any events that + /// happen after this call and before the next `update()` call will be dropped. + pub fn iter_current_update_events(&self) -> impl DoubleEndedIterator { + match self.state { + BufferState::A => self.events_a.iter().map(map_instance_event), + BufferState::B => self.events_b.iter().map(map_instance_event), + } + } +} + +// Used in internal_event_reader instead of a closure to ensure stable type +fn map_instance_event_with_id(event_instance: &EventInstance) -> (&T, EventId) { + (&event_instance.event, event_instance.event_id) +} + +// Used in internal_event_reader instead of a closure to ensure stable type +fn map_instance_event(event_instance: &EventInstance) -> &T { + &event_instance.event } /// Like [`iter_with_id`](EventReader::iter_with_id) except not emitting any traces for read @@ -221,7 +284,7 @@ fn internal_event_reader<'a, T>( }; *last_event_count = events.event_count; match events.state { - State::A => events + BufferState::A => events .events_b .get(b_index..) .unwrap_or_else(|| &[]) @@ -235,7 +298,7 @@ fn internal_event_reader<'a, T>( .iter() .map(map_instance_event_with_id), ), - State::B => events + BufferState::B => events .events_a .get(a_index..) .unwrap_or_else(|| &[]) @@ -251,6 +314,28 @@ fn internal_event_reader<'a, T>( ), } } +/// Sends events of type `T`. +#[derive(SystemParam)] +pub struct EventWriter<'a, T: Component> { + events: ResMut<'a, Events>, +} + +impl<'a, T: Component> EventWriter<'a, T> { + pub fn send(&mut self, event: T) { + self.events.send(event); + } + + pub fn send_batch(&mut self, events: impl Iterator) { + self.events.extend(events); + } +} + +/// Reads events of type `T` in order and tracks which events have already been read. +#[derive(SystemParam)] +pub struct EventReader<'a, T: Component> { + last_event_count: Local<'a, (usize, PhantomData)>, + events: Res<'a, Events>, +} impl<'a, T: Component> EventReader<'a, T> { /// Iterates over the events this EventReader has not seen yet. This updates the EventReader's @@ -269,120 +354,127 @@ impl<'a, T: Component> EventReader<'a, T> { } } -impl Events { - /// "Sends" an `event` by writing it to the current event buffer. [EventReader]s can then read - /// the event. - pub fn send(&mut self, event: T) { - let event_id = EventId { - id: self.event_count, - _marker: PhantomData, - }; - trace!("Events::send() -> {}", event_id); - - let event_instance = EventInstance { event_id, event }; - - match self.state { - State::A => self.events_a.push(event_instance), - State::B => self.events_b.push(event_instance), - } - - self.event_count += 1; - } - - /// Gets a new [ManualEventReader]. This will include all events already in the event buffers. - pub fn get_reader(&self) -> ManualEventReader { - ManualEventReader { - last_event_count: 0, - _marker: PhantomData, - } - } - - /// Gets a new [ManualEventReader]. This will ignore all events already in the event buffers. It - /// will read all future events. - pub fn get_reader_current(&self) -> ManualEventReader { - ManualEventReader { - last_event_count: self.event_count, - _marker: PhantomData, - } - } +/// Reads and consumes all events of type T when .drain or .drain_with_id are called +/// +/// Useful for manual event cleanup when [AppBuilder::add_event::] is omitted, +/// allowing events to accumulate on your components or resources until consumed. +/// Note: due to the draining nature of this reader, you probably only want one +/// EventConsumer per event storage location + event type combination. +#[derive(SystemParam)] +pub struct EventConsumer<'a, T: Component> { + events: ResMut<'a, Events>, +} - /// Swaps the event buffers and clears the oldest event buffer. In general, this should be - /// called once per frame/update. - pub fn update(&mut self) { - match self.state { - State::A => { - self.events_b = Vec::new(); - self.state = State::B; - self.b_start_event_count = self.event_count; - } - State::B => { - self.events_a = Vec::new(); - self.state = State::A; - self.a_start_event_count = self.event_count; - } - } +impl<'a, T: Component> EventConsumer<'a, T> { + /// Drains all available events this EventConsumer has access to into an iterator + pub fn drain(self) -> impl DoubleEndedIterator + 'a { + // into_inner is needed to ensure the lifetime is not bound to the implicit .deref_mut() call + self.events.into_inner().drain() } - /// A system that calls [Events::update] once per frame. - pub fn update_system(mut events: ResMut) { - events.update(); + /// Drains all available events this EventConsumer has access to into an iterator and returns the id + pub fn drain_with_id(self) -> impl DoubleEndedIterator)> + 'a { + // into_inner is needed to ensure the lifetime is not bound to the implicit .deref_mut() call + self.events.into_inner().drain_with_id() } +} - /// Removes all events. - pub fn clear(&mut self) { - self.events_a.clear(); - self.events_b.clear(); - } +pub struct ManualEventReader { + last_event_count: usize, + _marker: PhantomData, +} - /// Creates a draining iterator that removes all events. - pub fn drain(&mut self) -> impl Iterator + '_ { - let map = |i: EventInstance| i.event; - match self.state { - State::A => self - .events_b - .drain(..) - .map(map) - .chain(self.events_a.drain(..).map(map)), - State::B => self - .events_a - .drain(..) - .map(map) - .chain(self.events_b.drain(..).map(map)), +impl Default for ManualEventReader { + fn default() -> Self { + ManualEventReader { + last_event_count: 0, + _marker: Default::default(), } } +} - pub fn extend(&mut self, events: I) - where - I: Iterator, - { - for event in events { - self.send(event); - } +impl ManualEventReader { + /// See [`EventReader::iter`] + pub fn iter<'a>(&mut self, events: &'a Events) -> impl DoubleEndedIterator { + internal_event_reader(&mut self.last_event_count, events).map(|(e, _)| e) } - /// Iterates over events that happened since the last "update" call. - /// WARNING: You probably don't want to use this call. In most cases you should use an - /// `EventReader`. You should only use this if you know you only need to consume events - /// between the last `update()` call and your call to `iter_current_update_events`. - /// If events happen outside that window, they will not be handled. For example, any events that - /// happen after this call and before the next `update()` call will be dropped. - pub fn iter_current_update_events(&self) -> impl DoubleEndedIterator { - match self.state { - State::A => self.events_a.iter().map(map_instance_event), - State::B => self.events_b.iter().map(map_instance_event), - } + /// See [`EventReader::iter_with_id`] + pub fn iter_with_id<'a>( + &mut self, + events: &'a Events, + ) -> impl DoubleEndedIterator)> { + internal_event_reader(&mut self.last_event_count, events) } } #[cfg(test)] mod tests { use super::*; + use crate::schedule::{Stage, SystemStage}; + use crate::system::IntoSystem; + use crate::world::World; #[derive(Copy, Clone, PartialEq, Eq, Debug)] struct TestEvent { i: usize, } + #[test] + fn event_system_params() { + struct E; + let mut world = World::default(); + world.insert_resource(Events::::default()); + + fn writes(mut ew: EventWriter) { + ew.send(E) + } + fn reads(mut er: EventReader) { + er.iter(); + } + fn consumes(ec: EventConsumer) { + ec.drain(); + } + + let mut stage1 = SystemStage::parallel(); + stage1.add_system(writes.system()); + stage1.add_system(reads.system()); + stage1.run(&mut world); + + let current_events = world.get_resource::>().unwrap(); + assert!(current_events.events_a.len() == 1); + + let mut stage2 = SystemStage::parallel(); + stage2.add_system(consumes.system()); + + stage2.run(&mut world); + let current_events = world.get_resource::>().unwrap(); + assert!(current_events.events_a.is_empty()); + } + + #[test] + fn write_eat_repeat() { + struct E; + let mut events = Events::::default(); + let mut reader = events.get_reader(); + + assert!(reader.iter(&events).next().is_none()); + + // Send an event + events.send(E); + assert!(reader.iter(&events).next().is_some()); + assert!(reader.iter(&events).next().is_none()); + + // Eat all events + let _ = events.drain(); + assert!(reader.iter(&events).next().is_none()); + + // Try again + events.send(E); + assert!(reader.iter(&events).next().is_some()); + assert!(reader.iter(&events).next().is_none()); + } + #[test] fn test_events() { let mut events = Events::::default(); diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index f45c6790a5780..9e6839424e53e 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -19,7 +19,7 @@ pub mod prelude { pub use crate::{ bundle::Bundle, entity::Entity, - event::{EventReader, EventWriter}, + event::{EventConsumer, EventReader, EventWriter}, query::{Added, ChangeTrackers, Changed, Or, QueryState, With, WithBundle, Without}, schedule::{ AmbiguitySetLabel, ExclusiveSystemDescriptorCoercion, ParallelSystemDescriptorCoercion, diff --git a/crates/bevy_ecs/src/system/system_param.rs b/crates/bevy_ecs/src/system/system_param.rs index 5dce8e4b2e477..24bf7e5edbb0d 100644 --- a/crates/bevy_ecs/src/system/system_param.rs +++ b/crates/bevy_ecs/src/system/system_param.rs @@ -367,6 +367,13 @@ impl<'w, T: Component> DerefMut for ResMut<'w, T> { } } +impl<'a, T: Component> ResMut<'a, T> { + pub fn into_inner(self) -> &'a mut T { + self.ticks.set_changed(self.change_tick); + self.value + } +} + /// The [`SystemParamState`] of [`ResMut`]. pub struct ResMutState { component_id: ComponentId, diff --git a/examples/README.md b/examples/README.md index 096afc63e4193..e6971549315d8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -151,6 +151,7 @@ Example | File | Description `ecs_guide` | [`ecs/ecs_guide.rs`](./ecs/ecs_guide.rs) | Full guide to Bevy's ECS `change_detection` | [`ecs/change_detection.rs`](./ecs/change_detection.rs) | Change detection on components `event` | [`ecs/event.rs`](./ecs/event.rs) | Illustrates event creation, activation, and reception +`event_consumer` | [`ecs/event_consumer.rs`](./ecs/event_consumer.rs) | Shows how to consume events and avoid automatic event cleanup when skipping systems `fixed_timestep` | [`ecs/fixed_timestep.rs`](./ecs/fixed_timestep.rs) | Shows how to create systems that run every fixed timestep, rather than every tick `hierarchy` | [`ecs/hierarchy.rs`](./ecs/hierarchy.rs) | Creates a hierarchy of parents and children entities `parallel_query` | [`ecs/parallel_query.rs`](./ecs/parallel_query.rs) | Illustrates parallel queries with `ParallelIterator` diff --git a/examples/ecs/event.rs b/examples/ecs/event.rs index 0ae4d028c0ba9..e8b016cfff53e 100644 --- a/examples/ecs/event.rs +++ b/examples/ecs/event.rs @@ -1,3 +1,4 @@ +use bevy::core::FixedTimestep; use bevy::prelude::*; /// This example creates a new event, a system that triggers the event once per second, @@ -6,8 +7,11 @@ fn main() { App::build() .add_plugins(DefaultPlugins) .add_event::() - .init_resource::() - .add_system(event_trigger_system.system()) + .add_system( + event_trigger_system + .system() + .with_run_criteria(FixedTimestep::step(1.0)), + ) .add_system(event_listener_system.system()) .run(); } @@ -16,29 +20,11 @@ struct MyEvent { pub message: String, } -struct EventTriggerState { - event_timer: Timer, -} - -impl Default for EventTriggerState { - fn default() -> Self { - EventTriggerState { - event_timer: Timer::from_seconds(1.0, true), - } - } -} - // sends MyEvent every second -fn event_trigger_system( - time: Res