diff --git a/Cargo.lock b/Cargo.lock index d71ab54c..9e446c36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3899,6 +3899,7 @@ name = "whipinto" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "clap", "cli", "libwish", diff --git a/tools/whipinto/Cargo.toml b/tools/whipinto/Cargo.toml index 34ce3ad4..814ff9e7 100644 --- a/tools/whipinto/Cargo.toml +++ b/tools/whipinto/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] clap = { version = "4.5.1", features = ["derive"] } webrtc = { git = "https://github.com/webrtc-rs/webrtc", rev = "49140eabfe2bb18cb92ac595888be35ab3b6e5b9" } +bytes = "1.6.0" anyhow = "1.0" tokio = { version = "1.36", features = ["full"] } cli = { path = "../../libs/cli" } diff --git a/tools/whipinto/src/main.rs b/tools/whipinto/src/main.rs index 88aca8b0..67fd8521 100644 --- a/tools/whipinto/src/main.rs +++ b/tools/whipinto/src/main.rs @@ -10,8 +10,7 @@ use tokio::{ net::UdpSocket, sync::mpsc::{unbounded_channel, UnboundedSender}, }; -use tracing::{info, trace, warn, Level}; -use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType; +use tracing::{debug, error, info, trace, warn, Level}; use webrtc::{ api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder}, ice_transport::ice_server::RTCIceServer, @@ -27,6 +26,15 @@ use webrtc::{ }, util::Unmarshal, }; +use webrtc::{ + ice_transport::ice_credential_type::RTCIceCredentialType, + rtp::packetizer::{Depacketizer, Payloader}, +}; + +/// https://github.com/webrtc-rs/webrtc/blob/dcfefd7b48dc2bb9ecf50ea66c304f62719a6c4a/webrtc/src/track/mod.rs#L10C12-L10C49 +/// https://github.com/binbat/live777/issues/1200 +/// WebRTC Build-in RTP must less 1200 +const RTP_OUTBOUND_MTU: usize = 1200; const PREFIX_LIB: &str = "WEBRTC"; @@ -196,11 +204,12 @@ async fn new_peer( RTCPeerConnectionState::Closed => { let _ = complete_tx.send(()); } - _ => {} + v => debug!("{}", v), }; }); Box::pin(async {}) })); + let mime_type = codec.mime_type.clone(); let track = Arc::new(TrackLocalStaticRTP::new( codec, "webrtc".to_owned(), @@ -211,14 +220,72 @@ async fn new_peer( .await .map_err(|error| anyhow!(format!("{:?}: {}", error, error)))?; let (send, mut recv) = unbounded_channel::>(); + tokio::spawn(async move { let mut sequence_number: u16 = 0; + let mut buf = Vec::new(); + + // In order to verify that the sequence number is correct + // If network have some error loss some packet, We need detect it + let mut recv_sequence_number: u16 = 0; + + debug!("Codec is: {}", mime_type); + let mut decoder: Box = match mime_type.as_str() { + MIME_TYPE_VP8 => Box::default() as Box, + MIME_TYPE_VP9 => Box::default() as Box, + _ => Box::default() as Box, + }; + let mut encoder: Box = match mime_type.as_str() { + MIME_TYPE_VP8 => Box::default() as Box, + MIME_TYPE_VP9 => Box::default() as Box, + _ => Box::default() as Box, + }; + while let Some(data) = recv.recv().await { - if let Ok(mut packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) { + if let Ok(packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) { trace!("received packet: {}", packet); - packet.header.sequence_number = sequence_number; - let _ = track.write_rtp(&packet).await; - sequence_number = sequence_number.wrapping_add(1); + + // verify the sequence number is linear + if recv_sequence_number + 1 != packet.header.sequence_number + && recv_sequence_number != 0 + { + error!( + "Should received sequence: {}. But received sequence: {}", + recv_sequence_number + 1, + packet.header.sequence_number + ); + } + recv_sequence_number = packet.header.sequence_number; + + match decoder.depacketize(&packet.payload) { + Ok(data) => buf.push(data), + Err(e) => error!("{}", e), + }; + + if packet.header.marker { + match encoder.payload(RTP_OUTBOUND_MTU, &bytes::Bytes::from(buf.concat())) { + Ok(payloads) => { + let length = payloads.len(); + for (i, payload) in payloads.into_iter().enumerate() { + let mut header = packet.clone().header; + header.sequence_number = sequence_number; + header.marker = matches!(i, x if x == length - 1); + + let p = rtp::packet::Packet { header, payload }; + + trace!("send packet: {}", p); + let _ = track.write_rtp(&p).await; + sequence_number = sequence_number.wrapping_add(1); + if sequence_number == u16::MAX { + sequence_number = 0; + } + } + } + Err(e) => error!("{}", e), + }; + + buf.clear(); + } } } });