Skip to content

Commit

Permalink
breeak change(whipinto): change cmd cli flags
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed Jul 17, 2024
1 parent 1c5c418 commit 7886105
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
86 changes: 50 additions & 36 deletions tools/whipinto/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use anyhow::{anyhow, Result};
use clap::{ArgAction, Parser, ValueEnum};
use cli::{create_child, Codec};
use rtspclient::setup_rtsp_session;
use std::fs;
use std::{sync::Arc, time::Duration, vec};

use anyhow::{anyhow, Result};
use clap::{ArgAction, Parser};
use scopeguard::defer;
use tokio::{
net::{TcpListener, UdpSocket},
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use tracing::{debug, info, trace, warn, Level};
use url::Url;
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
ice_transport::{ice_credential_type::RTCIceCredentialType, ice_server::RTCIceServer},
Expand All @@ -27,38 +27,29 @@ use webrtc::{
util::Unmarshal,
};

use cli::{create_child, Codec};
use libwish::Client;

use rtspclient::setup_rtsp_session;

mod payload;
mod rtspclient;
#[cfg(test)]
mod test;
const PREFIX_LIB: &str = "WEBRTC";

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
enum Mode {
Rtsp,
Rtp,
Pull,
}

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Verbose mode [default: "warn", -v "info", -vv "debug", -vvv "trace"]
#[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)]
verbose: u8,
#[arg(short, long, value_enum, default_value_t = Mode::Rtsp)]
mode: Mode,
#[arg(long, default_value_t = String::from("[::1]"))]
host: String,
#[arg(short, long, default_value_t = 0)]
port: u16,
#[arg(short, long, value_enum)]
codec: Codec,
/// rtsp://[username]:[password]@[ip]:[port]/[stream] Or <stream.sdp>
#[arg(short, long, default_value_t = String::from("rtsp-listen://0.0.0.0:8554"))]
input: String,
/// The WHIP server endpoint to POST SDP offer to. e.g.: https://example.com/whip/777
#[arg(short, long)]
url: String,
whip: String,
/// Run a command as childprocess
#[arg(long)]
command: Option<String>,
Expand All @@ -68,9 +59,6 @@ struct Args {
/// Authentication token to use, will be sent in the HTTP Header as 'Bearer '
#[arg(long)]
auth_token: Option<String>,
///pull stream url, e.g.: rtsp://[username]:[password]@[ip]:[port]/[codectype]/[channel]/[subtype]/av_stream
#[arg(long)]
target: Option<String>,
}

#[tokio::main]
Expand All @@ -87,19 +75,24 @@ async fn main() -> Result<()> {
}
));

let host = args.host.clone();
let mut codec = args.codec;
let mut rtp_port = args.port;
let input = Url::parse(&args.input)
.unwrap_or(Url::parse(&format!("local://0.0.0.0:0/{}", args.input)).unwrap());
warn!("=== Received Input: {} ===", args.input);

let host = input.host().unwrap().to_string().clone();
let host2 = host.clone();
let mut codec = Codec::Vp8;
let mut rtp_port = input.port().unwrap_or(0);
let mut rtcp_send_port = 0;

let (complete_tx, mut complete_rx) = unbounded_channel();

if args.mode == Mode::Rtsp {
if input.scheme() == "rtsp-listen" {
let (tx, mut rx) = unbounded_channel::<String>();
let mut handler = rtsp::Handler::new(tx, complete_tx.clone());

tokio::spawn(async move {
let listener = TcpListener::bind(format!("{}:{}", host, rtp_port))
let listener = TcpListener::bind(format!("{}:{}", host2.clone(), rtp_port))
.await
.unwrap();
println!(
Expand Down Expand Up @@ -155,21 +148,42 @@ async fn main() -> Result<()> {
};
rtp_port = rtp_server_port;
rtcp_send_port = rtcp_listen_port;
} else if args.mode == Mode::Pull {
(rtp_port, codec) = setup_rtsp_session(&args.target.unwrap()).await?;
} else if input.scheme() == "rtsp" {
(rtp_port, codec) = setup_rtsp_session(&args.input).await?;
} else {
rtp_port = 8000;
let sdp = sdp_types::Session::parse(&fs::read(args.input).unwrap()).unwrap();
let video_track = sdp.medias.iter().find(|md| md.media == "video");

let video_codec = video_track
.and_then(|md| {
md.attributes.iter().find_map(|attr| {
if attr.attribute == "rtpmap" {
let parts: Vec<&str> = attr.value.as_ref()?.split_whitespace().collect();
if parts.len() > 1 {
Some(parts[1].split('/').next().unwrap_or("").to_string())
} else {
None
}
} else {
None
}
})
})
.unwrap_or_else(|| "unknown".to_string());

codec = rtspclient::codec_from_str(&video_codec)?;
rtp_port = video_track.unwrap().port;
}

let listener = UdpSocket::bind(format!("{}:{}", args.host, rtp_port)).await?;
let listener = UdpSocket::bind(format!("{}:{}", host.clone(), rtp_port)).await?;
let port = listener.local_addr()?.port();
info!(
"=== RTP listener started : {} ===",
listener.local_addr().unwrap()
);

let mut client = Client::new(
args.url,
args.whip,
Client::get_auth_header_map(args.auth_basic, args.auth_token),
);
let child = if let Some(command) = args.command {
Expand All @@ -190,12 +204,12 @@ async fn main() -> Result<()> {
.map_err(|error| anyhow!(format!("[{}] {}", PREFIX_LIB, error)))?;

tokio::spawn(rtp_listener(listener, sender));
if args.mode == Mode::Rtsp {
if input.scheme() == "rtsp-listen" {
let rtcp_port = rtp_port + 1;
tokio::spawn(rtcp_listener(args.host.clone(), rtcp_port, peer.clone()));
tokio::spawn(rtcp_listener(host.clone(), rtcp_port, peer.clone()));
let senders = peer.get_senders().await;
for sender in senders {
tokio::spawn(read_rtcp(sender, args.host.clone(), rtcp_send_port));
tokio::spawn(read_rtcp(sender, host.clone(), rtcp_send_port));
}
}

Expand Down
2 changes: 1 addition & 1 deletion tools/whipinto/src/rtspclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn keep_rtsp_alive(mut stream: TcpStream, mut cseq: u32) -> Result<()> {
Ok(())
}

fn codec_from_str(s: &str) -> Result<Codec> {
pub fn codec_from_str(s: &str) -> Result<Codec> {
match s {
"VP8" => Ok(Codec::Vp8),
"VP9" => Ok(Codec::Vp9),
Expand Down

0 comments on commit 7886105

Please sign in to comment.