Skip to content

Commit

Permalink
feat: add deployment health shutdown auto clean (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Oct 19, 2023
2 parents 0a22023 + e70aabe commit 3b264d6
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 20 deletions.
6 changes: 6 additions & 0 deletions defs/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export const AUDIO_FILE_CHUNK_SIZE = 256000;

export const DEPLOYMENT_HEALTH_CHECK_INTERVAL_SECS = 1;

export const DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_DELAY_SECS = 240;

export const DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_INTERVAL_SECS = 30;

/** time in seconds for which an email verification code is valid */
export const EMAIL_VERIFICATION_CODE_LEN = 6;

Expand All @@ -41,6 +45,8 @@ export const EXTERNAL_RELAY_NO_DATA_START_SHUTDOWN_SECS = 30;

export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 60;

export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_INTERVAL_SECS = 20;

/** Internal forwarded ip header used when openstream servers are connecting with each other */
export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip";

Expand Down
9 changes: 9 additions & 0 deletions rs/config/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub const PLAYLIST_NO_LISTENERS_SHUTDOWN_DELAY_SECS: u64 = 10;
#[const_register]
pub const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS: u64 = 60;

#[const_register]
pub const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_INTERVAL_SECS: u64 = 20;

/// delay to shutdown a relay session when it run out of listeners
#[const_register]
pub const RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS: u64 = 10;
Expand Down Expand Up @@ -117,6 +120,12 @@ pub const STATION_PICTURES_VERSION: f64 = 5.0;
#[const_register]
pub const DEPLOYMENT_HEALTH_CHECK_INTERVAL_SECS: u16 = 1;

#[const_register]
pub const DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_INTERVAL_SECS: u16 = 30;

#[const_register]
pub const DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_DELAY_SECS: u16 = 240;

/// interval in which
/// $stations.owner_deployment_info.health_checked_at
/// and $media_session.health_checked_at
Expand Down
53 changes: 52 additions & 1 deletion rs/packages/db/src/models/account/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::station::Station;
use crate::stream_connection::lite::StreamConnectionLite;
use crate::Model;
use crate::{metadata::Metadata, PublicScope};
use constants::validate::*;
use mongodb::bson::doc;
use mongodb::bson::{doc, Bson};
use mongodb::ClientSession;
use serde::{Deserialize, Serialize};
use serde_util::DateTime;
use std::collections::HashMap;
use ts_rs::TS;

crate::register!(Account);
Expand Down Expand Up @@ -168,6 +172,53 @@ impl Limit {
self.total.saturating_sub(self.used)
}
}

pub async fn recalculate_used_listeners_quota(
session: &mut ClientSession,
) -> Result<(), mongodb::error::Error> {
let account_ids = Account::cl()
.distinct_with_session(Account::KEY_ID, None, None, session)
.await?;
let mut account_counters = account_ids
.into_iter()
.map(|bson| match bson {
Bson::String(v) => (v, 0),
_ => unreachable!(),
})
.collect::<HashMap<String, u64>>();

let mut station_account_map = HashMap::<String, String>::new();
{
let mut stations = Station::cl().find_with_session(None, None, session).await?;
while let Some(station) = stations.next(session).await.transpose()? {
station_account_map.insert(station.id.clone(), station.account_id.clone());
}
}

let filter = doc! { StreamConnectionLite::KEY_IS_OPEN: true };
let mut conns = StreamConnectionLite::cl()
.find_with_session(filter, None, session)
.await?;

while let Some(conn) = conns.next(session).await.transpose()? {
let account_id = station_account_map.get(&conn.station_id);
if let Some(account_id) = account_id {
*account_counters.entry(account_id.to_string()).or_insert(0) += 1;
}
}

for (account_id, v) in account_counters.into_iter() {
const KEY_LIMITS_LISTENERS_USED: &str =
crate::key!(Account::KEY_LIMITS, Limits::KEY_LISTENERS, Limit::KEY_USED);

let update = doc! { "$set": { KEY_LIMITS_LISTENERS_USED: v as f64 } };

Account::update_by_id_with_session(&account_id, update, session).await?;
}

Ok(())
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit 3b264d6

Please sign in to comment.