Skip to content

Commit

Permalink
save or update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed May 12, 2024
1 parent 870eb86 commit abacdaf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
16 changes: 11 additions & 5 deletions gateway/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Node {
Ok(node)
}

pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> {
pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> {
sqlx::query(
r#"INSERT INTO nodes ( addr, authorization, admin_authorization, pub_max, sub_max, reforward_maximum_idle_time, reforward_cascade)
VALUES (?, ?, ?, ?, ?, ?, ?)
Expand Down Expand Up @@ -123,14 +123,20 @@ impl Node {
}

impl Stream {
pub async fn db_insert(&self, pool: &MySqlPool) -> Result<()> {
pub async fn db_save_or_update(&self, pool: &MySqlPool) -> Result<()> {
sqlx::query(
r#"INSERT INTO streams (stream,addr,publish)
VALUES (?, ?,1)
ON DUPLICATE KEY UPDATE publish=1,subscribe=0 ,reforward=0"#,
r#"INSERT INTO streams (stream,addr,publish,subscribe,reforward)
VALUES (?, ?,?,?,?)
ON DUPLICATE KEY UPDATE publish=?,subscribe=? ,reforward=?"#,
)
.bind(self.stream.clone().clone())
.bind(self.addr.clone())
.bind(self.publish)
.bind(self.subscribe)
.bind(self.reforward)
.bind(self.publish)
.bind(self.subscribe)
.bind(self.reforward)
.execute(pool)
.await?;
Ok(())
Expand Down
10 changes: 6 additions & 4 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async fn add_node_stream(node: &Node, stream: String, pool: &MySqlPool) -> Resul
updated_at: Utc::now(),
id: 0,
};
stream.db_insert(pool).await?;
stream.db_save_or_update(pool).await?;
Ok(stream)
}

Expand Down Expand Up @@ -377,14 +377,14 @@ async fn webhook(
node.pub_max = metadata.pub_max;
node.sub_max = metadata.sub_max;
match r#type {
live777_http::event::NodeEventType::Up => node.db_insert(pool).await?,
live777_http::event::NodeEventType::Up => node.db_save_or_update(pool).await?,
live777_http::event::NodeEventType::Down => {
node.db_remove(pool).await?;
Stream::db_remove_addr_stream(pool, addr.to_string()).await?
}
live777_http::event::NodeEventType::KeepAlive => {
if node.db_update_metrics(pool).await.is_err() {
node.db_insert(pool).await?;
node.db_save_or_update(pool).await?;
}
}
}
Expand All @@ -400,7 +400,9 @@ async fn webhook(
..Default::default()
};
match r#type {
live777_http::event::StreamEventType::StreamUp => db_stream.db_insert(pool).await?,
live777_http::event::StreamEventType::StreamUp => {
db_stream.db_save_or_update(pool).await?
}
live777_http::event::StreamEventType::StreamDown => {
db_stream.db_remove(pool).await?
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub async fn node_sync_info(node: Node, pool: &Pool<MySql>) -> Result<()> {
let _ = current_stream.db_update_metrics(pool).await;
}
} else {
let _ = current_stream.db_insert(pool).await;
let _ = current_stream.db_save_or_update(pool).await;
}
})
.await;
Expand Down

0 comments on commit abacdaf

Please sign in to comment.