Skip to content

Commit

Permalink
Merge pull request #135 from binbat/next
Browse files Browse the repository at this point in the history
reforward check and request timeout and sync node info
  • Loading branch information
a-wing authored May 14, 2024
2 parents 00a38a9 + 28ef602 commit f9017d5
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 135 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ utils = { path = "libs/utils" }
reqwest = { version = "0.11.24", features = [
"rustls-tls",
], default-features = false }
axum-extra = { version = "0.9.3", features = ["query"] }
# cargo install cargo-deb
# Reference: https://github.com/kornelski/cargo-deb
[package.metadata.deb]
Expand Down
2 changes: 1 addition & 1 deletion conf/live777.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# For cluster, you must enable this
# webhooks = ["http://127.0.0.1:8080/webhook"]
# webhooks = ["http://127.0.0.1:8080/webhook?token="]
# If not set, auto detect
# node_addr = "127.0.0.1:7777"

Expand Down
2 changes: 2 additions & 0 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ serde_json = "1.0.114"
reqwest = { version = "0.11.24", features = [
"rustls-tls",
], default-features = false }
futures-util = "0.3"
axum-extra = { version = "0.9.3", features = ["query"] }
11 changes: 11 additions & 0 deletions gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub struct Config {
pub reforward: Reforward,
#[serde(default = "default_db_url")]
pub db_url: String,
#[serde(default)]
pub node_sync_tick_time: NodeSyncTickTime,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down Expand Up @@ -76,6 +78,15 @@ impl Default for PublishLeaveTimeout {
}
}

/// Interval between node-info sync ticks, in milliseconds (defaults to 5000).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSyncTickTime(pub u64);

impl Default for NodeSyncTickTime {
    fn default() -> Self {
        // 5 seconds between sync ticks unless overridden in config.
        Self(5000)
    }
}

