diff --git a/Cargo.lock b/Cargo.lock index d71ab54c..129d5e2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be6ea09c9b96cb5076af0de2e383bd2bc0c18f827cf1967bdd353e0b910d733" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "serde_html_form", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1474,6 +1497,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "axum-extra", "base64 0.22.1", "chrono", "clap", @@ -1508,9 +1532,11 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "axum-extra", "base64 0.22.1", "chrono", "clap", + "futures-util", "http 1.1.0", "http-body 1.0.0", "http-body-util", @@ -1538,6 +1564,7 @@ name = "live777-http" version = "0.1.0" dependencies = [ "serde", + "serde_html_form", ] [[package]] @@ -2576,6 +2603,19 @@ dependencies = [ "syn 2.0.61", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.117" diff --git a/Cargo.toml b/Cargo.toml index 85a85fab..b17c21ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ utils = { path = "libs/utils" } reqwest = { version = "0.11.24", features = [ "rustls-tls", ], default-features = false } +axum-extra = { version = "0.9.3", features = ["query"] } # cargo install cargo-deb # Reference: https://github.com/kornelski/cargo-deb [package.metadata.deb] diff --git a/conf/live777.toml b/conf/live777.toml index 4084d761..b19ef326 100644 --- a/conf/live777.toml +++ b/conf/live777.toml @@ -1,5 +1,5 @@ # For cluster, you must enable this -# webhooks = ["http://127.0.0.1:8080/webhook"] +# webhooks = ["http://127.0.0.1:8080/webhook?token="] # If not set, auto detect # node_addr = "127.0.0.1:7777" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index fbfc44bc..013c3946 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -35,3 +35,5 @@ serde_json = "1.0.114" reqwest = { version = "0.11.24", features = [ "rustls-tls", ], default-features = false } +futures-util = "0.3" +axum-extra = { version = "0.9.3", features = ["query"] } diff --git a/gateway/src/config.rs b/gateway/src/config.rs index 6fa6a41f..6cfa5b9c 100644 --- a/gateway/src/config.rs +++ b/gateway/src/config.rs @@ -15,6 +15,8 @@ pub struct Config { pub reforward: Reforward, #[serde(default = "default_db_url")] pub db_url: String, + #[serde(default)] + pub node_sync_tick_time: NodeSyncTickTime, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -76,6 +78,15 @@ impl Default for PublishLeaveTimeout { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeSyncTickTime(pub u64); + +impl Default for NodeSyncTickTime { + fn default() -> Self { + NodeSyncTickTime(5000) + } +} + fn default_http_listen() -> SocketAddr { SocketAddr::from_str(&format!( "0.0.0.0:{}", diff --git a/gateway/src/db.rs b/gateway/src/db.rs index e2a9aace..17cc2471 100644 --- a/gateway/src/db.rs +++ b/gateway/src/db.rs @@ -1,23 +1,38 @@ -use std::time::Duration; - use crate::{ error::AppError, model::{Node, Stream}, result::Result, }; -use chrono::{DateTime, Utc}; +use chrono::Utc; use sqlx::MySqlPool; impl Node { - pub async fn nodes(pool: &sqlx::mysql::MySqlPool) -> Result> { - let nodes: Vec = sqlx::query_as(r#"select * from nodes updated_at >= ?"#) - .bind(Utc::now() - Duration::from_millis(10000)) + pub async fn db_find_not_deactivate_nodes(pool: &MySqlPool) -> Result> { + let nodes: Vec = sqlx::query_as("select * from nodes where updated_at != ?") + .bind(Node::deactivate_time()) .fetch_all(pool) .await?; Ok(nodes) } - pub async fn max_idlest_node(pool: &sqlx::mysql::MySqlPool) -> Result> { + pub async fn db_find_reforward_nodes(pool: &MySqlPool) -> Result> { + let nodes: Vec = + sqlx::query_as("select * from nodes where updated_at >= ? and reforward > 0") + .bind(Node::active_time_point()) + .fetch_all(pool) + .await?; + Ok(nodes) + } + + pub async fn db_find_by_addr(pool: &MySqlPool, addr: String) -> Result> { + let node: Option = sqlx::query_as("select * from nodes where addr = ?") + .bind(addr) + .fetch_optional(pool) + .await?; + Ok(node) + } + + pub async fn max_idlest_node(pool: &MySqlPool) -> Result> { let sql = r#" select * from nodes where @@ -26,14 +41,14 @@ impl Node { and stream < pub_max order by sub_max - subscribe desc limit 1 "#; - let mut nodes: Vec = sqlx::query_as(sql) - .bind(Utc::now() - Duration::from_millis(10000)) - .fetch_all(pool) + let node: Option = sqlx::query_as(sql) + .bind(Node::active_time_point()) + .fetch_optional(pool) .await?; - Ok(nodes.pop()) + Ok(node) } - pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> { + pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> { sqlx::query( r#"INSERT INTO nodes ( addr, authorization, admin_authorization, pub_max, sub_max, reforward_maximum_idle_time, reforward_cascade) VALUES (?, ?, ?, ?, ?, ?, ?) @@ -83,7 +98,7 @@ impl Node { pub async fn db_remove(&self, pool: &MySqlPool) -> Result<()> { sqlx::query(r#"UPDATE nodes SET updated_at = ? WHERE addr = ?"#) - .bind(DateTime::from_timestamp_millis(0).unwrap()) + .bind(Node::deactivate_time()) .bind(self.addr.clone()) .execute(pool) .await?; @@ -100,7 +115,7 @@ impl Node { "#, ) .bind(stream) - .bind(Utc::now() - Duration::from_millis(10000)) + .bind(Node::active_time_point()) .fetch_all(pool) .await?; Ok(nodes) @@ -108,14 +123,20 @@ impl Node { } impl Stream { - pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> { + pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> { sqlx::query( - r#"INSERT INTO streams (stream,addr,publish) - VALUES (?, ?,1) - ON DUPLICATE KEY UPDATE publish=1,subscribe=0 ,reforward=0"#, + r#"INSERT INTO streams (stream,addr,publish,subscribe,reforward) + VALUES (?, ?,?,?,?) + ON DUPLICATE KEY UPDATE publish=?,subscribe=? ,reforward=?"#, ) .bind(self.stream.clone().clone()) .bind(self.addr.clone()) + .bind(self.publish) + .bind(self.subscribe) + .bind(self.reforward) + .bind(self.publish) + .bind(self.subscribe) + .bind(self.reforward) .execute(pool) .await?; Ok(()) @@ -151,4 +172,12 @@ impl Stream { .await?; Ok(()) } + + pub async fn db_find_node_stream(pool: &MySqlPool, addr: String) -> Result> { + let streams: Vec = sqlx::query_as("select * from streams where addr = ?") + .bind(addr) + .fetch_all(pool) + .await?; + Ok(streams) + } } diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 7ffc22cf..9098cae1 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -12,6 +12,7 @@ use axum::{ routing::post, Router, }; +use axum_extra::extract::Query; use chrono::Utc; use clap::Parser; @@ -34,7 +35,7 @@ use tower_http::services::{ServeDir, ServeFile}; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; use tower_http::validate_request::ValidateRequestHeaderLayer; -use tracing::{debug, error, info, info_span, warn}; +use tracing::{debug, error, info, info_span, warn, Span}; use crate::auth::ManyValidate; use crate::config::Config; @@ -84,7 +85,6 @@ async fn main() { .unwrap(), client, }; - tokio::spawn(tick::reforward_check(app_state.clone())); let auth_layer = ValidateRequestHeaderLayer::custom(ManyValidate::new(vec![cfg.auth])); let app = Router::new() .route(&live777_http::path::whip(":stream"), post(whip)) @@ -99,7 +99,7 @@ async fn main() { ) .layer(auth_layer) .route("/webhook", post(webhook)) - .with_state(app_state) + .with_state(app_state.clone()) .layer(if cfg.http.cors { CorsLayer::permissive() } else { @@ -113,11 +113,13 @@ async fn main() { uri = ?request.uri(), method = ?request.method(), span_id = tracing::field::Empty, + target_addr = tracing::field::Empty, ); span.record("span_id", span.id().unwrap().into_u64()); span }), ); + tokio::spawn(tick::run(app_state)); tokio::select! { Err(e) = axum::serve(listener, static_server(app)).into_future() => error!("Application error: {e}"), msg = signal::wait_for_stop_signal() => debug!("Received signal: {}", msg), @@ -245,7 +247,7 @@ async fn add_node_stream(node: &Node, stream: String, pool: &MySqlPool) -> Resul updated_at: Utc::now(), id: 0, }; - stream.db_insert(pool).await?; + stream.db_save_or_update(pool).await?; Ok(stream) } @@ -324,8 +326,35 @@ async fn resource( Err(AppError::ResourceNotFound) } +use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +struct WebHookQuery { + token: String, + reforward_maximum_idle_time: Option, + reforward_cascade: Option, +} + +impl WebHookQuery { + fn get_reforward_maximum_idle_time(&self) -> u64 { + if let Some(reforward_maximum_idle_time) = self.reforward_maximum_idle_time { + reforward_maximum_idle_time + } else { + 0 + } + } + fn get_reforward_cascade(&self) -> bool { + if let Some(reforward_cascade) = self.reforward_cascade { + reforward_cascade + } else { + false + } + } +} + async fn webhook( State(state): State, + Query(qry): Query, Json(event_body): Json, ) -> Result { let pool = &state.pool; @@ -337,6 +366,8 @@ async fn webhook( publish: metrics.publish, subscribe: metrics.subscribe, reforward: metrics.reforward, + reforward_maximum_idle_time: qry.get_reforward_maximum_idle_time(), + reforward_cascade: qry.get_reforward_cascade(), ..Default::default() }; match event_body.event { @@ -346,14 +377,14 @@ async fn webhook( node.pub_max = metadata.pub_max; node.sub_max = metadata.sub_max; match r#type { - live777_http::event::NodeEventType::Up => node.db_insert(pool).await?, + live777_http::event::NodeEventType::Up => node.db_save_or_update(pool).await?, live777_http::event::NodeEventType::Down => { node.db_remove(pool).await?; Stream::db_remove_addr_stream(pool, addr.to_string()).await? } live777_http::event::NodeEventType::KeepAlive => { if node.db_update_metrics(pool).await.is_err() { - node.db_insert(pool).await?; + node.db_save_or_update(pool).await?; } } } @@ -369,11 +400,15 @@ async fn webhook( ..Default::default() }; match r#type { - live777_http::event::StreamEventType::StreamUp => db_stream.db_insert(pool).await?, + live777_http::event::StreamEventType::StreamUp => { + db_stream.db_save_or_update(pool).await? + } live777_http::event::StreamEventType::StreamDown => { db_stream.db_remove(pool).await? } - _ => db_stream.db_update_metrics(pool).await?, + _ => { + db_stream.db_update_metrics(pool).await?; + } } } } @@ -381,6 +416,7 @@ async fn webhook( } async fn request_proxy(state: AppState, mut req: Request, target_node: &Node) -> Result { + Span::current().record("target_addr", target_node.addr.clone()); let path = req.uri().path(); let path_query = req .uri() diff --git a/gateway/src/model.rs b/gateway/src/model.rs index 3a82a63b..3f660cd2 100644 --- a/gateway/src/model.rs +++ b/gateway/src/model.rs @@ -1,12 +1,17 @@ -use std::{cmp::Ordering, str::FromStr}; +use std::{cmp::Ordering, str::FromStr, time::Duration}; use crate::{error::AppError, result::Result}; use anyhow::anyhow; use chrono::{serde::ts_milliseconds, DateTime, Utc}; -use live777_http::{path, request::Reforward, response::StreamInfo}; -use reqwest::{header::HeaderMap, Body, Method}; +use live777_http::{ + path, + request::{QueryInfo, Reforward}, + response::StreamInfo, +}; +use reqwest::{header::HeaderMap, Method}; use serde::{Deserialize, Serialize}; use sqlx::prelude::FromRow; +use tracing::{debug, warn}; #[derive(Serialize, Deserialize, Clone, Debug, FromRow)] #[serde(rename_all = "camelCase")] @@ -29,6 +34,16 @@ pub struct Node { pub updated_at: DateTime, } +impl Node { + pub fn active_time_point() -> DateTime { + Utc::now() - Duration::from_millis(10000) + } + + pub fn deactivate_time() -> DateTime { + DateTime::from_timestamp_millis(0).unwrap() + } +} + impl PartialEq for Node { fn eq(&self, other: &Self) -> bool { self.addr == other.addr @@ -122,10 +137,10 @@ impl Node { pub async fn stream_infos(&self, streams: Vec) -> Result> { let data = request( - self.path_url(&path::infos(streams)), + self.path_url(&path::infos(QueryInfo { streams })), "GET", self.admin_authorization.clone(), - "", + "".to_string(), ) .await?; serde_json::from_str::>(&data).map_err(|e| e.into()) @@ -155,35 +170,63 @@ impl Node { self.path_url(&path::resource(&stream, &session)), "DELETE", self.admin_authorization.clone(), - "", + "".to_string(), ) .await?; Ok(()) } } -async fn request>( +async fn request( url: String, method: &str, authorization: Option, - body: T, + body: String, ) -> Result { let mut headers = HeaderMap::new(); headers.append("Content-Type", "application/json".parse().unwrap()); if let Some(authorization) = authorization { headers.append("Authorization", authorization.parse().unwrap()); } - let client = reqwest::Client::new(); - let response = client - .request(Method::from_str(method)?, url) + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_millis(500)) + .timeout(Duration::from_millis(5000)) + .build()?; + + match client + .request(Method::from_str(method)?, url.clone()) .headers(headers) - .body(body) + .body(body.clone()) .send() - .await?; - let success = response.status().is_success(); - let body = response.text().await?; - if !success { - return Err(AppError::InternalServerError(anyhow!(body))); + .await + { + Ok(response) => { + let status = response.status(); + let success = response.status().is_success(); + let res_body = response.text().await?; + if success { + debug!( + url, + ?status, + req_body = body, + res_body, + "request node success" + ); + Ok(res_body) + } else { + warn!( + url, + ?status, + req_body = body, + res_body, + "request node error" + ); + Err(AppError::InternalServerError(anyhow!(res_body))) + } + } + Err(err) => { + warn!(url, req_body = body, ?err, "request node error"); + Err(err.into()) + } } - Ok(body) } diff --git a/gateway/src/tick.rs b/gateway/src/tick.rs index 3ef68006..e6fe17b1 100644 --- a/gateway/src/tick.rs +++ b/gateway/src/tick.rs @@ -1,75 +1,195 @@ -use std::{collections::HashMap, net::SocketAddr, time::Duration}; +use std::{collections::HashMap, net::SocketAddr, time::Duration, vec}; -use crate::{error::AppError, model::Node, result::Result}; +use crate::{ + error::AppError, + model::{Node, Stream}, + result::Result, +}; use anyhow::anyhow; use chrono::Utc; +use futures_util::StreamExt; +use live777_http::response::StreamInfo; +use sqlx::{MySql, Pool}; use tracing::info; use url::Url; use crate::AppState; +pub async fn run(state: AppState) { + tokio::spawn(node_sync(state.clone())); + tokio::spawn(reforward_check(state.clone())); +} + +pub async fn node_sync(state: AppState) { + loop { + let _ = do_node_sync(state.clone()).await; + tokio::time::sleep(Duration::from_millis(state.config.node_sync_tick_time.0)).await; + } +} + +pub async fn do_node_sync(state: AppState) -> Result<()> { + let active_time_point = Node::active_time_point(); + let nodes: Vec = Node::db_find_not_deactivate_nodes(&state.pool).await?; + let mut active_nodes = vec![]; + let mut inactivate_nodes = vec![]; + for node in nodes { + if node.updated_at >= active_time_point { + active_nodes.push(node); + } else { + inactivate_nodes.push(node); + } + } + //inactivate_nodes remove + futures_util::stream::iter(inactivate_nodes) + .for_each_concurrent(None, |node| async { + let _ = node.db_remove(&state.pool).await; + let _ = Stream::db_remove_addr_stream(&state.pool, node.addr).await; + }) + .await; + //active_nodes sync info + futures_util::stream::iter(active_nodes) + .for_each_concurrent(None, |node| async { + let _ = node_sync_info(node, &state.pool).await; + }) + .await; + Ok(()) +} + +impl From for Stream { + fn from(value: StreamInfo) -> Self { + Stream { + stream: value.id, + publish: if value.publish_session_info.is_some() { + 1 + } else { + 0 + }, + subscribe: value.subscribe_session_infos.len() as u64, + reforward: value + .subscribe_session_infos + .iter() + .filter(|session| session.reforward.is_some()) + .count() as u64, + ..Default::default() + } + } +} + +pub async fn node_sync_info(node: Node, pool: &Pool) -> Result<()> { + let current_stream_map: &HashMap = &node + .stream_infos(vec![]) + .await? + .into_iter() + .map(|stream_info| { + let mut stream = Stream::from(stream_info); + stream.addr = node.addr.clone(); + (stream.stream.clone(), stream.clone()) + }) + .collect(); + let stream_map: &HashMap = + &Stream::db_find_node_stream(pool, node.addr.clone()) + .await? + .into_iter() + .map(|stream| (stream.stream.clone(), stream.clone())) + .collect(); + // delete + futures_util::stream::iter(stream_map) + .for_each_concurrent(None, |(stream_id, stream)| async move { + if !current_stream_map.contains_key(stream_id) { + let _ = stream.db_remove(pool).await; + } + }) + .await; + // save or update + futures_util::stream::iter(current_stream_map) + .for_each_concurrent(None, |(stream_id, current_stream)| async move { + if let Some(stream) = stream_map.get(stream_id) { + if stream.publish != current_stream.publish + || stream.subscribe != current_stream.subscribe + || stream.reforward != current_stream.reforward + { + let _ = current_stream.db_update_metrics(pool).await; + } + } else { + let _ = current_stream.db_save_or_update(pool).await; + } + }) + .await; + Ok(()) +} + pub async fn reforward_check(state: AppState) { loop { + let _ = do_reforward_check(state.clone()).await; tokio::time::sleep(Duration::from_millis( state.config.reforward.check_tick_time.0, )) .await; - let _ = do_reforward_check(state.clone()).await; } } async fn do_reforward_check(state: AppState) -> Result<()> { - let nodes = Node::nodes(&state.pool).await?; - if nodes.is_empty() { + let reforward_nodes = Node::db_find_reforward_nodes(&state.pool).await?; + if reforward_nodes.is_empty() { return Ok(()); } - let mut node_map = HashMap::new(); - let mut node_streams_map = HashMap::new(); - for node in nodes.iter() { - node_map.insert(node.addr.clone(), node.clone()); - if let Ok(streams) = node.stream_infos(vec![]).await { - node_streams_map.insert(node.addr.clone(), streams); - } + futures_util::stream::iter(reforward_nodes) + .for_each_concurrent(None, |node| async { + let _ = node_reforward_check(node, &state.pool).await; + }) + .await; + Ok(()) +} + +async fn node_reforward_check(node: Node, pool: &Pool) -> Result<()> { + let streams = node.stream_infos(vec![]).await?; + if streams.is_empty() { + return Ok(()); } - for (addr, streams) in node_streams_map.iter() { - let node = node_map.get(addr).unwrap(); - for stream_info in streams { - for session_info in &stream_info.subscribe_session_infos { - if let Some(reforward_info) = &session_info.reforward { - if let Ok((target_addr, target_stream)) = - parse_node_and_stream(reforward_info.target_url.clone()) + futures_util::stream::iter(streams) + .for_each_concurrent(None, |stream_info| async { + let _ = node_stream_reforward_check(node.clone(), pool, stream_info).await; + }) + .await; + Ok(()) +} + +async fn node_stream_reforward_check( + node: Node, + pool: &Pool, + stream_info: StreamInfo, +) -> Result<()> { + for session_info in &stream_info.subscribe_session_infos { + if let Some(reforward_info) = &session_info.reforward { + if let Ok((target_addr, target_stream)) = + parse_node_and_stream(reforward_info.target_url.clone()) + { + let target_node = Node::db_find_by_addr(pool, target_addr.to_string()).await; + if let Ok(Some(target_node)) = target_node { + if let Ok(Some(target_stream_info)) = + target_node.stream_info(target_stream).await { - if let Some(target_node) = node_map.get(&target_addr.to_string()) { - if let Ok(Some(target_stream_info)) = - target_node.stream_info(target_stream).await - { - if target_stream_info.subscribe_leave_time != 0 - && Utc::now().timestamp_millis() - >= target_stream_info.subscribe_leave_time - + node.reforward_maximum_idle_time as i64 - { - info!( - ?node, - stream_info.id, - session_info.id, - ?target_stream_info, - "reforward idle for long periods of time" - ); - let _ = node - .resource_delete( - stream_info.id.clone(), - session_info.id.clone(), - ) - .await; - } - } + if target_stream_info.subscribe_leave_time != 0 + && Utc::now().timestamp_millis() + >= target_stream_info.subscribe_leave_time + + node.reforward_maximum_idle_time as i64 + { + info!( + ?node, + stream_info.id, + session_info.id, + ?target_stream_info, + "reforward idle for long periods of time" + ); + let _ = node + .resource_delete(stream_info.id.clone(), session_info.id.clone()) + .await; } } } } } } - Ok(()) } diff --git a/libs/live777-http/Cargo.toml b/libs/live777-http/Cargo.toml index c373539f..2565a4ac 100644 --- a/libs/live777-http/Cargo.toml +++ b/libs/live777-http/Cargo.toml @@ -9,3 +9,4 @@ crate-type = ["lib"] [dependencies] serde = { workspace = true, features = ["serde_derive"] } +serde_html_form = "0.2.6" diff --git a/libs/live777-http/src/path.rs b/libs/live777-http/src/path.rs index 5058ed03..47ca6818 100644 --- a/libs/live777-http/src/path.rs +++ b/libs/live777-http/src/path.rs @@ -1,3 +1,5 @@ +use crate::request::QueryInfo; + pub const METRICS: &str = "/metrics"; pub const METRICS_JSON: &str = "/metrics/json"; pub const ADMIN_INFOS: &str = "/admin/infos"; @@ -20,6 +22,7 @@ pub fn resource_layer(stream: &str, session: &str) -> String { format!("/resource/{}/{}/layer", stream, session) } -pub fn infos(streams: Vec) -> String { - format!("/admin/infos?streams={}", streams.join(",")) +pub fn infos(qry: QueryInfo) -> String { + let query = serde_html_form::to_string(qry).unwrap(); + format!("/admin/infos?{}", query) } diff --git a/libs/live777-http/src/request.rs b/libs/live777-http/src/request.rs index 2ad16d3c..b4f5b6c6 100644 --- a/libs/live777-http/src/request.rs +++ b/libs/live777-http/src/request.rs @@ -15,7 +15,7 @@ pub struct ChangeResource { #[derive(Serialize, Deserialize, Clone)] pub struct QueryInfo { #[serde(default)] - pub streams: Option, + pub streams: Vec, } #[derive(Serialize, Deserialize, Clone)] diff --git a/sql/init.sql b/sql/init.sql index 8d438119..e6405d59 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -1,31 +1,34 @@ -create table nodes ( - id bigint unsigned auto_increment primary key, - addr varchar(255) default '0.0.0.0:0' not null, - authorization varchar(255) null, - admin_authorization varchar(255) null, - pub_max bigint unsigned default '0' not null, - sub_max bigint unsigned default '0' not null, - reforward_maximum_idle_time bigint unsigned default '0' not null, - reforward_cascade tinyint(1) default 0 not null, - stream bigint unsigned default '0' not null, - publish bigint unsigned default '0' not null, - subscribe bigint unsigned default '0' not null, - reforward bigint unsigned default '0' not null, - created_at timestamp default CURRENT_TIMESTAMP null, - updated_at datetime default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP, - constraint uk_addr - unique (addr) +CREATE TABLE `nodes` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `addr` varchar(255) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0.0.0.0:0', + `authorization` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, + `admin_authorization` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, + `pub_max` bigint unsigned NOT NULL DEFAULT '0', + `sub_max` bigint unsigned NOT NULL DEFAULT '0', + `reforward_maximum_idle_time` bigint unsigned NOT NULL DEFAULT '0', + `reforward_cascade` tinyint(1) NOT NULL DEFAULT '0', + `stream` bigint unsigned NOT NULL DEFAULT '0', + `publish` bigint unsigned NOT NULL DEFAULT '0', + `subscribe` bigint unsigned NOT NULL DEFAULT '0', + `reforward` bigint unsigned NOT NULL DEFAULT '0', + `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_addr` (`addr`), + KEY `idx_update_time` (`updated_at`), + KEY `idx_reforward` (`reforward`) ); -create table streams ( - id bigint unsigned auto_increment primary key, - stream varchar(255) default '' not null, - addr varchar(255) default '0.0.0.0:0' not null, - publish bigint unsigned default '0' not null, - subscribe bigint unsigned default '0' not null, - reforward bigint unsigned default '0' not null, - created_at datetime default CURRENT_TIMESTAMP null, - updated_at datetime default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP, - constraint uk_stream_addr - unique (stream, addr) +CREATE TABLE `streams` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `stream` varchar(255) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `addr` varchar(255) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0.0.0.0:0', + `publish` bigint unsigned NOT NULL DEFAULT '0', + `subscribe` bigint unsigned NOT NULL DEFAULT '0', + `reforward` bigint unsigned NOT NULL DEFAULT '0', + `created_at` datetime DEFAULT CURRENT_TIMESTAMP, + `updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_stream_addr` (`stream`,`addr`), + KEY `idx_addr` (`addr`) ); diff --git a/src/hook/webhook.rs b/src/hook/webhook.rs index e915e903..4045afb1 100644 --- a/src/hook/webhook.rs +++ b/src/hook/webhook.rs @@ -1,11 +1,11 @@ -use std::{net::SocketAddr, str::FromStr}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; use async_trait::async_trait; use live777_http::event::{EventBody, NodeMetaData, NodeMetrics}; use reqwest::{header::HeaderMap, Client, Method}; use tokio::sync::broadcast; -use tracing::debug; +use tracing::{debug, warn}; use super::{Event, EventHook, NodeEvent}; use crate::{error::AppError, metrics, result::Result}; @@ -24,7 +24,11 @@ impl WebHook { url, addr, metadata, - client: reqwest::Client::new(), + client: reqwest::Client::builder() + .connect_timeout(Duration::from_millis(300)) + .timeout(Duration::from_millis(500)) + .build() + .unwrap(), } } @@ -47,17 +51,31 @@ impl WebHook { .await { Ok(response) => { + let status = response.status(); let success = response.status().is_success(); let res_body = response.text().await?; - debug!(url = self.url, req_body, success, res_body, "event webhook"); - if !success { - Err(AppError::throw(res_body)) - } else { + if success { + debug!( + url = self.url, + ?status, + req_body, + res_body, + "event webhook success" + ); Ok(()) + } else { + warn!( + url = self.url, + ?status, + req_body, + res_body, + "event webhook error" + ); + Err(AppError::throw(res_body)) } } Err(err) => { - debug!(url = self.url, req_body, ?err, "event webhook error"); + warn!(url = self.url, req_body, ?err, "event webhook error"); Err(err.into()) } } diff --git a/src/route/admin.rs b/src/route/admin.rs index 9cb8bf41..2f5ad26c 100644 --- a/src/route/admin.rs +++ b/src/route/admin.rs @@ -1,8 +1,9 @@ use crate::forward::message::ReforwardInfo; use crate::AppState; -use axum::extract::{Path, Query, State}; +use axum::extract::{Path, State}; use axum::routing::{get, post}; use axum::{Json, Router}; +use axum_extra::extract::Query; pub fn route() -> Router { Router::new() .route(live777_http::path::ADMIN_INFOS, get(infos)) @@ -15,12 +16,7 @@ async fn infos( Ok(Json( state .stream_manager - .info(req.streams.map_or(vec![], |streams| { - streams - .split(',') - .map(|stream| stream.to_string()) - .collect() - })) + .info(req.streams) .await .into_iter() .map(|forward_info| forward_info.into())