diff --git a/Cargo.toml b/Cargo.toml index 3e070c6..6241e65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,13 +14,10 @@ util = { package = "webrtc-util", version = "0.5.4", default-features = false, f bytes = "1" rand = "0.8.5" thiserror = "1.0" -async-trait = "0.1.56" [dev-dependencies] chrono = "0.4.19" criterion = "0.3.5" -tokio = { version = "1.19", features = ["full"] } -tokio-test = "0.4.0" # must match the min version of the `tokio` crate above [[bench]] name = "packet_bench" diff --git a/src/codecs/g7xx/mod.rs b/src/codecs/g7xx/mod.rs index d8ddff1..3881253 100644 --- a/src/codecs/g7xx/mod.rs +++ b/src/codecs/g7xx/mod.rs @@ -36,8 +36,4 @@ impl Payloader for G7xxPayloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } diff --git a/src/codecs/h264/mod.rs b/src/codecs/h264/mod.rs index 7c4cca0..06482bd 100644 --- a/src/codecs/h264/mod.rs +++ b/src/codecs/h264/mod.rs @@ -192,10 +192,6 @@ impl Payloader for H264Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } /// H264Packet represents the H264 header that is stored in the payload of an RTP Packet diff --git a/src/codecs/opus/mod.rs b/src/codecs/opus/mod.rs index 731f470..12bb0af 100644 --- a/src/codecs/opus/mod.rs +++ b/src/codecs/opus/mod.rs @@ -19,10 +19,6 @@ impl Payloader for OpusPayloader { Ok(vec![payload.clone()]) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } /// OpusPacket represents the Opus header that is stored in the payload of an RTP Packet diff --git a/src/codecs/vp8/mod.rs b/src/codecs/vp8/mod.rs index f7b003e..f43a1c6 100644 --- a/src/codecs/vp8/mod.rs +++ b/src/codecs/vp8/mod.rs @@ -104,10 +104,6 @@ impl Payloader for Vp8Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(*self) - } } /// Vp8Packet represents the VP8 header that is stored in the payload of an RTP Packet diff --git a/src/codecs/vp9/mod.rs b/src/codecs/vp9/mod.rs index 3132e3d..2d13d93 100644 --- a/src/codecs/vp9/mod.rs +++ b/src/codecs/vp9/mod.rs @@ -7,32 +7,33 @@ use crate::{ }; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use std::fmt; -use std::sync::Arc; /// Flexible mode 15 bit picture ID const VP9HEADER_SIZE: usize = 3; const MAX_SPATIAL_LAYERS: u8 = 5; const MAX_VP9REF_PICS: usize = 3; -/// InitialPictureIDFn is a function that returns random initial picture ID. -pub type InitialPictureIDFn = Arc u16) + Send + Sync>; - /// Vp9Payloader payloads VP9 packets -#[derive(Default, Clone)] +#[derive(Clone, Debug)] pub struct Vp9Payloader { picture_id: u16, - initialized: bool, +} + +impl Vp9Payloader { + pub fn new() -> Self { + Self::new_with_id(rand::random::() & 0x7FFF) + } - pub initial_picture_id_fn: Option, + pub fn new_with_id(init_picture_id: u16) -> Self { + Self { + picture_id: init_picture_id, + } + } } -impl fmt::Debug for Vp9Payloader { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Vp9Payloader") - .field("picture_id", &self.picture_id) - .field("initialized", &self.initialized) - .finish() +impl Default for Vp9Payloader { + fn default() -> Self { + Self::new() } } @@ -81,19 +82,6 @@ impl Payloader for Vp9Payloader { return Ok(vec![]); } - if !self.initialized { - if self.initial_picture_id_fn.is_none() { - self.initial_picture_id_fn = - Some(Arc::new(|| -> u16 { rand::random::() & 0x7FFF })); - } - self.picture_id = if let Some(f) = &self.initial_picture_id_fn { - f() - } else { - 0 - }; - self.initialized = true; - } - let max_fragment_size = mtu as isize - VP9HEADER_SIZE as isize; let mut payloads = vec![]; let mut payload_data_remaining = payload.len(); @@ -135,10 +123,6 @@ impl Payloader for Vp9Payloader { Ok(payloads) } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } /// Vp9Packet represents the VP9 header that is stored in the payload of an RTP Packet @@ -263,9 +247,9 @@ impl Vp9Packet { // M: | EXTENDED PID | // +-+-+-+-+-+-+-+-+ // - fn parse_picture_id( + fn parse_picture_id( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -288,9 +272,9 @@ impl Vp9Packet { Ok(payload_index) } - fn parse_layer_info( + fn parse_layer_info( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { payload_index = self.parse_layer_info_common(reader, payload_index)?; @@ -308,9 +292,9 @@ impl Vp9Packet { // L: | T |U| S |D| // +-+-+-+-+-+-+-+-+ // - fn parse_layer_info_common( + fn parse_layer_info_common( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -339,9 +323,9 @@ impl Vp9Packet { // | tl0picidx | // +-+-+-+-+-+-+-+-+ // - fn parse_layer_info_non_flexible_mode( + fn parse_layer_info_non_flexible_mode( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { if reader.remaining() == 0 { @@ -359,9 +343,9 @@ impl Vp9Packet { // +-+-+-+-+-+-+-+-+ N=1: An additional P_DIFF follows // current P_DIFF. // - fn parse_ref_indices( + fn parse_ref_indices( &mut self, - reader: &mut dyn Buf, + reader: &mut B, mut payload_index: usize, ) -> Result { let mut b = 1u8; @@ -401,7 +385,7 @@ impl Vp9Packet { // | P_DIFF | (OPTIONAL) . R times . // +-+-+-+-+-+-+-+-+ -| -| // - fn parse_ssdata(&mut self, reader: &mut dyn Buf, mut payload_index: usize) -> Result { + fn parse_ssdata(&mut self, reader: &mut B, mut payload_index: usize) -> Result { if reader.remaining() == 0 { return Err(Error::ErrShortPacket); } diff --git a/src/codecs/vp9/vp9_test.rs b/src/codecs/vp9/vp9_test.rs index 49a1481..251fd3e 100644 --- a/src/codecs/vp9/vp9_test.rs +++ b/src/codecs/vp9/vp9_test.rs @@ -295,10 +295,7 @@ fn test_vp9_payloader_payload() -> Result<()> { ]; for (name, bs, mtu, expected) in tests { - let mut pck = Vp9Payloader { - initial_picture_id_fn: Some(Arc::new(|| -> u16 { 8692 })), - ..Default::default() - }; + let mut pck = Vp9Payloader::new_with_id(8692); let mut actual = vec![]; for b in &bs { @@ -309,10 +306,8 @@ fn test_vp9_payloader_payload() -> Result<()> { //"PictureIDOverflow" { - let mut pck = Vp9Payloader { - initial_picture_id_fn: Some(Arc::new(|| -> u16 { 8692 })), - ..Default::default() - }; + let mut pck = Vp9Payloader::new_with_id(8692); + let mut p_prev = Vp9Packet::default(); for i in 0..0x8000 { let res = pck.payload(4, &Bytes::from_static(&[0x01]))?; diff --git a/src/error.rs b/src/error.rs index 8e5895c..0d25314 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,11 +55,18 @@ pub enum Error { StapASizeLargerThanBuffer(usize, usize), #[error("nalu type {0} is currently not handled")] NaluTypeIsNotHandled(u8), - #[error("{0}")] - Util(#[from] util::Error), + // We box the util::Error and use a Box to keep the size of this error to 24 bytes + #[error("{0}")] + Util(Box), #[error("{0}")] - Other(String), + Other(Box), +} + +impl From for Error { + fn from(e: util::Error) -> Self { + Self::Util(Box::new(e)) + } } impl From for util::Error { @@ -70,10 +77,9 @@ impl From for util::Error { impl PartialEq for Error { fn eq(&self, other: &util::Error) -> bool { - if let Some(down) = other.downcast_ref::() { - self == down - } else { - false - } + other + .downcast_ref::() + .map(|down| self == down) + .unwrap_or(false) } } diff --git a/src/lib.rs b/src/lib.rs index 18066e7..ba34d2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ -#![warn(rust_2018_idioms)] -#![allow(dead_code)] +#![deny(rust_2018_idioms)] pub mod codecs; mod error; diff --git a/src/packet/packet_test.rs b/src/packet/packet_test.rs index 2ec053b..352385d 100644 --- a/src/packet/packet_test.rs +++ b/src/packet/packet_test.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use super::*; use crate::error::Result; use bytes::{Bytes, BytesMut}; diff --git a/src/packetizer/mod.rs b/src/packetizer/mod.rs index 82e52cb..a6fa085 100644 --- a/src/packetizer/mod.rs +++ b/src/packetizer/mod.rs @@ -5,39 +5,20 @@ use crate::error::Result; use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*}; use util::marshal::{Marshal, MarshalSize}; -use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; use std::time::SystemTime; /// Payloader payloads a byte array for use as rtp.Packet payloads pub trait Payloader: fmt::Debug { fn payload(&mut self, mtu: usize, b: &Bytes) -> Result>; - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } } /// Packetizer packetizes a payload -#[async_trait] pub trait Packetizer: fmt::Debug { fn enable_abs_send_time(&mut self, value: u8); - async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result>; + fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result>; fn skip_samples(&mut self, skipped_samples: u32); - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } } /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload @@ -54,28 +35,24 @@ pub trait Depacketizer { fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool; } -//TODO: SystemTime vs Instant? -// non-monotonic clock vs monotonically non-decreasing clock -/// FnTimeGen provides current SystemTime -pub type FnTimeGen = - Arc Pin + Send + 'static>>) + Send + Sync>; - #[derive(Clone)] -pub(crate) struct PacketizerImpl { +pub struct RtpPacketizer { pub(crate) mtu: usize, pub(crate) payload_type: u8, pub(crate) ssrc: u32, - pub(crate) payloader: Box, - pub(crate) sequencer: Box, + pub(crate) payloader: P, + pub(crate) sequencer: S, pub(crate) timestamp: u32, pub(crate) clock_rate: u32, pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time - pub(crate) time_gen: Option, + // TODO: SystemTime is not monotonic, but Instant doesn't work because it can't be converted + // to a unix timestamp. We should probably look for a way to have the best of both worlds. + pub(crate) time_gen: fn() -> SystemTime, } -impl fmt::Debug for PacketizerImpl { +impl fmt::Debug for RtpPacketizer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PacketizerImpl") + f.debug_struct("RtpPacketizer") .field("mtu", &self.mtu) .field("payload_type", &self.payload_type) .field("ssrc", &self.ssrc) @@ -86,15 +63,15 @@ impl fmt::Debug for PacketizerImpl { } } -pub fn new_packetizer( +pub fn new_packetizer( mtu: usize, payload_type: u8, ssrc: u32, - payloader: Box, - sequencer: Box, + payloader: P, + sequencer: S, clock_rate: u32, -) -> impl Packetizer { - PacketizerImpl { +) -> RtpPacketizer { + RtpPacketizer { mtu, payload_type, ssrc, @@ -103,17 +80,16 @@ pub fn new_packetizer( timestamp: rand::random::(), clock_rate, abs_send_time: 0, - time_gen: None, + time_gen: SystemTime::now, } } -#[async_trait] -impl Packetizer for PacketizerImpl { +impl Packetizer for RtpPacketizer { fn enable_abs_send_time(&mut self, value: u8) { self.abs_send_time = value } - async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result> { + fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result> { let payloads = self.payloader.payload(self.mtu - 12, payload)?; let payloads_len = payloads.len(); let mut packets = Vec::with_capacity(payloads_len); @@ -137,12 +113,7 @@ impl Packetizer for PacketizerImpl { self.timestamp = self.timestamp.wrapping_add(samples); if payloads_len != 0 && self.abs_send_time != 0 { - let st = if let Some(fn_time_gen) = &self.time_gen { - fn_time_gen().await - } else { - SystemTime::now() - }; - let send_time = AbsSendTimeExtension::new(st); + let send_time = AbsSendTimeExtension::new((self.time_gen)()); //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time let mut raw = BytesMut::with_capacity(send_time.marshal_size()); raw.resize(send_time.marshal_size(), 0); @@ -160,8 +131,4 @@ impl Packetizer for PacketizerImpl { fn skip_samples(&mut self, skipped_samples: u32) { self.timestamp = self.timestamp.wrapping_add(skipped_samples); } - - fn clone_to(&self) -> Box { - Box::new(self.clone()) - } } diff --git a/src/packetizer/packetizer_test.rs b/src/packetizer/packetizer_test.rs index b9e6f21..c8df8b7 100644 --- a/src/packetizer/packetizer_test.rs +++ b/src/packetizer/packetizer_test.rs @@ -5,15 +5,15 @@ use crate::error::Result; use chrono::prelude::*; use std::time::{Duration, UNIX_EPOCH}; -#[tokio::test] -async fn test_packetizer() -> Result<()> { +#[test] +fn test_packetizer() -> Result<()> { let multiple_payload = Bytes::from_static(&[0; 128]); - let g722 = Box::new(g7xx::G722Payloader {}); - let seq = Box::new(new_random_sequencer()); + let g722 = g7xx::G722Payloader {}; + let seq = WrappingSequencer::new_random(); //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); - let packets = packetizer.packetize(&multiple_payload, 2000).await?; + let packets = packetizer.packetize(&multiple_payload, 2000)?; if packets.len() != 2 { let mut packet_lengths = String::new(); @@ -31,25 +31,21 @@ async fn test_packetizer() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_packetizer_abs_send_time() -> Result<()> { - let g722 = Box::new(g7xx::G722Payloader {}); - let sequencer = Box::new(new_fixed_sequencer(1234)); - - let time_gen: Option = Some(Arc::new( - || -> Pin + Send + 'static>> { - Box::pin(async move { - let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 - let t = loc.ymd(1985, 6, 23).and_hms_nano(4, 0, 0, 0); - UNIX_EPOCH - .checked_add(Duration::from_nanos(t.timestamp_nanos() as u64)) - .unwrap_or(UNIX_EPOCH) - }) - }, - )); +#[test] +fn test_packetizer_abs_send_time() -> Result<()> { + let g722 = g7xx::G722Payloader {}; + let sequencer = WrappingSequencer::new(1234); + + let time_gen = || { + let loc = FixedOffset::west(5 * 60 * 60); // UTC-5 + let t = loc.ymd(1985, 6, 23).and_hms_nano(4, 0, 0, 0); + UNIX_EPOCH + .checked_add(Duration::from_nanos(t.timestamp_nanos() as u64)) + .unwrap_or(UNIX_EPOCH) + }; //use the G722 payloader here, because it's very simple and all 0s is valid G722 data. - let mut pktizer = PacketizerImpl { + let mut pktizer = RtpPacketizer { mtu: 100, payload_type: 98, ssrc: 0x1234ABCD, @@ -63,7 +59,7 @@ async fn test_packetizer_abs_send_time() -> Result<()> { pktizer.enable_abs_send_time(1); let payload = Bytes::from_static(&[0x11, 0x12, 0x13, 0x14]); - let packets = pktizer.packetize(&payload, 2000).await?; + let packets = pktizer.packetize(&payload, 2000)?; let expected = Packet { header: Header { @@ -94,17 +90,17 @@ async fn test_packetizer_abs_send_time() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { - let g722 = Box::new(g7xx::G722Payloader {}); - let seq = Box::new(new_random_sequencer()); +#[test] +fn test_packetizer_timestamp_rollover_does_not_panic() -> Result<()> { + let g722 = g7xx::G722Payloader {}; + let seq = WrappingSequencer::new_random(); let payload = Bytes::from_static(&[0; 128]); let mut packetizer = new_packetizer(100, 98, 0x1234ABCD, g722, seq, 90000); - packetizer.packetize(&payload, 10).await?; + packetizer.packetize(&payload, 10)?; - packetizer.packetize(&payload, u32::MAX).await?; + packetizer.packetize(&payload, u32::MAX)?; packetizer.skip_samples(u32::MAX); diff --git a/src/sequence.rs b/src/sequence.rs index 4392fcd..7e768fc 100644 --- a/src/sequence.rs +++ b/src/sequence.rs @@ -1,74 +1,46 @@ -use std::fmt; -use std::sync::{Arc, Mutex}; - /// Sequencer generates sequential sequence numbers for building RTP packets -pub trait Sequencer: fmt::Debug { - fn next_sequence_number(&self) -> u16; - fn roll_over_count(&self) -> u64; - fn clone_to(&self) -> Box; -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_to() - } -} - -/// NewRandomSequencer returns a new sequencer starting from a random sequence -/// number -pub fn new_random_sequencer() -> impl Sequencer { - let c = Counters { - sequence_number: rand::random::(), - roll_over_count: 0, - }; - SequencerImpl(Arc::new(Mutex::new(c))) -} - -/// NewFixedSequencer returns a new sequencer starting from a specific -/// sequence number -pub fn new_fixed_sequencer(s: u16) -> impl Sequencer { - let sequence_number = if s == 0 { u16::MAX } else { s - 1 }; - - let c = Counters { - sequence_number, - roll_over_count: 0, - }; - - SequencerImpl(Arc::new(Mutex::new(c))) +pub trait Sequencer { + fn next_sequence_number(&mut self) -> u16; } #[derive(Debug, Clone)] -struct SequencerImpl(Arc>); - -#[derive(Debug)] -struct Counters { - sequence_number: u16, +pub struct WrappingSequencer { + next_sequence_number: u16, roll_over_count: u64, } -impl Sequencer for SequencerImpl { - /// NextSequenceNumber increment and returns a new sequence number for - /// building RTP packets - fn next_sequence_number(&self) -> u16 { - let mut lock = self.0.lock().unwrap(); - - if lock.sequence_number == u16::MAX { - lock.roll_over_count += 1; - lock.sequence_number = 0; - } else { - lock.sequence_number += 1; +impl WrappingSequencer { + /// Returns a new sequencer starting from a specific sequence number. + pub fn new(init_sequence_number: u16) -> Self { + Self { + next_sequence_number: init_sequence_number, + roll_over_count: 0, } + } - lock.sequence_number + /// Returns a new sequencer starting from a random sequence number. + pub fn new_random() -> Self { + Self::new(rand::random::()) } /// RollOverCount returns the amount of times the 16bit sequence number /// has wrapped - fn roll_over_count(&self) -> u64 { - self.0.lock().unwrap().roll_over_count + pub fn roll_over_count(&self) -> u64 { + self.roll_over_count } +} + +impl Sequencer for WrappingSequencer { + /// NextSequenceNumber increment and returns a new sequence number for + /// building RTP packets + fn next_sequence_number(&mut self) -> u16 { + let next_sequence_number = self.next_sequence_number; + self.next_sequence_number = self.next_sequence_number.wrapping_add(1); + + if self.next_sequence_number == 0 { + self.roll_over_count += 1; + } - fn clone_to(&self) -> Box { - Box::new(self.clone()) + next_sequence_number } }