fn default_http_listen() -> SocketAddr {
SocketAddr::from_str(&format!(
"0.0.0.0:{}",
Expand Down
65 changes: 47 additions & 18 deletions gateway/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
use std::time::Duration;

use crate::{
error::AppError,
model::{Node, Stream},
result::Result,
};
use chrono::{DateTime, Utc};
use chrono::Utc;
use sqlx::MySqlPool;

impl Node {
pub async fn nodes(pool: &sqlx::mysql::MySqlPool) -> Result<Vec<Node>> {
let nodes: Vec<Node> = sqlx::query_as(r#"select * from nodes updated_at >= ?"#)
.bind(Utc::now() - Duration::from_millis(10000))
pub async fn db_find_not_deactivate_nodes(pool: &MySqlPool) -> Result<Vec<Node>> {
let nodes: Vec<Node> = sqlx::query_as("select * from nodes where updated_at != ?")
.bind(Node::deactivate_time())
.fetch_all(pool)
.await?;
Ok(nodes)
}

pub async fn max_idlest_node(pool: &sqlx::mysql::MySqlPool) -> Result<Option<Node>> {
/// Fetch every node that is still active (updated at or after
/// `Node::active_time_point()`) and currently holds at least one reforward.
pub async fn db_find_reforward_nodes(pool: &MySqlPool) -> Result<Vec<Node>> {
    let sql = "select * from nodes where updated_at >= ? and reforward > 0";
    let nodes = sqlx::query_as::<_, Node>(sql)
        .bind(Node::active_time_point())
        .fetch_all(pool)
        .await?;
    Ok(nodes)
}

/// Look up a single node row by its address; returns `None` when no row matches.
pub async fn db_find_by_addr(pool: &MySqlPool, addr: String) -> Result<Option<Node>> {
    let sql = "select * from nodes where addr = ?";
    let node = sqlx::query_as::<_, Node>(sql)
        .bind(addr)
        .fetch_optional(pool)
        .await?;
    Ok(node)
}

pub async fn max_idlest_node(pool: &MySqlPool) -> Result<Option<Node>> {
let sql = r#"
select * from nodes
where
Expand All @@ -26,14 +41,14 @@ impl Node {
and stream < pub_max
order by sub_max - subscribe desc limit 1
"#;
let mut nodes: Vec<Node> = sqlx::query_as(sql)
.bind(Utc::now() - Duration::from_millis(10000))
.fetch_all(pool)
let node: Option<Node> = sqlx::query_as(sql)
.bind(Node::active_time_point())
.fetch_optional(pool)
.await?;
Ok(nodes.pop())
Ok(node)
}

pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> {
pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> {
sqlx::query(
r#"INSERT INTO nodes ( addr, authorization, admin_authorization, pub_max, sub_max, reforward_maximum_idle_time, reforward_cascade)
VALUES (?, ?, ?, ?, ?, ?, ?)
Expand Down Expand Up @@ -83,7 +98,7 @@ impl Node {

pub async fn db_remove(&self, pool: &MySqlPool) -> Result<()> {
sqlx::query(r#"UPDATE nodes SET updated_at = ? WHERE addr = ?"#)
.bind(DateTime::from_timestamp_millis(0).unwrap())
.bind(Node::deactivate_time())
.bind(self.addr.clone())
.execute(pool)
.await?;
Expand All @@ -100,22 +115,28 @@ impl Node {
"#,
)
.bind(stream)
.bind(Utc::now() - Duration::from_millis(10000))
.bind(Node::active_time_point())
.fetch_all(pool)
.await?;
Ok(nodes)
}
}

impl Stream {
pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> {
pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> {
sqlx::query(
r#"INSERT INTO streams (stream,addr,publish)
VALUES (?, ?,1)
ON DUPLICATE KEY UPDATE publish=1,subscribe=0 ,reforward=0"#,
r#"INSERT INTO streams (stream,addr,publish,subscribe,reforward)
VALUES (?, ?,?,?,?)
ON DUPLICATE KEY UPDATE publish=?,subscribe=? ,reforward=?"#,
)
.bind(self.stream.clone().clone())
.bind(self.addr.clone())
.bind(self.publish)
.bind(self.subscribe)
.bind(self.reforward)
.bind(self.publish)
.bind(self.subscribe)
.bind(self.reforward)
.execute(pool)
.await?;
Ok(())
Expand Down Expand Up @@ -151,4 +172,12 @@ impl Stream {
.await?;
Ok(())
}

/// List every stream row registered under the given node address.
pub async fn db_find_node_stream(pool: &MySqlPool, addr: String) -> Result<Vec<Stream>> {
    let sql = "select * from streams where addr = ?";
    let streams = sqlx::query_as::<_, Stream>(sql)
        .bind(addr)
        .fetch_all(pool)
        .await?;
    Ok(streams)
}
}
52 changes: 44 additions & 8 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use axum::{
routing::post,
Router,
};
use axum_extra::extract::Query;
use chrono::Utc;
use clap::Parser;

Expand All @@ -34,7 +35,7 @@ use tower_http::services::{ServeDir, ServeFile};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tower_http::validate_request::ValidateRequestHeaderLayer;
use tracing::{debug, error, info, info_span, warn};
use tracing::{debug, error, info, info_span, warn, Span};

use crate::auth::ManyValidate;
use crate::config::Config;
Expand Down Expand Up @@ -84,7 +85,6 @@ async fn main() {
.unwrap(),
client,
};
tokio::spawn(tick::reforward_check(app_state.clone()));
let auth_layer = ValidateRequestHeaderLayer::custom(ManyValidate::new(vec![cfg.auth]));
let app = Router::new()
.route(&live777_http::path::whip(":stream"), post(whip))
Expand All @@ -99,7 +99,7 @@ async fn main() {
)
.layer(auth_layer)
.route("/webhook", post(webhook))
.with_state(app_state)
.with_state(app_state.clone())
.layer(if cfg.http.cors {
CorsLayer::permissive()
} else {
Expand All @@ -113,11 +113,13 @@ async fn main() {
uri = ?request.uri(),
method = ?request.method(),
span_id = tracing::field::Empty,
target_addr = tracing::field::Empty,
);
span.record("span_id", span.id().unwrap().into_u64());
span
}),
);
tokio::spawn(tick::run(app_state));
tokio::select! {
Err(e) = axum::serve(listener, static_server(app)).into_future() => error!("Application error: {e}"),
msg = signal::wait_for_stop_signal() => debug!("Received signal: {}", msg),
Expand Down Expand Up @@ -245,7 +247,7 @@ async fn add_node_stream(node: &Node, stream: String, pool: &MySqlPool) -> Resul
updated_at: Utc::now(),
id: 0,
};
stream.db_insert(pool).await?;
stream.db_save_or_update(pool).await?;
Ok(stream)
}

Expand Down Expand Up @@ -324,8 +326,35 @@ async fn resource(
Err(AppError::ResourceNotFound)
}

use serde::{Deserialize, Serialize};

/// Query-string parameters accepted by the `/webhook` endpoint.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
struct WebHookQuery {
    // Auth token supplied by the reporting node.
    token: String,
    // Maximum idle time before a reforward is considered stale
    // (units presumably milliseconds — TODO confirm against node config).
    reforward_maximum_idle_time: Option<u64>,
    // Whether cascaded reforwarding is enabled for this node.
    reforward_cascade: Option<bool>,
}

impl WebHookQuery {
    /// Returns the configured maximum idle time, or 0 when the query omitted it.
    fn get_reforward_maximum_idle_time(&self) -> u64 {
        // `unwrap_or` replaces the manual if-let/else (clippy::manual_unwrap_or).
        self.reforward_maximum_idle_time.unwrap_or(0)
    }

    /// Returns the cascade flag, or false when the query omitted it.
    fn get_reforward_cascade(&self) -> bool {
        self.reforward_cascade.unwrap_or_default()
    }
}

async fn webhook(
State(state): State<AppState>,
Query(qry): Query<WebHookQuery>,
Json(event_body): Json<live777_http::event::EventBody>,
) -> Result<String> {
let pool = &state.pool;
Expand All @@ -337,6 +366,8 @@ async fn webhook(
publish: metrics.publish,
subscribe: metrics.subscribe,
reforward: metrics.reforward,
reforward_maximum_idle_time: qry.get_reforward_maximum_idle_time(),
reforward_cascade: qry.get_reforward_cascade(),
..Default::default()
};
match event_body.event {
Expand All @@ -346,14 +377,14 @@ async fn webhook(
node.pub_max = metadata.pub_max;
node.sub_max = metadata.sub_max;
match r#type {
live777_http::event::NodeEventType::Up => node.db_insert(pool).await?,
live777_http::event::NodeEventType::Up => node.db_save_or_update(pool).await?,
live777_http::event::NodeEventType::Down => {
node.db_remove(pool).await?;
Stream::db_remove_addr_stream(pool, addr.to_string()).await?
}
live777_http::event::NodeEventType::KeepAlive => {
if node.db_update_metrics(pool).await.is_err() {
node.db_insert(pool).await?;
node.db_save_or_update(pool).await?;
}
}
}
Expand All @@ -369,18 +400,23 @@ async fn webhook(
..Default::default()
};
match r#type {
live777_http::event::StreamEventType::StreamUp => db_stream.db_insert(pool).await?,
live777_http::event::StreamEventType::StreamUp => {
db_stream.db_save_or_update(pool).await?
}
live777_http::event::StreamEventType::StreamDown => {
db_stream.db_remove(pool).await?
}
_ => db_stream.db_update_metrics(pool).await?,
_ => {
db_stream.db_update_metrics(pool).await?;
}
}
}
}
Ok("".to_string())
}

async fn request_proxy(state: AppState, mut req: Request, target_node: &Node) -> Result<Response> {
Span::current().record("target_addr", target_node.addr.clone());
let path = req.uri().path();
let path_query = req
.uri()
Expand Down
Loading

0 comments on commit f9017d5

Please sign in to comment.