Skip to content

Commit

Permalink
Events: Break out and non-exhaust context body structs (#54)
Browse files Browse the repository at this point in the history
This PR makes many of the types under `EventContext` separate `#[non_exhaustive]` structs. This makes it more feasible to add further information to connection and packet events as required in future. On this note, driver (re)connection events now include the SSRC supplied by Discord and the domain name which was connected to.

In addition, this fixes global timed events to return a list of all live tracks, and extensively details/documents events at a high level.

This was tested using `cargo make ready`.
  • Loading branch information
FelixMcFelix committed May 10, 2021
1 parent 1eed9dd commit e7af0ff
Show file tree
Hide file tree
Showing 14 changed files with 321 additions and 129 deletions.
20 changes: 10 additions & 10 deletions examples/serenity/voice_receive/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,35 @@ impl VoiceEventHandler for Receiver {
speaking,
);
},
Ctx::SpeakingUpdate {ssrc, speaking} => {
Ctx::SpeakingUpdate(data) => {
// You can implement logic here which reacts to a user starting
// or stopping speaking.
println!(
"Source {} has {} speaking.",
ssrc,
if *speaking {"started"} else {"stopped"},
data.ssrc,
if data.speaking {"started"} else {"stopped"},
);
},
Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => {
Ctx::VoicePacket(data) => {
// An event which fires for every received audio packet,
// containing the decoded data.
if let Some(audio) = audio {
if let Some(audio) = data.audio {
println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len())));
println!(
"Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}",
packet.sequence.0,
data.packet.sequence.0,
audio.len() * std::mem::size_of::<i16>(),
packet.payload.len(),
packet.ssrc,
data.packet.payload.len(),
data.packet.ssrc,
);
} else {
println!("RTP packet, but no audio. Driver may not be configured to decode.");
}
},
Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => {
Ctx::RtcpPacket(data) => {
// An event which fires for every received rtcp packet,
// containing the call statistics and reporting information.
println!("RTCP packet received: {:?}", packet);
println!("RTCP packet received: {:?}", data.packet);
},
Ctx::ClientConnect(
ClientConnect {audio_ssrc, video_ssrc, user_id, ..}
Expand Down
2 changes: 2 additions & 0 deletions src/driver/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use ws::create_native_tls_client;

pub(crate) struct Connection {
pub(crate) info: ConnectionInfo,
pub(crate) ssrc: u32,
pub(crate) ws: Sender<WsMessage>,
}

Expand Down Expand Up @@ -219,6 +220,7 @@ impl Connection {

Ok(Connection {
info,
ssrc,
ws: ws_msg_tx,
})
}
Expand Down
36 changes: 24 additions & 12 deletions src/driver/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ pub(crate) mod udp_tx;
pub(crate) mod ws;

use super::connection::{error::Error as ConnectionError, Connection};
use crate::{events::CoreContext, Config};
use crate::{
events::{internal_data::InternalConnect, CoreContext},
Config,
};
use flume::{Receiver, RecvError, Sender};
use message::*;
#[cfg(not(feature = "tokio-02-marker"))]
Expand Down Expand Up @@ -78,9 +81,12 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
// Other side may not be listening: this is fine.
let _ = tx.send(Ok(()));

let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverConnect));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnect(InternalConnect {
server: connection.info.endpoint.clone(),
ssrc: connection.ssrc,
}),
));

Some(connection)
},
Expand Down Expand Up @@ -164,10 +170,13 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
.ok();
}

if connection.is_some() {
let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverReconnect));
if let Some(ref connection) = &connection {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
server: connection.info.endpoint.clone(),
ssrc: connection.ssrc,
}),
));
}
}
},
Expand All @@ -186,10 +195,13 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
})
.ok();

if connection.is_some() {
let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverReconnect));
if let Some(ref connection) = &connection {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
server: connection.info.endpoint.clone(),
ssrc: connection.ssrc,
}),
));
}
},
Ok(CoreMessage::RebuildInterconnect) => {
Expand Down
35 changes: 21 additions & 14 deletions src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use super::{
message::*,
Config,
};
use crate::{constants::*, driver::DecodeMode, events::CoreContext};
use crate::{
constants::*,
driver::DecodeMode,
events::{internal_data::*, CoreContext},
};
use audiopus::{
coder::Decoder as OpusDecoder,
error::{Error as OpusError, ErrorCode},
Expand Down Expand Up @@ -322,30 +326,30 @@ impl UdpRx {
match delta {
SpeakingDelta::Start => {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::SpeakingUpdate {
CoreContext::SpeakingUpdate(InternalSpeakingUpdate {
ssrc: rtp.get_ssrc(),
speaking: true,
},
}),
));
},
SpeakingDelta::Stop => {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::SpeakingUpdate {
CoreContext::SpeakingUpdate(InternalSpeakingUpdate {
ssrc: rtp.get_ssrc(),
speaking: false,
},
}),
));
},
_ => {},
}

