diff --git a/Cargo.lock b/Cargo.lock index a15c029a..d71ab54c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1498,6 +1498,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "utils", "webrtc", ] @@ -1529,6 +1530,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "utils", ] [[package]] @@ -3511,6 +3513,13 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "utils" +version = "0.1.0" +dependencies = [ + "tracing-subscriber", +] + [[package]] name = "uuid" version = "1.8.0" @@ -3880,6 +3889,8 @@ dependencies = [ "scopeguard", "signal", "tokio", + "tracing", + "utils", "webrtc", ] @@ -3894,6 +3905,8 @@ dependencies = [ "scopeguard", "signal", "tokio", + "tracing", + "utils", "webrtc", ] diff --git a/Cargo.toml b/Cargo.toml index 0a7da6a0..85a85fab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ local-ip-address = "0.6" libwish = { path = "libs/libwish" } signal = { path = "libs/signal" } live777-http = { path = "libs/live777-http" } +utils = { path = "libs/utils" } reqwest = { version = "0.11.24", features = [ "rustls-tls", ], default-features = false } diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index b8452ef3..fbfc44bc 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -10,6 +10,7 @@ tower-http = { version = "0.5.2", features = ["fs", "auth", "trace", "cors"] } anyhow = { version = "1", features = ["backtrace"] } signal = { path = "../libs/signal" } live777-http = { path = "../libs/live777-http" } +utils = { path = "../libs/utils" } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } clap = { version = "4.5.1", features = ["derive"] } diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 2c0c6ace..7ffc22cf 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -24,11 +24,9 @@ use live777_http::event::Event; use model::{Node, Stream}; use sqlx::mysql::MySqlConnectOptions; use sqlx::MySqlPool; -use std::env; use std::future::IntoFuture; use std::str::FromStr; use std::time::Duration; -use tracing_subscriber::EnvFilter; #[cfg(debug_assertions)] use tower_http::services::{ServeDir, ServeFile}; @@ -64,7 +62,7 @@ async fn main() { sqlx::any::install_default_drivers(); let args = Args::parse(); let cfg = Config::parse(args.config); - set_log(format!( + utils::set_log(format!( "live777_gateway={},live777_storage={},sqlx={},webrtc=error", cfg.log.level, cfg.log.level, cfg.log.level )); @@ -127,21 +125,6 @@ async fn main() { info!("Server shutdown"); } -fn set_log(env_filter: String) { - let _ = env::var("RUST_LOG").is_err_and(|_| { - env::set_var("RUST_LOG", env_filter); - true - }); - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .with_file(true) - .with_line_number(true) - .with_thread_ids(true) - .with_target(true) - .init(); -} - #[cfg(not(debug_assertions))] #[derive(RustEmbed)] #[folder = "../assets/"] diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml new file mode 100644 index 00000000..6cde5085 --- /dev/null +++ b/libs/utils/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "utils" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["lib"] + +[dependencies] +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs new file mode 100644 index 00000000..b2dc9c5a --- /dev/null +++ b/libs/utils/src/lib.rs @@ -0,0 +1,17 @@ +use std::env; +use tracing_subscriber::EnvFilter; + +pub fn set_log(env_filter: String) { + let _ = env::var("RUST_LOG").is_err_and(|_| { + env::set_var("RUST_LOG", env_filter); + true + }); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .with_file(true) + .with_line_number(true) + .with_thread_ids(true) + .with_target(true) + .init(); +} diff --git a/src/main.rs b/src/main.rs index 8b867302..7e613992 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,6 @@ use clap::Parser; use error::AppError; use http_body_util::BodyExt; use local_ip_address::local_ip; -use std::env; use std::future::IntoFuture; use std::net::SocketAddr; use std::str::FromStr; @@ -22,7 +21,6 @@ use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; use tower_http::validate_request::ValidateRequestHeaderLayer; use tracing::{debug, error, info, info_span, warn}; -use tracing_subscriber::EnvFilter; use crate::auth::ManyValidate; use crate::config::Config; @@ -55,7 +53,7 @@ async fn main() { metrics_register(); let args = Args::parse(); let mut cfg = Config::parse(args.config); - set_log(format!("live777={},webrtc=error", cfg.log.level)); + utils::set_log(format!("live777={},webrtc=error", cfg.log.level)); warn!("set log level : {}", cfg.log.level); debug!("config : {:?}", cfg); let listener = tokio::net::TcpListener::bind(&cfg.http.listen) @@ -118,21 +116,6 @@ async fn main() { info!("Server shutdown"); } -fn set_log(env_filter: String) { - let _ = env::var("RUST_LOG").is_err_and(|_| { - env::set_var("RUST_LOG", env_filter); - true - }); - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .with_file(true) - .with_line_number(true) - .with_thread_ids(true) - .with_target(true) - .init(); -} - fn metrics_register() { metrics::REGISTRY .register(Box::new(metrics::STREAM.clone())) diff --git a/tools/whepfrom/Cargo.toml b/tools/whepfrom/Cargo.toml index 74e2a59c..80f6174b 100644 --- a/tools/whepfrom/Cargo.toml +++ b/tools/whepfrom/Cargo.toml @@ -13,4 +13,6 @@ tokio = { version = "1.36", features = ["full"] } cli = { path = "../../libs/cli" } libwish = { path = "../../libs/libwish" } signal = { path = "../../libs/signal" } +utils = { path = "../../libs/utils" } scopeguard = "1.2.0" +tracing = "0.1" diff --git a/tools/whepfrom/src/main.rs b/tools/whepfrom/src/main.rs index 9e783227..c7bd4ec5 100644 --- a/tools/whepfrom/src/main.rs +++ b/tools/whepfrom/src/main.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; -use clap::Parser; +use clap::{ArgAction, Parser}; use cli::{create_child, get_codec_type, Codec}; use libwish::Client; @@ -10,6 +10,7 @@ use tokio::{ net::UdpSocket, sync::mpsc::{unbounded_channel, UnboundedSender}, }; +use tracing::{info, trace, warn, Level}; use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType; use webrtc::{ api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder}, @@ -32,6 +33,9 @@ const PREFIX_LIB: &str = "WEBRTC"; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { + /// Verbose mode (-v, -vv, -vvv) + #[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)] + verbose: u8, #[arg(short, long)] target: String, #[arg(short, long, value_enum)] @@ -56,6 +60,17 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); + + utils::set_log(format!( + "whipinto={},webrtc=error", + match args.verbose { + 0 => Level::WARN, + 1 => Level::INFO, + 2 => Level::DEBUG, + _ => Level::TRACE, + } + )); + let payload_type = args.payload_type; assert!((96..=127).contains(&payload_type)); let udp_socket = UdpSocket::bind("0.0.0.0:0").await?; @@ -104,12 +119,12 @@ async fn main() -> Result<()> { } tokio::time::sleep(Duration::from_secs(1)).await; }, - None => println!("No child process"), + None => info!("No child process"), } }); tokio::select! { _= complete_rx.recv() => { } - msg = signal::wait_for_stop_signal() => println!("Received signal: {}", msg) + msg = signal::wait_for_stop_signal() => warn!("Received signal: {}", msg) } let _ = client.remove_resource().await; let _ = peer.close().await; @@ -195,7 +210,7 @@ async fn new_peer( let pc = pc.clone(); let complete_tx = complete_tx.clone(); tokio::spawn(async move { - println!("connection state changed: {}", s); + warn!("connection state changed: {}", s); match s { RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => { let _ = pc.close().await; @@ -213,6 +228,7 @@ async fn new_peer( tokio::spawn(async move { let mut b = [0u8; 1500]; while let Ok((rtp_packet, _)) = track.read(&mut b).await { + trace!("received packet: {}", rtp_packet); let size = rtp_packet.marshal_size(); let data = b[0..size].to_vec(); let _ = sender.send(data); diff --git a/tools/whipinto/Cargo.toml b/tools/whipinto/Cargo.toml index da942547..34ce3ad4 100644 --- a/tools/whipinto/Cargo.toml +++ b/tools/whipinto/Cargo.toml @@ -13,4 +13,6 @@ tokio = { version = "1.36", features = ["full"] } cli = { path = "../../libs/cli" } libwish = { path = "../../libs/libwish" } signal = { path = "../../libs/signal" } +utils = { path = "../../libs/utils" } scopeguard = "1.2.0" +tracing = "0.1" diff --git a/tools/whipinto/src/main.rs b/tools/whipinto/src/main.rs index 6c918066..88aca8b0 100644 --- a/tools/whipinto/src/main.rs +++ b/tools/whipinto/src/main.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; -use clap::Parser; +use clap::{ArgAction, Parser}; use cli::{create_child, Codec}; use libwish::Client; @@ -10,6 +10,7 @@ use tokio::{ net::UdpSocket, sync::mpsc::{unbounded_channel, UnboundedSender}, }; +use tracing::{info, trace, warn, Level}; use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType; use webrtc::{ api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder}, @@ -32,6 +33,9 @@ const PREFIX_LIB: &str = "WEBRTC"; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { + /// Verbose mode (-v, -vv, -vvv) + #[arg(short = 'v', action = ArgAction::Count, default_value_t = 0)] + verbose: u8, #[arg(short, long, default_value_t = 0)] port: u16, #[arg(short, long, value_enum)] @@ -53,9 +57,23 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); + + utils::set_log(format!( + "whipinto={},webrtc=error", + match args.verbose { + 0 => Level::WARN, + 1 => Level::INFO, + 2 => Level::DEBUG, + _ => Level::TRACE, + } + )); + let listener = UdpSocket::bind(format!("0.0.0.0:{}", args.port)).await?; let port = listener.local_addr()?.port(); - println!("=== RTP listener started : {} ===", port); + info!( + "=== RTP listener started : {} ===", + listener.local_addr().unwrap() + ); let mut client = Client::new( args.url, Client::get_auth_header_map(args.auth_basic, args.auth_token), @@ -92,14 +110,14 @@ async fn main() -> Result<()> { } tokio::time::sleep(Duration::from_secs(1)).await; }, - None => println!("No child process"), + None => info!("No child process"), } }); tokio::select! { _= complete_rx.recv() => { } - msg = signal::wait_for_stop_signal() => println!("Received signal: {}", msg) + msg = signal::wait_for_stop_signal() => warn!("Received signal: {}", msg) } - println!("RTP listener closed"); + warn!("RTP listener closed"); let _ = client.remove_resource().await; let _ = peer.close().await; Ok(()) @@ -170,7 +188,7 @@ async fn new_peer( let pc = pc.clone(); let complete_tx = complete_tx.clone(); tokio::spawn(async move { - println!("connection state changed: {}", s); + warn!("connection state changed: {}", s); match s { RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => { let _ = pc.close().await; @@ -197,6 +215,7 @@ async fn new_peer( let mut sequence_number: u16 = 0; while let Some(data) = recv.recv().await { if let Ok(mut packet) = rtp::packet::Packet::unmarshal(&mut data.as_slice()) { + trace!("received packet: {}", packet); packet.header.sequence_number = sequence_number; let _ = track.write_rtp(&packet).await; sequence_number = sequence_number.wrapping_add(1);