From 504b8dfaefb71770f9b5c8cb6d0b1d6e0881f085 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sat, 26 Dec 2020 23:08:35 +0000 Subject: [PATCH] Driver, Input: Performance & Benchmarks (#27) * Driver Benchmarks Benchmarks driver use cases for single packet send, multiple packet send, float vs opus, and the cost of head-of-queue track removal. Mix costs for large packet counts are also included. This is a prelude to the optimisations discussed in #21. * Typo in benchmark * Place Opus packet directly into packet buffer Cleans up some other logic surrounding this, too. Gets a 16.9% perf improvement on opus packet passthrough (sub 5us here). * Better track removal In theory this should be faster, but it aint. Keeping in case reducing struct sizes down the line magically makes this faster. * Reduce size of Input, TrackHandle Metadata is now boxed away. Similarly, TrackHandles are neatly Arc'd to reduce their size to pointer length (and mitigate the impact of copies if we add in more fields). --- .github/workflows/docs.yml | 2 +- Cargo.toml | 14 +- benches/{mixing.rs => base-mixing.rs} | 0 benches/mixing-task.rs | 237 ++++++++++++++++++++++++ src/driver/bench_internals.rs | 8 + src/driver/crypto.rs | 4 +- src/driver/mod.rs | 6 +- src/driver/tasks/events.rs | 6 +- src/driver/tasks/message/core.rs | 2 + src/driver/tasks/message/events.rs | 4 +- src/driver/tasks/message/mixer.rs | 6 +- src/driver/tasks/message/mod.rs | 6 +- src/driver/tasks/message/udp_rx.rs | 4 +- src/driver/tasks/message/udp_tx.rs | 2 + src/driver/tasks/message/ws.rs | 4 +- src/driver/tasks/mixer.rs | 247 +++++++++++++++----------- src/driver/tasks/mod.rs | 6 +- src/events/context.rs | 2 +- src/events/mod.rs | 3 +- src/input/mod.rs | 4 +- src/manager.rs | 2 +- src/tracks/handle.rs | 36 ++-- src/tracks/queue.rs | 2 +- 23 files changed, 462 insertions(+), 145 deletions(-) rename benches/{mixing.rs => base-mixing.rs} (100%) create mode 100644 benches/mixing-task.rs create mode 100644 src/driver/bench_internals.rs diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 547813c84..8f615acf4 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -41,7 +41,7 @@ jobs: env: RUSTDOCFLAGS: -D broken_intra_doc_links run: | - cargo doc --no-deps --all-features + cargo doc --no-deps --features default,twilight-rustls,builtin-queue,stock-zlib - name: Prepare docs shell: bash -e -O extglob {0} diff --git a/Cargo.toml b/Cargo.toml index 0248d8714..580151c0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,10 +158,18 @@ serenity-deps = ["async-trait"] youtube-dlc = [] builtin-queue = [] +internals = [] + +[[bench]] +name = "base-mixing" +path = "benches/base-mixing.rs" +harness = false + [[bench]] -name = "mixing" -path = "benches/mixing.rs" +name = "mixing-task" +path = "benches/mixing-task.rs" +required-features = ["internals"] harness = false [package.metadata.docs.rs] -all-features = true +features = ["default", "twilight-rustls", "builtin-queue", "stock-zlib"] diff --git a/benches/mixing.rs b/benches/base-mixing.rs similarity index 100% rename from benches/mixing.rs rename to benches/base-mixing.rs diff --git a/benches/mixing-task.rs b/benches/mixing-task.rs new file mode 100644 index 000000000..fcf37f323 --- /dev/null +++ b/benches/mixing-task.rs @@ -0,0 +1,237 @@ +use criterion::{ + black_box, + criterion_group, + criterion_main, + BatchSize, + Bencher, + BenchmarkId, + Criterion, +}; +use flume::{Receiver, Sender, TryRecvError}; +use songbird::{ + constants::*, + driver::bench_internals::{mixer::Mixer, task_message::*, CryptoState}, + input::{cached::Compressed, Input}, + tracks, + Bitrate, +}; +use tokio::runtime::{Handle, Runtime}; +use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher, KEY_SIZE}; + +// create a dummied task + interconnect. +// measure perf at varying numbers of sources (binary 1--64) without passthrough support. + +fn dummied_mixer( + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let (mix_tx, mix_rx) = flume::unbounded(); + let (core_tx, core_rx) = flume::unbounded(); + let (event_tx, event_rx) = flume::unbounded(); + + let (udp_sender_tx, udp_sender_rx) = flume::unbounded(); + let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded(); + + let ic = Interconnect { + core: core_tx, + events: event_tx, + mixer: mix_tx, + }; + + let mut out = Mixer::new(mix_rx, handle, ic, Default::default()); + + let fake_conn = MixerConnection { + cipher: Cipher::new_varkey(&vec![0u8; KEY_SIZE]).unwrap(), + crypto_state: CryptoState::Normal, + udp_rx: udp_receiver_tx, + udp_tx: udp_sender_tx, + }; + + out.conn_active = Some(fake_conn); + + out.skip_sleep = true; + + (out, (core_rx, event_rx, udp_receiver_rx, udp_sender_rx)) +} + +fn mixer_float( + num_tracks: usize, + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let mut out = dummied_mixer(handle); + + let floats = utils::make_sine(10 * STEREO_FRAME_SIZE, true); + + let mut tracks = vec![]; + for i in 0..num_tracks { + let input = Input::float_pcm(true, floats.clone().into()); + tracks.push(tracks::create_player(input).0.into()); + } + + out.0.tracks = tracks; + + out +} + +fn mixer_float_drop( + num_tracks: usize, + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let mut out = dummied_mixer(handle); + + let mut tracks = vec![]; + for i in 0..num_tracks { + let floats = utils::make_sine((i / 5) * STEREO_FRAME_SIZE, true); + let input = Input::float_pcm(true, floats.clone().into()); + tracks.push(tracks::create_player(input).0.into()); + } + + out.0.tracks = tracks; + + out +} + +fn mixer_opus( + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + // should add a single opus-based track. + // make this fully loaded to prevent any perf cost there. + let mut out = dummied_mixer(handle); + + let floats = utils::make_sine(6 * STEREO_FRAME_SIZE, true); + + let mut tracks = vec![]; + + let mut src = Compressed::new( + Input::float_pcm(true, floats.clone().into()), + Bitrate::BitsPerSecond(128_000), + ) + .expect("These parameters are well-defined."); + src.raw.load_all(); + + tracks.push(tracks::create_player(src.into()).0.into()); + + out.0.tracks = tracks; + + out +} + +fn no_passthrough(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("Float Input (No Passthrough)"); + + for shift in 0..=6 { + let track_count = 1 << shift; + + group.bench_with_input( + BenchmarkId::new("Single Packet", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || black_box(mixer_float(*i, rt.handle().clone())), + |input| { + black_box(input.0.cycle()); + }, + BatchSize::SmallInput, + ) + }, + ); + group.bench_with_input( + BenchmarkId::new("n=5 Packets", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || black_box(mixer_float(*i, rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + + group.finish(); +} + +fn passthrough(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("Opus Input (Passthrough)"); + + group.bench_function("Single Packet", |b| { + b.iter_batched_ref( + || black_box(mixer_opus(rt.handle().clone())), + |input| { + black_box(input.0.cycle()); + }, + BatchSize::SmallInput, + ) + }); + group.bench_function("n=5 Packets", |b| { + b.iter_batched_ref( + || black_box(mixer_opus(rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }); + + group.finish(); +} + +fn culling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("Worst-case Track Culling (15 tracks, 5 pkts)", |b| { + b.iter_batched_ref( + || black_box(mixer_float_drop(15, rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }); +} + +criterion_group!(benches, no_passthrough, passthrough, culling); +criterion_main!(benches); diff --git a/src/driver/bench_internals.rs b/src/driver/bench_internals.rs new file mode 100644 index 000000000..d335d49fa --- /dev/null +++ b/src/driver/bench_internals.rs @@ -0,0 +1,8 @@ +//! Various driver internals which need to be exported for benchmarking. +//! +//! Included if using the `"internals"` feature flag. +//! You should not and/or cannot use these as part of a normal application. + +pub use super::tasks::{message as task_message, mixer}; + +pub use super::crypto::CryptoState; diff --git a/src/driver/crypto.rs b/src/driver/crypto.rs index cfbc81304..1ef763b89 100644 --- a/src/driver/crypto.rs +++ b/src/driver/crypto.rs @@ -169,9 +169,10 @@ impl CryptoMode { } } +#[allow(missing_docs)] #[derive(Clone, Copy, Debug, Eq, PartialEq)] #[non_exhaustive] -pub(crate) enum CryptoState { +pub enum CryptoState { Normal, Suffix, Lite(Wrapping), @@ -217,6 +218,7 @@ impl CryptoState { endpoint } + /// Returns the underlying (stateless) type of the active crypto mode. pub fn kind(&self) -> CryptoMode { CryptoMode::from(*self) } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 08a1a1dc5..5e0ad7403 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -8,6 +8,9 @@ //! generation from being slowed down past its deadline, or from affecting other //! asynchronous tasks your bot must handle. +#[cfg(feature = "internals")] +pub mod bench_internals; + mod config; pub(crate) mod connection; mod crypto; @@ -16,7 +19,8 @@ pub(crate) mod tasks; pub use config::Config; use connection::error::{Error, Result}; -pub use crypto::*; +pub use crypto::CryptoMode; +pub(crate) use crypto::CryptoState; pub use decode_mode::DecodeMode; #[cfg(feature = "builtin-queue")] diff --git a/src/driver/tasks/events.rs b/src/driver/tasks/events.rs index bb28895e8..95ec677b7 100644 --- a/src/driver/tasks/events.rs +++ b/src/driver/tasks/events.rs @@ -93,9 +93,9 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { info!("Event state for track {} of {} removed.", i, events.len()); - events.remove(i); - states.remove(i); - handles.remove(i); + events.swap_remove(i); + states.swap_remove(i); + handles.swap_remove(i); }, Ok(RemoveAllTracks) => { info!("Event state for all tracks removed."); diff --git a/src/driver/tasks/message/core.rs b/src/driver/tasks/message/core.rs index 270beecf2..dc02f541b 100644 --- a/src/driver/tasks/message/core.rs +++ b/src/driver/tasks/message/core.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use crate::{ driver::{connection::error::Error, Config}, events::EventData, diff --git a/src/driver/tasks/message/events.rs b/src/driver/tasks/message/events.rs index 197ebe8a7..c7989eae8 100644 --- a/src/driver/tasks/message/events.rs +++ b/src/driver/tasks/message/events.rs @@ -1,10 +1,12 @@ +#![allow(missing_docs)] + use crate::{ events::{CoreContext, EventData, EventStore}, tracks::{LoopState, PlayMode, TrackHandle, TrackState}, }; use std::time::Duration; -pub(crate) enum EventMessage { +pub enum EventMessage { // Event related. // Track events should fire off the back of state changes. AddGlobalEvent(EventData), diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 260f4008b..3c9b0a18d 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use super::{Interconnect, UdpRxMessage, UdpTxMessage, WsMessage}; use crate::{ @@ -8,7 +10,7 @@ use crate::{ use flume::Sender; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; -pub(crate) struct MixerConnection { +pub struct MixerConnection { pub cipher: Cipher, pub crypto_state: CryptoState, pub udp_rx: Sender, @@ -22,7 +24,7 @@ impl Drop for MixerConnection { } } -pub(crate) enum MixerMessage { +pub enum MixerMessage { AddTrack(Track), SetTrack(Option), diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index 183183999..f769efbd7 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + mod core; mod events; mod mixer; @@ -5,13 +7,13 @@ mod udp_rx; mod udp_tx; mod ws; -pub(crate) use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; +pub use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; use flume::Sender; use tracing::info; #[derive(Clone, Debug)] -pub(crate) struct Interconnect { +pub struct Interconnect { pub core: Sender, pub events: Sender, pub mixer: Sender, diff --git a/src/driver/tasks/message/udp_rx.rs b/src/driver/tasks/message/udp_rx.rs index 453415d75..9034090ae 100644 --- a/src/driver/tasks/message/udp_rx.rs +++ b/src/driver/tasks/message/udp_rx.rs @@ -1,7 +1,9 @@ +#![allow(missing_docs)] + use super::Interconnect; use crate::driver::Config; -pub(crate) enum UdpRxMessage { +pub enum UdpRxMessage { SetConfig(Config), ReplaceInterconnect(Interconnect), diff --git a/src/driver/tasks/message/udp_tx.rs b/src/driver/tasks/message/udp_tx.rs index 349d52449..d3dbf3602 100644 --- a/src/driver/tasks/message/udp_tx.rs +++ b/src/driver/tasks/message/udp_tx.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + pub enum UdpTxMessage { Packet(Vec), // TODO: do something cheaper. Poison, diff --git a/src/driver/tasks/message/ws.rs b/src/driver/tasks/message/ws.rs index 7ce5f0703..1cd7e49e6 100644 --- a/src/driver/tasks/message/ws.rs +++ b/src/driver/tasks/message/ws.rs @@ -1,8 +1,10 @@ +#![allow(missing_docs)] + use super::Interconnect; use crate::ws::WsStream; #[allow(dead_code)] -pub(crate) enum WsMessage { +pub enum WsMessage { Ws(Box), ReplaceInterconnect(Interconnect), SetKeepalive(f64), diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index ec8a121a8..d552e1232 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -22,23 +22,24 @@ use tokio::runtime::Handle; use tracing::{error, instrument}; use xsalsa20poly1305::TAG_SIZE; -struct Mixer { - async_handle: Handle, - bitrate: Bitrate, - config: Config, - conn_active: Option, - deadline: Instant, - encoder: OpusEncoder, - interconnect: Interconnect, - mix_rx: Receiver, - muted: bool, - packet: [u8; VOICE_PACKET_MAX], - prevent_events: bool, - silence_frames: u8, - sleeper: SpinSleeper, - soft_clip: SoftClip, - tracks: Vec, - ws: Option>, +pub struct Mixer { + pub async_handle: Handle, + pub bitrate: Bitrate, + pub config: Config, + pub conn_active: Option, + pub deadline: Instant, + pub encoder: OpusEncoder, + pub interconnect: Interconnect, + pub mix_rx: Receiver, + pub muted: bool, + pub packet: [u8; VOICE_PACKET_MAX], + pub prevent_events: bool, + pub silence_frames: u8, + pub skip_sleep: bool, + pub sleeper: SpinSleeper, + pub soft_clip: SoftClip, + pub tracks: Vec, + pub ws: Option>, } fn new_encoder(bitrate: Bitrate) -> Result { @@ -49,7 +50,7 @@ fn new_encoder(bitrate: Bitrate) -> Result { } impl Mixer { - fn new( + pub fn new( mix_rx: Receiver, async_handle: Handle, interconnect: Interconnect, @@ -86,6 +87,7 @@ impl Mixer { packet, prevent_events: false, silence_frames: 0, + skip_sleep: false, sleeper: Default::default(), soft_clip, tracks, @@ -288,70 +290,6 @@ impl Mixer { Ok(()) } - #[inline] - fn mix_tracks<'a>( - &mut self, - opus_frame: &'a mut [u8], - mix_buffer: &mut [f32; STEREO_FRAME_SIZE], - ) -> Result<(usize, &'a [u8])> { - let mut len = 0; - - // Opus frame passthrough. - // This requires that we have only one track, who has volume 1.0, and an - // Opus codec type. - let do_passthrough = self.tracks.len() == 1 && { - let track = &self.tracks[0]; - (track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough() - }; - - for (i, track) in self.tracks.iter_mut().enumerate() { - let vol = track.volume; - let stream = &mut track.source; - - if track.playing != PlayMode::Play { - continue; - } - - let (temp_len, opus_len) = if do_passthrough { - (0, track.source.read_opus_frame(opus_frame).ok()) - } else { - (stream.mix(mix_buffer, vol), None) - }; - - len = len.max(temp_len); - if temp_len > 0 || opus_len.is_some() { - track.step_frame(); - } else if track.do_loop() { - if let Ok(time) = track.seek_time(Default::default()) { - // have to reproduce self.fire_event here - // to circumvent the borrow checker's lack of knowledge. - // - // In event of error, one of the later event calls will - // trigger the event thread rebuild: it is more prudent that - // the mixer works as normal right now. - if !self.prevent_events { - let _ = self.interconnect.events.send(EventMessage::ChangeState( - i, - TrackStateChange::Position(time), - )); - let _ = self.interconnect.events.send(EventMessage::ChangeState( - i, - TrackStateChange::Loops(track.loops, false), - )); - } - } - } else { - track.end(); - } - - if let Some(opus_len) = opus_len { - return Ok((STEREO_FRAME_SIZE, &opus_frame[..opus_len])); - } - } - - Ok((len, &opus_frame[..0])) - } - #[inline] fn audio_commands_events(&mut self) -> Result<()> { // Apply user commands. @@ -374,7 +312,7 @@ impl Mixer { if track.playing.is_done() { let p_state = track.playing(); - self.tracks.remove(i); + self.tracks.swap_remove(i); to_remove.push(i); self.fire_event(EventMessage::ChangeState( i, @@ -398,42 +336,65 @@ impl Mixer { #[inline] fn march_deadline(&mut self) { + if self.skip_sleep { + return; + } + self.sleeper .sleep(self.deadline.saturating_duration_since(Instant::now())); self.deadline += TIMESTEP_LENGTH; } - fn cycle(&mut self) -> Result<()> { + pub fn cycle(&mut self) -> Result<()> { if self.conn_active.is_none() { self.march_deadline(); return Ok(()); } - // TODO: can we make opus_frame_backing *actually* a view over - // some region of self.packet, derived using the encryption mode? - // This saves a copy on Opus passthrough. - let mut opus_frame_backing = [0u8; STEREO_FRAME_SIZE]; let mut mix_buffer = [0f32; STEREO_FRAME_SIZE]; - // Slice which mix tracks may use to passthrough direct Opus frames. - let mut opus_space = &mut opus_frame_backing[..]; - // Walk over all the audio files, combining into one audio frame according // to volume, play state, etc. - let (mut len, mut opus_frame) = self.mix_tracks(&mut opus_space, &mut mix_buffer)?; + let mut mix_len = { + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + + let payload = rtp.payload_mut(); + + // self.mix_tracks(&mut payload[TAG_SIZE..], &mut mix_buffer) + mix_tracks( + &mut payload[TAG_SIZE..], + &mut mix_buffer, + &mut self.tracks, + &self.interconnect, + self.prevent_events, + ) + }; self.soft_clip.apply(&mut mix_buffer[..])?; if self.muted { - len = 0; + mix_len = MixType::MixedPcm(0); } - if len == 0 { + if mix_len == MixType::MixedPcm(0) { if self.silence_frames > 0 { self.silence_frames -= 1; // Explicit "Silence" frame. - opus_frame = &SILENT_FRAME[..]; + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + + let payload = rtp.payload_mut(); + + (&mut payload[TAG_SIZE..TAG_SIZE + SILENT_FRAME.len()]) + .copy_from_slice(&SILENT_FRAME[..]); + + mix_len = MixType::Passthrough(SILENT_FRAME.len()); } else { // Per official guidelines, send 5x silence BEFORE we stop speaking. if let Some(ws) = &self.ws { @@ -457,7 +418,7 @@ impl Mixer { } self.march_deadline(); - self.prep_and_send_packet(mix_buffer, opus_frame)?; + self.prep_and_send_packet(mix_buffer, mix_len)?; Ok(()) } @@ -466,7 +427,8 @@ impl Mixer { self.encoder.set_bitrate(bitrate).map_err(Into::into) } - fn prep_and_send_packet(&mut self, buffer: [f32; 1920], opus_frame: &[u8]) -> Result<()> { + #[inline] + fn prep_and_send_packet(&mut self, buffer: [f32; 1920], mix_len: MixType) -> Result<()> { let conn = self .conn_active .as_mut() @@ -481,16 +443,15 @@ impl Mixer { let payload = rtp.payload_mut(); let crypto_mode = conn.crypto_state.kind(); - let payload_len = if opus_frame.is_empty() { - let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); - self.encoder.encode_float( - &buffer[..STEREO_FRAME_SIZE], - &mut payload[TAG_SIZE..total_payload_space], - )? - } else { - let len = opus_frame.len(); - payload[TAG_SIZE..TAG_SIZE + len].clone_from_slice(opus_frame); - len + let payload_len = match mix_len { + MixType::Passthrough(opus_len) => opus_len, + MixType::MixedPcm(_samples) => { + let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); + self.encoder.encode_float( + &buffer[..STEREO_FRAME_SIZE], + &mut payload[TAG_SIZE..total_payload_space], + )? + }, }; let final_payload_size = conn @@ -523,6 +484,78 @@ impl Mixer { } } +#[derive(Debug, Eq, PartialEq)] +enum MixType { + Passthrough(usize), + MixedPcm(usize), +} + +#[inline] +fn mix_tracks<'a>( + opus_frame: &'a mut [u8], + mix_buffer: &mut [f32; STEREO_FRAME_SIZE], + tracks: &mut Vec, + interconnect: &Interconnect, + prevent_events: bool, +) -> MixType { + let mut len = 0; + + // Opus frame passthrough. + // This requires that we have only one track, who has volume 1.0, and an + // Opus codec type. + let do_passthrough = tracks.len() == 1 && { + let track = &tracks[0]; + (track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough() + }; + + for (i, track) in tracks.iter_mut().enumerate() { + let vol = track.volume; + let stream = &mut track.source; + + if track.playing != PlayMode::Play { + continue; + } + + let (temp_len, opus_len) = if do_passthrough { + (0, track.source.read_opus_frame(opus_frame).ok()) + } else { + (stream.mix(mix_buffer, vol), None) + }; + + len = len.max(temp_len); + if temp_len > 0 || opus_len.is_some() { + track.step_frame(); + } else if track.do_loop() { + if let Ok(time) = track.seek_time(Default::default()) { + // have to reproduce self.fire_event here + // to circumvent the borrow checker's lack of knowledge. + // + // In event of error, one of the later event calls will + // trigger the event thread rebuild: it is more prudent that + // the mixer works as normal right now. + if !prevent_events { + let _ = interconnect.events.send(EventMessage::ChangeState( + i, + TrackStateChange::Position(time), + )); + let _ = interconnect.events.send(EventMessage::ChangeState( + i, + TrackStateChange::Loops(track.loops, false), + )); + } + } + } else { + track.end(); + } + + if let Some(opus_len) = opus_len { + return MixType::Passthrough(opus_len); + } + } + + MixType::MixedPcm(len) +} + /// The mixing thread is a synchronous context due to its compute-bound nature. /// /// We pass in an async handle for the benefit of some Input classes (e.g., restartables) diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index fe0257cda..7dd8960e3 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -1,7 +1,9 @@ +#![allow(missing_docs)] + pub mod error; mod events; -pub(crate) mod message; -mod mixer; +pub mod message; +pub mod mixer; pub(crate) mod udp_rx; pub(crate) mod udp_tx; pub(crate) mod ws; diff --git a/src/events/context.rs b/src/events/context.rs index 9ff0268d2..7b9809fc5 100644 --- a/src/events/context.rs +++ b/src/events/context.rs @@ -72,7 +72,7 @@ pub enum EventContext<'a> { } #[derive(Clone, Debug)] -pub(crate) enum CoreContext { +pub enum CoreContext { SpeakingStateUpdate(Speaking), SpeakingUpdate { ssrc: u32, diff --git a/src/events/mod.rs b/src/events/mod.rs index ea885c080..5bd59cb17 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -7,7 +7,8 @@ mod store; mod track; mod untimed; -pub use self::{context::*, core::*, data::*, store::*, track::*, untimed::*}; +pub use self::{context::EventContext, core::*, data::*, store::*, track::*, untimed::*}; +pub(crate) use context::CoreContext; use async_trait::async_trait; use std::time::Duration; diff --git a/src/input/mod.rs b/src/input/mod.rs index 088cac292..0ba639691 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -85,7 +85,7 @@ use tracing::{debug, error}; #[derive(Debug)] pub struct Input { /// Information about the played source. - pub metadata: Metadata, + pub metadata: Box, /// Indicates whether `source` is stereo or mono. pub stereo: bool, /// Underlying audio data bytestream. @@ -119,7 +119,7 @@ impl Input { metadata: Option, ) -> Self { Input { - metadata: metadata.unwrap_or_default(), + metadata: metadata.unwrap_or_default().into(), stereo, reader, kind, diff --git a/src/manager.rs b/src/manager.rs index ca78a9192..7bdd28af6 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -100,7 +100,7 @@ impl Songbird { /// If this struct is already initialised (e.g., from [`::twilight`]), /// or a previous call, then this function is a no-op. /// - /// [`::twilight`]: Songbird::twilight + /// [`::twilight`]: #method.twilight pub fn initialise_client_data>(&self, shard_count: u64, user_id: U) { let mut client_data = self.client_data.write(); diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index 71bd40a1d..aaf92c0b3 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -17,10 +17,15 @@ use uuid::Uuid; /// /// [`Track`]: Track pub struct TrackHandle { + inner: Arc, +} + +#[derive(Clone, Debug)] +struct InnerHandle { command_channel: UnboundedSender, seekable: bool, uuid: Uuid, - metadata: Arc, + metadata: Box, } impl TrackHandle { @@ -32,14 +37,16 @@ impl TrackHandle { command_channel: UnboundedSender, seekable: bool, uuid: Uuid, - metadata: Metadata, + metadata: Box, ) -> Self { - Self { + let inner = Arc::new(InnerHandle { command_channel, seekable, uuid, - metadata: Arc::new(metadata), - } + metadata, + }); + + Self { inner } } /// Unpauses an audio track. @@ -75,7 +82,7 @@ impl TrackHandle { /// [`seek_time`]: TrackHandle::seek_time /// [`Input`]: crate::input::Input pub fn is_seekable(&self) -> bool { - self.seekable + self.inner.seekable } /// Seeks along the track to the specified position. @@ -86,7 +93,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn seek_time(&self, position: Duration) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Seek(position)) } else { Err(TrackError::SeekUnsupported) @@ -139,7 +146,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn enable_loop(&self) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Infinite)) } else { Err(TrackError::SeekUnsupported) @@ -154,7 +161,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn disable_loop(&self) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Finite(0))) } else { Err(TrackError::SeekUnsupported) @@ -169,7 +176,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn loop_for(&self, count: usize) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Finite(count))) } else { Err(TrackError::SeekUnsupported) @@ -178,7 +185,7 @@ impl TrackHandle { /// Returns this handle's (and track's) unique identifier. pub fn uuid(&self) -> Uuid { - self.uuid + self.inner.uuid } /// Returns the metadata stored in the handle. @@ -188,8 +195,8 @@ impl TrackHandle { /// read-only from then on. /// /// [`Input`]: crate::input::Input - pub fn metadata(&self) -> Arc { - self.metadata.clone() + pub fn metadata(&self) -> &Metadata { + &self.inner.metadata } #[inline] @@ -199,7 +206,8 @@ impl TrackHandle { pub fn send(&self, cmd: TrackCommand) -> TrackResult<()> { // As the send channels are unbounded, we can be reasonably certain // that send failure == cancellation. - self.command_channel + self.inner + .command_channel .send(cmd) .map_err(|_e| TrackError::Finished) } diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index e62cd0da1..c4560b614 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -54,7 +54,7 @@ use tracing::{info, warn}; /// ``` /// /// [`TrackEvent`]: crate::events::TrackEvent -/// [`Driver::queue`]: crate::driver::Driver::queue +/// [`Driver::queue`]: crate::driver::Driver #[derive(Clone, Debug, Default)] pub struct TrackQueue { // NOTE: the choice of a parking lot mutex is quite deliberate