diff --git a/tools/whipinto/src/main.rs b/tools/whipinto/src/main.rs index 42a52763..8f99e117 100644 --- a/tools/whipinto/src/main.rs +++ b/tools/whipinto/src/main.rs @@ -1,15 +1,15 @@ -use anyhow::{anyhow, Result}; -use clap::{ArgAction, Parser, ValueEnum}; -use cli::{create_child, Codec}; -use rtspclient::setup_rtsp_session; +use std::fs; use std::{sync::Arc, time::Duration, vec}; +use anyhow::{anyhow, Result}; +use clap::{ArgAction, Parser}; use scopeguard::defer; use tokio::{ net::{TcpListener, UdpSocket}, sync::mpsc::{unbounded_channel, UnboundedSender}, }; use tracing::{debug, info, trace, warn, Level}; +use url::Url; use webrtc::{ api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder}, ice_transport::{ice_credential_type::RTCIceCredentialType, ice_server::RTCIceServer}, @@ -27,38 +27,29 @@ use webrtc::{ util::Unmarshal, }; +use cli::{create_child, Codec}; use libwish::Client; +use rtspclient::setup_rtsp_session; + mod payload; mod rtspclient; #[cfg(test)] mod test; const PREFIX_LIB: &str = "WEBRTC"; -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -enum Mode { - Rtsp, - Rtp, - Pull, -} - #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { /// Verbose mode [default: "warn", -v "info", -vv "debug", -vvv "trace"] #[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)] verbose: u8, - #[arg(short, long, value_enum, default_value_t = Mode::Rtsp)] - mode: Mode, - #[arg(long, default_value_t = String::from("[::1]"))] - host: String, - #[arg(short, long, default_value_t = 0)] - port: u16, - #[arg(short, long, value_enum)] - codec: Codec, + /// rtsp://[username]:[password]@[ip]:[port]/[stream] Or + #[arg(short, long, default_value_t = String::from("rtsp-listen://0.0.0.0:8554"))] + input: String, /// The WHIP server endpoint to POST SDP offer to. e.g.: https://example.com/whip/777 #[arg(short, long)] - url: String, + whip: String, /// Run a command as childprocess #[arg(long)] command: Option, @@ -68,9 +59,6 @@ struct Args { /// Authentication token to use, will be sent in the HTTP Header as 'Bearer ' #[arg(long)] auth_token: Option, - ///pull stream url, e.g.: rtsp://[username]:[password]@[ip]:[port]/[codectype]/[channel]/[subtype]/av_stream - #[arg(long)] - target: Option, } #[tokio::main] @@ -87,19 +75,24 @@ async fn main() -> Result<()> { } )); - let host = args.host.clone(); - let mut codec = args.codec; - let mut rtp_port = args.port; + let input = Url::parse(&args.input) + .unwrap_or(Url::parse(&format!("local://0.0.0.0:0/{}", args.input)).unwrap()); + warn!("=== Received Input: {} ===", args.input); + + let host = input.host().unwrap().to_string().clone(); + let host2 = host.clone(); + let mut codec = Codec::Vp8; + let mut rtp_port = input.port().unwrap_or(0); let mut rtcp_send_port = 0; let (complete_tx, mut complete_rx) = unbounded_channel(); - if args.mode == Mode::Rtsp { + if input.scheme() == "rtsp-listen" { let (tx, mut rx) = unbounded_channel::(); let mut handler = rtsp::Handler::new(tx, complete_tx.clone()); tokio::spawn(async move { - let listener = TcpListener::bind(format!("{}:{}", host, rtp_port)) + let listener = TcpListener::bind(format!("{}:{}", host2.clone(), rtp_port)) .await .unwrap(); println!( @@ -155,13 +148,34 @@ async fn main() -> Result<()> { }; rtp_port = rtp_server_port; rtcp_send_port = rtcp_listen_port; - } else if args.mode == Mode::Pull { - (rtp_port, codec) = setup_rtsp_session(&args.target.unwrap()).await?; + } else if input.scheme() == "rtsp" { + (rtp_port, codec) = setup_rtsp_session(&args.input).await?; } else { - rtp_port = 8000; + let sdp = sdp_types::Session::parse(&fs::read(args.input).unwrap()).unwrap(); + let video_track = sdp.medias.iter().find(|md| md.media == "video"); + + let video_codec = video_track + .and_then(|md| { + md.attributes.iter().find_map(|attr| { + if attr.attribute == "rtpmap" { + let parts: Vec<&str> = attr.value.as_ref()?.split_whitespace().collect(); + if parts.len() > 1 { + Some(parts[1].split('/').next().unwrap_or("").to_string()) + } else { + None + } + } else { + None + } + }) + }) + .unwrap_or_else(|| "unknown".to_string()); + + codec = rtspclient::codec_from_str(&video_codec)?; + rtp_port = video_track.unwrap().port; } - let listener = UdpSocket::bind(format!("{}:{}", args.host, rtp_port)).await?; + let listener = UdpSocket::bind(format!("{}:{}", host.clone(), rtp_port)).await?; let port = listener.local_addr()?.port(); info!( "=== RTP listener started : {} ===", @@ -169,7 +183,7 @@ async fn main() -> Result<()> { ); let mut client = Client::new( - args.url, + args.whip, Client::get_auth_header_map(args.auth_basic, args.auth_token), ); let child = if let Some(command) = args.command { @@ -190,12 +204,12 @@ async fn main() -> Result<()> { .map_err(|error| anyhow!(format!("[{}] {}", PREFIX_LIB, error)))?; tokio::spawn(rtp_listener(listener, sender)); - if args.mode == Mode::Rtsp { + if input.scheme() == "rtsp-listen" { let rtcp_port = rtp_port + 1; - tokio::spawn(rtcp_listener(args.host.clone(), rtcp_port, peer.clone())); + tokio::spawn(rtcp_listener(host.clone(), rtcp_port, peer.clone())); let senders = peer.get_senders().await; for sender in senders { - tokio::spawn(read_rtcp(sender, args.host.clone(), rtcp_send_port)); + tokio::spawn(read_rtcp(sender, host.clone(), rtcp_send_port)); } } diff --git a/tools/whipinto/src/rtspclient.rs b/tools/whipinto/src/rtspclient.rs index 7be53129..cab0b67b 100644 --- a/tools/whipinto/src/rtspclient.rs +++ b/tools/whipinto/src/rtspclient.rs @@ -130,7 +130,7 @@ async fn keep_rtsp_alive(mut stream: TcpStream, mut cseq: u32) -> Result<()> { Ok(()) } -fn codec_from_str(s: &str) -> Result { +pub fn codec_from_str(s: &str) -> Result { match s { "VP8" => Ok(Codec::Vp8), "VP9" => Ok(Codec::Vp9),