From 253ac940782ca1b7e7ba9b50ad06f0b10b0f289a Mon Sep 17 00:00:00 2001 From: hongcha98 Date: Sun, 8 Sep 2024 23:45:23 +0800 Subject: [PATCH 1/2] feat(api): add sse --- Cargo.lock | 35 +++++++++++++++++++++++++++++++++++ libs/api/src/path.rs | 4 ++++ libs/api/src/request.rs | 6 ++++++ liveion/Cargo.toml | 7 +++++-- liveion/src/forward/mod.rs | 2 ++ liveion/src/lib.rs | 32 +++++++++++++++++--------------- liveion/src/route/stream.rs | 33 ++++++++++++++++++++++++++++++++- liveion/src/stream/manager.rs | 30 ++++++++++++++++++++++++++++++ 8 files changed, 131 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 602b0b04..c46ab4ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -187,6 +187,28 @@ dependencies = [ "syn 2.0.68", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.68", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -1220,6 +1242,7 @@ version = "0.5.1" dependencies = [ "anyhow", "api", + "async-stream", "async-trait", "axum", "axum-extra", @@ -1241,6 +1264,7 @@ dependencies = [ "serde_json", "signal", "tokio", + "tokio-stream", "toml", "tower-http", "tracing", @@ -2599,6 +2623,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.11" diff --git a/libs/api/src/path.rs b/libs/api/src/path.rs index a50dbef7..3dd407aa 100644 --- a/libs/api/src/path.rs +++ b/libs/api/src/path.rs @@ -22,3 +22,7 @@ pub fn streams(stream: &str) -> String { pub fn cascade(stream: &str) -> String { format!("/api/cascade/{}", stream) } + +pub fn streams_sse() -> String { + "/api/sse/streams".to_string() +} diff --git a/libs/api/src/request.rs b/libs/api/src/request.rs index 135a8e60..d5101ff5 100644 --- a/libs/api/src/request.rs +++ b/libs/api/src/request.rs @@ -28,3 +28,9 @@ pub struct Cascade { // push mode ,value : whip_url pub target_url: Option, } + +#[derive(Serialize, Deserialize, Clone)] +pub struct StreamSSE { + #[serde(default)] + pub streams: Vec, +} diff --git a/liveion/Cargo.toml b/liveion/Cargo.toml index 07b085e9..7f2d4e40 100644 --- a/liveion/Cargo.toml +++ b/liveion/Cargo.toml @@ -20,6 +20,8 @@ anyhow = { workspace = true, features = ["backtrace"] } clap = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["serde_derive"] } tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1.15" +async-stream = "0.3.5" tracing = { workspace = true } webrtc = { workspace = true } @@ -40,8 +42,9 @@ serde_json = "1.0.114" toml = "0.8.10" tower-http = { version = "0.5.2", features = ["fs", "auth", "trace", "cors"] } -reqwest = { version = "0.12", optional = true, features = ["rustls-tls"], default-features = false } +reqwest = { version = "0.12", optional = true, features = [ + "rustls-tls", +], default-features = false } [features] webhook = ["dep:reqwest"] - diff --git a/liveion/src/forward/mod.rs b/liveion/src/forward/mod.rs index ccf7b31c..4cf35ae6 100644 --- a/liveion/src/forward/mod.rs +++ b/liveion/src/forward/mod.rs @@ -36,6 +36,7 @@ pub(crate) fn get_peer_id(peer: &Arc) -> String { #[derive(Clone)] pub struct PeerForward { + pub(crate) stream: String, publish_lock: Arc>, internal: Arc, } @@ -43,6 +44,7 @@ pub struct PeerForward { impl PeerForward { pub fn new(stream: impl ToString, ice_server: Vec) -> Self { PeerForward { + stream: stream.to_string(), publish_lock: Arc::new(Mutex::new(())), internal: Arc::new(PeerForwardInternal::new(stream, ice_server)), } diff --git a/liveion/src/lib.rs b/liveion/src/lib.rs index e7f99480..8d1c6044 100644 --- a/liveion/src/lib.rs +++ b/liveion/src/lib.rs @@ -12,7 +12,7 @@ use tokio::net::TcpListener; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; use tower_http::validate_request::ValidateRequestHeaderLayer; -use tracing::{error, info_span}; +use tracing::{error, info_span, Level}; use crate::auth::ManyValidate; use crate::config::Config; @@ -63,21 +63,23 @@ where } else { CorsLayer::new() }) - .layer(axum::middleware::from_fn(http_log::print_request_response)) .layer( - TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { - let span = info_span!( - "http_request", - uri = ?request.uri(), - method = ?request.method(), - span_id = tracing::field::Empty, - ); - span.record( - "span_id", - span.id().unwrap_or(tracing::Id::from_u64(42)).into_u64(), - ); - span - }), + TraceLayer::new_for_http() + .make_span_with(|request: &Request<_>| { + let span = info_span!( + "http_request", + uri = ?request.uri(), + method = ?request.method(), + span_id = tracing::field::Empty, + ); + span.record( + "span_id", + span.id().unwrap_or(tracing::Id::from_u64(42)).into_u64(), + ); + span + }) + .on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)) + .on_failure(tower_http::trace::DefaultOnFailure::new().level(Level::INFO)), ); app = app.fallback(static_handler); diff --git a/liveion/src/route/stream.rs b/liveion/src/route/stream.rs index 2998f6b2..40154cfd 100644 --- a/liveion/src/route/stream.rs +++ b/liveion/src/route/stream.rs @@ -1,9 +1,15 @@ +use std::convert::Infallible; + use axum::extract::{Path, State}; -use axum::response::Response; +use axum::response::sse::{Event, KeepAlive}; +use axum::response::{Response, Sse}; use axum::routing::{delete, get, post}; use axum::{Json, Router}; use axum_extra::extract::Query; use http::StatusCode; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; +use tracing::info; use crate::error::AppError; use crate::AppState; @@ -14,6 +20,7 @@ pub fn route() -> Router { .route(&api::path::streams(":stream"), get(show)) .route(&api::path::streams(":stream"), post(create)) .route(&api::path::streams(":stream"), delete(destroy)) + .route(&api::path::streams_sse(), get(sse)) } async fn index( @@ -72,3 +79,27 @@ async fn destroy( Err(e) => Err(AppError::StreamNotFound(e.to_string())), } } + +async fn sse( + State(state): State, + Query(req): Query, +) -> crate::result::Result< + Sse>>, +> { + let recv = state + .stream_manager + .sse_handler(req.streams.clone()) + .await?; + let stream = ReceiverStream::new(recv).map(|forward_infos| { + Ok(Event::default() + .json_data( + forward_infos + .into_iter() + .map(api::response::Stream::from) + .collect::>(), + ) + .unwrap()) + }); + let resp = Sse::new(stream).keep_alive(KeepAlive::default()); + Ok(resp) +} diff --git a/liveion/src/stream/manager.rs b/liveion/src/stream/manager.rs index de7c6470..c540690d 100644 --- a/liveion/src/stream/manager.rs +++ b/liveion/src/stream/manager.rs @@ -460,4 +460,34 @@ impl Manager { Ok(()) } + + pub async fn sse_handler( + &self, + streams: Vec, + ) -> Result>> { + let (send, recv) = tokio::sync::mpsc::channel(64); + let mut evnet_recv = self.event_sender.subscribe(); + let stream_map = self.stream_map.clone(); + tokio::spawn(async move { + while let Ok(event) = evnet_recv.recv().await { + let stream = match event { + Event::Node(_) => continue, + Event::Stream(val) => val.stream.stream, + Event::Forward(val) => val.stream_info.id, + }; + if streams.is_empty() || streams.contains(&stream) { + let stream_map = stream_map.read().await; + let mut infos = vec![]; + for (_, forward) in stream_map.iter() { + if !streams.is_empty() && !streams.contains(&forward.stream) { + continue; + } + infos.push(forward.info().await); + } + let _ = send.send(infos).await; + } + } + }); + Ok(recv) + } } From 827955f20ff9625684d55774f28689d1dab03d40 Mon Sep 17 00:00:00 2001 From: hongcha98 Date: Sun, 8 Sep 2024 23:49:41 +0800 Subject: [PATCH 2/2] fix: clippy --- liveion/src/route/stream.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/liveion/src/route/stream.rs b/liveion/src/route/stream.rs index 40154cfd..ef6fa154 100644 --- a/liveion/src/route/stream.rs +++ b/liveion/src/route/stream.rs @@ -1,5 +1,7 @@ use std::convert::Infallible; +use crate::error::AppError; +use crate::AppState; use axum::extract::{Path, State}; use axum::response::sse::{Event, KeepAlive}; use axum::response::{Response, Sse}; @@ -9,10 +11,6 @@ use axum_extra::extract::Query; use http::StatusCode; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::info; - -use crate::error::AppError; -use crate::AppState; pub fn route() -> Router { Router::new()