Skip to content

Commit

Permalink
add whipinto and whepfrom verbose mode (-v, -vv, -vvv) #127
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed May 11, 2024
1 parent 4e2d01a commit 00a38a9
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 46 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ local-ip-address = "0.6"
libwish = { path = "libs/libwish" }
signal = { path = "libs/signal" }
live777-http = { path = "libs/live777-http" }
utils = { path = "libs/utils" }
reqwest = { version = "0.11.24", features = [
"rustls-tls",
], default-features = false }
Expand Down
1 change: 1 addition & 0 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tower-http = { version = "0.5.2", features = ["fs", "auth", "trace", "cors"] }
anyhow = { version = "1", features = ["backtrace"] }
signal = { path = "../libs/signal" }
live777-http = { path = "../libs/live777-http" }
utils = { path = "../libs/utils" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.5.1", features = ["derive"] }
Expand Down
19 changes: 1 addition & 18 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use live777_http::event::Event;
use model::{Node, Stream};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::MySqlPool;
use std::env;
use std::future::IntoFuture;
use std::str::FromStr;
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[cfg(debug_assertions)]
use tower_http::services::{ServeDir, ServeFile};
Expand Down Expand Up @@ -64,7 +62,7 @@ async fn main() {
sqlx::any::install_default_drivers();
let args = Args::parse();
let cfg = Config::parse(args.config);
set_log(format!(
utils::set_log(format!(
"live777_gateway={},live777_storage={},sqlx={},webrtc=error",
cfg.log.level, cfg.log.level, cfg.log.level
));
Expand Down Expand Up @@ -127,21 +125,6 @@ async fn main() {
info!("Server shutdown");
}

fn set_log(env_filter: String) {
let _ = env::var("RUST_LOG").is_err_and(|_| {
env::set_var("RUST_LOG", env_filter);
true
});
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_target(true)
.init();
}

#[cfg(not(debug_assertions))]
#[derive(RustEmbed)]
#[folder = "../assets/"]
Expand Down
11 changes: 11 additions & 0 deletions libs/utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "utils"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["lib"]

[dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
17 changes: 17 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::env;
use tracing_subscriber::EnvFilter;

pub fn set_log(env_filter: String) {
let _ = env::var("RUST_LOG").is_err_and(|_| {
env::set_var("RUST_LOG", env_filter);
true
});
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_target(true)
.init();
}
19 changes: 1 addition & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use clap::Parser;
use error::AppError;
use http_body_util::BodyExt;
use local_ip_address::local_ip;
use std::env;
use std::future::IntoFuture;
use std::net::SocketAddr;
use std::str::FromStr;
Expand All @@ -22,7 +21,6 @@ use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tower_http::validate_request::ValidateRequestHeaderLayer;
use tracing::{debug, error, info, info_span, warn};
use tracing_subscriber::EnvFilter;

use crate::auth::ManyValidate;
use crate::config::Config;
Expand Down Expand Up @@ -55,7 +53,7 @@ async fn main() {
metrics_register();
let args = Args::parse();
let mut cfg = Config::parse(args.config);
set_log(format!("live777={},webrtc=error", cfg.log.level));
utils::set_log(format!("live777={},webrtc=error", cfg.log.level));
warn!("set log level : {}", cfg.log.level);
debug!("config : {:?}", cfg);
let listener = tokio::net::TcpListener::bind(&cfg.http.listen)
Expand Down Expand Up @@ -118,21 +116,6 @@ async fn main() {
info!("Server shutdown");
}

fn set_log(env_filter: String) {
let _ = env::var("RUST_LOG").is_err_and(|_| {
env::set_var("RUST_LOG", env_filter);
true
});
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_target(true)
.init();
}

fn metrics_register() {
metrics::REGISTRY
.register(Box::new(metrics::STREAM.clone()))
Expand Down
2 changes: 2 additions & 0 deletions tools/whepfrom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ tokio = { version = "1.36", features = ["full"] }
cli = { path = "../../libs/cli" }
libwish = { path = "../../libs/libwish" }
signal = { path = "../../libs/signal" }
utils = { path = "../../libs/utils" }
scopeguard = "1.2.0"
tracing = "0.1"
24 changes: 20 additions & 4 deletions tools/whepfrom/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use anyhow::{anyhow, Result};
use clap::Parser;
use clap::{ArgAction, Parser};
use cli::{create_child, get_codec_type, Codec};

use libwish::Client;
Expand All @@ -10,6 +10,7 @@ use tokio::{
net::UdpSocket,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use tracing::{info, trace, warn, Level};
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
Expand All @@ -32,6 +33,9 @@ const PREFIX_LIB: &str = "WEBRTC";
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Verbose mode (-v, -vv, -vvv)
#[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)]
verbose: u8,
#[arg(short, long)]
target: String,
#[arg(short, long, value_enum)]
Expand All @@ -56,6 +60,17 @@ struct Args {
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

utils::set_log(format!(
"whipinto={},webrtc=error",
match args.verbose {
0 => Level::WARN,
1 => Level::INFO,
2 => Level::DEBUG,
_ => Level::TRACE,
}
));

let payload_type = args.payload_type;
assert!((96..=127).contains(&payload_type));
let udp_socket = UdpSocket::bind("0.0.0.0:0").await?;
Expand Down Expand Up @@ -104,12 +119,12 @@ async fn main() -> Result<()> {
}
tokio::time::sleep(Duration::from_secs(1)).await;
},
None => println!("No child process"),
None => info!("No child process"),
}
});
tokio::select! {
_= complete_rx.recv() => { }
msg = signal::wait_for_stop_signal() => println!("Received signal: {}", msg)
msg = signal::wait_for_stop_signal() => warn!("Received signal: {}", msg)
}
let _ = client.remove_resource().await;
let _ = peer.close().await;
Expand Down Expand Up @@ -195,7 +210,7 @@ async fn new_peer(
let pc = pc.clone();
let complete_tx = complete_tx.clone();
tokio::spawn(async move {
println!("connection state changed: {}", s);
warn!("connection state changed: {}", s);
match s {
RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => {
let _ = pc.close().await;
Expand All @@ -213,6 +228,7 @@ async fn new_peer(
tokio::spawn(async move {
let mut b = [0u8; 1500];
while let Ok((rtp_packet, _)) = track.read(&mut b).await {
trace!("received packet: {}", rtp_packet);
let size = rtp_packet.marshal_size();
let data = b[0..size].to_vec();
let _ = sender.send(data);
Expand Down
2 changes: 2 additions & 0 deletions tools/whipinto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ tokio = { version = "1.36", features = ["full"] }
cli = { path = "../../libs/cli" }
libwish = { path = "../../libs/libwish" }
signal = { path = "../../libs/signal" }
utils = { path = "../../libs/utils" }
scopeguard = "1.2.0"
tracing = "0.1"
31 changes: 25 additions & 6 deletions tools/whipinto/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use anyhow::{anyhow, Result};
use clap::Parser;
use clap::{ArgAction, Parser};
use cli::{create_child, Codec};

use libwish::Client;
Expand All @@ -10,6 +10,7 @@ use tokio::{
net::UdpSocket,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use tracing::{info, trace, warn, Level};
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
Expand All @@ -32,6 +33,9 @@ const PREFIX_LIB: &str = "WEBRTC";
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Verbose mode (-v, -vv, -vvv)
#[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)]
verbose: u8,
#[arg(short, long, default_value_t = 0)]
port: u16,
#[arg(short, long, value_enum)]
Expand All @@ -53,9 +57,23 @@ struct Args {
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

utils::set_log(format!(
"whipinto={},webrtc=error",
match args.verbose {
0 => Level::WARN,
1 => Level::INFO,
2 => Level::DEBUG,
_ => Level::TRACE,
}
));

let listener = UdpSocket::bind(format!("0.0.0.0:{}", args.port)).await?;
let port = listener.local_addr()?.port();
println!("=== RTP listener started : {} ===", port);
info!(
"=== RTP listener started : {} ===",
listener.local_addr().unwrap()
);
let mut client = Client::new(
args.url,
Client::get_auth_header_map(args.auth_basic, args.auth_token),
Expand Down Expand Up @@ -92,14 +110,14 @@ async fn main() -> Result<()> {
}
tokio::time::sleep(Duration::from_secs(1)).await;
},
None => println!("No child process"),
None => info!("No child process"),
}
});
tokio::select! {
_= complete_rx.recv() => { }
msg = signal::wait_for_stop_signal() => println!("Received signal: {}", msg)
msg = signal::wait_for_stop_signal() => warn!("Received signal: {}", msg)
}
println!("RTP listener closed");
warn!("RTP listener closed");
let _ = client.remove_resource().await;
let _ = peer.close().await;
Ok(())
Expand Down Expand Up @@ -170,7 +188,7 @@ async fn new_peer(
let pc = pc.clone();
let complete_tx = complete_tx.clone();
tokio::spawn(async move {
println!("connection state changed: {}", s);
warn!("connection state changed: {}", s);
match s {
RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => {
let _ = pc.close().await;
Expand All @@ -197,6 +215,7 @@ async fn new_peer(
let mut sequence_number: u16 = 0;
while let Some(data) = recv.recv().await {
if let Ok(mut packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) {
trace!("received packet: {}", packet);
packet.header.sequence_number = sequence_number;
let _ = track.write_rtp(&packet).await;
sequence_number = sequence_number.wrapping_add(1);
Expand Down

0 comments on commit 00a38a9

Please sign in to comment.