diff --git a/Cargo.toml b/Cargo.toml index 48d0d4724..02536845d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ version = "0.3.0" async-trait = { optional = true, version = "0.1" } audiopus = { optional = true, version = "0.3.0-rc.0" } byteorder = { optional = true, version = "1" } +bytes = { optional = true, version = "1" } dashmap = { optional = true, version = "5" } derivative = "2" discortp = { default-features = false, features = ["discord", "pnet", "rtp"], optional = true, version = "0.5" } @@ -145,7 +146,7 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"] # Behaviour altering features. builtin-queue = [] -receive = ["discortp?/demux", "discortp?/rtcp"] +receive = ["dep:bytes", "discortp?/demux", "discortp?/rtcp"] # Used for docgen/testing/benchmarking. full-doc = ["default", "twilight", "builtin-queue", "receive"] diff --git a/examples/serenity/voice_receive/Cargo.toml b/examples/serenity/voice_receive/Cargo.toml index f0eec66ad..a99363bdb 100644 --- a/examples/serenity/voice_receive/Cargo.toml +++ b/examples/serenity/voice_receive/Cargo.toml @@ -5,6 +5,7 @@ authors = ["my name "] edition = "2021" [dependencies] +dashmap = "5" tracing = "0.1" tracing-subscriber = "0.2" tracing-futures = "0.2" diff --git a/examples/serenity/voice_receive/src/main.rs b/examples/serenity/voice_receive/src/main.rs index 288647ae0..6ff485017 100644 --- a/examples/serenity/voice_receive/src/main.rs +++ b/examples/serenity/voice_receive/src/main.rs @@ -6,7 +6,15 @@ //! git = "https://github.com/serenity-rs/serenity.git" //! features = ["client", "standard_framework", "voice"] //! ``` -use std::env; +use std::{ + env, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use dashmap::DashMap; use serenity::{ async_trait, @@ -26,7 +34,11 @@ use serenity::{ use songbird::{ driver::DecodeMode, - model::payload::{ClientDisconnect, Speaking}, + model::{ + id::UserId, + payload::{ClientDisconnect, Speaking}, + }, + packet::Packet, Config, CoreEvent, Event, @@ -44,13 +56,26 @@ impl EventHandler for Handler { } } -struct Receiver; +#[derive(Clone)] +struct Receiver { + inner: Arc, +} + +struct InnerReceiver { + last_tick_was_empty: AtomicBool, + known_ssrcs: DashMap, +} impl Receiver { pub fn new() -> Self { // You can manage state here, such as a buffer of audio packet bytes so // you can later store them in intervals. - Self {} + Self { + inner: Arc::new(InnerReceiver { + last_tick_was_empty: AtomicBool::default(), + known_ssrcs: DashMap::new(), + }), + } } } @@ -81,34 +106,73 @@ impl VoiceEventHandler for Receiver { "Speaking state update: user {:?} has SSRC {:?}, using {:?}", user_id, ssrc, speaking, ); + + if let Some(user) = user_id { + self.inner.known_ssrcs.insert(*ssrc, *user); + } }, - Ctx::SpeakingUpdate(data) => { - // You can implement logic here which reacts to a user starting - // or stopping speaking, and to map their SSRC to User ID. 
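The reworked example above threads an `Arc<InnerReceiver>` holding a `DashMap<u32, UserId>` through every handler, because only gateway events such as `SpeakingStateUpdate` reveal which user owns which RTP SSRC; packet-level events carry only the SSRC. A minimal standalone sketch of that mapping pattern, with the `SsrcMap` type and its method names invented purely for illustration:

```rust
use dashmap::DashMap;

/// Hypothetical stand-in for `songbird::model::id::UserId`.
type UserId = u64;

/// Tracks which user owns which RTP synchronisation source (SSRC).
struct SsrcMap {
    known_ssrcs: DashMap<u32, UserId>,
}

impl SsrcMap {
    fn new() -> Self {
        Self { known_ssrcs: DashMap::new() }
    }

    /// Called from a `SpeakingStateUpdate` handler, where Discord supplies the pairing.
    fn learn(&self, ssrc: u32, user: UserId) {
        self.known_ssrcs.insert(ssrc, user);
    }

    /// Called from packet/tick handlers, which only see the SSRC.
    fn resolve(&self, ssrc: u32) -> Option<UserId> {
        self.known_ssrcs.get(&ssrc).map(|entry| *entry)
    }
}

fn main() {
    let map = SsrcMap::new();
    map.learn(123_456, 987_654_321);
    assert_eq!(map.resolve(123_456), Some(987_654_321));
    assert_eq!(map.resolve(1), None);
}
```

In the example itself, `known_ssrcs` plays exactly this role: it is written from the `SpeakingStateUpdate` arm and read back when formatting `VoiceTick` output.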
- println!( - "Source {} has {} speaking.", - data.ssrc, - if data.speaking { "started" } else { "stopped" }, - ); + Ctx::VoiceTick(tick) => { + let speaking = tick.speaking.len(); + let total_participants = speaking + tick.silent.len(); + let last_tick_was_empty = self.inner.last_tick_was_empty.load(Ordering::SeqCst); + + if speaking == 0 && !last_tick_was_empty { + println!("No speakers"); + + self.inner.last_tick_was_empty.store(true, Ordering::SeqCst); + } else if speaking != 0 { + self.inner + .last_tick_was_empty + .store(false, Ordering::SeqCst); + + println!("Voice tick ({speaking}/{total_participants} live):"); + + // You can also examine tick.silent to see users who are present + // but haven't spoken in this tick. + for (ssrc, data) in &tick.speaking { + let user_id_str = if let Some(id) = self.inner.known_ssrcs.get(ssrc) { + format!("{:?}", *id) + } else { + "?".into() + }; + + // This field should *always* exist under DecodeMode::Decode. + // The `else` allows you to see how the other modes are affected. + if let Some(decoded_voice) = data.decoded_voice.as_ref() { + let voice_len = decoded_voice.len(); + let audio_str = format!( + "first samples from {}: {:?}", + voice_len, + &decoded_voice[..voice_len.min(5)] + ); + + if let Some(packet) = &data.packet { + let rtp = packet.rtp(); + println!( + "\t{ssrc}/{user_id_str}: packet seq {} ts {} -- {audio_str}", + rtp.get_sequence().0, + rtp.get_timestamp().0 + ); + } else { + println!("\t{ssrc}/{user_id_str}: Missed packet -- {audio_str}"); + } + } else { + println!("\t{ssrc}/{user_id_str}: Decode disabled."); + } + } + } }, - Ctx::VoicePacket(data) => { + Ctx::RtpPacket(packet) => { // An event which fires for every received audio packet, // containing the decoded data. - if let Some(audio) = data.audio { - println!( - "Audio packet's first 5 samples: {:?}", - audio.get(..5.min(audio.len())) - ); - println!( - "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", - data.packet.sequence.0, - audio.len() * std::mem::size_of::(), - data.packet.payload.len(), - data.packet.ssrc, - ); - } else { - println!("RTP packet, but no audio. Driver may not be configured to decode."); - } + let rtp = packet.rtp(); + println!( + "Received voice packet from SSRC {}, sequence {}, timestamp {} -- {}B long", + rtp.get_ssrc(), + rtp.get_sequence().0, + rtp.get_timestamp().0, + rtp.payload().len() + ); }, Ctx::RtcpPacket(data) => { // An event which fires for every received rtcp packet, @@ -195,15 +259,13 @@ async fn join(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { // NOTE: this skips listening for the actual connection result. 
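The `Ctx::VoiceTick` arm above only prints the first few decoded samples, but its 20ms cadence is also the natural place to accumulate recordings, as the comment in `Receiver::new` suggests. A rough, self-contained sketch of per-SSRC buffering along those lines; `PcmStore` and its helpers are hypothetical and not part of songbird or this example:

```rust
use std::collections::HashMap;

/// Per-SSRC accumulation of decoded audio.
///
/// Each `VoiceTick` contributes roughly 20ms of 48kHz interleaved stereo PCM
/// (960 samples per channel, i.e. 1920 `i16`s) per speaking user.
#[derive(Default)]
struct PcmStore {
    buffers: HashMap<u32, Vec<i16>>,
}

impl PcmStore {
    /// Append one tick's worth of decoded samples for a given SSRC.
    fn append_tick(&mut self, ssrc: u32, decoded: &[i16]) {
        self.buffers.entry(ssrc).or_default().extend_from_slice(decoded);
    }

    /// Seconds of audio captured so far for an SSRC (stereo, 48kHz).
    fn seconds_recorded(&self, ssrc: u32) -> f64 {
        self.buffers
            .get(&ssrc)
            .map_or(0.0, |b| b.len() as f64 / (48_000.0 * 2.0))
    }
}

fn main() {
    let mut store = PcmStore::default();
    // Pretend we received 50 ticks (one second) of silence-valued audio for SSRC 42.
    let one_tick = vec![0i16; 1920];
    for _ in 0..50 {
        store.append_tick(42, &one_tick);
    }
    assert!((store.seconds_recorded(42) - 1.0).abs() < 1e-9);
}
```

In the real handler, the equivalent call site would be the `tick.speaking` loop, feeding each `decoded_voice` slice into such a store.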
let mut handler = handler_lock.lock().await; - handler.add_global_event(CoreEvent::SpeakingStateUpdate.into(), Receiver::new()); - - handler.add_global_event(CoreEvent::SpeakingUpdate.into(), Receiver::new()); - - handler.add_global_event(CoreEvent::VoicePacket.into(), Receiver::new()); - - handler.add_global_event(CoreEvent::RtcpPacket.into(), Receiver::new()); + let evt_receiver = Receiver::new(); - handler.add_global_event(CoreEvent::ClientDisconnect.into(), Receiver::new()); + handler.add_global_event(CoreEvent::SpeakingStateUpdate.into(), evt_receiver.clone()); + handler.add_global_event(CoreEvent::RtpPacket.into(), evt_receiver.clone()); + handler.add_global_event(CoreEvent::RtcpPacket.into(), evt_receiver.clone()); + handler.add_global_event(CoreEvent::ClientDisconnect.into(), evt_receiver.clone()); + handler.add_global_event(CoreEvent::VoiceTick.into(), evt_receiver); check_msg( msg.channel_id diff --git a/src/config.rs b/src/config.rs index bc676433c..6a8ec3796 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,8 @@ use crate::driver::test_config::*; use symphonia::core::{codecs::CodecRegistry, probe::Probe}; use derivative::Derivative; +#[cfg(feature = "receive")] +use std::num::NonZeroUsize; use std::time::Duration; /// Configuration for drivers and calls. @@ -35,10 +37,11 @@ pub struct Config { #[cfg(all(feature = "driver", feature = "receive"))] /// Configures whether decoding and decryption occur for all received packets. /// - /// If voice receiving voice packets, generally you should choose [`DecodeMode::Decode`]. - /// [`DecodeMode::Decrypt`] is intended for users running their own selective decoding, - /// who rely upon [user speaking events], or who need to inspect Opus packets. - /// If you're certain you will never need any RT(C)P events, then consider [`DecodeMode::Pass`]. + /// If receiving and using voice packets, generally you should choose [`DecodeMode::Decode`]. + /// [`DecodeMode::Decrypt`] is intended for users running their own selective decoding or + /// who need to inspect Opus packets. [User speaking state] can still be seen using [`DecodeMode::Pass`]. + /// If you're certain you will never need any RT(C)P events, then consider building without + /// the `"receive"` feature for extra performance. /// /// Defaults to [`DecodeMode::Decrypt`]. This is due to per-packet decoding costs, /// which most users will not want to pay, but allowing speaking events which are commonly used. @@ -46,7 +49,7 @@ pub struct Config { /// [`DecodeMode::Decode`]: DecodeMode::Decode /// [`DecodeMode::Decrypt`]: DecodeMode::Decrypt /// [`DecodeMode::Pass`]: DecodeMode::Pass - /// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate + /// [User speaking state]: crate::events::CoreEvent::VoiceTick pub decode_mode: DecodeMode, #[cfg(all(feature = "driver", feature = "receive"))] @@ -56,6 +59,26 @@ pub struct Config { /// Defaults to 1 minute. pub decode_state_timeout: Duration, + #[cfg(all(feature = "driver", feature = "receive"))] + /// Configures the number of audio packets to buffer for each user before playout. + /// + /// A playout buffer allows Songbird to smooth out jitter in audio packet arrivals, + /// as well as to correct for reordering of packets by the network. + /// + /// This does not affect the arrival of raw packet events. + /// + /// Defaults to 5 packets (100ms). 
+ pub playout_buffer_length: NonZeroUsize, + + #[cfg(all(feature = "driver", feature = "receive"))] + /// Configures the initial amount of extra space allocated to handle packet bursts. + /// + /// Each SSRC's receive buffer will start at capacity `playout_buffer_length + + /// playout_spike_length`, up to a maximum 64 packets. + /// + /// Defaults to 3 packets (thus capacity defaults to 8). + pub playout_spike_length: usize, + #[cfg(feature = "gateway")] /// Configures the amount of time to wait for Discord to reply with connection information /// if [`Call::join`]/[`join_gateway`] are used. @@ -174,6 +197,10 @@ impl Default for Config { decode_mode: DecodeMode::Decrypt, #[cfg(all(feature = "driver", feature = "receive"))] decode_state_timeout: Duration::from_secs(60), + #[cfg(all(feature = "driver", feature = "receive"))] + playout_buffer_length: NonZeroUsize::new(5).unwrap(), + #[cfg(all(feature = "driver", feature = "receive"))] + playout_spike_length: 3, #[cfg(feature = "gateway")] gateway_timeout: Some(Duration::from_secs(10)), #[cfg(feature = "driver")] @@ -227,6 +254,22 @@ impl Config { self } + #[cfg(feature = "receive")] + /// Sets this `Config`'s playout buffer length, in packets. + #[must_use] + pub fn playout_buffer_length(mut self, playout_buffer_length: NonZeroUsize) -> Self { + self.playout_buffer_length = playout_buffer_length; + self + } + + #[cfg(feature = "receive")] + /// Sets this `Config`'s additional pre-allocated space to handle bursty audio packets. + #[must_use] + pub fn playout_spike_length(mut self, playout_spike_length: usize) -> Self { + self.playout_spike_length = playout_spike_length; + self + } + /// Sets this `Config`'s audio mixing channel count. #[must_use] pub fn mix_mode(mut self, mix_mode: MixMode) -> Self { diff --git a/src/driver/decode_mode.rs b/src/driver/decode_mode.rs index 172159260..a3b602b48 100644 --- a/src/driver/decode_mode.rs +++ b/src/driver/decode_mode.rs @@ -6,13 +6,6 @@ pub enum DecodeMode { /// changes applied. /// /// No CPU work involved. - /// - /// *BEWARE: this will almost certainly break [user speaking events]. - /// Silent frame detection only works if extensions can be parsed or - /// are not present, as they are encrypted. - /// This event requires such functionality.* - /// - /// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate Pass, /// Decrypts the body of each received packet. /// diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 6d1792688..18892e689 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -657,6 +657,8 @@ impl Mixer { None => {}, } + self.advance_rtp_timestamp(); + return Ok(()); } } else { @@ -809,6 +811,18 @@ impl Mixer { rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); } + #[inline] + // Even if we don't send a packet, we *do* need to keep advancing the timestamp + // to make it easier for a receiver to reorder packets and compute jitter measures + // wrt. our clock difference vs. theirs. 
+ fn advance_rtp_timestamp(&mut self) { + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); + } + #[inline] fn check_and_send_keepalive(&mut self) -> Result<()> { if let Some(conn) = self.conn_active.as_mut() { diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs deleted file mode 100644 index ce276a025..000000000 --- a/src/driver/tasks/udp_rx.rs +++ /dev/null @@ -1,471 +0,0 @@ -use super::{ - error::{Error, Result}, - message::*, - Config, -}; -use crate::{ - constants::*, - driver::{CryptoMode, DecodeMode}, - events::{internal_data::*, CoreContext}, -}; -use audiopus::{ - coder::Decoder as OpusDecoder, - error::{Error as OpusError, ErrorCode}, - packet::Packet as OpusPacket, - Channels, -}; -use discortp::{ - demux::{self, DemuxedMut}, - rtp::{RtpExtensionPacket, RtpPacket}, - FromPacket, - Packet, - PacketSize, -}; -use flume::Receiver; -use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; -use tokio::{net::UdpSocket, select, time::Instant}; -use tracing::{error, instrument, trace, warn}; -use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; - -#[derive(Debug)] -struct SsrcState { - silent_frame_count: u16, - decoder: OpusDecoder, - last_seq: u16, - decode_size: PacketDecodeSize, - prune_time: Instant, - disconnected: bool, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum PacketDecodeSize { - /// Minimum frame size on Discord. - TwentyMillis, - /// Hybrid packet, sent by Firefox web client. - /// - /// Likely 20ms frame + 10ms frame. - ThirtyMillis, - /// Next largest frame size. - FortyMillis, - /// Maximum Opus frame size. - SixtyMillis, - /// Maximum Opus packet size: 120ms. - Max, -} - -impl PacketDecodeSize { - fn bump_up(self) -> Self { - match self { - Self::TwentyMillis => Self::ThirtyMillis, - Self::ThirtyMillis => Self::FortyMillis, - Self::FortyMillis => Self::SixtyMillis, - Self::SixtyMillis | Self::Max => Self::Max, - } - } - - fn can_bump_up(self) -> bool { - self != Self::Max - } - - fn len(self) -> usize { - match self { - Self::TwentyMillis => STEREO_FRAME_SIZE, - Self::ThirtyMillis => (STEREO_FRAME_SIZE / 2) * 3, - Self::FortyMillis => 2 * STEREO_FRAME_SIZE, - Self::SixtyMillis => 3 * STEREO_FRAME_SIZE, - Self::Max => 6 * STEREO_FRAME_SIZE, - } - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum SpeakingDelta { - Same, - Start, - Stop, -} - -impl SsrcState { - fn new(pkt: &RtpPacket<'_>, state_timeout: Duration) -> Self { - Self { - silent_frame_count: 5, // We do this to make the first speech packet fire an event. 
- decoder: OpusDecoder::new(SAMPLE_RATE, Channels::Stereo) - .expect("Failed to create new Opus decoder for source."), - last_seq: pkt.get_sequence().into(), - decode_size: PacketDecodeSize::TwentyMillis, - prune_time: Instant::now() + state_timeout, - disconnected: false, - } - } - - fn refresh_timer(&mut self, state_timeout: Duration) { - if !self.disconnected { - self.prune_time = Instant::now() + state_timeout; - } - } - - fn process( - &mut self, - pkt: &RtpPacket<'_>, - data_offset: usize, - data_trailer: usize, - decode_mode: DecodeMode, - decrypted: bool, - ) -> Result<(SpeakingDelta, Option>)> { - let new_seq: u16 = pkt.get_sequence().into(); - let payload_len = pkt.payload().len(); - - let extensions = pkt.get_extension() != 0; - let seq_delta = new_seq.wrapping_sub(self.last_seq); - Ok(if seq_delta >= (1 << 15) { - // Overflow, reordered (previously missing) packet. - (SpeakingDelta::Same, Some(vec![])) - } else { - self.last_seq = new_seq; - let missed_packets = seq_delta.saturating_sub(1); - - // Note: we still need to handle this for non-decoded. - // This is mainly because packet events and speaking events can be handed to the - // user. - let (audio, pkt_size) = if decode_mode.should_decrypt() && decrypted { - self.scan_and_decode( - &pkt.payload()[data_offset..payload_len - data_trailer], - extensions, - missed_packets, - decode_mode == DecodeMode::Decode, - )? - } else { - // The latter part is an upper bound, as we cannot determine - // how long packet extensions are. - // WIthout decryption, speaking detection is thus broken. - (None, payload_len - data_offset - data_trailer) - }; - - let delta = if pkt_size == SILENT_FRAME.len() { - // Frame is silent. - let old = self.silent_frame_count; - self.silent_frame_count = - self.silent_frame_count.saturating_add(1 + missed_packets); - - if self.silent_frame_count >= 5 && old < 5 { - SpeakingDelta::Stop - } else { - SpeakingDelta::Same - } - } else { - // Frame has meaningful audio. - let out = if self.silent_frame_count >= 5 { - SpeakingDelta::Start - } else { - SpeakingDelta::Same - }; - self.silent_frame_count = 0; - out - }; - - (delta, audio) - }) - } - - fn scan_and_decode( - &mut self, - data: &[u8], - extension: bool, - missed_packets: u16, - decode: bool, - ) -> Result<(Option>, usize)> { - let start = if extension { - RtpExtensionPacket::new(data) - .map(|pkt| pkt.packet_size()) - .ok_or_else(|| { - error!("Extension packet indicated, but insufficient space."); - Error::IllegalVoicePacket - }) - } else { - Ok(0) - }?; - - let pkt = if decode { - let mut out = vec![0; self.decode_size.len()]; - - for _ in 0..missed_packets { - let missing_frame: Option = None; - let dest_samples = (&mut out[..]) - .try_into() - .expect("Decode logic will cap decode buffer size at i32::MAX."); - if let Err(e) = self.decoder.decode(missing_frame, dest_samples, false) { - warn!("Issue while decoding for missed packet: {:?}.", e); - } - } - - // In general, we should expect 20 ms frames. - // However, Discord occasionally like to surprise us with something bigger. - // This is *sender-dependent behaviour*. - // - // This should scan up to find the "correct" size that a source is using, - // and then remember that. - loop { - let tried_audio_len = self.decoder.decode( - Some(data[start..].try_into()?), - (&mut out[..]).try_into()?, - false, - ); - match tried_audio_len { - Ok(audio_len) => { - // Decoding to stereo: audio_len refers to sample count irrespective of channel count. - // => multiply by number of channels. 
- out.truncate(2 * audio_len); - - break; - }, - Err(OpusError::Opus(ErrorCode::BufferTooSmall)) => { - if self.decode_size.can_bump_up() { - self.decode_size = self.decode_size.bump_up(); - out = vec![0; self.decode_size.len()]; - } else { - error!("Received packet larger than Opus standard maximum,"); - return Err(Error::IllegalVoicePacket); - } - }, - Err(e) => { - error!("Failed to decode received packet: {:?}.", e); - return Err(e.into()); - }, - } - } - - Some(out) - } else { - None - }; - - Ok((pkt, data.len() - start)) - } -} - -struct UdpRx { - cipher: Cipher, - decoder_map: HashMap, - config: Config, - packet_buffer: [u8; VOICE_PACKET_MAX], - rx: Receiver, - ssrc_signalling: Arc, - udp_socket: UdpSocket, -} - -impl UdpRx { - #[instrument(skip(self))] - async fn run(&mut self, interconnect: &mut Interconnect) { - let mut cleanup_time = Instant::now(); - - loop { - select! { - Ok((len, _addr)) = self.udp_socket.recv_from(&mut self.packet_buffer[..]) => { - self.process_udp_message(interconnect, len); - }, - msg = self.rx.recv_async() => { - match msg { - Ok(UdpRxMessage::ReplaceInterconnect(i)) => { - *interconnect = i; - }, - Ok(UdpRxMessage::SetConfig(c)) => { - self.config = c; - }, - Err(flume::RecvError::Disconnected) => break, - } - }, - _ = tokio::time::sleep_until(cleanup_time) => { - // periodic cleanup. - let now = Instant::now(); - - // check ssrc map to see if the WS task has informed us of any disconnects. - loop { - // This is structured in an odd way to prevent deadlocks. - // while-let seemed to keep the dashmap iter() alive for block scope, rather than - // just the initialiser. - let id = { - if let Some(id) = self.ssrc_signalling.disconnected_users.iter().next().map(|v| *v.key()) { - id - } else { - break; - } - }; - - let _ = self.ssrc_signalling.disconnected_users.remove(&id); - if let Some((_, ssrc)) = self.ssrc_signalling.user_ssrc_map.remove(&id) { - if let Some(state) = self.decoder_map.get_mut(&ssrc) { - // don't cleanup immediately: leave for later cycle - // this is key with reorder/jitter buffers where we may - // still need to decode post disconnect for ~0.2s. - state.prune_time = now + Duration::from_secs(1); - state.disconnected = true; - } - } - } - - // now remove all dead ssrcs. - self.decoder_map.retain(|_, v| v.prune_time > now); - - cleanup_time = now + Duration::from_secs(5); - }, - } - } - } - - fn process_udp_message(&mut self, interconnect: &Interconnect, len: usize) { - // NOTE: errors here (and in general for UDP) are not fatal to the connection. - // Panics should be avoided due to adversarial nature of rx'd packets, - // but correct handling should not prompt a reconnect. - // - // For simplicity, we nominate the mixing context to rebuild the event - // context if it fails (hence, the `let _ =` statements.), as it will try to - // make contact every 20ms. 
- let crypto_mode = self.config.crypto_mode; - let packet = &mut self.packet_buffer[..len]; - - match demux::demux_mut(packet) { - DemuxedMut::Rtp(mut rtp) => { - if !rtp_valid(&rtp.to_immutable()) { - error!("Illegal RTP message received."); - return; - } - - let packet_data = if self.config.decode_mode.should_decrypt() { - let out = crypto_mode - .decrypt_in_place(&mut rtp, &self.cipher) - .map(|(s, t)| (s, t, true)); - - if let Err(e) = out { - warn!("RTP decryption failed: {:?}", e); - } - - out.ok() - } else { - None - }; - - let rtp = rtp.to_immutable(); - let (rtp_body_start, rtp_body_tail, decrypted) = packet_data.unwrap_or_else(|| { - ( - CryptoMode::payload_prefix_len(), - crypto_mode.payload_suffix_len(), - false, - ) - }); - - let entry = self - .decoder_map - .entry(rtp.get_ssrc()) - .or_insert_with(|| SsrcState::new(&rtp, self.config.decode_state_timeout)); - - // Only do this on RTP, rather than RTCP -- this pins decoder state liveness - // to *speech* rather than just presence. - entry.refresh_timer(self.config.decode_state_timeout); - - if let Ok((delta, audio)) = entry.process( - &rtp, - rtp_body_start, - rtp_body_tail, - self.config.decode_mode, - decrypted, - ) { - match delta { - SpeakingDelta::Start => { - drop(interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate(InternalSpeakingUpdate { - ssrc: rtp.get_ssrc(), - speaking: true, - }), - ))); - }, - SpeakingDelta::Stop => { - drop(interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate(InternalSpeakingUpdate { - ssrc: rtp.get_ssrc(), - speaking: false, - }), - ))); - }, - SpeakingDelta::Same => {}, - } - - drop(interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::VoicePacket(InternalVoicePacket { - audio, - packet: rtp.from_packet(), - payload_offset: rtp_body_start, - payload_end_pad: rtp_body_tail, - }), - ))); - } else { - warn!("RTP decoding/processing failed."); - } - }, - DemuxedMut::Rtcp(mut rtcp) => { - let packet_data = if self.config.decode_mode.should_decrypt() { - let out = crypto_mode.decrypt_in_place(&mut rtcp, &self.cipher); - - if let Err(e) = out { - warn!("RTCP decryption failed: {:?}", e); - } - - out.ok() - } else { - None - }; - - let (start, tail) = packet_data.unwrap_or_else(|| { - ( - CryptoMode::payload_prefix_len(), - crypto_mode.payload_suffix_len(), - ) - }); - - drop(interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::RtcpPacket(InternalRtcpPacket { - packet: rtcp.from_packet(), - payload_offset: start, - payload_end_pad: tail, - }), - ))); - }, - DemuxedMut::FailedParse(t) => { - warn!("Failed to parse message of type {:?}.", t); - }, - DemuxedMut::TooSmall => { - warn!("Illegal UDP packet from voice server."); - }, - } - } -} - -#[instrument(skip(interconnect, rx, cipher))] -pub(crate) async fn runner( - mut interconnect: Interconnect, - rx: Receiver, - cipher: Cipher, - config: Config, - udp_socket: UdpSocket, - ssrc_signalling: Arc, -) { - trace!("UDP receive handle started."); - - let mut state = UdpRx { - cipher, - decoder_map: HashMap::new(), - config, - packet_buffer: [0u8; VOICE_PACKET_MAX], - rx, - ssrc_signalling, - udp_socket, - }; - - state.run(&mut interconnect).await; - - trace!("UDP receive handle stopped."); -} - -#[inline] -fn rtp_valid(packet: &RtpPacket<'_>) -> bool { - packet.get_version() == RTP_VERSION && packet.get_payload_type() == RTP_PROFILE_TYPE -} diff --git a/src/driver/tasks/udp_rx/decode_sizes.rs b/src/driver/tasks/udp_rx/decode_sizes.rs new file mode 100644 index 
000000000..546cef775 --- /dev/null +++ b/src/driver/tasks/udp_rx/decode_sizes.rs @@ -0,0 +1,42 @@ +use crate::constants::STEREO_FRAME_SIZE; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum PacketDecodeSize { + /// Minimum frame size on Discord. + TwentyMillis, + /// Hybrid packet, sent by Firefox web client. + /// + /// Likely 20ms frame + 10ms frame. + ThirtyMillis, + /// Next largest frame size. + FortyMillis, + /// Maximum Opus frame size. + SixtyMillis, + /// Maximum Opus packet size: 120ms. + Max, +} + +impl PacketDecodeSize { + pub fn bump_up(self) -> Self { + match self { + Self::TwentyMillis => Self::ThirtyMillis, + Self::ThirtyMillis => Self::FortyMillis, + Self::FortyMillis => Self::SixtyMillis, + Self::SixtyMillis | Self::Max => Self::Max, + } + } + + pub fn can_bump_up(self) -> bool { + self != Self::Max + } + + pub fn len(self) -> usize { + match self { + Self::TwentyMillis => STEREO_FRAME_SIZE, + Self::ThirtyMillis => (STEREO_FRAME_SIZE / 2) * 3, + Self::FortyMillis => 2 * STEREO_FRAME_SIZE, + Self::SixtyMillis => 3 * STEREO_FRAME_SIZE, + Self::Max => 6 * STEREO_FRAME_SIZE, + } + } +} diff --git a/src/driver/tasks/udp_rx/mod.rs b/src/driver/tasks/udp_rx/mod.rs new file mode 100644 index 000000000..fc97d48bb --- /dev/null +++ b/src/driver/tasks/udp_rx/mod.rs @@ -0,0 +1,268 @@ +mod decode_sizes; +mod playout_buffer; +mod ssrc_state; + +use self::{decode_sizes::*, playout_buffer::*, ssrc_state::*}; + +use super::message::*; +use crate::{ + constants::*, + driver::CryptoMode, + events::{context_data::VoiceTick, internal_data::*, CoreContext}, + Config, +}; +use bytes::BytesMut; +use discortp::{ + demux::{self, DemuxedMut}, + rtp::RtpPacket, +}; +use flume::Receiver; +use std::{ + collections::{HashMap, HashSet}, + num::Wrapping, + sync::Arc, + time::Duration, +}; +use tokio::{net::UdpSocket, select, time::Instant}; +use tracing::{error, instrument, trace, warn}; +use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; + +type RtpSequence = Wrapping; +type RtpTimestamp = Wrapping; +type RtpSsrc = u32; + +struct UdpRx { + cipher: Cipher, + decoder_map: HashMap, + config: Config, + rx: Receiver, + ssrc_signalling: Arc, + udp_socket: UdpSocket, +} + +impl UdpRx { + #[instrument(skip(self))] + async fn run(&mut self, interconnect: &mut Interconnect) { + let mut cleanup_time = Instant::now(); + let mut playout_time = Instant::now() + TIMESTEP_LENGTH; + let mut byte_dest: Option = None; + + loop { + if byte_dest.is_none() { + byte_dest = Some(BytesMut::zeroed(VOICE_PACKET_MAX)); + } + + select! 
{ + Ok((len, _addr)) = self.udp_socket.recv_from(byte_dest.as_mut().unwrap()) => { + let mut pkt = byte_dest.take().unwrap(); + pkt.truncate(len); + + self.process_udp_message(interconnect, pkt); + }, + msg = self.rx.recv_async() => { + match msg { + Ok(UdpRxMessage::ReplaceInterconnect(i)) => { + *interconnect = i; + }, + Ok(UdpRxMessage::SetConfig(c)) => { + self.config = c; + }, + Err(flume::RecvError::Disconnected) => break, + } + }, + _ = tokio::time::sleep_until(playout_time) => { + let mut tick = VoiceTick { + speaking: HashMap::new(), + silent: HashSet::new(), + }; + + for (ssrc, state) in &mut self.decoder_map { + match state.get_voice_tick(&self.config) { + Ok(Some(data)) => { + tick.speaking.insert(*ssrc, data); + }, + Ok(None) => { + if !state.disconnected { + tick.silent.insert(*ssrc); + } + }, + Err(e) => { + warn!("Decode error for SSRC {ssrc}: {e:?}"); + tick.silent.insert(*ssrc); + }, + } + } + + playout_time += TIMESTEP_LENGTH; + + drop(interconnect.events.send(EventMessage::FireCoreEvent(CoreContext::VoiceTick(tick)))); + }, + _ = tokio::time::sleep_until(cleanup_time) => { + // periodic cleanup. + let now = Instant::now(); + + // check ssrc map to see if the WS task has informed us of any disconnects. + loop { + // This is structured in an odd way to prevent deadlocks. + // while-let seemed to keep the dashmap iter() alive for block scope, rather than + // just the initialiser. + let id = { + if let Some(id) = self.ssrc_signalling.disconnected_users.iter().next().map(|v| *v.key()) { + id + } else { + break; + } + }; + + let _ = self.ssrc_signalling.disconnected_users.remove(&id); + if let Some((_, ssrc)) = self.ssrc_signalling.user_ssrc_map.remove(&id) { + if let Some(state) = self.decoder_map.get_mut(&ssrc) { + // don't cleanup immediately: leave for later cycle + // this is key with reorder/jitter buffers where we may + // still need to decode post disconnect for ~0.2s. + state.prune_time = now + Duration::from_secs(1); + state.disconnected = true; + } + } + } + + // now remove all dead ssrcs. + self.decoder_map.retain(|_, v| v.prune_time > now); + + cleanup_time = now + Duration::from_secs(5); + }, + } + } + } + + fn process_udp_message(&mut self, interconnect: &Interconnect, mut packet: BytesMut) { + // NOTE: errors here (and in general for UDP) are not fatal to the connection. + // Panics should be avoided due to adversarial nature of rx'd packets, + // but correct handling should not prompt a reconnect. + // + // For simplicity, if the event task fails then we nominate the mixing thread + // to rebuild their context etc. (hence, the `let _ =` statements.), as it will + // try to make contact every 20ms. 
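The `run` loop in this new task schedules its voice ticks against an absolute deadline (`playout_time += TIMESTEP_LENGTH`) rather than sleeping a fixed duration after each pass, so decode work inside one tick delays that tick but does not shift every later one. A small standalone illustration of that scheduling shape, assuming tokio with the `full` feature set; the `fixed_rate_ticks` helper is invented for the sketch:

```rust
use std::time::Duration;
use tokio::time::{sleep_until, Instant};

/// Runs `on_tick` at a fixed cadence by advancing an absolute deadline,
/// mirroring the `playout_time += TIMESTEP_LENGTH` pattern used by the task above.
async fn fixed_rate_ticks(period: Duration, count: u32, mut on_tick: impl FnMut(u32)) {
    let mut deadline = Instant::now() + period;
    for i in 0..count {
        sleep_until(deadline).await;
        on_tick(i);
        // Advance from the previous deadline, not from "now", so work inside
        // `on_tick` does not accumulate as drift.
        deadline += period;
    }
}

#[tokio::main]
async fn main() {
    // Five "voice ticks" at the 20ms timestep used by the driver.
    fixed_rate_ticks(Duration::from_millis(20), 5, |i| println!("tick {i}")).await;
}
```

If one tick overruns its deadline, the next `sleep_until` returns immediately, which is how a loop of this shape catches back up.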
+ let crypto_mode = self.config.crypto_mode; + + match demux::demux_mut(packet.as_mut()) { + DemuxedMut::Rtp(mut rtp) => { + if !rtp_valid(&rtp.to_immutable()) { + error!("Illegal RTP message received."); + return; + } + + let packet_data = if self.config.decode_mode.should_decrypt() { + let out = crypto_mode + .decrypt_in_place(&mut rtp, &self.cipher) + .map(|(s, t)| (s, t, true)); + + if let Err(e) = out { + warn!("RTP decryption failed: {:?}", e); + } + + out.ok() + } else { + None + }; + + let rtp = rtp.to_immutable(); + let (rtp_body_start, rtp_body_tail, decrypted) = packet_data.unwrap_or_else(|| { + ( + CryptoMode::payload_prefix_len(), + crypto_mode.payload_suffix_len(), + false, + ) + }); + + let entry = self + .decoder_map + .entry(rtp.get_ssrc()) + .or_insert_with(|| SsrcState::new(&rtp, &self.config)); + + // Only do this on RTP, rather than RTCP -- this pins decoder state liveness + // to *speech* rather than just presence. + entry.refresh_timer(self.config.decode_state_timeout); + + let store_pkt = StoredPacket { + packet: packet.freeze(), + decrypted, + }; + let packet = store_pkt.packet.clone(); + entry.store_packet(store_pkt, &self.config); + + drop(interconnect.events.send(EventMessage::FireCoreEvent( + CoreContext::RtpPacket(InternalRtpPacket { + packet, + payload_offset: rtp_body_start, + payload_end_pad: rtp_body_tail, + }), + ))); + }, + DemuxedMut::Rtcp(mut rtcp) => { + let packet_data = if self.config.decode_mode.should_decrypt() { + let out = crypto_mode.decrypt_in_place(&mut rtcp, &self.cipher); + + if let Err(e) = out { + warn!("RTCP decryption failed: {:?}", e); + } + + out.ok() + } else { + None + }; + + let (start, tail) = packet_data.unwrap_or_else(|| { + ( + CryptoMode::payload_prefix_len(), + crypto_mode.payload_suffix_len(), + ) + }); + + drop(interconnect.events.send(EventMessage::FireCoreEvent( + CoreContext::RtcpPacket(InternalRtcpPacket { + packet: packet.freeze(), + payload_offset: start, + payload_end_pad: tail, + }), + ))); + }, + DemuxedMut::FailedParse(t) => { + warn!("Failed to parse message of type {:?}.", t); + }, + DemuxedMut::TooSmall => { + warn!("Illegal UDP packet from voice server."); + }, + } + } +} + +#[instrument(skip(interconnect, rx, cipher))] +pub(crate) async fn runner( + mut interconnect: Interconnect, + rx: Receiver, + cipher: Cipher, + config: Config, + udp_socket: UdpSocket, + ssrc_signalling: Arc, +) { + trace!("UDP receive handle started."); + + let mut state = UdpRx { + cipher, + decoder_map: HashMap::new(), + config, + rx, + ssrc_signalling, + udp_socket, + }; + + state.run(&mut interconnect).await; + + trace!("UDP receive handle stopped."); +} + +#[inline] +fn rtp_valid(packet: &RtpPacket<'_>) -> bool { + packet.get_version() == RTP_VERSION && packet.get_payload_type() == RTP_PROFILE_TYPE +} diff --git a/src/driver/tasks/udp_rx/playout_buffer.rs b/src/driver/tasks/udp_rx/playout_buffer.rs new file mode 100644 index 000000000..b50ba51f8 --- /dev/null +++ b/src/driver/tasks/udp_rx/playout_buffer.rs @@ -0,0 +1,147 @@ +use super::*; +use bytes::Bytes; +use discortp::rtp::RtpPacket; +use std::collections::VecDeque; +use tracing::trace; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StoredPacket { + pub packet: Bytes, + // We need to store this as it's possible that a user can change config modes. + pub decrypted: bool, +} + +/// Determines whether an SSRC's packets should be decoded. 
+///
+/// Playout requires us to keep an almost constant delay; to do so, we build
+/// a user's packet buffer up to the required length ([`Config::playout_buffer_length`])
+/// ([`Self::Fill`]) and then emit packets on each tick ([`Self::Drain`]).
+///
+/// This gets a bit harder to reason about when users stop speaking. If a speech gap
+/// lasts longer than the playout buffer, then we can simply swap from `Drain` -> `Fill`.
+/// However, a genuine gap of `n` frames must lead to us reverting to `Fill` for `n` frames.
+/// To compute this, we use the RTP timestamps of two `seq`-adjacent packets at playout: if the next
+/// timestamp is too large, then we revert to `Fill`.
+///
+/// Small playout bursts also require care.
+///
+/// If timestamp info is incorrect, then in the worst case we eventually need to rebuffer if the delay
+/// drains to zero.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+enum PlayoutMode {
+    Fill,
+    Drain,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum PacketLookup {
+    Packet(StoredPacket),
+    MissedPacket,
+    Filling,
+}
+
+#[derive(Debug)]
+pub struct PlayoutBuffer {
+    playout_buffer: VecDeque<Option<StoredPacket>>,
+    playout_mode: PlayoutMode,
+    next_seq: RtpSequence,
+    current_timestamp: Option<RtpTimestamp>,
+}
+
+impl PlayoutBuffer {
+    pub fn new(capacity: usize, next_seq: RtpSequence) -> Self {
+        Self {
+            playout_buffer: VecDeque::with_capacity(capacity),
+            playout_mode: PlayoutMode::Fill,
+            next_seq,
+            current_timestamp: None,
+        }
+    }
+
+    /// Slot a received RTP packet into the correct location in the playout buffer using
+    /// its sequence number, subject to maximums.
+    ///
+    /// An out-of-bounds packet must create any intervening `None` entries.
+    pub fn store_packet(&mut self, packet: StoredPacket, config: &Config) {
+        let rtp = RtpPacket::new(&packet.packet)
+            .expect("FATAL: earlier valid packet now invalid (store)");
+
+        if self.current_timestamp.is_none() {
+            self.current_timestamp = Some(reset_timeout(&rtp, config));
+        }
+
+        // compute index by taking wrapping difference between both seq numbers.
+ // If the difference is *too big*, or in the past [also 'too big, in a way], + // ignore the packet + let desired_index = (rtp.get_sequence().0 - self.next_seq).0 as i16; + + if desired_index < 0 { + trace!("Missed packet arrived late, discarding from playout."); + } else if desired_index >= 64 { + trace!("Packet arrived beyond playout max length."); + } else { + let index = desired_index as usize; + while self.playout_buffer.len() <= index { + self.playout_buffer.push_back(None); + } + self.playout_buffer[index] = Some(packet); + } + + if self.playout_buffer.len() >= config.playout_buffer_length.get() { + self.playout_mode = PlayoutMode::Drain; + } + } + + pub fn fetch_packet(&mut self) -> PacketLookup { + if self.playout_mode == PlayoutMode::Fill { + return PacketLookup::Filling; + } + + let out = match self.playout_buffer.pop_front() { + Some(Some(pkt)) => { + let rtp = RtpPacket::new(&pkt.packet) + .expect("FATAL: earlier valid packet now invalid (fetch)"); + + let curr_ts = self.current_timestamp.unwrap(); + let ts_diff = curr_ts - rtp.get_timestamp().0; + + if (ts_diff.0 as i32) <= 0 { + self.next_seq = (rtp.get_sequence() + 1).0; + + PacketLookup::Packet(pkt) + } else { + trace!("Witholding packet: ts_diff is {ts_diff}"); + self.playout_buffer.push_front(Some(pkt)); + self.playout_mode = PlayoutMode::Fill; + PacketLookup::Filling + } + }, + Some(None) => { + self.next_seq += 1; + PacketLookup::MissedPacket + }, + None => PacketLookup::Filling, + }; + + if self.playout_buffer.is_empty() { + self.playout_mode = PlayoutMode::Fill; + self.current_timestamp = None; + } + + if let Some(ts) = self.current_timestamp.as_mut() { + *ts += &(MONO_FRAME_SIZE as u32); + } + + out + } + + pub fn next_seq(&self) -> RtpSequence { + self.next_seq + } +} + +#[inline] +fn reset_timeout(packet: &RtpPacket<'_>, config: &Config) -> RtpTimestamp { + let t_shift = MONO_FRAME_SIZE * config.playout_buffer_length.get(); + (packet.get_timestamp() - (t_shift as u32)).0 +} diff --git a/src/driver/tasks/udp_rx/ssrc_state.rs b/src/driver/tasks/udp_rx/ssrc_state.rs new file mode 100644 index 000000000..2b9819e5a --- /dev/null +++ b/src/driver/tasks/udp_rx/ssrc_state.rs @@ -0,0 +1,196 @@ +use super::*; +use crate::{ + constants::*, + driver::{ + tasks::error::{Error, Result}, + CryptoMode, + DecodeMode, + }, + events::context_data::{RtpData, VoiceData}, + Config, +}; +use audiopus::{ + coder::Decoder as OpusDecoder, + error::{Error as OpusError, ErrorCode}, + packet::Packet as OpusPacket, + Channels, +}; +use discortp::{ + rtp::{RtpExtensionPacket, RtpPacket}, + Packet, + PacketSize, +}; +use std::{convert::TryInto, time::Duration}; +use tokio::time::Instant; +use tracing::{error, warn}; + +#[derive(Debug)] +pub struct SsrcState { + playout_buffer: PlayoutBuffer, + decoder: OpusDecoder, + decode_size: PacketDecodeSize, + pub(crate) prune_time: Instant, + pub(crate) disconnected: bool, +} + +impl SsrcState { + pub fn new(pkt: &RtpPacket<'_>, config: &Config) -> Self { + let playout_capacity = config.playout_buffer_length.get() + config.playout_spike_length; + + Self { + playout_buffer: PlayoutBuffer::new(playout_capacity, pkt.get_sequence().0), + decoder: OpusDecoder::new(SAMPLE_RATE, Channels::Stereo) + .expect("Failed to create new Opus decoder for source."), + decode_size: PacketDecodeSize::TwentyMillis, + prune_time: Instant::now() + config.decode_state_timeout, + disconnected: false, + } + } + + pub fn store_packet(&mut self, packet: StoredPacket, config: &Config) { + self.playout_buffer.store_packet(packet, 
config); + } + + pub fn refresh_timer(&mut self, state_timeout: Duration) { + if !self.disconnected { + self.prune_time = Instant::now() + state_timeout; + } + } + + pub fn get_voice_tick(&mut self, config: &Config) -> Result> { + // Acquire a packet from the playout buffer: + // Update nexts, lasts... + // different cases: null packet who we want to decode as a miss, and packet who we must ignore temporarily. + let m_pkt = self.playout_buffer.fetch_packet(); + let pkt = match m_pkt { + PacketLookup::Packet(StoredPacket { packet, decrypted }) => Some((packet, decrypted)), + PacketLookup::MissedPacket => None, + PacketLookup::Filling => return Ok(None), + }; + + let mut out = VoiceData { + packet: None, + decoded_voice: None, + }; + + let should_decode = config.decode_mode == DecodeMode::Decode; + + if let Some((packet, decrypted)) = pkt { + let rtp = RtpPacket::new(&packet).unwrap(); + let extensions = rtp.get_extension() != 0; + + let payload = rtp.payload(); + let payload_offset = CryptoMode::payload_prefix_len(); + let payload_end_pad = payload.len() - config.crypto_mode.payload_suffix_len(); + + // We still need to compute missed packets here in case of long loss chains or similar. + // This occurs due to the fallback in 'store_packet' (i.e., empty buffer and massive seq difference). + // Normal losses should be handled by the below `else` branch. + let new_seq: u16 = rtp.get_sequence().into(); + let missed_packets = new_seq.saturating_sub(self.playout_buffer.next_seq().0); + + // TODO: maybe hand over audio and extension indices alongside packet? + let (audio, _packet_size) = self.scan_and_decode( + &payload[payload_offset..payload_end_pad], + extensions, + missed_packets, + should_decode && decrypted, + )?; + + let rtp_data = RtpData { + packet, + payload_offset, + payload_end_pad, + }; + + out.packet = Some(rtp_data); + out.decoded_voice = audio; + } else if should_decode { + let mut audio = vec![0; self.decode_size.len()]; + let dest_samples = (&mut audio[..]) + .try_into() + .expect("Decode logic will cap decode buffer size at i32::MAX."); + let len = self.decoder.decode(None, dest_samples, false)?; + audio.truncate(2 * len); + + out.decoded_voice = Some(audio); + } + + Ok(Some(out)) + } + + fn scan_and_decode( + &mut self, + data: &[u8], + extension: bool, + missed_packets: u16, + decode: bool, + ) -> Result<(Option>, usize)> { + let start = if extension { + RtpExtensionPacket::new(data) + .map(|pkt| pkt.packet_size()) + .ok_or_else(|| { + error!("Extension packet indicated, but insufficient space."); + Error::IllegalVoicePacket + }) + } else { + Ok(0) + }?; + + let pkt = if decode { + let mut out = vec![0; self.decode_size.len()]; + + for _ in 0..missed_packets { + let missing_frame: Option = None; + let dest_samples = (&mut out[..]) + .try_into() + .expect("Decode logic will cap decode buffer size at i32::MAX."); + if let Err(e) = self.decoder.decode(missing_frame, dest_samples, false) { + warn!("Issue while decoding for missed packet: {:?}.", e); + } + } + + // In general, we should expect 20 ms frames. + // However, Discord occasionally like to surprise us with something bigger. + // This is *sender-dependent behaviour*. + // + // This should scan up to find the "correct" size that a source is using, + // and then remember that. 
+ loop { + let tried_audio_len = self.decoder.decode( + Some(data[start..].try_into()?), + (&mut out[..]).try_into()?, + false, + ); + match tried_audio_len { + Ok(audio_len) => { + // Decoding to stereo: audio_len refers to sample count irrespective of channel count. + // => multiply by number of channels. + out.truncate(2 * audio_len); + + break; + }, + Err(OpusError::Opus(ErrorCode::BufferTooSmall)) => { + if self.decode_size.can_bump_up() { + self.decode_size = self.decode_size.bump_up(); + out = vec![0; self.decode_size.len()]; + } else { + error!("Received packet larger than Opus standard maximum,"); + return Err(Error::IllegalVoicePacket); + } + }, + Err(e) => { + error!("Failed to decode received packet: {:?}.", e); + return Err(e.into()); + }, + } + } + + Some(out) + } else { + None + }; + + Ok((pkt, data.len() - start)) + } +} diff --git a/src/events/context/data/mod.rs b/src/events/context/data/mod.rs index 6021e4dab..32d9db48e 100644 --- a/src/events/context/data/mod.rs +++ b/src/events/context/data/mod.rs @@ -6,13 +6,13 @@ mod disconnect; #[cfg(feature = "receive")] mod rtcp; #[cfg(feature = "receive")] -mod speaking; +mod rtp; #[cfg(feature = "receive")] mod voice; #[cfg(feature = "receive")] -use discortp::{rtcp::Rtcp, rtp::Rtp}; +use bytes::Bytes; pub use self::{connect::*, disconnect::*}; #[cfg(feature = "receive")] -pub use self::{rtcp::*, speaking::*, voice::*}; +pub use self::{rtcp::*, rtp::*, voice::*}; diff --git a/src/events/context/data/rtcp.rs b/src/events/context/data/rtcp.rs index 348a4f516..b882db1a6 100644 --- a/src/events/context/data/rtcp.rs +++ b/src/events/context/data/rtcp.rs @@ -1,3 +1,5 @@ +use discortp::rtcp::RtcpPacket; + use super::*; #[derive(Clone, Debug, Eq, PartialEq)] @@ -5,11 +7,22 @@ use super::*; /// Telemetry/statistics packet, received from another stream (detailed in `packet`). /// `payload_offset` contains the true payload location within the raw packet's `payload()`, /// to allow manual decoding of `Rtcp` packet bodies. -pub struct RtcpData<'a> { +pub struct RtcpData { /// Raw RTCP packet data. - pub packet: &'a Rtcp, + pub packet: Bytes, /// Byte index into the packet body (after headers) for where the payload begins. pub payload_offset: usize, /// Number of bytes at the end of the packet to discard. pub payload_end_pad: usize, } + +impl RtcpData { + /// Create a zero-copy view of the inner RTCP packet. + /// + /// This allows easy access to packet header fields, taking them from the underlying + /// `Bytes` as needed while handling endianness etc. + pub fn rtcp(&'_ self) -> RtcpPacket<'_> { + RtcpPacket::new(&self.packet) + .expect("FATAL: leaked illegally small RTP packet from UDP Rx task.") + } +} diff --git a/src/events/context/data/rtp.rs b/src/events/context/data/rtp.rs new file mode 100644 index 000000000..1b59cd5b5 --- /dev/null +++ b/src/events/context/data/rtp.rs @@ -0,0 +1,30 @@ +use discortp::rtp::RtpPacket; + +use super::*; + +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +/// Opus audio packet, received from another stream (detailed in `packet`). +/// `payload_offset` contains the true payload location within the raw packet's `payload()`, +/// if extensions or raw packet data are required. +pub struct RtpData { + /// Raw RTP packet data. + /// + /// Includes the SSRC (i.e., sender) of this packet. + pub packet: Bytes, + /// Byte index into the packet body (after headers) for where the payload begins. + pub payload_offset: usize, + /// Number of bytes at the end of the packet to discard. 
+    pub payload_end_pad: usize,
+}
+
+impl RtpData {
+    /// Create a zero-copy view of the inner RTP packet.
+    ///
+    /// This allows easy access to packet header fields, taking them from the underlying
+    /// `Bytes` as needed while handling endianness etc.
+    pub fn rtp(&'_ self) -> RtpPacket<'_> {
+        RtpPacket::new(&self.packet)
+            .expect("FATAL: leaked illegally small RTP packet from UDP Rx task.")
+    }
+}
diff --git a/src/events/context/data/speaking.rs b/src/events/context/data/speaking.rs
deleted file mode 100644
index 8b31b993e..000000000
--- a/src/events/context/data/speaking.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-#[derive(Clone, Debug, Eq, Hash, PartialEq)]
-#[non_exhaustive]
-/// Speaking state transition, describing whether a given source has started/stopped
-/// transmitting. This fires in response to a silent burst, or the first packet
-/// breaking such a burst.
-pub struct SpeakingUpdateData {
-    /// Whether this user is currently speaking.
-    pub speaking: bool,
-    /// Synchronisation Source of the user who has begun speaking.
-    ///
-    /// This must be combined with another event class to map this back to
-    /// its original UserId.
-    pub ssrc: u32,
-}
diff --git a/src/events/context/data/voice.rs b/src/events/context/data/voice.rs
index 96a0a14b9..92dc70d11 100644
--- a/src/events/context/data/voice.rs
+++ b/src/events/context/data/voice.rs
@@ -1,28 +1,38 @@
+use std::collections::{HashMap, HashSet};
+
 use super::*;
 
 #[derive(Clone, Debug, Eq, PartialEq)]
 #[non_exhaustive]
-/// Opus audio packet, received from another stream (detailed in `packet`).
-/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
-/// if extensions or raw packet data are required.
-///
-/// Valid audio data (`Some(audio)` where `audio.len >= 0`) contains up to 20ms of 16-bit stereo PCM audio
-/// at 48kHz, using native endianness. Songbird will not send audio for silent regions, these should
-/// be inferred using [`SpeakingUpdate`]s (and filled in by the user if required using arrays of zeroes).
+/// Audio data from all users in a voice channel, fired every 20ms.
 ///
-/// If `audio.len() == 0`, then this packet arrived out-of-order. If `None`, songbird was not configured
-/// to decode received packets.
+/// Songbird implements a jitter buffer to synchronise user packets, smooth out network latency, and
+/// handle packet reordering by the network. Packet playout via this event is delayed by approximately
+/// [`Config::playout_buffer_length`]` * 20ms` from its original arrival.
 ///
-/// [`SpeakingUpdate`]: crate::events::CoreEvent::SpeakingUpdate
-pub struct VoiceData<'a> {
-    /// Decoded audio from this packet.
-    pub audio: &'a Option<Vec<i16>>,
-    /// Raw RTP packet data.
+/// [`Config::playout_buffer_length`]: crate::Config::playout_buffer_length
+pub struct VoiceTick {
+    /// Decoded voice data and source packets sent by each user.
+    pub speaking: HashMap<u32, VoiceData>,
+
+    /// Set of all SSRCs currently known in the call who aren't included in [`Self::speaking`].
+    pub silent: HashSet<u32>,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+#[non_exhaustive]
+/// Voice packet and audio data for a single user, from a single tick.
+pub struct VoiceData {
+    /// RTP packet clocked out for this tick.
+    ///
+    /// If `None`, then the packet was lost, and [`Self::decoded_voice`] may include
+    /// around one codec delay's worth of audio.
+    pub packet: Option<RtpData>,
+    /// PCM audio obtained from a user.
+ /// + /// Valid audio data (`Some(audio)` where `audio.len >= 0`) typically contains 20ms of 16-bit stereo PCM audio + /// at 48kHz, using native endianness. Channels are interleaved (i.e., `L, R, L, R, ...`). /// - /// Includes the SSRC (i.e., sender) of this packet. - pub packet: &'a Rtp, - /// Byte index into the packet body (after headers) for where the payload begins. - pub payload_offset: usize, - /// Number of bytes at the end of the packet to discard. - pub payload_end_pad: usize, + /// This value will be `None` if Songbird is not configured to decode audio. + pub decoded_voice: Option>, } diff --git a/src/events/context/internal_data.rs b/src/events/context/internal_data.rs index a3b71b60e..b85e28259 100644 --- a/src/events/context/internal_data.rs +++ b/src/events/context/internal_data.rs @@ -41,53 +41,36 @@ impl<'a> From<&'a InternalDisconnect> for DisconnectData<'a> { #[cfg(feature = "receive")] mod receive { use super::*; - use discortp::{rtcp::Rtcp, rtp::Rtp}; - - #[derive(Clone, Debug, Eq, Hash, PartialEq)] - pub struct InternalSpeakingUpdate { - pub ssrc: u32, - pub speaking: bool, - } + use bytes::Bytes; #[derive(Clone, Debug, Eq, PartialEq)] - pub struct InternalVoicePacket { - pub audio: Option>, - pub packet: Rtp, + pub struct InternalRtpPacket { + pub packet: Bytes, pub payload_offset: usize, pub payload_end_pad: usize, } #[derive(Clone, Debug, Eq, PartialEq)] pub struct InternalRtcpPacket { - pub packet: Rtcp, + pub packet: Bytes, pub payload_offset: usize, pub payload_end_pad: usize, } - impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData { - fn from(val: &'a InternalSpeakingUpdate) -> Self { - Self { - speaking: val.speaking, - ssrc: val.ssrc, - } - } - } - - impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> { - fn from(val: &'a InternalVoicePacket) -> Self { + impl<'a> From<&'a InternalRtpPacket> for RtpData { + fn from(val: &'a InternalRtpPacket) -> Self { Self { - audio: &val.audio, - packet: &val.packet, + packet: val.packet.clone(), payload_offset: val.payload_offset, payload_end_pad: val.payload_end_pad, } } } - impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> { + impl<'a> From<&'a InternalRtcpPacket> for RtcpData { fn from(val: &'a InternalRtcpPacket) -> Self { Self { - packet: &val.packet, + packet: val.packet.clone(), payload_offset: val.payload_offset, payload_end_pad: val.payload_end_pad, } diff --git a/src/events/context/mod.rs b/src/events/context/mod.rs index 2ea092161..4a0d9b87c 100644 --- a/src/events/context/mod.rs +++ b/src/events/context/mod.rs @@ -33,18 +33,16 @@ pub enum EventContext<'a> { SpeakingStateUpdate(Speaking), #[cfg(feature = "receive")] - /// Speaking state transition, describing whether a given source has started/stopped - /// transmitting. This fires in response to a silent burst, or the first packet - /// breaking such a burst. - SpeakingUpdate(SpeakingUpdateData), + /// Reordered and decoded audio packets, received every 20ms. + VoiceTick(VoiceTick), #[cfg(feature = "receive")] /// Opus audio packet, received from another stream. - VoicePacket(VoiceData<'a>), + RtpPacket(RtpData), #[cfg(feature = "receive")] /// Telemetry/statistics packet, received from another stream. - RtcpPacket(RtcpData<'a>), + RtcpPacket(RtcpData), /// Fired whenever a client disconnects. 
ClientDisconnect(ClientDisconnect), @@ -63,9 +61,9 @@ pub enum EventContext<'a> { pub enum CoreContext { SpeakingStateUpdate(Speaking), #[cfg(feature = "receive")] - SpeakingUpdate(InternalSpeakingUpdate), + VoiceTick(VoiceTick), #[cfg(feature = "receive")] - VoicePacket(InternalVoicePacket), + RtpPacket(InternalRtpPacket), #[cfg(feature = "receive")] RtcpPacket(InternalRtcpPacket), ClientDisconnect(ClientDisconnect), @@ -79,10 +77,9 @@ impl<'a> CoreContext { match self { Self::SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), #[cfg(feature = "receive")] - Self::SpeakingUpdate(evt) => - EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), + Self::VoiceTick(evt) => EventContext::VoiceTick(evt.clone()), #[cfg(feature = "receive")] - Self::VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), + Self::RtpPacket(evt) => EventContext::RtpPacket(RtpData::from(evt)), #[cfg(feature = "receive")] Self::RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), Self::ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), @@ -102,9 +99,9 @@ impl EventContext<'_> { match self { Self::SpeakingStateUpdate(_) => Some(CoreEvent::SpeakingStateUpdate), #[cfg(feature = "receive")] - Self::SpeakingUpdate(_) => Some(CoreEvent::SpeakingUpdate), + Self::VoiceTick(_) => Some(CoreEvent::VoiceTick), #[cfg(feature = "receive")] - Self::VoicePacket(_) => Some(CoreEvent::VoicePacket), + Self::RtpPacket(_) => Some(CoreEvent::RtpPacket), #[cfg(feature = "receive")] Self::RtcpPacket(_) => Some(CoreEvent::RtcpPacket), Self::ClientDisconnect(_) => Some(CoreEvent::ClientDisconnect), diff --git a/src/events/core.rs b/src/events/core.rs index cec06d127..51e4257c1 100644 --- a/src/events/core.rs +++ b/src/events/core.rs @@ -9,14 +9,11 @@ /// when a client leaves the session ([`ClientDisconnect`]). /// /// When the `"receive"` feature is enabled, songbird can also handle voice packets -#[cfg_attr(feature = "receive", doc = "([`VoicePacket`](Self::VoicePacket)),")] -#[cfg_attr(not(feature = "receive"), doc = "(`VoicePacket`),")] -/// detect speech starting/stopping -#[cfg_attr( - feature = "receive", - doc = "([`SpeakingUpdate`](Self::SpeakingUpdate))," -)] -#[cfg_attr(not(feature = "receive"), doc = "(`SpeakingUpdate`),")] +#[cfg_attr(feature = "receive", doc = "([`RtpPacket`](Self::RtpPacket)),")] +#[cfg_attr(not(feature = "receive"), doc = "(`RtpPacket`),")] +/// decode and track speaking users +#[cfg_attr(feature = "receive", doc = "([`VoiceTick`](Self::VoiceTick)),")] +#[cfg_attr(not(feature = "receive"), doc = "(`VoiceTick`),")] /// and handle telemetry data #[cfg_attr(feature = "receive", doc = "([`RtcpPacket`](Self::RtcpPacket)).")] #[cfg_attr(not(feature = "receive"), doc = "(`RtcpPacket`).")] @@ -49,9 +46,9 @@ pub enum CoreEvent { SpeakingStateUpdate, #[cfg(feature = "receive")] - /// Fires when a source starts speaking, or stops speaking - /// (*i.e.*, 5 consecutive silent frames). - SpeakingUpdate, + /// Fires every 20ms, containing the scheduled voice packet and decoded audio + /// data for each live user. + VoiceTick, #[cfg(feature = "receive")] /// Fires on receipt of a voice packet from another stream in the voice call. @@ -59,7 +56,7 @@ pub enum CoreEvent { /// As RTP packets do not map to Discord's notion of users, SSRCs must be mapped /// back using the user IDs seen through client connection, disconnection, /// or speaking state update. 
- VoicePacket, + RtpPacket, #[cfg(feature = "receive")] /// Fires on receipt of an RTCP packet, containing various call stats
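Taken together, the new playout options surface as plain builder methods on `Config`, alongside the existing `decode_mode` setter (all gated behind the `driver` and `receive` features). A short sketch of a receive-oriented configuration; the buffer sizes here are arbitrary, and the serenity wiring via `songbird::SerenityInit::register_songbird_from_config` is left in comments since it depends on the surrounding bot:

```rust
use std::num::NonZeroUsize;

use songbird::{driver::DecodeMode, Config};

/// Build a `Config` tuned for voice receive.
fn receive_config() -> Config {
    Config::default()
        // Decode incoming Opus so each `VoiceTick` carries PCM in `decoded_voice`.
        .decode_mode(DecodeMode::Decode)
        // Hold 7 packets (~140ms) per user before playout, instead of the default 5.
        .playout_buffer_length(NonZeroUsize::new(7).unwrap())
        // Keep the default amount of extra headroom for bursty arrivals.
        .playout_spike_length(3)
}

fn main() {
    let _config = receive_config();
    // With the serenity integration, this would be registered when building the client:
    //
    //     use songbird::SerenityInit;
    //     let client = serenity::Client::builder(&token, intents)
    //         .register_songbird_from_config(receive_config())
    //         .await?;
}
```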