From 2c7d23ad652fdea7bfee2a4d17bec5fd2d19097a Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Wed, 22 Jun 2022 21:22:13 -0400 Subject: [PATCH 1/8] Improve sequencer implementation --- src/sequence.rs | 47 ++++++++++++++++------------------------------- 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/src/sequence.rs b/src/sequence.rs index 4392fcd..cfee20f 100644 --- a/src/sequence.rs +++ b/src/sequence.rs @@ -1,5 +1,8 @@ use std::fmt; -use std::sync::{Arc, Mutex}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; /// Sequencer generates sequential sequence numbers for building RTP packets pub trait Sequencer: fmt::Debug { @@ -17,55 +20,37 @@ impl Clone for Box { /// NewRandomSequencer returns a new sequencer starting from a random sequence /// number pub fn new_random_sequencer() -> impl Sequencer { - let c = Counters { - sequence_number: rand::random::(), - roll_over_count: 0, - }; - SequencerImpl(Arc::new(Mutex::new(c))) + new_fixed_sequencer(rand::random::()) } /// NewFixedSequencer returns a new sequencer starting from a specific /// sequence number pub fn new_fixed_sequencer(s: u16) -> impl Sequencer { - let sequence_number = if s == 0 { u16::MAX } else { s - 1 }; - - let c = Counters { - sequence_number, - roll_over_count: 0, - }; - - SequencerImpl(Arc::new(Mutex::new(c))) + SequencerImpl { + count: Arc::new(AtomicU64::new(u64::from(s))), + } } #[derive(Debug, Clone)] -struct SequencerImpl(Arc>); - -#[derive(Debug)] -struct Counters { - sequence_number: u16, - roll_over_count: u64, +struct SequencerImpl { + // The most significant 48 bits store the number of roll overs, and the lower 16 bits store the + // next sequence number. + // If we gave out one sequence number per nanosecond, then we'd need to give out sequence + // numbers for almost 600 years before we run out of roll overs. + count: Arc, } impl Sequencer for SequencerImpl { /// NextSequenceNumber increment and returns a new sequence number for /// building RTP packets fn next_sequence_number(&self) -> u16 { - let mut lock = self.0.lock().unwrap(); - - if lock.sequence_number == u16::MAX { - lock.roll_over_count += 1; - lock.sequence_number = 0; - } else { - lock.sequence_number += 1; - } - - lock.sequence_number + (self.count.fetch_add(1, Ordering::Release) & 0xFFFF) as u16 } /// RollOverCount returns the amount of times the 16bit sequence number /// has wrapped fn roll_over_count(&self) -> u64 { - self.0.lock().unwrap().roll_over_count + self.count.load(Ordering::Acquire).overflowing_shr(16).0 } fn clone_to(&self) -> Box { From f64462c388a4a44a2412d3578a71310252fdc15b Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Wed, 22 Jun 2022 22:36:22 -0400 Subject: [PATCH 2/8] Small updates --- src/error.rs | 8 +++----- src/lib.rs | 3 +-- src/packet/packet_test.rs | 2 ++ 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/error.rs b/src/error.rs index 8e5895c..3c8b2a0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -70,10 +70,8 @@ impl From for util::Error { impl PartialEq for Error { fn eq(&self, other: &util::Error) -> bool { - if let Some(down) = other.downcast_ref::() { - self == down - } else { - false - } + other.downcast_ref::() + .map(|down| self == down) + .unwrap_or_default() } } diff --git a/src/lib.rs b/src/lib.rs index 18066e7..ba34d2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ -#![warn(rust_2018_idioms)] -#![allow(dead_code)] +#![deny(rust_2018_idioms)] pub mod codecs; mod error; diff --git a/src/packet/packet_test.rs b/src/packet/packet_test.rs index 2ec053b..352385d 100644 --- a/src/packet/packet_test.rs +++ b/src/packet/packet_test.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use super::*; use crate::error::Result; use bytes::{Bytes, BytesMut}; From 49141febc1a8c1fef9c8d3147b84ae7e3f4ecc23 Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Wed, 22 Jun 2022 22:59:18 -0400 Subject: [PATCH 3/8] Remove unnecessary async --- Cargo.toml | 11 ++++------- src/packetizer/mod.rs | 27 +++++---------------------- src/packetizer/packetizer_test.rs | 30 +++++++++++++----------------- 3 files changed, 22 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e070c6..085cf81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,17 +10,14 @@ homepage = "https://webrtc.rs" repository = "https://github.com/webrtc-rs/rtp" [dependencies] -util = { package = "webrtc-util", version = "0.5.4", default-features = false, features = ["marshal"] } -bytes = "1" -rand = "0.8.5" -thiserror = "1.0" -async-trait = "0.1.56" +util = { package = "webrtc-util", version = "0.5.3", default-features = false, features = ["marshal"] } +bytes = "1.1.0" +rand = "0.8.4" +thiserror = "1.0.30" [dev-dependencies] chrono = "0.4.19" criterion = "0.3.5" -tokio = { version = "1.19", features = ["full"] } -tokio-test = "0.4.0" # must match the min version of the `tokio` crate above [[bench]] name = "packet_bench" diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index 82e52cb..a5b87f8 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -5,12 +5,8 @@ use crate::error::Result; use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*}; use util::marshal::{Marshal, MarshalSize}; -use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; use std::time::SystemTime; /// Payloader payloads a byte array for use as rtp.Packet payloads @@ -26,10 +22,9 @@ impl Clone for Box { } /// Packetizer packetizes a payload -#[async_trait] pub trait Packetizer: fmt::Debug { fn enable_abs_send_time(&mut self, value: u8); - async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result>; + fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result>; fn skip_samples(&mut self, skipped_samples: u32); fn clone_to(&self) -> Box; } @@ -54,12 +49,6 @@ pub trait Depacketizer { fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool; } -//TODO: SystemTime vs Instant? -// non-monotonic clock vs monotonically non-decreasing clock -/// FnTimeGen provides current SystemTime -pub type FnTimeGen = - Arc Pin + Send + 'static>>) + Send + Sync>; - #[derive(Clone)] pub(crate) struct PacketizerImpl { pub(crate) mtu: usize, @@ -70,7 +59,7 @@ pub(crate) struct PacketizerImpl { pub(crate) timestamp: u32, pub(crate) clock_rate: u32, pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time - pub(crate) time_gen: Option, + pub(crate) time_gen: fn() -> SystemTime, } impl fmt::Debug for PacketizerImpl { @@ -103,17 +92,16 @@ pub fn new_packetizer( timestamp: rand::random::(), clock_rate, abs_send_time: 0, - time_gen: None, + time_gen: SystemTime::now } } -#[async_trait] impl Packetizer for PacketizerImpl { fn enable_abs_send_time(&mut self, value: u8) { self.abs_send_time = value } - async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result> { + fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result> { let payloads = self.payloader.payload(self.mtu - 12, payload)?; let payloads_len = payloads.len(); let mut packets = Vec::with_capacity(payloads_len); @@ -137,12 +125,7 @@ impl Packetizer for PacketizerImpl { self.timestamp = self.timestamp.wrapping_add(samples); if payloads_len != 0 && self.abs_send_time != 0 { - let st = if let Some(fn_time_gen) = &self.time_gen { - fn_time_gen().await - } else { - SystemTime::now() - }; - let send_time = AbsSendTimeExtension::new(st); + let send_time = AbsSendTimeExtension::new((self.time_gen)()); //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time let mut raw = BytesMut::with_capacity(send_time.marshal_size()); raw.resize(send_time.marshal_size(), 0); diff --git a/src/packetizer/packetizer_test.rs b/src/packetizer/packetizer_test.rs index b9e6f21..bf96f07 100644 --- a/src/packetizer/packetizer_test.rs +++ b/src/packetizer/packetizer_test.rs @@ -5,15 +5,15 @@ use crate::error::Result; use chrono::prelude::*; use std::time::{Duration, UNIX_EPOCH}; -#[tokio::test] -async fn test_packetizer() -> Result<()> { +#[test] +fn test_packetizer() -> Result<()> { let multiple_payload = Bytes::from_static(&[0; 128]); let g722 = Box::new(g7xx::G722Payloader {}); let seq = Box::new(new_random_sequencer()); //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); - let packets = packetizer.packetize(&multiple_payload, 2000).await?; + let packets = packetizer.packetize(&multiple_payload, 2000)?; if packets.len() != 2 { let mut packet_lengths = String::new(); @@ -31,22 +31,18 @@ async fn test_packetizer() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_packetizer_abs_send_time() -> Result<()> { +#[test] +fn test_packetizer_abs_send_time() -> Result<()> { let g722 = Box::new(g7xx::G722Payloader {}); let sequencer = Box::new(new_fixed_sequencer(1234)); - let time_gen: Option = Some(Arc::new( - || -> Pin + Send + 'static>> { - Box::pin(async move { - let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 + let time_gen = || { + let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 let t = loc.ymd(1985, 6, 23).and_hms_nano(4, 0, 0, 0); UNIX_EPOCH .checked_add(Duration::from_nanos(t.timestamp_nanos() as u64)) .unwrap_or(UNIX_EPOCH) - }) - }, - )); + }; //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. let mut pktizer = PacketizerImpl { @@ -63,7 +59,7 @@ async fn test_packetizer_abs_send_time() -> Result<()> { pktizer.enable_abs_send_time(1); let payload = Bytes::from_static(&[0x11, 0x12, 0x13, 0x14]); - let packets = pktizer.packetize(&payload, 2000).await?; + let packets = pktizer.packetize(&payload, 2000)?; let expected = Packet { header: Header { @@ -94,17 +90,17 @@ async fn test_packetizer_abs_send_time() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { +#[test] +fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { let g722 = Box::new(g7xx::G722Payloader {}); let seq = Box::new(new_random_sequencer()); let payload = Bytes::from_static(&[0; 128]); let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); - packetizer.packetize(&payload, 10).await?; + packetizer.packetize(&payload, 10)?; - packetizer.packetize(&payload, u32::MAX).await?; + packetizer.packetize(&payload, u32::MAX)?; packetizer.skip_samples(u32::MAX); From 7538093158db0d9f57b2f437bbc1130d9d3e8dee Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Wed, 22 Jun 2022 23:29:37 -0400 Subject: [PATCH 4/8] Remove dynamic dispatch --- src/codecs/g7xx/mod.rs | 4 -- src/codecs/h264/mod.rs | 4 -- src/codecs/opus/mod.rs | 4 -- src/codecs/vp8/mod.rs | 4 -- src/codecs/vp9/mod.rs | 67 +++++++++++-------------------- src/codecs/vp9/vp9_test.rs | 11 ++--- src/error.rs | 3 +- src/packetizer/mod.rs | 36 +++++------------ src/packetizer/packetizer_test.rs | 20 ++++----- src/sequence.rs | 11 ----- 10 files changed, 48 insertions(+), 116 deletions(-) diff --git a/src/codecs/g7xx/mod.rs b/src/codecs/g7xx/mod.rs index d8ddff1..3881253 100644 --- a/src/codecs/g7xx/mod.rs +++ b/src/codecs/g7xx/mod.rs @@ -36,8 +36,4 @@ impl Payloader for G7xxPayloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } diff --git a/src/codecs/h264/mod.rs b/src/codecs/h264/mod.rs index 7c4cca0..06482bd 100644 --- a/src/codecs/h264/mod.rs +++ b/src/codecs/h264/mod.rs @@ -192,10 +192,6 @@ impl Payloader for H264Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } /// H264Packet represents the H264 header that is stored in the payload of an RTP Packet diff --git a/src/codecs/opus/mod.rs b/src/codecs/opus/mod.rs index 731f470..12bb0af 100644 --- a/src/codecs/opus/mod.rs +++ b/src/codecs/opus/mod.rs @@ -19,10 +19,6 @@ impl Payloader for OpusPayloader { Ok(vec![payload.clone()]) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } /// OpusPacket represents the Opus header that is stored in the payload of an RTP Packet diff --git a/src/codecs/vp8/mod.rs b/src/codecs/vp8/mod.rs index f7b003e..f43a1c6 100644 --- a/src/codecs/vp8/mod.rs +++ b/src/codecs/vp8/mod.rs @@ -104,10 +104,6 @@ impl Payloader for Vp8Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } /// Vp8Packet represents the VP8 header that is stored in the payload of an RTP Packet diff --git a/src/codecs/vp9/mod.rs b/src/codecs/vp9/mod.rs index 3132e3d..5e68ce0 100644 --- a/src/codecs/vp9/mod.rs +++ b/src/codecs/vp9/mod.rs @@ -7,32 +7,30 @@ use crate::{ }; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use std::fmt; -use std::sync::Arc; /// Flexible mode 15 bit picture ID const VP9HEADER_SIZE: usize = 3; const MAX_SPATIAL_LAYERS: u8 = 5; const MAX_VP9REF_PICS: usize = 3; -/// InitialPictureIDFn is a function that returns random initial picture ID. -pub type InitialPictureIDFn = Arc u16) + Send + Sync>; - /// Vp9Payloader payloads VP9 packets -#[derive(Default, Clone)] +#[derive(Clone, Debug)] pub struct Vp9Payloader { picture_id: u16, - initialized: bool, - - pub initial_picture_id_fn: Option, } -impl fmt::Debug for Vp9Payloader { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Vp9Payloader") - .field("picture_id", &self.picture_id) - .field("initialized", &self.initialized) - .finish() +impl Vp9Payloader { + pub fn new() -> Self { + Self::new_with(|| rand::random::() & 0x7FFF) + } + + pub fn new_with(init_picture_id: F) -> Self + where + F: FnOnce() -> u16, + { + Self { + picture_id: init_picture_id(), + } } } @@ -81,19 +79,6 @@ impl Payloader for Vp9Payloader { return Ok(vec![]); } - if !self.initialized { - if self.initial_picture_id_fn.is_none() { - self.initial_picture_id_fn = - Some(Arc::new(|| -> u16 { rand::random::() & 0x7FFF })); - } - self.picture_id = if let Some(f) = &self.initial_picture_id_fn { - f() - } else { - 0 - }; - self.initialized = true; - } - let max_fragment_size = mtu as isize - VP9HEADER_SIZE as isize; let mut payloads = vec![]; let mut payload_data_remaining = payload.len(); @@ -135,10 +120,6 @@ impl Payloader for Vp9Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } /// Vp9Packet represents the VP9 header that is stored in the payload of an RTP Packet @@ -263,9 +244,9 @@ impl Vp9Packet { // M: | EXTENDED PID | // +-+-+-+-+-+-+-+-+ // - fn parse_picture_id( + fn parse_picture_id( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -288,9 +269,9 @@ impl Vp9Packet { Ok(payload_index) } - fn parse_layer_info( + fn parse_layer_info( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { payload_index = self.parse_layer_info_common(reader, payload_index)?; @@ -308,9 +289,9 @@ impl Vp9Packet { // L: | T |U| S |D| // +-+-+-+-+-+-+-+-+ // - fn parse_layer_info_common( + fn parse_layer_info_common( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -339,9 +320,9 @@ impl Vp9Packet { // | tl0picidx | // +-+-+-+-+-+-+-+-+ // - fn parse_layer_info_non_flexible_mode( + fn parse_layer_info_non_flexible_mode( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -359,9 +340,9 @@ impl Vp9Packet { // +-+-+-+-+-+-+-+-+ N=1: An additional P_DIFF follows // current P_DIFF. // - fn parse_ref_indices( + fn parse_ref_indices( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { let mut b = 1u8; @@ -401,7 +382,7 @@ impl Vp9Packet { // | P_DIFF | (OPTIONAL) . R times . // +-+-+-+-+-+-+-+-+ -| -| // - fn parse_ssdata(&mut self, reader: &mut dyn Buf, mut payload_index: usize) -> Result { + fn parse_ssdata(&mut self, reader: &mut B, mut payload_index: usize) -> Result { if reader.remaining() == 0 { return Err(Error::ErrShortPacket); } diff --git a/src/codecs/vp9/vp9_test.rs b/src/codecs/vp9/vp9_test.rs index 49a1481..0294f01 100644 --- a/src/codecs/vp9/vp9_test.rs +++ b/src/codecs/vp9/vp9_test.rs @@ -295,10 +295,7 @@ fn test_vp9_payloader_payload() -> Result<()> { ]; for (name, bs, mtu, expected) in tests { - let mut pck = Vp9Payloader { - initial_picture_id_fn: Some(Arc::new(|| -> u16 { 8692 })), - ..Default::default() - }; + let mut pck = Vp9Payloader::new_with(|| 8692); let mut actual = vec![]; for b in &bs { @@ -309,10 +306,8 @@ fn test_vp9_payloader_payload() -> Result<()> { //"PictureIDOverflow" { - let mut pck = Vp9Payloader { - initial_picture_id_fn: Some(Arc::new(|| -> u16 { 8692 })), - ..Default::default() - }; + let mut pck = Vp9Payloader::new_with(|| 8692); + let mut p_prev = Vp9Packet::default(); for i in 0..0x8000 { let res = pck.payload(4, &Bytes::from_static(&[0x01]))?; diff --git a/src/error.rs b/src/error.rs index 3c8b2a0..26749e3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -70,7 +70,8 @@ impl From for util::Error { impl PartialEq for Error { fn eq(&self, other: &util::Error) -> bool { - other.downcast_ref::() + other + .downcast_ref::() .map(|down| self == down) .unwrap_or_default() } diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index a5b87f8..9f565c8 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -12,13 +12,6 @@ use std::time::SystemTime; /// Payloader payloads a byte array for use as rtp.Packet payloads pub trait Payloader: fmt::Debug { fn payload(&mut self, mtu: usize, b: &Bytes) -> Result>; - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } } /// Packetizer packetizes a payload @@ -26,13 +19,6 @@ pub trait Packetizer: fmt::Debug { fn enable_abs_send_time(&mut self, value: u8); fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result>; fn skip_samples(&mut self, skipped_samples: u32); - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } } /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload @@ -50,19 +36,19 @@ pub trait Depacketizer { } #[derive(Clone)] -pub(crate) struct PacketizerImpl { +pub(crate) struct PacketizerImpl { pub(crate) mtu: usize, pub(crate) payload_type: u8, pub(crate) ssrc: u32, - pub(crate) payloader: Box, - pub(crate) sequencer: Box, + pub(crate) payloader: P, + pub(crate) sequencer: S, pub(crate) timestamp: u32, pub(crate) clock_rate: u32, pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time pub(crate) time_gen: fn() -> SystemTime, } -impl fmt::Debug for PacketizerImpl { +impl fmt::Debug for PacketizerImpl { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PacketizerImpl") .field("mtu", &self.mtu) @@ -75,12 +61,12 @@ impl fmt::Debug for PacketizerImpl { } } -pub fn new_packetizer( +pub fn new_packetizer( mtu: usize, payload_type: u8, ssrc: u32, - payloader: Box, - sequencer: Box, + payloader: P, + sequencer: S, clock_rate: u32, ) -> impl Packetizer { PacketizerImpl { @@ -92,11 +78,11 @@ pub fn new_packetizer( timestamp: rand::random::(), clock_rate, abs_send_time: 0, - time_gen: SystemTime::now + time_gen: SystemTime::now, } } -impl Packetizer for PacketizerImpl { +impl Packetizer for PacketizerImpl { fn enable_abs_send_time(&mut self, value: u8) { self.abs_send_time = value } @@ -143,8 +129,4 @@ impl Packetizer for PacketizerImpl { fn skip_samples(&mut self, skipped_samples: u32) { self.timestamp = self.timestamp.wrapping_add(skipped_samples); } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } diff --git a/src/packetizer/packetizer_test.rs b/src/packetizer/packetizer_test.rs index bf96f07..f8efe1b 100644 --- a/src/packetizer/packetizer_test.rs +++ b/src/packetizer/packetizer_test.rs @@ -8,8 +8,8 @@ use std::time::{Duration, UNIX_EPOCH}; #[test] fn test_packetizer() -> Result<()> { let multiple_payload = Bytes::from_static(&[0; 128]); - let g722 = Box::new(g7xx::G722Payloader {}); - let seq = Box::new(new_random_sequencer()); + let g722 = g7xx::G722Payloader {}; + let seq = new_random_sequencer(); //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); @@ -33,15 +33,15 @@ fn test_packetizer() -> Result<()> { #[test] fn test_packetizer_abs_send_time() -> Result<()> { - let g722 = Box::new(g7xx::G722Payloader {}); - let sequencer = Box::new(new_fixed_sequencer(1234)); + let g722 = g7xx::G722Payloader {}; + let sequencer = new_fixed_sequencer(1234); let time_gen = || { let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 - let t = loc.ymd(1985, 6, 23).and_hms_nano(4, 0, 0, 0); - UNIX_EPOCH - .checked_add(Duration::from_nanos(t.timestamp_nanos() as u64)) - .unwrap_or(UNIX_EPOCH) + let t = loc.ymd(1985, 6, 23).and_hms_nano(4, 0, 0, 0); + UNIX_EPOCH + .checked_add(Duration::from_nanos(t.timestamp_nanos() as u64)) + .unwrap_or(UNIX_EPOCH) }; //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. @@ -92,8 +92,8 @@ fn test_packetizer_abs_send_time() -> Result<()> { #[test] fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { - let g722 = Box::new(g7xx::G722Payloader {}); - let seq = Box::new(new_random_sequencer()); + let g722 = g7xx::G722Payloader {}; + let seq = new_random_sequencer(); let payload = Bytes::from_static(&[0; 128]); let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); diff --git a/src/sequence.rs b/src/sequence.rs index cfee20f..c314b66 100644 --- a/src/sequence.rs +++ b/src/sequence.rs @@ -8,13 +8,6 @@ use std::sync::{ pub trait Sequencer: fmt::Debug { fn next_sequence_number(&self) -> u16; fn roll_over_count(&self) -> u64; - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } } /// NewRandomSequencer returns a new sequencer starting from a random sequence @@ -52,8 +45,4 @@ impl Sequencer for SequencerImpl { fn roll_over_count(&self) -> u64 { self.count.load(Ordering::Acquire).overflowing_shr(16).0 } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } From 10777995df92a0f72ea6f5e57f1479c5c54b3f84 Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Thu, 30 Jun 2022 16:32:55 -0400 Subject: [PATCH 5/8] Improve sequencer and error size --- src/error.rs | 13 ++++-- src/packetizer/packetizer_test.rs | 6 +-- src/sequence.rs | 66 +++++++++++++++---------------- 3 files changed, 45 insertions(+), 40 deletions(-) diff --git a/src/error.rs b/src/error.rs index 26749e3..1c131c9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,11 +55,18 @@ pub enum Error { StapASizeLargerThanBuffer(usize, usize), #[error("nalu type {0} is currently not handled")] NaluTypeIsNotHandled(u8), - #[error("{0}")] - Util(#[from] util::Error), + // We box the util::Error and use a Box to keep the size of this error to 24 bytes + #[error("{0}")] + Util(Box), #[error("{0}")] - Other(String), + Other(Box), +} + +impl From for Error { + fn from(e: util::Error) -> Self { + Self::Util(Box::new(e)) + } } impl From for util::Error { diff --git a/src/packetizer/packetizer_test.rs b/src/packetizer/packetizer_test.rs index f8efe1b..4134b77 100644 --- a/src/packetizer/packetizer_test.rs +++ b/src/packetizer/packetizer_test.rs @@ -9,7 +9,7 @@ use std::time::{Duration, UNIX_EPOCH}; fn test_packetizer() -> Result<()> { let multiple_payload = Bytes::from_static(&[0; 128]); let g722 = g7xx::G722Payloader {}; - let seq = new_random_sequencer(); + let seq = WrappingSequencer::new_random(); //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); @@ -34,7 +34,7 @@ fn test_packetizer() -> Result<()> { #[test] fn test_packetizer_abs_send_time() -> Result<()> { let g722 = g7xx::G722Payloader {}; - let sequencer = new_fixed_sequencer(1234); + let sequencer = WrappingSequencer::new(1234); let time_gen = || { let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 @@ -93,7 +93,7 @@ fn test_packetizer_abs_send_time() -> Result<()> { #[test] fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { let g722 = g7xx::G722Payloader {}; - let seq = new_random_sequencer(); + let seq = WrappingSequencer::new_random(); let payload = Bytes::from_static(&[0; 128]); let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); diff --git a/src/sequence.rs b/src/sequence.rs index c314b66..9d286a6 100644 --- a/src/sequence.rs +++ b/src/sequence.rs @@ -1,48 +1,46 @@ -use std::fmt; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - /// Sequencer generates sequential sequence numbers for building RTP packets -pub trait Sequencer: fmt::Debug { - fn next_sequence_number(&self) -> u16; - fn roll_over_count(&self) -> u64; +pub trait Sequencer { + fn next_sequence_number(&mut self) -> u16; } -/// NewRandomSequencer returns a new sequencer starting from a random sequence -/// number -pub fn new_random_sequencer() -> impl Sequencer { - new_fixed_sequencer(rand::random::()) +#[derive(Debug, Clone)] +pub struct WrappingSequencer { + next_sequence_number: u16, + roll_over_count: u64, } -/// NewFixedSequencer returns a new sequencer starting from a specific -/// sequence number -pub fn new_fixed_sequencer(s: u16) -> impl Sequencer { - SequencerImpl { - count: Arc::new(AtomicU64::new(u64::from(s))), +impl WrappingSequencer { + /// Returns a new sequencer starting from a specific sequence number. + pub fn new(init_sequence_number: u16) -> Self { + Self { + next_sequence_number: init_sequence_number, + roll_over_count: 0 + } } -} -#[derive(Debug, Clone)] -struct SequencerImpl { - // The most significant 48 bits store the number of roll overs, and the lower 16 bits store the - // next sequence number. - // If we gave out one sequence number per nanosecond, then we'd need to give out sequence - // numbers for almost 600 years before we run out of roll overs. - count: Arc, + /// Returns a new sequencer starting from a random sequence number. + pub fn new_random() -> Self { + Self::new(rand::random::()) + } + + /// RollOverCount returns the amount of times the 16bit sequence number + /// has wrapped + pub fn roll_over_count(&self) -> u64 { + self.roll_over_count + } } -impl Sequencer for SequencerImpl { +impl Sequencer for WrappingSequencer { /// NextSequenceNumber increment and returns a new sequence number for /// building RTP packets - fn next_sequence_number(&self) -> u16 { - (self.count.fetch_add(1, Ordering::Release) & 0xFFFF) as u16 - } + fn next_sequence_number(&mut self) -> u16 { + let next_sequence_number = self.next_sequence_number; + self.next_sequence_number = self.next_sequence_number.wrapping_add(1); - /// RollOverCount returns the amount of times the 16bit sequence number - /// has wrapped - fn roll_over_count(&self) -> u64 { - self.count.load(Ordering::Acquire).overflowing_shr(16).0 + if self.next_sequence_number == 0 { + self.roll_over_count += 1; + } + + next_sequence_number } } From dac3f1660bc4f2eecf8af5c673164b3e0f0b8d8f Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Mon, 11 Jul 2022 21:00:50 -0400 Subject: [PATCH 6/8] Make packetizer type public --- src/packetizer/mod.rs | 10 +++++----- src/packetizer/packetizer_test.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index 9f565c8..ea7fcca 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -36,7 +36,7 @@ pub trait Depacketizer { } #[derive(Clone)] -pub(crate) struct PacketizerImpl { +pub struct RtpPacketizer { pub(crate) mtu: usize, pub(crate) payload_type: u8, pub(crate) ssrc: u32, @@ -48,7 +48,7 @@ pub(crate) struct PacketizerImpl { pub(crate) time_gen: fn() -> SystemTime, } -impl fmt::Debug for PacketizerImpl { +impl fmt::Debug for RtpPacketizer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PacketizerImpl") .field("mtu", &self.mtu) @@ -68,8 +68,8 @@ pub fn new_packetizer( payloader: P, sequencer: S, clock_rate: u32, -) -> impl Packetizer { - PacketizerImpl { +) -> RtpPacketizer { + RtpPacketizer { mtu, payload_type, ssrc, @@ -82,7 +82,7 @@ pub fn new_packetizer( } } -impl Packetizer for PacketizerImpl { +impl Packetizer for RtpPacketizer { fn enable_abs_send_time(&mut self, value: u8) { self.abs_send_time = value } diff --git a/src/packetizer/packetizer_test.rs b/src/packetizer/packetizer_test.rs index 4134b77..c8df8b7 100644 --- a/src/packetizer/packetizer_test.rs +++ b/src/packetizer/packetizer_test.rs @@ -45,7 +45,7 @@ fn test_packetizer_abs_send_time() -> Result<()> { }; //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. - let mut pktizer = PacketizerImpl { + let mut pktizer = RtpPacketizer { mtu: 100, payload_type: 98, ssrc: 0x1234ABCD, From 8b4116b7414b3edb68a59f9d38156d8d7a9b79a3 Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Thu, 14 Jul 2022 21:12:28 -0400 Subject: [PATCH 7/8] Resolve first round of feedback --- Cargo.toml | 8 ++++---- src/codecs/vp9/mod.rs | 15 +++++++++------ src/codecs/vp9/vp9_test.rs | 4 ++-- src/error.rs | 2 +- src/packetizer/mod.rs | 2 ++ 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 085cf81..6241e65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,10 @@ homepage = "https://webrtc.rs" repository = "https://github.com/webrtc-rs/rtp" [dependencies] -util = { package = "webrtc-util", version = "0.5.3", default-features = false, features = ["marshal"] } -bytes = "1.1.0" -rand = "0.8.4" -thiserror = "1.0.30" +util = { package = "webrtc-util", version = "0.5.4", default-features = false, features = ["marshal"] } +bytes = "1" +rand = "0.8.5" +thiserror = "1.0" [dev-dependencies] chrono = "0.4.19" diff --git a/src/codecs/vp9/mod.rs b/src/codecs/vp9/mod.rs index 5e68ce0..2d13d93 100644 --- a/src/codecs/vp9/mod.rs +++ b/src/codecs/vp9/mod.rs @@ -21,19 +21,22 @@ pub struct Vp9Payloader { impl Vp9Payloader { pub fn new() -> Self { - Self::new_with(|| rand::random::() & 0x7FFF) + Self::new_with_id(rand::random::() & 0x7FFF) } - pub fn new_with(init_picture_id: F) -> Self - where - F: FnOnce() -> u16, - { + pub fn new_with_id(init_picture_id: u16) -> Self { Self { - picture_id: init_picture_id(), + picture_id: init_picture_id, } } } +impl Default for Vp9Payloader { + fn default() -> Self { + Self::new() + } +} + impl Payloader for Vp9Payloader { /// Payload fragments an Vp9Payloader packet across one or more byte arrays fn payload(&mut self, mtu: usize, payload: &Bytes) -> Result> { diff --git a/src/codecs/vp9/vp9_test.rs b/src/codecs/vp9/vp9_test.rs index 0294f01..251fd3e 100644 --- a/src/codecs/vp9/vp9_test.rs +++ b/src/codecs/vp9/vp9_test.rs @@ -295,7 +295,7 @@ fn test_vp9_payloader_payload() -> Result<()> { ]; for (name, bs, mtu, expected) in tests { - let mut pck = Vp9Payloader::new_with(|| 8692); + let mut pck = Vp9Payloader::new_with_id(8692); let mut actual = vec![]; for b in &bs { @@ -306,7 +306,7 @@ fn test_vp9_payloader_payload() -> Result<()> { //"PictureIDOverflow" { - let mut pck = Vp9Payloader::new_with(|| 8692); + let mut pck = Vp9Payloader::new_with_id(8692); let mut p_prev = Vp9Packet::default(); for i in 0..0x8000 { diff --git a/src/error.rs b/src/error.rs index 1c131c9..0d25314 100644 --- a/src/error.rs +++ b/src/error.rs @@ -80,6 +80,6 @@ impl PartialEq for Error { other .downcast_ref::() .map(|down| self == down) - .unwrap_or_default() + .unwrap_or(false) } } diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index ea7fcca..4e10fd5 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -45,6 +45,8 @@ pub struct RtpPacketizer { pub(crate) timestamp: u32, pub(crate) clock_rate: u32, pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time + // TODO: SystemTime is not monotonic, but Instant doesn't work because it can't be converted + // to a unix timestamp. We should probably look for a way to have the best of both worlds. pub(crate) time_gen: fn() -> SystemTime, } From 66e13897abd136f638c23021646663ebc4e24a70 Mon Sep 17 00:00:00 2001 From: Cassy343 Date: Fri, 15 Jul 2022 14:44:15 -0400 Subject: [PATCH 8/8] Fix debug impl and cargo fmt --- src/packetizer/mod.rs | 2 +- src/sequence.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index 4e10fd5..a6fa085 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -52,7 +52,7 @@ pub struct RtpPacketizer { impl fmt::Debug for RtpPacketizer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PacketizerImpl") + f.debug_struct("RtpPacketizer") .field("mtu", &self.mtu) .field("payload_type", &self.payload_type) .field("ssrc", &self.ssrc) diff --git a/src/sequence.rs b/src/sequence.rs index 9d286a6..7e768fc 100644 --- a/src/sequence.rs +++ b/src/sequence.rs @@ -14,7 +14,7 @@ impl WrappingSequencer { pub fn new(init_sequence_number: u16) -> Self { Self { next_sequence_number: init_sequence_number, - roll_over_count: 0 + roll_over_count: 0, } } @@ -40,7 +40,7 @@ impl Sequencer for WrappingSequencer { if self.next_sequence_number == 0 { self.roll_over_count += 1; } - + next_sequence_number } }