Skip to content

Commit

Permalink
add whipinfo rtp pkt_size more than 1200
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed May 14, 2024
1 parent 00a38a9 commit 4b8695f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/whipinto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
clap = { version = "4.5.1", features = ["derive"] }
webrtc = { git = "https://github.com/webrtc-rs/webrtc", rev = "49140eabfe2bb18cb92ac595888be35ab3b6e5b9" }
bytes = "1.6.0"
anyhow = "1.0"
tokio = { version = "1.36", features = ["full"] }
cli = { path = "../../libs/cli" }
Expand Down
81 changes: 74 additions & 7 deletions tools/whipinto/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use tokio::{
net::UdpSocket,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use tracing::{info, trace, warn, Level};
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use tracing::{debug, error, info, trace, warn, Level};
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
ice_transport::ice_server::RTCIceServer,
Expand All @@ -27,6 +26,15 @@ use webrtc::{
},
util::Unmarshal,
};
use webrtc::{
ice_transport::ice_credential_type::RTCIceCredentialType,
rtp::packetizer::{Depacketizer, Payloader},
};

/// https://github.com/webrtc-rs/webrtc/blob/dcfefd7b48dc2bb9ecf50ea66c304f62719a6c4a/webrtc/src/track/mod.rs#L10C12-L10C49
/// https://github.com/binbat/live777/issues/1200
/// WebRTC Build-in RTP must less 1200
const RTP_OUTBOUND_MTU: usize = 1200;

const PREFIX_LIB: &str = "WEBRTC";

Expand Down Expand Up @@ -196,11 +204,12 @@ async fn new_peer(
RTCPeerConnectionState::Closed => {
let _ = complete_tx.send(());
}
_ => {}
v => debug!("{}", v),
};
});
Box::pin(async {})
}));
let mime_type = codec.mime_type.clone();
let track = Arc::new(TrackLocalStaticRTP::new(
codec,
"webrtc".to_owned(),
Expand All @@ -211,14 +220,72 @@ async fn new_peer(
.await
.map_err(|error| anyhow!(format!("{:?}: {}", error, error)))?;
let (send, mut recv) = unbounded_channel::<Vec<u8>>();

tokio::spawn(async move {
let mut sequence_number: u16 = 0;
let mut buf = Vec::new();

// In order to verify that the sequence number is correct
// If network have some error loss some packet, We need detect it
let mut recv_sequence_number: u16 = 0;

debug!("Codec is: {}", mime_type);
let mut decoder: Box<dyn Depacketizer + Send> = match mime_type.as_str() {
MIME_TYPE_VP8 => Box::default() as Box<rtp::codecs::vp8::Vp8Packet>,
MIME_TYPE_VP9 => Box::default() as Box<rtp::codecs::vp9::Vp9Packet>,
_ => Box::default() as Box<rtp::codecs::vp8::Vp8Packet>,
};
let mut encoder: Box<dyn Payloader + Send> = match mime_type.as_str() {
MIME_TYPE_VP8 => Box::default() as Box<rtp::codecs::vp8::Vp8Payloader>,
MIME_TYPE_VP9 => Box::default() as Box<rtp::codecs::vp9::Vp9Payloader>,
_ => Box::default() as Box<rtp::codecs::vp8::Vp8Payloader>,
};

while let Some(data) = recv.recv().await {
if let Ok(mut packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) {
if let Ok(packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) {
trace!("received packet: {}", packet);
packet.header.sequence_number = sequence_number;
let _ = track.write_rtp(&packet).await;
sequence_number = sequence_number.wrapping_add(1);

// verify the sequence number is linear
if recv_sequence_number + 1 != packet.header.sequence_number
&& recv_sequence_number != 0
{
error!(
"Should received sequence: {}. But received sequence: {}",
recv_sequence_number + 1,
packet.header.sequence_number
);
}
recv_sequence_number = packet.header.sequence_number;

match decoder.depacketize(&packet.payload) {
Ok(data) => buf.push(data),
Err(e) => error!("{}", e),
};

if packet.header.marker {
match encoder.payload(RTP_OUTBOUND_MTU, &bytes::Bytes::from(buf.concat())) {
Ok(payloads) => {
let length = payloads.len();
for (i, payload) in payloads.into_iter().enumerate() {
let mut header = packet.clone().header;
header.sequence_number = sequence_number;
header.marker = matches!(i, x if x == length - 1);

let p = rtp::packet::Packet { header, payload };

trace!("send packet: {}", p);
let _ = track.write_rtp(&p).await;
sequence_number = sequence_number.wrapping_add(1);
if sequence_number == u16::MAX {
sequence_number = 0;
}
}
}
Err(e) => error!("{}", e),
};

buf.clear();
}
}
}
});
Expand Down

0 comments on commit 4b8695f

Please sign in to comment.