let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::VoicePacket {
CoreContext::VoicePacket(InternalVoicePacket {
audio,
packet: rtp.from_packet(),
payload_offset: rtp_body_start,
payload_end_pad: rtp_body_tail,
},
}),
));
} else {
warn!("RTP decoding/processing failed.");
Expand All @@ -371,13 +375,16 @@ impl UdpRx {
)
});

let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::RtcpPacket {
packet: rtcp.from_packet(),
payload_offset: start,
payload_end_pad: tail,
},
));
let _ =
interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::RtcpPacket(
InternalRtcpPacket {
packet: rtcp.from_packet(),
payload_offset: start,
payload_end_pad: tail,
},
)));
},
DemuxedMut::FailedParse(t) => {
warn!("Failed to parse message of type {:?}.", t);
Expand Down
21 changes: 21 additions & 0 deletions src/events/context/data/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/// Voice connection details gathered at setup/reinstantiation.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub struct ConnectData<'a> {
/// The domain name of Discord's voice/TURN server.
///
/// With the introduction of Discord's automatic voice server selection,
/// this is no longer guaranteed to match a server's settings. This field
/// may be useful if you need/wish to move your voice connection to a node/shard
/// closer to Discord.
pub server: &'a str,
/// The [RTP SSRC] *("Synchronisation source")* assigned by the voice server
/// for the duration of this call.
///
/// All packets sent will use this SSRC, which is not related to the sender's User
/// ID. These are usually allocated sequentially by Discord, following on from
/// a random starting SSRC.
///
/// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3
pub ssrc: u32,
}
11 changes: 11 additions & 0 deletions src/events/context/data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Types containing the main body of an [`EventContext`].
//!
//! [`EventContext`]: super::EventContext
mod connect;
mod rtcp;
mod speaking;
mod voice;

use discortp::{rtcp::Rtcp, rtp::Rtp};

pub use self::{connect::*, rtcp::*, speaking::*, voice::*};
15 changes: 15 additions & 0 deletions src/events/context/data/rtcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use super::*;

#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
/// Telemetry/statistics packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// to allow manual decoding of `Rtcp` packet bodies.
pub struct RtcpData<'a> {
/// Raw RTCP packet data.
pub packet: &'a Rtcp,
/// Byte index into the packet body (after headers) for where the payload begins.
pub payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
pub payload_end_pad: usize,
}
14 changes: 14 additions & 0 deletions src/events/context/data/speaking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
/// Speaking state transition, describing whether a given source has started/stopped
/// transmitting. This fires in response to a silent burst, or the first packet
/// breaking such a burst.
pub struct SpeakingUpdateData {
/// Whether this user is currently speaking.
pub speaking: bool,
/// Synchronisation Source of the user who has begun speaking.
///
/// This must be combined with another event class to map this back to
/// its original UserId.
pub ssrc: u32,
}
20 changes: 20 additions & 0 deletions src/events/context/data/voice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::*;

#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
/// Opus audio packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// if extensions or raw packet data are required.
/// If `audio.len() == 0`, then this packet arrived out-of-order.
pub struct VoiceData<'a> {
/// Decoded audio from this packet.
pub audio: &'a Option<Vec<i16>>,
/// Raw RTP packet data.
///
/// Includes the SSRC (i.e., sender) of this packet.
pub packet: &'a Rtp,
/// Byte index into the packet body (after headers) for where the payload begins.
pub payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
pub payload_end_pad: usize,
}
68 changes: 68 additions & 0 deletions src/events/context/internal_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use super::context_data::*;
use discortp::{rtcp::Rtcp, rtp::Rtp};

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct InternalConnect {
pub server: String,
pub ssrc: u32,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct InternalSpeakingUpdate {
pub ssrc: u32,
pub speaking: bool,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InternalVoicePacket {
pub audio: Option<Vec<i16>>,
pub packet: Rtp,
pub payload_offset: usize,
pub payload_end_pad: usize,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InternalRtcpPacket {
pub packet: Rtcp,
pub payload_offset: usize,
pub payload_end_pad: usize,
}

impl<'a> From<&'a InternalConnect> for ConnectData<'a> {
fn from(val: &'a InternalConnect) -> Self {
Self {
server: &val.server,
ssrc: val.ssrc,
}
}
}

impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData {
fn from(val: &'a InternalSpeakingUpdate) -> Self {
Self {
speaking: val.speaking,
ssrc: val.ssrc,
}
}
}

impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> {
fn from(val: &'a InternalVoicePacket) -> Self {
Self {
audio: &val.audio,
packet: &val.packet,
payload_offset: val.payload_offset,
payload_end_pad: val.payload_end_pad,
}
}
}

impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> {
fn from(val: &'a InternalRtcpPacket) -> Self {
Self {
packet: &val.packet,
payload_offset: val.payload_offset,
payload_end_pad: val.payload_end_pad,
}
}
}
Loading

0 comments on commit e7af0ff

Please sign in to comment.