Driver/receive: Implement audio reorder/jitter buffer (#156)
This PR introduces a new `VoiceTick` event which collects and reorders all RTP packets to smooth over network instability, as well as to synchronise user audio streams. Raw packet events have been moved to `RtpPacket`, while `SpeakingUpdate`s have been removed as they can be easily computed from the `silent`/`speaking` audio maps included in each event.

Closes #146.
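
The sketch below is illustrative and not part of this commit: it shows one way to recover the old start/stop notifications from consecutive `VoiceTick`s, assuming `speaking` is keyed by SSRC and `silent` is a set of SSRCs, as in the updated `voice_receive` example in this diff.

```rust
// Sketch only: reproducing the removed `SpeakingUpdate` semantics from VoiceTick.
use std::{collections::HashSet, sync::Mutex};

use async_trait::async_trait;
use songbird::{Event, EventContext, EventHandler as VoiceEventHandler};

#[derive(Default)]
struct SpeakingTracker {
    // SSRCs which produced audio in the previous tick.
    live: Mutex<HashSet<u32>>,
}

#[async_trait]
impl VoiceEventHandler for SpeakingTracker {
    async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
        if let EventContext::VoiceTick(tick) = ctx {
            let mut live = self.live.lock().unwrap();
            for ssrc in tick.speaking.keys() {
                if live.insert(*ssrc) {
                    println!("Source {ssrc} has started speaking.");
                }
            }
            for ssrc in &tick.silent {
                if live.remove(ssrc) {
                    println!("Source {ssrc} has stopped speaking.");
                }
            }
        }
        None
    }
}
```

Such a handler would be attached like the others in the example, e.g. `handler.add_global_event(CoreEvent::VoiceTick.into(), SpeakingTracker::default())`.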
FelixMcFelix committed Nov 20, 2023
1 parent ab18f9e commit c60c454
Showing 19 changed files with 922 additions and 610 deletions.
Cargo.toml (3 changes: 2 additions & 1 deletion)
@@ -17,6 +17,7 @@ version = "0.3.0"
async-trait = { optional = true, version = "0.1" }
audiopus = { optional = true, version = "0.3.0-rc.0" }
byteorder = { optional = true, version = "1" }
bytes = { optional = true, version = "1" }
dashmap = { optional = true, version = "5" }
derivative = "2"
discortp = { default-features = false, features = ["discord", "pnet", "rtp"], optional = true, version = "0.5" }
@@ -145,7 +146,7 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"]

# Behaviour altering features.
builtin-queue = []
receive = ["discortp?/demux", "discortp?/rtcp"]
receive = ["dep:bytes", "discortp?/demux", "discortp?/rtcp"]

# Used for docgen/testing/benchmarking.
full-doc = ["default", "twilight", "builtin-queue", "receive"]
examples/serenity/voice_receive/Cargo.toml (1 change: 1 addition & 0 deletions)
@@ -5,6 +5,7 @@ authors = ["my name <[email protected]>"]
edition = "2021"

[dependencies]
dashmap = "5"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"
examples/serenity/voice_receive/src/main.rs (134 changes: 98 additions & 36 deletions)
@@ -6,7 +6,15 @@
//! git = "https://github.com/serenity-rs/serenity.git"
//! features = ["client", "standard_framework", "voice"]
//! ```
use std::env;
use std::{
env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use dashmap::DashMap;

use serenity::{
async_trait,
@@ -26,7 +34,11 @@ use serenity::{

use songbird::{
driver::DecodeMode,
model::payload::{ClientDisconnect, Speaking},
model::{
id::UserId,
payload::{ClientDisconnect, Speaking},
},
packet::Packet,
Config,
CoreEvent,
Event,
@@ -44,13 +56,26 @@ impl EventHandler for Handler {
}
}

struct Receiver;
#[derive(Clone)]
struct Receiver {
inner: Arc<InnerReceiver>,
}

struct InnerReceiver {
last_tick_was_empty: AtomicBool,
known_ssrcs: DashMap<u32, UserId>,
}

impl Receiver {
pub fn new() -> Self {
// You can manage state here, such as a buffer of audio packet bytes so
// you can later store them in intervals.
Self {}
Self {
inner: Arc::new(InnerReceiver {
last_tick_was_empty: AtomicBool::default(),
known_ssrcs: DashMap::new(),
}),
}
}
}

@@ -81,34 +106,73 @@ impl VoiceEventHandler for Receiver {
"Speaking state update: user {:?} has SSRC {:?}, using {:?}",
user_id, ssrc, speaking,
);

if let Some(user) = user_id {
self.inner.known_ssrcs.insert(*ssrc, *user);
}
},
Ctx::SpeakingUpdate(data) => {
// You can implement logic here which reacts to a user starting
// or stopping speaking, and to map their SSRC to User ID.
println!(
"Source {} has {} speaking.",
data.ssrc,
if data.speaking { "started" } else { "stopped" },
);
Ctx::VoiceTick(tick) => {
let speaking = tick.speaking.len();
let total_participants = speaking + tick.silent.len();
let last_tick_was_empty = self.inner.last_tick_was_empty.load(Ordering::SeqCst);

if speaking == 0 && !last_tick_was_empty {
println!("No speakers");

self.inner.last_tick_was_empty.store(true, Ordering::SeqCst);
} else if speaking != 0 {
self.inner
.last_tick_was_empty
.store(false, Ordering::SeqCst);

println!("Voice tick ({speaking}/{total_participants} live):");

// You can also examine tick.silent to see users who are present
// but haven't spoken in this tick.
for (ssrc, data) in &tick.speaking {
let user_id_str = if let Some(id) = self.inner.known_ssrcs.get(ssrc) {
format!("{:?}", *id)
} else {
"?".into()
};

// This field should *always* exist under DecodeMode::Decode.
// The `else` allows you to see how the other modes are affected.
if let Some(decoded_voice) = data.decoded_voice.as_ref() {
let voice_len = decoded_voice.len();
let audio_str = format!(
"first samples from {}: {:?}",
voice_len,
&decoded_voice[..voice_len.min(5)]
);

if let Some(packet) = &data.packet {
let rtp = packet.rtp();
println!(
"\t{ssrc}/{user_id_str}: packet seq {} ts {} -- {audio_str}",
rtp.get_sequence().0,
rtp.get_timestamp().0
);
} else {
println!("\t{ssrc}/{user_id_str}: Missed packet -- {audio_str}");
}
} else {
println!("\t{ssrc}/{user_id_str}: Decode disabled.");
}
}
}
},
Ctx::VoicePacket(data) => {
Ctx::RtpPacket(packet) => {
// An event which fires for every received audio packet,
// containing the raw RTP data.
if let Some(audio) = data.audio {
println!(
"Audio packet's first 5 samples: {:?}",
audio.get(..5.min(audio.len()))
);
println!(
"Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}",
data.packet.sequence.0,
audio.len() * std::mem::size_of::<i16>(),
data.packet.payload.len(),
data.packet.ssrc,
);
} else {
println!("RTP packet, but no audio. Driver may not be configured to decode.");
}
let rtp = packet.rtp();
println!(
"Received voice packet from SSRC {}, sequence {}, timestamp {} -- {}B long",
rtp.get_ssrc(),
rtp.get_sequence().0,
rtp.get_timestamp().0,
rtp.payload().len()
);
},
Ctx::RtcpPacket(data) => {
// An event which fires for every received rtcp packet,
@@ -195,15 +259,13 @@ async fn join(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult {
// NOTE: this skips listening for the actual connection result.
let mut handler = handler_lock.lock().await;

handler.add_global_event(CoreEvent::SpeakingStateUpdate.into(), Receiver::new());

handler.add_global_event(CoreEvent::SpeakingUpdate.into(), Receiver::new());

handler.add_global_event(CoreEvent::VoicePacket.into(), Receiver::new());

handler.add_global_event(CoreEvent::RtcpPacket.into(), Receiver::new());
let evt_receiver = Receiver::new();

handler.add_global_event(CoreEvent::ClientDisconnect.into(), Receiver::new());
handler.add_global_event(CoreEvent::SpeakingStateUpdate.into(), evt_receiver.clone());
handler.add_global_event(CoreEvent::RtpPacket.into(), evt_receiver.clone());
handler.add_global_event(CoreEvent::RtcpPacket.into(), evt_receiver.clone());
handler.add_global_event(CoreEvent::ClientDisconnect.into(), evt_receiver.clone());
handler.add_global_event(CoreEvent::VoiceTick.into(), evt_receiver);

check_msg(
msg.channel_id
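
Following the `Receiver`'s own suggestion to manage state for later storage, here is a minimal sketch (not part of this commit) which accumulates each user's decoded PCM per tick. It assumes `DecodeMode::Decode`, so `decoded_voice` is present and holds interleaved 48 kHz `i16` samples.

```rust
// Sketch only: buffering decoded audio per SSRC from each VoiceTick.
use dashmap::DashMap;
use songbird::EventContext;

#[derive(Default)]
struct PcmBuffers {
    // SSRC -> accumulated samples; pair with an SSRC -> UserId map (as in the
    // Receiver above) to attribute audio to users.
    by_ssrc: DashMap<u32, Vec<i16>>,
}

impl PcmBuffers {
    fn on_voice_tick(&self, ctx: &EventContext<'_>) {
        if let EventContext::VoiceTick(tick) = ctx {
            for (ssrc, data) in &tick.speaking {
                if let Some(decoded) = data.decoded_voice.as_ref() {
                    self.by_ssrc
                        .entry(*ssrc)
                        .or_default()
                        .extend_from_slice(decoded);
                }
            }
        }
    }
}
```

Each tick contributes 20 ms of audio per speaking user, so flushing the buffers on an interval (for example every 50 ticks, i.e. one second) keeps memory bounded.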
src/config.rs (53 changes: 48 additions & 5 deletions)
@@ -13,6 +13,8 @@ use crate::driver::test_config::*;
use symphonia::core::{codecs::CodecRegistry, probe::Probe};

use derivative::Derivative;
#[cfg(feature = "receive")]
use std::num::NonZeroUsize;
use std::time::Duration;

/// Configuration for drivers and calls.
@@ -35,18 +37,19 @@ pub struct Config {
#[cfg(all(feature = "driver", feature = "receive"))]
/// Configures whether decoding and decryption occur for all received packets.
///
/// If voice receiving voice packets, generally you should choose [`DecodeMode::Decode`].
/// [`DecodeMode::Decrypt`] is intended for users running their own selective decoding,
/// who rely upon [user speaking events], or who need to inspect Opus packets.
/// If you're certain you will never need any RT(C)P events, then consider [`DecodeMode::Pass`].
/// If receiving and using voice packets, generally you should choose [`DecodeMode::Decode`].
/// [`DecodeMode::Decrypt`] is intended for users running their own selective decoding or
/// who need to inspect Opus packets. [User speaking state] can still be seen using [`DecodeMode::Pass`].
/// If you're certain you will never need any RT(C)P events, then consider building without
/// the `"receive"` feature for extra performance.
///
/// Defaults to [`DecodeMode::Decrypt`]. This is due to per-packet decoding costs,
/// which most users will not want to pay, but allowing speaking events which are commonly used.
///
/// [`DecodeMode::Decode`]: DecodeMode::Decode
/// [`DecodeMode::Decrypt`]: DecodeMode::Decrypt
/// [`DecodeMode::Pass`]: DecodeMode::Pass
/// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate
/// [User speaking state]: crate::events::CoreEvent::VoiceTick
pub decode_mode: DecodeMode,

#[cfg(all(feature = "driver", feature = "receive"))]
@@ -56,6 +59,26 @@ pub struct Config {
/// Defaults to 1 minute.
pub decode_state_timeout: Duration,

#[cfg(all(feature = "driver", feature = "receive"))]
/// Configures the number of audio packets to buffer for each user before playout.
///
/// A playout buffer allows Songbird to smooth out jitter in audio packet arrivals,
/// as well as to correct for reordering of packets by the network.
///
/// This does not affect the arrival of raw packet events.
///
/// Defaults to 5 packets (100ms).
pub playout_buffer_length: NonZeroUsize,

#[cfg(all(feature = "driver", feature = "receive"))]
/// Configures the initial amount of extra space allocated to handle packet bursts.
///
/// Each SSRC's receive buffer will start at capacity `playout_buffer_length +
/// playout_spike_length`, up to a maximum of 64 packets.
///
/// Defaults to 3 packets (thus capacity defaults to 8).
pub playout_spike_length: usize,

#[cfg(feature = "gateway")]
/// Configures the amount of time to wait for Discord to reply with connection information
/// if [`Call::join`]/[`join_gateway`] are used.
@@ -174,6 +197,10 @@ impl Default for Config {
decode_mode: DecodeMode::Decrypt,
#[cfg(all(feature = "driver", feature = "receive"))]
decode_state_timeout: Duration::from_secs(60),
#[cfg(all(feature = "driver", feature = "receive"))]
playout_buffer_length: NonZeroUsize::new(5).unwrap(),
#[cfg(all(feature = "driver", feature = "receive"))]
playout_spike_length: 3,
#[cfg(feature = "gateway")]
gateway_timeout: Some(Duration::from_secs(10)),
#[cfg(feature = "driver")]
@@ -227,6 +254,22 @@ impl Config {
self
}

#[cfg(feature = "receive")]
/// Sets this `Config`'s playout buffer length, in packets.
#[must_use]
pub fn playout_buffer_length(mut self, playout_buffer_length: NonZeroUsize) -> Self {
self.playout_buffer_length = playout_buffer_length;
self
}

#[cfg(feature = "receive")]
/// Sets this `Config`'s additional pre-allocated space to handle bursty audio packets.
#[must_use]
pub fn playout_spike_length(mut self, playout_spike_length: usize) -> Self {
self.playout_spike_length = playout_spike_length;
self
}

/// Sets this `Config`'s audio mixing channel count.
#[must_use]
pub fn mix_mode(mut self, mix_mode: MixMode) -> Self {
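
To show how the new options combine with the existing setters, here is an illustrative sketch (not part of this commit) of a receive-oriented `Config`. It assumes the `driver` and `receive` features are enabled; the values are examples only.

```rust
// Sketch only: a receive-focused Config using the playout setters added above.
use std::num::NonZeroUsize;

use songbird::{driver::DecodeMode, Config};

fn receive_config() -> Config {
    Config::default()
        // Decode incoming Opus so that VoiceTick events carry PCM.
        .decode_mode(DecodeMode::Decode)
        // 7 packets (about 140 ms) of playout delay to ride out jitter (default is 5).
        .playout_buffer_length(NonZeroUsize::new(7).unwrap())
        // Extra pre-allocated headroom for packet bursts (default is 3).
        .playout_spike_length(5)
}
```

A longer playout buffer trades added latency for greater tolerance of reordering and late packets, which is why it is expressed in 20 ms packet units.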
src/driver/decode_mode.rs (7 changes: 0 additions & 7 deletions)
@@ -6,13 +6,6 @@ pub enum DecodeMode {
/// changes applied.
///
/// No CPU work involved.
///
/// *BEWARE: this will almost certainly break [user speaking events].
/// Silent frame detection only works if extensions can be parsed or
/// are not present, as they are encrypted.
/// This event requires such functionality.*
///
/// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate
Pass,
/// Decrypts the body of each received packet.
///
src/driver/tasks/mixer/mod.rs (14 changes: 14 additions & 0 deletions)
@@ -657,6 +657,8 @@ impl Mixer {
None => {},
}

self.advance_rtp_timestamp();

return Ok(());
}
} else {
@@ -809,6 +811,18 @@ impl Mixer {
rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32);
}

#[inline]
// Even if we don't send a packet, we *do* need to keep advancing the timestamp
// to make it easier for a receiver to reorder packets and compute jitter measures
// wrt. our clock difference vs. theirs.
fn advance_rtp_timestamp(&mut self) {
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"FATAL: Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32);
}

#[inline]
fn check_and_send_keepalive(&mut self) -> Result<()> {
if let Some(conn) = self.conn_active.as_mut() {
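
For background (illustrative, not part of this commit): the jitter measure referred to in the comment above is RFC 3550 interarrival jitter, which compares receiver arrival-time deltas against sender RTP timestamp deltas, so the sender's timestamp must keep advancing even across ticks where no packet is sent.

```rust
// Sketch of the RFC 3550 (section 6.4.1) interarrival jitter estimate, which
// depends on the sender's RTP timestamp advancing steadily, as the
// advance_rtp_timestamp change above ensures. Both inputs must be expressed in
// the same clock units (48 kHz ticks here).
#[derive(Default)]
struct JitterEstimator {
    prev_transit: Option<i64>,
    jitter: f64,
}

impl JitterEstimator {
    fn on_packet(&mut self, arrival: u32, rtp_timestamp: u32) {
        // Transit offset between sender and receiver clocks; its variation is jitter.
        let transit = i64::from(arrival) - i64::from(rtp_timestamp);
        if let Some(prev) = self.prev_transit {
            let d = (transit - prev).abs() as f64;
            // J += (|D| - J) / 16
            self.jitter += (d - self.jitter) / 16.0;
        }
        self.prev_transit = Some(transit);
    }
}
```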