diff --git a/gateway/src/db.rs b/gateway/src/db.rs index 9874a3a3..17cc2471 100644 --- a/gateway/src/db.rs +++ b/gateway/src/db.rs @@ -48,7 +48,7 @@ impl Node { Ok(node) } - pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> { + pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> { sqlx::query( r#"INSERT INTO nodes ( addr, authorization, admin_authorization, pub_max, sub_max, reforward_maximum_idle_time, reforward_cascade) VALUES (?, ?, ?, ?, ?, ?, ?) @@ -123,14 +123,20 @@ impl Node { } impl Stream { - pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> { + pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> { sqlx::query( - r#"INSERT INTO streams (stream,addr,publish) - VALUES (?, ?,1) - ON DUPLICATE KEY UPDATE publish=1,subscribe=0 ,reforward=0"#, + r#"INSERT INTO streams (stream,addr,publish,subscribe,reforward) + VALUES (?, ?,?,?,?) + ON DUPLICATE KEY UPDATE publish=?,subscribe=? ,reforward=?"#, ) .bind(self.stream.clone().clone()) .bind(self.addr.clone()) + .bind(self.publish) + .bind(self.subscribe) + .bind(self.reforward) + .bind(self.publish) + .bind(self.subscribe) + .bind(self.reforward) .execute(pool) .await?; Ok(()) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index cc483467..9098cae1 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -247,7 +247,7 @@ async fn add_node_stream(node: &Node, stream: String, pool: &MySqlPool) -> Resul updated_at: Utc::now(), id: 0, }; - stream.db_insert(pool).await?; + stream.db_save_or_update(pool).await?; Ok(stream) } @@ -377,14 +377,14 @@ async fn webhook( node.pub_max = metadata.pub_max; node.sub_max = metadata.sub_max; match r#type { - live777_http::event::NodeEventType::Up => node.db_insert(pool).await?, + live777_http::event::NodeEventType::Up => node.db_save_or_update(pool).await?, live777_http::event::NodeEventType::Down => { node.db_remove(pool).await?; Stream::db_remove_addr_stream(pool, addr.to_string()).await? } live777_http::event::NodeEventType::KeepAlive => { if node.db_update_metrics(pool).await.is_err() { - node.db_insert(pool).await?; + node.db_save_or_update(pool).await?; } } } @@ -400,7 +400,9 @@ async fn webhook( ..Default::default() }; match r#type { - live777_http::event::StreamEventType::StreamUp => db_stream.db_insert(pool).await?, + live777_http::event::StreamEventType::StreamUp => { + db_stream.db_save_or_update(pool).await? + } live777_http::event::StreamEventType::StreamDown => { db_stream.db_remove(pool).await? } diff --git a/gateway/src/tick.rs b/gateway/src/tick.rs index 94cf28d9..e6fe17b1 100644 --- a/gateway/src/tick.rs +++ b/gateway/src/tick.rs @@ -111,7 +111,7 @@ pub async fn node_sync_info(node: Node, pool: &Pool) -> Result<()> { let _ = current_stream.db_update_metrics(pool).await; } } else { - let _ = current_stream.db_insert(pool).await; + let _ = current_stream.db_save_or_update(pool).await; } }) .await;